...

Source file src/k8s.io/client-go/tools/cache/controller.go

Documentation: k8s.io/client-go/tools/cache

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"errors"
    21  	"sync"
    22  	"time"
    23  
    24  	"k8s.io/apimachinery/pkg/runtime"
    25  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    26  	"k8s.io/apimachinery/pkg/util/wait"
    27  	"k8s.io/utils/clock"
    28  )
    29  
// This file implements a low-level controller that is used in
// sharedIndexInformer, which is an implementation of
// SharedIndexInformer.  Such informers, in turn, are key components
// in the high level controllers that form the backbone of the
// Kubernetes control plane.  For usage, look at those controllers or
// at the workqueue example:
// https://github.com/kubernetes/client-go/tree/master/examples/workqueue
    38  
// Config contains all the settings for one of these low-level controllers.
// New stores a shallow copy of the Config, so replacing a field on the
// original after calling New has no effect on the controller.
type Config struct {
	// The queue for your objects - has to be a DeltaFIFO due to
	// assumptions in the implementation. Your Process() function
	// should accept the output of this Queue's Pop() method.
	// Run closes the Queue once its stop channel fires.
	Queue

	// Something that can list and watch your objects. Run hands it to
	// the Reflector it constructs.
	ListerWatcher

	// Something that can process a popped Deltas. The processing loop
	// passes it to the Queue's Pop() method.
	Process ProcessFunc

	// ObjectType is an example object of the type this controller is
	// expected to handle.
	ObjectType runtime.Object

	// ObjectDescription is the description to use when logging type-specific information about this controller.
	// It is passed to the Reflector as its TypeDescription.
	ObjectDescription string

	// FullResyncPeriod is the period at which ShouldResync is considered.
	// It is passed to the Reflector as its ResyncPeriod.
	FullResyncPeriod time.Duration

	// ShouldResync is periodically used by the reflector to determine
	// whether to Resync the Queue. If ShouldResync is `nil` or
	// returns true, it means the reflector should proceed with the
	// resync.
	ShouldResync ShouldResyncFunc

	// If true, when Process() returns an error, re-enqueue the object
	// (through the Queue's AddIfNotPresent). Nothing is re-enqueued
	// when the error is ErrFIFOClosed; the processing loop exits instead.
	// TODO: add interface to let you inject a delay/backoff or drop
	//       the object completely if desired. Pass the object in
	//       question to this interface as a parameter.  This is probably moot
	//       now that this functionality appears at a higher level.
	RetryOnError bool

	// Called whenever the ListAndWatch drops the connection with an error.
	// When nil, the Reflector's own handler is left untouched.
	WatchErrorHandler WatchErrorHandler

	// WatchListPageSize is the requested chunk size of initial and relist watch lists.
	// It is copied onto the Reflector as-is.
	WatchListPageSize int64
}
    81  
    82  // ShouldResyncFunc is a type of function that indicates if a reflector should perform a
    83  // resync or not. It can be used by a shared informer to support multiple event handlers with custom
    84  // resync periods.
    85  type ShouldResyncFunc func() bool
    86  
    87  // ProcessFunc processes a single object.
    88  type ProcessFunc func(obj interface{}, isInInitialList bool) error
    89  
// `*controller` implements Controller
//
// Fields:
//   - config is the controller's own copy of the Config given to New.
//   - reflector is nil until Run has constructed the Reflector.
//   - reflectorMutex guards reflector, which Run writes and
//     LastSyncResourceVersion reads.
//   - clock is handed to the Reflector; New sets it to the real clock.
type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}
    97  
