...

Source file src/sigs.k8s.io/cli-utils/pkg/kstatus/polling/polling.go

Documentation: sigs.k8s.io/cli-utils/pkg/kstatus/polling

     1  // Copyright 2020 The Kubernetes Authors.
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package polling
     5  
     6  import (
     7  	"context"
     8  	"fmt"
     9  	"time"
    10  
    11  	"k8s.io/apimachinery/pkg/api/meta"
    12  	cmdutil "k8s.io/kubectl/pkg/cmd/util"
    13  	"k8s.io/kubectl/pkg/scheme"
    14  	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader"
    15  	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
    16  	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
    17  	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders"
    18  	"sigs.k8s.io/cli-utils/pkg/kstatus/status"
    19  	"sigs.k8s.io/cli-utils/pkg/object"
    20  	"sigs.k8s.io/controller-runtime/pkg/client"
    21  )
    22  
    23  // NewStatusPoller creates a new StatusPoller using the given clusterreader and mapper. The StatusPoller
    24  // will use the client for all calls to the cluster.
    25  func NewStatusPoller(reader client.Reader, mapper meta.RESTMapper, o Options) *StatusPoller {
    26  	setDefaults(&o)
    27  	var statusReaders []engine.StatusReader
    28  
    29  	statusReaders = append(statusReaders, o.CustomStatusReaders...)
    30  
    31  	srs, defaultStatusReader := createStatusReaders(mapper)
    32  	statusReaders = append(statusReaders, srs...)
    33  
    34  	return &StatusPoller{
    35  		engine: &engine.PollerEngine{
    36  			Reader:               reader,
    37  			Mapper:               mapper,
    38  			DefaultStatusReader:  defaultStatusReader,
    39  			StatusReaders:        statusReaders,
    40  			ClusterReaderFactory: o.ClusterReaderFactory,
    41  		},
    42  	}
    43  }
    44  
    45  // NewStatusPollerFromFactory creates a new StatusPoller instance from the
    46  // passed in factory.
    47  func NewStatusPollerFromFactory(f cmdutil.Factory, o Options) (*StatusPoller, error) {
    48  	config, err := f.ToRESTConfig()
    49  	if err != nil {
    50  		return nil, fmt.Errorf("error getting RESTConfig: %w", err)
    51  	}
    52  
    53  	mapper, err := f.ToRESTMapper()
    54  	if err != nil {
    55  		return nil, fmt.Errorf("error getting RESTMapper: %w", err)
    56  	}
    57  
    58  	c, err := client.New(config, client.Options{Scheme: scheme.Scheme, Mapper: mapper})
    59  	if err != nil {
    60  		return nil, fmt.Errorf("error creating client: %w", err)
    61  	}
    62  
    63  	return NewStatusPoller(c, mapper, o), nil
    64  }
    65  
    66  func setDefaults(o *Options) {
    67  	if o.ClusterReaderFactory == nil {
    68  		o.ClusterReaderFactory = engine.ClusterReaderFactoryFunc(clusterreader.NewCachingClusterReader)
    69  	}
    70  }
    71  
    72  // Options can be provided when creating a new StatusPoller to customize the
    73  // behavior.
    74  type Options struct {
    75  	// CustomStatusReaders specifies any implementations of the engine.StatusReader interface that will
    76  	// be used to compute reconcile status for resources.
    77  	CustomStatusReaders []engine.StatusReader
    78  
    79  	// ClusterReaderFactory allows for custom implementations of the engine.ClusterReader interface
    80  	// in the StatusPoller. The default implementation if the clusterreader.CachingClusterReader.
    81  	ClusterReaderFactory engine.ClusterReaderFactory
    82  }
    83  
    84  // StatusPoller provides functionality for polling a cluster for status for a set of resources.
    85  type StatusPoller struct {
    86  	engine *engine.PollerEngine
    87  }
    88  
    89  // Poll will create a new statusPollerRunner that will poll all the resources provided and report their status
    90  // back on the event channel returned. The statusPollerRunner can be cancelled at any time by cancelling the
    91  // context passed in.
    92  func (s *StatusPoller) Poll(ctx context.Context, identifiers object.ObjMetadataSet, options PollOptions) <-chan event.Event {
    93  	return s.engine.Poll(ctx, identifiers, engine.Options{
    94  		PollInterval: options.PollInterval,
    95  	})
    96  }
    97  
    98  // PollOptions defines the levers available for tuning the behavior of the
    99  // StatusPoller.
   100  type PollOptions struct {
   101  	// PollInterval defines how often the PollerEngine should poll the cluster for the latest
   102  	// state of the resources.
   103  	PollInterval time.Duration
   104  }
   105  
   106  // createStatusReaders creates an instance of all the statusreaders. This includes a set of statusreaders for
   107  // a particular GroupKind, and a default engine used for all resource types that does not have
   108  // a specific statusreaders.
   109  // TODO: We should consider making the registration more automatic instead of having to create each of them
   110  // here. Also, it might be worth creating them on demand.
   111  func createStatusReaders(mapper meta.RESTMapper) ([]engine.StatusReader, engine.StatusReader) {
   112  	defaultStatusReader := statusreaders.NewGenericStatusReader(mapper, status.Compute)
   113  
   114  	replicaSetStatusReader := statusreaders.NewReplicaSetStatusReader(mapper, defaultStatusReader)
   115  	deploymentStatusReader := statusreaders.NewDeploymentResourceReader(mapper, replicaSetStatusReader)
   116  	statefulSetStatusReader := statusreaders.NewStatefulSetResourceReader(mapper, defaultStatusReader)
   117  
   118  	statusReaders := []engine.StatusReader{
   119  		deploymentStatusReader,
   120  		statefulSetStatusReader,
   121  		replicaSetStatusReader,
   122  	}
   123  
   124  	return statusReaders, defaultStatusReader
   125  }
   126  

View as plain text