...

Source file src/sigs.k8s.io/controller-runtime/pkg/internal/controller/controller.go

Documentation: sigs.k8s.io/controller-runtime/pkg/internal/controller

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package controller
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"sync"
    24  	"time"
    25  
    26  	"github.com/go-logr/logr"
    27  	"k8s.io/apimachinery/pkg/types"
    28  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    29  	"k8s.io/apimachinery/pkg/util/uuid"
    30  	"k8s.io/client-go/util/workqueue"
    31  
    32  	ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
    33  	logf "sigs.k8s.io/controller-runtime/pkg/log"
    34  	"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
    35  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    36  	"sigs.k8s.io/controller-runtime/pkg/source"
    37  )
    38  
// Controller implements controller.Controller.
//
// Sources are registered with Watch. Start builds Queue, starts those
// sources, waits for any source.SyncingSource among them to sync, and then
// runs MaxConcurrentReconciles worker goroutines that take requests off
// Queue and hand them to Do (through Reconcile).
type Controller struct {
	// Name is used to uniquely identify a Controller in tracing, logging and monitoring.  Name is required.
	// It is also the label value of every ctrlmetrics series this file records for the controller.
	Name string

	// MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run.
	// Start launches exactly this many worker goroutines. The documented default of 1 is not
	// applied in this file — NOTE(review): confirm the constructor sets it, since 0 starts no workers.
	MaxConcurrentReconciles int

	// Do is the reconcile.Reconciler that Reconcile delegates to for every request taken off
	// Queue. It is called with the Name / Namespace of an object and should ensure that the
	// state of the system matches the state specified in the object.
	Do reconcile.Reconciler

	// RateLimiter is used to limit how frequently requests may be queued into the work queue.
	// Start passes it to NewQueue when it builds Queue.
	RateLimiter ratelimiter.RateLimiter

	// NewQueue constructs the queue for this controller once the controller is ready to start.
	// This is a func because the standard Kubernetes work queues start themselves immediately, which
	// leads to goroutine leaks if something calls controller.New repeatedly.
	NewQueue func(controllerName string, rateLimiter ratelimiter.RateLimiter) workqueue.RateLimitingInterface

	// Queue is the rate-limited work queue that event sources add requests to and that the
	// workers drain. Start assigns it via NewQueue and shuts it down once Start's context is done.
	Queue workqueue.RateLimitingInterface

	// mu serializes Watch and Start; it guards Started, ctx and startWatches.
	mu sync.Mutex

	// Started is set to true by Start once all sources have been started, the syncing
	// sources have synced and the workers have been launched.
	Started bool

	// ctx is the context that was passed to Start() and used when starting watches.
	//
	// According to the docs, contexts should not be stored in a struct: https://golang.org/pkg/context,
	// while we usually always strive to follow best practices, we consider this a legacy case and it should
	// undergo a major refactoring and redesign to allow for context to not be stored in a struct.
	ctx context.Context

	// CacheSyncTimeout bounds, per syncing source, how long Start waits for its cache to sync
	// (it is passed to context.WithTimeout). The documented default of 2 minutes is not applied
	// in this file — NOTE(review): a zero value here presumably makes WaitForSync fail at once.
	CacheSyncTimeout time.Duration

	// startWatches holds the sources registered through Watch before Start ran.
	// Start starts all of them and then resets the slice to nil.
	startWatches []source.Source

	// LogConstructor is used to construct a logger to then log messages to users during reconciliation,
	// or for example when a watch is started.
	// Note: LogConstructor has to be able to handle nil requests as we are also using it
	// outside the context of a reconciliation.
	LogConstructor func(request *reconcile.Request) logr.Logger

	// RecoverPanic controls what Reconcile does when Do panics. When non-nil and true, the
	// panic value is passed to utilruntime.PanicHandlers and returned as an error; otherwise
	// the panic is logged and re-raised.
	RecoverPanic *bool

	// LeaderElected indicates whether the controller is leader elected or always running.
	// NeedLeaderElection treats a nil value as true.
	LeaderElected *bool
}
    96  
    97  // Reconcile implements reconcile.Reconciler.
    98  func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
    99  	defer func() {
   100  		if r := recover(); r != nil {
   101  			if c.RecoverPanic != nil && *c.RecoverPanic {
   102  				for _, fn := range utilruntime.PanicHandlers {
   103  					fn(r)
   104  				}
   105  				err = fmt.Errorf("panic: %v [recovered]", r)
   106  				return
   107  			}
   108  
   109  			log := logf.FromContext(ctx)
   110  			log.Info(fmt.Sprintf("Observed a panic in reconciler: %v", r))
   111  			panic(r)
   112  		}
   113  	}()
   114  	return c.Do.Reconcile(ctx, req)
   115  }
   116  
   117  // Watch implements controller.Controller.
   118  func (c *Controller) Watch(src source.Source) error {
   119  	c.mu.Lock()
   120  	defer c.mu.Unlock()
   121  
   122  	// Controller hasn't started yet, store the watches locally and return.
   123  	//
   124  	// These watches are going to be held on the controller struct until the manager or user calls Start(...).
   125  	if !c.Started {
   126  		c.startWatches = append(c.startWatches, src)
   127  		return nil
   128  	}
   129  
   130  	c.LogConstructor(nil).Info("Starting EventSource", "source", src)
   131  	return src.Start(c.ctx, c.Queue)
   132  }
   133  
   134  // NeedLeaderElection implements the manager.LeaderElectionRunnable interface.
   135  func (c *Controller) NeedLeaderElection() bool {
   136  	if c.LeaderElected == nil {
   137  		return true
   138  	}
   139  	return *c.LeaderElected
   140  }
   141  
   142  // Start implements controller.Controller.
   143  func (c *Controller) Start(ctx context.Context) error {
   144  	// use an IIFE to get proper lock handling
   145  	// but lock outside to get proper handling of the queue shutdown
   146  	c.mu.Lock()
   147  	if c.Started {
   148  		return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
   149  	}
   150  
   151  	c.initMetrics()
   152  
   153  	// Set the internal context.
   154  	c.ctx = ctx
   155  
   156  	c.Queue = c.NewQueue(c.Name, c.RateLimiter)
   157  	go func() {
   158  		<-ctx.Done()
   159  		c.Queue.ShutDown()
   160  	}()
   161  
   162  	wg := &sync.WaitGroup{}
   163  	err := func() error {
   164  		defer c.mu.Unlock()
   165  
   166  		// TODO(pwittrock): Reconsider HandleCrash
   167  		defer utilruntime.HandleCrash()
   168  
   169  		// NB(directxman12): launch the sources *before* trying to wait for the
   170  		// caches to sync so that they have a chance to register their intendeded
   171  		// caches.
   172  		for _, watch := range c.startWatches {
   173  			c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch))
   174  
   175  			if err := watch.Start(ctx, c.Queue); err != nil {
   176  				return err
   177  			}
   178  		}
   179  
   180  		// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
   181  		c.LogConstructor(nil).Info("Starting Controller")
   182  
   183  		for _, watch := range c.startWatches {
   184  			syncingSource, ok := watch.(source.SyncingSource)
   185  			if !ok {
   186  				continue
   187  			}
   188  
   189  			if err := func() error {
   190  				// use a context with timeout for launching sources and syncing caches.
   191  				sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
   192  				defer cancel()
   193  
   194  				// WaitForSync waits for a definitive timeout, and returns if there
   195  				// is an error or a timeout
   196  				if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
   197  					err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
   198  					c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
   199  					return err
   200  				}
   201  
   202  				return nil
   203  			}(); err != nil {
   204  				return err
   205  			}
   206  		}
   207  
   208  		// All the watches have been started, we can reset the local slice.
   209  		//
   210  		// We should never hold watches more than necessary, each watch source can hold a backing cache,
   211  		// which won't be garbage collected if we hold a reference to it.
   212  		c.startWatches = nil
   213  
   214  		// Launch workers to process resources
   215  		c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
   216  		wg.Add(c.MaxConcurrentReconciles)
   217  		for i := 0; i < c.MaxConcurrentReconciles; i++ {
   218  			go func() {
   219  				defer wg.Done()
   220  				// Run a worker thread that just dequeues items, processes them, and marks them done.
   221  				// It enforces that the reconcileHandler is never invoked concurrently with the same object.
   222  				for c.processNextWorkItem(ctx) {
   223  				}
   224  			}()
   225  		}
   226  
   227  		c.Started = true
   228  		return nil
   229  	}()
   230  	if err != nil {
   231  		return err
   232  	}
   233  
   234  	<-ctx.Done()
   235  	c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
   236  	wg.Wait()
   237  	c.LogConstructor(nil).Info("All workers finished")
   238  	return nil
   239  }
   240  
   241  // processNextWorkItem will read a single work item off the workqueue and
   242  // attempt to process it, by calling the reconcileHandler.
   243  func (c *Controller) processNextWorkItem(ctx context.Context) bool {
   244  	obj, shutdown := c.Queue.Get()
   245  	if shutdown {
   246  		// Stop working
   247  		return false
   248  	}
   249  
   250  	// We call Done here so the workqueue knows we have finished
   251  	// processing this item. We also must remember to call Forget if we
   252  	// do not want this work item being re-queued. For example, we do
   253  	// not call Forget if a transient error occurs, instead the item is
   254  	// put back on the workqueue and attempted again after a back-off
   255  	// period.
   256  	defer c.Queue.Done(obj)
   257  
   258  	ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
   259  	defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
   260  
   261  	c.reconcileHandler(ctx, obj)
   262  	return true
   263  }
   264  
   265  const (
   266  	labelError        = "error"
   267  	labelRequeueAfter = "requeue_after"
   268  	labelRequeue      = "requeue"
   269  	labelSuccess      = "success"
   270  )
   271  
   272  func (c *Controller) initMetrics() {
   273  	ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Set(0)
   274  	ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Add(0)
   275  	ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Add(0)
   276  	ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Add(0)
   277  	ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Add(0)
   278  	ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Add(0)
   279  	ctrlmetrics.WorkerCount.WithLabelValues(c.Name).Set(float64(c.MaxConcurrentReconciles))
   280  }
   281  
   282  func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
   283  	// Update metrics after processing each item
   284  	reconcileStartTS := time.Now()
   285  	defer func() {
   286  		c.updateMetrics(time.Since(reconcileStartTS))
   287  	}()
   288  
   289  	// Make sure that the object is a valid request.
   290  	req, ok := obj.(reconcile.Request)
   291  	if !ok {
   292  		// As the item in the workqueue is actually invalid, we call
   293  		// Forget here else we'd go into a loop of attempting to
   294  		// process a work item that is invalid.
   295  		c.Queue.Forget(obj)
   296  		c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
   297  		// Return true, don't take a break
   298  		return
   299  	}
   300  
   301  	log := c.LogConstructor(&req)
   302  	reconcileID := uuid.NewUUID()
   303  
   304  	log = log.WithValues("reconcileID", reconcileID)
   305  	ctx = logf.IntoContext(ctx, log)
   306  	ctx = addReconcileID(ctx, reconcileID)
   307  
   308  	// RunInformersAndControllers the syncHandler, passing it the Namespace/Name string of the
   309  	// resource to be synced.
   310  	log.V(5).Info("Reconciling")
   311  	result, err := c.Reconcile(ctx, req)
   312  	switch {
   313  	case err != nil:
   314  		if errors.Is(err, reconcile.TerminalError(nil)) {
   315  			ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()
   316  		} else {
   317  			c.Queue.AddRateLimited(req)
   318  		}
   319  		ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
   320  		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
   321  		if !result.IsZero() {
   322  			log.Info("Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes reqeueuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler")
   323  		}
   324  		log.Error(err, "Reconciler error")
   325  	case result.RequeueAfter > 0:
   326  		log.V(5).Info(fmt.Sprintf("Reconcile done, requeueing after %s", result.RequeueAfter))
   327  		// The result.RequeueAfter request will be lost, if it is returned
   328  		// along with a non-nil error. But this is intended as
   329  		// We need to drive to stable reconcile loops before queuing due
   330  		// to result.RequestAfter
   331  		c.Queue.Forget(obj)
   332  		c.Queue.AddAfter(req, result.RequeueAfter)
   333  		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
   334  	case result.Requeue:
   335  		log.V(5).Info("Reconcile done, requeueing")
   336  		c.Queue.AddRateLimited(req)
   337  		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
   338  	default:
   339  		log.V(5).Info("Reconcile successful")
   340  		// Finally, if no error occurs we Forget this item so it does not
   341  		// get queued again until another change happens.
   342  		c.Queue.Forget(obj)
   343  		ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
   344  	}
   345  }
   346  
   347  // GetLogger returns this controller's logger.
   348  func (c *Controller) GetLogger() logr.Logger {
   349  	return c.LogConstructor(nil)
   350  }
   351  
   352  // updateMetrics updates prometheus metrics within the controller.
   353  func (c *Controller) updateMetrics(reconcileTime time.Duration) {
   354  	ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds())
   355  }
   356  
   357  // ReconcileIDFromContext gets the reconcileID from the current context.
   358  func ReconcileIDFromContext(ctx context.Context) types.UID {
   359  	r, ok := ctx.Value(reconcileIDKey{}).(types.UID)
   360  	if !ok {
   361  		return ""
   362  	}
   363  
   364  	return r
   365  }
   366  
   367  // reconcileIDKey is a context.Context Value key. Its associated value should
   368  // be a types.UID.
   369  type reconcileIDKey struct{}
   370  
   371  func addReconcileID(ctx context.Context, reconcileID types.UID) context.Context {
   372  	return context.WithValue(ctx, reconcileIDKey{}, reconcileID)
   373  }
   374  

View as plain text