...

Source file src/sigs.k8s.io/controller-runtime/pkg/cache/informer_cache.go

Documentation: sigs.k8s.io/controller-runtime/pkg/cache

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"strings"
    23  
    24  	apimeta "k8s.io/apimachinery/pkg/api/meta"
    25  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    26  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    27  	"k8s.io/apimachinery/pkg/runtime"
    28  	"k8s.io/apimachinery/pkg/runtime/schema"
    29  	"k8s.io/client-go/tools/cache"
    30  
    31  	"sigs.k8s.io/controller-runtime/pkg/cache/internal"
    32  	"sigs.k8s.io/controller-runtime/pkg/client"
    33  	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
    34  )
    35  
    36  var (
    37  	_ Informers     = &informerCache{}
    38  	_ client.Reader = &informerCache{}
    39  	_ Cache         = &informerCache{}
    40  )
    41  
    42  // ErrCacheNotStarted is returned when trying to read from the cache that wasn't started.
    43  type ErrCacheNotStarted struct{}
    44  
    45  func (*ErrCacheNotStarted) Error() string {
    46  	return "the cache is not started, can not read objects"
    47  }
    48  
    49  var _ error = (*ErrCacheNotStarted)(nil)
    50  
    51  // ErrResourceNotCached indicates that the resource type
    52  // the client asked the cache for is not cached, i.e. the
    53  // corresponding informer does not exist yet.
    54  type ErrResourceNotCached struct {
    55  	GVK schema.GroupVersionKind
    56  }
    57  
    58  // Error returns the error
    59  func (r ErrResourceNotCached) Error() string {
    60  	return fmt.Sprintf("%s is not cached", r.GVK.String())
    61  }
    62  
    63  var _ error = (*ErrResourceNotCached)(nil)
    64  
