...

Source file src/github.com/launchdarkly/ccache/layeredcache.go

Documentation: github.com/launchdarkly/ccache

     1  // An LRU cached aimed at high concurrency
     2  package ccache
     3  
     4  import (
     5  	"container/list"
     6  	"hash/fnv"
     7  	"sync/atomic"
     8  	"time"
     9  )
    10  
// LayeredCache is an LRU cache whose values are addressed by a
// (primary, secondary) key pair. It embeds *Configuration, so all
// configuration accessors are available on the cache itself.
type LayeredCache struct {
	*Configuration
	list        *list.List       // intrusive LRU list; front = most recently used (owned by the worker goroutine)
	buckets     []*layeredBucket // shards, selected by hash of the primary key
	bucketMask  uint32           // len(buckets)-1; shard count is assumed to be a power of two
	size        int64            // current cached size; mutated only by the worker goroutine
	deletables  chan *Item       // items queued for removal from the LRU list
	promotables chan *Item       // items queued for promotion; closed by Stop to end the worker
	control     chan interface{} // control messages (clear, getDropped, setMaxSize); closed by the worker on exit
}
    21  
    22  // Create a new layered cache with the specified configuration.
    23  // A layered cache used a two keys to identify a value: a primary key
    24  // and a secondary key. Get, Set and Delete require both a primary and
    25  // secondary key. However, DeleteAll requires only a primary key, deleting
    26  // all values that share the same primary key.
    27  
    28  // Layered Cache is useful as an HTTP cache, where an HTTP purge might
    29  // delete multiple variants of the same resource:
    30  // primary key = "user/44"
    31  // secondary key 1 = ".json"
    32  // secondary key 2 = ".xml"
    33  
    34  // See ccache.Configure() for creating a configuration
    35  func Layered(config *Configuration) *LayeredCache {
    36  	c := &LayeredCache{
    37  		list:          list.New(),
    38  		Configuration: config,
    39  		bucketMask:    uint32(config.buckets) - 1,
    40  		buckets:       make([]*layeredBucket, config.buckets),
    41  		deletables:    make(chan *Item, config.deleteBuffer),
    42  		control:       make(chan interface{}),
    43  	}
    44  	for i := 0; i < int(config.buckets); i++ {
    45  		c.buckets[i] = &layeredBucket{
    46  			buckets: make(map[string]*bucket),
    47  		}
    48  	}
    49  	c.restart()
    50  	return c
    51  }
    52  
    53  func (c *LayeredCache) ItemCount() int {
    54  	count := 0
    55  	for _, b := range c.buckets {
    56  		count += b.itemCount()
    57  	}
    58  	return count
    59  }
    60  
    61  // Get an item from the cache. Returns nil if the item wasn't found.
    62  // This can return an expired item. Use item.Expired() to see if the item
    63  // is expired and item.TTL() to see how long until the item expires (which
    64  // will be negative for an already expired item).
    65  func (c *LayeredCache) Get(primary, secondary string) *Item {
    66  	item := c.bucket(primary).get(primary, secondary)
    67  	if item == nil {
    68  		return nil
    69  	}
    70  	if item.expires > time.Now().UnixNano() {
    71  		c.promote(item)
    72  	}
    73  	return item
    74  }
    75  
    76  func (c *LayeredCache) ForEachFunc(primary string, matches func(key string, item *Item) bool) {
    77  	c.bucket(primary).forEachFunc(primary, matches)
    78  }
    79  
    80  // Get the secondary cache for a given primary key. This operation will
    81  // never return nil. In the case where the primary key does not exist, a
    82  // new, underlying, empty bucket will be created and returned.
    83  func (c *LayeredCache) GetOrCreateSecondaryCache(primary string) *SecondaryCache {
    84  	primaryBkt := c.bucket(primary)
    85  	bkt := primaryBkt.getSecondaryBucket(primary)
    86  	primaryBkt.Lock()
    87  	if bkt == nil {
    88  		bkt = &bucket{lookup: make(map[string]*Item)}
    89  		primaryBkt.buckets[primary] = bkt
    90  	}
    91  	primaryBkt.Unlock()
    92  	return &SecondaryCache{
    93  		bucket: bkt,
    94  		pCache: c,
    95  	}
    96  }
    97  
    98  // Used when the cache was created with the Track() configuration option.
    99  // Avoid otherwise
   100  func (c *LayeredCache) TrackingGet(primary, secondary string) TrackedItem {
   101  	item := c.Get(primary, secondary)
   102  	if item == nil {
   103  		return NilTracked
   104  	}
   105  	item.track()
   106  	return item
   107  }
   108  
   109  // Set the value in the cache for the specified duration
   110  func (c *LayeredCache) TrackingSet(primary, secondary string, value interface{}, duration time.Duration) TrackedItem {
   111  	return c.set(primary, secondary, value, duration, true)
   112  }
   113  
   114  // Set the value in the cache for the specified duration
   115  func (c *LayeredCache) Set(primary, secondary string, value interface{}, duration time.Duration) {
   116  	c.set(primary, secondary, value, duration, false)
   117  }
   118  
   119  // Replace the value if it exists, does not set if it doesn't.
   120  // Returns true if the item existed an was replaced, false otherwise.
   121  // Replace does not reset item's TTL nor does it alter its position in the LRU
   122  func (c *LayeredCache) Replace(primary, secondary string, value interface{}) bool {
   123  	item := c.bucket(primary).get(primary, secondary)
   124  	if item == nil {
   125  		return false
   126  	}
   127  	c.Set(primary, secondary, value, item.TTL())
   128  	return true
   129  }
   130  
   131  // Attempts to get the value from the cache and calles fetch on a miss.
   132  // If fetch returns an error, no value is cached and the error is returned back
   133  // to the caller.
   134  func (c *LayeredCache) Fetch(primary, secondary string, duration time.Duration, fetch func() (interface{}, error)) (*Item, error) {
   135  	item := c.Get(primary, secondary)
   136  	if item != nil {
   137  		return item, nil
   138  	}
   139  	value, err := fetch()
   140  	if err != nil {
   141  		return nil, err
   142  	}
   143  	return c.set(primary, secondary, value, duration, false), nil
   144  }
   145  
   146  // Remove the item from the cache, return true if the item was present, false otherwise.
   147  func (c *LayeredCache) Delete(primary, secondary string) bool {
   148  	item := c.bucket(primary).delete(primary, secondary)
   149  	if item != nil {
   150  		c.deletables <- item
   151  		return true
   152  	}
   153  	return false
   154  }
   155  
   156  // Deletes all items that share the same primary key
   157  func (c *LayeredCache) DeleteAll(primary string) bool {
   158  	return c.bucket(primary).deleteAll(primary, c.deletables)
   159  }
   160  
   161  // Deletes all items that share the same primary key and prefix.
   162  func (c *LayeredCache) DeletePrefix(primary, prefix string) int {
   163  	return c.bucket(primary).deletePrefix(primary, prefix, c.deletables)
   164  }
   165  
   166  // Deletes all items that share the same primary key and where the matches func evaluates to true.
   167  func (c *LayeredCache) DeleteFunc(primary string, matches func(key string, item *Item) bool) int {
   168  	return c.bucket(primary).deleteFunc(primary, matches, c.deletables)
   169  }
   170  
   171  // Clears the cache
   172  func (c *LayeredCache) Clear() {
   173  	done := make(chan struct{})
   174  	c.control <- clear{done: done}
   175  	<-done
   176  }
   177  
