...

Source file src/sigs.k8s.io/cli-utils/pkg/kstatus/watcher/default_status_watcher.go

Documentation: sigs.k8s.io/cli-utils/pkg/kstatus/watcher

     1  // Copyright 2022 The Kubernetes Authors.
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package watcher
     5  
     6  import (
     7  	"context"
     8  	"fmt"
     9  	"time"
    10  
    11  	"k8s.io/apimachinery/pkg/api/meta"
    12  	"k8s.io/apimachinery/pkg/runtime/schema"
    13  	"k8s.io/client-go/dynamic"
    14  	"k8s.io/klog/v2"
    15  	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader"
    16  	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
    17  	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
    18  	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/statusreaders"
    19  	"sigs.k8s.io/cli-utils/pkg/object"
    20  )
    21  
// DefaultStatusWatcher reports on status updates to a set of objects.
//
// Use NewDefaultStatusWatcher to build a DefaultStatusWatcher with default settings.
type DefaultStatusWatcher struct {
	// DynamicClient is used to watch resource objects. Watch hands it to the
	// dynamic informer factory it builds.
	DynamicClient dynamic.Interface

	// Mapper is used to map from GroupKind to GroupVersionKind.
	Mapper meta.RESTMapper

	// ResyncPeriod is how often the objects are retrieved to re-synchronize,
	// in case any events were missed.
	ResyncPeriod time.Duration

	// StatusReader specifies a custom implementation of the
	// engine.StatusReader interface that will be used to compute reconcile
	// status for resource objects.
	StatusReader engine.StatusReader

	// ClusterReader is used to look up generated objects on-demand.
	// Generated objects (ex: Deployment > ReplicaSet > Pod) are sometimes
	// required for computing parent object status, to compensate for
	// controllers that aren't following status conventions.
	ClusterReader engine.ClusterReader
}

// Compile-time check that *DefaultStatusWatcher satisfies StatusWatcher.
var _ StatusWatcher = &DefaultStatusWatcher{}
    49  
    50  // NewDefaultStatusWatcher constructs a DynamicStatusWatcher with defaults
    51  // chosen for general use. If you need different settings, consider building a
    52  // DynamicStatusWatcher directly.
    53  func NewDefaultStatusWatcher(dynamicClient dynamic.Interface, mapper meta.RESTMapper) *DefaultStatusWatcher {
    54  	return &DefaultStatusWatcher{
    55  		DynamicClient: dynamicClient,
    56  		Mapper:        mapper,
    57  		ResyncPeriod:  1 * time.Hour,
    58  		StatusReader:  statusreaders.NewDefaultStatusReader(mapper),
    59  		ClusterReader: &clusterreader.DynamicClusterReader{
    60  			DynamicClient: dynamicClient,
    61  			Mapper:        mapper,
    62  		},
    63  	}
    64  }
    65  
    66  // Watch the cluster for changes made to the specified objects.
    67  // Returns an event channel on which these updates (and errors) will be reported.
    68  // Each update event includes the computed status of the object.
    69  func (w *DefaultStatusWatcher) Watch(ctx context.Context, ids object.ObjMetadataSet, opts Options) <-chan event.Event {
    70  	strategy := opts.RESTScopeStrategy
    71  	if strategy == RESTScopeAutomatic {
    72  		strategy = autoSelectRESTScopeStrategy(ids)
    73  	}
    74  
    75  	var scope meta.RESTScope
    76  	var targets []GroupKindNamespace
    77  	switch strategy {
    78  	case RESTScopeRoot:
    79  		scope = meta.RESTScopeRoot
    80  		targets = rootScopeGKNs(ids)
    81  		klog.V(3).Infof("DynamicStatusWatcher starting in root-scoped mode (targets: %d)", len(targets))
    82  	case RESTScopeNamespace:
    83  		scope = meta.RESTScopeNamespace
    84  		targets = namespaceScopeGKNs(ids)
    85  		klog.V(3).Infof("DynamicStatusWatcher starting in namespace-scoped mode (targets: %d)", len(targets))
    86  	default:
    87  		return handleFatalError(fmt.Errorf("invalid RESTScopeStrategy: %v", strategy))
    88  	}
    89  
    90  	informer := &ObjectStatusReporter{
    91  		InformerFactory: NewDynamicInformerFactory(w.DynamicClient, w.ResyncPeriod),
    92  		Mapper:          w.Mapper,
    93  		StatusReader:    w.StatusReader,
    94  		ClusterReader:   w.ClusterReader,
    95  		Targets:         targets,
    96  		ObjectFilter:    &AllowListObjectFilter{AllowList: ids},
    97  		RESTScope:       scope,
    98  	}
    99  	return informer.Start(ctx)
   100  }
   101  
   102  func handleFatalError(err error) <-chan event.Event {
   103  	eventCh := make(chan event.Event)
   104  	go func() {
   105  		defer close(eventCh)
   106  		eventCh <- event.Event{
   107  			Type:  event.ErrorEvent,
   108  			Error: err,
   109  		}
   110  	}()
   111  	return eventCh
   112  }
   113  
   114  func autoSelectRESTScopeStrategy(ids object.ObjMetadataSet) RESTScopeStrategy {
   115  	if len(uniqueNamespaces(ids)) > 1 {
   116  		return RESTScopeRoot
   117  	}
   118  	return RESTScopeNamespace
   119  }
   120  
   121  func rootScopeGKNs(ids object.ObjMetadataSet) []GroupKindNamespace {
   122  	gks := uniqueGKs(ids)
   123  	targets := make([]GroupKindNamespace, len(gks))
   124  	for i, gk := range gks {
   125  		targets[i] = GroupKindNamespace{
   126  			Group:     gk.Group,
   127  			Kind:      gk.Kind,
   128  			Namespace: "",
   129  		}
   130  	}
   131  	return targets
   132  }
   133  
