...

Source file src/k8s.io/client-go/tools/cache/thread_safe_store.go

Documentation: k8s.io/client-go/tools/cache

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"fmt"
    21  	"sync"
    22  
    23  	"k8s.io/apimachinery/pkg/util/sets"
    24  )
    25  
    26  // ThreadSafeStore is an interface that allows concurrent indexed
    27  // access to a storage backend.  It is like Indexer but does not
    28  // (necessarily) know how to extract the Store key from a given
    29  // object.
    30  //
    31  // TL;DR caveats: you must not modify anything returned by Get or List as it will break
    32  // the indexing feature in addition to not being thread safe.
    33  //
    34  // The guarantees of thread safety provided by List/Get are only valid if the caller
    35  // treats returned items as read-only. For example, a pointer inserted in the store
    36  // through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
    37  // on the same key and modify the pointer in a non-thread-safe way. Also note that
    38  // modifying objects stored by the indexers (if any) will *not* automatically lead
    39  // to a re-index. So it's not a good idea to directly modify the objects returned by
    40  // Get/List, in general.
    41  type ThreadSafeStore interface {
    42  	Add(key string, obj interface{})
    43  	Update(key string, obj interface{})
    44  	Delete(key string)
    45  	Get(key string) (item interface{}, exists bool)
    46  	List() []interface{}
    47  	ListKeys() []string
    48  	Replace(map[string]interface{}, string)
    49  	Index(indexName string, obj interface{}) ([]interface{}, error)
    50  	IndexKeys(indexName, indexedValue string) ([]string, error)
    51  	ListIndexFuncValues(name string) []string
    52  	ByIndex(indexName, indexedValue string) ([]interface{}, error)
    53  	GetIndexers() Indexers
    54  
    55  	// AddIndexers adds more indexers to this store. This supports adding indexes after the store already has items.
    56  	AddIndexers(newIndexers Indexers) error
    57  	// Resync is a no-op and is deprecated
    58  	Resync() error
    59  }
    60  
    61  // storeIndex implements the indexing functionality for Store interface
    62  type storeIndex struct {
    63  	// indexers maps a name to an IndexFunc
    64  	indexers Indexers
    65  	// indices maps a name to an Index
    66  	indices Indices
    67  }
    68  
    69  func (i *storeIndex) reset() {
    70  	i.indices = Indices{}
    71  }
    72  
    73  func (i *storeIndex) getKeysFromIndex(indexName string, obj interface{}) (sets.String, error) {
    74  	indexFunc := i.indexers[indexName]
    75  	if indexFunc == nil {
    76  		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
    77  	}
    78  
    79  	indexedValues, err := indexFunc(obj)
    80  	if err != nil {
    81  		return nil, err
    82  	}
    83  	index := i.indices[indexName]
    84  
    85  	var storeKeySet sets.String
    86  	if len(indexedValues) == 1 {
    87  		// In majority of cases, there is exactly one value matching.
    88  		// Optimize the most common path - deduping is not needed here.
    89  		storeKeySet = index[indexedValues[0]]
    90  	} else {
    91  		// Need to de-dupe the return list.
    92  		// Since multiple keys are allowed, this can happen.
    93  		storeKeySet = sets.String{}
    94  		for _, indexedValue := range indexedValues {
    95  			for key := range index[indexedValue] {
    96  				storeKeySet.Insert(key)
    97  			}
    98  		}
    99  	}
   100  
   101  	return storeKeySet, nil
   102  }
   103  
   104  func (i *storeIndex) getKeysByIndex(indexName, indexedValue string) (sets.String, error) {
   105  	indexFunc := i.indexers[indexName]
   106  	if indexFunc == nil {
   107  		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
   108  	}
   109  
   110  	index := i.indices[indexName]
   111  	return index[indexedValue], nil
   112  }
   113  
   114  func (i *storeIndex) getIndexValues(indexName string) []string {
   115  	index := i.indices[indexName]
   116  	names := make([]string, 0, len(index))
   117  	for key := range index {
   118  		names = append(names, key)
   119  	}
   120  	return names
   121  }
   122  
   123  func (i *storeIndex) addIndexers(newIndexers Indexers) error {
   124  	oldKeys := sets.StringKeySet(i.indexers)
   125  	newKeys := sets.StringKeySet(newIndexers)
   126  
   127  	if oldKeys.HasAny(newKeys.List()...) {
   128  		return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
   129  	}
   130  
   131  	for k, v := range newIndexers {
   132  		i.indexers[k] = v
   133  	}
   134  	return nil
   135  }
   136  
   137  // updateSingleIndex modifies the objects location in the named index:
   138  // - for create you must provide only the newObj
   139  // - for update you must provide both the oldObj and the newObj
   140  // - for delete you must provide only the oldObj
   141  // updateSingleIndex must be called from a function that already has a lock on the cache
   142  func (i *storeIndex) updateSingleIndex(name string, oldObj interface{}, newObj interface{}, key string) {
   143  	var oldIndexValues, indexValues []string
   144  	indexFunc, ok := i.indexers[name]
   145  	if !ok {
   146  		// Should never happen. Caller is responsible for ensuring this exists, and should call with lock
   147  		// held to avoid any races.
   148  		panic(fmt.Errorf("indexer %q does not exist", name))
   149  	}
   150  	if oldObj != nil {
   151  		var err error
   152  		oldIndexValues, err = indexFunc(oldObj)
   153  		if err != nil {
   154  			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
   155  		}
   156  	} else {
   157  		oldIndexValues = oldIndexValues[:0]
   158  	}
   159  
   160  	if newObj != nil {
   161  		var err error
   162  		indexValues, err = indexFunc(newObj)
   163  		if err != nil {
   164  			panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
   165  		}
   166  	} else {
   167  		indexValues = indexValues[:0]
   168  	}
   169  
   170  	index := i.indices[name]
   171  	if index == nil {
   172  		index = Index{}
   173  		i.indices[name] = index
   174  	}
   175  
   176  	if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
   177  		// We optimize for the most common case where indexFunc returns a single value which has not been changed
   178  		return
   179  	}
   180  
   181  	for _, value := range oldIndexValues {
   182  		i.deleteKeyFromIndex(key, value, index)
   183  	}
   184  	for _, value := range indexValues {
   185  		i.addKeyToIndex(key, value, index)
   186  	}
   187  }
   188  
   189  // updateIndices modifies the objects location in the managed indexes:
   190  // - for create you must provide only the newObj
   191  // - for update you must provide both the oldObj and the newObj
   192  // - for delete you must provide only the oldObj
   193  // updateIndices must be called from a function that already has a lock on the cache
   194  func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) {
   195  	for name := range i.indexers {
   196  		i.updateSingleIndex(name, oldObj, newObj, key)
   197  	}
   198  }
   199  
   200  func (i *storeIndex) addKeyToIndex(key, indexValue string, index Index) {
   201  	set := index[indexValue]
   202  	if set == nil {
   203  		set = sets.String{}
   204  		index[indexValue] = set
   205  	}
   206  	set.Insert(key)
   207  }
   208  
   209  func (i *storeIndex) deleteKeyFromIndex(key, indexValue string, index Index) {
   210  	set := index[indexValue]
   211  	if set == nil {
   212  		return
   213  	}
   214  	set.Delete(key)
   215  	// If we don't delete the set when zero, indices with high cardinality
   216  	// short lived resources can cause memory to increase over time from
   217  	// unused empty sets. See `kubernetes/kubernetes/issues/84959`.
   218  	if len(set) == 0 {
   219  		delete(index, indexValue)
   220  	}
   221  }
   222  
   223  // threadSafeMap implements ThreadSafeStore
   224  type threadSafeMap struct {
   225  	lock  sync.RWMutex
   226  	items map[string]interface{}
   227  
   228  	// index implements the indexing functionality
   229  	index *storeIndex
   230  }
   231  
   232  func (c *threadSafeMap) Add(key string, obj interface{}) {
   233  	c.Update(key, obj)
   234  }
   235  
   236  func (c *threadSafeMap) Update(key string, obj interface{}) {
   237  	c.lock.Lock()
   238  	defer c.lock.Unlock()
   239  	oldObject := c.items[key]
   240  	c.items[key] = obj
   241  	c.index.updateIndices(oldObject, obj, key)
   242  }
   243  
   244  func (c *threadSafeMap) Delete(key string) {
   245  	c.lock.Lock()
   246  	defer c.lock.Unlock()
   247  	if obj, exists := c.items[key]; exists {
   248  		c.index.updateIndices(obj, nil, key)
   249  		delete(c.items, key)
   250  	}
   251  }
   252  
   253  func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
   254  	c.lock.RLock()
   255  	defer c.lock.RUnlock()
   256  	item, exists = c.items[key]
   257  	return item, exists
   258  }
   259  
   260  func (c *threadSafeMap) List() []interface{} {
   261  	c.lock.RLock()
   262  	defer c.lock.RUnlock()
   263  	list := make([]interface{}, 0, len(c.items))
   264  	for _, item := range c.items {
   265  		list = append(list, item)
   266  	}
   267  	return list
   268  }
   269  
   270  // ListKeys returns a list of all the keys of the objects currently
   271  // in the threadSafeMap.
   272  func (c *threadSafeMap) ListKeys() []string {
   273  	c.lock.RLock()
   274  	defer c.lock.RUnlock()
   275  	list := make([]string, 0, len(c.items))
   276  	for key := range c.items {
   277  		list = append(list, key)
   278  	}
   279  	return list
   280  }
   281  
   282  func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
   283  	c.lock.Lock()
   284  	defer c.lock.Unlock()
   285  	c.items = items
   286  
   287  	// rebuild any index
   288  	c.index.reset()
   289  	for key, item := range c.items {
   290  		c.index.updateIndices(nil, item, key)
   291  	}
   292  }
   293  
   294  // Index returns a list of items that match the given object on the index function.
   295  // Index is thread-safe so long as you treat all items as immutable.
   296  func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
   297  	c.lock.RLock()
   298  	defer c.lock.RUnlock()
   299  
   300  	storeKeySet, err := c.index.getKeysFromIndex(indexName, obj)
   301  	if err != nil {
   302  		return nil, err
   303  	}
   304  
   305  	list := make([]interface{}, 0, storeKeySet.Len())
   306  	for storeKey := range storeKeySet {
   307  		list = append(list, c.items[storeKey])
   308  	}
   309  	return list, nil
   310  }
   311  
   312  // ByIndex returns a list of the items whose indexed values in the given index include the given indexed value
   313  func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
   314  	c.lock.RLock()
   315  	defer c.lock.RUnlock()
   316  
   317  	set, err := c.index.getKeysByIndex(indexName, indexedValue)
   318  	if err != nil {
   319  		return nil, err
   320  	}
   321  	list := make([]interface{}, 0, set.Len())
   322  	for key := range set {
   323  		list = append(list, c.items[key])
   324  	}
   325  
   326  	return list, nil
   327  }
   328  
   329  // IndexKeys returns a list of the Store keys of the objects whose indexed values in the given index include the given indexed value.
   330  // IndexKeys is thread-safe so long as you treat all items as immutable.
   331  func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
   332  	c.lock.RLock()
   333  	defer c.lock.RUnlock()
   334  
   335  	set, err := c.index.getKeysByIndex(indexName, indexedValue)
   336  	if err != nil {
   337  		return nil, err
   338  	}
   339  	return set.List(), nil
   340  }
   341  
   342  func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
   343  	c.lock.RLock()
   344  	defer c.lock.RUnlock()
   345  
   346  	return c.index.getIndexValues(indexName)
   347  }
   348  
   349  func (c *threadSafeMap) GetIndexers() Indexers {
   350  	return c.index.indexers
   351  }
   352  
   353  func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
   354  	c.lock.Lock()
   355  	defer c.lock.Unlock()
   356  
   357  	if err := c.index.addIndexers(newIndexers); err != nil {
   358  		return err
   359  	}
   360  
   361  	// If there are already items, index them
   362  	for key, item := range c.items {
   363  		for name := range newIndexers {
   364  			c.index.updateSingleIndex(name, nil, item, key)
   365  		}
   366  	}
   367  
   368  	return nil
   369  }
   370  
   371  func (c *threadSafeMap) Resync() error {
   372  	// Nothing to do
   373  	return nil
   374  }
   375  
   376  // NewThreadSafeStore creates a new instance of ThreadSafeStore.
   377  func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
   378  	return &threadSafeMap{
   379  		items: map[string]interface{}{},
   380  		index: &storeIndex{
   381  			indexers: indexers,
   382  			indices:  indices,
   383  		},
   384  	}
   385  }
   386  

View as plain text