...

Source file src/sigs.k8s.io/controller-runtime/pkg/cache/internal/informers.go

Documentation: sigs.k8s.io/controller-runtime/pkg/cache/internal

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package internal
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"math/rand"
    24  	"net/http"
    25  	"sync"
    26  	"time"
    27  
    28  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    29  	"k8s.io/apimachinery/pkg/api/meta"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/runtime"
    32  	"k8s.io/apimachinery/pkg/runtime/schema"
    33  	"k8s.io/apimachinery/pkg/runtime/serializer"
    34  	"k8s.io/apimachinery/pkg/watch"
    35  	"k8s.io/client-go/dynamic"
    36  	"k8s.io/client-go/metadata"
    37  	"k8s.io/client-go/rest"
    38  	"k8s.io/client-go/tools/cache"
    39  	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
    40  	"sigs.k8s.io/controller-runtime/pkg/internal/syncs"
    41  )
    42  
// InformersOpts configures the Informers created by NewInformers.
//
// Fields:
//   - HTTPClient: HTTP client handed to the REST, dynamic and metadata clients
//     that back the informers.
//   - Scheme: scheme used to build the codec factory and parameter codec, and
//     to instantiate list objects for structured types.
//   - Mapper: REST mapper used to resolve a GroupVersionKind to its resource
//     and scope.
//   - ResyncPeriod: base resync period; each informer gets a jittered value
//     derived from it (see calculateResyncPeriod).
//   - Namespace: namespace all ListWatches for namespace-scoped resources are
//     restricted to; empty means no restriction from this option.
//   - NewInformer: optional pointer to an informer constructor; when nil,
//     cache.NewSharedIndexInformer is used.
//   - Selector: selector applied to the options of every list and watch call.
//   - Transform: transform function set on every created informer.
//   - UnsafeDisableDeepCopy: copied into each CacheReader's disableDeepCopy
//     setting.
//   - WatchErrorHandler: when non-nil, installed as the watch error handler of
//     every created informer.
type InformersOpts struct {
	HTTPClient            *http.Client
	Scheme                *runtime.Scheme
	Mapper                meta.RESTMapper
	ResyncPeriod          time.Duration
	Namespace             string
	NewInformer           *func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer
	Selector              Selector
	Transform             cache.TransformFunc
	UnsafeDisableDeepCopy bool
	WatchErrorHandler     cache.WatchErrorHandler
}
    56  
    57  // NewInformers creates a new InformersMap that can create informers under the hood.
    58  func NewInformers(config *rest.Config, options *InformersOpts) *Informers {
    59  	newInformer := cache.NewSharedIndexInformer
    60  	if options.NewInformer != nil {
    61  		newInformer = *options.NewInformer
    62  	}
    63  	return &Informers{
    64  		config:     config,
    65  		httpClient: options.HTTPClient,
    66  		scheme:     options.Scheme,
    67  		mapper:     options.Mapper,
    68  		tracker: tracker{
    69  			Structured:   make(map[schema.GroupVersionKind]*Cache),
    70  			Unstructured: make(map[schema.GroupVersionKind]*Cache),
    71  			Metadata:     make(map[schema.GroupVersionKind]*Cache),
    72  		},
    73  		codecs:                serializer.NewCodecFactory(options.Scheme),
    74  		paramCodec:            runtime.NewParameterCodec(options.Scheme),
    75  		resync:                options.ResyncPeriod,
    76  		startWait:             make(chan struct{}),
    77  		namespace:             options.Namespace,
    78  		selector:              options.Selector,
    79  		transform:             options.Transform,
    80  		unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy,
    81  		newInformer:           newInformer,
    82  		watchErrorHandler:     options.WatchErrorHandler,
    83  	}
    84  }
    85  
