     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package endpoint
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math"
    23  	"time"
    25  	v1 "k8s.io/api/core/v1"
    26  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    27  	"k8s.io/apimachinery/pkg/api/errors"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/conversion"
    30  	"k8s.io/apimachinery/pkg/labels"
    31  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    32  	"k8s.io/apimachinery/pkg/util/wait"
    33  	coreinformers "k8s.io/client-go/informers/core/v1"
    34  	clientset "k8s.io/client-go/kubernetes"
    35  	"k8s.io/client-go/kubernetes/scheme"
    36  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    37  	corelisters "k8s.io/client-go/listers/core/v1"
    38  	"k8s.io/client-go/tools/cache"
    39  	"k8s.io/client-go/tools/leaderelection/resourcelock"
    40  	"k8s.io/client-go/tools/record"
    41  	"k8s.io/client-go/util/workqueue"
    42  	endpointsliceutil "k8s.io/endpointslice/util"
    43  	"k8s.io/klog/v2"
    44  	"k8s.io/kubernetes/pkg/api/v1/endpoints"
    45  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    46  	api "k8s.io/kubernetes/pkg/apis/core"
    47  	helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
    48  	"k8s.io/kubernetes/pkg/controller"
    49  	utillabels "k8s.io/kubernetes/pkg/util/labels"
    50  	utilnet "k8s.io/utils/net"
    51  )
    53  const (
    54  	// maxRetries is the number of times a service will be retried before it is dropped out of the queue.
    55  	// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the
    56  	// sequence of delays between successive queuings of a service.
    57  	//
    58  	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
    59  	maxRetries = 15
    61  	// maxCapacity represents the maximum number of addresses that should be
    62  	// stored in an Endpoints resource. In a future release, this controller
    63  	// may truncate endpoints exceeding this length.
    64  	maxCapacity = 1000
    66  	// truncated is a possible value for `endpoints.kubernetes.io/over-capacity` annotation on an
    67  	// endpoint resource and indicates that the number of endpoints have been truncated to
    68  	// maxCapacity
    69  	truncated = "truncated"
    70  )
    72  // NewEndpointController returns a new *Controller.
    73  func NewEndpointController(ctx context.Context, podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
    74  	endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller {
    75  	broadcaster := record.NewBroadcaster(record.WithContext(ctx))
    76  	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"})
    78  	e := &Controller{
    79  		client:           client,
    80  		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
    81  		workerLoopPeriod: time.Second,
    82  	}
    84  	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    85  		AddFunc: e.onServiceUpdate,
    86  		UpdateFunc: func(old, cur interface{}) {
    87  			e.onServiceUpdate(cur)
    88  		},
    89  		DeleteFunc: e.onServiceDelete,
    90  	})
    91  	e.serviceLister = serviceInformer.Lister()
    92  	e.servicesSynced = serviceInformer.Informer().HasSynced
    94  	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    95  		AddFunc:    e.addPod,
    96  		UpdateFunc: e.updatePod,
    97  		DeleteFunc: e.deletePod,
    98  	})
    99  	e.podLister = podInformer.Lister()
   100  	e.podsSynced = podInformer.Informer().HasSynced
   102  	endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   103  		DeleteFunc: e.onEndpointsDelete,
   104  	})
   105  	e.endpointsLister = endpointsInformer.Lister()
   106  	e.endpointsSynced = endpointsInformer.Informer().HasSynced
   108  	e.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker()
   109  	e.eventBroadcaster = broadcaster
   110  	e.eventRecorder = recorder
   112  	e.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
   114  	return e
   115  }
   117  // Controller manages selector-based service endpoints.
   118  type Controller struct {
   119  	client           clientset.Interface
   120  	eventBroadcaster record.EventBroadcaster
   121  	eventRecorder    record.EventRecorder
   123  	// serviceLister is able to list/get services and is populated by the shared informer passed to
   124  	// NewEndpointController.
   125  	serviceLister corelisters.ServiceLister
   126  	// servicesSynced returns true if the service shared informer has been synced at least once.
   127  	// Added as a member to the struct to allow injection for testing.
   128  	servicesSynced cache.InformerSynced
   130  	// podLister is able to list/get pods and is populated by the shared informer passed to
   131  	// NewEndpointController.
   132  	podLister corelisters.PodLister
   133  	// podsSynced returns true if the pod shared informer has been synced at least once.
   134  	// Added as a member to the struct to allow injection for testing.
   135  	podsSynced cache.InformerSynced
   137  	// endpointsLister is able to list/get endpoints and is populated by the shared informer passed to
   138  	// NewEndpointController.
   139  	endpointsLister corelisters.EndpointsLister
   140  	// endpointsSynced returns true if the endpoints shared informer has been synced at least once.
   141  	// Added as a member to the struct to allow injection for testing.
   142  	endpointsSynced cache.InformerSynced
   144  	// Services that need to be updated. A channel is inappropriate here,
   145  	// because it allows services with lots of pods to be serviced much
   146  	// more often than services with few pods; it also would cause a
   147  	// service that's inserted multiple times to be processed more than
   148  	// necessary.
   149  	queue workqueue.RateLimitingInterface
   151  	// workerLoopPeriod is the time between worker runs. The workers process the queue of service and pod changes.
   152  	workerLoopPeriod time.Duration
   154  	// triggerTimeTracker is an util used to compute and export the EndpointsLastChangeTriggerTime
   155  	// annotation.
   156  	triggerTimeTracker *endpointsliceutil.TriggerTimeTracker
   158  	endpointUpdatesBatchPeriod time.Duration
   159  }
   161  // Run will not return until stopCh is closed. workers determines how many
   162  // endpoints will be handled in parallel.
   163  func (e *Controller) Run(ctx context.Context, workers int) {
   164  	defer utilruntime.HandleCrash()
   166  	// Start events processing pipeline.
   167  	e.eventBroadcaster.StartStructuredLogging(3)
   168  	e.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: e.client.CoreV1().Events("")})
   169  	defer e.eventBroadcaster.Shutdown()
   171  	defer e.queue.ShutDown()
   173  	logger := klog.FromContext(ctx)
   174  	logger.Info("Starting endpoint controller")
   175  	defer logger.Info("Shutting down endpoint controller")
   177  	if !cache.WaitForNamedCacheSync("endpoint", ctx.Done(), e.podsSynced, e.servicesSynced, e.endpointsSynced) {
   178  		return
   179  	}
   181  	for i := 0; i < workers; i++ {
   182  		go wait.UntilWithContext(ctx, e.worker, e.workerLoopPeriod)
   183  	}
   185  	go func() {
   186  		defer utilruntime.HandleCrash()
   187  		e.checkLeftoverEndpoints()
   188  	}()
   190  	<-ctx.Done()
   191  }
   193  // When a pod is added, figure out what services it will be a member of and
   194  // enqueue them. obj must have *v1.Pod type.
   195  func (e *Controller) addPod(obj interface{}) {
   196  	pod := obj.(*v1.Pod)
   197  	services, err := endpointsliceutil.GetPodServiceMemberships(e.serviceLister, pod)
   198  	if err != nil {
   199  		utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
   200  		return
   201  	}
   202  	for key := range services {
   203  		e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
   204  	}
   205  }
   207  func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointAddress, error) {
   208  	var endpointIP string
   209  	ipFamily := v1.IPv4Protocol
   211  	if len(svc.Spec.IPFamilies) > 0 {
   212  		// controller is connected to an api-server that correctly sets IPFamilies
   213  		ipFamily = svc.Spec.IPFamilies[0] // this works for headful and headless
   214  	} else {
   215  		// controller is connected to an api server that does not correctly
   216  		// set IPFamilies (e.g. old api-server during an upgrade)
   217  		// TODO (khenidak): remove by when the possibility of upgrading
   218  		// from a cluster that does not support dual stack is nil
   219  		if len(svc.Spec.ClusterIP) > 0 && svc.Spec.ClusterIP != v1.ClusterIPNone {
   220  			// headful service. detect via service clusterIP
   221  			if utilnet.IsIPv6String(svc.Spec.ClusterIP) {
   222  				ipFamily = v1.IPv6Protocol
   223  			}
   224  		} else {
   225  			// Since this is a headless service we use podIP to identify the family.
   226  			// This assumes that status.PodIP is assigned correctly (follows pod cidr and
   227  			// pod cidr list order is same as service cidr list order). The expectation is
   228  			// this is *most probably* the case.
   230  			// if the family was incorrectly identified then this will be corrected once the
   231  			// upgrade is completed (controller connects to api-server that correctly defaults services)
   232  			if utilnet.IsIPv6String(pod.Status.PodIP) {
   233  				ipFamily = v1.IPv6Protocol
   234  			}
   235  		}
   236  	}
   238  	// find an ip that matches the family
   239  	for _, podIP := range pod.Status.PodIPs {
   240  		if (ipFamily == v1.IPv6Protocol) == utilnet.IsIPv6String(podIP.IP) {
   241  			endpointIP = podIP.IP
   242  			break
   243  		}
   244  	}
   246  	if endpointIP == "" {
   247  		return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name)
   248  	}
   250  	return &v1.EndpointAddress{
   251  		IP:       endpointIP,
   252  		NodeName: &pod.Spec.NodeName,
   253  		TargetRef: &v1.ObjectReference{
   254  			Kind:      "Pod",
   255  			Namespace: pod.ObjectMeta.Namespace,
   256  			Name:      pod.ObjectMeta.Name,
   257  			UID:       pod.ObjectMeta.UID,
   258  		},
   259  	}, nil
   260  }
   262  // When a pod is updated, figure out what services it used to be a member of
   263  // and what services it will be a member of, and enqueue the union of these.
   264  // old and cur must be *v1.Pod types.
   265  func (e *Controller) updatePod(old, cur interface{}) {
   266  	services := endpointsliceutil.GetServicesToUpdateOnPodChange(e.serviceLister, old, cur)
   267  	for key := range services {
   268  		e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
   269  	}
   270  }
   272  // When a pod is deleted, enqueue the services the pod used to be a member of.
   273  // obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
   274  func (e *Controller) deletePod(obj interface{}) {
   275  	pod := endpointsliceutil.GetPodFromDeleteAction(obj)
   276  	if pod != nil {
   277  		e.addPod(pod)
   278  	}
   279  }
   281  // onServiceUpdate updates the Service Selector in the cache and queues the Service for processing.
   282  func (e *Controller) onServiceUpdate(obj interface{}) {
   283  	key, err := controller.KeyFunc(obj)
   284  	if err != nil {
   285  		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
   286  		return
   287  	}
   288  	e.queue.Add(key)
   289  }
   291  // onServiceDelete removes the Service Selector from the cache and queues the Service for processing.
   292  func (e *Controller) onServiceDelete(obj interface{}) {
   293  	key, err := controller.KeyFunc(obj)
   294  	if err != nil {
   295  		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
   296  		return
   297  	}
   298  	e.queue.Add(key)
   299  }
   301  func (e *Controller) onEndpointsDelete(obj interface{}) {
   302  	key, err := controller.KeyFunc(obj)
   303  	if err != nil {
   304  		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
   305  		return
   306  	}
   307  	e.queue.Add(key)
   308  }
   310  // worker runs a worker thread that just dequeues items, processes them, and
   311  // marks them done. You may run as many of these in parallel as you wish; the
   312  // workqueue guarantees that they will not end up processing the same service
   313  // at the same time.
   314  func (e *Controller) worker(ctx context.Context) {
   315  	for e.processNextWorkItem(ctx) {
   316  	}
   317  }
   319  func (e *Controller) processNextWorkItem(ctx context.Context) bool {
   320  	eKey, quit := e.queue.Get()
   321  	if quit {
   322  		return false
   323  	}
   324  	defer e.queue.Done(eKey)
   326  	logger := klog.FromContext(ctx)
   327  	err := e.syncService(ctx, eKey.(string))
   328  	e.handleErr(logger, err, eKey)
   330  	return true
   331  }
   333  func (e *Controller) handleErr(logger klog.Logger, err error, key interface{}) {
   334  	if err == nil {
   335  		e.queue.Forget(key)
   336  		return
   337  	}
   339  	ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string))
   340  	if keyErr != nil {
   341  		logger.Error(err, "Failed to split meta namespace cache key", "key", key)
   342  	}
   344  	if e.queue.NumRequeues(key) < maxRetries {
   345  		logger.V(2).Info("Error syncing endpoints, retrying", "service", klog.KRef(ns, name), "err", err)
   346  		e.queue.AddRateLimited(key)
   347  		return
   348  	}
   350  	logger.Info("Dropping service out of the queue", "service", klog.KRef(ns, name), "err", err)
   351  	e.queue.Forget(key)
   352  	utilruntime.HandleError(err)
   353  }
   355  func (e *Controller) syncService(ctx context.Context, key string) error {
   356  	startTime := time.Now()
   357  	logger := klog.FromContext(ctx)
   358  	namespace, name, err := cache.SplitMetaNamespaceKey(key)
   359  	if err != nil {
   360  		return err
   361  	}
   362  	defer func() {
   363  		logger.V(4).Info("Finished syncing service endpoints", "service", klog.KRef(namespace, name), "startTime", time.Since(startTime))
   364  	}()
   366  	service, err := e.serviceLister.Services(namespace).Get(name)
   367  	if err != nil {
   368  		if !errors.IsNotFound(err) {
   369  			return err
   370  		}
   372  		// Delete the corresponding endpoint, as the service has been deleted.
   373  		// TODO: Please note that this will delete an endpoint when a
   374  		// service is deleted. However, if we're down at the time when
   375  		// the service is deleted, we will miss that deletion, so this
   376  		// doesn't completely solve the problem. See #6877.
   377  		err = e.client.CoreV1().Endpoints(namespace).Delete(ctx, name, metav1.DeleteOptions{})
   378  		if err != nil && !errors.IsNotFound(err) {
   379  			return err
   380  		}
   381  		e.triggerTimeTracker.DeleteService(namespace, name)
   382  		return nil
   383  	}
   385  	if service.Spec.Type == v1.ServiceTypeExternalName {
   386  		// services with Type ExternalName receive no endpoints from this controller;
   387  		// Ref: https://issues.k8s.io/105986
   388  		return nil
   389  	}
   391  	if service.Spec.Selector == nil {
   392  		// services without a selector receive no endpoints from this controller;
   393  		// these services will receive the endpoints that are created out-of-band via the REST API.
   394  		return nil
   395  	}
   397  	logger.V(5).Info("About to update endpoints for service", "service", klog.KRef(namespace, name))
   398  	pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
   399  	if err != nil {
   400  		// Since we're getting stuff from a local cache, it is
   401  		// basically impossible to get this error.
   402  		return err
   403  	}
   405  	// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
   406  	// state of the trigger time tracker gets updated even if the sync turns out
   407  	// to be no-op and we don't update the endpoints object.
   408  	endpointsLastChangeTriggerTime := e.triggerTimeTracker.
   409  		ComputeEndpointLastChangeTriggerTime(namespace, service, pods)
   411  	subsets := []v1.EndpointSubset{}
   412  	var totalReadyEps int
   413  	var totalNotReadyEps int
   415  	for _, pod := range pods {
   416  		if !endpointsliceutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
   417  			logger.V(5).Info("Pod is not included on endpoints for Service", "pod", klog.KObj(pod), "service", klog.KObj(service))
   418  			continue
   419  		}
   421  		ep, err := podToEndpointAddressForService(service, pod)
   422  		if err != nil {
   423  			// this will happen, if the cluster runs with some nodes configured as dual stack and some as not
   424  			// such as the case of an upgrade..
   425  			logger.V(2).Info("Failed to find endpoint for service with ClusterIP on pod with error", "service", klog.KObj(service), "clusterIP", service.Spec.ClusterIP, "pod", klog.KObj(pod), "error", err)
   426  			continue
   427  		}
   429  		epa := *ep
   430  		if endpointsliceutil.ShouldSetHostname(pod, service) {
   431  			epa.Hostname = pod.Spec.Hostname
   432  		}
   434  		// Allow headless service not to have ports.
   435  		if len(service.Spec.Ports) == 0 {
   436  			if service.Spec.ClusterIP == api.ClusterIPNone {
   437  				subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(logger, subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
   438  				// No need to repack subsets for headless service without ports.
   439  			}
   440  		} else {
   441  			for i := range service.Spec.Ports {
   442  				servicePort := &service.Spec.Ports[i]
   443  				portNum, err := podutil.FindPort(pod, servicePort)
   444  				if err != nil {
   445  					logger.V(4).Info("Failed to find port for service", "service", klog.KObj(service), "error", err)
   446  					continue
   447  				}
   448  				epp := endpointPortFromServicePort(servicePort, portNum)
   450  				var readyEps, notReadyEps int
   451  				subsets, readyEps, notReadyEps = addEndpointSubset(logger, subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
   452  				totalReadyEps = totalReadyEps + readyEps
   453  				totalNotReadyEps = totalNotReadyEps + notReadyEps
   454  			}
   455  		}
   456  	}
   457  	subsets = endpoints.RepackSubsets(subsets)
   459  	// See if there's actually an update here.
   460  	currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
   461  	if err != nil {
   462  		if !errors.IsNotFound(err) {
   463  			return err
   464  		}
   465  		currentEndpoints = &v1.Endpoints{
   466  			ObjectMeta: metav1.ObjectMeta{
   467  				Name:   service.Name,
   468  				Labels: service.Labels,
   469  			},
   470  		}
   471  	}
   473  	createEndpoints := len(currentEndpoints.ResourceVersion) == 0
   475  	// Compare the sorted subsets and labels
   476  	// Remove the HeadlessService label from the endpoints if it exists,
   477  	// as this won't be set on the service itself
   478  	// and will cause a false negative in this diff check.
   479  	// But first check if it has that label to avoid expensive copies.
   480  	compareLabels := currentEndpoints.Labels
   481  	if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok {
   482  		compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService)
   483  	}
   484  	// When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints
   485  	// updates caused by Pod updates that we don't care, e.g. annotation update.
   486  	if !createEndpoints &&
   487  		endpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) &&
   488  		apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
   489  		capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
   490  		logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service))
   491  		return nil
   492  	}
   493  	newEndpoints := currentEndpoints.DeepCopy()
   494  	newEndpoints.Subsets = subsets
   495  	newEndpoints.Labels = service.Labels
   496  	if newEndpoints.Annotations == nil {
   497  		newEndpoints.Annotations = make(map[string]string)
   498  	}
   500  	if !endpointsLastChangeTriggerTime.IsZero() {
   501  		newEndpoints.Annotations[v1.EndpointsLastChangeTriggerTime] =
   502  			endpointsLastChangeTriggerTime.UTC().Format(time.RFC3339Nano)
   503  	} else { // No new trigger time, clear the annotation.
   504  		delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
   505  	}
   507  	if truncateEndpoints(newEndpoints) {
   508  		newEndpoints.Annotations[v1.EndpointsOverCapacity] = truncated
   509  	} else {
   510  		delete(newEndpoints.Annotations, v1.EndpointsOverCapacity)
   511  	}
   513  	if newEndpoints.Labels == nil {
   514  		newEndpoints.Labels = make(map[string]string)
   515  	}
   517  	if !helper.IsServiceIPSet(service) {
   518  		newEndpoints.Labels = utillabels.CloneAndAddLabel(newEndpoints.Labels, v1.IsHeadlessService, "")
   519  	} else {
   520  		newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService)
   521  	}
   523  	logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
   524  	if createEndpoints {
   525  		// No previous endpoints, create them
   526  		_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
   527  	} else {
   528  		// Pre-existing
   529  		_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
   530  	}
   531  	if err != nil {
   532  		if createEndpoints && errors.IsForbidden(err) {
   533  			// A request is forbidden primarily for two reasons:
   534  			// 1. namespace is terminating, endpoint creation is not allowed by default.
   535  			// 2. policy is misconfigured, in which case no service would function anywhere.
   536  			// Given the frequency of 1, we log at a lower level.
   537  			logger.V(5).Info("Forbidden from creating endpoints", "error", err)
   539  			// If the namespace is terminating, creates will continue to fail. Simply drop the item.
   540  			if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
   541  				return nil
   542  			}
   543  		}
   545  		if createEndpoints {
   546  			e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToCreateEndpoint", "Failed to create endpoint for service %v/%v: %v", service.Namespace, service.Name, err)
   547  		} else {
   548  			e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToUpdateEndpoint", "Failed to update endpoint %v/%v: %v", service.Namespace, service.Name, err)
   549  		}
   551  		return err
   552  	}
   553  	return nil
   554  }
   556  // checkLeftoverEndpoints lists all currently existing endpoints and adds their
   557  // service to the queue. This will detect endpoints that exist with no
   558  // corresponding service; these endpoints need to be deleted. We only need to
   559  // do this once on startup, because in steady-state these are detected (but
   560  // some stragglers could have been left behind if the endpoint controller
   561  // reboots).
   562  func (e *Controller) checkLeftoverEndpoints() {
   563  	list, err := e.endpointsLister.List(labels.Everything())
   564  	if err != nil {
   565  		utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
   566  		return
   567  	}
   568  	for _, ep := range list {
   569  		if _, ok := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; ok {
   570  			// when there are multiple controller-manager instances,
   571  			// we observe that it will delete leader-election endpoints after 5min
   572  			// and cause re-election
   573  			// so skip the delete here
   574  			// as leader-election only have endpoints without service
   575  			continue
   576  		}
   577  		key, err := controller.KeyFunc(ep)
   578  		if err != nil {
   579  			utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep))
   580  			continue
   581  		}
   582  		e.queue.Add(key)
   583  	}
   584  }
   586  // addEndpointSubset add the endpoints addresses and ports to the EndpointSubset.
   587  // The addresses are added to the corresponding field, ready or not ready, depending
   588  // on the pod status and the Service PublishNotReadyAddresses field value.
   589  // The pod passed to this function must have already been filtered through ShouldPodBeInEndpoints.
   590  func addEndpointSubset(logger klog.Logger, subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
   591  	epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) {
   592  	var readyEps int
   593  	var notReadyEps int
   594  	ports := []v1.EndpointPort{}
   595  	if epp != nil {
   596  		ports = append(ports, *epp)
   597  	}
   598  	if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) {
   599  		subsets = append(subsets, v1.EndpointSubset{
   600  			Addresses: []v1.EndpointAddress{epa},
   601  			Ports:     ports,
   602  		})
   603  		readyEps++
   604  	} else { // if it is not a ready address it has to be not ready
   605  		logger.V(5).Info("Pod is out of service", "pod", klog.KObj(pod))
   606  		subsets = append(subsets, v1.EndpointSubset{
   607  			NotReadyAddresses: []v1.EndpointAddress{epa},
   608  			Ports:             ports,
   609  		})
   610  		notReadyEps++
   611  	}
   612  	return subsets, readyEps, notReadyEps
   613  }
   615  func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.EndpointPort {
   616  	return &v1.EndpointPort{
   617  		Name:        servicePort.Name,
   618  		Port:        int32(portNum),
   619  		Protocol:    servicePort.Protocol,
   620  		AppProtocol: servicePort.AppProtocol,
   621  	}
   622  }
   624  // capacityAnnotationSetCorrectly returns false if number of endpoints is greater than maxCapacity or
   625  // returns true if underCapacity and the annotation is not set.
   626  func capacityAnnotationSetCorrectly(annotations map[string]string, subsets []v1.EndpointSubset) bool {
   627  	numEndpoints := 0
   628  	for _, subset := range subsets {
   629  		numEndpoints += len(subset.Addresses) + len(subset.NotReadyAddresses)
   630  	}
   631  	if numEndpoints > maxCapacity {
   632  		// If subsets are over capacity, they must be truncated so consider
   633  		// the annotation as not set correctly
   634  		return false
   635  	}
   636  	_, ok := annotations[v1.EndpointsOverCapacity]
   637  	return !ok
   638  }
   640  // truncateEndpoints by best effort will distribute the endpoints over the subsets based on the proportion
   641  // of endpoints per subset and will prioritize Ready Endpoints over NotReady Endpoints.
   642  func truncateEndpoints(endpoints *v1.Endpoints) bool {
   643  	totalReady := 0
   644  	totalNotReady := 0
   645  	for _, subset := range endpoints.Subsets {
   646  		totalReady += len(subset.Addresses)
   647  		totalNotReady += len(subset.NotReadyAddresses)
   648  	}
   650  	if totalReady+totalNotReady <= maxCapacity {
   651  		return false
   652  	}
   654  	truncateReady := false
   655  	max := maxCapacity - totalReady
   656  	numTotal := totalNotReady
   657  	if totalReady > maxCapacity {
   658  		truncateReady = true
   659  		max = maxCapacity
   660  		numTotal = totalReady
   661  	}
   662  	canBeAdded := max
   664  	for i := range endpoints.Subsets {
   665  		subset := endpoints.Subsets[i]
   666  		numInSubset := len(subset.Addresses)
   667  		if !truncateReady {
   668  			numInSubset = len(subset.NotReadyAddresses)
   669  		}
   671  		// The number of endpoints per subset will be based on the propotion of endpoints
   672  		// in this subset versus the total number of endpoints. The proportion of endpoints
   673  		// will be rounded up which most likely will lead to the last subset having less
   674  		// endpoints than the expected proportion.
   675  		toBeAdded := int(math.Ceil((float64(numInSubset) / float64(numTotal)) * float64(max)))
   676  		// If there is not enough endpoints for the last subset, ensure only the number up
   677  		// to the capacity are added
   678  		if toBeAdded > canBeAdded {
   679  			toBeAdded = canBeAdded
   680  		}
   682  		if truncateReady {
   683  			// Truncate ready Addresses to allocated proportion and truncate all not ready
   684  			// addresses
   685  			subset.Addresses = addressSubset(subset.Addresses, toBeAdded)
   686  			subset.NotReadyAddresses = []v1.EndpointAddress{}
   687  			canBeAdded -= len(subset.Addresses)
   688  		} else {
   689  			// Only truncate the not ready addresses
   690  			subset.NotReadyAddresses = addressSubset(subset.NotReadyAddresses, toBeAdded)
   691  			canBeAdded -= len(subset.NotReadyAddresses)
   692  		}
   693  		endpoints.Subsets[i] = subset
   694  	}
   695  	return true
   696  }
   698  // addressSubset takes a list of addresses and returns a subset if the length is greater
   699  // than the maxNum. If less than the maxNum, the entire list is returned.
   700  func addressSubset(addresses []v1.EndpointAddress, maxNum int) []v1.EndpointAddress {
   701  	if len(addresses) <= maxNum {
   702  		return addresses
   703  	}
   704  	return addresses[0:maxNum]
   705  }
   707  // semanticIgnoreResourceVersion does semantic deep equality checks for objects
   708  // but excludes ResourceVersion of ObjectReference. They are used when comparing
   709  // endpoints in Endpoints and EndpointSlice objects to avoid unnecessary updates
   710  // caused by Pod resourceVersion change.
   711  var semanticIgnoreResourceVersion = conversion.EqualitiesOrDie(
   712  	func(a, b v1.ObjectReference) bool {
   713  		a.ResourceVersion = ""
   714  		b.ResourceVersion = ""
   715  		return a == b
   716  	},
   717  )
   719  // endpointSubsetsEqualIgnoreResourceVersion returns true if EndpointSubsets
   720  // have equal attributes but excludes ResourceVersion of Pod.
   721  func endpointSubsetsEqualIgnoreResourceVersion(subsets1, subsets2 []v1.EndpointSubset) bool {
   722  	return semanticIgnoreResourceVersion.DeepEqual(subsets1, subsets2)
   723  }

