
Source file src/github.com/launchdarkly/ccache/cache.go

Documentation: github.com/launchdarkly/ccache

// An LRU cache aimed at high concurrency
package ccache

import (
	"container/list"
	"hash/fnv"
	"sync/atomic"
	"time"
)

// The cache has a generic 'control' channel that is used to send
// messages to the worker. These are the messages that can be sent to it.
type getDropped struct {
	res chan int
}
type setMaxSize struct {
	size int64
}

type clear struct {
	done chan struct{}
}

type Cache struct {
	*Configuration
	list        *list.List       // LRU order: most recently used items are at the front
	size        int64            // current total size of everything in the list
	buckets     []*bucket        // the key space is sharded across buckets to reduce lock contention
	bucketMask  uint32           // assumes a power-of-2 bucket count, so hash&bucketMask picks a bucket
	deletables  chan *Item       // items queued for removal by the worker
	promotables chan *Item       // items queued for (possible) promotion by the worker
	control     chan interface{} // getDropped, setMaxSize and clear messages for the worker
}

// Create a new cache with the specified configuration.
// See ccache.Configure() for creating a configuration.
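//
// A minimal usage sketch, not part of the original file; it assumes the
// package's Configure() builder exposes MaxSize and ItemsToPrune setters and
// that Item has a Value() accessor, as in upstream ccache:
//
//	cache := ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100))
//	cache.Set("user:4", "Bob", time.Minute*10)
//	if item := cache.Get("user:4"); item != nil && !item.Expired() {
//		_ = item.Value()
//	}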
func New(config *Configuration) *Cache {
	c := &Cache{
		list:          list.New(),
		Configuration: config,
		bucketMask:    uint32(config.buckets) - 1,
		buckets:       make([]*bucket, config.buckets),
		control:       make(chan interface{}),
	}
	for i := 0; i < config.buckets; i++ {
		c.buckets[i] = &bucket{
			lookup: make(map[string]*Item),
		}
	}
	c.restart()
	return c
}

// Returns the total number of items currently in the cache across all buckets.
func (c *Cache) ItemCount() int {
	count := 0
	for _, b := range c.buckets {
		count += b.itemCount()
	}
	return count
}

// Deletes all items whose keys start with the given prefix and returns the
// number of items deleted.
func (c *Cache) DeletePrefix(prefix string) int {
	count := 0
	for _, b := range c.buckets {
		count += b.deletePrefix(prefix, c.deletables)
	}
	return count
}

// Deletes all items for which the matches func returns true and returns the
// number of items deleted.
func (c *Cache) DeleteFunc(matches func(key string, item *Item) bool) int {
	count := 0
	for _, b := range c.buckets {
		count += b.deleteFunc(matches, c.deletables)
	}
	return count
}

// Calls the matches func for each item in the cache; iteration stops as soon
// as matches returns false.
func (c *Cache) ForEachFunc(matches func(key string, item *Item) bool) {
	for _, b := range c.buckets {
		if !b.forEachFunc(matches) {
			break
		}
	}
}

// Get an item from the cache. Returns nil if the item wasn't found.
// This can return an expired item. Use item.Expired() to see if the item
// is expired and item.TTL() to see how long until the item expires (which
// will be negative for an already expired item).
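//
// A minimal sketch of handling a possibly expired entry (it assumes the Item
// accessor Value(), alongside the Expired() and TTL() methods mentioned above):
//
//	if item := cache.Get("user:4"); item != nil {
//		if item.Expired() {
//			// stale: item.TTL() is negative here
//		} else {
//			_ = item.Value()
//		}
//	}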
func (c *Cache) Get(key string) *Item {
	item := c.bucket(key).get(key)
	if item == nil {
		return nil
	}
	if !item.Expired() {
		c.promote(item)
	}
	return item
}

// Used when the cache was created with the Track() configuration option.
// Avoid otherwise.
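//
// A sketch of the intended pattern (it assumes TrackedItem exposes Value() and
// a Release() method to decrement the reference count, as in upstream ccache):
//
//	item := cache.TrackingGet("user:4")
//	if item != ccache.NilTracked {
//		defer item.Release()
//		_ = item.Value()
//	}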
func (c *Cache) TrackingGet(key string) TrackedItem {
	item := c.Get(key)
	if item == nil {
		return NilTracked
	}
	item.track()
	return item
}

// Used when the cache was created with the Track() configuration option.
// Sets the item, and returns a tracked reference to it.
func (c *Cache) TrackingSet(key string, value interface{}, duration time.Duration) TrackedItem {
	return c.set(key, value, duration, true)
}

// Set the value in the cache for the specified duration.
func (c *Cache) Set(key string, value interface{}, duration time.Duration) {
	c.set(key, value, duration, false)
}

// Replace the value if it exists; does not set it if it doesn't.
// Returns true if the item existed and was replaced, false otherwise.
// Replace does not reset the item's TTL.
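//
// A brief sketch of the intent (hypothetical keys and values for illustration):
//
//	cache.Set("config", "v1", time.Minute*5)
//	// some time later; the remaining TTL is preserved:
//	replaced := cache.Replace("config", "v2")
//	// replaced == true; Replace on a missing key returns false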
func (c *Cache) Replace(key string, value interface{}) bool {
	item := c.bucket(key).get(key)
	if item == nil {
		return false
	}
	c.Set(key, value, item.TTL())
	return true
}

// Attempts to get the value from the cache and calls fetch on a miss (missing
// or stale item). If fetch returns an error, no value is cached and the error
// is returned to the caller.
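//
// A minimal sketch of the cache-aside pattern this enables (loadUserFromDB is
// a hypothetical loader, not part of this package):
//
//	item, err := cache.Fetch("user:4", time.Minute*10, func() (interface{}, error) {
//		return loadUserFromDB(4)
//	})
//	if err != nil {
//		// fetch failed; nothing was cached
//	} else {
//		_ = item.Value()
//	}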
func (c *Cache) Fetch(key string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
	item := c.Get(key)
	if item != nil && !item.Expired() {
		return item, nil
	}
	value, err := fetch()
	if err != nil {
		return nil, err
	}
	return c.set(key, value, duration, false), nil
}

// Remove the item from the cache, returning true if the item was present, false otherwise.
func (c *Cache) Delete(key string) bool {
	item := c.bucket(key).delete(key)
	if item != nil {
		c.deletables <- item
		return true
	}
	return false
}

// Clears the cache.
func (c *Cache) Clear() {
	done := make(chan struct{})
	c.control <- clear{done: done}
	<-done
}

// Stops the background worker. Operations performed on the cache after Stop
// is called are likely to panic.
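//
// A hedged usage note: call Stop only once the cache is no longer shared, for
// example at application shutdown:
//
//	cache := ccache.New(ccache.Configure())
//	defer cache.Stop() // no Set/Get/Delete calls may follow Stop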
func (c *Cache) Stop() {
	close(c.promotables)
	<-c.control
}

// Gets the number of items removed from the cache due to memory pressure since
// the last time GetDropped was called.
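//
// A sketch of periodic eviction monitoring (reportEvictions is a hypothetical
// metrics hook, not part of this package):
//
//	go func() {
//		for range time.Tick(time.Minute) {
//			reportEvictions(cache.GetDropped())
//		}
//	}()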
func (c *Cache) GetDropped() int {
	res := make(chan int)
	c.control <- getDropped{res: res}
	return <-res
}

// Sets a new max size. That can result in a GC being run if the new maximum size
// is smaller than the cached size.
func (c *Cache) SetMaxSize(size int64) {
	c.control <- setMaxSize{size}
}

func (c *Cache) restart() {
	c.deletables = make(chan *Item, c.deleteBuffer)
	c.promotables = make(chan *Item, c.promoteBuffer)
	c.control = make(chan interface{})
	go c.worker()
}

func (c *Cache) deleteItem(bucket *bucket, item *Item) {
	bucket.delete(item.key) // stop other GETs from getting it
	c.deletables <- item
}

func (c *Cache) set(key string, value interface{}, duration time.Duration, track bool) *Item {
	item, existing := c.bucket(key).set(key, value, duration, track)
	if existing != nil {
		c.deletables <- existing
	}
	c.promote(item)
	return item
}

// bucket maps a key to its shard using an FNV-1a hash; this assumes the
// configured bucket count is a power of 2 so the mask distributes keys across
// all buckets.
func (c *Cache) bucket(key string) *bucket {
	h := fnv.New32a()
	h.Write([]byte(key))
	return c.buckets[h.Sum32()&c.bucketMask]
}

// promote enqueues the item for promotion without blocking; if the
// promotables buffer is full, the promotion is simply skipped.
func (c *Cache) promote(item *Item) {
	select {
	case c.promotables <- item:
	default:
	}
}

// worker owns the LRU list and the cache size; all mutations to them happen on
// this goroutine, which serializes promotions, deletions and control messages.
func (c *Cache) worker() {
	defer close(c.control)
	dropped := 0
	for {
		select {
		case item, ok := <-c.promotables:
			if !ok {
				goto drain
			}
			if c.doPromote(item) && c.size > c.maxSize {
				dropped += c.gc()
			}
		case item := <-c.deletables:
			c.doDelete(item)
		case control := <-c.control:
			switch msg := control.(type) {
			case getDropped:
				msg.res <- dropped
				dropped = 0
			case setMaxSize:
				c.maxSize = msg.size
				if c.size > c.maxSize {
					dropped += c.gc()
				}
			case clear:
				for _, bucket := range c.buckets {
					bucket.clear()
				}
				c.size = 0
				c.list = list.New()
				msg.done <- struct{}{}
			}
		}
	}

	// The promotables channel was closed by Stop; flush any pending deletes
	// before exiting.
drain:
	for {
		select {
		case item := <-c.deletables:
			c.doDelete(item)
		default:
			close(c.deletables)
			return
		}
	}
}

func (c *Cache) doDelete(item *Item) {
	if item.element == nil {
		// The item was never added to the list (its promotion is still
		// pending); flag it so doPromote skips it.
		item.promotions = -2
	} else {
		c.size -= item.size
		if c.onDelete != nil {
			c.onDelete(item)
		}
		c.list.Remove(item.element)
	}
}

func (c *Cache) doPromote(item *Item) bool {
	// already deleted
	if item.promotions == -2 {
		return false
	}
	if item.element != nil { // not a new item
		if item.shouldPromote(c.getsPerPromote) {
			c.list.MoveToFront(item.element)
			item.promotions = 0
		}
		return false
	}

	// A new item: account for its size and push it to the front of the list.
	// Returning true tells the worker to check whether a gc is needed.
	c.size += item.size
	item.element = c.list.PushFront(item)
	return true
}

// gc removes up to itemsToPrune items from the back (least recently used end)
// of the list, skipping items that still have live tracked references.
func (c *Cache) gc() int {
	dropped := 0
	element := c.list.Back()
	for i := 0; i < c.itemsToPrune; i++ {
		if element == nil {
			return dropped
		}
		prev := element.Prev()
		item := element.Value.(*Item)
		if !c.tracking || atomic.LoadInt32(&item.refCount) == 0 {
			c.bucket(item.key).delete(item.key)
			c.size -= item.size
			c.list.Remove(element)
			if c.onDelete != nil {
				c.onDelete(item)
			}
			dropped++
			item.promotions = -2
		}
		element = prev
	}
	return dropped
}

