...

Source file src/sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader/caching_reader.go

Documentation: sigs.k8s.io/cli-utils/pkg/kstatus/polling/clusterreader

     1  // Copyright 2020 The Kubernetes Authors.
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package clusterreader
     5  
     6  import (
     7  	"context"
     8  	"errors"
     9  	"fmt"
    10  	"sync"
    11  
    12  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    13  	"k8s.io/apimachinery/pkg/api/meta"
    14  	metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
    15  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    16  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    17  	"k8s.io/apimachinery/pkg/fields"
    18  	"k8s.io/apimachinery/pkg/labels"
    19  	"k8s.io/apimachinery/pkg/runtime"
    20  	"k8s.io/apimachinery/pkg/runtime/schema"
    21  	"k8s.io/client-go/tools/pager"
    22  	"sigs.k8s.io/cli-utils/pkg/kstatus/polling/engine"
    23  	"sigs.k8s.io/cli-utils/pkg/object"
    24  	"sigs.k8s.io/controller-runtime/pkg/client"
    25  )
    26  
    27  // This map is hard-coded knowledge that a Deployment contains and
    28  // ReplicaSet, and that a ReplicaSet in turn contains Pods, etc., and the
    29  // approach to finding status being used here requires hardcoding that
    30  // knowledge in the status client library.
    31  // TODO: These should probably be defined in the statusreaders rather than here.
    32  var genGroupKinds = map[schema.GroupKind][]schema.GroupKind{
    33  	schema.GroupKind{Group: "apps", Kind: "Deployment"}: { //nolint:gofmt
    34  		{
    35  			Group: "apps",
    36  			Kind:  "ReplicaSet",
    37  		},
    38  	},
    39  	schema.GroupKind{Group: "apps", Kind: "ReplicaSet"}: { //nolint:gofmt
    40  		{
    41  			Group: "",
    42  			Kind:  "Pod",
    43  		},
    44  	},
    45  	schema.GroupKind{Group: "apps", Kind: "StatefulSet"}: { //nolint:gofmt
    46  		{
    47  			Group: "",
    48  			Kind:  "Pod",
    49  		},
    50  	},
    51  }
    52  
    53  // NewCachingClusterReader returns a new instance of the ClusterReader. The
    54  // ClusterReader needs will use the clusterreader to fetch resources from the cluster,
    55  // while the mapper is used to resolve the version for GroupKinds. The set of
    56  // identifiers is needed so the ClusterReader can figure out which GroupKind
    57  // and namespace combinations it needs to cache when the Sync function is called.
    58  // We only want to fetch the resources that are actually needed.
    59  func NewCachingClusterReader(reader client.Reader, mapper meta.RESTMapper, identifiers object.ObjMetadataSet) (engine.ClusterReader, error) {
    60  	gvkNamespaceSet := newGnSet()
    61  	for _, id := range identifiers {
    62  		// For every identifier, add the GroupVersionKind and namespace combination to the gvkNamespaceSet and
    63  		// check the genGroupKinds map for any generated resources that also should be included.
    64  		err := buildGvkNamespaceSet([]schema.GroupKind{id.GroupKind}, id.Namespace, gvkNamespaceSet)
    65  		if err != nil {
    66  			return nil, err
    67  		}
    68  	}
    69  
    70  	return &CachingClusterReader{
    71  		reader: reader,
    72  		mapper: mapper,
    73  		gns:    gvkNamespaceSet.gvkNamespaces,
    74  	}, nil
    75  }
    76  
    77  func buildGvkNamespaceSet(gks []schema.GroupKind, namespace string, gvkNamespaceSet *gvkNamespaceSet) error {
    78  	for _, gk := range gks {
    79  		gvkNamespaceSet.add(gkNamespace{
    80  			GroupKind: gk,
    81  			Namespace: namespace,
    82  		})
    83  		genGKs, found := genGroupKinds[gk]
    84  		if found {
    85  			err := buildGvkNamespaceSet(genGKs, namespace, gvkNamespaceSet)
    86  			if err != nil {
    87  				return err
    88  			}
    89  		}
    90  	}
    91  	return nil
    92  }
    93  
    94  type gvkNamespaceSet struct {
    95  	gvkNamespaces []gkNamespace
    96  	seen          map[gkNamespace]struct{}
    97  }
    98  
    99  func newGnSet() *gvkNamespaceSet {
   100  	return &gvkNamespaceSet{
   101  		seen: make(map[gkNamespace]struct{}),
   102  	}
   103  }
   104  
   105  func (g *gvkNamespaceSet) add(gn gkNamespace) {
   106  	if _, found := g.seen[gn]; !found {
   107  		g.gvkNamespaces = append(g.gvkNamespaces, gn)
   108  		g.seen[gn] = struct{}{}
   109  	}
   110  }
   111  
   112  // CachingClusterReader is an implementation of the ObserverReader interface that will
   113  // pre-fetch all resources needed before every sync loop. The resources needed are decided by
   114  // finding all combinations of GroupVersionKind and namespace referenced by the provided
   115  // identifiers. This list is then expanded to include any known generated resource types.
   116  type CachingClusterReader struct {
   117  	mx sync.RWMutex
   118  
   119  	// clusterreader provides functions to read and list resources from the
   120  	// cluster.
   121  	reader client.Reader
   122  
   123  	// mapper is the client-side representation of the server-side scheme. It is used
   124  	// to resolve GroupVersionKind from GroupKind.
   125  	mapper meta.RESTMapper
   126  
   127  	// gns contains the slice of all the GVK and namespace combinations that
   128  	// should be included in the cache. This is computed based the resource identifiers
   129  	// passed in when the CachingClusterReader is created and augmented with other
   130  	// resource types needed to compute status (see genGroupKinds).
   131  	gns []gkNamespace
   132  
   133  	// cache contains the resources found in the cluster for the given combination
   134  	// of GVK and namespace. Before each polling cycle, the framework will call the
   135  	// Sync function, which is responsible for repopulating the cache.
   136  	cache map[gkNamespace]cacheEntry
   137  }
   138  
   139  type cacheEntry struct {
   140  	resources unstructured.UnstructuredList
   141  	err       error
   142  }
   143  
   144  // gkNamespace contains information about a GroupVersionKind and a namespace.
   145  type gkNamespace struct {
   146  	GroupKind schema.GroupKind
   147  	Namespace string
   148  }
   149  
   150  // Get looks up the resource identified by the key and the object GVK in the cache. If the needed combination
   151  // of GVK and namespace is not part of the cache, that is considered an error.
   152  func (c *CachingClusterReader) Get(_ context.Context, key client.ObjectKey, obj *unstructured.Unstructured) error {
   153  	c.mx.RLock()
   154  	defer c.mx.RUnlock()
   155  	gvk := obj.GetObjectKind().GroupVersionKind()
   156  	mapping, err := c.mapper.RESTMapping(gvk.GroupKind())
   157  	if err != nil {
   158  		return err
   159  	}
   160  	gn := gkNamespace{
   161  		GroupKind: gvk.GroupKind(),
   162  		Namespace: key.Namespace,
   163  	}
   164  	cacheEntry, found := c.cache[gn]
   165  	if !found {
   166  		return fmt.Errorf("GVK %s and Namespace %s not found in cache", gvk.String(), gn.Namespace)
   167  	}
   168  
   169  	if cacheEntry.err != nil {
   170  		return cacheEntry.err
   171  	}
   172  	for _, u := range cacheEntry.resources.Items {
   173  		if u.GetName() == key.Name {
   174  			obj.Object = u.Object
   175  			return nil
   176  		}
   177  	}
   178  	return apierrors.NewNotFound(mapping.Resource.GroupResource(), key.Name)
   179  }
   180  
   181  // ListNamespaceScoped lists all resource identifier by the GVK of the list, the namespace and the selector
   182  // from the cache. If the needed combination of GVK and namespace is not part of the cache, that is considered an error.
   183  func (c *CachingClusterReader) ListNamespaceScoped(_ context.Context, list *unstructured.UnstructuredList, namespace string, selector labels.Selector) error {
   184  	c.mx.RLock()
   185  	defer c.mx.RUnlock()
   186  	gvk := list.GroupVersionKind()
   187  	gn := gkNamespace{
   188  		GroupKind: gvk.GroupKind(),
   189  		Namespace: namespace,
   190  	}
   191  
   192  	cacheEntry, found := c.cache[gn]
   193  	if !found {
   194  		return fmt.Errorf("GVK %s and Namespace %s not found in cache", gvk.String(), gn.Namespace)
   195  	}
   196  
   197  	if cacheEntry.err != nil {
   198  		return cacheEntry.err
   199  	}
   200  
   201  	var items []unstructured.Unstructured
   202  	for _, u := range cacheEntry.resources.Items {
   203  		if selector.Matches(labels.Set(u.GetLabels())) {
   204  			items = append(items, u)
   205  		}
   206  	}
   207  	list.Items = items
   208  	return nil
   209  }
   210  
   211  // ListClusterScoped lists all resource identifier by the GVK of the list and selector
   212  // from the cache. If the needed combination of GVK and namespace (which for clusterscoped resources
   213  // will always be the empty string) is not part of the cache, that is considered an error.
   214  func (c *CachingClusterReader) ListClusterScoped(ctx context.Context, list *unstructured.UnstructuredList, selector labels.Selector) error {
   215  	return c.ListNamespaceScoped(ctx, list, "", selector)
   216  }
   217  
   218  // Sync loops over the list of gkNamespace we know of, and uses list calls to fetch the resources.
   219  // This information populates the cache.
   220  func (c *CachingClusterReader) Sync(ctx context.Context) error {
   221  	c.mx.Lock()
   222  	defer c.mx.Unlock()
   223  	cache := make(map[gkNamespace]cacheEntry)
   224  	for _, gn := range c.gns {
   225  		mapping, err := c.mapper.RESTMapping(gn.GroupKind)
   226  		if err != nil {
   227  			if meta.IsNoMatchError(err) {
   228  				// If we get a NoMatchError, it means we are checking for
   229  				// a type that doesn't exist. Presumably the CRD is being
   230  				// applied, so it will be added. Reset the RESTMapper to
   231  				// make sure we pick up any new resource types on the
   232  				// APIServer.
   233  				cache[gn] = cacheEntry{
   234  					err: err,
   235  				}
   236  				continue
   237  			}
   238  			return err
   239  		}
   240  		ns := ""
   241  		if mapping.Scope == meta.RESTScopeNamespace {
   242  			ns = gn.Namespace
   243  		}
   244  		list, err := c.listUnstructured(ctx, mapping.GroupVersionKind, ns)
   245  		if err != nil {
   246  			// If the context was cancelled, we just stop the work and return
   247  			// the error.
   248  			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
   249  				return err
   250  			}
   251  			// For other errors, we just keep it the error. Whenever any pollers
   252  			// request a resource covered by this gns, we just return the
   253  			// error.
   254  			cache[gn] = cacheEntry{
   255  				err: err,
   256  			}
   257  			continue
   258  		}
   259  		cache[gn] = cacheEntry{
   260  			resources: *list,
   261  		}
   262  	}
   263  	c.cache = cache
   264  	return nil
   265  }
   266  
   267  // listUnstructured performs one or more LIST calls, paginating the requests
   268  // and aggregating the results.  If aggregated, only the ResourceVersion,
   269  // SelfLink, and Items will be populated. The default page size is 500.
   270  func (c *CachingClusterReader) listUnstructured(
   271  	ctx context.Context,
   272  	gvk schema.GroupVersionKind,
   273  	namespace string,
   274  ) (*unstructured.UnstructuredList, error) {
   275  	mOpts := metav1.ListOptions{}
   276  	mOpts.SetGroupVersionKind(gvk)
   277  	obj, _, err := pager.New(c.listPageFunc(namespace)).List(ctx, mOpts)
   278  	if err != nil {
   279  		return nil, err
   280  	}
   281  
   282  	switch t := obj.(type) {
   283  	case *unstructured.UnstructuredList:
   284  		// all in one
   285  		return t, nil
   286  	case *metainternalversion.List:
   287  		// aggregated result
   288  		u := &unstructured.UnstructuredList{}
   289  		u.SetGroupVersionKind(gvk)
   290  		// Only ResourceVersion & SelfLink are copied into the aggregated result
   291  		// by ListPager.
   292  		if t.ResourceVersion != "" {
   293  			u.SetResourceVersion(t.ResourceVersion)
   294  		}
   295  		if t.SelfLink != "" { // nolint:staticcheck
   296  			u.SetSelfLink(t.SelfLink) // nolint:staticcheck
   297  		}
   298  		u.Items = make([]unstructured.Unstructured, len(t.Items))
   299  		for i, item := range t.Items {
   300  			ui, ok := item.(*unstructured.Unstructured)
   301  			if !ok {
   302  				return nil, fmt.Errorf("unexpected list item type: %t", item)
   303  			}
   304  			u.Items[i] = *ui
   305  		}
   306  		return u, nil
   307  	default:
   308  		return nil, fmt.Errorf("unexpected list type: %t", t)
   309  	}
   310  }
   311  
   312  func (c *CachingClusterReader) listPageFunc(namespace string) pager.ListPageFunc {
   313  	return func(ctx context.Context, mOpts metav1.ListOptions) (runtime.Object, error) {
   314  		mOptsCopy := mOpts
   315  		labelSelector, err := labels.Parse(mOpts.LabelSelector)
   316  		if err != nil {
   317  			return nil, fmt.Errorf("failed to parse label selector: %w", err)
   318  		}
   319  		fieldSelector, err := fields.ParseSelector(mOpts.FieldSelector)
   320  		if err != nil {
   321  			return nil, fmt.Errorf("failed to parse field selector: %w", err)
   322  		}
   323  		cOpts := &client.ListOptions{
   324  			LabelSelector: labelSelector,
   325  			FieldSelector: fieldSelector,
   326  			Namespace:     namespace,
   327  			Limit:         mOpts.Limit,
   328  			Continue:      mOpts.Continue,
   329  			Raw:           &mOptsCopy,
   330  		}
   331  		var list unstructured.UnstructuredList
   332  		list.SetGroupVersionKind(mOpts.GroupVersionKind())
   333  		// Note: client.ListOptions only supports Exact ResourceVersion matching.
   334  		// So leave ResourceVersion blank to get Any ResourceVersion.
   335  		err = c.reader.List(ctx, &list, cOpts)
   336  		return &list, err
   337  	}
   338  }
   339  

View as plain text