...

Source file src/github.com/dgraph-io/ristretto/policy.go

Documentation: github.com/dgraph-io/ristretto

     1  /*
     2   * Copyright 2020 Dgraph Labs, Inc. and Contributors
     3   *
     4   * Licensed under the Apache License, Version 2.0 (the "License");
     5   * you may not use this file except in compliance with the License.
     6   * You may obtain a copy of the License at
     7   *
     8   *     http://www.apache.org/licenses/LICENSE-2.0
     9   *
    10   * Unless required by applicable law or agreed to in writing, software
    11   * distributed under the License is distributed on an "AS IS" BASIS,
    12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13   * See the License for the specific language governing permissions and
    14   * limitations under the License.
    15   */
    16  
    17  package ristretto
    18  
    19  import (
    20  	"math"
    21  	"sync"
    22  
    23  	"github.com/dgraph-io/ristretto/z"
    24  )
    25  
const (
	// lfuSample is the target size of the eviction candidate pool:
	// sampledLFU.fillSample stops appending once the pool holds this many
	// key-cost pairs, and defaultPolicy.Add pre-sizes its pool with it.
	// The value 5 is an empirical choice; no measurement backing it is
	// recorded in this file.
	lfuSample = 5
)
    31  
// policy is the interface encapsulating eviction/admission behavior. The only
// implementation in this file is defaultPolicy, which newPolicy returns.
//
// TODO: remove this interface and just rename defaultPolicy to policy, as we
//       are probably only going to use/implement/maintain one policy.
type policy interface {
	// ringConsumer is declared elsewhere in the package. NOTE(review): it
	// presumably contributes the Push([]uint64) bool method that
	// defaultPolicy implements -- confirm against its declaration.
	ringConsumer
	// Add attempts to Add the key-cost pair to the Policy. It returns a slice
	// of evicted items and a bool denoting whether or not the key-cost pair
	// was added. If it returns true, the key should be stored in cache.
	// In defaultPolicy, updating an already-tracked key reports false, and a
	// rejected Add may still return items that were evicted before the
	// rejection was decided.
	Add(uint64, int64) ([]*item, bool)
	// Has returns true if the key exists in the Policy.
	Has(uint64) bool
	// Del deletes the key from the Policy.
	Del(uint64)
	// Cap returns the available capacity (for defaultPolicy: the maximum
	// cost minus the cost currently in use).
	Cap() int64
	// Close stops all goroutines and closes all channels.
	Close()
	// Update updates the cost value for the key.
	Update(uint64, int64)
	// Cost returns the cost value of a key or -1 if missing.
	Cost(uint64) int64
	// CollectMetrics optionally sets the stats object used to track how the
	// policy is performing.
	CollectMetrics(*Metrics)
	// Clear zeroes out all counters and clears hashmaps.
	Clear()
}
    59  
    60  func newPolicy(numCounters, maxCost int64) policy {
    61  	return newDefaultPolicy(numCounters, maxCost)
    62  }
    63  
// defaultPolicy is the policy implementation returned by newPolicy. It pairs a
// tinyLFU admission filter with a sampledLFU eviction helper.
//
// The embedded mutex is held by Add, Has, Del, Cap, Update, Cost, Clear and
// processItems while they touch admit or evict. CollectMetrics is the
// exception: it assigns the metrics pointers without locking.
type defaultPolicy struct {
	sync.Mutex
	// admit estimates how often keys have been accessed.
	admit *tinyLFU
	// evict tracks the cost of every admitted key and the total in use.
	evict *sampledLFU
	// itemsCh carries batches of accessed keys from Push to processItems.
	itemsCh chan []uint64
	// stop tells the processItems goroutine to exit (see Close).
	stop chan struct{}
	// metrics is optional and set through CollectMetrics.
	metrics *Metrics
}
    72  
    73  func newDefaultPolicy(numCounters, maxCost int64) *defaultPolicy {
    74  	p := &defaultPolicy{
    75  		admit:   newTinyLFU(numCounters),
    76  		evict:   newSampledLFU(maxCost),
    77  		itemsCh: make(chan []uint64, 3),
    78  		stop:    make(chan struct{}),
    79  	}
    80  	go p.processItems()
    81  	return p
    82  }
    83  
    84  func (p *defaultPolicy) CollectMetrics(metrics *Metrics) {
    85  	p.metrics = metrics
    86  	p.evict.metrics = metrics
    87  }
    88  