// Cache bundles a single informer with the reader that serves objects from it
// and the channel used to stop it.
type Cache struct {
	// Informer is the cached informer
	Informer cache.SharedIndexInformer

	// Reader wraps Informer and implements the CacheReader interface for a single type
	Reader CacheReader

	// stop is closed to stop this individual informer (see Informers.Remove).
	stop chan struct{}
}
    97  
    98  // Start starts the informer managed by a MapEntry.
    99  // Blocks until the informer stops. The informer can be stopped
   100  // either individually (via the entry's stop channel) or globally
   101  // via the provided stop argument.
   102  func (c *Cache) Start(stop <-chan struct{}) {
   103  	// Stop on either the whole map stopping or just this informer being removed.
   104  	internalStop, cancel := syncs.MergeChans(stop, c.stop)
   105  	defer cancel()
   106  	c.Informer.Run(internalStop)
   107  }
   108  
// tracker holds the informers known to an Informers instance, keyed by
// GroupVersionKind and split by the flavour of object they serve:
// Structured (typed objects), Unstructured (runtime.Unstructured) and
// Metadata (metav1.PartialObjectMetadata and its list type).
// See informersByType for how an object is routed to one of the maps.
type tracker struct {
	Structured   map[schema.GroupVersionKind]*Cache
	Unstructured map[schema.GroupVersionKind]*Cache
	Metadata     map[schema.GroupVersionKind]*Cache
}
   114  
// GetOptions provides configuration to customize the behavior when
// getting an informer.
type GetOptions struct {
	// BlockUntilSynced controls if the informer retrieval will block until the informer is synced.
	// A nil value is treated as `true`.
	BlockUntilSynced *bool
}
   121  
