...

Source file src/k8s.io/client-go/tools/cache/delta_fifo.go

Documentation: k8s.io/client-go/tools/cache

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"errors"
    21  	"fmt"
    22  	"sync"
    23  	"time"
    24  
    25  	"k8s.io/apimachinery/pkg/util/sets"
    26  
    27  	"k8s.io/klog/v2"
    28  	utiltrace "k8s.io/utils/trace"
    29  )
    30  
    31  // DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are
    32  // optional.
    33  type DeltaFIFOOptions struct {
    34  
    35  	// KeyFunction is used to figure out what key an object should have. (It's
    36  	// exposed in the returned DeltaFIFO's KeyOf() method, with additional
    37  	// handling around deleted objects and queue state).
    38  	// Optional, the default is MetaNamespaceKeyFunc.
    39  	KeyFunction KeyFunc
    40  
    41  	// KnownObjects is expected to return a list of keys that the consumer of
    42  	// this queue "knows about". It is used to decide which items are missing
    43  	// when Replace() is called; 'Deleted' deltas are produced for the missing items.
    44  	// KnownObjects may be nil if you can tolerate missing deletions on Replace().
    45  	KnownObjects KeyListerGetter
    46  
    47  	// EmitDeltaTypeReplaced indicates that the queue consumer
    48  	// understands the Replaced DeltaType. Before the `Replaced` event type was
    49  	// added, calls to Replace() were handled the same as Sync(). For
    50  	// backwards-compatibility purposes, this is false by default.
    51  	// When true, `Replaced` events will be sent for items passed to a Replace() call.
    52  	// When false, `Sync` events will be sent instead.
    53  	EmitDeltaTypeReplaced bool
    54  
    55  	// If set, will be called for objects before enqueueing them. Please
    56  	// see the comment on TransformFunc for details.
    57  	Transformer TransformFunc
    58  }
    59  
    60  // DeltaFIFO is like FIFO, but differs in two ways.  One is that the
    61  // accumulator associated with a given object's key is not that object
    62  // but rather a Deltas, which is a slice of Delta values for that
    63  // object.  Applying an object to a Deltas means to append a Delta
    64  // except when the potentially appended Delta is a Deleted and the
    65  // Deltas already ends with a Deleted.  In that case the Deltas does
    66  // not grow, although the terminal Deleted will be replaced by the new
    67  // Deleted if the older Deleted's object is a
    68  // DeletedFinalStateUnknown.
    69  //
    70  // The other difference is that DeltaFIFO has two additional ways that
    71  // an object can be applied to an accumulator: Replaced and Sync.
    72  // If EmitDeltaTypeReplaced is not set to true, Sync will be used in
    73  // replace events for backwards compatibility.  Sync is used for periodic
    74  // resync events.
    75  //
    76  // DeltaFIFO is a producer-consumer queue, where a Reflector is
    77  // intended to be the producer, and the consumer is whatever calls
    78  // the Pop() method.
    79  //
    80  // DeltaFIFO solves this use case:
    81  //   - You want to process every object change (delta) at most once.
    82  //   - When you process an object, you want to see everything
    83  //     that's happened to it since you last processed it.
    84  //   - You want to process the deletion of some of the objects.
    85  //   - You might want to periodically reprocess objects.
    86  //
    87  // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
    88  // interface{} to satisfy the Store/Queue interfaces, but they
    89  // will always return an object of type Deltas. List() returns
    90  // the newest object from each accumulator in the FIFO.
    91  //
    92  // A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
    93  // to list Store keys and to get objects by Store key.  The objects in
    94  // question are called "known objects" and this set of objects
    95  // modifies the behavior of the Delete, Replace, and Resync methods
    96  // (each in a different way).
    97  //
    98  // A note on threading: If you call Pop() in parallel from multiple
    99  // threads, you could end up with multiple threads processing slightly
   100  // different versions of the same object.
   101  type DeltaFIFO struct {
   102  	// lock/cond protects access to 'items' and 'queue'.
   103  	lock sync.RWMutex
   104  	cond sync.Cond
   105  
   106  	// `items` maps a key to a Deltas.
   107  	// Each such Deltas has at least one Delta.
   108  	items map[string]Deltas
   109  
   110  	// `queue` maintains FIFO order of keys for consumption in Pop().
   111  	// There are no duplicates in `queue`.
   112  	// A key is in `queue` if and only if it is in `items`.
   113  	queue []string
   114  
   115  	// populated is true if the first batch of items inserted by Replace() has been populated
   116  	// or Delete/Add/Update/AddIfNotPresent was called first.
   117  	populated bool
   118  	// initialPopulationCount is the number of items inserted by the first call of Replace()
   119  	initialPopulationCount int
   120  
   121  	// keyFunc is used to make the key used for queued item
   122  	// insertion and retrieval, and should be deterministic.
   123  	keyFunc KeyFunc
   124  
   125  	// knownObjects list keys that are "known" --- affecting Delete(),
   126  	// Replace(), and Resync()
   127  	knownObjects KeyListerGetter
   128  
   129  	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
   130  	// Currently, not used to gate any of CRUD operations.
   131  	closed bool
   132  
   133  	// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
   134  	// DeltaType when Replace() is called (to preserve backwards compat).
   135  	emitDeltaTypeReplaced bool
   136  
   137  	// Called with every object if non-nil.
   138  	transformer TransformFunc
   139  }
   140  
   141  // TransformFunc allows for transforming an object before it will be processed.
   142  // TransformFunc (similarly to ResourceEventHandler functions) should be able
   143  // to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.
   144  //
   145  // New in v1.27: In such cases, the contained object will already have gone
   146  // through the transform object separately (when it was added / updated prior
   147  // to the delete), so the TransformFunc can likely safely ignore such objects
   148  // (i.e., just return the input object).
   149  //
   150  // The most common usage pattern is to clean-up some parts of the object to
   151  // reduce component memory usage if a given component doesn't care about them.
   152  //
   153  // New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc
   154  // sees the object before any other actor, and it is now safe to mutate the
   155  // object in place instead of making a copy.
   156  //
   157  // Note that TransformFunc is called while inserting objects into the
   158  // notification queue and is therefore extremely performance sensitive; please
   159  // do not do anything that will take a long time.
   160  type TransformFunc func(interface{}) (interface{}, error)
   161  
   162  // DeltaType is the type of a change (addition, deletion, etc)
   163  type DeltaType string
   164  
   165  // Change type definition
   166  const (
   167  	Added   DeltaType = "Added"
   168  	Updated DeltaType = "Updated"
   169  	Deleted DeltaType = "Deleted"
   170  	// Replaced is emitted when we encountered watch errors and had to do a
   171  	// relist. We don't know if the replaced object has changed.
   172  	//
   173  	// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
   174  	// as well. Hence, Replaced is only emitted when the option
   175  	// EmitDeltaTypeReplaced is true.
   176  	Replaced DeltaType = "Replaced"
   177  	// Sync is for synthetic events during a periodic resync.
   178  	Sync DeltaType = "Sync"
   179  )
   180  
   181  // Delta is a member of Deltas (a list of Delta objects) which
   182  // in its turn is the type stored by a DeltaFIFO. It tells you what
   183  // change happened, and the object's state after* that change.
   184  //
   185  // [*] Unless the change is a deletion, and then you'll get the final
   186  // state of the object before it was deleted.
   187  type Delta struct {
   188  	Type   DeltaType
   189  	Object interface{}
   190  }
   191  
   192  // Deltas is a list of one or more 'Delta's to an individual object.
   193  // The oldest delta is at index 0, the newest delta is the last one.
   194  type Deltas []Delta
   195  
   196  // NewDeltaFIFO returns a Queue which can be used to process changes to items.
   197  //
   198  // keyFunc is used to figure out what key an object should have. (It is
   199  // exposed in the returned DeltaFIFO's KeyOf() method, with additional handling
   200  // around deleted objects and queue state).
   201  //
   202  // 'knownObjects' may be supplied to modify the behavior of Delete,
   203  // Replace, and Resync.  It may be nil if you do not need those
   204  // modifications.
   205  //
   206  // TODO: consider merging keyLister with this object, tracking a list of
   207  // "known" keys when Pop() is called. Have to think about how that
   208  // affects error retrying.
   209  //
   210  //	NOTE: It is possible to misuse this and cause a race when using an
   211  //	external known object source.
   212  //	Whether there is a potential race depends on how the consumer
   213  //	modifies knownObjects. In Pop(), process function is called under
   214  //	lock, so it is safe to update data structures in it that need to be
   215  //	in sync with the queue (e.g. knownObjects).
   216  //
   217  //	Example:
   218  //	In case of sharedIndexInformer being a consumer
   219  //	(https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
   220  //	there is no race as knownObjects (s.indexer) is modified safely
   221  //	under DeltaFIFO's lock. The only exceptions are GetStore() and
   222  //	GetIndexer() methods, which expose ways to modify the underlying
   223  //	storage. Currently these two methods are used for creating Lister
   224  //	and internal tests.
   225  //
   226  // Also see the comment on DeltaFIFO.
   227  //
   228  // Warning: This constructs a DeltaFIFO that does not differentiate between
   229  // events caused by a call to Replace (e.g., from a relist, which may
   230  // contain object updates), and synthetic events caused by a periodic resync
   231  // (which just emit the existing object). See https://issue.k8s.io/86015 for details.
   232  //
   233  // Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})`
   234  // instead to receive a `Replaced` event depending on the type.
   235  //
   236  // Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects})
   237  func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
   238  	return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
   239  		KeyFunction:  keyFunc,
   240  		KnownObjects: knownObjects,
   241  	})
   242  }
   243  
   244  // NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
   245  // items. See also the comment on DeltaFIFO.
   246  func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
   247  	if opts.KeyFunction == nil {
   248  		opts.KeyFunction = MetaNamespaceKeyFunc
   249  	}
   250  
   251  	f := &DeltaFIFO{
   252  		items:        map[string]Deltas{},
   253  		queue:        []string{},
   254  		keyFunc:      opts.KeyFunction,
   255  		knownObjects: opts.KnownObjects,
   256  
   257  		emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
   258  		transformer:           opts.Transformer,
   259  	}
   260  	f.cond.L = &f.lock
   261  	return f
   262  }
   263  
   264  var (
   265  	_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
   266  )
   267  
   268  var (
   269  	// ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
   270  	// object with zero length is encountered (should be impossible,
   271  	// but included for completeness).
   272  	ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
   273  )
   274  
   275  // Close the queue.
   276  func (f *DeltaFIFO) Close() {
   277  	f.lock.Lock()
   278  	defer f.lock.Unlock()
   279  	f.closed = true
   280  	f.cond.Broadcast()
   281  }
   282  
   283  // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
   284  // DeletedFinalStateUnknown objects.
   285  func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
   286  	if d, ok := obj.(Deltas); ok {
   287  		if len(d) == 0 {
   288  			return "", KeyError{obj, ErrZeroLengthDeltasObject}
   289  		}
   290  		obj = d.Newest().Object
   291  	}
   292  	if d, ok := obj.(DeletedFinalStateUnknown); ok {
   293  		return d.Key, nil
   294  	}
   295  	return f.keyFunc(obj)
   296  }
   297  
   298  // HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
   299  // or the first batch of items inserted by Replace() has been popped.
   300  func (f *DeltaFIFO) HasSynced() bool {
   301  	f.lock.Lock()
   302  	defer f.lock.Unlock()
   303  	return f.hasSynced_locked()
   304  }
   305  
   306  func (f *DeltaFIFO) hasSynced_locked() bool {
   307  	return f.populated && f.initialPopulationCount == 0
   308  }
   309  
   310  // Add inserts an item, and puts it in the queue. The item is only enqueued
   311  // if it doesn't already exist in the set.
   312  func (f *DeltaFIFO) Add(obj interface{}) error {
   313  	f.lock.Lock()
   314  	defer f.lock.Unlock()
   315  	f.populated = true
   316  	return f.queueActionLocked(Added, obj)
   317  }
   318  
   319  // Update is just like Add, but makes an Updated Delta.
   320  func (f *DeltaFIFO) Update(obj interface{}) error {
   321  	f.lock.Lock()
   322  	defer f.lock.Unlock()
   323  	f.populated = true
   324  	return f.queueActionLocked(Updated, obj)
   325  }
   326  
   327  // Delete is just like Add, but makes a Deleted Delta. If the given
   328  // object does not already exist, it will be ignored. (It may have
   329  // already been deleted by a Replace (re-list), for example.)  In this
   330  // method `f.knownObjects`, if not nil, provides (via GetByKey)
   331  // _additional_ objects that are considered to already exist.
   332  func (f *DeltaFIFO) Delete(obj interface{}) error {
   333  	id, err := f.KeyOf(obj)
   334  	if err != nil {
   335  		return KeyError{obj, err}
   336  	}
   337  	f.lock.Lock()
   338  	defer f.lock.Unlock()
   339  	f.populated = true
   340  	if f.knownObjects == nil {
   341  		if _, exists := f.items[id]; !exists {
   342  			// Presumably, this was deleted when a relist happened.
   343  			// Don't provide a second report of the same deletion.
   344  			return nil
   345  		}
   346  	} else {
   347  		// We only want to skip the "deletion" action if the object doesn't
   348  		// exist in knownObjects and it doesn't have corresponding item in items.
   349  		// Note that even if there is a "deletion" action in items, we can ignore it,
   350  		// because it will be deduped automatically in "queueActionLocked"
   351  		_, exists, err := f.knownObjects.GetByKey(id)
   352  		_, itemsExist := f.items[id]
   353  		if err == nil && !exists && !itemsExist {
   354  			// Presumably, this was deleted when a relist happened.
   355  			// Don't provide a second report of the same deletion.
   356  			return nil
   357  		}
   358  	}
   359  
   360  	// exist in items and/or KnownObjects
   361  	return f.queueActionLocked(Deleted, obj)
   362  }
   363  
   364  // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
   365  // present in the set, it is neither enqueued nor added to the set.
   366  //
   367  // This is useful in a single producer/consumer scenario so that the consumer can
   368  // safely retry items without contending with the producer and potentially enqueueing
   369  // stale items.
   370  //
   371  // Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
   372  // different from the Add/Update/Delete functions.
   373  func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
   374  	deltas, ok := obj.(Deltas)
   375  	if !ok {
   376  		return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
   377  	}
   378  	id, err := f.KeyOf(deltas)
   379  	if err != nil {
   380  		return KeyError{obj, err}
   381  	}
   382  	f.lock.Lock()
   383  	defer f.lock.Unlock()
   384  	f.addIfNotPresent(id, deltas)
   385  	return nil
   386  }
   387  
   388  // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
   389  // already holds the fifo lock.
   390  func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
   391  	f.populated = true
   392  	if _, exists := f.items[id]; exists {
   393  		return
   394  	}
   395  
   396  	f.queue = append(f.queue, id)
   397  	f.items[id] = deltas
   398  	f.cond.Broadcast()
   399  }
   400  
   401  // re-listing and watching can deliver the same update multiple times in any
   402  // order. This will combine the most recent two deltas if they are the same.
   403  func dedupDeltas(deltas Deltas) Deltas {
   404  	n := len(deltas)
   405  	if n < 2 {
   406  		return deltas
   407  	}
   408  	a := &deltas[n-1]
   409  	b := &deltas[n-2]
   410  	if out := isDup(a, b); out != nil {
   411  		deltas[n-2] = *out
   412  		return deltas[:n-1]
   413  	}
   414  	return deltas
   415  }
   416  
   417  // If a & b represent the same event, returns the delta that ought to be kept.
   418  // Otherwise, returns nil.
   419  // TODO: is there anything other than deletions that need deduping?
   420  func isDup(a, b *Delta) *Delta {
   421  	if out := isDeletionDup(a, b); out != nil {
   422  		return out
   423  	}
   424  	// TODO: Detect other duplicate situations? Are there any?
   425  	return nil
   426  }
   427  
   428  // keep the one with the most information if both are deletions.
   429  func isDeletionDup(a, b *Delta) *Delta {
   430  	if b.Type != Deleted || a.Type != Deleted {
   431  		return nil
   432  	}
   433  	// Do more sophisticated checks, or is this sufficient?
   434  	if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
   435  		return a
   436  	}
   437  	return b
   438  }
   439  
   440  // queueActionLocked appends to the delta list for the object.
   441  // Caller must lock first.
   442  func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
   443  	id, err := f.KeyOf(obj)
   444  	if err != nil {
   445  		return KeyError{obj, err}
   446  	}
   447  
   448  	// Every object comes through this code path once, so this is a good
   449  	// place to call the transform func.  If obj is a
   450  	// DeletedFinalStateUnknown tombstone, then the containted inner object
   451  	// will already have gone through the transformer, but we document that
   452  	// this can happen. In cases involving Replace(), such an object can
   453  	// come through multiple times.
   454  	if f.transformer != nil {
   455  		var err error
   456  		obj, err = f.transformer(obj)
   457  		if err != nil {
   458  			return err
   459  		}
   460  	}
   461  
   462  	oldDeltas := f.items[id]
   463  	newDeltas := append(oldDeltas, Delta{actionType, obj})
   464  	newDeltas = dedupDeltas(newDeltas)
   465  
   466  	if len(newDeltas) > 0 {
   467  		if _, exists := f.items[id]; !exists {
   468  			f.queue = append(f.queue, id)
   469  		}
   470  		f.items[id] = newDeltas
   471  		f.cond.Broadcast()
   472  	} else {
   473  		// This never happens, because dedupDeltas never returns an empty list
   474  		// when given a non-empty list (as it is here).
   475  		// If somehow it happens anyway, deal with it but complain.
   476  		if oldDeltas == nil {
   477  			klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
   478  			return nil
   479  		}
   480  		klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
   481  		f.items[id] = newDeltas
   482  		return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
   483  	}
   484  	return nil
   485  }
   486  
   487  // List returns a list of all the items; it returns the object
   488  // from the most recent Delta.
   489  // You should treat the items returned inside the deltas as immutable.
   490  func (f *DeltaFIFO) List() []interface{} {
   491  	f.lock.RLock()
   492  	defer f.lock.RUnlock()
   493  	return f.listLocked()
   494  }
   495  
   496  func (f *DeltaFIFO) listLocked() []interface{} {
   497  	list := make([]interface{}, 0, len(f.items))
   498  	for _, item := range f.items {
   499  		list = append(list, item.Newest().Object)
   500  	}
   501  	return list
   502  }
   503  
   504  // ListKeys returns a list of all the keys of the objects currently
   505  // in the FIFO.
   506  func (f *DeltaFIFO) ListKeys() []string {
   507  	f.lock.RLock()
   508  	defer f.lock.RUnlock()
   509  	list := make([]string, 0, len(f.queue))
   510  	for _, key := range f.queue {
   511  		list = append(list, key)
   512  	}
   513  	return list
   514  }
   515  
   516  // Get returns the complete list of deltas for the requested item,
   517  // or sets exists=false.
   518  // You should treat the items returned inside the deltas as immutable.
   519  func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
   520  	key, err := f.KeyOf(obj)
   521  	if err != nil {
   522  		return nil, false, KeyError{obj, err}
   523  	}
   524  	return f.GetByKey(key)
   525  }
   526  
   527  // GetByKey returns the complete list of deltas for the requested item,
   528  // setting exists=false if that list is empty.
   529  // You should treat the items returned inside the deltas as immutable.
   530  func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
   531  	f.lock.RLock()
   532  	defer f.lock.RUnlock()
   533  	d, exists := f.items[key]
   534  	if exists {
   535  		// Copy item's slice so operations on this slice
   536  		// won't interfere with the object we return.
   537  		d = copyDeltas(d)
   538  	}
   539  	return d, exists, nil
   540  }
   541  
   542  // IsClosed checks if the queue is closed
   543  func (f *DeltaFIFO) IsClosed() bool {
   544  	f.lock.Lock()
   545  	defer f.lock.Unlock()
   546  	return f.closed
   547  }
   548  
   549  // Pop blocks until the queue has some items, and then returns one.  If
   550  // multiple items are ready, they are returned in the order in which they were
   551  // added/updated. The item is removed from the queue (and the store) before it
   552  // is returned, so if you don't successfully process it, you need to add it back
   553  // with AddIfNotPresent().
   554  // process function is called under lock, so it is safe to update data structures
   555  // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
   556  // may return an instance of ErrRequeue with a nested error to indicate the current
   557  // item should be requeued (equivalent to calling AddIfNotPresent under the lock).
   558  // process should avoid expensive I/O operation so that other queue operations, i.e.
   559  // Add() and Get(), won't be blocked for too long.
   560  //
   561  // Pop returns a 'Deltas', which has a complete list of all the things
   562  // that happened to the object (deltas) while it was sitting in the queue.
   563  func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
   564  	f.lock.Lock()
   565  	defer f.lock.Unlock()
   566  	for {
   567  		for len(f.queue) == 0 {
   568  			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
   569  			// When Close() is called, the f.closed is set and the condition is broadcasted.
   570  			// Which causes this loop to continue and return from the Pop().
   571  			if f.closed {
   572  				return nil, ErrFIFOClosed
   573  			}
   574  
   575  			f.cond.Wait()
   576  		}
   577  		isInInitialList := !f.hasSynced_locked()
   578  		id := f.queue[0]
   579  		f.queue = f.queue[1:]
   580  		depth := len(f.queue)
   581  		if f.initialPopulationCount > 0 {
   582  			f.initialPopulationCount--
   583  		}
   584  		item, ok := f.items[id]
   585  		if !ok {
   586  			// This should never happen
   587  			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
   588  			continue
   589  		}
   590  		delete(f.items, id)
   591  		// Only log traces if the queue depth is greater than 10 and it takes more than
   592  		// 100 milliseconds to process one item from the queue.
   593  		// Queue depth never goes high because processing an item is locking the queue,
   594  		// and new items can't be added until processing finish.
   595  		// https://github.com/kubernetes/kubernetes/issues/103789
   596  		if depth > 10 {
   597  			trace := utiltrace.New("DeltaFIFO Pop Process",
   598  				utiltrace.Field{Key: "ID", Value: id},
   599  				utiltrace.Field{Key: "Depth", Value: depth},
   600  				utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
   601  			defer trace.LogIfLong(100 * time.Millisecond)
   602  		}
   603  		err := process(item, isInInitialList)
   604  		if e, ok := err.(ErrRequeue); ok {
   605  			f.addIfNotPresent(id, item)
   606  			err = e.Err
   607  		}
   608  		// Don't need to copyDeltas here, because we're transferring
   609  		// ownership to the caller.
   610  		return item, err
   611  	}
   612  }
   613  
   614  // Replace atomically does two things: (1) it adds the given objects
   615  // using the Sync or Replace DeltaType and then (2) it does some deletions.
   616  // In particular: for every pre-existing key K that is not the key of
   617  // an object in `list` there is the effect of
   618  // `Delete(DeletedFinalStateUnknown{K, O})` where O is the latest known
   619  // object of K. The pre-existing keys are those in the union set of the keys in
   620  // `f.items` and `f.knownObjects` (if not nil). The last known object for key K is
   621  // the one present in the last delta in `f.items`. If there is no delta for K
   622  // in `f.items`, it is the object in `f.knownObjects`
   623  func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
   624  	f.lock.Lock()
   625  	defer f.lock.Unlock()
   626  	keys := make(sets.String, len(list))
   627  
   628  	// keep backwards compat for old clients
   629  	action := Sync
   630  	if f.emitDeltaTypeReplaced {
   631  		action = Replaced
   632  	}
   633  
   634  	// Add Sync/Replaced action for each new item.
   635  	for _, item := range list {
   636  		key, err := f.KeyOf(item)
   637  		if err != nil {
   638  			return KeyError{item, err}
   639  		}
   640  		keys.Insert(key)
   641  		if err := f.queueActionLocked(action, item); err != nil {
   642  			return fmt.Errorf("couldn't enqueue object: %v", err)
   643  		}
   644  	}
   645  
   646  	// Do deletion detection against objects in the queue
   647  	queuedDeletions := 0
   648  	for k, oldItem := range f.items {
   649  		if keys.Has(k) {
   650  			continue
   651  		}
   652  		// Delete pre-existing items not in the new list.
   653  		// This could happen if watch deletion event was missed while
   654  		// disconnected from apiserver.
   655  		var deletedObj interface{}
   656  		if n := oldItem.Newest(); n != nil {
   657  			deletedObj = n.Object
   658  
   659  			// if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object
   660  			if d, ok := deletedObj.(DeletedFinalStateUnknown); ok {
   661  				deletedObj = d.Obj
   662  			}
   663  		}
   664  		queuedDeletions++
   665  		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
   666  			return err
   667  		}
   668  	}
   669  
   670  	if f.knownObjects != nil {
   671  		// Detect deletions for objects not present in the queue, but present in KnownObjects
   672  		knownKeys := f.knownObjects.ListKeys()
   673  		for _, k := range knownKeys {
   674  			if keys.Has(k) {
   675  				continue
   676  			}
   677  			if len(f.items[k]) > 0 {
   678  				continue
   679  			}
   680  
   681  			deletedObj, exists, err := f.knownObjects.GetByKey(k)
   682  			if err != nil {
   683  				deletedObj = nil
   684  				klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
   685  			} else if !exists {
   686  				deletedObj = nil
   687  				klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
   688  			}
   689  			queuedDeletions++
   690  			if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
   691  				return err
   692  			}
   693  		}
   694  	}
   695  
   696  	if !f.populated {
   697  		f.populated = true
   698  		f.initialPopulationCount = keys.Len() + queuedDeletions
   699  	}
   700  
   701  	return nil
   702  }
   703  
   704  // Resync adds, with a Sync type of Delta, every object listed by
   705  // `f.knownObjects` whose key is not already queued for processing.
   706  // If `f.knownObjects` is `nil` then Resync does nothing.
   707  func (f *DeltaFIFO) Resync() error {
   708  	f.lock.Lock()
   709  	defer f.lock.Unlock()
   710  
   711  	if f.knownObjects == nil {
   712  		return nil
   713  	}
   714  
   715  	keys := f.knownObjects.ListKeys()
   716  	for _, k := range keys {
   717  		if err := f.syncKeyLocked(k); err != nil {
   718  			return err
   719  		}
   720  	}
   721  	return nil
   722  }
   723  
   724  func (f *DeltaFIFO) syncKeyLocked(key string) error {
   725  	obj, exists, err := f.knownObjects.GetByKey(key)
   726  	if err != nil {
   727  		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
   728  		return nil
   729  	} else if !exists {
   730  		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
   731  		return nil
   732  	}
   733  
   734  	// If we are doing Resync() and there is already an event queued for that object,
   735  	// we ignore the Resync for it. This is to avoid the race, in which the resync
   736  	// comes with the previous value of object (since queueing an event for the object
   737  	// doesn't trigger changing the underlying store <knownObjects>.
   738  	id, err := f.KeyOf(obj)
   739  	if err != nil {
   740  		return KeyError{obj, err}
   741  	}
   742  	if len(f.items[id]) > 0 {
   743  		return nil
   744  	}
   745  
   746  	if err := f.queueActionLocked(Sync, obj); err != nil {
   747  		return fmt.Errorf("couldn't queue object: %v", err)
   748  	}
   749  	return nil
   750  }
   751  
   752  // A KeyListerGetter is anything that knows how to list its keys and look up by key.
   753  type KeyListerGetter interface {
   754  	KeyLister
   755  	KeyGetter
   756  }
   757  
   758  // A KeyLister is anything that knows how to list its keys.
   759  type KeyLister interface {
   760  	ListKeys() []string
   761  }
   762  
   763  // A KeyGetter is anything that knows how to get the value stored under a given key.
   764  type KeyGetter interface {
   765  	// GetByKey returns the value associated with the key, or sets exists=false.
   766  	GetByKey(key string) (value interface{}, exists bool, err error)
   767  }
   768  
   769  // Oldest is a convenience function that returns the oldest delta, or
   770  // nil if there are no deltas.
   771  func (d Deltas) Oldest() *Delta {
   772  	if len(d) > 0 {
   773  		return &d[0]
   774  	}
   775  	return nil
   776  }
   777  
   778  // Newest is a convenience function that returns the newest delta, or
   779  // nil if there are no deltas.
   780  func (d Deltas) Newest() *Delta {
   781  	if n := len(d); n > 0 {
   782  		return &d[n-1]
   783  	}
   784  	return nil
   785  }
   786  
   787  // copyDeltas returns a shallow copy of d; that is, it copies the slice but not
   788  // the objects in the slice. This allows Get/List to return an object that we
   789  // know won't be clobbered by a subsequent modifications.
   790  func copyDeltas(d Deltas) Deltas {
   791  	d2 := make(Deltas, len(d))
   792  	copy(d2, d)
   793  	return d2
   794  }
   795  
   796  // DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object
   797  // was deleted but the watch deletion event was missed while disconnected from
   798  // apiserver. In this case we don't know the final "resting" state of the object, so
   799  // there's a chance the included `Obj` is stale.
   800  type DeletedFinalStateUnknown struct {
   801  	Key string
   802  	Obj interface{}
   803  }
   804  

View as plain text