// policyPair is a key together with the cost recorded for it. It is the
// element type of the eviction candidate pool built by sampledLFU.fillSample.
// fillSample constructs it with a positional literal, so the field order
// (key, cost) must not change.
type policyPair struct {
	key  uint64
	cost int64
}
    93  
    94  func (p *defaultPolicy) processItems() {
    95  	for {
    96  		select {
    97  		case items := <-p.itemsCh:
    98  			p.Lock()
    99  			p.admit.Push(items)
   100  			p.Unlock()
   101  		case <-p.stop:
   102  			return
   103  		}
   104  	}
   105  }
   106  
   107  func (p *defaultPolicy) Push(keys []uint64) bool {
   108  	if len(keys) == 0 {
   109  		return true
   110  	}
   111  	select {
   112  	case p.itemsCh <- keys:
   113  		p.metrics.add(keepGets, keys[0], uint64(len(keys)))
   114  		return true
   115  	default:
   116  		p.metrics.add(dropGets, keys[0], uint64(len(keys)))
   117  		return false
   118  	}
   119  }
   120  
   121  // Add decides whether the item with the given key and cost should be accepted by
   122  // the policy. It returns the list of victims that have been evicted and a boolean
   123  // indicating whether the incoming item should be accepted.
   124  func (p *defaultPolicy) Add(key uint64, cost int64) ([]*item, bool) {
   125  	p.Lock()
   126  	defer p.Unlock()
   127  
   128  	// Cannot add an item bigger than entire cache.
   129  	if cost > p.evict.maxCost {
   130  		return nil, false
   131  	}
   132  
   133  	// No need to go any further if the item is already in the cache.
   134  	if has := p.evict.updateIfHas(key, cost); has {
   135  		// An update does not count as an addition, so return false.
   136  		return nil, false
   137  	}
   138  
   139  	// If the execution reaches this point, the key doesn't exist in the cache.
   140  	// Calculate the remaining room in the cache (usually bytes).
   141  	room := p.evict.roomLeft(cost)
   142  	if room >= 0 {
   143  		// There's enough room in the cache to store the new item without
   144  		// overflowing. Do that now and stop here.
   145  		p.evict.add(key, cost)
   146  		p.metrics.add(costAdd, key, uint64(cost))
   147  		return nil, true
   148  	}
   149  
   150  	// incHits is the hit count for the incoming item.
   151  	incHits := p.admit.Estimate(key)
   152  	// sample is the eviction candidate pool to be filled via random sampling.
   153  	// TODO: perhaps we should use a min heap here. Right now our time
   154  	// complexity is N for finding the min. Min heap should bring it down to
   155  	// O(lg N).
   156  	sample := make([]*policyPair, 0, lfuSample)
   157  	// As items are evicted they will be appended to victims.
   158  	victims := make([]*item, 0)
   159  
   160  	// Delete victims until there's enough space or a minKey is found that has
   161  	// more hits than incoming item.
   162  	for ; room < 0; room = p.evict.roomLeft(cost) {
   163  		// Fill up empty slots in sample.
   164  		sample = p.evict.fillSample(sample)
   165  
   166  		// Find minimally used item in sample.
   167  		minKey, minHits, minId, minCost := uint64(0), int64(math.MaxInt64), 0, int64(0)
   168  		for i, pair := range sample {
   169  			// Look up hit count for sample key.
   170  			if hits := p.admit.Estimate(pair.key); hits < minHits {
   171  				minKey, minHits, minId, minCost = pair.key, hits, i, pair.cost
   172  			}
   173  		}
   174  
   175  		// If the incoming item isn't worth keeping in the policy, reject.
   176  		if incHits < minHits {
   177  			p.metrics.add(rejectSets, key, 1)
   178  			return victims, false
   179  		}
   180  
   181  		// Delete the victim from metadata.
   182  		p.evict.del(minKey)
   183  
   184  		// Delete the victim from sample.
   185  		sample[minId] = sample[len(sample)-1]
   186  		sample = sample[:len(sample)-1]
   187  		// Store victim in evicted victims slice.
   188  		victims = append(victims, &item{
   189  			key:      minKey,
   190  			conflict: 0,
   191  			cost:     minCost,
   192  		})
   193  	}
   194  
   195  	p.evict.add(key, cost)
   196  	p.metrics.add(costAdd, key, uint64(cost))
   197  	return victims, true
   198  }
   199  
   200  func (p *defaultPolicy) Has(key uint64) bool {
   201  	p.Lock()
   202  	_, exists := p.evict.keyCosts[key]
   203  	p.Unlock()
   204  	return exists
   205  }
   206  
   207  func (p *defaultPolicy) Del(key uint64) {
   208  	p.Lock()
   209  	p.evict.del(key)
   210  	p.Unlock()
   211  }
   212  
   213  func (p *defaultPolicy) Cap() int64 {
   214  	p.Lock()
   215  	capacity := int64(p.evict.maxCost - p.evict.used)
   216  	p.Unlock()
   217  	return capacity
   218  }
   219  
   220  func (p *defaultPolicy) Update(key uint64, cost int64) {
   221  	p.Lock()
   222  	p.evict.updateIfHas(key, cost)
   223  	p.Unlock()
   224  }
   225  
   226  func (p *defaultPolicy) Cost(key uint64) int64 {
   227  	p.Lock()
   228  	if cost, found := p.evict.keyCosts[key]; found {
   229  		p.Unlock()
   230  		return cost
   231  	}
   232  	p.Unlock()
   233  	return -1
   234  }
   235  
   236  func (p *defaultPolicy) Clear() {
   237  	p.Lock()
   238  	p.admit.clear()
   239  	p.evict.clear()
   240  	p.Unlock()
   241  }
   242  