// Informers creates and caches informers for (runtime.Object, schema.GroupVersionKind) pairs.
// It uses a standard parameter codec constructed based on the given generated Scheme.
type Informers struct {
	// httpClient is used to create a new REST client
	httpClient *http.Client

	// scheme maps runtime.Objects to GroupVersionKinds
	scheme *runtime.Scheme

	// config is used to talk to the apiserver
	config *rest.Config

	// mapper maps GroupVersionKinds to Resources
	mapper meta.RESTMapper

	// tracker tracks informers keyed by their type and groupVersionKind
	tracker tracker

	// codecs is used to create a new REST client
	codecs serializer.CodecFactory

	// paramCodec is used by list and watch
	paramCodec runtime.ParameterCodec

	// resync is the base frequency the informers are resynced
	// a 10 percent jitter will be added to the resync period between informers
	// so that all informers will not send list requests simultaneously.
	resync time.Duration

	// mu guards access to the tracker maps as well as the started and
	// stopped flags.
	mu sync.RWMutex

	// started is true if the informers have been started
	started bool

	// startWait is a channel that is closed after the
	// informers have been started.
	startWait chan struct{}

	// waitGroup is the wait group that is used to wait for all informers to stop
	waitGroup sync.WaitGroup

	// stopped is true if the informers have been stopped
	stopped bool

	// ctx is the context to stop informers; it is set by Start and also used
	// for the list and watch requests issued by the informers.
	ctx context.Context

	// namespace is the namespace that all ListWatches are restricted to
	// default or empty string means all namespaces
	namespace string

	// selector is applied to the options of every list and watch call.
	// transform is set as the transform function of every created informer.
	// unsafeDisableDeepCopy is passed to every CacheReader as disableDeepCopy.
	selector              Selector
	transform             cache.TransformFunc
	unsafeDisableDeepCopy bool

	// newInformer allows overriding of the shared index informer constructor for testing.
	newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer

	// watchErrorHandler allows the shared index informer's
	// watchErrorHandler to be set by overriding the options
	// or to use the default watchErrorHandler
	watchErrorHandler cache.WatchErrorHandler
}
   186  
   187  // Start calls Run on each of the informers and sets started to true. Blocks on the context.
   188  // It doesn't return start because it can't return an error, and it's not a runnable directly.
   189  func (ip *Informers) Start(ctx context.Context) error {
   190  	if err := func() error {
   191  		ip.mu.Lock()
   192  		defer ip.mu.Unlock()
   193  
   194  		if ip.started {
   195  			return errors.New("Informer already started") //nolint:stylecheck
   196  		}
   197  
   198  		// Set the context so it can be passed to informers that are added later
   199  		ip.ctx = ctx
   200  
   201  		// Start each informer
   202  		for _, i := range ip.tracker.Structured {
   203  			ip.startInformerLocked(i)
   204  		}
   205  		for _, i := range ip.tracker.Unstructured {
   206  			ip.startInformerLocked(i)
   207  		}
   208  		for _, i := range ip.tracker.Metadata {
   209  			ip.startInformerLocked(i)
   210  		}
   211  
   212  		// Set started to true so we immediately start any informers added later.
   213  		ip.started = true
   214  		close(ip.startWait)
   215  
   216  		return nil
   217  	}(); err != nil {
   218  		return err
   219  	}
   220  	<-ctx.Done() // Block until the context is done
   221  	ip.mu.Lock()
   222  	ip.stopped = true // Set stopped to true so we don't start any new informers
   223  	ip.mu.Unlock()
   224  	ip.waitGroup.Wait() // Block until all informers have stopped
   225  	return nil
   226  }
   227  
   228  func (ip *Informers) startInformerLocked(cacheEntry *Cache) {
   229  	// Don't start the informer in case we are already waiting for the items in
   230  	// the waitGroup to finish, since waitGroups don't support waiting and adding
   231  	// at the same time.
   232  	if ip.stopped {
   233  		return
   234  	}
   235  
   236  	ip.waitGroup.Add(1)
   237  	go func() {
   238  		defer ip.waitGroup.Done()
   239  		cacheEntry.Start(ip.ctx.Done())
   240  	}()
   241  }
   242  
   243  func (ip *Informers) waitForStarted(ctx context.Context) bool {
   244  	select {
   245  	case <-ip.startWait:
   246  		return true
   247  	case <-ctx.Done():
   248  		return false
   249  	}
   250  }
   251  
   252  // getHasSyncedFuncs returns all the HasSynced functions for the informers in this map.
   253  func (ip *Informers) getHasSyncedFuncs() []cache.InformerSynced {
   254  	ip.mu.RLock()
   255  	defer ip.mu.RUnlock()
   256  
   257  	res := make([]cache.InformerSynced, 0,
   258  		len(ip.tracker.Structured)+len(ip.tracker.Unstructured)+len(ip.tracker.Metadata),
   259  	)
   260  	for _, i := range ip.tracker.Structured {
   261  		res = append(res, i.Informer.HasSynced)
   262  	}
   263  	for _, i := range ip.tracker.Unstructured {
   264  		res = append(res, i.Informer.HasSynced)
   265  	}
   266  	for _, i := range ip.tracker.Metadata {
   267  		res = append(res, i.Informer.HasSynced)
   268  	}
   269  	return res
   270  }
   271  
   272  // WaitForCacheSync waits until all the caches have been started and synced.
   273  func (ip *Informers) WaitForCacheSync(ctx context.Context) bool {
   274  	if !ip.waitForStarted(ctx) {
   275  		return false
   276  	}
   277  	return cache.WaitForCacheSync(ctx.Done(), ip.getHasSyncedFuncs()...)
   278  }
   279  
   280  // Peek attempts to get the informer for the GVK, but does not start one if one does not exist.
   281  func (ip *Informers) Peek(gvk schema.GroupVersionKind, obj runtime.Object) (res *Cache, started bool, ok bool) {
   282  	ip.mu.RLock()
   283  	defer ip.mu.RUnlock()
   284  	i, ok := ip.informersByType(obj)[gvk]
   285  	return i, ip.started, ok
   286  }
   287  
   288  // Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
   289  // the Informer from the map.
   290  func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, opts *GetOptions) (bool, *Cache, error) {
   291  	// Return the informer if it is found
   292  	i, started, ok := ip.Peek(gvk, obj)
   293  	if !ok {
   294  		var err error
   295  		if i, started, err = ip.addInformerToMap(gvk, obj); err != nil {
   296  			return started, nil, err
   297  		}
   298  	}
   299  
   300  	shouldBlock := true
   301  	if opts.BlockUntilSynced != nil {
   302  		shouldBlock = *opts.BlockUntilSynced
   303  	}
   304  
   305  	if shouldBlock && started && !i.Informer.HasSynced() {
   306  		// Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
   307  		if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) {
   308  			return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0)
   309  		}
   310  	}
   311  
   312  	return started, i, nil
   313  }
   314  
   315  // Remove removes an informer entry and stops it if it was running.
   316  func (ip *Informers) Remove(gvk schema.GroupVersionKind, obj runtime.Object) {
   317  	ip.mu.Lock()
   318  	defer ip.mu.Unlock()
   319  
   320  	informerMap := ip.informersByType(obj)
   321  
   322  	entry, ok := informerMap[gvk]
   323  	if !ok {
   324  		return
   325  	}
   326  	close(entry.stop)
   327  	delete(informerMap, gvk)
   328  }
   329  
   330  func (ip *Informers) informersByType(obj runtime.Object) map[schema.GroupVersionKind]*Cache {
   331  	switch obj.(type) {
   332  	case runtime.Unstructured:
   333  		return ip.tracker.Unstructured
   334  	case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList:
   335  		return ip.tracker.Metadata
   336  	default:
   337  		return ip.tracker.Structured
   338  	}
   339  }
   340  
