...

Source file src/k8s.io/apimachinery/pkg/util/cache/expiring.go

Documentation: k8s.io/apimachinery/pkg/util/cache

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"container/heap"
    21  	"sync"
    22  	"time"
    23  
    24  	"k8s.io/utils/clock"
    25  )
    26  
    27  // NewExpiring returns an initialized expiring cache.
    28  func NewExpiring() *Expiring {
    29  	return NewExpiringWithClock(clock.RealClock{})
    30  }
    31  
    32  // NewExpiringWithClock is like NewExpiring but allows passing in a custom
    33  // clock for testing.
    34  func NewExpiringWithClock(clock clock.Clock) *Expiring {
    35  	return &Expiring{
    36  		clock: clock,
    37  		cache: make(map[interface{}]entry),
    38  	}
    39  }
    40  
// Expiring is a map whose entries expire after a per-entry timeout.
//
// Get, Set, Delete and Len synchronize on an internal mutex. Expired entries
// are not removed by a background task; they are removed when Set runs its
// inline garbage collection.
type Expiring struct {
	// AllowExpiredGet causes the expiration check to be skipped on Get.
	// It should only be used when a key always corresponds to the exact same value.
	// Thus when this field is true, expired keys are considered valid
	// until the next call to Set (which causes the GC to run).
	// It may not be changed concurrently with calls to Get.
	AllowExpiredGet bool

	// clock supplies the current time used by Get to check expiry and by Set
	// to compute expiry and drive garbage collection.
	clock clock.Clock

	// mu protects the below fields
	mu sync.RWMutex
	// cache is the internal map that backs the cache.
	cache map[interface{}]entry
	// generation is used as a cheap resource version for cache entries. Cleanups
	// are scheduled with a key and generation. When the cleanup runs, it first
	// compares its generation with the current generation of the entry. It
	// deletes the entry iff the generation matches. This prevents cleanups
	// scheduled for earlier versions of an entry from deleting later versions of
	// an entry when Set() is called multiple times with the same key.
	//
	// The integer value of the generation of an entry is meaningless.
	generation uint64

	// heap is the queue of scheduled cleanups, ordered by expiry. Set pushes
	// one element per call and gc pops the elements whose expiry has passed.
	heap expiringHeap
}
    68  
// entry is the record stored in the cache map for a key: the value passed to
// Set, the time at which it expires, and the generation assigned by the Set
// call that wrote it (compared by del against a cleanup's generation).
type entry struct {
	val        interface{}
	expiry     time.Time
	generation uint64
}
    74  
    75  // Get looks up an entry in the cache.
    76  func (c *Expiring) Get(key interface{}) (val interface{}, ok bool) {
    77  	c.mu.RLock()
    78  	defer c.mu.RUnlock()
    79  	e, ok := c.cache[key]
    80  	if !ok {
    81  		return nil, false
    82  	}
    83  	if !c.AllowExpiredGet && !c.clock.Now().Before(e.expiry) {
    84  		return nil, false
    85  	}
    86  	return e.val, true
    87  }
    88  
    89  // Set sets a key/value/expiry entry in the map, overwriting any previous entry
    90  // with the same key. The entry expires at the given expiry time, but its TTL
    91  // may be lengthened or shortened by additional calls to Set(). Garbage
    92  // collection of expired entries occurs during calls to Set(), however calls to
    93  // Get() will not return expired entries that have not yet been garbage
    94  // collected.
    95  func (c *Expiring) Set(key interface{}, val interface{}, ttl time.Duration) {
    96  	now := c.clock.Now()
    97  	expiry := now.Add(ttl)
    98  
    99  	c.mu.Lock()
   100  	defer c.mu.Unlock()
   101  
   102  	c.generation++
   103  
   104  	c.cache[key] = entry{
   105  		val:        val,
   106  		expiry:     expiry,
   107  		generation: c.generation,
   108  	}
   109  
   110  	// Run GC inline before pushing the new entry.
   111  	c.gc(now)
   112  
   113  	heap.Push(&c.heap, &expiringHeapEntry{
   114  		key:        key,
   115  		expiry:     expiry,
   116  		generation: c.generation,
   117  	})
   118  }
   119  
   120  // Delete deletes an entry in the map.
   121  func (c *Expiring) Delete(key interface{}) {
   122  	c.mu.Lock()
   123  	defer c.mu.Unlock()
   124  	c.del(key, 0)
   125  }
   126  
   127  // del deletes the entry for the given key. The generation argument is the
   128  // generation of the entry that should be deleted. If the generation has been
   129  // changed (e.g. if a set has occurred on an existing element but the old
   130  // cleanup still runs), this is a noop. If the generation argument is 0, the
   131  // entry's generation is ignored and the entry is deleted.
   132  //
   133  // del must be called under the write lock.
   134  func (c *Expiring) del(key interface{}, generation uint64) {
   135  	e, ok := c.cache[key]
   136  	if !ok {
   137  		return
   138  	}
   139  	if generation != 0 && generation != e.generation {
   140  		return
   141  	}
   142  	delete(c.cache, key)
   143  }
   144  
   145  // Len returns the number of items in the cache.
   146  func (c *Expiring) Len() int {
   147  	c.mu.RLock()
   148  	defer c.mu.RUnlock()
   149  	return len(c.cache)
   150  }
   151  
   152  func (c *Expiring) gc(now time.Time) {
   153  	for {
   154  		// Return from gc if the heap is empty or the next element is not yet
   155  		// expired.
   156  		//
   157  		// heap[0] is a peek at the next element in the heap, which is not obvious
   158  		// from looking at the (*expiringHeap).Pop() implementation below.
   159  		// heap.Pop() swaps the first entry with the last entry of the heap, then
   160  		// calls (*expiringHeap).Pop() which returns the last element.
   161  		if len(c.heap) == 0 || now.Before(c.heap[0].expiry) {
   162  			return
   163  		}
   164  		cleanup := heap.Pop(&c.heap).(*expiringHeapEntry)
   165  		c.del(cleanup.key, cleanup.generation)
   166  	}
   167  }
   168  
   169  type expiringHeapEntry struct {
   170  	key        interface{}
   171  	expiry     time.Time
   172  	generation uint64
   173  }
   174  
   175  // expiringHeap is a min-heap ordered by expiration time of its entries. The
   176  // expiring cache uses this as a priority queue to efficiently organize entries
   177  // which will be garbage collected once they expire.
   178  type expiringHeap []*expiringHeapEntry
   179  
   180  var _ heap.Interface = &expiringHeap{}
   181  
   182  func (cq expiringHeap) Len() int {
   183  	return len(cq)
   184  }
   185  
   186  func (cq expiringHeap) Less(i, j int) bool {
   187  	return cq[i].expiry.Before(cq[j].expiry)
   188  }
   189  
   190  func (cq expiringHeap) Swap(i, j int) {
   191  	cq[i], cq[j] = cq[j], cq[i]
   192  }
   193  
   194  func (cq *expiringHeap) Push(c interface{}) {
   195  	*cq = append(*cq, c.(*expiringHeapEntry))
   196  }
   197  
   198  func (cq *expiringHeap) Pop() interface{} {
   199  	c := (*cq)[cq.Len()-1]
   200  	*cq = (*cq)[:cq.Len()-1]
   201  	return c
   202  }
   203  

View as plain text