...

Source file src/k8s.io/kubernetes/pkg/controller/endpointslice/endpointslice_controller.go

Documentation: k8s.io/kubernetes/pkg/controller/endpointslice

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package endpointslice
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	"golang.org/x/time/rate"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	discovery "k8s.io/api/discovery/v1"
    28  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    29  	"k8s.io/apimachinery/pkg/labels"
    30  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    33  	coreinformers "k8s.io/client-go/informers/core/v1"
    34  	discoveryinformers "k8s.io/client-go/informers/discovery/v1"
    35  	clientset "k8s.io/client-go/kubernetes"
    36  	"k8s.io/client-go/kubernetes/scheme"
    37  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    38  	corelisters "k8s.io/client-go/listers/core/v1"
    39  	discoverylisters "k8s.io/client-go/listers/discovery/v1"
    40  	"k8s.io/client-go/tools/cache"
    41  	"k8s.io/client-go/tools/record"
    42  	"k8s.io/client-go/util/workqueue"
    43  	endpointslicerec "k8s.io/endpointslice"
    44  	endpointslicemetrics "k8s.io/endpointslice/metrics"
    45  	"k8s.io/endpointslice/topologycache"
    46  	endpointsliceutil "k8s.io/endpointslice/util"
    47  	"k8s.io/klog/v2"
    48  	"k8s.io/kubernetes/pkg/controller"
    49  	endpointslicepkg "k8s.io/kubernetes/pkg/controller/util/endpointslice"
    50  	"k8s.io/kubernetes/pkg/features"
    51  )
    52  
    53  const (
    54  	// maxRetries is the number of times a service will be retried before it is
    55  	// dropped out of the queue. Any sync error, such as a failure to create or
    56  	// update an EndpointSlice, could trigger a retry. With the current
    57  	// rate-limiter in use (1s*2^(numRetries-1)) the following numbers represent
    58  	// the sequence of delays between successive queuings of a service.
    59  	//
    60  	// 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 512s, 1000s (max)
    61  	maxRetries = 15
    62  
    63  	// endpointSliceChangeMinSyncDelay indicates the minimum delay before
    64  	// queuing a syncService call after an EndpointSlice changes. If
    65  	// endpointUpdatesBatchPeriod is greater than this value, it will be used
    66  	// instead. This helps batch processing of changes to multiple
    67  	// EndpointSlices.
    68  	endpointSliceChangeMinSyncDelay = 1 * time.Second
    69  
    70  	// defaultSyncBackOff is the default backoff period for syncService calls.
    71  	defaultSyncBackOff = 1 * time.Second
    72  	// maxSyncBackOff is the max backoff period for syncService calls.
    73  	maxSyncBackOff = 1000 * time.Second
    74  
    75  	// controllerName is a unique value used with LabelManagedBy to indicate
    76  	// the component managing an EndpointSlice.
    77  	controllerName = "endpointslice-controller.k8s.io"
    78  )
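
The retry delays listed in the maxRetries comment follow directly from the rate-limiter formula. A minimal standalone sketch (not part of this file, assuming the standard per-item exponential backoff semantics of delay = defaultSyncBackOff * 2^failures, capped at maxSyncBackOff):

	package main

	import (
		"fmt"
		"math"
		"time"
	)

	func main() {
		const (
			defaultSyncBackOff = 1 * time.Second    // base delay, as above
			maxSyncBackOff     = 1000 * time.Second // cap, as above
			maxRetries         = 15
		)
		for failures := 0; failures < maxRetries; failures++ {
			d := time.Duration(float64(defaultSyncBackOff) * math.Pow(2, float64(failures)))
			if d > maxSyncBackOff {
				d = maxSyncBackOff
			}
			fmt.Printf("retry %2d: %vs\n", failures+1, d.Seconds()) // 1s, 2s, 4s, ..., 512s, then capped at 1000s
		}
	}
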
    79  
    80  // NewController creates and initializes a new Controller
    81  func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
    82  	serviceInformer coreinformers.ServiceInformer,
    83  	nodeInformer coreinformers.NodeInformer,
    84  	endpointSliceInformer discoveryinformers.EndpointSliceInformer,
    85  	maxEndpointsPerSlice int32,
    86  	client clientset.Interface,
    87  	endpointUpdatesBatchPeriod time.Duration,
    88  ) *Controller {
    89  	broadcaster := record.NewBroadcaster(record.WithContext(ctx))
    90  	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-controller"})
    91  
    92  	endpointslicemetrics.RegisterMetrics()
    93  
    94  	c := &Controller{
    95  		client: client,
    96  		// This is similar to the DefaultControllerRateLimiter, just with a
    97  		// significantly higher default backoff (1s vs 5ms). This controller
    98  		// processes events that can require significant EndpointSlice changes,
    99  		// such as an update to a Service or Deployment. A more significant
   100  		// rate limit back off here helps ensure that the Controller does not
   101  		// overwhelm the API Server.
   102  		queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
   103  			workqueue.NewItemExponentialFailureRateLimiter(defaultSyncBackOff, maxSyncBackOff),
   104  			// 10 qps, 100 bucket size. This is only for retry speed and it's
   105  			// only the overall factor (not per item).
   106  			&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
   107  		), "endpoint_slice"),
   108  		workerLoopPeriod: time.Second,
   109  	}
   110  
   111  	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   112  		AddFunc: c.onServiceUpdate,
   113  		UpdateFunc: func(old, cur interface{}) {
   114  			c.onServiceUpdate(cur)
   115  		},
   116  		DeleteFunc: c.onServiceDelete,
   117  	})
   118  	c.serviceLister = serviceInformer.Lister()
   119  	c.servicesSynced = serviceInformer.Informer().HasSynced
   120  
   121  	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   122  		AddFunc:    c.addPod,
   123  		UpdateFunc: c.updatePod,
   124  		DeleteFunc: c.deletePod,
   125  	})
   126  	c.podLister = podInformer.Lister()
   127  	c.podsSynced = podInformer.Informer().HasSynced
   128  
   129  	c.nodeLister = nodeInformer.Lister()
   130  	c.nodesSynced = nodeInformer.Informer().HasSynced
   131  
   132  	logger := klog.FromContext(ctx)
   133  	endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   134  		AddFunc: c.onEndpointSliceAdd,
   135  		UpdateFunc: func(oldObj, newObj interface{}) {
   136  			c.onEndpointSliceUpdate(logger, oldObj, newObj)
   137  		},
   138  		DeleteFunc: c.onEndpointSliceDelete,
   139  	})
   140  
   141  	c.endpointSliceLister = endpointSliceInformer.Lister()
   142  	c.endpointSlicesSynced = endpointSliceInformer.Informer().HasSynced
   143  	c.endpointSliceTracker = endpointsliceutil.NewEndpointSliceTracker()
   144  
   145  	c.maxEndpointsPerSlice = maxEndpointsPerSlice
   146  
   147  	c.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker()
   148  
   149  	c.eventBroadcaster = broadcaster
   150  	c.eventRecorder = recorder
   151  
   152  	c.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
   153  
   154  	if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
   155  		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   156  			AddFunc: func(obj interface{}) {
   157  				c.addNode(logger, obj)
   158  			},
   159  			UpdateFunc: func(oldObj, newObj interface{}) {
   160  				c.updateNode(logger, oldObj, newObj)
   161  			},
   162  			DeleteFunc: func(obj interface{}) {
   163  				c.deleteNode(logger, obj)
   164  			},
   165  		})
   166  
   167  		c.topologyCache = topologycache.NewTopologyCache()
   168  	}
   169  
   170  	c.reconciler = endpointslicerec.NewReconciler(
   171  		c.client,
   172  		c.nodeLister,
   173  		c.maxEndpointsPerSlice,
   174  		c.endpointSliceTracker,
   175  		c.topologyCache,
   176  		c.eventRecorder,
   177  		controllerName,
   178  		endpointslicerec.WithTrafficDistributionEnabled(utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution)),
   179  	)
   180  
   181  	return c
   182  }
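
A minimal wiring sketch for the constructor above (illustrative only, not part of this file; assumes a pre-built kubernetes.Interface client, the k8s.io/client-go/informers factory package, and a cancellable ctx):

	factory := informers.NewSharedInformerFactory(client, 0)
	ec := endpointslice.NewController(ctx,
		factory.Core().V1().Pods(),
		factory.Core().V1().Services(),
		factory.Core().V1().Nodes(),
		factory.Discovery().V1().EndpointSlices(),
		100,         // maxEndpointsPerSlice
		client,      // clientset.Interface
		time.Second, // endpointUpdatesBatchPeriod
	)
	factory.Start(ctx.Done())
	go ec.Run(ctx, 5) // five workers; Run blocks until ctx is cancelled

The caller is responsible for starting the shared informers; Run only waits for their caches to sync before launching workers.
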
   183  
   184  // Controller manages selector-based service endpoint slices
   185  type Controller struct {
   186  	client           clientset.Interface
   187  	eventBroadcaster record.EventBroadcaster
   188  	eventRecorder    record.EventRecorder
   189  
   190  	// serviceLister is able to list/get services and is populated by the
   191  	// shared informer passed to NewController
   192  	serviceLister corelisters.ServiceLister
   193  	// servicesSynced returns true if the service shared informer has been synced at least once.
   194  	// Added as a member to the struct to allow injection for testing.
   195  	servicesSynced cache.InformerSynced
   196  
   197  	// podLister is able to list/get pods and is populated by the
   198  	// shared informer passed to NewController
   199  	podLister corelisters.PodLister
   200  	// podsSynced returns true if the pod shared informer has been synced at least once.
   201  	// Added as a member to the struct to allow injection for testing.
   202  	podsSynced cache.InformerSynced
   203  
   204  	// endpointSliceLister is able to list/get endpoint slices and is populated by the
   205  	// shared informer passed to NewController
   206  	endpointSliceLister discoverylisters.EndpointSliceLister
   207  	// endpointSlicesSynced returns true if the endpoint slice shared informer has been synced at least once.
   208  	// Added as a member to the struct to allow injection for testing.
   209  	endpointSlicesSynced cache.InformerSynced
   210  	// endpointSliceTracker tracks the list of EndpointSlices and associated
   211  	// resource versions expected for each Service. It can help determine if a
   212  	// cached EndpointSlice is out of date.
   213  	endpointSliceTracker *endpointsliceutil.EndpointSliceTracker
   214  
   215  	// nodeLister is able to list/get nodes and is populated by the
   216  	// shared informer passed to NewController
   217  	nodeLister corelisters.NodeLister
   218  	// nodesSynced returns true if the node shared informer has been synced at least once.
   219  	// Added as a member to the struct to allow injection for testing.
   220  	nodesSynced cache.InformerSynced
   221  
   222  	// reconciler is a utility used to reconcile EndpointSlice changes.
   223  	reconciler *endpointslicerec.Reconciler
   224  
   225  	// triggerTimeTracker is a utility used to compute and export the
   226  	// EndpointsLastChangeTriggerTime annotation.
   227  	triggerTimeTracker *endpointsliceutil.TriggerTimeTracker
   228  
   229  	// Services that need to be updated. A channel is inappropriate here,
   230  	// because it allows services with lots of pods to be serviced much
   231  	// more often than services with few pods; it also would cause a
   232  	// service that's inserted multiple times to be processed more than
   233  	// necessary.
   234  	queue workqueue.RateLimitingInterface
   235  
   236  	// maxEndpointsPerSlice references the maximum number of endpoints that
   237  	// should be added to an EndpointSlice
   238  	maxEndpointsPerSlice int32
   239  
   240  	// workerLoopPeriod is the time between worker runs. The workers
   241  	// process the queue of service and pod changes
   242  	workerLoopPeriod time.Duration
   243  
   244  	// endpointUpdatesBatchPeriod is an artificial delay added to all service syncs triggered by pod changes.
   245  	// This can be used to reduce the overall number of endpoint slice updates.
   246  	endpointUpdatesBatchPeriod time.Duration
   247  
   248  	// topologyCache tracks the distribution of Nodes and endpoints across zones
   249  	// to enable TopologyAwareHints.
   250  	topologyCache *topologycache.TopologyCache
   251  }
   252  
   253  // Run will not return until stopCh is closed.
   254  func (c *Controller) Run(ctx context.Context, workers int) {
   255  	defer utilruntime.HandleCrash()
   256  
   257  	// Start events processing pipeline.
   258  	c.eventBroadcaster.StartLogging(klog.Infof)
   259  	c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
   260  	defer c.eventBroadcaster.Shutdown()
   261  
   262  	defer c.queue.ShutDown()
   263  
   264  	logger := klog.FromContext(ctx)
   265  	logger.Info("Starting endpoint slice controller")
   266  	defer logger.Info("Shutting down endpoint slice controller")
   267  
   268  	if !cache.WaitForNamedCacheSync("endpoint_slice", ctx.Done(), c.podsSynced, c.servicesSynced, c.endpointSlicesSynced, c.nodesSynced) {
   269  		return
   270  	}
   271  
   272  	logger.V(2).Info("Starting worker threads", "total", workers)
   273  	for i := 0; i < workers; i++ {
   274  		go wait.Until(func() { c.worker(logger) }, c.workerLoopPeriod, ctx.Done())
   275  	}
   276  
   277  	<-ctx.Done()
   278  }
   279  
   280  // worker runs a worker thread that just dequeues items, processes them, and
   281  // marks them done. You may run as many of these in parallel as you wish; the
   282  // workqueue guarantees that they will not end up processing the same service
   283  // at the same time
   284  func (c *Controller) worker(logger klog.Logger) {
   285  	for c.processNextWorkItem(logger) {
   286  	}
   287  }
   288  
   289  func (c *Controller) processNextWorkItem(logger klog.Logger) bool {
   290  	cKey, quit := c.queue.Get()
   291  	if quit {
   292  		return false
   293  	}
   294  	defer c.queue.Done(cKey)
   295  
   296  	err := c.syncService(logger, cKey.(string))
   297  	c.handleErr(logger, err, cKey)
   298  
   299  	return true
   300  }
   301  
   302  func (c *Controller) handleErr(logger klog.Logger, err error, key interface{}) {
   303  	trackSync(err)
   304  
   305  	if err == nil {
   306  		c.queue.Forget(key)
   307  		return
   308  	}
   309  
   310  	if c.queue.NumRequeues(key) < maxRetries {
   311  		logger.Info("Error syncing endpoint slices for service, retrying", "key", key, "err", err)
   312  		c.queue.AddRateLimited(key)
   313  		return
   314  	}
   315  
   316  	logger.Info("Retry budget exceeded, dropping service out of the queue", "key", key, "err", err)
   317  	c.queue.Forget(key)
   318  	utilruntime.HandleError(err)
   319  }
   320  
   321  func (c *Controller) syncService(logger klog.Logger, key string) error {
   322  	startTime := time.Now()
   323  	defer func() {
   324  		logger.V(4).Info("Finished syncing service endpoint slices", "key", key, "elapsedTime", time.Since(startTime))
   325  	}()
   326  
   327  	namespace, name, err := cache.SplitMetaNamespaceKey(key)
   328  	if err != nil {
   329  		return err
   330  	}
   331  
   332  	service, err := c.serviceLister.Services(namespace).Get(name)
   333  	if err != nil {
   334  		if !apierrors.IsNotFound(err) {
   335  			return err
   336  		}
   337  
   338  		c.triggerTimeTracker.DeleteService(namespace, name)
   339  		c.reconciler.DeleteService(namespace, name)
   340  		c.endpointSliceTracker.DeleteService(namespace, name)
   341  		// The service has been deleted, return nil so that it won't be retried.
   342  		return nil
   343  	}
   344  
   345  	if service.Spec.Type == v1.ServiceTypeExternalName {
   346  		// services with Type ExternalName receive no endpoints from this controller;
   347  		// Ref: https://issues.k8s.io/105986
   348  		return nil
   349  	}
   350  
   351  	if service.Spec.Selector == nil {
   352  		// services without a selector receive no endpoint slices from this controller;
   353  		// these services will receive endpoint slices that are created out-of-band via the REST API.
   354  		return nil
   355  	}
   356  
   357  	logger.V(5).Info("About to update endpoint slices for service", "key", key)
   358  
   359  	podLabelSelector := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
   360  	pods, err := c.podLister.Pods(service.Namespace).List(podLabelSelector)
   361  	if err != nil {
   362  		// Since we're getting stuff from a local cache, it is basically
   363  		// impossible to get this error.
   364  		c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToListPods",
   365  			"Error listing Pods for Service %s/%s: %v", service.Namespace, service.Name, err)
   366  		return err
   367  	}
   368  
   369  	esLabelSelector := labels.Set(map[string]string{
   370  		discovery.LabelServiceName: service.Name,
   371  		discovery.LabelManagedBy:   c.reconciler.GetControllerName(),
   372  	}).AsSelectorPreValidated()
   373  	endpointSlices, err := c.endpointSliceLister.EndpointSlices(service.Namespace).List(esLabelSelector)
   374  
   375  	if err != nil {
   376  		// Since we're getting stuff from a local cache, it is basically
   377  		// impossible to get this error.
   378  		c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToListEndpointSlices",
   379  			"Error listing Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err)
   380  		return err
   381  	}
   382  
   383  	// Drop EndpointSlices that have been marked for deletion to prevent the controller from getting stuck.
   384  	endpointSlices = dropEndpointSlicesPendingDeletion(endpointSlices)
   385  
   386  	if c.endpointSliceTracker.StaleSlices(service, endpointSlices) {
   387  		return endpointslicepkg.NewStaleInformerCache("EndpointSlice informer cache is out of date")
   388  	}
   389  
   390  	// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
   391  	// state of the trigger time tracker gets updated even if the sync turns out
   392  	// to be no-op and we don't update the EndpointSlice objects.
   393  	lastChangeTriggerTime := c.triggerTimeTracker.
   394  		ComputeEndpointLastChangeTriggerTime(namespace, service, pods)
   395  
   396  	err = c.reconciler.Reconcile(logger, service, pods, endpointSlices, lastChangeTriggerTime)
   397  	if err != nil {
   398  		c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToUpdateEndpointSlices",
   399  			"Error updating Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err)
   400  		return err
   401  	}
   402  
   403  	return nil
   404  }
   405  
   406  // onServiceUpdate queues the Service for processing.
   407  func (c *Controller) onServiceUpdate(obj interface{}) {
   408  	key, err := controller.KeyFunc(obj)
   409  	if err != nil {
   410  		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
   411  		return
   412  	}
   413  
   414  	c.queue.Add(key)
   415  }
   416  
   417  // onServiceDelete queues the Service for processing.
   418  func (c *Controller) onServiceDelete(obj interface{}) {
   419  	key, err := controller.KeyFunc(obj)
   420  	if err != nil {
   421  		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
   422  		return
   423  	}
   424  
   425  	c.queue.Add(key)
   426  }
   427  
   428  // onEndpointSliceAdd queues a sync for the relevant Service if the
   429  // EndpointSlice resource version does not match the expected version in the
   430  // endpointSliceTracker.
   431  func (c *Controller) onEndpointSliceAdd(obj interface{}) {
   432  	endpointSlice := obj.(*discovery.EndpointSlice)
   433  	if endpointSlice == nil {
   434  		utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceAdd()"))
   435  		return
   436  	}
   437  	if c.reconciler.ManagedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice) {
   438  		c.queueServiceForEndpointSlice(endpointSlice)
   439  	}
   440  }
   441  
   442  // onEndpointSliceUpdate queues a sync for the relevant Service if
   443  // the EndpointSlice resource version does not match the expected version in the
   444  // endpointSliceTracker or the managed-by value of the EndpointSlice has changed
   445  // from or to this controller.
   446  func (c *Controller) onEndpointSliceUpdate(logger klog.Logger, prevObj, obj interface{}) {
   447  	prevEndpointSlice := prevObj.(*discovery.EndpointSlice)
   448  	endpointSlice := obj.(*discovery.EndpointSlice)
   449  	if endpointSlice == nil || prevEndpointSlice == nil {
   450  		utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceUpdate()"))
   451  		return
   452  	}
   453  	// EndpointSlice generation does not change when labels change. Although the
   454  	// controller will never change LabelServiceName, users might. This check
   455  	// ensures that we handle changes to this label.
   456  	svcName := endpointSlice.Labels[discovery.LabelServiceName]
   457  	prevSvcName := prevEndpointSlice.Labels[discovery.LabelServiceName]
   458  	if svcName != prevSvcName {
   459  		logger.Info("label changed", "label", discovery.LabelServiceName, "oldService", prevSvcName, "newService", svcName, "endpointslice", klog.KObj(endpointSlice))
   460  		c.queueServiceForEndpointSlice(endpointSlice)
   461  		c.queueServiceForEndpointSlice(prevEndpointSlice)
   462  		return
   463  	}
   464  	if c.reconciler.ManagedByChanged(prevEndpointSlice, endpointSlice) || (c.reconciler.ManagedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice)) {
   465  		c.queueServiceForEndpointSlice(endpointSlice)
   466  	}
   467  }
   468  
   469  // onEndpointSliceDelete queues a sync for the relevant Service if the
   470  // EndpointSlice resource version does not match the expected version in the
   471  // endpointSliceTracker.
   472  func (c *Controller) onEndpointSliceDelete(obj interface{}) {
   473  	endpointSlice := getEndpointSliceFromDeleteAction(obj)
   474  	if endpointSlice != nil && c.reconciler.ManagedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) {
   475  		// This returns false if we didn't expect the EndpointSlice to be
   476  		// deleted. If that is the case, we queue the Service for another sync.
   477  		if !c.endpointSliceTracker.HandleDeletion(endpointSlice) {
   478  			c.queueServiceForEndpointSlice(endpointSlice)
   479  		}
   480  	}
   481  }
   482  
   483  // queueServiceForEndpointSlice attempts to queue the corresponding Service for
   484  // the provided EndpointSlice.
   485  func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.EndpointSlice) {
   486  	key, err := endpointslicerec.ServiceControllerKey(endpointSlice)
   487  	if err != nil {
   488  		utilruntime.HandleError(fmt.Errorf("Couldn't get key for EndpointSlice %+v: %v", endpointSlice, err))
   489  		return
   490  	}
   491  
   492  	// queue after the max of endpointSliceChangeMinSyncDelay and
   493  	// endpointUpdatesBatchPeriod.
   494  	delay := endpointSliceChangeMinSyncDelay
   495  	if c.endpointUpdatesBatchPeriod > delay {
   496  		delay = c.endpointUpdatesBatchPeriod
   497  	}
   498  	c.queue.AddAfter(key, delay)
   499  }
   500  
   501  func (c *Controller) addPod(obj interface{}) {
   502  	pod := obj.(*v1.Pod)
   503  	services, err := endpointsliceutil.GetPodServiceMemberships(c.serviceLister, pod)
   504  	if err != nil {
   505  		utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
   506  		return
   507  	}
   508  	for key := range services {
   509  		c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
   510  	}
   511  }
   512  
   513  func (c *Controller) updatePod(old, cur interface{}) {
   514  	services := endpointsliceutil.GetServicesToUpdateOnPodChange(c.serviceLister, old, cur)
   515  	for key := range services {
   516  		c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
   517  	}
   518  }
   519  
   520  // When a pod is deleted, enqueue the services the pod used to be a member of.
   521  // obj could be a *v1.Pod, or a DeletionFinalStateUnknown marker item.
   522  func (c *Controller) deletePod(obj interface{}) {
   523  	pod := endpointsliceutil.GetPodFromDeleteAction(obj)
   524  	if pod != nil {
   525  		c.addPod(pod)
   526  	}
   527  }
   528  
   529  func (c *Controller) addNode(logger klog.Logger, obj interface{}) {
   530  	c.checkNodeTopologyDistribution(logger)
   531  }
   532  
   533  func (c *Controller) updateNode(logger klog.Logger, old, cur interface{}) {
   534  	oldNode := old.(*v1.Node)
   535  	curNode := cur.(*v1.Node)
   536  
   537  	// LabelTopologyZone may be added by cloud provider asynchronously after the Node is created.
   538  	// The topology cache should be updated in this case.
   539  	if isNodeReady(oldNode) != isNodeReady(curNode) ||
   540  		oldNode.Labels[v1.LabelTopologyZone] != curNode.Labels[v1.LabelTopologyZone] {
   541  		c.checkNodeTopologyDistribution(logger)
   542  	}
   543  }
   544  
   545  func (c *Controller) deleteNode(logger klog.Logger, obj interface{}) {
   546  	c.checkNodeTopologyDistribution(logger)
   547  }
   548  
   549  // checkNodeTopologyDistribution updates Nodes in the topology cache and then
   550  // queues any Services that are past the threshold.
   551  func (c *Controller) checkNodeTopologyDistribution(logger klog.Logger) {
   552  	if c.topologyCache == nil {
   553  		return
   554  	}
   555  	nodes, err := c.nodeLister.List(labels.Everything())
   556  	if err != nil {
   557  		logger.Error(err, "Error listing Nodes")
   558  		return
   559  	}
   560  	c.topologyCache.SetNodes(logger, nodes)
   561  	serviceKeys := c.topologyCache.GetOverloadedServices()
   562  	for _, serviceKey := range serviceKeys {
   563  		logger.V(2).Info("Queuing Service after Node change due to overloading", "key", serviceKey)
   564  		c.queue.Add(serviceKey)
   565  	}
   566  }
   567  
   568  // trackSync increments the EndpointSliceSyncs metric with the result of a sync.
   569  func trackSync(err error) {
   570  	metricLabel := "success"
   571  	if err != nil {
   572  		if endpointslicepkg.IsStaleInformerCacheErr(err) {
   573  			metricLabel = "stale"
   574  		} else {
   575  			metricLabel = "error"
   576  		}
   577  	}
   578  	endpointslicemetrics.EndpointSliceSyncs.WithLabelValues(metricLabel).Inc()
   579  }
   580  
   581  func dropEndpointSlicesPendingDeletion(endpointSlices []*discovery.EndpointSlice) []*discovery.EndpointSlice {
   582  	n := 0
   583  	for _, endpointSlice := range endpointSlices {
   584  		if endpointSlice.DeletionTimestamp == nil {
   585  			endpointSlices[n] = endpointSlice
   586  			n++
   587  		}
   588  	}
   589  	return endpointSlices[:n]
   590  }
   591  
   592  // getEndpointSliceFromDeleteAction parses an EndpointSlice from a delete action.
   593  func getEndpointSliceFromDeleteAction(obj interface{}) *discovery.EndpointSlice {
   594  	if endpointSlice, ok := obj.(*discovery.EndpointSlice); ok {
   595  		// The object is a plain EndpointSlice (the common case for a
   596  		// delete event), so it can be returned directly.
   597  		return endpointSlice
   598  	}
   599  	// If we reached here it means the EndpointSlice was deleted but its final state is unrecorded.
   600  	tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   601  	if !ok {
   602  		utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
   603  		return nil
   604  	}
   605  	endpointSlice, ok := tombstone.Obj.(*discovery.EndpointSlice)
   606  	if !ok {
   607  		utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not an EndpointSlice: %#v", obj))
   608  		return nil
   609  	}
   610  	return endpointSlice
   611  }
   612  
   613  // isNodeReady returns true if a node is ready; false otherwise.
   614  func isNodeReady(node *v1.Node) bool {
   615  	for _, c := range node.Status.Conditions {
   616  		if c.Type == v1.NodeReady {
   617  			return c.Status == v1.ConditionTrue
   618  		}
   619  	}
   620  	return false
   621  }
   622  
