...

Source file src/sigs.k8s.io/controller-runtime/pkg/builder/controller.go

Documentation: sigs.k8s.io/controller-runtime/pkg/builder

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package builder
    18  
    19  import (
    20  	"errors"
    21  	"fmt"
    22  	"strings"
    23  
    24  	"github.com/go-logr/logr"
    25  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    26  	"k8s.io/apimachinery/pkg/runtime/schema"
    27  	"k8s.io/klog/v2"
    28  
    29  	"sigs.k8s.io/controller-runtime/pkg/client"
    30  	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
    31  	"sigs.k8s.io/controller-runtime/pkg/controller"
    32  	"sigs.k8s.io/controller-runtime/pkg/handler"
    33  	"sigs.k8s.io/controller-runtime/pkg/manager"
    34  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    35  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    36  	"sigs.k8s.io/controller-runtime/pkg/source"
    37  )
    38  
    39  // Supporting mocking out functions for testing.
    40  var newController = controller.New
    41  var getGvk = apiutil.GVKForObject
    42  
// objectProjection represents other forms that we can use to
// send/receive a given resource (metadata-only, unstructured, etc).
// It is consumed by Builder.project, which converts the user-supplied
// object into the form that is actually watched.
type objectProjection int

const (
	// projectAsNormal doesn't change the object from the form given.
	// It is the zero value, so inputs that set no projection use it.
	projectAsNormal objectProjection = iota
	// projectAsMetadata turns this into a metadata-only watch: the object is
	// replaced by a metav1.PartialObjectMetadata carrying the same GVK.
	projectAsMetadata
)
    53  
// Builder builds a Controller.
//
// A Builder is obtained from ControllerManagedBy, configured through its
// chainable methods, and turned into a controller by Build or Complete.
type Builder struct {
	// forInput is the reconciled type, set by For.
	forInput         ForInput
	// ownsInput collects one entry per call to Owns.
	ownsInput        []OwnsInput
	// rawSources collects the sources passed to WatchesRawSource.
	rawSources       []source.Source
	// watchesInput collects one entry per call to Watches / WatchesMetadata.
	watchesInput     []WatchesInput
	// mgr is the Manager given to ControllerManagedBy.
	mgr              manager.Manager
	// globalPredicates are the filters added through WithEventFilter; doWatch
	// applies them to the For, Owns and Watches sources, not to raw sources.
	globalPredicates []predicate.Predicate
	// ctrl is the controller created by doController during Build.
	ctrl             controller.Controller
	// ctrlOptions are the options set by WithOptions / WithLogConstructor.
	ctrlOptions      controller.Options
	// name is the explicit controller name set by Named; empty means the name
	// is derived from the kind of the For object.
	name             string
}
    66  
    67  // ControllerManagedBy returns a new controller builder that will be started by the provided Manager.
    68  func ControllerManagedBy(m manager.Manager) *Builder {
    69  	return &Builder{mgr: m}
    70  }
    71  
// ForInput represents the information set by the For method.
type ForInput struct {
	// object is the type being reconciled.
	object           client.Object
	// predicates are applied to this watch in addition to the builder's
	// global predicates.
	predicates       []predicate.Predicate
	// objectProjection selects the form in which the object is watched.
	objectProjection objectProjection
	// err records a misuse of For (calling it more than once); Build returns it.
	err              error
}
    79  
    80  // For defines the type of Object being *reconciled*, and configures the ControllerManagedBy to respond to create / delete /
    81  // update events by *reconciling the object*.
    82  // This is the equivalent of calling
    83  // Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{}).
    84  func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
    85  	if blder.forInput.object != nil {
    86  		blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation")
    87  		return blder
    88  	}
    89  	input := ForInput{object: object}
    90  	for _, opt := range opts {
    91  		opt.ApplyToFor(&input)
    92  	}
    93  
    94  	blder.forInput = input
    95  	return blder
    96  }
    97  
