...

Source file src/sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine/engine.go

Documentation: sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine

     1  // Copyright 2020 The Kubernetes Authors.
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package engine
     5  
     6  import (
     7  	"context"
     8  	"errors"
     9  	"fmt"
    10  	"time"
    11  
    12  	"k8s.io/apimachinery/pkg/api/meta"
    13  	"k8s.io/apimachinery/pkg/runtime/schema"
    14  	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
    15  	"sigs.k8s.io/cli-utils/pkg/object"
    16  	"sigs.k8s.io/controller-runtime/pkg/client"
    17  )
    18  
// ClusterReaderFactory provides an interface that can be implemented to provide custom
// ClusterReader implementations in the StatusPoller.
//
// PollerEngine.Poll calls New once per invocation, passing the engine's Reader
// and Mapper together with the set of identifiers that are about to be polled.
type ClusterReaderFactory interface {
	// New returns a ClusterReader, or an error if one could not be created.
	New(reader client.Reader, mapper meta.RESTMapper, identifiers object.ObjMetadataSet) (ClusterReader, error)
}
    24  
    25  type ClusterReaderFactoryFunc func(client.Reader, meta.RESTMapper, object.ObjMetadataSet) (ClusterReader, error)
    26  
    27  func (c ClusterReaderFactoryFunc) New(r client.Reader, m meta.RESTMapper, ids object.ObjMetadataSet) (ClusterReader, error) {
    28  	return c(r, m, ids)
    29  }
    30  
// PollerEngine provides functionality for polling a cluster for status of a set of resources.
type PollerEngine struct {
	// Reader is handed to the ClusterReaderFactory when a ClusterReader is
	// created for a Poll call.
	Reader client.Reader
	// Mapper is used to look up the REST mapping of each identifier when
	// validating input, and is also handed to the ClusterReaderFactory.
	Mapper meta.RESTMapper
	// StatusReaders are consulted in order; the first one that reports
	// support for a resource's GroupKind computes that resource's status.
	StatusReaders []StatusReader
	// DefaultStatusReader is used for GroupKinds that none of the
	// StatusReaders support.
	DefaultStatusReader StatusReader
	// ClusterReaderFactory creates the ClusterReader used by each Poll call.
	ClusterReaderFactory ClusterReaderFactory
}
    39  
    40  // Poll will create a new statusPollerRunner that will poll all the resources provided and report their status
    41  // back on the event channel returned. The statusPollerRunner can be cancelled at any time by cancelling the
    42  // context passed in.
    43  // The context can be used to stop the polling process by using timeout, deadline or
    44  // cancellation.
    45  func (s *PollerEngine) Poll(ctx context.Context, identifiers object.ObjMetadataSet, options Options) <-chan event.Event {
    46  	eventChannel := make(chan event.Event)
    47  
    48  	go func() {
    49  		defer close(eventChannel)
    50  
    51  		err := s.validateIdentifiers(identifiers)
    52  		if err != nil {
    53  			handleError(eventChannel, err)
    54  			return
    55  		}
    56  
    57  		clusterReader, err := s.ClusterReaderFactory.New(s.Reader, s.Mapper, identifiers)
    58  		if err != nil {
    59  			handleError(eventChannel, fmt.Errorf("error creating new ClusterReader: %w", err))
    60  			return
    61  		}
    62  
    63  		runner := &statusPollerRunner{
    64  			clusterReader:            clusterReader,
    65  			statusReaders:            s.StatusReaders,
    66  			defaultStatusReader:      s.DefaultStatusReader,
    67  			identifiers:              identifiers,
    68  			previousResourceStatuses: make(map[object.ObjMetadata]*event.ResourceStatus),
    69  			eventChannel:             eventChannel,
    70  			pollingInterval:          options.PollInterval,
    71  		}
    72  		runner.Run(ctx)
    73  	}()
    74  
    75  	return eventChannel
    76  }
    77  
    78  func handleError(eventChannel chan event.Event, err error) {
    79  	eventChannel <- event.Event{
    80  		Type:  event.ErrorEvent,
    81  		Error: err,
    82  	}
    83  }
    84  
    85  // validateIdentifiers makes sure that all namespaced resources
    86  // passed in
    87  func (s *PollerEngine) validateIdentifiers(identifiers object.ObjMetadataSet) error {
    88  	for _, id := range identifiers {
    89  		mapping, err := s.Mapper.RESTMapping(id.GroupKind)
    90  		if err != nil {
    91  			// If we can't find a match, just keep going. This can happen
    92  			// if CRDs and CRs are applied at the same time.
    93  			if meta.IsNoMatchError(err) {
    94  				continue
    95  			}
    96  			return err
    97  		}
    98  		if mapping.Scope.Name() == meta.RESTScopeNameNamespace && id.Namespace == "" {
    99  			return fmt.Errorf("resource %s %s is namespace scoped, but namespace is not set",
   100  				id.GroupKind.String(), id.Name)
   101  		}
   102  	}
   103  	return nil
   104  }
   105  
// Options contains the different parameters that can be used to adjust the
// behavior of the PollerEngine.
// Timeout is not one of the options here as this should be accomplished by
// setting a timeout on the context: https://golang.org/pkg/context/
type Options struct {

	// PollInterval defines how often the PollerEngine should poll the cluster for the latest
	// state of the resources. It is used as the period of the polling ticker and
	// must be greater than zero.
	PollInterval time.Duration
}
   116  