// Stop shuts down the background worker goroutine. Closing promotables
// signals the worker to return; the receive on control then blocks
// until the worker closes that channel on its way out (see worker's
// deferred close). The cache must not be used after Stop unless
// restart is called.
func (c *LayeredCache) Stop() {
	close(c.promotables)
	<-c.control
}
   182  
   183  // Gets the number of items removed from the cache due to memory pressure since
   184  // the last time GetDropped was called
   185  func (c *LayeredCache) GetDropped() int {
   186  	res := make(chan int)
   187  	c.control <- getDropped{res: res}
   188  	return <-res
   189  }
   190  
   191  // Sets a new max size. That can result in a GC being run if the new maxium size
   192  // is smaller than the cached size
   193  func (c *LayeredCache) SetMaxSize(size int64) {
   194  	c.control <- setMaxSize{size}
   195  }
   196  
   197  func (c *LayeredCache) restart() {
   198  	c.promotables = make(chan *Item, c.promoteBuffer)
   199  	c.control = make(chan interface{})
   200  	go c.worker()
   201  }
   202  
   203  func (c *LayeredCache) set(primary, secondary string, value interface{}, duration time.Duration, track bool) *Item {
   204  	item, existing := c.bucket(primary).set(primary, secondary, value, duration, track)
   205  	if existing != nil {
   206  		c.deletables <- existing
   207  	}
   208  	c.promote(item)
   209  	return item
   210  }
   211  
   212  func (c *LayeredCache) bucket(key string) *layeredBucket {
   213  	h := fnv.New32a()
   214  	h.Write([]byte(key))
   215  	return c.buckets[h.Sum32()&c.bucketMask]
   216  }
   217  