// Controller is a low-level controller that is parameterized by a
// Config and used in sharedIndexInformer. Use New to obtain one.
type Controller interface {
	// Run does two things.  One is to construct and run a Reflector
	// to pump objects/notifications from the Config's ListerWatcher
	// to the Config's Queue and possibly invoke the occasional Resync
	// on that Queue.  The other is to repeatedly Pop from the Queue
	// and process with the Config's ProcessFunc.  Both of these
	// continue until `stopCh` is closed.  Run blocks, so it is
	// normally started in its own goroutine.
	Run(stopCh <-chan struct{})

	// HasSynced delegates to the Config's Queue
	HasSynced() bool

	// LastSyncResourceVersion delegates to the Reflector when there
	// is one, otherwise returns the empty string.  There is no
	// Reflector before Run has been called.
	LastSyncResourceVersion() string
}
   116  
   117  // New makes a new Controller from the given Config.
   118  func New(c *Config) Controller {
   119  	ctlr := &controller{
   120  		config: *c,
   121  		clock:  &clock.RealClock{},
   122  	}
   123  	return ctlr
   124  }
   125  
   126  // Run begins processing items, and will continue until a value is sent down stopCh or it is closed.
   127  // It's an error to call Run more than once.
   128  // Run blocks; call via go.
   129  func (c *controller) Run(stopCh <-chan struct{}) {
   130  	defer utilruntime.HandleCrash()
   131  	go func() {
   132  		<-stopCh
   133  		c.config.Queue.Close()
   134  	}()
   135  	r := NewReflectorWithOptions(
   136  		c.config.ListerWatcher,
   137  		c.config.ObjectType,
   138  		c.config.Queue,
   139  		ReflectorOptions{
   140  			ResyncPeriod:    c.config.FullResyncPeriod,
   141  			TypeDescription: c.config.ObjectDescription,
   142  			Clock:           c.clock,
   143  		},
   144  	)
   145  	r.ShouldResync = c.config.ShouldResync
   146  	r.WatchListPageSize = c.config.WatchListPageSize
   147  	if c.config.WatchErrorHandler != nil {
   148  		r.watchErrorHandler = c.config.WatchErrorHandler
   149  	}
   150  
   151  	c.reflectorMutex.Lock()
   152  	c.reflector = r
   153  	c.reflectorMutex.Unlock()
   154  
   155  	var wg wait.Group
   156  
   157  	wg.StartWithChannel(stopCh, r.Run)
   158  
   159  	wait.Until(c.processLoop, time.Second, stopCh)
   160  	wg.Wait()
   161  }
   162  
   163  // Returns true once this controller has completed an initial resource listing
   164  func (c *controller) HasSynced() bool {
   165  	return c.config.Queue.HasSynced()
   166  }
   167  
   168  func (c *controller) LastSyncResourceVersion() string {
   169  	c.reflectorMutex.RLock()
   170  	defer c.reflectorMutex.RUnlock()
   171  	if c.reflector == nil {
   172  		return ""
   173  	}
   174  	return c.reflector.LastSyncResourceVersion()
   175  }
   176  
   177  // processLoop drains the work queue.
   178  // TODO: Consider doing the processing in parallel. This will require a little thought
   179  // to make sure that we don't end up processing the same object multiple times
   180  // concurrently.
   181  //
   182  // TODO: Plumb through the stopCh here (and down to the queue) so that this can
   183  // actually exit when the controller is stopped. Or just give up on this stuff
   184  // ever being stoppable. Converting this whole package to use Context would
   185  // also be helpful.
   186  func (c *controller) processLoop() {
   187  	for {
   188  		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
   189  		if err != nil {
   190  			if err == ErrFIFOClosed {
   191  				return
   192  			}
   193  			if c.config.RetryOnError {
   194  				// This is the safe way to re-enqueue.
   195  				c.config.Queue.AddIfNotPresent(obj)
   196  			}
   197  		}
   198  	}
   199  }
   200  
