...

Source file src/sigs.k8s.io/controller-runtime/pkg/cache/cache.go

Documentation: sigs.k8s.io/controller-runtime/pkg/cache

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net/http"
    23  	"sort"
    24  	"time"
    25  
    26  	"golang.org/x/exp/maps"
    27  	corev1 "k8s.io/api/core/v1"
    28  	"k8s.io/apimachinery/pkg/api/meta"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/fields"
    31  	"k8s.io/apimachinery/pkg/labels"
    32  	"k8s.io/apimachinery/pkg/runtime"
    33  	"k8s.io/apimachinery/pkg/runtime/schema"
    34  	"k8s.io/client-go/kubernetes/scheme"
    35  	"k8s.io/client-go/rest"
    36  	toolscache "k8s.io/client-go/tools/cache"
    37  	"k8s.io/utils/ptr"
    38  
    39  	"sigs.k8s.io/controller-runtime/pkg/cache/internal"
    40  	"sigs.k8s.io/controller-runtime/pkg/client"
    41  	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
    42  )
    43  
    44  var (
    45  	defaultSyncPeriod = 10 * time.Hour
    46  )
    47  
    48  // InformerGetOptions defines the behavior of how informers are retrieved.
    49  type InformerGetOptions internal.GetOptions
    50  
    51  // InformerGetOption defines an option that alters the behavior of how informers are retrieved.
    52  type InformerGetOption func(*InformerGetOptions)
    53  
    54  // BlockUntilSynced determines whether a get request for an informer should block
    55  // until the informer's cache has synced.
    56  func BlockUntilSynced(shouldBlock bool) InformerGetOption {
    57  	return func(opts *InformerGetOptions) {
    58  		opts.BlockUntilSynced = &shouldBlock
    59  	}
    60  }
    61  
    62  // Cache knows how to load Kubernetes objects, fetch informers to request
    63  // to receive events for Kubernetes objects (at a low-level),
    64  // and add indices to fields on the objects stored in the cache.
    65  type Cache interface {
    66  	// Reader acts as a client to objects stored in the cache.
    67  	client.Reader
    68  
    69  	// Informers loads informers and adds field indices.
    70  	Informers
    71  }
    72  
    73  // Informers knows how to create or fetch informers for different
    74  // group-version-kinds, and add indices to those informers.  It's safe to call
    75  // GetInformer from multiple threads.
    76  type Informers interface {
    77  	// GetInformer fetches or constructs an informer for the given object that corresponds to a single
    78  	// API kind and resource.
    79  	GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error)
    80  
    81  	// GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead
    82  	// of the underlying object.
    83  	GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error)
    84  
    85  	// RemoveInformer removes an informer entry and stops it if it was running.
    86  	RemoveInformer(ctx context.Context, obj client.Object) error
    87  
    88  	// Start runs all the informers known to this cache until the context is closed.
    89  	// It blocks.
    90  	Start(ctx context.Context) error
    91  
    92  	// WaitForCacheSync waits for all the caches to sync. Returns false if it could not sync a cache.
    93  	WaitForCacheSync(ctx context.Context) bool
    94  
    95  	// FieldIndexer adds indices to the managed informers.
    96  	client.FieldIndexer
    97  }
    98  
    99  // Informer allows you to interact with the underlying informer.
   100  type Informer interface {
   101  	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
   102  	// period. Events to a single handler are delivered sequentially, but there is no coordination
   103  	// between different handlers.
   104  	// It returns a registration handle for the handler that can be used to remove
   105  	// the handler again and an error if the handler cannot be added.
   106  	AddEventHandler(handler toolscache.ResourceEventHandler) (toolscache.ResourceEventHandlerRegistration, error)
   107  
   108  	// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
   109  	// specified resync period. Events to a single handler are delivered sequentially, but there is
   110  	// no coordination between different handlers.
   111  	// It returns a registration handle for the handler that can be used to remove
   112  	// the handler again and an error if the handler cannot be added.
   113  	AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error)
   114  
   115  	// RemoveEventHandler removes a previously added event handler given by
   116  	// its registration handle.
   117  	// This function is guaranteed to be idempotent and thread-safe.
   118  	RemoveEventHandler(handle toolscache.ResourceEventHandlerRegistration) error
   119  
   120  	// AddIndexers adds indexers to this store. If this is called after there is already data
   121  	// in the store, the results are undefined.
   122  	AddIndexers(indexers toolscache.Indexers) error
   123  
   124  	// HasSynced return true if the informers underlying store has synced.
   125  	HasSynced() bool
   126  	// IsStopped returns true if the informer has been stopped.
   127  	IsStopped() bool
   128  }
   129  
   130  // AllNamespaces should be used as the map key to deliminate namespace settings
   131  // that apply to all namespaces that themselves do not have explicit settings.
   132  const AllNamespaces = metav1.NamespaceAll
   133  
   134  // Options are the optional arguments for creating a new Cache object.
   135  type Options struct {
   136  	// HTTPClient is the http client to use for the REST client
   137  	HTTPClient *http.Client
   138  
   139  	// Scheme is the scheme to use for mapping objects to GroupVersionKinds
   140  	Scheme *runtime.Scheme
   141  
   142  	// Mapper is the RESTMapper to use for mapping GroupVersionKinds to Resources
   143  	Mapper meta.RESTMapper
   144  
   145  	// SyncPeriod determines the minimum frequency at which watched resources are
   146  	// reconciled. A lower period will correct entropy more quickly, but reduce
   147  	// responsiveness to change if there are many watched resources. Change this
   148  	// value only if you know what you are doing. Defaults to 10 hours if unset.
   149  	// there will a 10 percent jitter between the SyncPeriod of all controllers
   150  	// so that all controllers will not send list requests simultaneously.
   151  	//
   152  	// This applies to all controllers.
   153  	//
   154  	// A period sync happens for two reasons:
   155  	// 1. To insure against a bug in the controller that causes an object to not
   156  	// be requeued, when it otherwise should be requeued.
   157  	// 2. To insure against an unknown bug in controller-runtime, or its dependencies,
   158  	// that causes an object to not be requeued, when it otherwise should be
   159  	// requeued, or to be removed from the queue, when it otherwise should not
   160  	// be removed.
   161  	//
   162  	// If you want
   163  	// 1. to insure against missed watch events, or
   164  	// 2. to poll services that cannot be watched,
   165  	// then we recommend that, instead of changing the default period, the
   166  	// controller requeue, with a constant duration `t`, whenever the controller
   167  	// is "done" with an object, and would otherwise not requeue it, i.e., we
   168  	// recommend the `Reconcile` function return `reconcile.Result{RequeueAfter: t}`,
   169  	// instead of `reconcile.Result{}`.
   170  	SyncPeriod *time.Duration
   171  
   172  	// ReaderFailOnMissingInformer configures the cache to return a ErrResourceNotCached error when a user
   173  	// requests, using Get() and List(), a resource the cache does not already have an informer for.
   174  	//
   175  	// This error is distinct from an errors.NotFound.
   176  	//
   177  	// Defaults to false, which means that the cache will start a new informer
   178  	// for every new requested resource.
   179  	ReaderFailOnMissingInformer bool
   180  
   181  	// DefaultNamespaces maps namespace names to cache configs. If set, only
   182  	// the namespaces in here will be watched and it will by used to default
   183  	// ByObject.Namespaces for all objects if that is nil.
   184  	//
   185  	// It is possible to have specific Config for just some namespaces
   186  	// but cache all namespaces by using the AllNamespaces const as the map key.
   187  	// This will then include all namespaces that do not have a more specific
   188  	// setting.
   189  	//
   190  	// The options in the Config that are nil will be defaulted from
   191  	// the respective Default* settings.
   192  	DefaultNamespaces map[string]Config
   193  
   194  	// DefaultLabelSelector will be used as a label selector for all objects
   195  	// unless there is already one set in ByObject or DefaultNamespaces.
   196  	DefaultLabelSelector labels.Selector
   197  
   198  	// DefaultFieldSelector will be used as a field selector for all object types
   199  	// unless there is already one set in ByObject or DefaultNamespaces.
   200  	DefaultFieldSelector fields.Selector
   201  
   202  	// DefaultTransform will be used as transform for all object types
   203  	// unless there is already one set in ByObject or DefaultNamespaces.
   204  	//
   205  	// A typical usecase for this is to use TransformStripManagedFields
   206  	// to reduce the caches memory usage.
   207  	DefaultTransform toolscache.TransformFunc
   208  
   209  	// DefaultWatchErrorHandler will be used to the WatchErrorHandler which is called
   210  	// whenever ListAndWatch drops the connection with an error.
   211  	//
   212  	// After calling this handler, the informer will backoff and retry.
   213  	DefaultWatchErrorHandler toolscache.WatchErrorHandler
   214  
   215  	// DefaultUnsafeDisableDeepCopy is the default for UnsafeDisableDeepCopy
   216  	// for everything that doesn't specify this.
   217  	//
   218  	// Be very careful with this, when enabled you must DeepCopy any object before mutating it,
   219  	// otherwise you will mutate the object in the cache.
   220  	//
   221  	// This will be used for all object types, unless it is set in ByObject or
   222  	// DefaultNamespaces.
   223  	DefaultUnsafeDisableDeepCopy *bool
   224  
   225  	// ByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object.
   226  	// If unset, this will fall through to the Default* settings.
   227  	ByObject map[client.Object]ByObject
   228  
   229  	// newInformer allows overriding of NewSharedIndexInformer for testing.
   230  	newInformer *func(toolscache.ListerWatcher, runtime.Object, time.Duration, toolscache.Indexers) toolscache.SharedIndexInformer
   231  }
   232  
   233  // ByObject offers more fine-grained control over the cache's ListWatch by object.
   234  type ByObject struct {
   235  	// Namespaces maps a namespace name to cache configs. If set, only the
   236  	// namespaces in this map will be cached.
   237  	//
   238  	// Settings in the map value that are unset will be defaulted.
   239  	// Use an empty value for the specific setting to prevent that.
   240  	//
   241  	// It is possible to have specific Config for just some namespaces
   242  	// but cache all namespaces by using the AllNamespaces const as the map key.
   243  	// This will then include all namespaces that do not have a more specific
   244  	// setting.
   245  	//
   246  	// A nil map allows to default this to the cache's DefaultNamespaces setting.
   247  	// An empty map prevents this and means that all namespaces will be cached.
   248  	//
   249  	// The defaulting follows the following precedence order:
   250  	// 1. ByObject
   251  	// 2. DefaultNamespaces[namespace]
   252  	// 3. Default*
   253  	//
   254  	// This must be unset for cluster-scoped objects.
   255  	Namespaces map[string]Config
   256  
   257  	// Label represents a label selector for the object.
   258  	Label labels.Selector
   259  
   260  	// Field represents a field selector for the object.
   261  	Field fields.Selector
   262  
   263  	// Transform is a transformer function for the object which gets applied
   264  	// when objects of the transformation are about to be committed to the cache.
   265  	//
   266  	// This function is called both for new objects to enter the cache,
   267  	// and for updated objects.
   268  	Transform toolscache.TransformFunc
   269  
   270  	// UnsafeDisableDeepCopy indicates not to deep copy objects during get or
   271  	// list objects per GVK at the specified object.
   272  	// Be very careful with this, when enabled you must DeepCopy any object before mutating it,
   273  	// otherwise you will mutate the object in the cache.
   274  	UnsafeDisableDeepCopy *bool
   275  }
   276  
   277  // Config describes all potential options for a given watch.
   278  type Config struct {
   279  	// LabelSelector specifies a label selector. A nil value allows to
   280  	// default this.
   281  	//
   282  	// Set to labels.Everything() if you don't want this defaulted.
   283  	LabelSelector labels.Selector
   284  
   285  	// FieldSelector specifics a field selector. A nil value allows to
   286  	// default this.
   287  	//
   288  	// Set to fields.Everything() if you don't want this defaulted.
   289  	FieldSelector fields.Selector
   290  
   291  	// Transform specifies a transform func. A nil value allows to default
   292  	// this.
   293  	//
   294  	// Set to an empty func to prevent this:
   295  	// func(in interface{}) (interface{}, error) { return in, nil }
   296  	Transform toolscache.TransformFunc
   297  
   298  	// UnsafeDisableDeepCopy specifies if List and Get requests against the
   299  	// cache should not DeepCopy. A nil value allows to default this.
   300  	UnsafeDisableDeepCopy *bool
   301  }
   302  
   303  // NewCacheFunc - Function for creating a new cache from the options and a rest config.
   304  type NewCacheFunc func(config *rest.Config, opts Options) (Cache, error)
   305  
   306  // New initializes and returns a new Cache.
   307  func New(cfg *rest.Config, opts Options) (Cache, error) {
   308  	opts, err := defaultOpts(cfg, opts)
   309  	if err != nil {
   310  		return nil, err
   311  	}
   312  
   313  	newCacheFunc := newCache(cfg, opts)
   314  
   315  	var defaultCache Cache
   316  	if len(opts.DefaultNamespaces) > 0 {
   317  		defaultConfig := optionDefaultsToConfig(&opts)
   318  		defaultCache = newMultiNamespaceCache(newCacheFunc, opts.Scheme, opts.Mapper, opts.DefaultNamespaces, &defaultConfig)
   319  	} else {
   320  		defaultCache = newCacheFunc(optionDefaultsToConfig(&opts), corev1.NamespaceAll)
   321  	}
   322  
   323  	if len(opts.ByObject) == 0 {
   324  		return defaultCache, nil
   325  	}
   326  
   327  	delegating := &delegatingByGVKCache{
   328  		scheme:       opts.Scheme,
   329  		caches:       make(map[schema.GroupVersionKind]Cache, len(opts.ByObject)),
   330  		defaultCache: defaultCache,
   331  	}
   332  
   333  	for obj, config := range opts.ByObject {
   334  		gvk, err := apiutil.GVKForObject(obj, opts.Scheme)
   335  		if err != nil {
   336  			return nil, fmt.Errorf("failed to get GVK for type %T: %w", obj, err)
   337  		}
   338  		var cache Cache
   339  		if len(config.Namespaces) > 0 {
   340  			cache = newMultiNamespaceCache(newCacheFunc, opts.Scheme, opts.Mapper, config.Namespaces, nil)
   341  		} else {
   342  			cache = newCacheFunc(byObjectToConfig(config), corev1.NamespaceAll)
   343  		}
   344  		delegating.caches[gvk] = cache
   345  	}
   346  
   347  	return delegating, nil
   348  }
   349  
   350  // TransformStripManagedFields strips the managed fields of an object before it is committed to the cache.
   351  // If you are not explicitly accessing managedFields from your code, setting this as `DefaultTransform`
   352  // on the cache can lead to a significant reduction in memory usage.
   353  func TransformStripManagedFields() toolscache.TransformFunc {
   354  	return func(in any) (any, error) {
   355  		// Nilcheck managed fields to avoid hitting https://github.com/kubernetes/kubernetes/issues/124337
   356  		if obj, err := meta.Accessor(in); err == nil && obj.GetManagedFields() != nil {
   357  			obj.SetManagedFields(nil)
   358  		}
   359  
   360  		return in, nil
   361  	}
   362  }
   363  
   364  func optionDefaultsToConfig(opts *Options) Config {
   365  	return Config{
   366  		LabelSelector:         opts.DefaultLabelSelector,
   367  		FieldSelector:         opts.DefaultFieldSelector,
   368  		Transform:             opts.DefaultTransform,
   369  		UnsafeDisableDeepCopy: opts.DefaultUnsafeDisableDeepCopy,
   370  	}
   371  }
   372  
   373  func byObjectToConfig(byObject ByObject) Config {
   374  	return Config{
   375  		LabelSelector:         byObject.Label,
   376  		FieldSelector:         byObject.Field,
   377  		Transform:             byObject.Transform,
   378  		UnsafeDisableDeepCopy: byObject.UnsafeDisableDeepCopy,
   379  	}
   380  }
   381  
   382  type newCacheFunc func(config Config, namespace string) Cache
   383  
   384  func newCache(restConfig *rest.Config, opts Options) newCacheFunc {
   385  	return func(config Config, namespace string) Cache {
   386  		return &informerCache{
   387  			scheme: opts.Scheme,
   388  			Informers: internal.NewInformers(restConfig, &internal.InformersOpts{
   389  				HTTPClient:   opts.HTTPClient,
   390  				Scheme:       opts.Scheme,
   391  				Mapper:       opts.Mapper,
   392  				ResyncPeriod: *opts.SyncPeriod,
   393  				Namespace:    namespace,
   394  				Selector: internal.Selector{
   395  					Label: config.LabelSelector,
   396  					Field: config.FieldSelector,
   397  				},
   398  				Transform:             config.Transform,
   399  				WatchErrorHandler:     opts.DefaultWatchErrorHandler,
   400  				UnsafeDisableDeepCopy: ptr.Deref(config.UnsafeDisableDeepCopy, false),
   401  				NewInformer:           opts.newInformer,
   402  			}),
   403  			readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer,
   404  		}
   405  	}
   406  }
   407  
   408  func defaultOpts(config *rest.Config, opts Options) (Options, error) {
   409  	config = rest.CopyConfig(config)
   410  	if config.UserAgent == "" {
   411  		config.UserAgent = rest.DefaultKubernetesUserAgent()
   412  	}
   413  
   414  	// Use the rest HTTP client for the provided config if unset
   415  	if opts.HTTPClient == nil {
   416  		var err error
   417  		opts.HTTPClient, err = rest.HTTPClientFor(config)
   418  		if err != nil {
   419  			return Options{}, fmt.Errorf("could not create HTTP client from config: %w", err)
   420  		}
   421  	}
   422  
   423  	// Use the default Kubernetes Scheme if unset
   424  	if opts.Scheme == nil {
   425  		opts.Scheme = scheme.Scheme
   426  	}
   427  
   428  	// Construct a new Mapper if unset
   429  	if opts.Mapper == nil {
   430  		var err error
   431  		opts.Mapper, err = apiutil.NewDynamicRESTMapper(config, opts.HTTPClient)
   432  		if err != nil {
   433  			return Options{}, fmt.Errorf("could not create RESTMapper from config: %w", err)
   434  		}
   435  	}
   436  
   437  	for obj, byObject := range opts.ByObject {
   438  		isNamespaced, err := apiutil.IsObjectNamespaced(obj, opts.Scheme, opts.Mapper)
   439  		if err != nil {
   440  			return opts, fmt.Errorf("failed to determine if %T is namespaced: %w", obj, err)
   441  		}
   442  		if !isNamespaced && byObject.Namespaces != nil {
   443  			return opts, fmt.Errorf("type %T is not namespaced, but its ByObject.Namespaces setting is not nil", obj)
   444  		}
   445  
   446  		if isNamespaced && byObject.Namespaces == nil {
   447  			byObject.Namespaces = maps.Clone(opts.DefaultNamespaces)
   448  		}
   449  
   450  		// Default the namespace-level configs first, because they need to use the undefaulted type-level config
   451  		// to be able to potentially fall through to settings from DefaultNamespaces.
   452  		for namespace, config := range byObject.Namespaces {
   453  			// 1. Default from the undefaulted type-level config
   454  			config = defaultConfig(config, byObjectToConfig(byObject))
   455  
   456  			// 2. Default from the namespace-level config. This was defaulted from the global default config earlier, but
   457  			//    might not have an entry for the current namespace.
   458  			if defaultNamespaceSettings, hasDefaultNamespace := opts.DefaultNamespaces[namespace]; hasDefaultNamespace {
   459  				config = defaultConfig(config, defaultNamespaceSettings)
   460  			}
   461  
   462  			// 3. Default from the global defaults
   463  			config = defaultConfig(config, optionDefaultsToConfig(&opts))
   464  
   465  			if namespace == metav1.NamespaceAll {
   466  				config.FieldSelector = fields.AndSelectors(
   467  					appendIfNotNil(
   468  						namespaceAllSelector(maps.Keys(byObject.Namespaces)),
   469  						config.FieldSelector,
   470  					)...,
   471  				)
   472  			}
   473  
   474  			byObject.Namespaces[namespace] = config
   475  		}
   476  
   477  		// Only default ByObject iself if it isn't namespaced or has no namespaces configured, as only
   478  		// then any of this will be honored.
   479  		if !isNamespaced || len(byObject.Namespaces) == 0 {
   480  			defaultedConfig := defaultConfig(byObjectToConfig(byObject), optionDefaultsToConfig(&opts))
   481  			byObject.Label = defaultedConfig.LabelSelector
   482  			byObject.Field = defaultedConfig.FieldSelector
   483  			byObject.Transform = defaultedConfig.Transform
   484  			byObject.UnsafeDisableDeepCopy = defaultedConfig.UnsafeDisableDeepCopy
   485  		}
   486  
   487  		opts.ByObject[obj] = byObject
   488  	}
   489  
   490  	// Default namespaces after byObject has been defaulted, otherwise a namespace without selectors
   491  	// will get the `Default` selectors, then get copied to byObject and then not get defaulted from
   492  	// byObject, as it already has selectors.
   493  	for namespace, cfg := range opts.DefaultNamespaces {
   494  		cfg = defaultConfig(cfg, optionDefaultsToConfig(&opts))
   495  		if namespace == metav1.NamespaceAll {
   496  			cfg.FieldSelector = fields.AndSelectors(
   497  				appendIfNotNil(
   498  					namespaceAllSelector(maps.Keys(opts.DefaultNamespaces)),
   499  					cfg.FieldSelector,
   500  				)...,
   501  			)
   502  		}
   503  		opts.DefaultNamespaces[namespace] = cfg
   504  	}
   505  
   506  	// Default the resync period to 10 hours if unset
   507  	if opts.SyncPeriod == nil {
   508  		opts.SyncPeriod = &defaultSyncPeriod
   509  	}
   510  	return opts, nil
   511  }
   512  
   513  func defaultConfig(toDefault, defaultFrom Config) Config {
   514  	if toDefault.LabelSelector == nil {
   515  		toDefault.LabelSelector = defaultFrom.LabelSelector
   516  	}
   517  	if toDefault.FieldSelector == nil {
   518  		toDefault.FieldSelector = defaultFrom.FieldSelector
   519  	}
   520  	if toDefault.Transform == nil {
   521  		toDefault.Transform = defaultFrom.Transform
   522  	}
   523  	if toDefault.UnsafeDisableDeepCopy == nil {
   524  		toDefault.UnsafeDisableDeepCopy = defaultFrom.UnsafeDisableDeepCopy
   525  	}
   526  
   527  	return toDefault
   528  }
   529  
   530  func namespaceAllSelector(namespaces []string) []fields.Selector {
   531  	selectors := make([]fields.Selector, 0, len(namespaces)-1)
   532  	sort.Strings(namespaces)
   533  	for _, namespace := range namespaces {
   534  		if namespace != metav1.NamespaceAll {
   535  			selectors = append(selectors, fields.OneTermNotEqualSelector("metadata.namespace", namespace))
   536  		}
   537  	}
   538  
   539  	return selectors
   540  }
   541  
   542  func appendIfNotNil[T comparable](a []T, b T) []T {
   543  	if b != *new(T) {
   544  		return append(a, b)
   545  	}
   546  	return a
   547  }
   548  

View as plain text