// addInformerToMap either returns an existing informer or creates a new informer, adds it to the map and returns it.
//
// It takes the write lock for its whole duration. The returned bool reports
// whether the informers have been started; on error it is always false and
// the returned entry is nil. A newly created informer is started right away
// if the informers are already running.
func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.Object) (*Cache, bool, error) {
	ip.mu.Lock()
	defer ip.mu.Unlock()

	// Check the cache to see if we already have an Informer. If we do, return the Informer.
	// This is for the case where 2 routines tried to get the informer when it wasn't in the map
	// so neither returned early, but the first one created it.
	if i, ok := ip.informersByType(obj)[gvk]; ok {
		return i, ip.started, nil
	}

	// Create a NewSharedIndexInformer and add it to the map.
	listWatcher, err := ip.makeListWatcher(gvk, obj)
	if err != nil {
		return nil, false, err
	}
	// Wrap the list/watch functions so the configured selector is applied to
	// the options of every call, and index the informer by namespace.
	sharedIndexInformer := ip.newInformer(&cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			ip.selector.ApplyToList(&opts)
			return listWatcher.ListFunc(opts)
		},
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			ip.selector.ApplyToList(&opts)
			opts.Watch = true // Watch needs to be set to true separately
			return listWatcher.WatchFunc(opts)
		},
	}, obj, calculateResyncPeriod(ip.resync), cache.Indexers{
		cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
	})

	// Set WatchErrorHandler on SharedIndexInformer if set
	if ip.watchErrorHandler != nil {
		if err := sharedIndexInformer.SetWatchErrorHandler(ip.watchErrorHandler); err != nil {
			return nil, false, err
		}
	}

	// Install the configured transform on the informer. This is done
	// unconditionally, i.e. also when ip.transform is nil.
	if err := sharedIndexInformer.SetTransform(ip.transform); err != nil {
		return nil, false, err
	}

	// Resolve the REST mapping to learn the scope of the resource, which the
	// reader needs.
	mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		return nil, false, err
	}

	// Create the new entry and set it in the map.
	i := &Cache{
		Informer: sharedIndexInformer,
		Reader: CacheReader{
			indexer:          sharedIndexInformer.GetIndexer(),
			groupVersionKind: gvk,
			scopeName:        mapping.Scope.Name(),
			disableDeepCopy:  ip.unsafeDisableDeepCopy,
		},
		stop: make(chan struct{}),
	}
	ip.informersByType(obj)[gvk] = i

	// Start the informer in case the InformersMap has started, otherwise it will be
	// started when the InformersMap starts.
	if ip.started {
		ip.startInformerLocked(i)
	}
	return i, ip.started, nil
}
   409  
