...

Source file src/sigs.k8s.io/controller-runtime/pkg/cache/multi_namespace_cache.go

Documentation: sigs.k8s.io/controller-runtime/pkg/cache

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	corev1 "k8s.io/api/core/v1"
    25  	apimeta "k8s.io/apimachinery/pkg/api/meta"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/apimachinery/pkg/runtime"
    28  	"k8s.io/apimachinery/pkg/runtime/schema"
    29  	toolscache "k8s.io/client-go/tools/cache"
    30  
    31  	"sigs.k8s.io/controller-runtime/pkg/client"
    32  	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
    33  )
    34  
    35  // a new global namespaced cache to handle cluster scoped resources.
    36  const globalCache = "_cluster-scope"
    37  
    38  func newMultiNamespaceCache(
    39  	newCache newCacheFunc,
    40  	scheme *runtime.Scheme,
    41  	restMapper apimeta.RESTMapper,
    42  	namespaces map[string]Config,
    43  	globalConfig *Config, // may be nil in which case no cache for cluster-scoped objects will be created
    44  ) Cache {
    45  	// Create every namespace cache.
    46  	caches := map[string]Cache{}
    47  	for namespace, config := range namespaces {
    48  		caches[namespace] = newCache(config, namespace)
    49  	}
    50  
    51  	// Create a cache for cluster scoped resources if requested
    52  	var clusterCache Cache
    53  	if globalConfig != nil {
    54  		clusterCache = newCache(*globalConfig, corev1.NamespaceAll)
    55  	}
    56  
    57  	return &multiNamespaceCache{
    58  		namespaceToCache: caches,
    59  		Scheme:           scheme,
    60  		RESTMapper:       restMapper,
    61  		clusterCache:     clusterCache,
    62  	}
    63  }
    64  
    65  // multiNamespaceCache knows how to handle multiple namespaced caches
    66  // Use this feature when scoping permissions for your
    67  // operator to a list of namespaces instead of watching every namespace
    68  // in the cluster.
    69  type multiNamespaceCache struct {
    70  	Scheme           *runtime.Scheme
    71  	RESTMapper       apimeta.RESTMapper
    72  	namespaceToCache map[string]Cache
    73  	clusterCache     Cache
    74  }
    75  
    76  var _ Cache = &multiNamespaceCache{}
    77  
    78  // Methods for multiNamespaceCache to conform to the Informers interface.
    79  
    80  func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
    81  	// If the object is cluster scoped, get the informer from clusterCache,
    82  	// if not use the namespaced caches.
    83  	isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper)
    84  	if err != nil {
    85  		return nil, err
    86  	}
    87  	if !isNamespaced {
    88  		clusterCacheInformer, err := c.clusterCache.GetInformer(ctx, obj, opts...)
    89  		if err != nil {
    90  			return nil, err
    91  		}
    92  
    93  		return &multiNamespaceInformer{
    94  			namespaceToInformer: map[string]Informer{
    95  				globalCache: clusterCacheInformer,
    96  			},
    97  		}, nil
    98  	}
    99  
   100  	namespaceToInformer := map[string]Informer{}
   101  	for ns, cache := range c.namespaceToCache {
   102  		informer, err := cache.GetInformer(ctx, obj, opts...)
   103  		if err != nil {
   104  			return nil, err
   105  		}
   106  		namespaceToInformer[ns] = informer
   107  	}
   108  
   109  	return &multiNamespaceInformer{namespaceToInformer: namespaceToInformer}, nil
   110  }
   111  
   112  func (c *multiNamespaceCache) RemoveInformer(ctx context.Context, obj client.Object) error {
   113  	// If the object is clusterscoped, get the informer from clusterCache,
   114  	// if not use the namespaced caches.
   115  	isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper)
   116  	if err != nil {
   117  		return err
   118  	}
   119  	if !isNamespaced {
   120  		return c.clusterCache.RemoveInformer(ctx, obj)
   121  	}
   122  
   123  	for _, cache := range c.namespaceToCache {
   124  		err := cache.RemoveInformer(ctx, obj)
   125  		if err != nil {
   126  			return err
   127  		}
   128  	}
   129  
   130  	return nil
   131  }
   132  
   133  func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
   134  	// If the object is cluster scoped, get the informer from clusterCache,
   135  	// if not use the namespaced caches.
   136  	isNamespaced, err := apiutil.IsGVKNamespaced(gvk, c.RESTMapper)
   137  	if err != nil {
   138  		return nil, err
   139  	}
   140  	if !isNamespaced {
   141  		clusterCacheInformer, err := c.clusterCache.GetInformerForKind(ctx, gvk, opts...)
   142  		if err != nil {
   143  			return nil, err
   144  		}
   145  
   146  		return &multiNamespaceInformer{
   147  			namespaceToInformer: map[string]Informer{
   148  				globalCache: clusterCacheInformer,
   149  			},
   150  		}, nil
   151  	}
   152  
   153  	namespaceToInformer := map[string]Informer{}
   154  	for ns, cache := range c.namespaceToCache {
   155  		informer, err := cache.GetInformerForKind(ctx, gvk, opts...)
   156  		if err != nil {
   157  			return nil, err
   158  		}
   159  		namespaceToInformer[ns] = informer
   160  	}
   161  
   162  	return &multiNamespaceInformer{namespaceToInformer: namespaceToInformer}, nil
   163  }
   164  
   165  func (c *multiNamespaceCache) Start(ctx context.Context) error {
   166  	errs := make(chan error)
   167  	// start global cache
   168  	if c.clusterCache != nil {
   169  		go func() {
   170  			err := c.clusterCache.Start(ctx)
   171  			if err != nil {
   172  				errs <- fmt.Errorf("failed to start cluster-scoped cache: %w", err)
   173  			}
   174  		}()
   175  	}
   176  
   177  	// start namespaced caches
   178  	for ns, cache := range c.namespaceToCache {
   179  		go func(ns string, cache Cache) {
   180  			if err := cache.Start(ctx); err != nil {
   181  				errs <- fmt.Errorf("failed to start cache for namespace %s: %w", ns, err)
   182  			}
   183  		}(ns, cache)
   184  	}
   185  	select {
   186  	case <-ctx.Done():
   187  		return nil
   188  	case err := <-errs:
   189  		return err
   190  	}
   191  }
   192  
   193  func (c *multiNamespaceCache) WaitForCacheSync(ctx context.Context) bool {
   194  	synced := true
   195  	for _, cache := range c.namespaceToCache {
   196  		if !cache.WaitForCacheSync(ctx) {
   197  			synced = false
   198  		}
   199  	}
   200  
   201  	// check if cluster scoped cache has synced
   202  	if c.clusterCache != nil && !c.clusterCache.WaitForCacheSync(ctx) {
   203  		synced = false
   204  	}
   205  	return synced
   206  }
   207  
   208  func (c *multiNamespaceCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
   209  	isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper)
   210  	if err != nil {
   211  		return err
   212  	}
   213  
   214  	if !isNamespaced {
   215  		return c.clusterCache.IndexField(ctx, obj, field, extractValue)
   216  	}
   217  
   218  	for _, cache := range c.namespaceToCache {
   219  		if err := cache.IndexField(ctx, obj, field, extractValue); err != nil {
   220  			return err
   221  		}
   222  	}
   223  	return nil
   224  }
   225  
   226  func (c *multiNamespaceCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
   227  	isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper)
   228  	if err != nil {
   229  		return err
   230  	}
   231  
   232  	if !isNamespaced {
   233  		// Look into the global cache to fetch the object
   234  		return c.clusterCache.Get(ctx, key, obj)
   235  	}
   236  
   237  	cache, ok := c.namespaceToCache[key.Namespace]
   238  	if !ok {
   239  		if global, hasGlobal := c.namespaceToCache[metav1.NamespaceAll]; hasGlobal {
   240  			return global.Get(ctx, key, obj, opts...)
   241  		}
   242  		return fmt.Errorf("unable to get: %v because of unknown namespace for the cache", key)
   243  	}
   244  	return cache.Get(ctx, key, obj, opts...)
   245  }
   246  
   247  // List multi namespace cache will get all the objects in the namespaces that the cache is watching if asked for all namespaces.
   248  func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
   249  	listOpts := client.ListOptions{}
   250  	listOpts.ApplyOptions(opts)
   251  
   252  	isNamespaced, err := apiutil.IsObjectNamespaced(list, c.Scheme, c.RESTMapper)
   253  	if err != nil {
   254  		return err
   255  	}
   256  
   257  	if !isNamespaced {
   258  		// Look at the global cache to get the objects with the specified GVK
   259  		return c.clusterCache.List(ctx, list, opts...)
   260  	}
   261  
   262  	if listOpts.Namespace != corev1.NamespaceAll {
   263  		cache, ok := c.namespaceToCache[listOpts.Namespace]
   264  		if !ok {
   265  			return fmt.Errorf("unable to list: %v because of unknown namespace for the cache", listOpts.Namespace)
   266  		}
   267  		return cache.List(ctx, list, opts...)
   268  	}
   269  
   270  	listAccessor, err := apimeta.ListAccessor(list)
   271  	if err != nil {
   272  		return err
   273  	}
   274  
   275  	allItems, err := apimeta.ExtractList(list)
   276  	if err != nil {
   277  		return err
   278  	}
   279  
   280  	limitSet := listOpts.Limit > 0
   281  
   282  	var resourceVersion string
   283  	for _, cache := range c.namespaceToCache {
   284  		listObj := list.DeepCopyObject().(client.ObjectList)
   285  		err = cache.List(ctx, listObj, &listOpts)
   286  		if err != nil {
   287  			return err
   288  		}
   289  		items, err := apimeta.ExtractList(listObj)
   290  		if err != nil {
   291  			return err
   292  		}
   293  		accessor, err := apimeta.ListAccessor(listObj)
   294  		if err != nil {
   295  			return fmt.Errorf("object: %T must be a list type", list)
   296  		}
   297  		allItems = append(allItems, items...)
   298  
   299  		// The last list call should have the most correct resource version.
   300  		resourceVersion = accessor.GetResourceVersion()
   301  		if limitSet {
   302  			// decrement Limit by the number of items
   303  			// fetched from the current namespace.
   304  			listOpts.Limit -= int64(len(items))
   305  
   306  			// if a Limit was set and the number of
   307  			// items read has reached this set limit,
   308  			// then stop reading.
   309  			if listOpts.Limit == 0 {
   310  				break
   311  			}
   312  		}
   313  	}
   314  	listAccessor.SetResourceVersion(resourceVersion)
   315  
   316  	return apimeta.SetList(list, allItems)
   317  }
   318  
   319  // multiNamespaceInformer knows how to handle interacting with the underlying informer across multiple namespaces.
   320  type multiNamespaceInformer struct {
   321  	namespaceToInformer map[string]Informer
   322  }
   323  
   324  type handlerRegistration struct {
   325  	handles map[string]toolscache.ResourceEventHandlerRegistration
   326  }
   327  
   328  type syncer interface {
   329  	HasSynced() bool
   330  }
   331  
   332  // HasSynced asserts that the handler has been called for the full initial state of the informer.
   333  // This uses syncer to be compatible between client-go 1.27+ and older versions when the interface changed.
   334  func (h handlerRegistration) HasSynced() bool {
   335  	for _, reg := range h.handles {
   336  		if s, ok := reg.(syncer); ok {
   337  			if !s.HasSynced() {
   338  				return false
   339  			}
   340  		}
   341  	}
   342  	return true
   343  }
   344  
   345  var _ Informer = &multiNamespaceInformer{}
   346  
   347  // AddEventHandler adds the handler to each informer.
   348  func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) (toolscache.ResourceEventHandlerRegistration, error) {
   349  	handles := handlerRegistration{
   350  		handles: make(map[string]toolscache.ResourceEventHandlerRegistration, len(i.namespaceToInformer)),
   351  	}
   352  
   353  	for ns, informer := range i.namespaceToInformer {
   354  		registration, err := informer.AddEventHandler(handler)
   355  		if err != nil {
   356  			return nil, err
   357  		}
   358  		handles.handles[ns] = registration
   359  	}
   360  
   361  	return handles, nil
   362  }
   363  
   364  // AddEventHandlerWithResyncPeriod adds the handler with a resync period to each namespaced informer.
   365  func (i *multiNamespaceInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error) {
   366  	handles := handlerRegistration{
   367  		handles: make(map[string]toolscache.ResourceEventHandlerRegistration, len(i.namespaceToInformer)),
   368  	}
   369  
   370  	for ns, informer := range i.namespaceToInformer {
   371  		registration, err := informer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
   372  		if err != nil {
   373  			return nil, err
   374  		}
   375  		handles.handles[ns] = registration
   376  	}
   377  
   378  	return handles, nil
   379  }
   380  
   381  // RemoveEventHandler removes a previously added event handler given by its registration handle.
   382  func (i *multiNamespaceInformer) RemoveEventHandler(h toolscache.ResourceEventHandlerRegistration) error {
   383  	handles, ok := h.(handlerRegistration)
   384  	if !ok {
   385  		return fmt.Errorf("registration is not a registration returned by multiNamespaceInformer")
   386  	}
   387  	for ns, informer := range i.namespaceToInformer {
   388  		registration, ok := handles.handles[ns]
   389  		if !ok {
   390  			continue
   391  		}
   392  		if err := informer.RemoveEventHandler(registration); err != nil {
   393  			return err
   394  		}
   395  	}
   396  	return nil
   397  }
   398  
   399  // AddIndexers adds the indexers to each informer.
   400  func (i *multiNamespaceInformer) AddIndexers(indexers toolscache.Indexers) error {
   401  	for _, informer := range i.namespaceToInformer {
   402  		err := informer.AddIndexers(indexers)
   403  		if err != nil {
   404  			return err
   405  		}
   406  	}
   407  	return nil
   408  }
   409  
   410  // HasSynced checks if each informer has synced.
   411  func (i *multiNamespaceInformer) HasSynced() bool {
   412  	for _, informer := range i.namespaceToInformer {
   413  		if !informer.HasSynced() {
   414  			return false
   415  		}
   416  	}
   417  	return true
   418  }
   419  
   420  // IsStopped checks if each namespaced informer has stopped, returns false if any are still running.
   421  func (i *multiNamespaceInformer) IsStopped() bool {
   422  	for _, informer := range i.namespaceToInformer {
   423  		if stopped := informer.IsStopped(); !stopped {
   424  			return false
   425  		}
   426  	}
   427  	return true
   428  }
   429  

View as plain text