...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/external-workload/endpoints_controller.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination/external-workload

     1  package externalworkload
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"sync"
     8  	"time"
     9  
    10  	ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
    11  	"github.com/linkerd/linkerd2/controller/k8s"
    12  	logging "github.com/sirupsen/logrus"
    13  	corev1 "k8s.io/api/core/v1"
    14  	discoveryv1 "k8s.io/api/discovery/v1"
    15  	kerrors "k8s.io/apimachinery/pkg/api/errors"
    16  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    17  	"k8s.io/apimachinery/pkg/labels"
    18  	"k8s.io/client-go/tools/cache"
    19  	"k8s.io/client-go/tools/leaderelection"
    20  	"k8s.io/client-go/tools/leaderelection/resourcelock"
    21  	"k8s.io/client-go/util/workqueue"
    22  	endpointslicerec "k8s.io/endpointslice"
    23  	epsliceutil "k8s.io/endpointslice/util"
    24  )
    25  
    26  const (
    27  	// Name of the lease resource the controller will use
    28  	leaseName = "linkerd-destination-endpoint-write"
    29  
    30  	// Duration of the lease
    31  	// Core controllers (kube-controller-manager) have a duration of 15 seconds
    32  	leaseDuration = 30 * time.Second
    33  
    34  	// Deadline for the leader to refresh its lease. Core controllers have a
    35  	// deadline of 10 seconds.
    36  	leaseRenewDeadline = 10 * time.Second
    37  
    38  	// Duration a leader elector should wait between action retries.
    39  	// Core controllers have a value of 2 seconds.
    40  	leaseRetryPeriod = 2 * time.Second
    41  
    42  	// Name of the controller. Used as the 'managed-by' label value on all
    43  	// created EndpointSlice objects
    44  	managedBy = "linkerd-external-workloads-controller"
    45  
    46  	// Max number of endpoints per EndpointSlice
    47  	maxEndpointsQuota = 100
    48  
    49  	// Max retries for a service to be reconciled
    50  	maxRetryBudget = 15
    51  )
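
(These lease settings satisfy client-go's leader-election validation, which requires LeaseDuration > RenewDeadline and RenewDeadline > JitterFactor * RetryPeriod, with JitterFactor = 1.2. A standalone sketch of that arithmetic, separate from this file; the constant values are copied from the block above:)

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		const (
			leaseDuration      = 30 * time.Second
			leaseRenewDeadline = 10 * time.Second
			leaseRetryPeriod   = 2 * time.Second
			jitterFactor       = 1.2 // leaderelection.JitterFactor
		)

		// Both checks must hold, or leaderelection.NewLeaderElector rejects the config.
		fmt.Println(leaseDuration > leaseRenewDeadline)                                         // true (30s > 10s)
		fmt.Println(leaseRenewDeadline > time.Duration(jitterFactor*float64(leaseRetryPeriod))) // true (10s > 2.4s)
	}
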
    52  
    53  // EndpointsController reconciles service memberships for ExternalWorkload resources
    54  // by writing EndpointSlice objects for Services that select one or more
    55  // external endpoints.
    56  type EndpointsController struct {
    57  	k8sAPI     *k8s.API
    58  	log        *logging.Entry
    59  	queue      workqueue.RateLimitingInterface
    60  	reconciler *endpointsReconciler
    61  	stop       chan struct{}
    62  
    63  	lec leaderelection.LeaderElectionConfig
    64  	informerHandlers
    65  	dropsMetric workqueue.CounterMetric
    66  }
    67  
    68  // informerHandlers holds handles to callbacks that have been registered with
    69  // the API Server client's informers.
    70  //
    71  // These callbacks will be registered when a controller is elected as leader,
    72  // and de-registered when the lease is lost.
    73  type informerHandlers struct {
    74  	ewHandle  cache.ResourceEventHandlerRegistration
    75  	esHandle  cache.ResourceEventHandlerRegistration
    76  	svcHandle cache.ResourceEventHandlerRegistration
    77  
    78  	// Mutex to guard handler registration since the elector loop may start
    79  	// executing callbacks when a controller starts leading in a background task
    80  	sync.Mutex
    81  }
    82  
    83  // The EndpointsController code has been structured (and modified) based on the
    84  // core EndpointSlice controller. Copyright 2014 The Kubernetes Authors
    85  // https://github.com/kubernetes/kubernetes/blob/29fad383dab0dd7b7b563ec9eae10156616a6f34/pkg/controller/endpointslice/endpointslice_controller.go
    86  //
    87  // There are some fundamental differences between the core endpoints controller
    88  // and Linkerd's endpoints controller; for one, the churn rate is expected to be
    89  // much lower for a controller that reconciles ExternalWorkload resources.
    90  // Furthermore, the structure of the resource is different, statuses do not
    91  // contain as many conditions, and the lifecycle of an ExternalWorkload is
    92  // different to that of a Pod (e.g. a workload is long lived).
    93  //
    94  // NewEndpointsController creates a new controller. The controller must be
    95  // started with its `Start()` method.
    96  func NewEndpointsController(k8sAPI *k8s.API, hostname, controllerNs string, stopCh chan struct{}, exportQueueMetrics bool) (*EndpointsController, error) {
    97  	queueName := "endpoints_controller_workqueue"
    98  	workQueueConfig := workqueue.RateLimitingQueueConfig{
    99  		Name: queueName,
   100  	}
   101  
   102  	var dropsMetric workqueue.CounterMetric = &noopCounterMetric{}
   103  	if exportQueueMetrics {
   104  		provider := newWorkQueueMetricsProvider()
   105  		workQueueConfig.MetricsProvider = provider
   106  		dropsMetric = provider.NewDropsMetric(queueName)
   107  	}
   108  
   109  	ec := &EndpointsController{
   110  		k8sAPI:     k8sAPI,
   111  		reconciler: newEndpointsReconciler(k8sAPI, managedBy, maxEndpointsQuota),
   112  		queue:      workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workQueueConfig),
   113  		stop:       stopCh,
   114  		log: logging.WithFields(logging.Fields{
   115  			"component": "external-endpoints-controller",
   116  		}),
   117  		dropsMetric: dropsMetric,
   118  	}
   119  
   120  	// Store configuration for the leader elector client. The leader elector
   121  	// accepts three callbacks: when a lease is acquired, the controller's
   122  	// informer callbacks are registered; when the lease is lost, they are
   123  	// de-registered again (and a third callback logs leadership changes).
   124  	ec.lec = leaderelection.LeaderElectionConfig{
   125  		// When the runtime context is cancelled, the lock will be released. This
   126  		// implies any code guarded by the lease _must_ finish before cancelling.
   127  		ReleaseOnCancel: true,
   128  		Lock: &resourcelock.LeaseLock{
   129  			LeaseMeta: metav1.ObjectMeta{
   130  				Name:      leaseName,
   131  				Namespace: controllerNs,
   132  			},
   133  			Client: k8sAPI.Client.CoordinationV1(),
   134  			LockConfig: resourcelock.ResourceLockConfig{
   135  				Identity: hostname,
   136  			},
   137  		},
   138  		LeaseDuration: leaseDuration,
   139  		RenewDeadline: leaseRenewDeadline,
   140  		RetryPeriod:   leaseRetryPeriod,
   141  		Callbacks: leaderelection.LeaderCallbacks{
   142  			OnStartedLeading: func(ctx context.Context) {
   143  				err := ec.addHandlers()
   144  				if err != nil {
   145  					// If the leader has failed to register callbacks then
   146  					// panic; we are in a bad state that's hard to recover from
   147  					// gracefully.
   148  					panic(fmt.Sprintf("failed to register event handlers: %v", err))
   149  				}
   150  			},
   151  			OnStoppedLeading: func() {
   152  				err := ec.removeHandlers()
   153  				if err != nil {
   154  					// If the leader has failed to de-register callbacks then
   155  					// panic; otherwise, we risk racing with the newly elected
   156  					// leader
   157  					panic(fmt.Sprintf("failed to de-register event handlers: %v", err))
   158  				}
   159  				ec.log.Infof("%s released lease", hostname)
   160  			},
   161  			OnNewLeader: func(identity string) {
   162  				if identity == hostname {
   163  					ec.log.Infof("%s acquired lease", hostname)
   164  				}
   165  			},
   166  		},
   167  	}
   168  
   169  	return ec, nil
   170  }
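
(A minimal usage sketch, separate from the source above. The buildAPI helper, the "linkerd" controller namespace, and the use of the pod hostname as the election identity are assumptions for illustration; only NewEndpointsController, Start(), and the stop-channel semantics come from this file.)

	package main

	import (
		"log"
		"os"

		externalworkload "github.com/linkerd/linkerd2/controller/api/destination/external-workload"
		"github.com/linkerd/linkerd2/controller/k8s"
	)

	// buildAPI is a hypothetical placeholder for however the destination service
	// constructs its *k8s.API (with the Svc, ES and ExtWorkload informers); it is
	// not part of this package.
	func buildAPI() (*k8s.API, error) {
		// ...
		return nil, nil
	}

	func main() {
		k8sAPI, err := buildAPI()
		if err != nil {
			log.Fatal(err)
		}

		hostname, _ := os.Hostname() // leader-election identity
		stopCh := make(chan struct{})

		ec, err := externalworkload.NewEndpointsController(k8sAPI, hostname, "linkerd", stopCh, true)
		if err != nil {
			log.Fatalf("failed to create endpoints controller: %v", err)
		}

		// Spawns the elector loop, the queue worker and the shutdown handler;
		// must only be called once.
		ec.Start()

		// ...run until a termination signal arrives, then close the stop
		// channel to drain the queue and release the lease.
		close(stopCh)
	}
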
   171  
   172  // addHandlers will register a set of callbacks with the different informers
   173  // needed to synchronise endpoint state.
   174  func (ec *EndpointsController) addHandlers() error {
   175  	var err error
   176  	ec.Lock()
   177  	defer ec.Unlock()
   178  
   179  	// Wipe out previously observed state. This ensures we will not have stale
   180  	// cache errors due to events that happened when callbacks were not firing.
   181  	ec.reconciler.endpointTracker = epsliceutil.NewEndpointSliceTracker()
   182  
   183  	ec.svcHandle, err = ec.k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   184  		AddFunc:    ec.onServiceUpdate,
   185  		DeleteFunc: ec.onServiceUpdate,
   186  		UpdateFunc: func(_, newObj interface{}) {
   187  			ec.onServiceUpdate(newObj)
   188  		},
   189  	})
   190  
   191  	if err != nil {
   192  		return err
   193  	}
   194  
   195  	ec.esHandle, err = ec.k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   196  		AddFunc:    ec.onEndpointSliceAdd,
   197  		UpdateFunc: ec.onEndpointSliceUpdate,
   198  		DeleteFunc: ec.onEndpointSliceDelete,
   199  	})
   200  
   201  	if err != nil {
   202  		return err
   203  	}
   204  
   205  	ec.ewHandle, err = ec.k8sAPI.ExtWorkload().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   206  		AddFunc:    ec.onAddExternalWorkload,
   207  		DeleteFunc: ec.onDeleteExternalWorkload,
   208  		UpdateFunc: ec.onUpdateExternalWorkload,
   209  	})
   210  
   211  	if err != nil {
   212  		return err
   213  	}
   214  
   215  	return nil
   216  }
   217  
   218  // removeHandlers will de-register callbacks
   219  func (ec *EndpointsController) removeHandlers() error {
   220  	var err error
   221  	ec.Lock()
   222  	defer ec.Unlock()
   223  	if ec.svcHandle != nil {
   224  		if err = ec.k8sAPI.Svc().Informer().RemoveEventHandler(ec.svcHandle); err != nil {
   225  			return err
   226  		}
   227  	}
   228  
   229  	if ec.ewHandle != nil {
   230  		if err = ec.k8sAPI.ExtWorkload().Informer().RemoveEventHandler(ec.ewHandle); err != nil {
   231  			return err
   232  		}
   233  	}
   234  
   235  	if ec.esHandle != nil {
   236  		if err = ec.k8sAPI.ES().Informer().RemoveEventHandler(ec.esHandle); err != nil {
   237  			return err
   238  		}
   239  	}
   240  
   241  	return nil
   242  }
   243  
   244  // Start will run the endpoint manager's processing loop and leader elector.
   245  //
   246  // The function will spawn three background tasks: one to run the leader
   247  // elector client, one to process updates applied by the informer
   248  // callbacks, and one to handle shutdown signals and propagate them to all
   249  // components.
   250  //
   251  // Warning: Do not call Start() more than once
   252  func (ec *EndpointsController) Start() {
   253  	// Create a parent context that will be used by the leader elector to shut
   254  	// down gracefully.
   255  	//
   256  	// When cancelled (either through cancel function or by having its done
   257  	// channel closed), the leader elector will release the lease and stop its
   258  	// execution.
   259  	ctx, cancel := context.WithCancel(context.Background())
   260  	go func() {
   261  		for {
   262  			// Block until a lease is acquired or a lease has been released
   263  			leaderelection.RunOrDie(ctx, ec.lec)
   264  			// If the context has been cancelled, exit the function, otherwise
   265  			// continue spinning.
   266  			select {
   267  			case <-ctx.Done():
   268  				ec.log.Trace("leader election client received shutdown signal")
   269  				return
   270  			default:
   271  			}
   272  		}
   273  	}()
   274  
   275  	// When a shutdown signal is received over the manager's stop channel, it is
   276  	// propagated to the elector through the context object and to the queue
   277  	// through its dedicated `ShutDownWithDrain()` function.
   278  	go func() {
   279  		// Block until a shutdown signal arrives
   280  		<-ec.stop
   281  		// Drain the queue before signalling the lease to terminate
   282  		ec.queue.ShutDownWithDrain()
   283  		// Propagate shutdown to elector
   284  		cancel()
   285  		ec.log.Infof("received shutdown signal")
   286  	}()
   287  
   288  	// Start a background task to process updates.
   289  	go ec.processQueue()
   290  }
   291  
   292  // processQueue spins and pops elements off the queue. When the queue has
   293  // received a shutdown signal, it exits.
   294  //
   295  // The queue uses locking internally so this function is thread safe and can
   296  // have many workers call it in parallel; workers will not process the same item
   297  // at the same time.
   298  func (ec *EndpointsController) processQueue() {
   299  	for {
   300  		item, quit := ec.queue.Get()
   301  		if quit {
   302  			ec.log.Trace("queue received shutdown signal")
   303  			return
   304  		}
   305  
   306  		key, ok := item.(string)
   307  		if !ok {
   308  			ec.log.Errorf("Found queue element of type %T, was expecting a string", item)
   309  			continue
   310  		}
   311  		err := ec.syncService(key)
   312  		ec.handleError(err, key)
   313  
   314  		// Tell the queue that we are done processing this key. This will
   315  		// unblock the key for other workers to process if executing in
   316  		// parallel, or if it needs to be re-queued because another update has
   317  		// been received.
   318  		ec.queue.Done(key)
   319  	}
   320  }
   321  
   322  // handleError will look at the result of the queue update processing step and
   323  // decide whether an update should be re-tried or marked as done.
   324  //
   325  // The queue operates with an error budget. When exceeded, the item is evicted
   326  // from the queue (and its retry history wiped). Otherwise, the item is enqueued
   327  // according to the queue's rate limiting algorithm.
   328  func (ec *EndpointsController) handleError(err error, key string) {
   329  	if err == nil {
   330  		// Wipe out rate limiting history for key when processing was successful.
   331  		// Next time this key is used, it will get its own fresh rate limiter
   332  		// error budget
   333  		ec.queue.Forget(key)
   334  		return
   335  	}
   336  
   337  	if ec.queue.NumRequeues(key) < maxRetryBudget {
   338  		ec.queue.AddRateLimited(key)
   339  		return
   340  	}
   341  
   342  	ec.queue.Forget(key)
   343  	ec.dropsMetric.Inc()
   344  	ec.log.Errorf("dropped Service %s out of update queue: %v", key, err)
   345  }
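
(For reference, the retry behaviour above rides on client-go's DefaultControllerRateLimiter, which the queue is built with in NewEndpointsController: a per-item exponential backoff starting at 5ms and capped at 1000s, combined with an overall 10 QPS / 100-burst token bucket. A standalone sketch of that behaviour, separate from this controller; the key value is illustrative:)

	package main

	import (
		"fmt"

		"k8s.io/client-go/util/workqueue"
	)

	func main() {
		rl := workqueue.DefaultControllerRateLimiter()

		// Each failed attempt doubles the per-item delay: 5ms, 10ms, 20ms, ...
		for i := 0; i < 5; i++ {
			fmt.Println(rl.When("default/web"))
		}

		fmt.Println(rl.NumRequeues("default/web")) // 5 -- the count handleError compares to maxRetryBudget
		rl.Forget("default/web")                   // wipes the per-item retry history
		fmt.Println(rl.NumRequeues("default/web")) // 0
	}
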
   346  
   347  // syncService will run a reconciliation function for a single Service object
   348  // that needs to have its EndpointSlice objects reconciled.
   349  func (ec *EndpointsController) syncService(update string) error {
   350  	namespace, name, err := cache.SplitMetaNamespaceKey(update)
   351  	if err != nil {
   352  		return err
   353  	}
   354  
   355  	svc, err := ec.k8sAPI.Svc().Lister().Services(namespace).Get(name)
   356  	if err != nil {
   357  		// If the error is anything except a 'NotFound' then bubble up the error
   358  		// and re-queue the entry; the service will be re-processed at some
   359  		// point in the future.
   360  		if !kerrors.IsNotFound(err) {
   361  			return err
   362  		}
   363  
   364  		ec.reconciler.endpointTracker.DeleteService(namespace, name)
   365  		// The service has been deleted, return nil so that it won't be retried.
   366  		return nil
   367  	}
   368  
   369  	if svc.Spec.Type == corev1.ServiceTypeExternalName {
   370  		// services with Type ExternalName do not receive any endpoints
   371  		return nil
   372  	}
   373  
   374  	if svc.Spec.Selector == nil {
   375  		// services without a selector will not get any endpoints automatically
   376  		// created; this is done out-of-band by the service operator
   377  		return nil
   378  	}
   379  
   380  	ewSelector := labels.Set(svc.Spec.Selector).AsSelectorPreValidated()
   381  	ews, err := ec.k8sAPI.ExtWorkload().Lister().List(ewSelector)
   382  	if err != nil {
   383  		// This operation should be infallible since we retrieve from the cache
   384  		// (we can guarantee we will receive at least an empty list); for good
   385  		// measure, bubble up the error if one is returned by the informer.
   386  		return err
   387  	}
   388  
   389  	esSelector := labels.Set(map[string]string{
   390  		discoveryv1.LabelServiceName: svc.Name,
   391  		discoveryv1.LabelManagedBy:   managedBy,
   392  	}).AsSelectorPreValidated()
   393  	epSlices, err := ec.k8sAPI.ES().Lister().List(esSelector)
   394  	if err != nil {
   395  		return err
   396  	}
   397  
   398  	epSlices = dropEndpointSlicesPendingDeletion(epSlices)
   399  	if ec.reconciler.endpointTracker.StaleSlices(svc, epSlices) {
   400  		ec.log.Warnf("detected EndpointSlice informer cache is out of date when processing %s", update)
   401  		return errors.New("EndpointSlice informer cache is out of date")
   402  	}
   403  	err = ec.reconciler.reconcile(svc, ews, epSlices)
   404  	if err != nil {
   405  		return err
   406  	}
   407  
   408  	return nil
   409  }
   410  
   411  // When a service update has been received (regardless of the event type,
   412  // i.e. Added, Modified, or Deleted), send it to the endpoints controller for
   413  // processing.
   414  func (ec *EndpointsController) onServiceUpdate(obj interface{}) {
   415  	key, err := cache.MetaNamespaceKeyFunc(obj)
   416  	if err != nil {
   417  		ec.log.Infof("failed to get key for object %+v: %v", obj, err)
   418  		return
   419  	}
   420  
   421  	namespace, _, err := cache.SplitMetaNamespaceKey(key)
   422  	if err != nil {
   423  		ec.log.Infof("failed to get namespace from key %s: %v", key, err)
   424  	}
   425  
   426  	// Skip processing 'core' services
   427  	if namespace == "kube-system" {
   428  		return
   429  	}
   430  
   431  	ec.queue.Add(key)
   432  }
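
(The keys added to the queue here, and split again in syncService, are the standard cache keys of the form `namespace/name`. A standalone sketch, separate from this controller; the namespace and service name are illustrative:)

	package main

	import (
		"fmt"

		corev1 "k8s.io/api/core/v1"
		metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
		"k8s.io/client-go/tools/cache"
	)

	func main() {
		svc := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: "emojivoto", Name: "web-svc"}}

		key, _ := cache.MetaNamespaceKeyFunc(svc)
		fmt.Println(key) // "emojivoto/web-svc"

		ns, name, _ := cache.SplitMetaNamespaceKey(key)
		fmt.Println(ns, name) // "emojivoto web-svc"
	}
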
   433  
   434  // onEndpointSliceAdd queues the relevant Service for a sync if the
   435  // EndpointSlice resource version does not match the expected version in the
   436  // endpointSliceTracker.
   437  func (ec *EndpointsController) onEndpointSliceAdd(obj interface{}) {
   438  	es := obj.(*discoveryv1.EndpointSlice)
   439  	if es == nil {
   440  		ec.log.Info("Invalid EndpointSlice provided to onEndpointSliceAdd()")
   441  		return
   442  	}
   443  
   444  	if managedByController(es) && ec.reconciler.endpointTracker.ShouldSync(es) {
   445  		ec.queueServiceForEndpointSlice(es)
   446  	}
   447  }
   448  
   449  // onEndpointSliceUpdate queues the relevant Service for a sync if the
   450  // EndpointSlice resource version does not match the expected version in the
   451  // endpointSliceTracker, or if the managed-by value of the EndpointSlice has
   452  // changed from or to this controller.
   453  func (ec *EndpointsController) onEndpointSliceUpdate(prevObj, obj interface{}) {
   454  	prevEndpointSlice := prevObj.(*discoveryv1.EndpointSlice)
   455  	endpointSlice := obj.(*discoveryv1.EndpointSlice)
   456  	if endpointSlice == nil || prevEndpointSlice == nil {
   457  		ec.log.Info("Invalid EndpointSlice provided to onEndpointSliceUpdate()")
   458  		return
   459  	}
   460  
   461  	// EndpointSlice generation does not change when labels change. Although the
   462  	// controller will never change LabelServiceName, users might. This check
   463  	// ensures that we handle changes to this label.
   464  	svcName := endpointSlice.Labels[discoveryv1.LabelServiceName]
   465  	prevSvcName := prevEndpointSlice.Labels[discoveryv1.LabelServiceName]
   466  	if svcName != prevSvcName {
   467  		ec.log.Infof("label changed, label: %s, oldService: %s, newService: %s, endpointslice: %s", discoveryv1.LabelServiceName, prevSvcName, svcName, endpointSlice.Name)
   468  		ec.queueServiceForEndpointSlice(endpointSlice)
   469  		ec.queueServiceForEndpointSlice(prevEndpointSlice)
   470  		return
   471  	}
   472  	if managedByChanged(prevEndpointSlice, endpointSlice) ||
   473  		(managedByController(endpointSlice) && ec.reconciler.endpointTracker.ShouldSync(endpointSlice)) {
   474  		ec.queueServiceForEndpointSlice(endpointSlice)
   475  	}
   476  }
   477  
   478  // onEndpointSliceDelete queues the relevant Service for a sync if the
   479  // EndpointSlice is still tracked by the endpointSliceTracker, i.e. its
   480  // deletion was not expected by this controller.
   481  func (ec *EndpointsController) onEndpointSliceDelete(obj interface{}) {
   482  	endpointSlice := ec.getEndpointSliceFromDeleteAction(obj)
   483  	if endpointSlice != nil && managedByController(endpointSlice) && ec.reconciler.endpointTracker.Has(endpointSlice) {
   484  		// This returns false if we didn't expect the EndpointSlice to be
   485  		// deleted. If that is the case, we queue the Service for another sync.
   486  		if !ec.reconciler.endpointTracker.HandleDeletion(endpointSlice) {
   487  			ec.queueServiceForEndpointSlice(endpointSlice)
   488  		}
   489  	}
   490  }
   491  
   492  // queueServiceForEndpointSlice attempts to queue the corresponding Service for
   493  // the provided EndpointSlice.
   494  func (ec *EndpointsController) queueServiceForEndpointSlice(endpointSlice *discoveryv1.EndpointSlice) {
   495  	key, err := endpointslicerec.ServiceControllerKey(endpointSlice)
   496  	if err != nil {
   497  		ec.log.Errorf("Couldn't get key for EndpointSlice %+v: %v", endpointSlice, err)
   498  		return
   499  	}
   500  
   501  	ec.queue.Add(key)
   502  }
   503  
   504  func (ec *EndpointsController) onAddExternalWorkload(obj interface{}) {
   505  	ew, ok := obj.(*ewv1beta1.ExternalWorkload)
   506  	if !ok {
   507  		ec.log.Errorf("couldn't get ExternalWorkload from object %#v", obj)
   508  		return
   509  	}
   510  
   511  	services, err := ec.getExternalWorkloadSvcMembership(ew)
   512  	if err != nil {
   513  		ec.log.Errorf("failed to get service membership for %s/%s: %v", ew.Namespace, ew.Name, err)
   514  		return
   515  	}
   516  
   517  	for svc := range services {
   518  		ec.queue.Add(svc)
   519  	}
   520  }
   521  
   522  func (ec *EndpointsController) onUpdateExternalWorkload(old, cur interface{}) {
   523  	services := ec.getServicesToUpdateOnExternalWorkloadChange(old, cur)
   524  
   525  	for svc := range services {
   526  		ec.queue.Add(svc)
   527  	}
   528  }
   529  
   530  func (ec *EndpointsController) onDeleteExternalWorkload(obj interface{}) {
   531  	ew := ec.getExternalWorkloadFromDeleteAction(obj)
   532  	if ew != nil {
   533  		ec.onAddExternalWorkload(ew)
   534  	}
   535  }
   536  
   537  func dropEndpointSlicesPendingDeletion(endpointSlices []*discoveryv1.EndpointSlice) []*discoveryv1.EndpointSlice {
   538  	n := 0
   539  	for _, endpointSlice := range endpointSlices {
   540  		if endpointSlice.DeletionTimestamp == nil {
   541  			endpointSlices[n] = endpointSlice
   542  			n++
   543  		}
   544  	}
   545  	return endpointSlices[:n]
   546  }
   547  