// namespaceScopeGKNs returns the watch targets for namespace-scoped mode:
// the unique GroupKindNamespaces of ids, as computed by uniqueGKNs.
func namespaceScopeGKNs(ids object.ObjMetadataSet) []GroupKindNamespace {
	return uniqueGKNs(ids)
}
   137  
   138  // uniqueGKNs returns a set of unique GroupKindNamespaces from a set of object identifiers.
   139  func uniqueGKNs(ids object.ObjMetadataSet) []GroupKindNamespace {
   140  	gknMap := make(map[GroupKindNamespace]struct{})
   141  	for _, id := range ids {
   142  		gkn := GroupKindNamespace{Group: id.GroupKind.Group, Kind: id.GroupKind.Kind, Namespace: id.Namespace}
   143  		gknMap[gkn] = struct{}{}
   144  	}
   145  	gknList := make([]GroupKindNamespace, 0, len(gknMap))
   146  	for gk := range gknMap {
   147  		gknList = append(gknList, gk)
   148  	}
   149  	return gknList
   150  }
   151  
   152  // uniqueGKs returns a set of unique GroupKinds from a set of object identifiers.
   153  func uniqueGKs(ids object.ObjMetadataSet) []schema.GroupKind {
   154  	gkMap := make(map[schema.GroupKind]struct{})
   155  	for _, id := range ids {
   156  		gkn := schema.GroupKind{Group: id.GroupKind.Group, Kind: id.GroupKind.Kind}
   157  		gkMap[gkn] = struct{}{}
   158  	}
   159  	gkList := make([]schema.GroupKind, 0, len(gkMap))
   160  	for gk := range gkMap {
   161  		gkList = append(gkList, gk)
   162  	}
   163  	return gkList
   164  }
   165  
   166  func uniqueNamespaces(ids object.ObjMetadataSet) []string {
   167  	nsMap := make(map[string]struct{})
   168  	for _, id := range ids {
   169  		nsMap[id.Namespace] = struct{}{}
   170  	}
   171  	nsList := make([]string, 0, len(nsMap))
   172  	for ns := range nsMap {
   173  		nsList = append(nsList, ns)
   174  	}
   175  	return nsList
   176  }
   177  

View as plain text