
Source file src/sigs.k8s.io/cli-utils/pkg/kstatus/watcher/object_status_reporter.go

Documentation: sigs.k8s.io/cli-utils/pkg/kstatus/watcher

// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0

package watcher

import (
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
	"k8s.io/utils/clock"
	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
	"sigs.k8s.io/cli-utils/pkg/object"
)
// GroupKindNamespace identifies an informer target.
// When used as an informer target, the namespace is optional.
// When the namespace is empty for namespaced resources, all namespaces are watched.
type GroupKindNamespace struct {
	Group     string
	Kind      string
	Namespace string
}

// String returns a serialized form suitable for logging.
func (gkn GroupKindNamespace) String() string {
	return fmt.Sprintf("%s/%s/namespaces/%s",
		gkn.Group, gkn.Kind, gkn.Namespace)
}

// GroupKind returns the GroupKind of this GroupKindNamespace.
func (gkn GroupKindNamespace) GroupKind() schema.GroupKind {
	return schema.GroupKind{Group: gkn.Group, Kind: gkn.Kind}
}
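
// exampleTargets is a hypothetical sketch, not part of the original file. It
// shows how a caller might build watch targets; the Deployment and ConfigMap
// values are illustrative assumptions.
func exampleTargets() []GroupKindNamespace {
	return []GroupKindNamespace{
		// Watch Deployments in a single namespace.
		{Group: "apps", Kind: "Deployment", Namespace: "default"},
		// Watch ConfigMaps in all namespaces (empty namespace).
		{Group: "", Kind: "ConfigMap"},
	}
}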

// ObjectStatusReporter reports on updates to objects (instances) using a
// network of informers to watch one or more resources (types).
//
// Unlike SharedIndexInformer, ObjectStatusReporter...
//   - Reports object status.
//   - Can watch multiple resource types simultaneously.
//   - Can ignore specific objects for efficiency when given an ObjectFilter.
//   - Resolves GroupKinds into Resources at runtime, to pick up newly added
//     resources.
//   - Starts and stops individual watches automatically to reduce errors when a
//     CRD or Namespace is deleted.
//   - Can watch resources in root-scope mode or namespace-scope mode,
//     allowing the caller to optimize for efficiency or least-privilege.
//   - Gives unschedulable Pods (and objects that generate them) a 15s grace
//     period before reporting them as Failed.
//   - Resets the RESTMapper cache automatically when CRDs are modified.
//
// ObjectStatusReporter is NOT repeatable. It will panic if started more than
// once. If you need a repeatable factory, use DefaultStatusWatcher.
//
// TODO: support detection of added/removed api extensions at runtime
// TODO: Watch CRDs & Namespaces, even if not in the set of IDs.
// TODO: Retry with backoff if in namespace-scoped mode, to allow CRDs & namespaces to be created asynchronously
type ObjectStatusReporter struct {
	// InformerFactory is used to build informers
	InformerFactory *DynamicInformerFactory

	// Mapper is used to map from GroupKind to GroupVersionKind.
	Mapper meta.RESTMapper

	// StatusReader specifies a custom implementation of the
	// engine.StatusReader interface that will be used to compute reconcile
	// status for resource objects.
	StatusReader engine.StatusReader

	// ClusterReader is used to look up generated objects on-demand.
	// Generated objects (ex: Deployment > ReplicaSet > Pod) are sometimes
	// required for computing parent object status, to compensate for
	// controllers that aren't following status conventions.
	ClusterReader engine.ClusterReader

	// Targets is the list of GroupKindNamespaces to watch.
	Targets []GroupKindNamespace

	// ObjectFilter is used to decide which objects to ignore.
	ObjectFilter ObjectFilter

	// RESTScope specifies whether to ListAndWatch resources at the namespace
	// or cluster (root) level. Using root scope is more efficient, but
	// namespace scope may require fewer permissions.
	RESTScope meta.RESTScope

	// lock guards modification of the subsequent stateful fields
	lock sync.Mutex

	// gk2gkn maps GKs to GKNs to make it easy/cheap to look up.
	gk2gkn map[schema.GroupKind]map[GroupKindNamespace]struct{}

	// ns2gkn maps Namespaces to GKNs to make it easy/cheap to look up.
	ns2gkn map[string]map[GroupKindNamespace]struct{}

	// informerRefs tracks which informers have been started and stopped
	informerRefs map[GroupKindNamespace]*informerReference

	// context will be cancelled when the reporter should stop.
	context context.Context

	// cancel function that stops the context.
	// This should only be called after the terminal error event has been sent.
	cancel context.CancelFunc

	// funnel multiplexes multiple input channels into one output channel,
	// allowing input channels to be added and removed at runtime.
	funnel *eventFunnel

	// taskManager makes it possible to cancel scheduled tasks.
	taskManager *taskManager

	started bool
	stopped bool
}
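
// exampleConsume is a hypothetical sketch, not part of the original file. It
// shows how a caller with a fully-populated reporter (construction of the
// fields is assumed to happen elsewhere) starts it and drains the event
// channel until a terminal error or closure.
func exampleConsume(ctx context.Context, w *ObjectStatusReporter) error {
	for e := range w.Start(ctx) {
		switch e.Type {
		case event.SyncEvent:
			// All started informers have synced at least once.
			klog.V(1).Info("initial sync complete")
		case event.ResourceUpdateEvent:
			klog.V(1).Infof("status update: %v: %v", e.Resource.Identifier, e.Resource.Status)
		case event.ErrorEvent:
			// Terminal: the reporter stops itself and closes the channel.
			return e.Error
		}
	}
	return nil
}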

func (w *ObjectStatusReporter) Start(ctx context.Context) <-chan event.Event {
	w.lock.Lock()
	defer w.lock.Unlock()

	if w.started {
		panic("ObjectStatusReporter cannot be restarted")
	}

	w.taskManager = &taskManager{}

	// Map GroupKinds to sets of GroupKindNamespaces for fast lookups.
	// This is the only time we modify the map.
	// So it should be safe to read from multiple threads after this.
	w.gk2gkn = make(map[schema.GroupKind]map[GroupKindNamespace]struct{})
	for _, gkn := range w.Targets {
		gk := gkn.GroupKind()
		m, found := w.gk2gkn[gk]
		if !found {
			m = make(map[GroupKindNamespace]struct{})
			w.gk2gkn[gk] = m
		}
		m[gkn] = struct{}{}
	}

	// Map namespaces to sets of GroupKindNamespaces for fast lookups.
	// This is the only time we modify the map.
	// So it should be safe to read from multiple threads after this.
	w.ns2gkn = make(map[string]map[GroupKindNamespace]struct{})
	for _, gkn := range w.Targets {
		ns := gkn.Namespace
		m, found := w.ns2gkn[ns]
		if !found {
			m = make(map[GroupKindNamespace]struct{})
			w.ns2gkn[ns] = m
		}
		m[gkn] = struct{}{}
	}

	// Initialize the informer map with references to track their start/stop.
	// This is the only time we modify the map.
	// So it should be safe to read from multiple threads after this.
	w.informerRefs = make(map[GroupKindNamespace]*informerReference, len(w.Targets))
	for _, gkn := range w.Targets {
		w.informerRefs[gkn] = &informerReference{}
	}

	ctx, cancel := context.WithCancel(ctx)
	w.context = ctx
	w.cancel = cancel

	// Use an event funnel to multiplex events through multiple input channels
	// into one output channel. We can't use the normal fan-in pattern, because
	// we need to be able to add and remove new input channels at runtime, as
	// new informers are created and destroyed.
	w.funnel = newEventFunnel(ctx)

	// Send start requests.
	for _, gkn := range w.Targets {
		w.startInformer(gkn)
	}

	w.started = true

	// Block until the event funnel is closed.
	// The event funnel will close after all the informer channels are closed.
	// The informer channels will close after the informers have stopped.
	// The informers will stop after their context is cancelled.
	go func() {
		<-w.funnel.Done()

		w.lock.Lock()
		defer w.lock.Unlock()
		w.stopped = true
	}()

	// Wait until all informers are synced or stopped, then send a SyncEvent.
	syncEventCh := make(chan event.Event)
	err := w.funnel.AddInputChannel(syncEventCh)
	if err != nil {
		// Reporter already stopped.
		return handleFatalError(fmt.Errorf("reporter failed to start: %v", err))
	}
	go func() {
		defer close(syncEventCh)
		// TODO: should we use something less aggressive, like wait.BackoffUntil?
		if cache.WaitForCacheSync(ctx.Done(), w.HasSynced) {
			syncEventCh <- event.Event{
				Type: event.SyncEvent,
			}
		}
	}()

	return w.funnel.OutputChannel()
}

// Stop triggers the cancellation of the reporter context, and closure of the
// event channel without sending an error event.
func (w *ObjectStatusReporter) Stop() {
	klog.V(4).Info("Stopping reporter")
	w.cancel()
}

// HasSynced returns true if all the started informers have been synced.
//
// Use the following to block waiting for synchronization:
// synced := cache.WaitForCacheSync(stopCh, informer.HasSynced)
func (w *ObjectStatusReporter) HasSynced() bool {
	w.lock.Lock()
	defer w.lock.Unlock()

	if w.stopped || !w.started {
		return false
	}

	pending := make([]GroupKindNamespace, 0, len(w.informerRefs))
	for gkn, informer := range w.informerRefs {
		if informer.HasStarted() && !informer.HasSynced() {
			pending = append(pending, gkn)
		}
	}
	if len(pending) > 0 {
		klog.V(5).Infof("Informers pending synchronization: %v", pending)
		return false
	}
	return true
}
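
// exampleWaitForSync is a hypothetical sketch, not part of the original file.
// It shows the blocking pattern from the HasSynced doc comment, bounded by a
// timeout so the caller doesn't wait forever if an informer never syncs. The
// 30s timeout is an illustrative assumption.
func exampleWaitForSync(ctx context.Context, w *ObjectStatusReporter) error {
	syncCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	if !cache.WaitForCacheSync(syncCtx.Done(), w.HasSynced) {
		return errors.New("timed out waiting for initial informer sync")
	}
	return nil
}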

// startInformer starts the informer for the specified GroupKindNamespace
// asynchronously, unless it has already been started.
func (w *ObjectStatusReporter) startInformer(gkn GroupKindNamespace) {
	ctx, ok := w.informerRefs[gkn].Start(w.context)
	if !ok {
		klog.V(5).Infof("Watch start skipped (already started): %v", gkn)
		// already started
		return
	}
	go w.startInformerWithRetry(ctx, gkn)
}

// stopInformer stops the informer watching the specified GroupKindNamespace.
func (w *ObjectStatusReporter) stopInformer(gkn GroupKindNamespace) {
	w.informerRefs[gkn].Stop()
}

func (w *ObjectStatusReporter) startInformerWithRetry(ctx context.Context, gkn GroupKindNamespace) {
	realClock := &clock.RealClock{}
	backoffManager := wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock)
	retryCtx, retryCancel := context.WithCancel(ctx)

	wait.BackoffUntil(func() {
		err := w.startInformerNow(
			ctx,
			gkn,
		)
		if err != nil {
			if meta.IsNoMatchError(err) {
				// CRD (or api extension) not installed
				// TODO: retry if CRDs are not being watched
				klog.V(3).Infof("Watch start error (blocking until CRD is added): %v: %v", gkn, err)
				// Cancel the parent context, which will stop the retries too.
				w.stopInformer(gkn)
				return
			}

			// Create a temporary input channel to send the error event.
			eventCh := make(chan event.Event)
			defer close(eventCh)
			err := w.funnel.AddInputChannel(eventCh)
			if err != nil {
				// Reporter already stopped.
				// This is fine. 🔥
				klog.V(5).Infof("Informer failed to start: %v", err)
				return
			}
			// Send error event and stop the reporter!
			w.handleFatalError(eventCh, err)
			return
		}
		// Success! - Stop retrying
		retryCancel()
	}, backoffManager, true, retryCtx.Done())
}

// startInformerNow starts an informer to watch for changes to a
// GroupKindNamespace. Changes are filtered and passed by event channel into the
// funnel. Each update event includes the computed status of the object.
// An error is returned if the informer could not be created.
func (w *ObjectStatusReporter) startInformerNow(
	ctx context.Context,
	gkn GroupKindNamespace,
) error {
	// Look up the mapping for this GroupKind.
	// If it doesn't exist, either delay watching or emit an error.
	gk := gkn.GroupKind()
	mapping, err := w.Mapper.RESTMapping(gk)
	if err != nil {
		// Might be a NoResourceMatchError/NoKindMatchError
		return err
	}

	informer := w.InformerFactory.NewInformer(ctx, mapping, gkn.Namespace)

	w.informerRefs[gkn].SetInformer(informer)

	eventCh := make(chan event.Event)

	// Add this event channel to the output multiplexer
	err = w.funnel.AddInputChannel(eventCh)
	if err != nil {
		// Reporter already stopped.
		return fmt.Errorf("informer failed to build event handler: %w", err)
	}

	// Handler called when ListAndWatch errors.
	// Custom handler stops the informer if the resource is NotFound (CRD deleted).
	err = informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
		w.watchErrorHandler(gkn, eventCh, err)
	})
	if err != nil {
		// Should never happen.
		// Informer can't have started yet. We just created it.
		return fmt.Errorf("failed to set error handler on new informer for %v: %v", mapping.Resource, err)
	}

	informer.AddEventHandler(w.eventHandler(ctx, eventCh))

	// Start the informer in the background.
	// Informer will be stopped when the context is cancelled.
	go func() {
		klog.V(3).Infof("Watch starting: %v", gkn)
		informer.Run(ctx.Done())
		klog.V(3).Infof("Watch stopped: %v", gkn)
		// Signal to the caller there will be no more events for this GroupKind.
		close(eventCh)
	}()

	return nil
}

func (w *ObjectStatusReporter) forEachTargetWithGroupKind(gk schema.GroupKind, fn func(GroupKindNamespace)) {
	for gkn := range w.gk2gkn[gk] {
		fn(gkn)
	}
}

func (w *ObjectStatusReporter) forEachTargetWithNamespace(ns string, fn func(GroupKindNamespace)) {
	for gkn := range w.ns2gkn[ns] {
		fn(gkn)
	}
}

// readStatusFromObject is a convenience function to read object status with a
// StatusReader using a ClusterReader to retrieve generated objects.
func (w *ObjectStatusReporter) readStatusFromObject(
	ctx context.Context,
	obj *unstructured.Unstructured,
) (*event.ResourceStatus, error) {
	return w.StatusReader.ReadStatusForObject(ctx, w.ClusterReader, obj)
}

// readStatusFromCluster is a convenience function to read object status with a
// StatusReader using a ClusterReader to retrieve the object and its generated
// objects.
func (w *ObjectStatusReporter) readStatusFromCluster(
	ctx context.Context,
	id object.ObjMetadata,
) (*event.ResourceStatus, error) {
	return w.StatusReader.ReadStatus(ctx, w.ClusterReader, id)
}

// deletedStatus builds a ResourceStatus for a deleted object.
//
// StatusReader.ReadStatusForObject doesn't handle nil objects as input. So
// this builds the status manually.
// TODO: find a way to delegate this back to the status package.
func deletedStatus(id object.ObjMetadata) *event.ResourceStatus {
	// Status is always NotFound after deletion.
	// The object passed to DeleteFunc represents the last known state, not
	// the current state, so the status is built from the identifier alone.
	result := &event.ResourceStatus{
		Identifier: id,
		Status:     status.NotFoundStatus,
		Message:    "Resource not found",
	}

	return &event.ResourceStatus{
		Identifier: id,
		Resource:   nil, // deleted objects have no current state
		Status:     result.Status,
		Message:    result.Message,
		// If deleted with foreground deletion, a finalizer will have blocked
		// deletion until all the generated resources are deleted.
		// TODO: Handle lookup of generated resources when not using foreground deletion.
		GeneratedResources: nil,
	}
}

// eventHandler builds an event handler to compute object status.
// Status updates are sent to the provided event channel.
func (w *ObjectStatusReporter) eventHandler(
	ctx context.Context,
	eventCh chan<- event.Event,
) cache.ResourceEventHandler {
	var handler cache.ResourceEventHandlerFuncs

	handler.AddFunc = func(iobj interface{}) {
		// Bail early if the context is cancelled, to avoid unnecessary work.
		if ctx.Err() != nil {
			return
		}

		obj, ok := iobj.(*unstructured.Unstructured)
		if !ok {
			panic(fmt.Sprintf("AddFunc received unexpected object type %T", iobj))
		}
		id := object.UnstructuredToObjMetadata(obj)
		if w.ObjectFilter.Filter(obj) {
			klog.V(7).Infof("Watch Event Skipped: AddFunc: %s", id)
			return
		}
		klog.V(5).Infof("AddFunc: Computing status for object: %s", id)

		// cancel any scheduled status update for this object
		w.taskManager.Cancel(id)

		rs, err := w.readStatusFromObject(ctx, obj)
		if err != nil {
			// Send error event and stop the reporter!
			w.handleFatalError(eventCh, fmt.Errorf("failed to compute object status: %s: %w", id, err))
			return
		}

		if object.IsNamespace(obj) {
			klog.V(5).Infof("AddFunc: Namespace added: %v", id)
			w.onNamespaceAdd(obj)
		} else if object.IsCRD(obj) {
			klog.V(5).Infof("AddFunc: CRD added: %v", id)
			w.onCRDAdd(obj)
		}

		if isObjectUnschedulable(rs) {
			klog.V(5).Infof("AddFunc: object unschedulable: %v", id)
			// schedule delayed status update
			w.taskManager.Schedule(ctx, id, status.ScheduleWindow,
				w.newStatusCheckTaskFunc(ctx, eventCh, id))
		}

		klog.V(7).Infof("AddFunc: sending update event: %v", rs)
		eventCh <- event.Event{
			Type:     event.ResourceUpdateEvent,
			Resource: rs,
		}
	}

	handler.UpdateFunc = func(_, iobj interface{}) {
		// Bail early if the context is cancelled, to avoid unnecessary work.
		if ctx.Err() != nil {
			return
		}

		obj, ok := iobj.(*unstructured.Unstructured)
		if !ok {
			panic(fmt.Sprintf("UpdateFunc received unexpected object type %T", iobj))
		}
		id := object.UnstructuredToObjMetadata(obj)
		if w.ObjectFilter.Filter(obj) {
			klog.V(7).Infof("UpdateFunc: Watch Event Skipped: %s", id)
			return
		}
		klog.V(5).Infof("UpdateFunc: Computing status for object: %s", id)

		// cancel any scheduled status update for this object
		w.taskManager.Cancel(id)

		rs, err := w.readStatusFromObject(ctx, obj)
		if err != nil {
			// Send error event and stop the reporter!
			w.handleFatalError(eventCh, fmt.Errorf("failed to compute object status: %s: %w", id, err))
			return
		}

		if object.IsNamespace(obj) {
			klog.V(5).Infof("UpdateFunc: Namespace updated: %v", id)
			w.onNamespaceUpdate(obj)
		} else if object.IsCRD(obj) {
			klog.V(5).Infof("UpdateFunc: CRD updated: %v", id)
			w.onCRDUpdate(obj)
		}

		if isObjectUnschedulable(rs) {
			klog.V(5).Infof("UpdateFunc: object unschedulable: %v", id)
			// schedule delayed status update
			w.taskManager.Schedule(ctx, id, status.ScheduleWindow,
				w.newStatusCheckTaskFunc(ctx, eventCh, id))
		}

		klog.V(7).Infof("UpdateFunc: sending update event: %v", rs)
		eventCh <- event.Event{
			Type:     event.ResourceUpdateEvent,
			Resource: rs,
		}
	}

	handler.DeleteFunc = func(iobj interface{}) {
		// Bail early if the context is cancelled, to avoid unnecessary work.
		if ctx.Err() != nil {
			return
		}

		if tombstone, ok := iobj.(cache.DeletedFinalStateUnknown); ok {
			// Last state unknown. Possibly stale.
			// TODO: Should we propagate this uncertainty to the caller?
			iobj = tombstone.Obj
		}
		obj, ok := iobj.(*unstructured.Unstructured)
		if !ok {
			panic(fmt.Sprintf("DeleteFunc received unexpected object type %T", iobj))
		}
		id := object.UnstructuredToObjMetadata(obj)
		if w.ObjectFilter.Filter(obj) {
			klog.V(7).Infof("DeleteFunc: Watch Event Skipped: %s", id)
			return
		}
		klog.V(5).Infof("DeleteFunc: Computing status for object: %s", id)

		// cancel any scheduled status update for this object
		w.taskManager.Cancel(id)

		if object.IsNamespace(obj) {
			klog.V(5).Infof("DeleteFunc: Namespace deleted: %v", id)
			w.onNamespaceDelete(obj)
		} else if object.IsCRD(obj) {
			klog.V(5).Infof("DeleteFunc: CRD deleted: %v", id)
			w.onCRDDelete(obj)
		}

		rs := deletedStatus(id)
		klog.V(7).Infof("DeleteFunc: sending update event: %v", rs)
		eventCh <- event.Event{
			Type:     event.ResourceUpdateEvent,
			Resource: rs,
		}
	}

	return handler
}
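
// exampleAnnotationFilter is a hypothetical sketch, not part of the original
// file. It shows one way to implement the ObjectFilter consulted by the event
// handlers above, matching the w.ObjectFilter.Filter(obj) calls; the
// annotation key is an illustrative assumption. Returning true tells the
// reporter to ignore the object.
type exampleAnnotationFilter struct{}

func (exampleAnnotationFilter) Filter(obj *unstructured.Unstructured) bool {
	return obj.GetAnnotations()["example.invalid/ignore-status"] == "true"
}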

// onCRDAdd handles creating a new informer to watch the new resource type.
func (w *ObjectStatusReporter) onCRDAdd(obj *unstructured.Unstructured) {
	gk, found := object.GetCRDGroupKind(obj)
	if !found {
		id := object.UnstructuredToObjMetadata(obj)
		klog.Warningf("Invalid CRD added: missing group and/or kind: %v", id)
		// Don't return an error, because this should not interrupt the task queue.
		// TODO: Allow non-fatal errors to be reported using a specific error type.
		return
	}
	klog.V(3).Infof("CRD added for %s", gk)

	klog.V(3).Info("Resetting RESTMapper")
	// Reset mapper to invalidate cache.
	meta.MaybeResetRESTMapper(w.Mapper)

	w.forEachTargetWithGroupKind(gk, func(gkn GroupKindNamespace) {
		w.startInformer(gkn)
	})
}

// onCRDUpdate handles creating a new informer to watch the updated resource type.
func (w *ObjectStatusReporter) onCRDUpdate(newObj *unstructured.Unstructured) {
	gk, found := object.GetCRDGroupKind(newObj)
	if !found {
		id := object.UnstructuredToObjMetadata(newObj)
		klog.Warningf("Invalid CRD updated: missing group and/or kind: %v", id)
		// Don't return an error, because this should not interrupt the task queue.
		// TODO: Allow non-fatal errors to be reported using a specific error type.
		return
	}
	klog.V(3).Infof("CRD updated for %s", gk)

	klog.V(3).Info("Resetting RESTMapper")
	// Reset mapper to invalidate cache.
	meta.MaybeResetRESTMapper(w.Mapper)

	w.forEachTargetWithGroupKind(gk, func(gkn GroupKindNamespace) {
		w.startInformer(gkn)
	})
}

// onCRDDelete handles stopping the informer watching the deleted resource type.
func (w *ObjectStatusReporter) onCRDDelete(oldObj *unstructured.Unstructured) {
	gk, found := object.GetCRDGroupKind(oldObj)
	if !found {
		id := object.UnstructuredToObjMetadata(oldObj)
		klog.Warningf("Invalid CRD deleted: missing group and/or kind: %v", id)
		// Don't return an error, because this should not interrupt the task queue.
		// TODO: Allow non-fatal errors to be reported using a specific error type.
		return
	}
	klog.V(3).Infof("CRD deleted for %s", gk)

	w.forEachTargetWithGroupKind(gk, func(gkn GroupKindNamespace) {
		w.stopInformer(gkn)
	})

	klog.V(3).Info("Resetting RESTMapper")
	// Reset mapper to invalidate cache.
	meta.MaybeResetRESTMapper(w.Mapper)
}

// onNamespaceAdd handles creating new informers to watch this namespace.
func (w *ObjectStatusReporter) onNamespaceAdd(obj *unstructured.Unstructured) {
	if w.RESTScope == meta.RESTScopeRoot {
		// When watching resources across all namespaces,
		// we don't need to start or stop any
		// namespace-specific informers.
		return
	}
	namespace := obj.GetName()
	w.forEachTargetWithNamespace(namespace, func(gkn GroupKindNamespace) {
		w.startInformer(gkn)
	})
}

// onNamespaceUpdate handles creating new informers to watch this namespace.
func (w *ObjectStatusReporter) onNamespaceUpdate(obj *unstructured.Unstructured) {
	if w.RESTScope == meta.RESTScopeRoot {
		// When watching resources across all namespaces,
		// we don't need to start or stop any
		// namespace-specific informers.
		return
	}
	namespace := obj.GetName()
	w.forEachTargetWithNamespace(namespace, func(gkn GroupKindNamespace) {
		w.startInformer(gkn)
	})
}

// onNamespaceDelete handles stopping informers watching this namespace.
func (w *ObjectStatusReporter) onNamespaceDelete(obj *unstructured.Unstructured) {
	if w.RESTScope == meta.RESTScopeRoot {
		// When watching resources across all namespaces,
		// we don't need to start or stop any
		// namespace-specific informers.
		return
	}
	namespace := obj.GetName()
	w.forEachTargetWithNamespace(namespace, func(gkn GroupKindNamespace) {
		w.stopInformer(gkn)
	})
}

// newStatusCheckTaskFunc returns a taskFunc that reads the status of an object
// from the cluster and sends it over the event channel.
//
// This method should only be used for generated resource objects, as it's much
// slower at scale than watching the resource for updates.
func (w *ObjectStatusReporter) newStatusCheckTaskFunc(
	ctx context.Context,
	eventCh chan<- event.Event,
	id object.ObjMetadata,
) taskFunc {
	return func() {
		klog.V(5).Infof("Re-reading object status (after %v): %s", status.ScheduleWindow, id)
		// check again
		rs, err := w.readStatusFromCluster(ctx, id)
		if err != nil {
			// Send error event and stop the reporter!
			// TODO: retry N times before terminating
			w.handleFatalError(eventCh, err)
			return
		}
		eventCh <- event.Event{
			Type:     event.ResourceUpdateEvent,
			Resource: rs,
		}
	}
}

func (w *ObjectStatusReporter) handleFatalError(eventCh chan<- event.Event, err error) {
	klog.V(5).Infof("Reporter error: %v", err)
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return
	}
	eventCh <- event.Event{
		Type:  event.ErrorEvent,
		Error: err,
	}
	w.Stop()
}

// watchErrorHandler logs errors and cancels the informer for this GroupKind
// if the NotFound error is received, which usually means the CRD was deleted.
// Based on DefaultWatchErrorHandler from k8s.io/client-go@v0.23.2/tools/cache/reflector.go
func (w *ObjectStatusReporter) watchErrorHandler(gkn GroupKindNamespace, eventCh chan<- event.Event, err error) {
	switch {
	// Stop channel closed
	case err == io.EOF:
		klog.V(5).Infof("ListAndWatch error (termination expected): %v: %v", gkn, err)

	// Watch connection closed
	case err == io.ErrUnexpectedEOF:
		klog.V(1).Infof("ListAndWatch error (retry expected): %v: %v", gkn, err)

	// Context done
	case errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded):
		klog.V(5).Infof("ListAndWatch error (termination expected): %v: %v", gkn, err)

	// resourceVersion too old
	case apierrors.IsResourceExpired(err):
		// Keep retrying
		klog.V(5).Infof("ListAndWatch error (retry expected): %v: %v", gkn, err)

	// Resource unregistered (DEPRECATED, see NotFound)
	case apierrors.IsGone(err):
		klog.V(5).Infof("ListAndWatch error (retry expected): %v: %v", gkn, err)

	// Resource not registered
	case apierrors.IsNotFound(err):
		klog.V(3).Infof("ListAndWatch error (termination expected): %v: stopping all informers for this GroupKind: %v", gkn, err)
		w.forEachTargetWithGroupKind(gkn.GroupKind(), func(gkn GroupKindNamespace) {
			w.stopInformer(gkn)
		})

	// Insufficient permissions
	case apierrors.IsForbidden(err):
		klog.V(3).Infof("ListAndWatch error (termination expected): %v: stopping all informers: %v", gkn, err)
		w.handleFatalError(eventCh, err)

	// Unexpected error
	default:
		klog.Warningf("ListAndWatch error (retry expected): %v: %v", gkn, err)
	}
}

// informerReference tracks informer lifecycle.
type informerReference struct {
	// lock guards the subsequent stateful fields
	lock sync.Mutex

	informer cache.SharedIndexInformer
	context  context.Context
	cancel   context.CancelFunc
	started  bool
}

// Start returns a wrapped context that can be cancelled.
// Returns nil & false if already started.
func (ir *informerReference) Start(ctx context.Context) (context.Context, bool) {
	ir.lock.Lock()
	defer ir.lock.Unlock()

	if ir.started {
		return nil, false
	}

	ctx, cancel := context.WithCancel(ctx)
	ir.context = ctx
	ir.cancel = cancel
	ir.started = true

	return ctx, true
}

func (ir *informerReference) SetInformer(informer cache.SharedIndexInformer) {
	ir.lock.Lock()
	defer ir.lock.Unlock()

	ir.informer = informer
}

func (ir *informerReference) HasSynced() bool {
	ir.lock.Lock()
	defer ir.lock.Unlock()

	if !ir.started {
		return false
	}
	if ir.informer == nil {
		return false
	}
	return ir.informer.HasSynced()
}

func (ir *informerReference) HasStarted() bool {
	ir.lock.Lock()
	defer ir.lock.Unlock()

	return ir.started
}

// Stop cancels the context, if it's been started.
func (ir *informerReference) Stop() {
	ir.lock.Lock()
	defer ir.lock.Unlock()

	if !ir.started {
		return
	}

	ir.cancel()
	ir.started = false
	ir.context = nil
}
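
// exampleInformerLifecycle is a hypothetical sketch, not part of the original
// file. It demonstrates the start-once semantics guarded by informerReference:
// Start hands out a cancellable child context exactly once until Stop is
// called.
func exampleInformerLifecycle(ctx context.Context) {
	ref := &informerReference{}
	informerCtx, ok := ref.Start(ctx)
	if ok {
		_ = informerCtx // would be passed to informer.Run via informerCtx.Done()
	}
	if _, ok := ref.Start(ctx); !ok {
		klog.V(5).Info("second Start refused: informer already running")
	}
	ref.Stop() // cancels informerCtx; a later Start is allowed again
}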

type taskFunc func()

// taskManager manages a set of tasks with object identifiers.
// This makes starting and stopping the tasks thread-safe.
type taskManager struct {
	lock        sync.Mutex
	cancelFuncs map[object.ObjMetadata]context.CancelFunc
}

func (tm *taskManager) Schedule(parentCtx context.Context, id object.ObjMetadata, delay time.Duration, task taskFunc) {
	tm.lock.Lock()
	defer tm.lock.Unlock()

	if tm.cancelFuncs == nil {
		tm.cancelFuncs = make(map[object.ObjMetadata]context.CancelFunc)
	}

	cancel, found := tm.cancelFuncs[id]
	if found {
		// Cancel the existing scheduled task and replace it.
		cancel()
	}

	taskCtx, cancel := context.WithTimeout(context.Background(), delay)
	tm.cancelFuncs[id] = cancel

	go func() {
		klog.V(5).Infof("Task scheduled (%v) for object (%s)", delay, id)
		select {
		case <-parentCtx.Done():
			// stop waiting
			cancel()
		case <-taskCtx.Done():
			if taskCtx.Err() == context.DeadlineExceeded {
				klog.V(5).Infof("Task executing (after %v) for object (%v)", delay, id)
				task()
			}
			// else stop waiting
		}
	}()
}

func (tm *taskManager) Cancel(id object.ObjMetadata) {
	tm.lock.Lock()
	defer tm.lock.Unlock()

	cancelFunc, found := tm.cancelFuncs[id]
	if !found {
		// already cancelled or not added
		return
	}
	delete(tm.cancelFuncs, id)
	cancelFunc()
	if len(tm.cancelFuncs) == 0 {
		tm.cancelFuncs = nil
	}
}
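
// exampleDelayedCheck is a hypothetical sketch, not part of the original file.
// It shows how the reporter uses taskManager: schedule a delayed re-check for
// an object, and cancel it when a newer watch event supersedes it. The 15s
// delay mirrors the unschedulable-Pod grace period described above.
func exampleDelayedCheck(ctx context.Context, tm *taskManager, id object.ObjMetadata) {
	tm.Schedule(ctx, id, 15*time.Second, func() {
		klog.V(5).Infof("delayed status re-check: %s", id)
	})
	// A subsequent watch event for the same object cancels the pending task.
	tm.Cancel(id)
}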