// informerCache is a Kubernetes Object cache populated from internal.Informers.
// informerCache wraps internal.Informers.
type informerCache struct {
	// scheme is used to resolve the GroupVersionKind of objects passed to the
	// cache and to create new typed objects for a given GroupVersionKind.
	scheme *runtime.Scheme
	// Informers is the embedded set of informers that backs this cache.
	*internal.Informers
	// readerFailOnMissingInformer, when true, makes reads return
	// ErrResourceNotCached if no informer exists for the requested kind,
	// instead of requesting one through Informers.Get.
	readerFailOnMissingInformer bool
}
    72  
    73  // Get implements Reader.
    74  func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out client.Object, opts ...client.GetOption) error {
    75  	gvk, err := apiutil.GVKForObject(out, ic.scheme)
    76  	if err != nil {
    77  		return err
    78  	}
    79  
    80  	started, cache, err := ic.getInformerForKind(ctx, gvk, out)
    81  	if err != nil {
    82  		return err
    83  	}
    84  
    85  	if !started {
    86  		return &ErrCacheNotStarted{}
    87  	}
    88  	return cache.Reader.Get(ctx, key, out, opts...)
    89  }
    90  
    91  // List implements Reader.
    92  func (ic *informerCache) List(ctx context.Context, out client.ObjectList, opts ...client.ListOption) error {
    93  	gvk, cacheTypeObj, err := ic.objectTypeForListObject(out)
    94  	if err != nil {
    95  		return err
    96  	}
    97  
    98  	started, cache, err := ic.getInformerForKind(ctx, *gvk, cacheTypeObj)
    99  	if err != nil {
   100  		return err
   101  	}
   102  
   103  	if !started {
   104  		return &ErrCacheNotStarted{}
   105  	}
   106  
   107  	return cache.Reader.List(ctx, out, opts...)
   108  }
   109  
   110  // objectTypeForListObject tries to find the runtime.Object and associated GVK
   111  // for a single object corresponding to the passed-in list type. We need them
   112  // because they are used as cache map key.
   113  func (ic *informerCache) objectTypeForListObject(list client.ObjectList) (*schema.GroupVersionKind, runtime.Object, error) {
   114  	gvk, err := apiutil.GVKForObject(list, ic.scheme)
   115  	if err != nil {
   116  		return nil, nil, err
   117  	}
   118  
   119  	// We need the non-list GVK, so chop off the "List" from the end of the kind.
   120  	gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
   121  
   122  	// Handle unstructured.UnstructuredList.
   123  	if _, isUnstructured := list.(runtime.Unstructured); isUnstructured {
   124  		u := &unstructured.Unstructured{}
   125  		u.SetGroupVersionKind(gvk)
   126  		return &gvk, u, nil
   127  	}
   128  	// Handle metav1.PartialObjectMetadataList.
   129  	if _, isPartialObjectMetadata := list.(*metav1.PartialObjectMetadataList); isPartialObjectMetadata {
   130  		pom := &metav1.PartialObjectMetadata{}
   131  		pom.SetGroupVersionKind(gvk)
   132  		return &gvk, pom, nil
   133  	}
   134  
   135  	// Any other list type should have a corresponding non-list type registered
   136  	// in the scheme. Use that to create a new instance of the non-list type.
   137  	cacheTypeObj, err := ic.scheme.New(gvk)
   138  	if err != nil {
   139  		return nil, nil, err
   140  	}
   141  	return &gvk, cacheTypeObj, nil
   142  }
   143  
   144  func applyGetOptions(opts ...InformerGetOption) *internal.GetOptions {
   145  	cfg := &InformerGetOptions{}
   146  	for _, opt := range opts {
   147  		opt(cfg)
   148  	}
   149  	return (*internal.GetOptions)(cfg)
   150  }
   151  
   152  // GetInformerForKind returns the informer for the GroupVersionKind. If no informer exists, one will be started.
   153  func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
   154  	// Map the gvk to an object
   155  	obj, err := ic.scheme.New(gvk)
   156  	if err != nil {
   157  		return nil, err
   158  	}
   159  
   160  	_, i, err := ic.Informers.Get(ctx, gvk, obj, applyGetOptions(opts...))
   161  	if err != nil {
   162  		return nil, err
   163  	}
   164  	return i.Informer, nil
   165  }
   166  
   167  // GetInformer returns the informer for the obj. If no informer exists, one will be started.
   168  func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
   169  	gvk, err := apiutil.GVKForObject(obj, ic.scheme)
   170  	if err != nil {
   171  		return nil, err
   172  	}
   173  
   174  	_, i, err := ic.Informers.Get(ctx, gvk, obj, applyGetOptions(opts...))
   175  	if err != nil {
   176  		return nil, err
   177  	}
   178  	return i.Informer, nil
   179  }
   180  
   181  func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *internal.Cache, error) {
   182  	if ic.readerFailOnMissingInformer {
   183  		cache, started, ok := ic.Informers.Peek(gvk, obj)
   184  		if !ok {
   185  			return false, nil, &ErrResourceNotCached{GVK: gvk}
   186  		}
   187  		return started, cache, nil
   188  	}
   189  
   190  	return ic.Informers.Get(ctx, gvk, obj, &internal.GetOptions{})
   191  }
   192  
   193  // RemoveInformer deactivates and removes the informer from the cache.
   194  func (ic *informerCache) RemoveInformer(_ context.Context, obj client.Object) error {
   195  	gvk, err := apiutil.GVKForObject(obj, ic.scheme)
   196  	if err != nil {
   197  		return err
   198  	}
   199  
   200  	ic.Informers.Remove(gvk, obj)
   201  	return nil
   202  }
   203  
   204  // NeedLeaderElection implements the LeaderElectionRunnable interface
   205  // to indicate that this can be started without requiring the leader lock.
   206  func (ic *informerCache) NeedLeaderElection() bool {
   207  	return false
   208  }
   209  
   210  // IndexField adds an indexer to the underlying informer, using extractValue function to get
   211  // value(s) from the given field. This index can then be used by passing a field selector
   212  // to List. For one-to-one compatibility with "normal" field selectors, only return one value.
   213  // The values may be anything. They will automatically be prefixed with the namespace of the
   214  // given object, if present. The objects passed are guaranteed to be objects of the correct type.
   215  func (ic *informerCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
   216  	informer, err := ic.GetInformer(ctx, obj)
   217  	if err != nil {
   218  		return err
   219  	}
   220  	return indexByField(informer, field, extractValue)
   221  }
   222  
   223  func indexByField(informer Informer, field string, extractValue client.IndexerFunc) error {
   224  	indexFunc := func(objRaw interface{}) ([]string, error) {
   225  		// TODO(directxman12): check if this is the correct type?
   226  		obj, isObj := objRaw.(client.Object)
   227  		if !isObj {
   228  			return nil, fmt.Errorf("object of type %T is not an Object", objRaw)
   229  		}
   230  		meta, err := apimeta.Accessor(obj)
   231  		if err != nil {
   232  			return nil, err
   233  		}
   234  		ns := meta.GetNamespace()
   235  
   236  		rawVals := extractValue(obj)
   237  		var vals []string
   238  		if ns == "" {
   239  			// if we're not doubling the keys for the namespaced case, just create a new slice with same length
   240  			vals = make([]string, len(rawVals))
   241  		} else {
   242  			// if we need to add non-namespaced versions too, double the length
   243  			vals = make([]string, len(rawVals)*2)
   244  		}
   245  		for i, rawVal := range rawVals {
   246  			// save a namespaced variant, so that we can ask
   247  			// "what are all the object matching a given index *in a given namespace*"
   248  			vals[i] = internal.KeyToNamespacedKey(ns, rawVal)
   249  			if ns != "" {
   250  				// if we have a namespace, also inject a special index key for listing
   251  				// regardless of the object namespace
   252  				vals[i+len(rawVals)] = internal.KeyToNamespacedKey("", rawVal)
   253  			}
   254  		}
   255  
   256  		return vals, nil
   257  	}
   258  
   259  	return informer.AddIndexers(cache.Indexers{internal.FieldIndexName(field): indexFunc})
   260  }
   261  

View as plain text