// Close shuts the policy down: it stops the processItems goroutine and closes
// both channels.
//
// Close must be called at most once. A second call sends on the already
// closed stop channel, which panics. Likewise, a Push that runs after (or
// concurrently with) Close may send on the closed itemsCh and panic, so
// callers must stop pushing first.
func (p *defaultPolicy) Close() {
	// stop is unbuffered, so this send blocks until processItems receives
	// the value; processItems returns right after receiving it.
	p.stop <- struct{}{}
	close(p.stop)
	close(p.itemsCh)
}
   249  
// sampledLFU is an eviction helper storing key-cost pairs. It performs no
// locking of its own; defaultPolicy calls it while holding its mutex.
type sampledLFU struct {
	// keyCosts maps every tracked key to its recorded cost.
	keyCosts map[uint64]int64
	// maxCost is the capacity passed to newSampledLFU.
	maxCost int64
	// used is the running total of recorded costs, maintained by add, del,
	// updateIfHas and clear.
	used int64
	// metrics is optional and set by defaultPolicy.CollectMetrics.
	metrics *Metrics
}
   257  
   258  func newSampledLFU(maxCost int64) *sampledLFU {
   259  	return &sampledLFU{
   260  		keyCosts: make(map[uint64]int64),
   261  		maxCost:  maxCost,
   262  	}
   263  }
   264  
   265  func (p *sampledLFU) roomLeft(cost int64) int64 {
   266  	return p.maxCost - (p.used + cost)
   267  }
   268  
   269  func (p *sampledLFU) fillSample(in []*policyPair) []*policyPair {
   270  	if len(in) >= lfuSample {
   271  		return in
   272  	}
   273  	for key, cost := range p.keyCosts {
   274  		in = append(in, &policyPair{key, cost})
   275  		if len(in) >= lfuSample {
   276  			return in
   277  		}
   278  	}
   279  	return in
   280  }
   281  
   282  func (p *sampledLFU) del(key uint64) {
   283  	cost, ok := p.keyCosts[key]
   284  	if !ok {
   285  		return
   286  	}
   287  	p.used -= cost
   288  	delete(p.keyCosts, key)
   289  	p.metrics.add(costEvict, key, uint64(cost))
   290  	p.metrics.add(keyEvict, key, 1)
   291  }
   292  
   293  func (p *sampledLFU) add(key uint64, cost int64) {
   294  	p.keyCosts[key] = cost
   295  	p.used += cost
   296  }
   297  
   298  func (p *sampledLFU) updateIfHas(key uint64, cost int64) bool {
   299  	if prev, found := p.keyCosts[key]; found {
   300  		// Update the cost of an existing key, but don't worry about evicting.
   301  		// Evictions will be handled the next time a new item is added.
   302  		p.metrics.add(keyUpdate, key, 1)
   303  		if prev > cost {
   304  			diff := prev - cost
   305  			p.metrics.add(costAdd, key, ^uint64(uint64(diff)-1))
   306  		} else if cost > prev {
   307  			diff := cost - prev
   308  			p.metrics.add(costAdd, key, uint64(diff))
   309  		}
   310  		p.used += cost - prev
   311  		p.keyCosts[key] = cost
   312  		return true
   313  	}
   314  	return false
   315  }
   316  
   317  func (p *sampledLFU) clear() {
   318  	p.used = 0
   319  	p.keyCosts = make(map[uint64]int64)
   320  }
   321  
