1
16
17
18
19
20 package ristretto
21
22 import (
23 "bytes"
24 "errors"
25 "fmt"
26 "sync/atomic"
27 "time"
28
29 "github.com/dgraph-io/ristretto/z"
30 )
31
// setBufSize is the capacity of the buffered channel (Cache.setBuf) that
// queues pending set/update/delete items for the background goroutine.
var setBufSize = 32 << 10
36
// onEvictFunc is the callback signature invoked for an evicted entry. It
// receives the entry's key hash, conflict hash, stored value and cost.
type onEvictFunc func(key, conflict uint64, value interface{}, cost int64)
38
39
40
41
42 type Cache struct {
43
44 store store
45
46 policy policy
47
48
49 getBuf *ringBuffer
50
51
52 setBuf chan *item
53
54 onEvict onEvictFunc
55
56
57
58 keyToHash func(interface{}) (uint64, uint64)
59
60 stop chan struct{}
61
62 cost func(value interface{}) int64
63
64 cleanupTicker *time.Ticker
65
66
67 Metrics *Metrics
68 }
69
70
// Config carries the settings NewCache uses to build a Cache.
type Config struct {
	// NumCounters is handed to the policy constructor. NewCache rejects a
	// value of zero.
	NumCounters int64
	// MaxCost is handed to the policy constructor. NewCache rejects a value
	// of zero.
	MaxCost int64
	// BufferItems sizes the ring buffer that records Get accesses. NewCache
	// rejects a value of zero.
	BufferItems int64
	// Metrics turns on metric collection; when false, Cache.Metrics stays
	// nil.
	Metrics bool
	// OnEvict, if set, is called with the key hash, conflict hash, value and
	// cost of each entry that is evicted.
	OnEvict func(key, conflict uint64, value interface{}, cost int64)
	// KeyToHash, if set, replaces the default hashing function
	// (z.KeyToHash). It returns the key hash and the conflict hash.
	KeyToHash func(key interface{}) (uint64, uint64)
	// Cost, if set, is used to compute the cost of a value whose item was
	// queued with a cost of 0.
	Cost func(value interface{}) int64
}
112
// itemFlag tags a queued item with the operation processItems should apply.
type itemFlag byte

