
Source file src/github.com/dgraph-io/ristretto/cache.go

Documentation: github.com/dgraph-io/ristretto

/*
 * Copyright 2019 Dgraph Labs, Inc. and Contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// Ristretto is a fast, fixed-size, in-memory cache with a dual focus on
// throughput and hit ratio performance. You can easily add Ristretto to an
// existing system and keep the most valuable data where you need it.
package ristretto

import (
	"bytes"
	"errors"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/dgraph-io/ristretto/z"
)

var (
	// TODO: find the optimal value for this or make it configurable
	setBufSize = 32 * 1024
)

type onEvictFunc func(uint64, uint64, interface{}, int64)

// Cache is a thread-safe implementation of a hashmap with a TinyLFU admission
// policy and a Sampled LFU eviction policy. You can use the same Cache instance
// from as many goroutines as you want.
type Cache struct {
	// store is the central concurrent hashmap where key-value items are stored.
	store store
	// policy determines what gets let into the cache and what gets kicked out.
	policy policy
	// getBuf is a custom ring buffer implementation that gets pushed to when
	// keys are read.
	getBuf *ringBuffer
	// setBuf is a buffer allowing us to batch/drop Sets during times of high
	// contention.
	setBuf chan *item
	// onEvict is called for item evictions.
	onEvict onEvictFunc
	// keyToHash is used to customize the key hashing algorithm. Each key is
	// hashed with the provided function. If it is not set, a default hashing
	// function is used.
	keyToHash func(interface{}) (uint64, uint64)
	// stop is used to stop the processItems goroutine.
	stop chan struct{}
	// cost calculates a cost from a value.
	cost func(value interface{}) int64
	// cleanupTicker is used to periodically check for entries whose TTL has passed.
	cleanupTicker *time.Ticker
	// Metrics contains a running log of important statistics like hits, misses,
	// and dropped items.
	Metrics *Metrics
}

// Config is passed to NewCache for creating new Cache instances.
type Config struct {
	// NumCounters determines the number of counters (keys) to keep that hold
	// access frequency information. It's generally a good idea to have more
	// counters than the max cache capacity, as this will improve eviction
	// accuracy and subsequent hit ratios.
	//
	// For example, if you expect your cache to hold 1,000,000 items when full,
	// NumCounters should be 10,000,000 (10x). Each counter takes up 4 bits, so
	// keeping 10,000,000 counters would require 5MB of memory.
	NumCounters int64
	// MaxCost can be considered the cache capacity, in whatever units you
	// choose to use.
	//
	// For example, if you want the cache to have a max capacity of 100MB, you
	// would set MaxCost to 100,000,000 and pass an item's number of bytes as
	// the `cost` parameter for calls to Set. If new items are accepted, the
	// eviction process will take care of making room for the new item and not
	// overflowing the MaxCost value.
	MaxCost int64
	// BufferItems determines the size of Get buffers.
	//
	// Unless you have a rare use case, using `64` as the BufferItems value
	// results in good performance.
	BufferItems int64
	// Metrics determines whether cache statistics are kept during the cache's
	// lifetime. There *is* some overhead to keeping statistics, so you should
	// only set this flag to true when testing or when throughput performance
	// isn't a major factor.
	Metrics bool
	// OnEvict is called for every eviction and passes the hashed key, value,
	// and cost to the function.
	OnEvict func(key, conflict uint64, value interface{}, cost int64)
	// KeyToHash is used to customize the key hashing algorithm. Each key is
	// hashed with the provided function. If it is not set, a default hashing
	// function is used.
	KeyToHash func(key interface{}) (uint64, uint64)
	// Cost evaluates a value and outputs a corresponding cost. This function
	// is run after Set is called for a new item, or for an item update with a
	// cost param of 0.
	Cost func(value interface{}) int64
}
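
// Illustrative example (not part of the original file): a plausible Config for
// a cache expected to hold roughly one million items, following the guidance
// in the field docs above. The exact values are assumptions; MaxCost here
// presumes cost is measured in bytes.
//
//	cache, err := NewCache(&Config{
//		NumCounters: 1e7,     // 10x the expected number of items when full
//		MaxCost:     1 << 30, // 1GB capacity, with cost counted in bytes
//		BufferItems: 64,      // recommended Get-buffer size
//		Metrics:     true,    // keep statistics (adds some overhead)
//	})
//	if err != nil {
//		panic(err)
//	}
//	defer cache.Close()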

type itemFlag byte

const (
	itemNew itemFlag = iota
	itemDelete
	itemUpdate
)

// item is passed to setBuf so items can eventually be added to the cache.
type item struct {
	flag       itemFlag
	key        uint64
	conflict   uint64
	value      interface{}
	cost       int64
	expiration time.Time
}

// NewCache returns a new Cache instance along with any configuration errors.
func NewCache(config *Config) (*Cache, error) {
	switch {
	case config.NumCounters == 0:
		return nil, errors.New("NumCounters can't be zero")
	case config.MaxCost == 0:
		return nil, errors.New("MaxCost can't be zero")
	case config.BufferItems == 0:
		return nil, errors.New("BufferItems can't be zero")
	}
	policy := newPolicy(config.NumCounters, config.MaxCost)
	cache := &Cache{
		store:         newStore(),
		policy:        policy,
		getBuf:        newRingBuffer(policy, config.BufferItems),
		setBuf:        make(chan *item, setBufSize),
		onEvict:       config.OnEvict,
		keyToHash:     config.KeyToHash,
		stop:          make(chan struct{}),
		cost:          config.Cost,
		cleanupTicker: time.NewTicker(time.Duration(bucketDurationSecs) * time.Second / 2),
	}
	if cache.keyToHash == nil {
		cache.keyToHash = z.KeyToHash
	}
	if config.Metrics {
		cache.collectMetrics()
	}
	// NOTE: benchmarks seem to show that performance decreases the more
	//       goroutines we have running cache.processItems(), so 1 should
	//       usually be sufficient.
	go cache.processItems()
	return cache, nil
}

// Get returns the value (if any) and a boolean representing whether the
// value was found or not. The value can be nil and the boolean can be true at
// the same time.
func (c *Cache) Get(key interface{}) (interface{}, bool) {
	if c == nil || key == nil {
		return nil, false
	}
	keyHash, conflictHash := c.keyToHash(key)
	c.getBuf.Push(keyHash)
	value, ok := c.store.Get(keyHash, conflictHash)
	if ok {
		c.Metrics.add(hit, keyHash, 1)
	} else {
		c.Metrics.add(miss, keyHash, 1)
	}
	return value, ok
}

// Set attempts to add the key-value item to the cache. If it returns false,
// then the Set was dropped and the key-value item isn't added to the cache. If
// it returns true, there's still a chance it could be dropped by the policy if
// it's determined that the key-value item isn't worth keeping, but otherwise
// the item will be added and other items will be evicted in order to make
// room.
//
// To dynamically evaluate an item's cost using the Config.Cost function, set
// the cost parameter to 0 and Cost will be run when needed in order to find
// the item's true cost.
func (c *Cache) Set(key, value interface{}, cost int64) bool {
	return c.SetWithTTL(key, value, cost, 0*time.Second)
}
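
// Illustrative note (not part of the original file): Sets are pushed through
// setBuf and applied asynchronously by processItems, so a Get immediately
// after a successful Set may still miss. A sketch, assuming no explicit
// synchronization helper is available in this version:
//
//	if cache.Set("key", "value", 1) {
//		time.Sleep(10 * time.Millisecond) // crude wait for the async buffer
//	}
//	if v, ok := cache.Get("key"); ok {
//		fmt.Println(v) // "value", once the policy has admitted the item
//	}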

// SetWithTTL works like Set but adds a key-value pair to the cache that will
// expire after the specified TTL (time to live) has passed. A zero value means
// the value never expires, which is identical to calling Set. A negative value
// is a no-op and the value is discarded.
func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration) bool {
	if c == nil || key == nil {
		return false
	}

	var expiration time.Time
	switch {
	case ttl == 0:
		// No expiration.
		break
	case ttl < 0:
		// Treat this as a no-op.
		return false
	default:
		expiration = time.Now().Add(ttl)
	}

	keyHash, conflictHash := c.keyToHash(key)
	i := &item{
		flag:       itemNew,
		key:        keyHash,
		conflict:   conflictHash,
		value:      value,
		cost:       cost,
		expiration: expiration,
	}
	// The cost is eventually updated by processItems, but the expiration must
	// be updated immediately to prevent the item from being prematurely
	// removed from the map.
	if c.store.Update(i) {
		i.flag = itemUpdate
	}
	// Attempt to send the item to the policy.
	select {
	case c.setBuf <- i:
		return true
	default:
		if i.flag == itemUpdate {
			// Return true if this was an update operation since we've already
			// updated the store. For all other operations (set/delete), we
			// return false, which means the item was not inserted.
			return true
		}
		c.Metrics.add(dropSets, keyHash, 1)
		return false
	}
}
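
// Illustrative example (not part of the original file): caching a value for
// five minutes. The key and cost shown are assumptions for the sketch.
//
//	cache.SetWithTTL("session:42", "token", 1, 5*time.Minute)
//	// After roughly five minutes, the periodic cleanup driven by
//	// cleanupTicker (see processItems) removes the expired entry.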

// Del deletes the key-value item from the cache if it exists.
func (c *Cache) Del(key interface{}) {
	if c == nil || key == nil {
		return
	}
	keyHash, conflictHash := c.keyToHash(key)
	// Delete immediately.
	c.store.Del(keyHash, conflictHash)
	// Sets are applied asynchronously, so a pending set for this key could
	// land after the immediate delete above. Pushing an item with the
	// deletion flag to setBuf ensures that a set followed by a delete is
	// applied in the correct order.
	c.setBuf <- &item{
		flag:     itemDelete,
		key:      keyHash,
		conflict: conflictHash,
	}
}

// Close stops all goroutines and closes all channels.
func (c *Cache) Close() {
	if c == nil || c.stop == nil {
		return
	}
	// Block until the processItems goroutine has returned.
	c.stop <- struct{}{}
	close(c.stop)
	c.stop = nil
	close(c.setBuf)
	c.policy.Close()
}
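
// Illustrative note (not part of the original file): Close shuts down the
// processItems goroutine and closes setBuf, so the Cache must not be used
// after calling it. A typical lifecycle:
//
//	cache, err := NewCache(&Config{NumCounters: 1e7, MaxCost: 1 << 30, BufferItems: 64})
//	if err != nil {
//		panic(err)
//	}
//	defer cache.Close()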

// Clear empties the hashmap and zeroes all policy counters. Note that this is
// not an atomic operation (but that shouldn't be a problem as it's assumed
// that Set/Get calls won't be occurring until after this).
func (c *Cache) Clear() {
	if c == nil {
		return
	}
	// Block until the processItems goroutine has returned.
	c.stop <- struct{}{}

	// Clear out the setBuf channel.
loop:
	for {
		select {
		case <-c.setBuf:
		default:
			break loop
		}
	}

	// Clear the value hashmap and policy data.
	c.policy.Clear()
	c.store.Clear()
	// Only reset metrics if they're enabled.
	if c.Metrics != nil {
		c.Metrics.Clear()
	}
	// Restart the processItems goroutine.
	go c.processItems()
}

// processItems is run by the goroutine(s) processing the Set buffer.
func (c *Cache) processItems() {
	for {
		select {
		case i := <-c.setBuf:
			// Calculate the item's cost if this is a new item or an update.
			if i.cost == 0 && c.cost != nil && i.flag != itemDelete {
				i.cost = c.cost(i.value)
			}
			switch i.flag {
			case itemNew:
				victims, added := c.policy.Add(i.key, i.cost)
				if added {
					c.store.Set(i)
					c.Metrics.add(keyAdd, i.key, 1)
				}
				for _, victim := range victims {
					victim.conflict, victim.value = c.store.Del(victim.key, 0)
					if c.onEvict != nil {
						c.onEvict(victim.key, victim.conflict, victim.value, victim.cost)
					}
				}

			case itemUpdate:
				c.policy.Update(i.key, i.cost)

			case itemDelete:
				c.policy.Del(i.key) // Deals with metrics updates.
				c.store.Del(i.key, i.conflict)
			}
		case <-c.cleanupTicker.C:
			c.store.Cleanup(c.policy, c.onEvict)
		case <-c.stop:
			return
		}
	}
}
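
// Added summary (not part of the original file): the life of a Set, as
// implemented above. Set hashes the key, writes through to the store on
// update, and pushes an item into setBuf; processItems then asks the TinyLFU
// policy whether the item is worth admitting. If admitted, the item lands in
// the store, and any victims chosen by the Sampled LFU eviction policy are
// deleted, with onEvict invoked for each.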

// collectMetrics creates a new *Metrics instance and hands the pointer to the
// cache and policy instances.
func (c *Cache) collectMetrics() {
	c.Metrics = newMetrics()
	c.policy.CollectMetrics(c.Metrics)
}

type metricType int

const (
	// The following 2 keep track of hits and misses.
	hit = iota
	miss
	// The following 3 keep track of the number of keys added, updated, and
	// evicted.
	keyAdd
	keyUpdate
	keyEvict
	// The following 2 keep track of the cost of keys added and evicted.
	costAdd
	costEvict
	// The following 2 keep track of how many sets were dropped or rejected
	// later.
	dropSets
	rejectSets
	// The following 2 keep track of how many gets were kept and dropped on the
	// floor.
	dropGets
	keepGets
	// This should be the final enum. Other enums should be set before this.
	doNotUse
)

func stringFor(t metricType) string {
	switch t {
	case hit:
		return "hit"
	case miss:
		return "miss"
	case keyAdd:
		return "keys-added"
	case keyUpdate:
		return "keys-updated"
	case keyEvict:
		return "keys-evicted"
	case costAdd:
		return "cost-added"
	case costEvict:
		return "cost-evicted"
	case dropSets:
		return "sets-dropped"
	case rejectSets:
		return "sets-rejected" // by policy.
	case dropGets:
		return "gets-dropped"
	case keepGets:
		return "gets-kept"
	default:
		return "unidentified"
	}
}

// Metrics is a snapshot of performance statistics for the lifetime of a cache
// instance.
type Metrics struct {
	all [doNotUse][]*uint64
}

func newMetrics() *Metrics {
	s := &Metrics{}
	for i := 0; i < doNotUse; i++ {
		s.all[i] = make([]*uint64, 256)
		slice := s.all[i]
		for j := range slice {
			slice[j] = new(uint64)
		}
	}
	return s
}

func (p *Metrics) add(t metricType, hash, delta uint64) {
	if p == nil {
		return
	}
	valp := p.all[t]
	// Avoid false sharing by padding at least 64 bytes of space between two
	// atomic counters that could be incremented concurrently.
	idx := (hash % 25) * 10
	atomic.AddUint64(valp[idx], delta)
}
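
// Added note (not part of the original file), spelling out the arithmetic:
// idx = (hash % 25) * 10 only ever produces the indices 0, 10, 20, ..., 240,
// so each metric uses 25 of its 256 counters, spaced 10 slots apart. That
// spacing is what the comment above relies on to keep concurrently
// incremented counters from sharing a cache line (typically 64 bytes).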

func (p *Metrics) get(t metricType) uint64 {
	if p == nil {
		return 0
	}
	valp := p.all[t]
	var total uint64
	for i := range valp {
		total += atomic.LoadUint64(valp[i])
	}
	return total
}

// Hits is the number of Get calls where a value was found for the
// corresponding key.
func (p *Metrics) Hits() uint64 {
	return p.get(hit)
}

// Misses is the number of Get calls where a value was not found for the
// corresponding key.
func (p *Metrics) Misses() uint64 {
	return p.get(miss)
}

// KeysAdded is the total number of Set calls where a new key-value item was
// added.
func (p *Metrics) KeysAdded() uint64 {
	return p.get(keyAdd)
}

// KeysUpdated is the total number of Set calls where the value was updated.
func (p *Metrics) KeysUpdated() uint64 {
	return p.get(keyUpdate)
}

// KeysEvicted is the total number of keys evicted.
func (p *Metrics) KeysEvicted() uint64 {
	return p.get(keyEvict)
}

// CostAdded is the sum of costs that have been added (successful Set calls).
func (p *Metrics) CostAdded() uint64 {
	return p.get(costAdd)
}

// CostEvicted is the sum of all costs that have been evicted.
func (p *Metrics) CostEvicted() uint64 {
	return p.get(costEvict)
}

// SetsDropped is the number of Set calls that don't make it into internal
// buffers (due to contention or some other reason).
func (p *Metrics) SetsDropped() uint64 {
	return p.get(dropSets)
}

// SetsRejected is the number of Set calls rejected by the policy (TinyLFU).
func (p *Metrics) SetsRejected() uint64 {
	return p.get(rejectSets)
}

// GetsDropped is the number of Get counter increments that are dropped
// internally.
func (p *Metrics) GetsDropped() uint64 {
	return p.get(dropGets)
}

// GetsKept is the number of Get counter increments that are kept.
func (p *Metrics) GetsKept() uint64 {
	return p.get(keepGets)
}
// Ratio is the number of Hits over all accesses (Hits + Misses). This is the
// fraction of successful Get calls.
func (p *Metrics) Ratio() float64 {
	if p == nil {
		return 0.0
	}
	hits, misses := p.get(hit), p.get(miss)
	if hits == 0 && misses == 0 {
		return 0.0
	}
	return float64(hits) / float64(hits+misses)
}

// Clear resets all the metrics.
func (p *Metrics) Clear() {
	if p == nil {
		return
	}
	for i := 0; i < doNotUse; i++ {
		for j := range p.all[i] {
			atomic.StoreUint64(p.all[i][j], 0)
		}
	}
}

// String returns a string representation of the metrics.
func (p *Metrics) String() string {
	if p == nil {
		return ""
	}
	var buf bytes.Buffer
	for i := 0; i < doNotUse; i++ {
		t := metricType(i)
		fmt.Fprintf(&buf, "%s: %d ", stringFor(t), p.get(t))
	}
	fmt.Fprintf(&buf, "gets-total: %d ", p.get(hit)+p.get(miss))
	fmt.Fprintf(&buf, "hit-ratio: %.2f", p.Ratio())
	return buf.String()
}

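// Illustrative example (not part of the original file): inspecting metrics.
// The output shown is a plausible shape, not captured from a real run.
//
//	cache, _ := NewCache(&Config{NumCounters: 1e4, MaxCost: 1e3, BufferItems: 64, Metrics: true})
//	defer cache.Close()
//	cache.Set("a", 1, 1)
//	cache.Get("a")
//	cache.Get("b")
//	fmt.Println(cache.Metrics)
//	// e.g. "hit: 1 miss: 1 ... gets-total: 2 hit-ratio: 0.50"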