// statusPollerRunner is responsible for polling of a set of resources. Each call to Poll will create
// a new statusPollerRunner, which means we can keep state in the runner and all data will only be accessed
// by a single goroutine, meaning we don't need synchronization.
// The statusPollerRunner uses an implementation of the ClusterReader interface to talk to the
// kubernetes cluster. Currently this can be either the cached ClusterReader that syncs all needed resources
// with LIST calls before each polling loop, or the normal ClusterReader that just forwards each call
// to the client.Reader from controller-runtime.
type statusPollerRunner struct {
	// clusterReader is the interface for fetching and listing resources from the cluster. It can be implemented
	// to make calls directly to the cluster or use caching to reduce the number of calls to the cluster.
	clusterReader ClusterReader

	// statusReaders contains the resource specific statusReaders. These will contain logic for how to
	// compute status for specific GroupKinds. These will use a ClusterReader to fetch
	// status of a resource and any generated resources. They are tried in order and the
	// first one that supports a GroupKind is used.
	statusReaders []StatusReader

	// defaultStatusReader is the generic StatusReader that is used for all GroupKinds that
	// don't have a specific StatusReader in the statusReaders slice.
	defaultStatusReader StatusReader

	// identifiers contains the set of identifiers for the resources that should be polled.
	// Each resource is identified by GroupKind, namespace and name.
	identifiers object.ObjMetadataSet

	// previousResourceStatuses keeps track of the last event for each
	// of the polled resources. This is used to make sure we only
	// send events on the event channel when something has actually changed.
	previousResourceStatuses map[object.ObjMetadata]*event.ResourceStatus

	// eventChannel is a channel where any updates to the status of resources
	// will be sent. The caller of Poll will listen for updates.
	eventChannel chan event.Event

	// pollingInterval determines how often we should poll the cluster for
	// the latest state of resources.
	pollingInterval time.Duration
}
   155  
   156  // Run starts the polling loop of the statusReaders.
   157  func (r *statusPollerRunner) Run(ctx context.Context) {
   158  	// Sets up ticker that will trigger the regular polling loop at a regular interval.
   159  	ticker := time.NewTicker(r.pollingInterval)
   160  	defer func() {
   161  		ticker.Stop()
   162  	}()
   163  
   164  	err := r.syncAndPoll(ctx)
   165  	if err != nil {
   166  		r.handleSyncAndPollErr(err)
   167  		return
   168  	}
   169  
   170  	for {
   171  		select {
   172  		case <-ctx.Done():
   173  			return
   174  		case <-ticker.C:
   175  			// First sync and then compute status for all resources.
   176  			err := r.syncAndPoll(ctx)
   177  			if err != nil {
   178  				r.handleSyncAndPollErr(err)
   179  				return
   180  			}
   181  		}
   182  	}
   183  }
   184  
   185  // handleSyncAndPollErr decides what to do if we encounter an error while
   186  // fetching resources to compute status. Errors are usually returned
   187  // as an ErrorEvent, but we handle context cancellation or deadline exceeded
   188  // differently since they aren't really errors, but a signal that the
   189  // process should shut down.
   190  func (r *statusPollerRunner) handleSyncAndPollErr(err error) {
   191  	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
   192  		return
   193  	}
   194  	r.eventChannel <- event.Event{
   195  		Type:  event.ErrorEvent,
   196  		Error: err,
   197  	}
   198  }
   199  
   200  func (r *statusPollerRunner) syncAndPoll(ctx context.Context) error {
   201  	// First trigger a sync of the ClusterReader. This may or may not actually
   202  	// result in calls to the cluster, depending on the implementation.
   203  	// If this call fails, there is no clean way to recover, so we just return an ErrorEvent
   204  	// and shut down.
   205  	err := r.clusterReader.Sync(ctx)
   206  	if err != nil {
   207  		return err
   208  	}
   209  	// Poll all resources and compute status. If the polling of resources has completed (based
   210  	// on information from the StatusAggregator and the value of pollUntilCancelled), we send
   211  	// a CompletedEvent and return.
   212  	return r.pollStatusForAllResources(ctx)
   213  }
   214  
   215  // pollStatusForAllResources iterates over all the resources in the set and delegates
   216  // to the appropriate engine to compute the status.
   217  func (r *statusPollerRunner) pollStatusForAllResources(ctx context.Context) error {
   218  	for _, id := range r.identifiers {
   219  		// Check if the context has been cancelled on every iteration.
   220  		select {
   221  		case <-ctx.Done():
   222  			return ctx.Err()
   223  		default:
   224  		}
   225  		gk := id.GroupKind
   226  		statusReader := r.statusReaderForGroupKind(gk)
   227  		resourceStatus, err := statusReader.ReadStatus(ctx, r.clusterReader, id)
   228  		if err != nil {
   229  			return err
   230  		}
   231  		if r.isUpdatedResourceStatus(resourceStatus) {
   232  			r.previousResourceStatuses[id] = resourceStatus
   233  			r.eventChannel <- event.Event{
   234  				Type:     event.ResourceUpdateEvent,
   235  				Resource: resourceStatus,
   236  			}
   237  		}
   238  	}
   239  	return nil
   240  }
   241  
   242  func (r *statusPollerRunner) statusReaderForGroupKind(gk schema.GroupKind) StatusReader {
   243  	for _, sr := range r.statusReaders {
   244  		if sr.Supports(gk) {
   245  			return sr
   246  		}
   247  	}
   248  	return r.defaultStatusReader
   249  }
   250  
   251  func (r *statusPollerRunner) isUpdatedResourceStatus(resourceStatus *event.ResourceStatus) bool {
   252  	oldResourceStatus, found := r.previousResourceStatuses[resourceStatus.Identifier]
   253  	if !found {
   254  		return true
   255  	}
   256  	return !event.ResourceStatusEqual(resourceStatus, oldResourceStatus)
   257  }
   258  

View as plain text