const (
	// itemNew marks an item whose key was not already in the store; it goes
	// through the policy's admission check.
	itemNew itemFlag = iota
	// itemDelete marks a request to remove a key from the policy and store.
	itemDelete
	// itemUpdate marks an item whose store entry was already updated in
	// place; only the policy still needs to be told about it.
	itemUpdate
)
120
121
122 type item struct {
123 flag itemFlag
124 key uint64
125 conflict uint64
126 value interface{}
127 cost int64
128 expiration time.Time
129 }
130
131
132 func NewCache(config *Config) (*Cache, error) {
133 switch {
134 case config.NumCounters == 0:
135 return nil, errors.New("NumCounters can't be zero")
136 case config.MaxCost == 0:
137 return nil, errors.New("MaxCost can't be zero")
138 case config.BufferItems == 0:
139 return nil, errors.New("BufferItems can't be zero")
140 }
141 policy := newPolicy(config.NumCounters, config.MaxCost)
142 cache := &Cache{
143 store: newStore(),
144 policy: policy,
145 getBuf: newRingBuffer(policy, config.BufferItems),
146 setBuf: make(chan *item, setBufSize),
147 onEvict: config.OnEvict,
148 keyToHash: config.KeyToHash,
149 stop: make(chan struct{}),
150 cost: config.Cost,
151 cleanupTicker: time.NewTicker(time.Duration(bucketDurationSecs) * time.Second / 2),
152 }
153 if cache.keyToHash == nil {
154 cache.keyToHash = z.KeyToHash
155 }
156 if config.Metrics {
157 cache.collectMetrics()
158 }
159
160
161
162 go cache.processItems()
163 return cache, nil
164 }
165
166
167
168
169 func (c *Cache) Get(key interface{}) (interface{}, bool) {
170 if c == nil || key == nil {
171 return nil, false
172 }
173 keyHash, conflictHash := c.keyToHash(key)
174 c.getBuf.Push(keyHash)
175 value, ok := c.store.Get(keyHash, conflictHash)
176 if ok {
177 c.Metrics.add(hit, keyHash, 1)
178 } else {
179 c.Metrics.add(miss, keyHash, 1)
180 }
181 return value, ok
182 }
183
184
185
186
187
188
189
190
191
192
193 func (c *Cache) Set(key, value interface{}, cost int64) bool {
194 return c.SetWithTTL(key, value, cost, 0*time.Second)
195 }
196
197
198
199
200
201 func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration) bool {
202 if c == nil || key == nil {
203 return false
204 }
205
206 var expiration time.Time
207 switch {
208 case ttl == 0:
209
210 break
211 case ttl < 0:
212
213 return false
214 default:
215 expiration = time.Now().Add(ttl)
216 }
217
218 keyHash, conflictHash := c.keyToHash(key)
219 i := &item{
220 flag: itemNew,
221 key: keyHash,
222 conflict: conflictHash,
223 value: value,
224 cost: cost,
225 expiration: expiration,
226 }
227
228
229 if c.store.Update(i) {
230 i.flag = itemUpdate
231 }
232
233 select {
234 case c.setBuf <- i:
235 return true
236 default:
237 if i.flag == itemUpdate {
238
239
240
241 return true
242 }
243 c.Metrics.add(dropSets, keyHash, 1)
244 return false
245 }
246 }
247
248
249 func (c *Cache) Del(key interface{}) {
250 if c == nil || key == nil {
251 return
252 }
253 keyHash, conflictHash := c.keyToHash(key)
254
255 c.store.Del(keyHash, conflictHash)
256
257
258
259
260 c.setBuf <- &item{
261 flag: itemDelete,
262 key: keyHash,
263 conflict: conflictHash,
264 }
265 }
266
267
268 func (c *Cache) Close() {
269 if c == nil || c.stop == nil {
270 return
271 }
272
273 c.stop <- struct{}{}
274 close(c.stop)
275 c.stop = nil
276 close(c.setBuf)
277 c.policy.Close()
278 }
279
280
281
282
283 func (c *Cache) Clear() {
284 if c == nil {
285 return
286 }
287
288 c.stop <- struct{}{}
289
290
291 loop:
292 for {
293 select {
294 case <-c.setBuf:
295 default:
296 break loop
297 }
298 }
299
300
301 c.policy.Clear()
302 c.store.Clear()
303
304 if c.Metrics != nil {
305 c.Metrics.Clear()
306 }
307
308 go c.processItems()
309 }
310
311
312 func (c *Cache) processItems() {
313 for {
314 select {
315 case i := <-c.setBuf:
316
317 if i.cost == 0 && c.cost != nil && i.flag != itemDelete {
318 i.cost = c.cost(i.value)
319 }
320 switch i.flag {
321 case itemNew:
322 victims, added := c.policy.Add(i.key, i.cost)
323 if added {
324 c.store.Set(i)
325 c.Metrics.add(keyAdd, i.key, 1)
326 }
327 for _, victim := range victims {
328 victim.conflict, victim.value = c.store.Del(victim.key, 0)
329 if c.onEvict != nil {
330 c.onEvict(victim.key, victim.conflict, victim.value, victim.cost)
331 }
332 }
333
334 case itemUpdate:
335 c.policy.Update(i.key, i.cost)
336
337 case itemDelete:
338 c.policy.Del(i.key)
339 c.store.Del(i.key, i.conflict)
340 }
341 case <-c.cleanupTicker.C:
342 c.store.Cleanup(c.policy, c.onEvict)
343 case <-c.stop:
344 return
345 }
346 }
347 }
348
349
350
351 func (c *Cache) collectMetrics() {
352 c.Metrics = newMetrics()
353 c.policy.CollectMetrics(c.Metrics)
354 }
355
// metricType identifies one family of counters inside Metrics.
type metricType int

