...

Source file src/k8s.io/kubernetes/pkg/controller/endpointslicemirroring/endpointslicemirroring_controller.go

Documentation: k8s.io/kubernetes/pkg/controller/endpointslicemirroring

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package endpointslicemirroring
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	"golang.org/x/time/rate"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	discovery "k8s.io/api/discovery/v1"
    28  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    29  	"k8s.io/apimachinery/pkg/labels"
    30  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	coreinformers "k8s.io/client-go/informers/core/v1"
    33  	discoveryinformers "k8s.io/client-go/informers/discovery/v1"
    34  	clientset "k8s.io/client-go/kubernetes"
    35  	"k8s.io/client-go/kubernetes/scheme"
    36  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    37  	corelisters "k8s.io/client-go/listers/core/v1"
    38  	discoverylisters "k8s.io/client-go/listers/discovery/v1"
    39  	"k8s.io/client-go/tools/cache"
    40  	"k8s.io/client-go/tools/record"
    41  	"k8s.io/client-go/util/workqueue"
    42  	endpointsliceutil "k8s.io/endpointslice/util"
    43  	"k8s.io/klog/v2"
    44  	"k8s.io/kubernetes/pkg/controller"
    45  	"k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics"
    46  	endpointslicepkg "k8s.io/kubernetes/pkg/controller/util/endpointslice"
    47  )
    48  
    49  const (
    50  	// maxRetries is the number of times an Endpoints resource will be retried
    51  	// before it is dropped out of the queue. Any sync error, such as a failure
    52  	// to create or update an EndpointSlice could trigger a retry. With the
    53  	// current rate-limiter in use (1s*2^(numRetries-1)) up to a max of 100s.
    54  	// The following numbers represent the sequence of delays between successive
    55  	// queuings of an Endpoints resource.
    56  	//
    57  	// 1s, 2s, 4s, 8s, 16s, 32s, 64s, 100s (max)
    58  	maxRetries = 15
    59  
    60  	// defaultSyncBackOff is the default backoff period for syncEndpoints calls.
    61  	defaultSyncBackOff = 1 * time.Second
    62  	// maxSyncBackOff is the max backoff period for syncEndpoints calls.
    63  	maxSyncBackOff = 100 * time.Second
    64  
    65  	// controllerName is a unique value used with LabelManagedBy to indicated
    66  	// the component managing an EndpointSlice.
    67  	controllerName = "endpointslicemirroring-controller.k8s.io"
    68  )
    69  
    70  // NewController creates and initializes a new Controller
    71  func NewController(ctx context.Context, endpointsInformer coreinformers.EndpointsInformer,
    72  	endpointSliceInformer discoveryinformers.EndpointSliceInformer,
    73  	serviceInformer coreinformers.ServiceInformer,
    74  	maxEndpointsPerSubset int32,
    75  	client clientset.Interface,
    76  	endpointUpdatesBatchPeriod time.Duration,
    77  ) *Controller {
    78  	logger := klog.FromContext(ctx)
    79  	broadcaster := record.NewBroadcaster(record.WithContext(ctx))
    80  	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-mirroring-controller"})
    81  
    82  	metrics.RegisterMetrics()
    83  
    84  	c := &Controller{
    85  		client: client,
    86  		// This is similar to the DefaultControllerRateLimiter, just with a
    87  		// significantly higher default backoff (1s vs 5ms). This controller
    88  		// processes events that can require significant EndpointSlice changes.
    89  		// A more significant rate limit back off here helps ensure that the
    90  		// Controller does not overwhelm the API Server.
    91  		queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
    92  			workqueue.NewItemExponentialFailureRateLimiter(defaultSyncBackOff, maxSyncBackOff),
    93  			// 10 qps, 100 bucket size. This is only for retry speed and its
    94  			// only the overall factor (not per item).
    95  			&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
    96  		), "endpoint_slice_mirroring"),
    97  		workerLoopPeriod: time.Second,
    98  	}
    99  
   100  	endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   101  		AddFunc: func(obj interface{}) {
   102  			c.onEndpointsAdd(logger, obj)
   103  		},
   104  		UpdateFunc: func(oldObj, newObj interface{}) {
   105  			c.onEndpointsUpdate(logger, oldObj, newObj)
   106  		},
   107  		DeleteFunc: func(obj interface{}) {
   108  			c.onEndpointsDelete(logger, obj)
   109  		},
   110  	})
   111  	c.endpointsLister = endpointsInformer.Lister()
   112  	c.endpointsSynced = endpointsInformer.Informer().HasSynced
   113  
   114  	endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   115  		AddFunc: c.onEndpointSliceAdd,
   116  		UpdateFunc: func(oldObj, newObj interface{}) {
   117  			c.onEndpointSliceUpdate(logger, oldObj, newObj)
   118  		},
   119  		DeleteFunc: c.onEndpointSliceDelete,
   120  	})
   121  
   122  	c.endpointSliceLister = endpointSliceInformer.Lister()
   123  	c.endpointSlicesSynced = endpointSliceInformer.Informer().HasSynced
   124  	c.endpointSliceTracker = endpointsliceutil.NewEndpointSliceTracker()
   125  
   126  	c.serviceLister = serviceInformer.Lister()
   127  	c.servicesSynced = serviceInformer.Informer().HasSynced
   128  	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   129  		AddFunc:    c.onServiceAdd,
   130  		UpdateFunc: c.onServiceUpdate,
   131  		DeleteFunc: c.onServiceDelete,
   132  	})
   133  
   134  	c.maxEndpointsPerSubset = maxEndpointsPerSubset
   135  
   136  	c.reconciler = &reconciler{
   137  		client:                c.client,
   138  		maxEndpointsPerSubset: c.maxEndpointsPerSubset,
   139  		endpointSliceTracker:  c.endpointSliceTracker,
   140  		metricsCache:          metrics.NewCache(maxEndpointsPerSubset),
   141  		eventRecorder:         recorder,
   142  	}
   143  
   144  	c.eventBroadcaster = broadcaster
   145  	c.eventRecorder = recorder
   146  
   147  	c.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
   148  
   149  	return c
   150  }
   151  
   152  // Controller manages selector-based service endpoint slices
   153  type Controller struct {
   154  	client           clientset.Interface
   155  	eventBroadcaster record.EventBroadcaster
   156  	eventRecorder    record.EventRecorder
   157  
   158  	// endpointsLister is able to list/get endpoints and is populated by the
   159  	// shared informer passed to NewController.
   160  	endpointsLister corelisters.EndpointsLister
   161  	// endpointsSynced returns true if the endpoints shared informer has been
   162  	// synced at least once. Added as a member to the struct to allow injection
   163  	// for testing.
   164  	endpointsSynced cache.InformerSynced
   165  
   166  	// endpointSliceLister is able to list/get endpoint slices and is populated
   167  	// by the shared informer passed to NewController
   168  	endpointSliceLister discoverylisters.EndpointSliceLister
   169  	// endpointSlicesSynced returns true if the endpoint slice shared informer
   170  	// has been synced at least once. Added as a member to the struct to allow
   171  	// injection for testing.
   172  	endpointSlicesSynced cache.InformerSynced
   173  
   174  	// endpointSliceTracker tracks the list of EndpointSlices and associated
   175  	// resource versions expected for each Endpoints resource. It can help
   176  	// determine if a cached EndpointSlice is out of date.
   177  	endpointSliceTracker *endpointsliceutil.EndpointSliceTracker
   178  
   179  	// serviceLister is able to list/get services and is populated by the shared
   180  	// informer passed to NewController.
   181  	serviceLister corelisters.ServiceLister
   182  	// servicesSynced returns true if the services shared informer has been
   183  	// synced at least once. Added as a member to the struct to allow injection
   184  	// for testing.
   185  	servicesSynced cache.InformerSynced
   186  
   187  	// reconciler is an util used to reconcile EndpointSlice changes.
   188  	reconciler *reconciler
   189  
   190  	// Endpoints that need to be updated. A channel is inappropriate here,
   191  	// because it allows Endpoints with lots of addresses to be serviced much
   192  	// more often than Endpoints with few addresses; it also would cause an
   193  	// Endpoints resource that's inserted multiple times to be processed more
   194  	// than necessary.
   195  	queue workqueue.RateLimitingInterface
   196  
   197  	// maxEndpointsPerSubset references the maximum number of endpoints that
   198  	// should be added to an EndpointSlice for an EndpointSubset.
   199  	maxEndpointsPerSubset int32
   200  
   201  	// workerLoopPeriod is the time between worker runs. The workers process the
   202  	// queue of changes to Endpoints resources.
   203  	workerLoopPeriod time.Duration
   204  
   205  	// endpointUpdatesBatchPeriod is an artificial delay added to all Endpoints
   206  	// syncs triggered by EndpointSlice changes. This can be used to reduce
   207  	// overall number of all EndpointSlice updates.
   208  	endpointUpdatesBatchPeriod time.Duration
   209  }
   210  
   211  // Run will not return until stopCh is closed.
   212  func (c *Controller) Run(ctx context.Context, workers int) {
   213  	defer utilruntime.HandleCrash()
   214  
   215  	// Start events processing pipeline.
   216  	c.eventBroadcaster.StartLogging(klog.Infof)
   217  	c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
   218  	defer c.eventBroadcaster.Shutdown()
   219  
   220  	defer c.queue.ShutDown()
   221  
   222  	logger := klog.FromContext(ctx)
   223  	logger.Info("Starting EndpointSliceMirroring controller")
   224  	defer logger.Info("Shutting down EndpointSliceMirroring controller")
   225  
   226  	if !cache.WaitForNamedCacheSync("endpoint_slice_mirroring", ctx.Done(), c.endpointsSynced, c.endpointSlicesSynced, c.servicesSynced) {
   227  		return
   228  	}
   229  
   230  	logger.V(2).Info("Starting worker threads", "total", workers)
   231  	for i := 0; i < workers; i++ {
   232  		go wait.Until(func() { c.worker(logger) }, c.workerLoopPeriod, ctx.Done())
   233  	}
   234  
   235  	<-ctx.Done()
   236  }
   237  
   238  // worker runs a worker thread that just dequeues items, processes them, and
   239  // marks them done. You may run as many of these in parallel as you wish; the
   240  // workqueue guarantees that they will not end up processing the same service
   241  // at the same time
   242  func (c *Controller) worker(logger klog.Logger) {
   243  	for c.processNextWorkItem(logger) {
   244  	}
   245  }
   246  
   247  func (c *Controller) processNextWorkItem(logger klog.Logger) bool {
   248  	cKey, quit := c.queue.Get()
   249  	if quit {
   250  		return false
   251  	}
   252  	defer c.queue.Done(cKey)
   253  
   254  	err := c.syncEndpoints(logger, cKey.(string))
   255  	c.handleErr(logger, err, cKey)
   256  
   257  	return true
   258  }
   259  
   260  func (c *Controller) handleErr(logger klog.Logger, err error, key interface{}) {
   261  	if err == nil {
   262  		c.queue.Forget(key)
   263  		return
   264  	}
   265  
   266  	if c.queue.NumRequeues(key) < maxRetries {
   267  		logger.Info("Error mirroring EndpointSlices for Endpoints, retrying", "key", key, "err", err)
   268  		c.queue.AddRateLimited(key)
   269  		return
   270  	}
   271  
   272  	logger.Info("Retry budget exceeded, dropping Endpoints out of the queue", "key", key, "err", err)
   273  	c.queue.Forget(key)
   274  	utilruntime.HandleError(err)
   275  }
   276  
   277  func (c *Controller) syncEndpoints(logger klog.Logger, key string) error {
   278  	startTime := time.Now()
   279  	defer func() {
   280  		syncDuration := float64(time.Since(startTime).Milliseconds()) / 1000
   281  		metrics.EndpointsSyncDuration.WithLabelValues().Observe(syncDuration)
   282  		logger.V(4).Info("Finished syncing EndpointSlices for Endpoints", "key", key, "elapsedTime", time.Since(startTime))
   283  	}()
   284  
   285  	logger.V(4).Info("syncEndpoints", "key", key)
   286  
   287  	namespace, name, err := cache.SplitMetaNamespaceKey(key)
   288  	if err != nil {
   289  		return err
   290  	}
   291  
   292  	endpoints, err := c.endpointsLister.Endpoints(namespace).Get(name)
   293  	if err != nil {
   294  		if apierrors.IsNotFound(err) {
   295  			logger.V(4).Info("Endpoints not found, cleaning up any mirrored EndpointSlices", "endpoints", klog.KRef(namespace, name))
   296  			c.endpointSliceTracker.DeleteService(namespace, name)
   297  			return c.deleteMirroredSlices(namespace, name)
   298  		}
   299  		return err
   300  	}
   301  
   302  	if !c.shouldMirror(endpoints) {
   303  		logger.V(4).Info("Endpoints should not be mirrored, cleaning up any mirrored EndpointSlices", "endpoints", klog.KRef(namespace, name))
   304  		c.endpointSliceTracker.DeleteService(namespace, name)
   305  		return c.deleteMirroredSlices(namespace, name)
   306  	}
   307  
   308  	svc, err := c.serviceLister.Services(namespace).Get(name)
   309  	if err != nil {
   310  		if apierrors.IsNotFound(err) {
   311  			logger.V(4).Info("Service not found, cleaning up any mirrored EndpointSlices", "service", klog.KRef(namespace, name))
   312  			c.endpointSliceTracker.DeleteService(namespace, name)
   313  			return c.deleteMirroredSlices(namespace, name)
   314  		}
   315  		return err
   316  	}
   317  
   318  	// If a selector is specified, clean up any mirrored slices.
   319  	if svc.Spec.Selector != nil {
   320  		logger.V(4).Info("Service now has selector, cleaning up any mirrored EndpointSlices", "service", klog.KRef(namespace, name))
   321  		c.endpointSliceTracker.DeleteService(namespace, name)
   322  		return c.deleteMirroredSlices(namespace, name)
   323  	}
   324  
   325  	endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
   326  	if err != nil {
   327  		return err
   328  	}
   329  
   330  	if c.endpointSliceTracker.StaleSlices(svc, endpointSlices) {
   331  		return endpointslicepkg.NewStaleInformerCache("EndpointSlice informer cache is out of date")
   332  	}
   333  
   334  	err = c.reconciler.reconcile(logger, endpoints, endpointSlices)
   335  	if err != nil {
   336  		return err
   337  	}
   338  
   339  	return nil
   340  }
   341  
   342  // queueEndpoints queues the Endpoints resource for processing.
   343  func (c *Controller) queueEndpoints(obj interface{}) {
   344  	key, err := controller.KeyFunc(obj)
   345  	if err != nil {
   346  		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v (type %T): %v", obj, obj, err))
   347  		return
   348  	}
   349  
   350  	c.queue.Add(key)
   351  }
   352  
   353  // shouldMirror returns true if an Endpoints resource should be mirrored by this
   354  // controller. This will be false if:
   355  // - the Endpoints resource is nil.
   356  // - the Endpoints resource has a skip-mirror label.
   357  // - the Endpoints resource has a leader election annotation.
   358  // This does not ensure that a corresponding Service exists with a nil selector.
   359  // That check should be performed separately.
   360  func (c *Controller) shouldMirror(endpoints *v1.Endpoints) bool {
   361  	if endpoints == nil || skipMirror(endpoints.Labels) || hasLeaderElection(endpoints.Annotations) {
   362  		return false
   363  	}
   364  
   365  	return true
   366  }
   367  
   368  // onServiceAdd queues a sync for the relevant Endpoints resource.
   369  func (c *Controller) onServiceAdd(obj interface{}) {
   370  	service := obj.(*v1.Service)
   371  	if service == nil {
   372  		utilruntime.HandleError(fmt.Errorf("onServiceAdd() expected type v1.Service, got %T", obj))
   373  		return
   374  	}
   375  	if service.Spec.Selector == nil {
   376  		c.queueEndpoints(obj)
   377  	}
   378  }
   379  
   380  // onServiceUpdate queues a sync for the relevant Endpoints resource.
   381  func (c *Controller) onServiceUpdate(prevObj, obj interface{}) {
   382  	service := obj.(*v1.Service)
   383  	prevService := prevObj.(*v1.Service)
   384  	if service == nil || prevService == nil {
   385  		utilruntime.HandleError(fmt.Errorf("onServiceUpdate() expected type v1.Service, got %T, %T", prevObj, obj))
   386  		return
   387  	}
   388  	if (service.Spec.Selector == nil) != (prevService.Spec.Selector == nil) {
   389  		c.queueEndpoints(obj)
   390  	}
   391  }
   392  
   393  // onServiceDelete queues a sync for the relevant Endpoints resource.
   394  func (c *Controller) onServiceDelete(obj interface{}) {
   395  	service := getServiceFromDeleteAction(obj)
   396  	if service == nil {
   397  		utilruntime.HandleError(fmt.Errorf("onServiceDelete() expected type v1.Service, got %T", obj))
   398  		return
   399  	}
   400  	if service.Spec.Selector == nil {
   401  		c.queueEndpoints(obj)
   402  	}
   403  }
   404  
   405  // onEndpointsAdd queues a sync for the relevant Endpoints resource.
   406  func (c *Controller) onEndpointsAdd(logger klog.Logger, obj interface{}) {
   407  	endpoints := obj.(*v1.Endpoints)
   408  	if endpoints == nil {
   409  		utilruntime.HandleError(fmt.Errorf("onEndpointsAdd() expected type v1.Endpoints, got %T", obj))
   410  		return
   411  	}
   412  	if !c.shouldMirror(endpoints) {
   413  		logger.V(5).Info("Skipping mirroring", "endpoints", klog.KObj(endpoints))
   414  		return
   415  	}
   416  	c.queueEndpoints(obj)
   417  }
   418  
   419  // onEndpointsUpdate queues a sync for the relevant Endpoints resource.
   420  func (c *Controller) onEndpointsUpdate(logger klog.Logger, prevObj, obj interface{}) {
   421  	endpoints := obj.(*v1.Endpoints)
   422  	prevEndpoints := prevObj.(*v1.Endpoints)
   423  	if endpoints == nil || prevEndpoints == nil {
   424  		utilruntime.HandleError(fmt.Errorf("onEndpointsUpdate() expected type v1.Endpoints, got %T, %T", prevObj, obj))
   425  		return
   426  	}
   427  	if !c.shouldMirror(endpoints) && !c.shouldMirror(prevEndpoints) {
   428  		logger.V(5).Info("Skipping mirroring", "endpoints", klog.KObj(endpoints))
   429  		return
   430  	}
   431  	c.queueEndpoints(obj)
   432  }
   433  
   434  // onEndpointsDelete queues a sync for the relevant Endpoints resource.
   435  func (c *Controller) onEndpointsDelete(logger klog.Logger, obj interface{}) {
   436  	endpoints := getEndpointsFromDeleteAction(obj)
   437  	if endpoints == nil {
   438  		utilruntime.HandleError(fmt.Errorf("onEndpointsDelete() expected type v1.Endpoints, got %T", obj))
   439  		return
   440  	}
   441  	if !c.shouldMirror(endpoints) {
   442  		logger.V(5).Info("Skipping mirroring", "endpoints", klog.KObj(endpoints))
   443  		return
   444  	}
   445  	c.queueEndpoints(obj)
   446  }
   447  
   448  // onEndpointSliceAdd queues a sync for the relevant Endpoints resource for a
   449  // sync if the EndpointSlice resource version does not match the expected
   450  // version in the endpointSliceTracker.
   451  func (c *Controller) onEndpointSliceAdd(obj interface{}) {
   452  	endpointSlice := obj.(*discovery.EndpointSlice)
   453  	if endpointSlice == nil {
   454  		utilruntime.HandleError(fmt.Errorf("onEndpointSliceAdd() expected type discovery.EndpointSlice, got %T", obj))
   455  		return
   456  	}
   457  	if managedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice) {
   458  		c.queueEndpointsForEndpointSlice(endpointSlice)
   459  	}
   460  }
   461  
   462  // onEndpointSliceUpdate queues a sync for the relevant Endpoints resource for a
   463  // sync if the EndpointSlice resource version does not match the expected
   464  // version in the endpointSliceTracker or the managed-by value of the
   465  // EndpointSlice has changed from or to this controller.
   466  func (c *Controller) onEndpointSliceUpdate(logger klog.Logger, prevObj, obj interface{}) {
   467  	prevEndpointSlice := obj.(*discovery.EndpointSlice)
   468  	endpointSlice := prevObj.(*discovery.EndpointSlice)
   469  	if endpointSlice == nil || prevEndpointSlice == nil {
   470  		utilruntime.HandleError(fmt.Errorf("onEndpointSliceUpdated() expected type discovery.EndpointSlice, got %T, %T", prevObj, obj))
   471  		return
   472  	}
   473  	// EndpointSlice generation does not change when labels change. Although the
   474  	// controller will never change LabelServiceName, users might. This check
   475  	// ensures that we handle changes to this label.
   476  	svcName := endpointSlice.Labels[discovery.LabelServiceName]
   477  	prevSvcName := prevEndpointSlice.Labels[discovery.LabelServiceName]
   478  	if svcName != prevSvcName {
   479  		logger.Info("LabelServiceName changed", "labelServiceName", discovery.LabelServiceName, "oldName", prevSvcName, "newName", svcName, "endpointSlice", klog.KObj(endpointSlice))
   480  		c.queueEndpointsForEndpointSlice(endpointSlice)
   481  		c.queueEndpointsForEndpointSlice(prevEndpointSlice)
   482  		return
   483  	}
   484  	if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice)) {
   485  		c.queueEndpointsForEndpointSlice(endpointSlice)
   486  	}
   487  }
   488  
   489  // onEndpointSliceDelete queues a sync for the relevant Endpoints resource for a
   490  // sync if the EndpointSlice resource version does not match the expected
   491  // version in the endpointSliceTracker.
   492  func (c *Controller) onEndpointSliceDelete(obj interface{}) {
   493  	endpointSlice := getEndpointSliceFromDeleteAction(obj)
   494  	if endpointSlice == nil {
   495  		utilruntime.HandleError(fmt.Errorf("onEndpointSliceDelete() expected type discovery.EndpointSlice, got %T", obj))
   496  		return
   497  	}
   498  	if managedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) {
   499  		// This returns false if we didn't expect the EndpointSlice to be
   500  		// deleted. If that is the case, we queue the Service for another sync.
   501  		if !c.endpointSliceTracker.HandleDeletion(endpointSlice) {
   502  			c.queueEndpointsForEndpointSlice(endpointSlice)
   503  		}
   504  	}
   505  }
   506  
   507  // queueEndpointsForEndpointSlice attempts to queue the corresponding Endpoints
   508  // resource for the provided EndpointSlice.
   509  func (c *Controller) queueEndpointsForEndpointSlice(endpointSlice *discovery.EndpointSlice) {
   510  	key, err := endpointsControllerKey(endpointSlice)
   511  	if err != nil {
   512  		utilruntime.HandleError(fmt.Errorf("Couldn't get key for EndpointSlice %+v (type %T): %v", endpointSlice, endpointSlice, err))
   513  		return
   514  	}
   515  
   516  	c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
   517  }
   518  
   519  // deleteMirroredSlices will delete and EndpointSlices that have been mirrored
   520  // for Endpoints with this namespace and name.
   521  func (c *Controller) deleteMirroredSlices(namespace, name string) error {
   522  	endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
   523  	if err != nil {
   524  		return err
   525  	}
   526  
   527  	c.endpointSliceTracker.DeleteService(namespace, name)
   528  	return c.reconciler.deleteEndpoints(namespace, name, endpointSlices)
   529  }
   530  
   531  // endpointSlicesMirroredForService returns the EndpointSlices that have been
   532  // mirrored for a Service by this controller.
   533  func endpointSlicesMirroredForService(endpointSliceLister discoverylisters.EndpointSliceLister, namespace, name string) ([]*discovery.EndpointSlice, error) {
   534  	esLabelSelector := labels.Set(map[string]string{
   535  		discovery.LabelServiceName: name,
   536  		discovery.LabelManagedBy:   controllerName,
   537  	}).AsSelectorPreValidated()
   538  	return endpointSliceLister.EndpointSlices(namespace).List(esLabelSelector)
   539  }
   540  

View as plain text