// OwnsInput represents the information set by Owns method.
type OwnsInput struct {
	// matchEveryOwner disables the default OnlyControllerOwner restriction, so
	// every matching owner reference triggers a reconcile.
	matchEveryOwner  bool
	// object is the owned type to watch.
	object           client.Object
	// predicates are applied to this watch in addition to the builder's
	// global predicates.
	predicates       []predicate.Predicate
	// objectProjection selects the form in which the object is watched.
	objectProjection objectProjection
}
   105  
   106  // Owns defines types of Objects being *generated* by the ControllerManagedBy, and configures the ControllerManagedBy to respond to
   107  // create / delete / update events by *reconciling the owner object*.
   108  //
   109  // The default behavior reconciles only the first controller-type OwnerReference of the given type.
   110  // Use Owns(object, builder.MatchEveryOwner) to reconcile all owners.
   111  //
   112  // By default, this is the equivalent of calling
   113  // Watches(object, handler.EnqueueRequestForOwner([...], ownerType, OnlyControllerOwner())).
   114  func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
   115  	input := OwnsInput{object: object}
   116  	for _, opt := range opts {
   117  		opt.ApplyToOwns(&input)
   118  	}
   119  
   120  	blder.ownsInput = append(blder.ownsInput, input)
   121  	return blder
   122  }
   123  