// ResourceEventHandler can handle notifications for events that
// happen to a resource. The events are informational only, so you
// can't return an error.  The handlers MUST NOT modify the objects
// received; this concerns not only the top level of structure but all
// the data structures reachable from it.
//   - OnAdd is called when an object is added. isInInitialList is the
//     flag the informer's ProcessFunc was invoked with; filtering
//     handlers that turn an update into an add pass false.
//   - OnUpdate is called when an object is modified. Note that oldObj is the
//     last known state of the object-- it is possible that several changes
//     were combined together, so you can't use this to see every single
//     change. OnUpdate is also called when a re-list happens, and it will
//     get called even if nothing changed. This is useful for periodically
//     evaluating or syncing something.
//   - OnDelete will get the final state of the item if it is known, otherwise
//     it will get an object of type DeletedFinalStateUnknown. This can
//     happen if the watch is closed and misses the delete event and we don't
//     notice the deletion until the subsequent re-list.
type ResourceEventHandler interface {
	OnAdd(obj interface{}, isInInitialList bool)
	OnUpdate(oldObj, newObj interface{})
	OnDelete(obj interface{})
}
   222  
   223  // ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
   224  // as few of the notification functions as you want while still implementing
   225  // ResourceEventHandler.  This adapter does not remove the prohibition against
   226  // modifying the objects.
   227  //
   228  // See ResourceEventHandlerDetailedFuncs if your use needs to propagate
   229  // HasSynced.
   230  type ResourceEventHandlerFuncs struct {
   231  	AddFunc    func(obj interface{})
   232  	UpdateFunc func(oldObj, newObj interface{})
   233  	DeleteFunc func(obj interface{})
   234  }
   235  
   236  // OnAdd calls AddFunc if it's not nil.
   237  func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}, isInInitialList bool) {
   238  	if r.AddFunc != nil {
   239  		r.AddFunc(obj)
   240  	}
   241  }
   242  
   243  // OnUpdate calls UpdateFunc if it's not nil.
   244  func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
   245  	if r.UpdateFunc != nil {
   246  		r.UpdateFunc(oldObj, newObj)
   247  	}
   248  }
   249  
   250  // OnDelete calls DeleteFunc if it's not nil.
   251  func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
   252  	if r.DeleteFunc != nil {
   253  		r.DeleteFunc(obj)
   254  	}
   255  }
   256  
   257  // ResourceEventHandlerDetailedFuncs is exactly like ResourceEventHandlerFuncs
   258  // except its AddFunc accepts the isInInitialList parameter, for propagating
   259  // HasSynced.
   260  type ResourceEventHandlerDetailedFuncs struct {
   261  	AddFunc    func(obj interface{}, isInInitialList bool)
   262  	UpdateFunc func(oldObj, newObj interface{})
   263  	DeleteFunc func(obj interface{})
   264  }
   265  
   266  // OnAdd calls AddFunc if it's not nil.
   267  func (r ResourceEventHandlerDetailedFuncs) OnAdd(obj interface{}, isInInitialList bool) {
   268  	if r.AddFunc != nil {
   269  		r.AddFunc(obj, isInInitialList)
   270  	}
   271  }
   272  
   273  // OnUpdate calls UpdateFunc if it's not nil.
   274  func (r ResourceEventHandlerDetailedFuncs) OnUpdate(oldObj, newObj interface{}) {
   275  	if r.UpdateFunc != nil {
   276  		r.UpdateFunc(oldObj, newObj)
   277  	}
   278  }
   279  
   280  // OnDelete calls DeleteFunc if it's not nil.
   281  func (r ResourceEventHandlerDetailedFuncs) OnDelete(obj interface{}) {
   282  	if r.DeleteFunc != nil {
   283  		r.DeleteFunc(obj)
   284  	}
   285  }
   286  
   287  // FilteringResourceEventHandler applies the provided filter to all events coming
   288  // in, ensuring the appropriate nested handler method is invoked. An object
   289  // that starts passing the filter after an update is considered an add, and an
   290  // object that stops passing the filter after an update is considered a delete.
   291  // Like the handlers, the filter MUST NOT modify the objects it is given.
   292  type FilteringResourceEventHandler struct {
   293  	FilterFunc func(obj interface{}) bool
   294  	Handler    ResourceEventHandler
   295  }
   296  
   297  // OnAdd calls the nested handler only if the filter succeeds
   298  func (r FilteringResourceEventHandler) OnAdd(obj interface{}, isInInitialList bool) {
   299  	if !r.FilterFunc(obj) {
   300  		return
   301  	}
   302  	r.Handler.OnAdd(obj, isInInitialList)
   303  }
   304  
   305  // OnUpdate ensures the proper handler is called depending on whether the filter matches
   306  func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
   307  	newer := r.FilterFunc(newObj)
   308  	older := r.FilterFunc(oldObj)
   309  	switch {
   310  	case newer && older:
   311  		r.Handler.OnUpdate(oldObj, newObj)
   312  	case newer && !older:
   313  		r.Handler.OnAdd(newObj, false)
   314  	case !newer && older:
   315  		r.Handler.OnDelete(oldObj)
   316  	default:
   317  		// do nothing
   318  	}
   319  }
   320  
   321  // OnDelete calls the nested handler only if the filter succeeds
   322  func (r FilteringResourceEventHandler) OnDelete(obj interface{}) {
   323  	if !r.FilterFunc(obj) {
   324  		return
   325  	}
   326  	r.Handler.OnDelete(obj)
   327  }
   328  
   329  // DeletionHandlingMetaNamespaceKeyFunc checks for
   330  // DeletedFinalStateUnknown objects before calling
   331  // MetaNamespaceKeyFunc.
   332  func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
   333  	if d, ok := obj.(DeletedFinalStateUnknown); ok {
   334  		return d.Key, nil
   335  	}
   336  	return MetaNamespaceKeyFunc(obj)
   337  }
   338  
   339  // DeletionHandlingObjectToName checks for
   340  // DeletedFinalStateUnknown objects before calling
   341  // ObjectToName.
   342  func DeletionHandlingObjectToName(obj interface{}) (ObjectName, error) {
   343  	if d, ok := obj.(DeletedFinalStateUnknown); ok {
   344  		return ParseObjectName(d.Key)
   345  	}
   346  	return ObjectToName(obj)
   347  }
   348  
   349  // NewInformer returns a Store and a controller for populating the store
   350  // while also providing event notifications. You should only used the returned
   351  // Store for Get/List operations; Add/Modify/Deletes will cause the event
   352  // notifications to be faulty.
   353  //
   354  // Parameters:
   355  //   - lw is list and watch functions for the source of the resource you want to
   356  //     be informed of.
   357  //   - objType is an object of the type that you expect to receive.
   358  //   - resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
   359  //     calls, even if nothing changed). Otherwise, re-list will be delayed as
   360  //     long as possible (until the upstream source closes the watch or times out,
   361  //     or you stop the controller).
   362  //   - h is the object you want notifications sent to.
   363  func NewInformer(
   364  	lw ListerWatcher,
   365  	objType runtime.Object,
   366  	resyncPeriod time.Duration,
   367  	h ResourceEventHandler,
   368  ) (Store, Controller) {
   369  	// This will hold the client state, as we know it.
   370  	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
   371  
   372  	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
   373  }
   374  
   375  // NewIndexerInformer returns an Indexer and a Controller for populating the index
   376  // while also providing event notifications. You should only used the returned
   377  // Index for Get/List operations; Add/Modify/Deletes will cause the event
   378  // notifications to be faulty.
   379  //
   380  // Parameters:
   381  //   - lw is list and watch functions for the source of the resource you want to
   382  //     be informed of.
   383  //   - objType is an object of the type that you expect to receive.
   384  //   - resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
   385  //     calls, even if nothing changed). Otherwise, re-list will be delayed as
   386  //     long as possible (until the upstream source closes the watch or times out,
   387  //     or you stop the controller).
   388  //   - h is the object you want notifications sent to.
   389  //   - indexers is the indexer for the received object type.
   390  func NewIndexerInformer(
   391  	lw ListerWatcher,
   392  	objType runtime.Object,
   393  	resyncPeriod time.Duration,
   394  	h ResourceEventHandler,
   395  	indexers Indexers,
   396  ) (Indexer, Controller) {
   397  	// This will hold the client state, as we know it.
   398  	clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
   399  
   400  	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, nil)
   401  }
   402  
   403  // NewTransformingInformer returns a Store and a controller for populating
   404  // the store while also providing event notifications. You should only used
   405  // the returned Store for Get/List operations; Add/Modify/Deletes will cause
   406  // the event notifications to be faulty.
   407  // The given transform function will be called on all objects before they will
   408  // put into the Store and corresponding Add/Modify/Delete handlers will
   409  // be invoked for them.
   410  func NewTransformingInformer(
   411  	lw ListerWatcher,
   412  	objType runtime.Object,
   413  	resyncPeriod time.Duration,
   414  	h ResourceEventHandler,
   415  	transformer TransformFunc,
   416  ) (Store, Controller) {
   417  	// This will hold the client state, as we know it.
   418  	clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
   419  
   420  	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, transformer)
   421  }
   422  
   423  // NewTransformingIndexerInformer returns an Indexer and a controller for
   424  // populating the index while also providing event notifications. You should
   425  // only used the returned Index for Get/List operations; Add/Modify/Deletes
   426  // will cause the event notifications to be faulty.
   427  // The given transform function will be called on all objects before they will
   428  // be put into the Index and corresponding Add/Modify/Delete handlers will
   429  // be invoked for them.
   430  func NewTransformingIndexerInformer(
   431  	lw ListerWatcher,
   432  	objType runtime.Object,
   433  	resyncPeriod time.Duration,
   434  	h ResourceEventHandler,
   435  	indexers Indexers,
   436  	transformer TransformFunc,
   437  ) (Indexer, Controller) {
   438  	// This will hold the client state, as we know it.
   439  	clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
   440  
   441  	return clientState, newInformer(lw, objType, resyncPeriod, h, clientState, transformer)
   442  }
   443  
   444  // Multiplexes updates in the form of a list of Deltas into a Store, and informs
   445  // a given handler of events OnUpdate, OnAdd, OnDelete
   446  func processDeltas(
   447  	// Object which receives event notifications from the given deltas
   448  	handler ResourceEventHandler,
   449  	clientState Store,
   450  	deltas Deltas,
   451  	isInInitialList bool,
   452  ) error {
   453  	// from oldest to newest
   454  	for _, d := range deltas {
   455  		obj := d.Object
   456  
   457  		switch d.Type {
   458  		case Sync, Replaced, Added, Updated:
   459  			if old, exists, err := clientState.Get(obj); err == nil && exists {
   460  				if err := clientState.Update(obj); err != nil {
   461  					return err
   462  				}
   463  				handler.OnUpdate(old, obj)
   464  			} else {
   465  				if err := clientState.Add(obj); err != nil {
   466  					return err
   467  				}
   468  				handler.OnAdd(obj, isInInitialList)
   469  			}
   470  		case Deleted:
   471  			if err := clientState.Delete(obj); err != nil {
   472  				return err
   473  			}
   474  			handler.OnDelete(obj)
   475  		}
   476  	}
   477  	return nil
   478  }
   479  
   480  // newInformer returns a controller for populating the store while also
   481  // providing event notifications.
   482  //
   483  // Parameters
   484  //   - lw is list and watch functions for the source of the resource you want to
   485  //     be informed of.
   486  //   - objType is an object of the type that you expect to receive.
   487  //   - resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
   488  //     calls, even if nothing changed). Otherwise, re-list will be delayed as
   489  //     long as possible (until the upstream source closes the watch or times out,
   490  //     or you stop the controller).
   491  //   - h is the object you want notifications sent to.
   492  //   - clientState is the store you want to populate
   493  func newInformer(
   494  	lw ListerWatcher,
   495  	objType runtime.Object,
   496  	resyncPeriod time.Duration,
   497  	h ResourceEventHandler,
   498  	clientState Store,
   499  	transformer TransformFunc,
   500  ) Controller {
   501  	// This will hold incoming changes. Note how we pass clientState in as a
   502  	// KeyLister, that way resync operations will result in the correct set
   503  	// of update/delete deltas.
   504  	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
   505  		KnownObjects:          clientState,
   506  		EmitDeltaTypeReplaced: true,
   507  		Transformer:           transformer,
   508  	})
   509  
   510  	cfg := &Config{
   511  		Queue:            fifo,
   512  		ListerWatcher:    lw,
   513  		ObjectType:       objType,
   514  		FullResyncPeriod: resyncPeriod,
   515  		RetryOnError:     false,
   516  
   517  		Process: func(obj interface{}, isInInitialList bool) error {
   518  			if deltas, ok := obj.(Deltas); ok {
   519  				return processDeltas(h, clientState, deltas, isInInitialList)
   520  			}
   521  			return errors.New("object given as Process argument is not Deltas")
   522  		},
   523  	}
   524  	return New(cfg)
   525  }
   526  

View as plain text