// The metric identifiers. They are deliberately left as untyped constants:
// they are used both as metricType values and as plain integer bounds
// (array length, loop limit).
const (
	// Lookups that did / did not find a value.
	hit = iota
	miss

	// Number of keys added, updated and evicted.
	keyAdd
	keyUpdate
	keyEvict

	// Cost of the keys added and evicted.
	costAdd
	costEvict

	// Sets that were dropped or rejected.
	dropSets
	rejectSets

	// Gets that were dropped or kept.
	dropGets
	keepGets

	// doNotUse is the number of metric types; it must stay last and is not a
	// metric itself.
	doNotUse
)
379
380 func stringFor(t metricType) string {
381 switch t {
382 case hit:
383 return "hit"
384 case miss:
385 return "miss"
386 case keyAdd:
387 return "keys-added"
388 case keyUpdate:
389 return "keys-updated"
390 case keyEvict:
391 return "keys-evicted"
392 case costAdd:
393 return "cost-added"
394 case costEvict:
395 return "cost-evicted"
396 case dropSets:
397 return "sets-dropped"
398 case rejectSets:
399 return "sets-rejected"
400 case dropGets:
401 return "gets-dropped"
402 case keepGets:
403 return "gets-kept"
404 default:
405 return "unidentified"
406 }
407 }
408
409
410 type Metrics struct {
411 all [doNotUse][]*uint64
412 }
413
414 func newMetrics() *Metrics {
415 s := &Metrics{}
416 for i := 0; i < doNotUse; i++ {
417 s.all[i] = make([]*uint64, 256)
418 slice := s.all[i]
419 for j := range slice {
420 slice[j] = new(uint64)
421 }
422 }
423 return s
424 }
425
426 func (p *Metrics) add(t metricType, hash, delta uint64) {
427 if p == nil {
428 return
429 }
430 valp := p.all[t]
431
432
433 idx := (hash % 25) * 10
434 atomic.AddUint64(valp[idx], delta)
435 }
436
437 func (p *Metrics) get(t metricType) uint64 {
438 if p == nil {
439 return 0
440 }
441 valp := p.all[t]
442 var total uint64
443 for i := range valp {
444 total += atomic.LoadUint64(valp[i])
445 }
446 return total
447 }
448
449
450 func (p *Metrics) Hits() uint64 {
451 return p.get(hit)
452 }
453
454
455 func (p *Metrics) Misses() uint64 {
456 return p.get(miss)
457 }
458
459
460 func (p *Metrics) KeysAdded() uint64 {
461 return p.get(keyAdd)
462 }
463
464
465 func (p *Metrics) KeysUpdated() uint64 {
466 return p.get(keyUpdate)
467 }
468
469
470 func (p *Metrics) KeysEvicted() uint64 {
471 return p.get(keyEvict)
472 }
473
474
475 func (p *Metrics) CostAdded() uint64 {
476 return p.get(costAdd)
477 }
478
479
480 func (p *Metrics) CostEvicted() uint64 {
481 return p.get(costEvict)
482 }
483
484
485
486 func (p *Metrics) SetsDropped() uint64 {
487 return p.get(dropSets)
488 }
489
490
491 func (p *Metrics) SetsRejected() uint64 {
492 return p.get(rejectSets)
493 }
494
495
496
497 func (p *Metrics) GetsDropped() uint64 {
498 return p.get(dropGets)
499 }
500
501
502 func (p *Metrics) GetsKept() uint64 {
503 return p.get(keepGets)
504 }
505
506
507
508 func (p *Metrics) Ratio() float64 {
509 if p == nil {
510 return 0.0
511 }
512 hits, misses := p.get(hit), p.get(miss)
513 if hits == 0 && misses == 0 {
514 return 0.0
515 }
516 return float64(hits) / float64(hits+misses)
517 }
518
519
520 func (p *Metrics) Clear() {
521 if p == nil {
522 return
523 }
524 for i := 0; i < doNotUse; i++ {
525 for j := range p.all[i] {
526 atomic.StoreUint64(p.all[i][j], 0)
527 }
528 }
529 }
530
531
532 func (p *Metrics) String() string {
533 if p == nil {
534 return ""
535 }
536 var buf bytes.Buffer
537 for i := 0; i < doNotUse; i++ {
538 t := metricType(i)
539 fmt.Fprintf(&buf, "%s: %d ", stringFor(t), p.get(t))
540 }
541 fmt.Fprintf(&buf, "gets-total: %d ", p.get(hit)+p.get(miss))
542 fmt.Fprintf(&buf, "hit-ratio: %.2f", p.Ratio())
543 return buf.String()
544 }
545
View as plain text