...

Source file src/k8s.io/client-go/tools/cache/mutation_cache.go

Documentation: k8s.io/client-go/tools/cache

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"fmt"
    21  	"strconv"
    22  	"sync"
    23  	"time"
    24  
    25  	"k8s.io/klog/v2"
    26  
    27  	"k8s.io/apimachinery/pkg/api/meta"
    28  	"k8s.io/apimachinery/pkg/runtime"
    29  	utilcache "k8s.io/apimachinery/pkg/util/cache"
    30  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    31  	"k8s.io/apimachinery/pkg/util/sets"
    32  )
    33  
    34  // MutationCache is able to take the result of update operations and stores them in an LRU
    35  // that can be used to provide a more current view of a requested object.  It requires interpreting
    36  // resourceVersions for comparisons.
    37  // Implementations must be thread-safe.
    38  // TODO find a way to layer this into an informer/lister
    39  type MutationCache interface {
    40  	GetByKey(key string) (interface{}, bool, error)
    41  	ByIndex(indexName, indexKey string) ([]interface{}, error)
    42  	Mutation(interface{})
    43  }
    44  
    45  // ResourceVersionComparator is able to compare object versions.
    46  type ResourceVersionComparator interface {
    47  	CompareResourceVersion(lhs, rhs runtime.Object) int
    48  }
    49  
    50  // NewIntegerResourceVersionMutationCache returns a MutationCache that understands how to
    51  // deal with objects that have a resource version that:
    52  //
    53  //   - is an integer
    54  //   - increases when updated
    55  //   - is comparable across the same resource in a namespace
    56  //
    57  // Most backends will have these semantics. Indexer may be nil. ttl controls how long an item
    58  // remains in the mutation cache before it is removed.
    59  //
    60  // If includeAdds is true, objects in the mutation cache will be returned even if they don't exist
    61  // in the underlying store. This is only safe if your use of the cache can handle mutation entries
    62  // remaining in the cache for up to ttl when mutations and deletes occur very closely in time.
    63  func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache {
    64  	return &mutationCache{
    65  		backingCache:  backingCache,
    66  		indexer:       indexer,
    67  		mutationCache: utilcache.NewLRUExpireCache(100),
    68  		comparator:    etcdObjectVersioner{},
    69  		ttl:           ttl,
    70  		includeAdds:   includeAdds,
    71  	}
    72  }
    73  
    74  // mutationCache doesn't guarantee that it returns values added via Mutation since they can page out and
    75  // since you can't distinguish between, "didn't observe create" and "was deleted after create",
    76  // if the key is missing from the backing cache, we always return it as missing
    77  type mutationCache struct {
    78  	lock          sync.Mutex
    79  	backingCache  Store
    80  	indexer       Indexer
    81  	mutationCache *utilcache.LRUExpireCache
    82  	includeAdds   bool
    83  	ttl           time.Duration
    84  
    85  	comparator ResourceVersionComparator
    86  }
    87  
    88  // GetByKey is never guaranteed to return back the value set in Mutation.  It could be paged out, it could
    89  // be older than another copy, the backingCache may be more recent or, you might have written twice into the same key.
    90  // You get a value that was valid at some snapshot of time and will always return the newer of backingCache and mutationCache.
    91  func (c *mutationCache) GetByKey(key string) (interface{}, bool, error) {
    92  	c.lock.Lock()
    93  	defer c.lock.Unlock()
    94  
    95  	obj, exists, err := c.backingCache.GetByKey(key)
    96  	if err != nil {
    97  		return nil, false, err
    98  	}
    99  	if !exists {
   100  		if !c.includeAdds {
   101  			// we can't distinguish between, "didn't observe create" and "was deleted after create", so
   102  			// if the key is missing, we always return it as missing
   103  			return nil, false, nil
   104  		}
   105  		obj, exists = c.mutationCache.Get(key)
   106  		if !exists {
   107  			return nil, false, nil
   108  		}
   109  	}
   110  	objRuntime, ok := obj.(runtime.Object)
   111  	if !ok {
   112  		return obj, true, nil
   113  	}
   114  	return c.newerObject(key, objRuntime), true, nil
   115  }
   116  
   117  // ByIndex returns the newer objects that match the provided index and indexer key.
   118  // Will return an error if no indexer was provided.
   119  func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, error) {
   120  	c.lock.Lock()
   121  	defer c.lock.Unlock()
   122  	if c.indexer == nil {
   123  		return nil, fmt.Errorf("no indexer has been provided to the mutation cache")
   124  	}
   125  	keys, err := c.indexer.IndexKeys(name, indexKey)
   126  	if err != nil {
   127  		return nil, err
   128  	}
   129  	var items []interface{}
   130  	keySet := sets.NewString()
   131  	for _, key := range keys {
   132  		keySet.Insert(key)
   133  		obj, exists, err := c.indexer.GetByKey(key)
   134  		if err != nil {
   135  			return nil, err
   136  		}
   137  		if !exists {
   138  			continue
   139  		}
   140  		if objRuntime, ok := obj.(runtime.Object); ok {
   141  			items = append(items, c.newerObject(key, objRuntime))
   142  		} else {
   143  			items = append(items, obj)
   144  		}
   145  	}
   146  
   147  	if c.includeAdds {
   148  		fn := c.indexer.GetIndexers()[name]
   149  		// Keys() is returned oldest to newest, so full traversal does not alter the LRU behavior
   150  		for _, key := range c.mutationCache.Keys() {
   151  			updated, ok := c.mutationCache.Get(key)
   152  			if !ok {
   153  				continue
   154  			}
   155  			if keySet.Has(key.(string)) {
   156  				continue
   157  			}
   158  			elements, err := fn(updated)
   159  			if err != nil {
   160  				klog.V(4).Infof("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
   161  				continue
   162  			}
   163  			for _, inIndex := range elements {
   164  				if inIndex != indexKey {
   165  					continue
   166  				}
   167  				items = append(items, updated)
   168  				break
   169  			}
   170  		}
   171  	}
   172  
   173  	return items, nil
   174  }
   175  
   176  // newerObject checks the mutation cache for a newer object and returns one if found. If the
   177  // mutated object is older than the backing object, it is removed from the  Must be
   178  // called while the lock is held.
   179  func (c *mutationCache) newerObject(key string, backing runtime.Object) runtime.Object {
   180  	mutatedObj, exists := c.mutationCache.Get(key)
   181  	if !exists {
   182  		return backing
   183  	}
   184  	mutatedObjRuntime, ok := mutatedObj.(runtime.Object)
   185  	if !ok {
   186  		return backing
   187  	}
   188  	if c.comparator.CompareResourceVersion(backing, mutatedObjRuntime) >= 0 {
   189  		c.mutationCache.Remove(key)
   190  		return backing
   191  	}
   192  	return mutatedObjRuntime
   193  }
   194  
   195  // Mutation adds a change to the cache that can be returned in GetByKey if it is newer than the backingCache
   196  // copy.  If you call Mutation twice with the same object on different threads, one will win, but its not defined
   197  // which one.  This doesn't affect correctness, since the GetByKey guaranteed of "later of these two caches" is
   198  // preserved, but you may not get the version of the object you want.  The object you get is only guaranteed to
   199  // "one that was valid at some point in time", not "the one that I want".
   200  func (c *mutationCache) Mutation(obj interface{}) {
   201  	c.lock.Lock()
   202  	defer c.lock.Unlock()
   203  
   204  	key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
   205  	if err != nil {
   206  		// this is a "nice to have", so failures shouldn't do anything weird
   207  		utilruntime.HandleError(err)
   208  		return
   209  	}
   210  
   211  	if objRuntime, ok := obj.(runtime.Object); ok {
   212  		if mutatedObj, exists := c.mutationCache.Get(key); exists {
   213  			if mutatedObjRuntime, ok := mutatedObj.(runtime.Object); ok {
   214  				if c.comparator.CompareResourceVersion(objRuntime, mutatedObjRuntime) < 0 {
   215  					return
   216  				}
   217  			}
   218  		}
   219  	}
   220  	c.mutationCache.Add(key, obj, c.ttl)
   221  }
   222  
   223  // etcdObjectVersioner implements versioning and extracting etcd node information
   224  // for objects that have an embedded ObjectMeta or ListMeta field.
   225  type etcdObjectVersioner struct{}
   226  
   227  // ObjectResourceVersion implements Versioner
   228  func (a etcdObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
   229  	accessor, err := meta.Accessor(obj)
   230  	if err != nil {
   231  		return 0, err
   232  	}
   233  	version := accessor.GetResourceVersion()
   234  	if len(version) == 0 {
   235  		return 0, nil
   236  	}
   237  	return strconv.ParseUint(version, 10, 64)
   238  }
   239  
   240  // CompareResourceVersion compares etcd resource versions.  Outside this API they are all strings,
   241  // but etcd resource versions are special, they're actually ints, so we can easily compare them.
   242  func (a etcdObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
   243  	lhsVersion, err := a.ObjectResourceVersion(lhs)
   244  	if err != nil {
   245  		// coder error
   246  		panic(err)
   247  	}
   248  	rhsVersion, err := a.ObjectResourceVersion(rhs)
   249  	if err != nil {
   250  		// coder error
   251  		panic(err)
   252  	}
   253  
   254  	if lhsVersion == rhsVersion {
   255  		return 0
   256  	}
   257  	if lhsVersion < rhsVersion {
   258  		return -1
   259  	}
   260  
   261  	return 1
   262  }
   263  

View as plain text