...

Source file src/github.com/cert-manager/issuer-lib/internal/kubeutil/watch.go

Documentation: github.com/cert-manager/issuer-lib/internal/kubeutil

     1  /*
     2  Copyright 2023 The cert-manager Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package kubeutil
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math/rand"
    23  
    24  	"github.com/go-logr/logr"
    25  	apimeta "k8s.io/apimachinery/pkg/api/meta"
    26  	"k8s.io/apimachinery/pkg/fields"
    27  	"k8s.io/apimachinery/pkg/runtime"
    28  	"k8s.io/client-go/util/workqueue"
    29  	"sigs.k8s.io/controller-runtime/pkg/cache"
    30  	"sigs.k8s.io/controller-runtime/pkg/client"
    31  	"sigs.k8s.io/controller-runtime/pkg/event"
    32  	"sigs.k8s.io/controller-runtime/pkg/handler"
    33  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    34  )
    35  
    36  type linkedResourceHandler struct {
    37  	cache      cache.Cache
    38  	objType    client.Object
    39  	addToQueue func(q workqueue.RateLimitingInterface, req reconcile.Request)
    40  
    41  	refField string
    42  	scheme   *runtime.Scheme
    43  	logger   logr.Logger
    44  }
    45  
    46  // NewLinkedResourceHandler returns a `handler.EventHandler` that can be
    47  // passed to a `ctrl.Watch` function. Such a hander transforms the watch events
    48  // that originate from the `source.Source` that was passed to `ctrl.Watch`.
    49  // This LinkedResourceHandler transforms a watch event for a watched resource to
    50  // events for all resources that link to this watched resource.
    51  //
    52  //	eg.: resources A1, A2 and A3 all have a spec field that contains the B1 resource
    53  //	name. Now, we watch the B1 resource for changes and translate a watch event for
    54  //	this resource to an event for resource A1, an event for A2 and an event for A3
    55  //
    56  // To achieve this result performantly, we use a cache with an index. This cache
    57  // contains all resources (A1, A2, ...) that reference resources we receive events for.
    58  // We also register an index which is kept up-to-date by the cache when a resource (A1, A2, ...)
    59  // is added, removed or update. This index contains unique "<namespace>/<name>" identifiers
    60  // generated by the provided `toId` function. These identifiers represent what resource
    61  // (B1, B2, ...) the resources in the cache (A1, A2, ...) link to. Lastly, the `addToQueue`
    62  // function can be used to alter the operation that adds an item to the working queue. This
    63  // makes it possible to, instead of adding the event in the queue, post events on a channel.
    64  // The default nil value for `addToQueue` results in just using the `q.Add(req)` function to
    65  // add events to the queue.
    66  func NewLinkedResourceHandler(
    67  	cacheCtx context.Context,
    68  	logger logr.Logger,
    69  	scheme *runtime.Scheme,
    70  	cache cache.Cache,
    71  	objType client.Object,
    72  	toId func(obj client.Object) []string,
    73  	addToQueue func(q workqueue.RateLimitingInterface, req reconcile.Request),
    74  ) (handler.EventHandler, error) {
    75  	// a random index name prevents collisions with other indexes
    76  	refField := fmt.Sprintf(".x-index.%s", randStringRunes(10))
    77  
    78  	if err := SetGroupVersionKind(scheme, objType); err != nil {
    79  		return nil, err
    80  	}
    81  
    82  	// the registered index allows us to quickly list cached resources
    83  	// based on the index value which contains the unique identifier
    84  	// for the linked resource that we received an event for
    85  	if err := cache.IndexField(cacheCtx, objType, refField, toId); err != nil {
    86  		return nil, err
    87  	}
    88  
    89  	return &linkedResourceHandler{
    90  		logger:     logger,
    91  		scheme:     scheme,
    92  		cache:      cache,
    93  		objType:    objType,
    94  		addToQueue: addToQueue,
    95  
    96  		refField: refField,
    97  	}, nil
    98  }
    99  
   100  // findObjectsForKind is a handler.MapFunc which returns the namespaced names
   101  // of all the resultingType resources having an reference that matches the
   102  // supplied sourceType.
   103  // Errors are logged (not returned) due to a limitation of the handler.MapFunc
   104  // interface. See
   105  // https://github.com/kubernetes-sigs/controller-runtime/issues/1996
   106  // https://github.com/kubernetes-sigs/controller-runtime/issues/1923
   107  func (r *linkedResourceHandler) findObjectsForKind(ctx context.Context, object client.Object) []reconcile.Request {
   108  	logger := r.logger.WithName("FindObjectsForKind").WithValues(
   109  		"object", client.ObjectKeyFromObject(object),
   110  		"objectType", fmt.Sprintf("%T", object),
   111  	)
   112  
   113  	objList, err := NewListObject(r.scheme, r.objType.GetObjectKind().GroupVersionKind())
   114  	if err != nil {
   115  		logger.Error(err, "While creating a List object")
   116  		return nil
   117  	}
   118  	listOps := &client.ListOptions{
   119  		FieldSelector: fields.OneTermEqualSelector(r.refField, fmt.Sprintf("%s/%s", object.GetNamespace(), object.GetName())),
   120  	}
   121  
   122  	if err := r.cache.List(ctx, objList, listOps); err != nil {
   123  		logger.Error(err, "While listing liked resources")
   124  		return nil
   125  	}
   126  
   127  	requests := make([]reconcile.Request, 0, apimeta.LenList(objList))
   128  	if err := apimeta.EachListItem(objList, func(object runtime.Object) error {
   129  		clientObj, ok := object.(client.Object)
   130  		if !ok {
   131  			return fmt.Errorf("object %T cannot be converted to client.Object", object)
   132  		}
   133  		requests = append(requests, reconcile.Request{
   134  			NamespacedName: client.ObjectKeyFromObject(clientObj),
   135  		})
   136  		return nil
   137  	}); err != nil {
   138  		logger.Error(err, "While itterating list")
   139  		return nil
   140  	}
   141  
   142  	return requests
   143  }
   144  
   145  var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
   146  
   147  func randStringRunes(n int) string {
   148  	b := make([]rune, n)
   149  	for i := range b {
   150  		b[i] = letterRunes[rand.Intn(len(letterRunes))]
   151  	}
   152  	return string(b)
   153  }
   154  
   155  // Based on https://github.com/kubernetes-sigs/controller-runtime/blob/00f2425ce068525e0ff674dba51c3e76ee6ad2da/pkg/handler/enqueue_mapped.go
   156  // Copied to this linkedResourceHandler type such that dependencies can be injected.
   157  
   158  var _ handler.EventHandler = &linkedResourceHandler{}
   159  
   160  // Create implements EventHandler.
   161  func (e *linkedResourceHandler) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
   162  	reqs := map[reconcile.Request]struct{}{}
   163  	e.mapAndEnqueue(ctx, q, evt.Object, reqs)
   164  }
   165  
   166  // Update implements EventHandler.
   167  func (e *linkedResourceHandler) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
   168  	reqs := map[reconcile.Request]struct{}{}
   169  	e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs)
   170  	e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs)
   171  }
   172  
   173  // Delete implements EventHandler.
   174  func (e *linkedResourceHandler) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
   175  	reqs := map[reconcile.Request]struct{}{}
   176  	e.mapAndEnqueue(ctx, q, evt.Object, reqs)
   177  }
   178  
   179  // Generic implements EventHandler.
   180  func (e *linkedResourceHandler) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
   181  	reqs := map[reconcile.Request]struct{}{}
   182  	e.mapAndEnqueue(ctx, q, evt.Object, reqs)
   183  }
   184  
   185  func (e *linkedResourceHandler) mapAndEnqueue(ctx context.Context, q workqueue.RateLimitingInterface, object client.Object, reqs map[reconcile.Request]struct{}) {
   186  	for _, req := range e.findObjectsForKind(ctx, object) {
   187  		_, ok := reqs[req]
   188  		if !ok {
   189  			if e.addToQueue != nil {
   190  				e.addToQueue(q, req)
   191  			} else {
   192  				q.Add(req)
   193  			}
   194  
   195  			reqs[req] = struct{}{}
   196  		}
   197  	}
   198  }
   199  

View as plain text