...

Source file src/sigs.k8s.io/controller-runtime/pkg/cache/internal/cache_reader.go

Documentation: sigs.k8s.io/controller-runtime/pkg/cache/internal

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package internal
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"reflect"
    23  
    24  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    25  	apimeta "k8s.io/apimachinery/pkg/api/meta"
    26  	"k8s.io/apimachinery/pkg/fields"
    27  	"k8s.io/apimachinery/pkg/labels"
    28  	"k8s.io/apimachinery/pkg/runtime"
    29  	"k8s.io/apimachinery/pkg/runtime/schema"
    30  	"k8s.io/client-go/tools/cache"
    31  
    32  	"sigs.k8s.io/controller-runtime/pkg/client"
    33  	"sigs.k8s.io/controller-runtime/pkg/internal/field/selector"
    34  )
    35  
    36  // CacheReader is a client.Reader.
    37  var _ client.Reader = &CacheReader{}
    38  
    39  // CacheReader wraps a cache.Index to implement the client.Reader interface for a single type.
    40  type CacheReader struct {
    41  	// indexer is the underlying indexer wrapped by this cache.
    42  	indexer cache.Indexer
    43  
    44  	// groupVersionKind is the group-version-kind of the resource.
    45  	groupVersionKind schema.GroupVersionKind
    46  
    47  	// scopeName is the scope of the resource (namespaced or cluster-scoped).
    48  	scopeName apimeta.RESTScopeName
    49  
    50  	// disableDeepCopy indicates not to deep copy objects during get or list objects.
    51  	// Be very careful with this, when enabled you must DeepCopy any object before mutating it,
    52  	// otherwise you will mutate the object in the cache.
    53  	disableDeepCopy bool
    54  }
    55  
    56  // Get checks the indexer for the object and writes a copy of it if found.
    57  func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object, _ ...client.GetOption) error {
    58  	if c.scopeName == apimeta.RESTScopeNameRoot {
    59  		key.Namespace = ""
    60  	}
    61  	storeKey := objectKeyToStoreKey(key)
    62  
    63  	// Lookup the object from the indexer cache
    64  	obj, exists, err := c.indexer.GetByKey(storeKey)
    65  	if err != nil {
    66  		return err
    67  	}
    68  
    69  	// Not found, return an error
    70  	if !exists {
    71  		return apierrors.NewNotFound(schema.GroupResource{
    72  			Group: c.groupVersionKind.Group,
    73  			// Resource gets set as Kind in the error so this is fine
    74  			Resource: c.groupVersionKind.Kind,
    75  		}, key.Name)
    76  	}
    77  
    78  	// Verify the result is a runtime.Object
    79  	if _, isObj := obj.(runtime.Object); !isObj {
    80  		// This should never happen
    81  		return fmt.Errorf("cache contained %T, which is not an Object", obj)
    82  	}
    83  
    84  	if c.disableDeepCopy {
    85  		// skip deep copy which might be unsafe
    86  		// you must DeepCopy any object before mutating it outside
    87  	} else {
    88  		// deep copy to avoid mutating cache
    89  		obj = obj.(runtime.Object).DeepCopyObject()
    90  	}
    91  
    92  	// Copy the value of the item in the cache to the returned value
    93  	// TODO(directxman12): this is a terrible hack, pls fix (we should have deepcopyinto)
    94  	outVal := reflect.ValueOf(out)
    95  	objVal := reflect.ValueOf(obj)
    96  	if !objVal.Type().AssignableTo(outVal.Type()) {
    97  		return fmt.Errorf("cache had type %s, but %s was asked for", objVal.Type(), outVal.Type())
    98  	}
    99  	reflect.Indirect(outVal).Set(reflect.Indirect(objVal))
   100  	if !c.disableDeepCopy {
   101  		out.GetObjectKind().SetGroupVersionKind(c.groupVersionKind)
   102  	}
   103  
   104  	return nil
   105  }
   106  
   107  // List lists items out of the indexer and writes them to out.
   108  func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...client.ListOption) error {
   109  	var objs []interface{}
   110  	var err error
   111  
   112  	listOpts := client.ListOptions{}
   113  	listOpts.ApplyOptions(opts)
   114  
   115  	if listOpts.Continue != "" {
   116  		return fmt.Errorf("continue list option is not supported by the cache")
   117  	}
   118  
   119  	switch {
   120  	case listOpts.FieldSelector != nil:
   121  		requiresExact := selector.RequiresExactMatch(listOpts.FieldSelector)
   122  		if !requiresExact {
   123  			return fmt.Errorf("non-exact field matches are not supported by the cache")
   124  		}
   125  		// list all objects by the field selector. If this is namespaced and we have one, ask for the
   126  		// namespaced index key. Otherwise, ask for the non-namespaced variant by using the fake "all namespaces"
   127  		// namespace.
   128  		objs, err = byIndexes(c.indexer, listOpts.FieldSelector.Requirements(), listOpts.Namespace)
   129  	case listOpts.Namespace != "":
   130  		objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace)
   131  	default:
   132  		objs = c.indexer.List()
   133  	}
   134  	if err != nil {
   135  		return err
   136  	}
   137  	var labelSel labels.Selector
   138  	if listOpts.LabelSelector != nil {
   139  		labelSel = listOpts.LabelSelector
   140  	}
   141  
   142  	limitSet := listOpts.Limit > 0
   143  
   144  	runtimeObjs := make([]runtime.Object, 0, len(objs))
   145  	for _, item := range objs {
   146  		// if the Limit option is set and the number of items
   147  		// listed exceeds this limit, then stop reading.
   148  		if limitSet && int64(len(runtimeObjs)) >= listOpts.Limit {
   149  			break
   150  		}
   151  		obj, isObj := item.(runtime.Object)
   152  		if !isObj {
   153  			return fmt.Errorf("cache contained %T, which is not an Object", item)
   154  		}
   155  		meta, err := apimeta.Accessor(obj)
   156  		if err != nil {
   157  			return err
   158  		}
   159  		if labelSel != nil {
   160  			lbls := labels.Set(meta.GetLabels())
   161  			if !labelSel.Matches(lbls) {
   162  				continue
   163  			}
   164  		}
   165  
   166  		var outObj runtime.Object
   167  		if c.disableDeepCopy || (listOpts.UnsafeDisableDeepCopy != nil && *listOpts.UnsafeDisableDeepCopy) {
   168  			// skip deep copy which might be unsafe
   169  			// you must DeepCopy any object before mutating it outside
   170  			outObj = obj
   171  		} else {
   172  			outObj = obj.DeepCopyObject()
   173  			outObj.GetObjectKind().SetGroupVersionKind(c.groupVersionKind)
   174  		}
   175  		runtimeObjs = append(runtimeObjs, outObj)
   176  	}
   177  	return apimeta.SetList(out, runtimeObjs)
   178  }
   179  
   180  func byIndexes(indexer cache.Indexer, requires fields.Requirements, namespace string) ([]interface{}, error) {
   181  	var (
   182  		err  error
   183  		objs []interface{}
   184  		vals []string
   185  	)
   186  	indexers := indexer.GetIndexers()
   187  	for idx, req := range requires {
   188  		indexName := FieldIndexName(req.Field)
   189  		indexedValue := KeyToNamespacedKey(namespace, req.Value)
   190  		if idx == 0 {
   191  			// we use first require to get snapshot data
   192  			// TODO(halfcrazy): use complicated index when client-go provides byIndexes
   193  			// https://github.com/kubernetes/kubernetes/issues/109329
   194  			objs, err = indexer.ByIndex(indexName, indexedValue)
   195  			if err != nil {
   196  				return nil, err
   197  			}
   198  			if len(objs) == 0 {
   199  				return nil, nil
   200  			}
   201  			continue
   202  		}
   203  		fn, exist := indexers[indexName]
   204  		if !exist {
   205  			return nil, fmt.Errorf("index with name %s does not exist", indexName)
   206  		}
   207  		filteredObjects := make([]interface{}, 0, len(objs))
   208  		for _, obj := range objs {
   209  			vals, err = fn(obj)
   210  			if err != nil {
   211  				return nil, err
   212  			}
   213  			for _, val := range vals {
   214  				if val == indexedValue {
   215  					filteredObjects = append(filteredObjects, obj)
   216  					break
   217  				}
   218  			}
   219  		}
   220  		if len(filteredObjects) == 0 {
   221  			return nil, nil
   222  		}
   223  		objs = filteredObjects
   224  	}
   225  	return objs, nil
   226  }
   227  
   228  // objectKeyToStorageKey converts an object key to store key.
   229  // It's akin to MetaNamespaceKeyFunc. It's separate from
   230  // String to allow keeping the key format easily in sync with
   231  // MetaNamespaceKeyFunc.
   232  func objectKeyToStoreKey(k client.ObjectKey) string {
   233  	if k.Namespace == "" {
   234  		return k.Name
   235  	}
   236  	return k.Namespace + "/" + k.Name
   237  }
   238  
   239  // FieldIndexName constructs the name of the index over the given field,
   240  // for use with an indexer.
   241  func FieldIndexName(field string) string {
   242  	return "field:" + field
   243  }
   244  
   245  // allNamespacesNamespace is used as the "namespace" when we want to list across all namespaces.
   246  const allNamespacesNamespace = "__all_namespaces"
   247  
   248  // KeyToNamespacedKey prefixes the given index key with a namespace
   249  // for use in field selector indexes.
   250  func KeyToNamespacedKey(ns string, baseKey string) string {
   251  	if ns != "" {
   252  		return ns + "/" + baseKey
   253  	}
   254  	return allNamespacesNamespace + "/" + baseKey
   255  }
   256  

View as plain text