// makeListWatcher builds the cache.ListWatch used to list and watch the
// resource that gvk maps to. The client behind it depends on the concrete
// type of obj: a dynamic client for runtime.Unstructured, a metadata client
// for *metav1.PartialObjectMetadata and *metav1.PartialObjectMetadataList,
// and a REST client obtained from apiutil.RESTClientForGVK for everything else.
//
// For namespace-scoped resources the requests are restricted to the namespace
// returned by restrictNamespaceBySelector (if any); for other scopes no
// namespace restriction is applied.
//
// The returned functions read ip.ctx at the time they are invoked.
//
// An error is returned if the REST mapping cannot be resolved, the client
// cannot be created, or (structured case) the scheme cannot instantiate the
// "<Kind>List" type.
func (ip *Informers) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Object) (*cache.ListWatch, error) {
	// Kubernetes APIs work against Resources, not GroupVersionKinds.  Map the
	// groupVersionKind to the Resource API we will use.
	mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		return nil, err
	}

	// Figure out if the GVK we're dealing with is global, or namespace scoped.
	// namespace stays empty unless the resource is namespace scoped.
	var namespace string
	if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
		namespace = restrictNamespaceBySelector(ip.namespace, ip.selector)
	}

	switch obj.(type) {
	//
	// Unstructured
	//
	case runtime.Unstructured:
		// If the rest configuration has a negotiated serializer passed in,
		// we should remove it and use the one that the dynamic client sets for us.
		cfg := rest.CopyConfig(ip.config)
		cfg.NegotiatedSerializer = nil
		dynamicClient, err := dynamic.NewForConfigAndClient(cfg, ip.httpClient)
		if err != nil {
			return nil, err
		}
		resources := dynamicClient.Resource(mapping.Resource)
		return &cache.ListWatch{
			// Setup the list function
			ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
				if namespace != "" {
					return resources.Namespace(namespace).List(ip.ctx, opts)
				}
				return resources.List(ip.ctx, opts)
			},
			// Setup the watch function
			WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
				if namespace != "" {
					return resources.Namespace(namespace).Watch(ip.ctx, opts)
				}
				return resources.Watch(ip.ctx, opts)
			},
		}, nil
	//
	// Metadata
	//
	case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList:
		// Always clear the negotiated serializer and use the one
		// set from the metadata client.
		cfg := rest.CopyConfig(ip.config)
		cfg.NegotiatedSerializer = nil

		// Grab the metadata client.
		metadataClient, err := metadata.NewForConfigAndClient(cfg, ip.httpClient)
		if err != nil {
			return nil, err
		}
		resources := metadataClient.Resource(mapping.Resource)

		return &cache.ListWatch{
			// Setup the list function
			ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
				var (
					list *metav1.PartialObjectMetadataList
					err  error
				)
				if namespace != "" {
					list, err = resources.Namespace(namespace).List(ip.ctx, opts)
				} else {
					list, err = resources.List(ip.ctx, opts)
				}
				// Set the GVK on every returned item, mirroring what
				// newGVKFixupWatcher does for watch events.
				if list != nil {
					for i := range list.Items {
						list.Items[i].SetGroupVersionKind(gvk)
					}
				}
				return list, err
			},
			// Setup the watch function
			WatchFunc: func(opts metav1.ListOptions) (watcher watch.Interface, err error) {
				if namespace != "" {
					watcher, err = resources.Namespace(namespace).Watch(ip.ctx, opts)
				} else {
					watcher, err = resources.Watch(ip.ctx, opts)
				}
				if err != nil {
					return nil, err
				}
				// Wrap the watcher so every event's object carries gvk.
				return newGVKFixupWatcher(gvk, watcher), nil
			},
		}, nil
	//
	// Structured.
	//
	default:
		client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs, ip.httpClient)
		if err != nil {
			return nil, err
		}
		// The list type is looked up in the scheme as "<Kind>List" in the
		// same group/version; listObj serves as a prototype that is copied
		// for every list call.
		listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
		listObj, err := ip.scheme.New(listGVK)
		if err != nil {
			return nil, err
		}
		return &cache.ListWatch{
			// Setup the list function
			ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
				// Build the request.
				req := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec)
				if namespace != "" {
					// NOTE(review): the return value is discarded, so this relies on
					// Namespace modifying req in place — confirm against rest.Request.
					req.Namespace(namespace)
				}

				// Create the resulting object, and execute the request.
				res := listObj.DeepCopyObject()
				if err := req.Do(ip.ctx).Into(res); err != nil {
					return nil, err
				}
				return res, nil
			},
			// Setup the watch function
			WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
				// Build the request.
				req := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec)
				if namespace != "" {
					// NOTE(review): same in-place assumption as in ListFunc above.
					req.Namespace(namespace)
				}
				// Call the watch.
				return req.Watch(ip.ctx)
			},
		}, nil
	}
}
   541  
   542  // newGVKFixupWatcher adds a wrapper that preserves the GVK information when
   543  // events come in.
   544  //
   545  // This works around a bug where GVK information is not passed into mapping
   546  // functions when using the OnlyMetadata option in the builder.
   547  // This issue is most likely caused by kubernetes/kubernetes#80609.
   548  // See kubernetes-sigs/controller-runtime#1484.
   549  //
   550  // This was originally implemented as a cache.ResourceEventHandler wrapper but
   551  // that contained a data race which was resolved by setting the GVK in a watch
   552  // wrapper, before the objects are written to the cache.
   553  // See kubernetes-sigs/controller-runtime#1650.
   554  //
   555  // The original watch wrapper was found to be incompatible with
   556  // k8s.io/client-go/tools/cache.Reflector so it has been re-implemented as a
   557  // watch.Filter which is compatible.
   558  // See kubernetes-sigs/controller-runtime#1789.
   559  func newGVKFixupWatcher(gvk schema.GroupVersionKind, watcher watch.Interface) watch.Interface {
   560  	return watch.Filter(
   561  		watcher,
   562  		func(in watch.Event) (watch.Event, bool) {
   563  			in.Object.GetObjectKind().SetGroupVersionKind(gvk)
   564  			return in, true
   565  		},
   566  	)
   567  }
   568  
   569  // calculateResyncPeriod returns a duration based on the desired input
   570  // this is so that multiple controllers don't get into lock-step and all
   571  // hammer the apiserver with list requests simultaneously.
   572  func calculateResyncPeriod(resync time.Duration) time.Duration {
   573  	// the factor will fall into [0.9, 1.1)
   574  	factor := rand.Float64()/5.0 + 0.9 //nolint:gosec
   575  	return time.Duration(float64(resync.Nanoseconds()) * factor)
   576  }
   577  
   578  // restrictNamespaceBySelector returns either a global restriction for all ListWatches
   579  // if not default/empty, or the namespace that a ListWatch for the specific resource
   580  // is restricted to, based on a specified field selector for metadata.namespace field.
   581  func restrictNamespaceBySelector(namespaceOpt string, s Selector) string {
   582  	if namespaceOpt != "" {
   583  		// namespace is already restricted
   584  		return namespaceOpt
   585  	}
   586  	fieldSelector := s.Field
   587  	if fieldSelector == nil || fieldSelector.Empty() {
   588  		return ""
   589  	}
   590  	// check whether a selector includes the namespace field
   591  	value, found := fieldSelector.RequiresExactMatch("metadata.namespace")
   592  	if found {
   593  		return value
   594  	}
   595  	return ""
   596  }
   597  

View as plain text