// tinyLFU is an admission helper that keeps track of access frequency using
// tiny (4-bit) counters in the form of a count-min sketch.
// tinyLFU is NOT thread safe.
type tinyLFU struct {
	// freq is the count-min sketch holding the frequency counters.
	freq *cmSketch
	// door is a bloom filter acting as a doorkeeper: Increment only bumps
	// freq for keys whose bit is already set here.
	door *z.Bloom
	// incrs counts Increment calls since the last reset or clear.
	incrs int64
	// resetAt is the incrs value at which Increment triggers reset;
	// newTinyLFU sets it to numCounters.
	resetAt int64
}
   331  
   332  func newTinyLFU(numCounters int64) *tinyLFU {
   333  	return &tinyLFU{
   334  		freq:    newCmSketch(numCounters),
   335  		door:    z.NewBloomFilter(float64(numCounters), 0.01),
   336  		resetAt: numCounters,
   337  	}
   338  }
   339  
   340  func (p *tinyLFU) Push(keys []uint64) {
   341  	for _, key := range keys {
   342  		p.Increment(key)
   343  	}
   344  }
   345  
   346  func (p *tinyLFU) Estimate(key uint64) int64 {
   347  	hits := p.freq.Estimate(key)
   348  	if p.door.Has(key) {
   349  		hits++
   350  	}
   351  	return hits
   352  }
   353  
   354  func (p *tinyLFU) Increment(key uint64) {
   355  	// Flip doorkeeper bit if not already done.
   356  	if added := p.door.AddIfNotHas(key); !added {
   357  		// Increment count-min counter if doorkeeper bit is already set.
   358  		p.freq.Increment(key)
   359  	}
   360  	p.incrs++
   361  	if p.incrs >= p.resetAt {
   362  		p.reset()
   363  	}
   364  }
   365  
   366  func (p *tinyLFU) reset() {
   367  	// Zero out incrs.
   368  	p.incrs = 0
   369  	// clears doorkeeper bits
   370  	p.door.Clear()
   371  	// halves count-min counters
   372  	p.freq.Reset()
   373  }
   374  
   375  func (p *tinyLFU) clear() {
   376  	p.incrs = 0
   377  	p.door.Clear()
   378  	p.freq.Clear()
   379  }
   380  

View as plain text