// promote hands the item to the worker goroutine, which moves it toward
// the front of the LRU list. Blocks when the promotables buffer is full.
func (c *LayeredCache) promote(item *Item) {
	c.promotables <- item
}
   221  
// worker is the single goroutine that owns c.list and c.size; every
// mutation of the LRU state funnels through its channels, so those
// fields need no lock. It exits when promotables is closed (see Stop),
// and the deferred close of control unblocks Stop's final receive.
func (c *LayeredCache) worker() {
	defer close(c.control)
	dropped := 0 // evictions since the last getDropped request
	for {
		select {
		case item, ok := <-c.promotables:
			if ok == false {
				// promotables was closed by Stop: shut down.
				return
			}
			// doPromote returns true only for newly inserted items,
			// which are the only events that grow c.size.
			if c.doPromote(item) && c.size > c.maxSize {
				dropped += c.gc()
			}
		case item := <-c.deletables:
			if item.element == nil {
				// Deleted before it was ever linked into the list; flag
				// it so a later doPromote will ignore it.
				atomic.StoreInt32(&item.promotions, -2)
			} else {
				c.size -= item.size
				if c.onDelete != nil {
					c.onDelete(item)
				}
				c.list.Remove(item.element)
			}
		case control := <-c.control:
			switch msg := control.(type) {
			case getDropped:
				msg.res <- dropped
				dropped = 0
			case setMaxSize:
				c.maxSize = msg.size
				if c.size > c.maxSize {
					dropped += c.gc()
				}
			case clear:
				for _, bucket := range c.buckets {
					bucket.clear()
				}
				c.size = 0
				c.list = list.New()
				// Signal Clear() that the reset is complete.
				msg.done <- struct{}{}
			}
		}
	}
}
   265  
// doPromote integrates a queued item into the LRU list. It returns true
// only when a brand-new item was added — the one case where c.size grew
// and the caller should consider running gc. Runs only on the worker
// goroutine.
func (c *LayeredCache) doPromote(item *Item) bool {
	// deleted before it ever got promoted
	if atomic.LoadInt32(&item.promotions) == -2 {
		return false
	}
	if item.element != nil { //not a new item
		// Existing item: only move it to the front once it has seen
		// enough gets (getsPerPromote), limiting list churn.
		if item.shouldPromote(c.getsPerPromote) {
			c.list.MoveToFront(item.element)
			atomic.StoreInt32(&item.promotions, 0)
		}
		return false
	}
	// New item: account for its size and place it at the hot end.
	c.size += item.size
	item.element = c.list.PushFront(item)
	return true
}
   282  
   283  func (c *LayeredCache) gc() int {
   284  	element := c.list.Back()
   285  	dropped := 0
   286  	for i := 0; i < c.itemsToPrune; i++ {
   287  		if element == nil {
   288  			return dropped
   289  		}
   290  		prev := element.Prev()
   291  		item := element.Value.(*Item)
   292  		if c.tracking == false || atomic.LoadInt32(&item.refCount) == 0 {
   293  			c.bucket(item.group).delete(item.group, item.key)
   294  			c.size -= item.size
   295  			c.list.Remove(element)
   296  			item.promotions = -2
   297  			dropped += 1
   298  		}
   299  		element = prev
   300  	}
   301  	return dropped
   302  }
   303  

View as plain text