...

Source file src/k8s.io/client-go/tools/cache/shared_informer.go

Documentation: k8s.io/client-go/tools/cache

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"errors"
    21  	"fmt"
    22  	"sync"
    23  	"time"
    24  
    25  	"k8s.io/apimachinery/pkg/api/meta"
    26  	"k8s.io/apimachinery/pkg/runtime"
    27  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    28  	"k8s.io/apimachinery/pkg/util/wait"
    29  	"k8s.io/client-go/tools/cache/synctrack"
    30  	"k8s.io/utils/buffer"
    31  	"k8s.io/utils/clock"
    32  
    33  	"k8s.io/klog/v2"
    34  
    35  	clientgofeaturegate "k8s.io/client-go/features"
    36  )
    37  
    38  // SharedInformer provides eventually consistent linkage of its
    39  // clients to the authoritative state of a given collection of
    40  // objects.  An object is identified by its API group, kind/resource,
    41  // namespace (if any), and name; the `ObjectMeta.UID` is not part of
    42  // an object's ID as far as this contract is concerned.  One
    43  // SharedInformer provides linkage to objects of a particular API
    44  // group and kind/resource.  The linked object collection of a
    45  // SharedInformer may be further restricted to one namespace (if
    46  // applicable) and/or by label selector and/or field selector.
    47  //
    48  // The authoritative state of an object is what apiservers provide
    49  // access to, and an object goes through a strict sequence of states.
    50  // An object state is either (1) present with a ResourceVersion and
    51  // other appropriate content or (2) "absent".
    52  //
    53  // A SharedInformer maintains a local cache --- exposed by GetStore(),
    54  // by GetIndexer() in the case of an indexed informer, and possibly by
    55  // machinery involved in creating and/or accessing the informer --- of
    56  // the state of each relevant object.  This cache is eventually
    57  // consistent with the authoritative state.  This means that, unless
    58  // prevented by persistent communication problems, if ever a
    59  // particular object ID X is authoritatively associated with a state S
    60  // then for every SharedInformer I whose collection includes (X, S)
    61  // eventually either (1) I's cache associates X with S or a later
    62  // state of X, (2) I is stopped, or (3) the authoritative state
    63  // service for X terminates.  To be formally complete, we say that the
    64  // absent state meets any restriction by label selector or field
    65  // selector.
    66  //
    67  // For a given informer and relevant object ID X, the sequence of
    68  // states that appears in the informer's cache is a subsequence of the
    69  // states authoritatively associated with X.  That is, some states
    70  // might never appear in the cache but ordering among the appearing
    71  // states is correct.  Note, however, that there is no promise about
    72  // ordering between states seen for different objects.
    73  //
    74  // The local cache starts out empty, and gets populated and updated
    75  // during `Run()`.
    76  //
    77  // As a simple example, if a collection of objects is henceforth
    78  // unchanging, a SharedInformer is created that links to that
    79  // collection, and that SharedInformer is `Run()` then that
    80  // SharedInformer's cache eventually holds an exact copy of that
    81  // collection (unless it is stopped too soon, the authoritative state
    82  // service ends, or communication problems between the two
    83  // persistently thwart achievement).
    84  //
    85  // As another simple example, if the local cache ever holds a
    86  // non-absent state for some object ID and the object is eventually
    87  // removed from the authoritative state then eventually the object is
    88  // removed from the local cache (unless the SharedInformer is stopped
    89  // too soon, the authoritative state service ends, or communication
    90  // problems persistently thwart the desired result).
    91  //
    92  // The keys in the Store are of the form namespace/name for namespaced
    93  // objects, and are simply the name for non-namespaced objects.
    94  // Clients can use `MetaNamespaceKeyFunc(obj)` to extract the key for
    95  // a given object, and `SplitMetaNamespaceKey(key)` to split a key
    96  // into its constituent parts.
    97  //
    98  // Every query against the local cache is answered entirely from one
    99  // snapshot of the cache's state.  Thus, the result of a `List` call
   100  // will not contain two entries with the same namespace and name.
   101  //
   102  // A client is identified here by a ResourceEventHandler.  For every
   103  // update to the SharedInformer's local cache and for every client
   104  // added before `Run()`, eventually either the SharedInformer is
   105  // stopped or the client is notified of the update.  A client added
   106  // after `Run()` starts gets a startup batch of notifications of
   107  // additions of the objects existing in the cache at the time that
   108  // client was added; also, for every update to the SharedInformer's
   109  // local cache after that client was added, eventually either the
   110  // SharedInformer is stopped or that client is notified of that
   111  // update.  Client notifications happen after the corresponding cache
   112  // update and, in the case of a SharedIndexInformer, after the
   113  // corresponding index updates.  It is possible that additional cache
   114  // and index updates happen before such a prescribed notification.
   115  // For a given SharedInformer and client, the notifications are
   116  // delivered sequentially.  For a given SharedInformer, client, and
   117  // object ID, the notifications are delivered in order.  Because
   118  // `ObjectMeta.UID` has no role in identifying objects, it is possible
   119  // that when (1) object O1 with ID (e.g. namespace and name) X and
   120  // `ObjectMeta.UID` U1 in the SharedInformer's local cache is deleted
   121  // and later (2) another object O2 with ID X and ObjectMeta.UID U2 is
   122  // created the informer's clients are not notified of (1) and (2) but
   123  // rather are notified only of an update from O1 to O2. Clients that
   124  // need to detect such cases might do so by comparing the `ObjectMeta.UID`
   125  // field of the old and the new object in the code that handles update
   126  // notifications (i.e. `OnUpdate` method of ResourceEventHandler).
   127  //
   128  // A client must process each notification promptly; a SharedInformer
   129  // is not engineered to deal well with a large backlog of
   130  // notifications to deliver.  Lengthy processing should be passed off
   131  // to something else, for example through a
   132  // `client-go/util/workqueue`.
   133  //
   134  // A delete notification exposes the last locally known non-absent
   135  // state, except that its ResourceVersion is replaced with a
   136  // ResourceVersion in which the object is actually absent.
   137  type SharedInformer interface {
   138  	// AddEventHandler adds an event handler to the shared informer using
   139  	// the shared informer's resync period.  Events to a single handler are
   140  	// delivered sequentially, but there is no coordination between
   141  	// different handlers.
   142  	// It returns a registration handle for the handler that can be used to
   143  	// remove the handler again, or to tell if the handler is synced (has
   144  	// seen every item in the initial list).
   145  	AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
   146  	// AddEventHandlerWithResyncPeriod adds an event handler to the
   147  	// shared informer with the requested resync period; zero means
   148  	// this handler does not care about resyncs.  The resync operation
   149  	// consists of delivering to the handler an update notification
   150  	// for every object in the informer's local cache; it does not add
   151  	// any interactions with the authoritative storage.  Some
   152  	// informers do no resyncs at all, not even for handlers added
   153  	// with a non-zero resyncPeriod.  For an informer that does
   154  	// resyncs, and for each handler that requests resyncs, that
   155  	// informer develops a nominal resync period that is no shorter
   156  	// than the requested period but may be longer.  The actual time
   157  	// between any two resyncs may be longer than the nominal period
   158  	// because the implementation takes time to do work and there may
   159  	// be competing load and scheduling noise.
   160  	// It returns a registration handle for the handler that can be used to remove
   161  	// the handler again and an error if the handler cannot be added.
   162  	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error)
   163  	// RemoveEventHandler removes a formerly added event handler given by
   164  	// its registration handle.
   165  	// This function is guaranteed to be idempotent, and thread-safe.
   166  	RemoveEventHandler(handle ResourceEventHandlerRegistration) error
   167  	// GetStore returns the informer's local cache as a Store.
   168  	GetStore() Store
   169  	// GetController is deprecated, it does nothing useful
   170  	GetController() Controller
   171  	// Run starts and runs the shared informer, returning after it stops.
   172  	// The informer will be stopped when stopCh is closed.
   173  	Run(stopCh <-chan struct{})
   174  	// HasSynced returns true if the shared informer's store has been
   175  	// informed by at least one full LIST of the authoritative state
   176  	// of the informer's object collection.  This is unrelated to "resync".
   177  	//
   178  	// Note that this doesn't tell you if an individual handler is synced!!
   179  	// For that, please call HasSynced on the handle returned by
   180  	// AddEventHandler.
   181  	HasSynced() bool
   182  	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
   183  	// store. The value returned is not synchronized with access to the underlying store and is not
   184  	// thread-safe.
   185  	LastSyncResourceVersion() string
   186  
   187  	// The WatchErrorHandler is called whenever ListAndWatch drops the
   188  	// connection with an error. After calling this handler, the informer
   189  	// will backoff and retry.
   190  	//
   191  	// The default implementation looks at the error type and tries to log
   192  	// the error message at an appropriate level.
   193  	//
   194  	// There's only one handler, so if you call this multiple times, last one
   195  	// wins; calling after the informer has been started returns an error.
   196  	//
   197  	// The handler is intended for visibility, not to e.g. pause the consumers.
   198  	// The handler should return quickly - any expensive processing should be
   199  	// offloaded.
   200  	SetWatchErrorHandler(handler WatchErrorHandler) error
   201  
   202  	// The TransformFunc is called for each object which is about to be stored.
   203  	//
   204  	// This function is intended for you to take the opportunity to
   205  	// remove, transform, or normalize fields. One use case is to strip unused
   206  	// metadata fields out of objects to save on RAM cost.
   207  	//
   208  	// Must be set before starting the informer.
   209  	//
   210  	// Please see the comment on TransformFunc for more details.
   211  	SetTransform(handler TransformFunc) error
   212  
   213  	// IsStopped reports whether the informer has already been stopped.
   214  	// Adding event handlers to already stopped informers is not possible.
   215  	// An informer already stopped will never be started again.
   216  	IsStopped() bool
   217  }
   218  
   219  // Opaque interface representing the registration of ResourceEventHandler for
   220  // a SharedInformer. Must be supplied back to the same SharedInformer's
   221  // `RemoveEventHandler` to unregister the handlers.
   222  //
   223  // Also used to tell if the handler is synced (has had all items in the initial
   224  // list delivered).
   225  type ResourceEventHandlerRegistration interface {
   226  	// HasSynced reports if both the parent has synced and all pre-sync
   227  	// events have been delivered.
   228  	HasSynced() bool
   229  }
   230  
   231  // SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
   232  type SharedIndexInformer interface {
   233  	SharedInformer
   234  	// AddIndexers add indexers to the informer before it starts.
   235  	AddIndexers(indexers Indexers) error
   236  	GetIndexer() Indexer
   237  }
   238  
   239  // NewSharedInformer creates a new instance for the ListerWatcher. See NewSharedIndexInformerWithOptions for full details.
   240  func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
   241  	return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
   242  }
   243  
   244  // NewSharedIndexInformer creates a new instance for the ListerWatcher and specified Indexers. See
   245  // NewSharedIndexInformerWithOptions for full details.
   246  func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
   247  	return NewSharedIndexInformerWithOptions(
   248  		lw,
   249  		exampleObject,
   250  		SharedIndexInformerOptions{
   251  			ResyncPeriod: defaultEventHandlerResyncPeriod,
   252  			Indexers:     indexers,
   253  		},
   254  	)
   255  }
   256  
   257  // NewSharedIndexInformerWithOptions creates a new instance for the ListerWatcher.
   258  // The created informer will not do resyncs if options.ResyncPeriod is zero.  Otherwise: for each
   259  // handler that with a non-zero requested resync period, whether added
   260  // before or after the informer starts, the nominal resync period is
   261  // the requested resync period rounded up to a multiple of the
   262  // informer's resync checking period.  Such an informer's resync
   263  // checking period is established when the informer starts running,
   264  // and is the maximum of (a) the minimum of the resync periods
   265  // requested before the informer starts and the
   266  // options.ResyncPeriod given here and (b) the constant
   267  // `minimumResyncPeriod` defined in this file.
   268  func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
   269  	realClock := &clock.RealClock{}
   270  
   271  	return &sharedIndexInformer{
   272  		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers),
   273  		processor:                       &sharedProcessor{clock: realClock},
   274  		listerWatcher:                   lw,
   275  		objectType:                      exampleObject,
   276  		objectDescription:               options.ObjectDescription,
   277  		resyncCheckPeriod:               options.ResyncPeriod,
   278  		defaultEventHandlerResyncPeriod: options.ResyncPeriod,
   279  		clock:                           realClock,
   280  		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
   281  	}
   282  }
   283  
   284  // SharedIndexInformerOptions configures a sharedIndexInformer.
   285  type SharedIndexInformerOptions struct {
   286  	// ResyncPeriod is the default event handler resync period and resync check
   287  	// period. If unset/unspecified, these are defaulted to 0 (do not resync).
   288  	ResyncPeriod time.Duration
   289  
   290  	// Indexers is the sharedIndexInformer's indexers. If unset/unspecified, no indexers are configured.
   291  	Indexers Indexers
   292  
   293  	// ObjectDescription is the sharedIndexInformer's object description. This is passed through to the
   294  	// underlying Reflector's type description.
   295  	ObjectDescription string
   296  }
   297  
   298  // InformerSynced is a function that can be used to determine if an informer has synced.  This is useful for determining if caches have synced.
   299  type InformerSynced func() bool
   300  
   301  const (
   302  	// syncedPollPeriod controls how often you look at the status of your sync funcs
   303  	syncedPollPeriod = 100 * time.Millisecond
   304  
   305  	// initialBufferSize is the initial number of event notifications that can be buffered.
   306  	initialBufferSize = 1024
   307  )
   308  
   309  // WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages
   310  // indicating that the caller identified by name is waiting for syncs, followed by
   311  // either a successful or failed sync.
   312  func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
   313  	klog.Infof("Waiting for caches to sync for %s", controllerName)
   314  
   315  	if !WaitForCacheSync(stopCh, cacheSyncs...) {
   316  		utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName))
   317  		return false
   318  	}
   319  
   320  	klog.Infof("Caches are synced for %s", controllerName)
   321  	return true
   322  }
   323  
   324  // WaitForCacheSync waits for caches to populate.  It returns true if it was successful, false
   325  // if the controller should shutdown
   326  // callers should prefer WaitForNamedCacheSync()
   327  func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
   328  	err := wait.PollImmediateUntil(syncedPollPeriod,
   329  		func() (bool, error) {
   330  			for _, syncFunc := range cacheSyncs {
   331  				if !syncFunc() {
   332  					return false, nil
   333  				}
   334  			}
   335  			return true, nil
   336  		},
   337  		stopCh)
   338  	if err != nil {
   339  		return false
   340  	}
   341  
   342  	return true
   343  }
   344  
   345  // `*sharedIndexInformer` implements SharedIndexInformer and has three
   346  // main components.  One is an indexed local cache, `indexer Indexer`.
   347  // The second main component is a Controller that pulls
   348  // objects/notifications using the ListerWatcher and pushes them into
   349  // a DeltaFIFO --- whose knownObjects is the informer's local cache
   350  // --- while concurrently Popping Deltas values from that fifo and
   351  // processing them with `sharedIndexInformer::HandleDeltas`.  Each
   352  // invocation of HandleDeltas, which is done with the fifo's lock
   353  // held, processes each Delta in turn.  For each Delta this both
   354  // updates the local cache and stuffs the relevant notification into
   355  // the sharedProcessor.  The third main component is that
   356  // sharedProcessor, which is responsible for relaying those
   357  // notifications to each of the informer's clients.
   358  type sharedIndexInformer struct {
   359  	indexer    Indexer
   360  	controller Controller
   361  
   362  	processor             *sharedProcessor
   363  	cacheMutationDetector MutationDetector
   364  
   365  	listerWatcher ListerWatcher
   366  
   367  	// objectType is an example object of the type this informer is expected to handle. If set, an event
   368  	// with an object with a mismatching type is dropped instead of being delivered to listeners.
   369  	objectType runtime.Object
   370  
   371  	// objectDescription is the description of this informer's objects. This typically defaults to
   372  	objectDescription string
   373  
   374  	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
   375  	// shouldResync to check if any of our listeners need a resync.
   376  	resyncCheckPeriod time.Duration
   377  	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
   378  	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
   379  	// value).
   380  	defaultEventHandlerResyncPeriod time.Duration
   381  	// clock allows for testability
   382  	clock clock.Clock
   383  
   384  	started, stopped bool
   385  	startedLock      sync.Mutex
   386  
   387  	// blockDeltas gives a way to stop all event distribution so that a late event handler
   388  	// can safely join the shared informer.
   389  	blockDeltas sync.Mutex
   390  
   391  	// Called whenever the ListAndWatch drops the connection with an error.
   392  	watchErrorHandler WatchErrorHandler
   393  
   394  	transform TransformFunc
   395  }
   396  
   397  // dummyController hides the fact that a SharedInformer is different from a dedicated one
   398  // where a caller can `Run`.  The run method is disconnected in this case, because higher
   399  // level logic will decide when to start the SharedInformer and related controller.
   400  // Because returning information back is always asynchronous, the legacy callers shouldn't
   401  // notice any change in behavior.
   402  type dummyController struct {
   403  	informer *sharedIndexInformer
   404  }
   405  
   406  func (v *dummyController) Run(stopCh <-chan struct{}) {
   407  }
   408  
   409  func (v *dummyController) HasSynced() bool {
   410  	return v.informer.HasSynced()
   411  }
   412  
   413  func (v *dummyController) LastSyncResourceVersion() string {
   414  	if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InformerResourceVersion) {
   415  		return v.informer.LastSyncResourceVersion()
   416  	}
   417  
   418  	return ""
   419  }
   420  
   421  type updateNotification struct {
   422  	oldObj interface{}
   423  	newObj interface{}
   424  }
   425  
   426  type addNotification struct {
   427  	newObj          interface{}
   428  	isInInitialList bool
   429  }
   430  
   431  type deleteNotification struct {
   432  	oldObj interface{}
   433  }
   434  
   435  func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error {
   436  	s.startedLock.Lock()
   437  	defer s.startedLock.Unlock()
   438  
   439  	if s.started {
   440  		return fmt.Errorf("informer has already started")
   441  	}
   442  
   443  	s.watchErrorHandler = handler
   444  	return nil
   445  }
   446  
   447  func (s *sharedIndexInformer) SetTransform(handler TransformFunc) error {
   448  	s.startedLock.Lock()
   449  	defer s.startedLock.Unlock()
   450  
   451  	if s.started {
   452  		return fmt.Errorf("informer has already started")
   453  	}
   454  
   455  	s.transform = handler
   456  	return nil
   457  }
   458  
   459  func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
   460  	defer utilruntime.HandleCrash()
   461  
   462  	if s.HasStarted() {
   463  		klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed")
   464  		return
   465  	}
   466  
   467  	func() {
   468  		s.startedLock.Lock()
   469  		defer s.startedLock.Unlock()
   470  
   471  		fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
   472  			KnownObjects:          s.indexer,
   473  			EmitDeltaTypeReplaced: true,
   474  			Transformer:           s.transform,
   475  		})
   476  
   477  		cfg := &Config{
   478  			Queue:             fifo,
   479  			ListerWatcher:     s.listerWatcher,
   480  			ObjectType:        s.objectType,
   481  			ObjectDescription: s.objectDescription,
   482  			FullResyncPeriod:  s.resyncCheckPeriod,
   483  			RetryOnError:      false,
   484  			ShouldResync:      s.processor.shouldResync,
   485  
   486  			Process:           s.HandleDeltas,
   487  			WatchErrorHandler: s.watchErrorHandler,
   488  		}
   489  
   490  		s.controller = New(cfg)
   491  		s.controller.(*controller).clock = s.clock
   492  		s.started = true
   493  	}()
   494  
   495  	// Separate stop channel because Processor should be stopped strictly after controller
   496  	processorStopCh := make(chan struct{})
   497  	var wg wait.Group
   498  	defer wg.Wait()              // Wait for Processor to stop
   499  	defer close(processorStopCh) // Tell Processor to stop
   500  	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
   501  	wg.StartWithChannel(processorStopCh, s.processor.run)
   502  
   503  	defer func() {
   504  		s.startedLock.Lock()
   505  		defer s.startedLock.Unlock()
   506  		s.stopped = true // Don't want any new listeners
   507  	}()
   508  	s.controller.Run(stopCh)
   509  }
   510  
   511  func (s *sharedIndexInformer) HasStarted() bool {
   512  	s.startedLock.Lock()
   513  	defer s.startedLock.Unlock()
   514  	return s.started
   515  }
   516  
   517  func (s *sharedIndexInformer) HasSynced() bool {
   518  	s.startedLock.Lock()
   519  	defer s.startedLock.Unlock()
   520  
   521  	if s.controller == nil {
   522  		return false
   523  	}
   524  	return s.controller.HasSynced()
   525  }
   526  
   527  func (s *sharedIndexInformer) LastSyncResourceVersion() string {
   528  	s.startedLock.Lock()
   529  	defer s.startedLock.Unlock()
   530  
   531  	if s.controller == nil {
   532  		return ""
   533  	}
   534  	return s.controller.LastSyncResourceVersion()
   535  }
   536  
   537  func (s *sharedIndexInformer) GetStore() Store {
   538  	return s.indexer
   539  }
   540  
   541  func (s *sharedIndexInformer) GetIndexer() Indexer {
   542  	return s.indexer
   543  }
   544  
   545  func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
   546  	s.startedLock.Lock()
   547  	defer s.startedLock.Unlock()
   548  
   549  	if s.stopped {
   550  		return fmt.Errorf("indexer was not added because it has stopped already")
   551  	}
   552  
   553  	return s.indexer.AddIndexers(indexers)
   554  }
   555  
   556  func (s *sharedIndexInformer) GetController() Controller {
   557  	return &dummyController{informer: s}
   558  }
   559  
   560  func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) {
   561  	return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
   562  }
   563  
   564  func determineResyncPeriod(desired, check time.Duration) time.Duration {
   565  	if desired == 0 {
   566  		return desired
   567  	}
   568  	if check == 0 {
   569  		klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
   570  		return 0
   571  	}
   572  	if desired < check {
   573  		klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
   574  		return check
   575  	}
   576  	return desired
   577  }
   578  
   579  const minimumResyncPeriod = 1 * time.Second
   580  
   581  func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) {
   582  	s.startedLock.Lock()
   583  	defer s.startedLock.Unlock()
   584  
   585  	if s.stopped {
   586  		return nil, fmt.Errorf("handler %v was not added to shared informer because it has stopped already", handler)
   587  	}
   588  
   589  	if resyncPeriod > 0 {
   590  		if resyncPeriod < minimumResyncPeriod {
   591  			klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
   592  			resyncPeriod = minimumResyncPeriod
   593  		}
   594  
   595  		if resyncPeriod < s.resyncCheckPeriod {
   596  			if s.started {
   597  				klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
   598  				resyncPeriod = s.resyncCheckPeriod
   599  			} else {
   600  				// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
   601  				// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
   602  				// accordingly
   603  				s.resyncCheckPeriod = resyncPeriod
   604  				s.processor.resyncCheckPeriodChanged(resyncPeriod)
   605  			}
   606  		}
   607  	}
   608  
   609  	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced)
   610  
   611  	if !s.started {
   612  		return s.processor.addListener(listener), nil
   613  	}
   614  
   615  	// in order to safely join, we have to
   616  	// 1. stop sending add/update/delete notifications
   617  	// 2. do a list against the store
   618  	// 3. send synthetic "Add" events to the new handler
   619  	// 4. unblock
   620  	s.blockDeltas.Lock()
   621  	defer s.blockDeltas.Unlock()
   622  
   623  	handle := s.processor.addListener(listener)
   624  	for _, item := range s.indexer.List() {
   625  		// Note that we enqueue these notifications with the lock held
   626  		// and before returning the handle. That means there is never a
   627  		// chance for anyone to call the handle's HasSynced method in a
   628  		// state when it would falsely return true (i.e., when the
   629  		// shared informer is synced but it has not observed an Add
   630  		// with isInitialList being true, nor when the thread
   631  		// processing notifications somehow goes faster than this
   632  		// thread adding them and the counter is temporarily zero).
   633  		listener.add(addNotification{newObj: item, isInInitialList: true})
   634  	}
   635  	return handle, nil
   636  }
   637  
   638  func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error {
   639  	s.blockDeltas.Lock()
   640  	defer s.blockDeltas.Unlock()
   641  
   642  	if deltas, ok := obj.(Deltas); ok {
   643  		return processDeltas(s, s.indexer, deltas, isInInitialList)
   644  	}
   645  	return errors.New("object given as Process argument is not Deltas")
   646  }
   647  
   648  // Conforms to ResourceEventHandler
   649  func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) {
   650  	// Invocation of this function is locked under s.blockDeltas, so it is
   651  	// save to distribute the notification
   652  	s.cacheMutationDetector.AddObject(obj)
   653  	s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false)
   654  }
   655  
   656  // Conforms to ResourceEventHandler
   657  func (s *sharedIndexInformer) OnUpdate(old, new interface{}) {
   658  	isSync := false
   659  
   660  	// If is a Sync event, isSync should be true
   661  	// If is a Replaced event, isSync is true if resource version is unchanged.
   662  	// If RV is unchanged: this is a Sync/Replaced event, so isSync is true
   663  
   664  	if accessor, err := meta.Accessor(new); err == nil {
   665  		if oldAccessor, err := meta.Accessor(old); err == nil {
   666  			// Events that didn't change resourceVersion are treated as resync events
   667  			// and only propagated to listeners that requested resync
   668  			isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
   669  		}
   670  	}
   671  
   672  	// Invocation of this function is locked under s.blockDeltas, so it is
   673  	// save to distribute the notification
   674  	s.cacheMutationDetector.AddObject(new)
   675  	s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync)
   676  }
   677  
   678  // Conforms to ResourceEventHandler
   679  func (s *sharedIndexInformer) OnDelete(old interface{}) {
   680  	// Invocation of this function is locked under s.blockDeltas, so it is
   681  	// save to distribute the notification
   682  	s.processor.distribute(deleteNotification{oldObj: old}, false)
   683  }
   684  
   685  // IsStopped reports whether the informer has already been stopped
   686  func (s *sharedIndexInformer) IsStopped() bool {
   687  	s.startedLock.Lock()
   688  	defer s.startedLock.Unlock()
   689  	return s.stopped
   690  }
   691  
   692  func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegistration) error {
   693  	s.startedLock.Lock()
   694  	defer s.startedLock.Unlock()
   695  
   696  	// in order to safely remove, we have to
   697  	// 1. stop sending add/update/delete notifications
   698  	// 2. remove and stop listener
   699  	// 3. unblock
   700  	s.blockDeltas.Lock()
   701  	defer s.blockDeltas.Unlock()
   702  	return s.processor.removeListener(handle)
   703  }
   704  
   705  // sharedProcessor has a collection of processorListener and can
   706  // distribute a notification object to its listeners.  There are two
   707  // kinds of distribute operations.  The sync distributions go to a
   708  // subset of the listeners that (a) is recomputed in the occasional
   709  // calls to shouldResync and (b) every listener is initially put in.
   710  // The non-sync distributions go to every listener.
   711  type sharedProcessor struct {
   712  	listenersStarted bool
   713  	listenersLock    sync.RWMutex
   714  	// Map from listeners to whether or not they are currently syncing
   715  	listeners map[*processorListener]bool
   716  	clock     clock.Clock
   717  	wg        wait.Group
   718  }
   719  
   720  func (p *sharedProcessor) getListener(registration ResourceEventHandlerRegistration) *processorListener {
   721  	p.listenersLock.RLock()
   722  	defer p.listenersLock.RUnlock()
   723  
   724  	if p.listeners == nil {
   725  		return nil
   726  	}
   727  
   728  	if result, ok := registration.(*processorListener); ok {
   729  		if _, exists := p.listeners[result]; exists {
   730  			return result
   731  		}
   732  	}
   733  
   734  	return nil
   735  }
   736  
   737  func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration {
   738  	p.listenersLock.Lock()
   739  	defer p.listenersLock.Unlock()
   740  
   741  	if p.listeners == nil {
   742  		p.listeners = make(map[*processorListener]bool)
   743  	}
   744  
   745  	p.listeners[listener] = true
   746  
   747  	if p.listenersStarted {
   748  		p.wg.Start(listener.run)
   749  		p.wg.Start(listener.pop)
   750  	}
   751  
   752  	return listener
   753  }
   754  
   755  func (p *sharedProcessor) removeListener(handle ResourceEventHandlerRegistration) error {
   756  	p.listenersLock.Lock()
   757  	defer p.listenersLock.Unlock()
   758  
   759  	listener, ok := handle.(*processorListener)
   760  	if !ok {
   761  		return fmt.Errorf("invalid key type %t", handle)
   762  	} else if p.listeners == nil {
   763  		// No listeners are registered, do nothing
   764  		return nil
   765  	} else if _, exists := p.listeners[listener]; !exists {
   766  		// Listener is not registered, just do nothing
   767  		return nil
   768  	}
   769  
   770  	delete(p.listeners, listener)
   771  
   772  	if p.listenersStarted {
   773  		close(listener.addCh)
   774  	}
   775  
   776  	return nil
   777  }
   778  
   779  func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
   780  	p.listenersLock.RLock()
   781  	defer p.listenersLock.RUnlock()
   782  
   783  	for listener, isSyncing := range p.listeners {
   784  		switch {
   785  		case !sync:
   786  			// non-sync messages are delivered to every listener
   787  			listener.add(obj)
   788  		case isSyncing:
   789  			// sync messages are delivered to every syncing listener
   790  			listener.add(obj)
   791  		default:
   792  			// skipping a sync obj for a non-syncing listener
   793  		}
   794  	}
   795  }
   796  
   797  func (p *sharedProcessor) run(stopCh <-chan struct{}) {
   798  	func() {
   799  		p.listenersLock.RLock()
   800  		defer p.listenersLock.RUnlock()
   801  		for listener := range p.listeners {
   802  			p.wg.Start(listener.run)
   803  			p.wg.Start(listener.pop)
   804  		}
   805  		p.listenersStarted = true
   806  	}()
   807  	<-stopCh
   808  
   809  	p.listenersLock.Lock()
   810  	defer p.listenersLock.Unlock()
   811  	for listener := range p.listeners {
   812  		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
   813  	}
   814  
   815  	// Wipe out list of listeners since they are now closed
   816  	// (processorListener cannot be re-used)
   817  	p.listeners = nil
   818  
   819  	// Reset to false since no listeners are running
   820  	p.listenersStarted = false
   821  
   822  	p.wg.Wait() // Wait for all .pop() and .run() to stop
   823  }
   824  
   825  // shouldResync queries every listener to determine if any of them need a resync, based on each
   826  // listener's resyncPeriod.
   827  func (p *sharedProcessor) shouldResync() bool {
   828  	p.listenersLock.Lock()
   829  	defer p.listenersLock.Unlock()
   830  
   831  	resyncNeeded := false
   832  	now := p.clock.Now()
   833  	for listener := range p.listeners {
   834  		// need to loop through all the listeners to see if they need to resync so we can prepare any
   835  		// listeners that are going to be resyncing.
   836  		shouldResync := listener.shouldResync(now)
   837  		p.listeners[listener] = shouldResync
   838  
   839  		if shouldResync {
   840  			resyncNeeded = true
   841  			listener.determineNextResync(now)
   842  		}
   843  	}
   844  	return resyncNeeded
   845  }
   846  
   847  func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
   848  	p.listenersLock.RLock()
   849  	defer p.listenersLock.RUnlock()
   850  
   851  	for listener := range p.listeners {
   852  		resyncPeriod := determineResyncPeriod(
   853  			listener.requestedResyncPeriod, resyncCheckPeriod)
   854  		listener.setResyncPeriod(resyncPeriod)
   855  	}
   856  }
   857  
   858  // processorListener relays notifications from a sharedProcessor to
   859  // one ResourceEventHandler --- using two goroutines, two unbuffered
   860  // channels, and an unbounded ring buffer.  The `add(notification)`
   861  // function sends the given notification to `addCh`.  One goroutine
   862  // runs `pop()`, which pumps notifications from `addCh` to `nextCh`
   863  // using storage in the ring buffer while `nextCh` is not keeping up.
   864  // Another goroutine runs `run()`, which receives notifications from
   865  // `nextCh` and synchronously invokes the appropriate handler method.
   866  //
   867  // processorListener also keeps track of the adjusted requested resync
   868  // period of the listener.
   869  type processorListener struct {
   870  	nextCh chan interface{}
   871  	addCh  chan interface{}
   872  
   873  	handler ResourceEventHandler
   874  
   875  	syncTracker *synctrack.SingleFileTracker
   876  
   877  	// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
   878  	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
   879  	// added until we OOM.
   880  	// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
   881  	// we should try to do something better.
   882  	pendingNotifications buffer.RingGrowing
   883  
   884  	// requestedResyncPeriod is how frequently the listener wants a
   885  	// full resync from the shared informer, but modified by two
   886  	// adjustments.  One is imposing a lower bound,
   887  	// `minimumResyncPeriod`.  The other is another lower bound, the
   888  	// sharedIndexInformer's `resyncCheckPeriod`, that is imposed (a) only
   889  	// in AddEventHandlerWithResyncPeriod invocations made after the
   890  	// sharedIndexInformer starts and (b) only if the informer does
   891  	// resyncs at all.
   892  	requestedResyncPeriod time.Duration
   893  	// resyncPeriod is the threshold that will be used in the logic
   894  	// for this listener.  This value differs from
   895  	// requestedResyncPeriod only when the sharedIndexInformer does
   896  	// not do resyncs, in which case the value here is zero.  The
   897  	// actual time between resyncs depends on when the
   898  	// sharedProcessor's `shouldResync` function is invoked and when
   899  	// the sharedIndexInformer processes `Sync` type Delta objects.
   900  	resyncPeriod time.Duration
   901  	// nextResync is the earliest time the listener should get a full resync
   902  	nextResync time.Time
   903  	// resyncLock guards access to resyncPeriod and nextResync
   904  	resyncLock sync.Mutex
   905  }
   906  
   907  // HasSynced returns true if the source informer has synced, and all
   908  // corresponding events have been delivered.
   909  func (p *processorListener) HasSynced() bool {
   910  	return p.syncTracker.HasSynced()
   911  }
   912  
   913  func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener {
   914  	ret := &processorListener{
   915  		nextCh:                make(chan interface{}),
   916  		addCh:                 make(chan interface{}),
   917  		handler:               handler,
   918  		syncTracker:           &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced},
   919  		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
   920  		requestedResyncPeriod: requestedResyncPeriod,
   921  		resyncPeriod:          resyncPeriod,
   922  	}
   923  
   924  	ret.determineNextResync(now)
   925  
   926  	return ret
   927  }
   928  
   929  func (p *processorListener) add(notification interface{}) {
   930  	if a, ok := notification.(addNotification); ok && a.isInInitialList {
   931  		p.syncTracker.Start()
   932  	}
   933  	p.addCh <- notification
   934  }
   935  
   936  func (p *processorListener) pop() {
   937  	defer utilruntime.HandleCrash()
   938  	defer close(p.nextCh) // Tell .run() to stop
   939  
   940  	var nextCh chan<- interface{}
   941  	var notification interface{}
   942  	for {
   943  		select {
   944  		case nextCh <- notification:
   945  			// Notification dispatched
   946  			var ok bool
   947  			notification, ok = p.pendingNotifications.ReadOne()
   948  			if !ok { // Nothing to pop
   949  				nextCh = nil // Disable this select case
   950  			}
   951  		case notificationToAdd, ok := <-p.addCh:
   952  			if !ok {
   953  				return
   954  			}
   955  			if notification == nil { // No notification to pop (and pendingNotifications is empty)
   956  				// Optimize the case - skip adding to pendingNotifications
   957  				notification = notificationToAdd
   958  				nextCh = p.nextCh
   959  			} else { // There is already a notification waiting to be dispatched
   960  				p.pendingNotifications.WriteOne(notificationToAdd)
   961  			}
   962  		}
   963  	}
   964  }
   965  
   966  func (p *processorListener) run() {
   967  	// this call blocks until the channel is closed.  When a panic happens during the notification
   968  	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
   969  	// the next notification will be attempted.  This is usually better than the alternative of never
   970  	// delivering again.
   971  	stopCh := make(chan struct{})
   972  	wait.Until(func() {
   973  		for next := range p.nextCh {
   974  			switch notification := next.(type) {
   975  			case updateNotification:
   976  				p.handler.OnUpdate(notification.oldObj, notification.newObj)
   977  			case addNotification:
   978  				p.handler.OnAdd(notification.newObj, notification.isInInitialList)
   979  				if notification.isInInitialList {
   980  					p.syncTracker.Finished()
   981  				}
   982  			case deleteNotification:
   983  				p.handler.OnDelete(notification.oldObj)
   984  			default:
   985  				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
   986  			}
   987  		}
   988  		// the only way to get here is if the p.nextCh is empty and closed
   989  		close(stopCh)
   990  	}, 1*time.Second, stopCh)
   991  }
   992  
   993  // shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0,
   994  // this always returns false.
   995  func (p *processorListener) shouldResync(now time.Time) bool {
   996  	p.resyncLock.Lock()
   997  	defer p.resyncLock.Unlock()
   998  
   999  	if p.resyncPeriod == 0 {
  1000  		return false
  1001  	}
  1002  
  1003  	return now.After(p.nextResync) || now.Equal(p.nextResync)
  1004  }
  1005  
  1006  func (p *processorListener) determineNextResync(now time.Time) {
  1007  	p.resyncLock.Lock()
  1008  	defer p.resyncLock.Unlock()
  1009  
  1010  	p.nextResync = now.Add(p.resyncPeriod)
  1011  }
  1012  
  1013  func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
  1014  	p.resyncLock.Lock()
  1015  	defer p.resyncLock.Unlock()
  1016  
  1017  	p.resyncPeriod = resyncPeriod
  1018  }
  1019  

View as plain text