// WatchesInput represents the information set by Watches method.
type WatchesInput struct {
	// obj is the type to watch.
	obj              client.Object
	// handler receives the events for obj.
	handler          handler.EventHandler
	// predicates are applied to this watch in addition to the builder's
	// global predicates.
	predicates       []predicate.Predicate
	// objectProjection selects the form in which the object is watched.
	objectProjection objectProjection
}
   131  
   132  // Watches defines the type of Object to watch, and configures the ControllerManagedBy to respond to create / delete /
   133  // update events by *reconciling the object* with the given EventHandler.
   134  //
   135  // This is the equivalent of calling
   136  // WatchesRawSource(source.Kind(cache, object, eventHandler, predicates...)).
   137  func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
   138  	input := WatchesInput{
   139  		obj:     object,
   140  		handler: eventHandler,
   141  	}
   142  	for _, opt := range opts {
   143  		opt.ApplyToWatches(&input)
   144  	}
   145  
   146  	blder.watchesInput = append(blder.watchesInput, input)
   147  
   148  	return blder
   149  }
   150  
   151  // WatchesMetadata is the same as Watches, but forces the internal cache to only watch PartialObjectMetadata.
   152  //
   153  // This is useful when watching lots of objects, really big objects, or objects for which you only know
   154  // the GVK, but not the structure. You'll need to pass metav1.PartialObjectMetadata to the client
   155  // when fetching objects in your reconciler, otherwise you'll end up with a duplicate structured or unstructured cache.
   156  //
   157  // When watching a resource with metadata only, for example the v1.Pod, you should not Get and List using the v1.Pod type.
   158  // Instead, you should use the special metav1.PartialObjectMetadata type.
   159  //
   160  // ❌ Incorrect:
   161  //
   162  //	pod := &v1.Pod{}
   163  //	mgr.GetClient().Get(ctx, nsAndName, pod)
   164  //
   165  // ✅ Correct:
   166  //
   167  //	pod := &metav1.PartialObjectMetadata{}
   168  //	pod.SetGroupVersionKind(schema.GroupVersionKind{
   169  //	    Group:   "",
   170  //	    Version: "v1",
   171  //	    Kind:    "Pod",
   172  //	})
   173  //	mgr.GetClient().Get(ctx, nsAndName, pod)
   174  //
   175  // In the first case, controller-runtime will create another cache for the
   176  // concrete type on top of the metadata cache; this increases memory
   177  // consumption and leads to race conditions as caches are not in sync.
   178  func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
   179  	opts = append(opts, OnlyMetadata)
   180  	return blder.Watches(object, eventHandler, opts...)
   181  }
   182  
   183  // WatchesRawSource exposes the lower-level ControllerManagedBy Watches functions through the builder.
   184  // Specified predicates are registered only for given source.
   185  //
   186  // STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
   187  // This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
   188  //
   189  // WatchesRawSource does not respect predicates configured through WithEventFilter.
   190  func (blder *Builder) WatchesRawSource(src source.Source) *Builder {
   191  	blder.rawSources = append(blder.rawSources, src)
   192  
   193  	return blder
   194  }
   195  
   196  // WithEventFilter sets the event filters, to filter which create/update/delete/generic events eventually
   197  // trigger reconciliations. For example, filtering on whether the resource version has changed.
   198  // Given predicate is added for all watched objects.
   199  // Defaults to the empty list.
   200  func (blder *Builder) WithEventFilter(p predicate.Predicate) *Builder {
   201  	blder.globalPredicates = append(blder.globalPredicates, p)
   202  	return blder
   203  }
   204  
   205  // WithOptions overrides the controller options used in doController. Defaults to empty.
   206  func (blder *Builder) WithOptions(options controller.Options) *Builder {
   207  	blder.ctrlOptions = options
   208  	return blder
   209  }
   210  
   211  // WithLogConstructor overrides the controller options's LogConstructor.
   212  func (blder *Builder) WithLogConstructor(logConstructor func(*reconcile.Request) logr.Logger) *Builder {
   213  	blder.ctrlOptions.LogConstructor = logConstructor
   214  	return blder
   215  }
   216  
   217  // Named sets the name of the controller to the given name. The name shows up
   218  // in metrics, among other things, and thus should be a prometheus compatible name
   219  // (underscores and alphanumeric characters only).
   220  //
   221  // By default, controllers are named using the lowercase version of their kind.
   222  func (blder *Builder) Named(name string) *Builder {
   223  	blder.name = name
   224  	return blder
   225  }
   226  
   227  // Complete builds the Application Controller.
   228  func (blder *Builder) Complete(r reconcile.Reconciler) error {
   229  	_, err := blder.Build(r)
   230  	return err
   231  }
   232  
   233  // Build builds the Application Controller and returns the Controller it created.
   234  func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
   235  	if r == nil {
   236  		return nil, fmt.Errorf("must provide a non-nil Reconciler")
   237  	}
   238  	if blder.mgr == nil {
   239  		return nil, fmt.Errorf("must provide a non-nil Manager")
   240  	}
   241  	if blder.forInput.err != nil {
   242  		return nil, blder.forInput.err
   243  	}
   244  
   245  	// Set the ControllerManagedBy
   246  	if err := blder.doController(r); err != nil {
   247  		return nil, err
   248  	}
   249  
   250  	// Set the Watch
   251  	if err := blder.doWatch(); err != nil {
   252  		return nil, err
   253  	}
   254  
   255  	return blder.ctrl, nil
   256  }
   257  
   258  func (blder *Builder) project(obj client.Object, proj objectProjection) (client.Object, error) {
   259  	switch proj {
   260  	case projectAsNormal:
   261  		return obj, nil
   262  	case projectAsMetadata:
   263  		metaObj := &metav1.PartialObjectMetadata{}
   264  		gvk, err := getGvk(obj, blder.mgr.GetScheme())
   265  		if err != nil {
   266  			return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err)
   267  		}
   268  		metaObj.SetGroupVersionKind(gvk)
   269  		return metaObj, nil
   270  	default:
   271  		panic(fmt.Sprintf("unexpected projection type %v on type %T, should not be possible since this is an internal field", proj, obj))
   272  	}
   273  }
   274  
   275  func (blder *Builder) doWatch() error {
   276  	// Reconcile type
   277  	if blder.forInput.object != nil {
   278  		obj, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
   279  		if err != nil {
   280  			return err
   281  		}
   282  		hdler := &handler.EnqueueRequestForObject{}
   283  		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
   284  		allPredicates = append(allPredicates, blder.forInput.predicates...)
   285  		src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
   286  		if err := blder.ctrl.Watch(src); err != nil {
   287  			return err
   288  		}
   289  	}
   290  
   291  	// Watches the managed types
   292  	if len(blder.ownsInput) > 0 && blder.forInput.object == nil {
   293  		return errors.New("Owns() can only be used together with For()")
   294  	}
   295  	for _, own := range blder.ownsInput {
   296  		obj, err := blder.project(own.object, own.objectProjection)
   297  		if err != nil {
   298  			return err
   299  		}
   300  		opts := []handler.OwnerOption{}
   301  		if !own.matchEveryOwner {
   302  			opts = append(opts, handler.OnlyControllerOwner())
   303  		}
   304  		hdler := handler.EnqueueRequestForOwner(
   305  			blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
   306  			blder.forInput.object,
   307  			opts...,
   308  		)
   309  		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
   310  		allPredicates = append(allPredicates, own.predicates...)
   311  		src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
   312  		if err := blder.ctrl.Watch(src); err != nil {
   313  			return err
   314  		}
   315  	}
   316  
   317  	// Do the watch requests
   318  	if len(blder.watchesInput) == 0 && blder.forInput.object == nil && len(blder.rawSources) == 0 {
   319  		return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns(), Watches() or WatchesRawSource() to set them up")
   320  	}
   321  	for _, w := range blder.watchesInput {
   322  		projected, err := blder.project(w.obj, w.objectProjection)
   323  		if err != nil {
   324  			return fmt.Errorf("failed to project for %T: %w", w.obj, err)
   325  		}
   326  		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
   327  		allPredicates = append(allPredicates, w.predicates...)
   328  		if err := blder.ctrl.Watch(source.Kind(blder.mgr.GetCache(), projected, w.handler, allPredicates...)); err != nil {
   329  			return err
   330  		}
   331  	}
   332  	for _, src := range blder.rawSources {
   333  		if err := blder.ctrl.Watch(src); err != nil {
   334  			return err
   335  		}
   336  	}
   337  	return nil
   338  }
   339  
   340  func (blder *Builder) getControllerName(gvk schema.GroupVersionKind, hasGVK bool) (string, error) {
   341  	if blder.name != "" {
   342  		return blder.name, nil
   343  	}
   344  	if !hasGVK {
   345  		return "", errors.New("one of For() or Named() must be called")
   346  	}
   347  	return strings.ToLower(gvk.Kind), nil
   348  }
   349  
   350  func (blder *Builder) doController(r reconcile.Reconciler) error {
   351  	globalOpts := blder.mgr.GetControllerOptions()
   352  
   353  	ctrlOptions := blder.ctrlOptions
   354  	if ctrlOptions.Reconciler != nil && r != nil {
   355  		return errors.New("reconciler was set via WithOptions() and via Build() or Complete()")
   356  	}
   357  	if ctrlOptions.Reconciler == nil {
   358  		ctrlOptions.Reconciler = r
   359  	}
   360  
   361  	// Retrieve the GVK from the object we're reconciling
   362  	// to pre-populate logger information, and to optionally generate a default name.
   363  	var gvk schema.GroupVersionKind
   364  	hasGVK := blder.forInput.object != nil
   365  	if hasGVK {
   366  		var err error
   367  		gvk, err = getGvk(blder.forInput.object, blder.mgr.GetScheme())
   368  		if err != nil {
   369  			return err
   370  		}
   371  	}
   372  
   373  	// Setup concurrency.
   374  	if ctrlOptions.MaxConcurrentReconciles == 0 && hasGVK {
   375  		groupKind := gvk.GroupKind().String()
   376  
   377  		if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 {
   378  			ctrlOptions.MaxConcurrentReconciles = concurrency
   379  		}
   380  	}
   381  
   382  	// Setup cache sync timeout.
   383  	if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout > 0 {
   384  		ctrlOptions.CacheSyncTimeout = globalOpts.CacheSyncTimeout
   385  	}
   386  
   387  	controllerName, err := blder.getControllerName(gvk, hasGVK)
   388  	if err != nil {
   389  		return err
   390  	}
   391  
   392  	// Setup the logger.
   393  	if ctrlOptions.LogConstructor == nil {
   394  		log := blder.mgr.GetLogger().WithValues(
   395  			"controller", controllerName,
   396  		)
   397  		if hasGVK {
   398  			log = log.WithValues(
   399  				"controllerGroup", gvk.Group,
   400  				"controllerKind", gvk.Kind,
   401  			)
   402  		}
   403  
   404  		ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger {
   405  			log := log
   406  			if req != nil {
   407  				if hasGVK {
   408  					log = log.WithValues(gvk.Kind, klog.KRef(req.Namespace, req.Name))
   409  				}
   410  				log = log.WithValues(
   411  					"namespace", req.Namespace, "name", req.Name,
   412  				)
   413  			}
   414  			return log
   415  		}
   416  	}
   417  
   418  	// Build the controller and return.
   419  	blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
   420  	return err
   421  }
   422  

View as plain text