1
16
17 package ristretto
18
19 import (
20 "math"
21 "sync"
22
23 "github.com/dgraph-io/ristretto/z"
24 )
25
26 const (
27
28
29 lfuSample = 5
30 )
31
32
33
34
35
36 type policy interface {
37 ringConsumer
38
39
40
41 Add(uint64, int64) ([]*item, bool)
42
43 Has(uint64) bool
44
45 Del(uint64)
46
47 Cap() int64
48
49 Close()
50
51 Update(uint64, int64)
52
53 Cost(uint64) int64
54
55 CollectMetrics(*Metrics)
56
57 Clear()
58 }
59
60 func newPolicy(numCounters, maxCost int64) policy {
61 return newDefaultPolicy(numCounters, maxCost)
62 }
63
64 type defaultPolicy struct {
65 sync.Mutex
66 admit *tinyLFU
67 evict *sampledLFU
68 itemsCh chan []uint64
69 stop chan struct{}
70 metrics *Metrics
71 }
72
73 func newDefaultPolicy(numCounters, maxCost int64) *defaultPolicy {
74 p := &defaultPolicy{
75 admit: newTinyLFU(numCounters),
76 evict: newSampledLFU(maxCost),
77 itemsCh: make(chan []uint64, 3),
78 stop: make(chan struct{}),
79 }
80 go p.processItems()
81 return p
82 }
83
84 func (p *defaultPolicy) CollectMetrics(metrics *Metrics) {
85 p.metrics = metrics
86 p.evict.metrics = metrics
87 }
88
89 type policyPair struct {
90 key uint64
91 cost int64
92 }
93
94 func (p *defaultPolicy) processItems() {
95 for {
96 select {
97 case items := <-p.itemsCh:
98 p.Lock()
99 p.admit.Push(items)
100 p.Unlock()
101 case <-p.stop:
102 return
103 }
104 }
105 }
106
107 func (p *defaultPolicy) Push(keys []uint64) bool {
108 if len(keys) == 0 {
109 return true
110 }
111 select {
112 case p.itemsCh <- keys:
113 p.metrics.add(keepGets, keys[0], uint64(len(keys)))
114 return true
115 default:
116 p.metrics.add(dropGets, keys[0], uint64(len(keys)))
117 return false
118 }
119 }
120
121
122
123
124 func (p *defaultPolicy) Add(key uint64, cost int64) ([]*item, bool) {
125 p.Lock()
126 defer p.Unlock()
127
128
129 if cost > p.evict.maxCost {
130 return nil, false
131 }
132
133
134 if has := p.evict.updateIfHas(key, cost); has {
135
136 return nil, false
137 }
138
139
140
141 room := p.evict.roomLeft(cost)
142 if room >= 0 {
143
144
145 p.evict.add(key, cost)
146 p.metrics.add(costAdd, key, uint64(cost))
147 return nil, true
148 }
149
150
151 incHits := p.admit.Estimate(key)
152
153
154
155
156 sample := make([]*policyPair, 0, lfuSample)
157
158 victims := make([]*item, 0)
159
160
161
162 for ; room < 0; room = p.evict.roomLeft(cost) {
163
164 sample = p.evict.fillSample(sample)
165
166
167 minKey, minHits, minId, minCost := uint64(0), int64(math.MaxInt64), 0, int64(0)
168 for i, pair := range sample {
169
170 if hits := p.admit.Estimate(pair.key); hits < minHits {
171 minKey, minHits, minId, minCost = pair.key, hits, i, pair.cost
172 }
173 }
174
175
176 if incHits < minHits {
177 p.metrics.add(rejectSets, key, 1)
178 return victims, false
179 }
180
181
182 p.evict.del(minKey)
183
184
185 sample[minId] = sample[len(sample)-1]
186 sample = sample[:len(sample)-1]
187
188 victims = append(victims, &item{
189 key: minKey,
190 conflict: 0,
191 cost: minCost,
192 })
193 }
194
195 p.evict.add(key, cost)
196 p.metrics.add(costAdd, key, uint64(cost))
197 return victims, true
198 }
199
200 func (p *defaultPolicy) Has(key uint64) bool {
201 p.Lock()
202 _, exists := p.evict.keyCosts[key]
203 p.Unlock()
204 return exists
205 }
206
207 func (p *defaultPolicy) Del(key uint64) {
208 p.Lock()
209 p.evict.del(key)
210 p.Unlock()
211 }
212
213 func (p *defaultPolicy) Cap() int64 {
214 p.Lock()
215 capacity := int64(p.evict.maxCost - p.evict.used)
216 p.Unlock()
217 return capacity
218 }
219
220 func (p *defaultPolicy) Update(key uint64, cost int64) {
221 p.Lock()
222 p.evict.updateIfHas(key, cost)
223 p.Unlock()
224 }
225
226 func (p *defaultPolicy) Cost(key uint64) int64 {
227 p.Lock()
228 if cost, found := p.evict.keyCosts[key]; found {
229 p.Unlock()
230 return cost
231 }
232 p.Unlock()
233 return -1
234 }
235
236 func (p *defaultPolicy) Clear() {
237 p.Lock()
238 p.admit.clear()
239 p.evict.clear()
240 p.Unlock()
241 }
242
243 func (p *defaultPolicy) Close() {
244
245 p.stop <- struct{}{}
246 close(p.stop)
247 close(p.itemsCh)
248 }
249
250
251 type sampledLFU struct {
252 keyCosts map[uint64]int64
253 maxCost int64
254 used int64
255 metrics *Metrics
256 }
257
258 func newSampledLFU(maxCost int64) *sampledLFU {
259 return &sampledLFU{
260 keyCosts: make(map[uint64]int64),
261 maxCost: maxCost,
262 }
263 }
264
265 func (p *sampledLFU) roomLeft(cost int64) int64 {
266 return p.maxCost - (p.used + cost)
267 }
268
269 func (p *sampledLFU) fillSample(in []*policyPair) []*policyPair {
270 if len(in) >= lfuSample {
271 return in
272 }
273 for key, cost := range p.keyCosts {
274 in = append(in, &policyPair{key, cost})
275 if len(in) >= lfuSample {
276 return in
277 }
278 }
279 return in
280 }
281
282 func (p *sampledLFU) del(key uint64) {
283 cost, ok := p.keyCosts[key]
284 if !ok {
285 return
286 }
287 p.used -= cost
288 delete(p.keyCosts, key)
289 p.metrics.add(costEvict, key, uint64(cost))
290 p.metrics.add(keyEvict, key, 1)
291 }
292
293 func (p *sampledLFU) add(key uint64, cost int64) {
294 p.keyCosts[key] = cost
295 p.used += cost
296 }
297
298 func (p *sampledLFU) updateIfHas(key uint64, cost int64) bool {
299 if prev, found := p.keyCosts[key]; found {
300
301
302 p.metrics.add(keyUpdate, key, 1)
303 if prev > cost {
304 diff := prev - cost
305 p.metrics.add(costAdd, key, ^uint64(uint64(diff)-1))
306 } else if cost > prev {
307 diff := cost - prev
308 p.metrics.add(costAdd, key, uint64(diff))
309 }
310 p.used += cost - prev
311 p.keyCosts[key] = cost
312 return true
313 }
314 return false
315 }
316
317 func (p *sampledLFU) clear() {
318 p.used = 0
319 p.keyCosts = make(map[uint64]int64)
320 }
321
322
323
324
325 type tinyLFU struct {
326 freq *cmSketch
327 door *z.Bloom
328 incrs int64
329 resetAt int64
330 }
331
332 func newTinyLFU(numCounters int64) *tinyLFU {
333 return &tinyLFU{
334 freq: newCmSketch(numCounters),
335 door: z.NewBloomFilter(float64(numCounters), 0.01),
336 resetAt: numCounters,
337 }
338 }
339
340 func (p *tinyLFU) Push(keys []uint64) {
341 for _, key := range keys {
342 p.Increment(key)
343 }
344 }
345
346 func (p *tinyLFU) Estimate(key uint64) int64 {
347 hits := p.freq.Estimate(key)
348 if p.door.Has(key) {
349 hits++
350 }
351 return hits
352 }
353
354 func (p *tinyLFU) Increment(key uint64) {
355
356 if added := p.door.AddIfNotHas(key); !added {
357
358 p.freq.Increment(key)
359 }
360 p.incrs++
361 if p.incrs >= p.resetAt {
362 p.reset()
363 }
364 }
365
366 func (p *tinyLFU) reset() {
367
368 p.incrs = 0
369
370 p.door.Clear()
371
372 p.freq.Reset()
373 }
374
375 func (p *tinyLFU) clear() {
376 p.incrs = 0
377 p.door.Clear()
378 p.freq.Clear()
379 }
380
View as plain text