1
16
17 package endpoint
18
19 import (
20 "context"
21 "fmt"
22 "math"
23 "time"
24
25 v1 "k8s.io/api/core/v1"
26 apiequality "k8s.io/apimachinery/pkg/api/equality"
27 "k8s.io/apimachinery/pkg/api/errors"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/conversion"
30 "k8s.io/apimachinery/pkg/labels"
31 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
32 "k8s.io/apimachinery/pkg/util/wait"
33 coreinformers "k8s.io/client-go/informers/core/v1"
34 clientset "k8s.io/client-go/kubernetes"
35 "k8s.io/client-go/kubernetes/scheme"
36 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
37 corelisters "k8s.io/client-go/listers/core/v1"
38 "k8s.io/client-go/tools/cache"
39 "k8s.io/client-go/tools/leaderelection/resourcelock"
40 "k8s.io/client-go/tools/record"
41 "k8s.io/client-go/util/workqueue"
42 endpointsliceutil "k8s.io/endpointslice/util"
43 "k8s.io/klog/v2"
44 "k8s.io/kubernetes/pkg/api/v1/endpoints"
45 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
46 api "k8s.io/kubernetes/pkg/apis/core"
47 helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
48 "k8s.io/kubernetes/pkg/controller"
49 utillabels "k8s.io/kubernetes/pkg/util/labels"
50 utilnet "k8s.io/utils/net"
51 )
52
const (
	// maxCapacity is the largest total number of addresses (ready plus not
	// ready, summed over every subset) that truncateEndpoints leaves in a
	// single Endpoints object.
	maxCapacity = 1000

	// truncated is the value syncService stores under the
	// v1.EndpointsOverCapacity annotation when truncateEndpoints had to drop
	// addresses to respect maxCapacity.
	truncated = "truncated"

	// maxRetries is the number of rate-limited requeues handleErr allows for
	// a service key before the key is dropped from the queue.
	maxRetries = 15
)
71
72
73 func NewEndpointController(ctx context.Context, podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
74 endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller {
75 broadcaster := record.NewBroadcaster(record.WithContext(ctx))
76 recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"})
77
78 e := &Controller{
79 client: client,
80 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
81 workerLoopPeriod: time.Second,
82 }
83
84 serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
85 AddFunc: e.onServiceUpdate,
86 UpdateFunc: func(old, cur interface{}) {
87 e.onServiceUpdate(cur)
88 },
89 DeleteFunc: e.onServiceDelete,
90 })
91 e.serviceLister = serviceInformer.Lister()
92 e.servicesSynced = serviceInformer.Informer().HasSynced
93
94 podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
95 AddFunc: e.addPod,
96 UpdateFunc: e.updatePod,
97 DeleteFunc: e.deletePod,
98 })
99 e.podLister = podInformer.Lister()
100 e.podsSynced = podInformer.Informer().HasSynced
101
102 endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
103 DeleteFunc: e.onEndpointsDelete,
104 })
105 e.endpointsLister = endpointsInformer.Lister()
106 e.endpointsSynced = endpointsInformer.Informer().HasSynced
107
108 e.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker()
109 e.eventBroadcaster = broadcaster
110 e.eventRecorder = recorder
111
112 e.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
113
114 return e
115 }
116
117
// Controller keeps Endpoints objects in step with Services and the Pods
// their selectors match. Instances are created by NewEndpointController and
// driven by Run.
type Controller struct {
	// client is used to create, update and delete Endpoints and to record
	// events.
	client           clientset.Interface
	eventBroadcaster record.EventBroadcaster
	eventRecorder    record.EventRecorder

	// serviceLister reads services from the service informer passed to
	// NewEndpointController.
	serviceLister corelisters.ServiceLister
	// servicesSynced is the service informer's HasSynced function; Run waits
	// on it before starting workers.
	servicesSynced cache.InformerSynced

	// podLister reads pods from the pod informer passed to
	// NewEndpointController.
	podLister corelisters.PodLister
	// podsSynced is the pod informer's HasSynced function; Run waits on it
	// before starting workers.
	podsSynced cache.InformerSynced

	// endpointsLister reads Endpoints from the endpoints informer passed to
	// NewEndpointController.
	endpointsLister corelisters.EndpointsLister
	// endpointsSynced is the endpoints informer's HasSynced function; Run
	// waits on it before starting workers.
	endpointsSynced cache.InformerSynced

	// queue holds the namespace/name keys of services whose Endpoints need
	// to be synced. Failed keys are re-added with rate limiting by handleErr
	// until maxRetries is reached.
	queue workqueue.RateLimitingInterface

	// workerLoopPeriod is the period handed to wait.UntilWithContext for
	// each worker goroutine started by Run.
	workerLoopPeriod time.Duration

	// triggerTimeTracker supplies the value syncService writes to the
	// v1.EndpointsLastChangeTriggerTime annotation.
	triggerTimeTracker *endpointsliceutil.TriggerTimeTracker

	// endpointUpdatesBatchPeriod is the delay applied when pod events enqueue
	// service keys (see addPod and updatePod).
	endpointUpdatesBatchPeriod time.Duration
}
160
161
162
163 func (e *Controller) Run(ctx context.Context, workers int) {
164 defer utilruntime.HandleCrash()
165
166
167 e.eventBroadcaster.StartStructuredLogging(3)
168 e.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: e.client.CoreV1().Events("")})
169 defer e.eventBroadcaster.Shutdown()
170
171 defer e.queue.ShutDown()
172
173 logger := klog.FromContext(ctx)
174 logger.Info("Starting endpoint controller")
175 defer logger.Info("Shutting down endpoint controller")
176
177 if !cache.WaitForNamedCacheSync("endpoint", ctx.Done(), e.podsSynced, e.servicesSynced, e.endpointsSynced) {
178 return
179 }
180
181 for i := 0; i < workers; i++ {
182 go wait.UntilWithContext(ctx, e.worker, e.workerLoopPeriod)
183 }
184
185 go func() {
186 defer utilruntime.HandleCrash()
187 e.checkLeftoverEndpoints()
188 }()
189
190 <-ctx.Done()
191 }
192
193
194
195 func (e *Controller) addPod(obj interface{}) {
196 pod := obj.(*v1.Pod)
197 services, err := endpointsliceutil.GetPodServiceMemberships(e.serviceLister, pod)
198 if err != nil {
199 utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
200 return
201 }
202 for key := range services {
203 e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
204 }
205 }
206
207 func podToEndpointAddressForService(svc *v1.Service, pod *v1.Pod) (*v1.EndpointAddress, error) {
208 var endpointIP string
209 ipFamily := v1.IPv4Protocol
210
211 if len(svc.Spec.IPFamilies) > 0 {
212
213 ipFamily = svc.Spec.IPFamilies[0]
214 } else {
215
216
217
218
219 if len(svc.Spec.ClusterIP) > 0 && svc.Spec.ClusterIP != v1.ClusterIPNone {
220
221 if utilnet.IsIPv6String(svc.Spec.ClusterIP) {
222 ipFamily = v1.IPv6Protocol
223 }
224 } else {
225
226
227
228
229
230
231
232 if utilnet.IsIPv6String(pod.Status.PodIP) {
233 ipFamily = v1.IPv6Protocol
234 }
235 }
236 }
237
238
239 for _, podIP := range pod.Status.PodIPs {
240 if (ipFamily == v1.IPv6Protocol) == utilnet.IsIPv6String(podIP.IP) {
241 endpointIP = podIP.IP
242 break
243 }
244 }
245
246 if endpointIP == "" {
247 return nil, fmt.Errorf("failed to find a matching endpoint for service %v", svc.Name)
248 }
249
250 return &v1.EndpointAddress{
251 IP: endpointIP,
252 NodeName: &pod.Spec.NodeName,
253 TargetRef: &v1.ObjectReference{
254 Kind: "Pod",
255 Namespace: pod.ObjectMeta.Namespace,
256 Name: pod.ObjectMeta.Name,
257 UID: pod.ObjectMeta.UID,
258 },
259 }, nil
260 }
261
262
263
264
265 func (e *Controller) updatePod(old, cur interface{}) {
266 services := endpointsliceutil.GetServicesToUpdateOnPodChange(e.serviceLister, old, cur)
267 for key := range services {
268 e.queue.AddAfter(key, e.endpointUpdatesBatchPeriod)
269 }
270 }
271
272
273
274 func (e *Controller) deletePod(obj interface{}) {
275 pod := endpointsliceutil.GetPodFromDeleteAction(obj)
276 if pod != nil {
277 e.addPod(pod)
278 }
279 }
280
281
282 func (e *Controller) onServiceUpdate(obj interface{}) {
283 key, err := controller.KeyFunc(obj)
284 if err != nil {
285 utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
286 return
287 }
288 e.queue.Add(key)
289 }
290
291
292 func (e *Controller) onServiceDelete(obj interface{}) {
293 key, err := controller.KeyFunc(obj)
294 if err != nil {
295 utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
296 return
297 }
298 e.queue.Add(key)
299 }
300
301 func (e *Controller) onEndpointsDelete(obj interface{}) {
302 key, err := controller.KeyFunc(obj)
303 if err != nil {
304 utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
305 return
306 }
307 e.queue.Add(key)
308 }
309
310
311
312
313
314 func (e *Controller) worker(ctx context.Context) {
315 for e.processNextWorkItem(ctx) {
316 }
317 }
318
319 func (e *Controller) processNextWorkItem(ctx context.Context) bool {
320 eKey, quit := e.queue.Get()
321 if quit {
322 return false
323 }
324 defer e.queue.Done(eKey)
325
326 logger := klog.FromContext(ctx)
327 err := e.syncService(ctx, eKey.(string))
328 e.handleErr(logger, err, eKey)
329
330 return true
331 }
332
333 func (e *Controller) handleErr(logger klog.Logger, err error, key interface{}) {
334 if err == nil {
335 e.queue.Forget(key)
336 return
337 }
338
339 ns, name, keyErr := cache.SplitMetaNamespaceKey(key.(string))
340 if keyErr != nil {
341 logger.Error(err, "Failed to split meta namespace cache key", "key", key)
342 }
343
344 if e.queue.NumRequeues(key) < maxRetries {
345 logger.V(2).Info("Error syncing endpoints, retrying", "service", klog.KRef(ns, name), "err", err)
346 e.queue.AddRateLimited(key)
347 return
348 }
349
350 logger.Info("Dropping service out of the queue", "service", klog.KRef(ns, name), "err", err)
351 e.queue.Forget(key)
352 utilruntime.HandleError(err)
353 }
354
355 func (e *Controller) syncService(ctx context.Context, key string) error {
356 startTime := time.Now()
357 logger := klog.FromContext(ctx)
358 namespace, name, err := cache.SplitMetaNamespaceKey(key)
359 if err != nil {
360 return err
361 }
362 defer func() {
363 logger.V(4).Info("Finished syncing service endpoints", "service", klog.KRef(namespace, name), "startTime", time.Since(startTime))
364 }()
365
366 service, err := e.serviceLister.Services(namespace).Get(name)
367 if err != nil {
368 if !errors.IsNotFound(err) {
369 return err
370 }
371
372
373
374
375
376
377 err = e.client.CoreV1().Endpoints(namespace).Delete(ctx, name, metav1.DeleteOptions{})
378 if err != nil && !errors.IsNotFound(err) {
379 return err
380 }
381 e.triggerTimeTracker.DeleteService(namespace, name)
382 return nil
383 }
384
385 if service.Spec.Type == v1.ServiceTypeExternalName {
386
387
388 return nil
389 }
390
391 if service.Spec.Selector == nil {
392
393
394 return nil
395 }
396
397 logger.V(5).Info("About to update endpoints for service", "service", klog.KRef(namespace, name))
398 pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
399 if err != nil {
400
401
402 return err
403 }
404
405
406
407
408 endpointsLastChangeTriggerTime := e.triggerTimeTracker.
409 ComputeEndpointLastChangeTriggerTime(namespace, service, pods)
410
411 subsets := []v1.EndpointSubset{}
412 var totalReadyEps int
413 var totalNotReadyEps int
414
415 for _, pod := range pods {
416 if !endpointsliceutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
417 logger.V(5).Info("Pod is not included on endpoints for Service", "pod", klog.KObj(pod), "service", klog.KObj(service))
418 continue
419 }
420
421 ep, err := podToEndpointAddressForService(service, pod)
422 if err != nil {
423
424
425 logger.V(2).Info("Failed to find endpoint for service with ClusterIP on pod with error", "service", klog.KObj(service), "clusterIP", service.Spec.ClusterIP, "pod", klog.KObj(pod), "error", err)
426 continue
427 }
428
429 epa := *ep
430 if endpointsliceutil.ShouldSetHostname(pod, service) {
431 epa.Hostname = pod.Spec.Hostname
432 }
433
434
435 if len(service.Spec.Ports) == 0 {
436 if service.Spec.ClusterIP == api.ClusterIPNone {
437 subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(logger, subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
438
439 }
440 } else {
441 for i := range service.Spec.Ports {
442 servicePort := &service.Spec.Ports[i]
443 portNum, err := podutil.FindPort(pod, servicePort)
444 if err != nil {
445 logger.V(4).Info("Failed to find port for service", "service", klog.KObj(service), "error", err)
446 continue
447 }
448 epp := endpointPortFromServicePort(servicePort, portNum)
449
450 var readyEps, notReadyEps int
451 subsets, readyEps, notReadyEps = addEndpointSubset(logger, subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
452 totalReadyEps = totalReadyEps + readyEps
453 totalNotReadyEps = totalNotReadyEps + notReadyEps
454 }
455 }
456 }
457 subsets = endpoints.RepackSubsets(subsets)
458
459
460 currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
461 if err != nil {
462 if !errors.IsNotFound(err) {
463 return err
464 }
465 currentEndpoints = &v1.Endpoints{
466 ObjectMeta: metav1.ObjectMeta{
467 Name: service.Name,
468 Labels: service.Labels,
469 },
470 }
471 }
472
473 createEndpoints := len(currentEndpoints.ResourceVersion) == 0
474
475
476
477
478
479
480 compareLabels := currentEndpoints.Labels
481 if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok {
482 compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService)
483 }
484
485
486 if !createEndpoints &&
487 endpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) &&
488 apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
489 capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
490 logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service))
491 return nil
492 }
493 newEndpoints := currentEndpoints.DeepCopy()
494 newEndpoints.Subsets = subsets
495 newEndpoints.Labels = service.Labels
496 if newEndpoints.Annotations == nil {
497 newEndpoints.Annotations = make(map[string]string)
498 }
499
500 if !endpointsLastChangeTriggerTime.IsZero() {
501 newEndpoints.Annotations[v1.EndpointsLastChangeTriggerTime] =
502 endpointsLastChangeTriggerTime.UTC().Format(time.RFC3339Nano)
503 } else {
504 delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
505 }
506
507 if truncateEndpoints(newEndpoints) {
508 newEndpoints.Annotations[v1.EndpointsOverCapacity] = truncated
509 } else {
510 delete(newEndpoints.Annotations, v1.EndpointsOverCapacity)
511 }
512
513 if newEndpoints.Labels == nil {
514 newEndpoints.Labels = make(map[string]string)
515 }
516
517 if !helper.IsServiceIPSet(service) {
518 newEndpoints.Labels = utillabels.CloneAndAddLabel(newEndpoints.Labels, v1.IsHeadlessService, "")
519 } else {
520 newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService)
521 }
522
523 logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
524 if createEndpoints {
525
526 _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
527 } else {
528
529 _, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
530 }
531 if err != nil {
532 if createEndpoints && errors.IsForbidden(err) {
533
534
535
536
537 logger.V(5).Info("Forbidden from creating endpoints", "error", err)
538
539
540 if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
541 return nil
542 }
543 }
544
545 if createEndpoints {
546 e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToCreateEndpoint", "Failed to create endpoint for service %v/%v: %v", service.Namespace, service.Name, err)
547 } else {
548 e.eventRecorder.Eventf(newEndpoints, v1.EventTypeWarning, "FailedToUpdateEndpoint", "Failed to update endpoint %v/%v: %v", service.Namespace, service.Name, err)
549 }
550
551 return err
552 }
553 return nil
554 }
555
556
557
558
559
560
561
562 func (e *Controller) checkLeftoverEndpoints() {
563 list, err := e.endpointsLister.List(labels.Everything())
564 if err != nil {
565 utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
566 return
567 }
568 for _, ep := range list {
569 if _, ok := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; ok {
570
571
572
573
574
575 continue
576 }
577 key, err := controller.KeyFunc(ep)
578 if err != nil {
579 utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep))
580 continue
581 }
582 e.queue.Add(key)
583 }
584 }
585
586
587
588
589
590 func addEndpointSubset(logger klog.Logger, subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
591 epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) {
592 var readyEps int
593 var notReadyEps int
594 ports := []v1.EndpointPort{}
595 if epp != nil {
596 ports = append(ports, *epp)
597 }
598 if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) {
599 subsets = append(subsets, v1.EndpointSubset{
600 Addresses: []v1.EndpointAddress{epa},
601 Ports: ports,
602 })
603 readyEps++
604 } else {
605 logger.V(5).Info("Pod is out of service", "pod", klog.KObj(pod))
606 subsets = append(subsets, v1.EndpointSubset{
607 NotReadyAddresses: []v1.EndpointAddress{epa},
608 Ports: ports,
609 })
610 notReadyEps++
611 }
612 return subsets, readyEps, notReadyEps
613 }
614
615 func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.EndpointPort {
616 return &v1.EndpointPort{
617 Name: servicePort.Name,
618 Port: int32(portNum),
619 Protocol: servicePort.Protocol,
620 AppProtocol: servicePort.AppProtocol,
621 }
622 }
623
624
625
626 func capacityAnnotationSetCorrectly(annotations map[string]string, subsets []v1.EndpointSubset) bool {
627 numEndpoints := 0
628 for _, subset := range subsets {
629 numEndpoints += len(subset.Addresses) + len(subset.NotReadyAddresses)
630 }
631 if numEndpoints > maxCapacity {
632
633
634 return false
635 }
636 _, ok := annotations[v1.EndpointsOverCapacity]
637 return !ok
638 }
639
640
641
642 func truncateEndpoints(endpoints *v1.Endpoints) bool {
643 totalReady := 0
644 totalNotReady := 0
645 for _, subset := range endpoints.Subsets {
646 totalReady += len(subset.Addresses)
647 totalNotReady += len(subset.NotReadyAddresses)
648 }
649
650 if totalReady+totalNotReady <= maxCapacity {
651 return false
652 }
653
654 truncateReady := false
655 max := maxCapacity - totalReady
656 numTotal := totalNotReady
657 if totalReady > maxCapacity {
658 truncateReady = true
659 max = maxCapacity
660 numTotal = totalReady
661 }
662 canBeAdded := max
663
664 for i := range endpoints.Subsets {
665 subset := endpoints.Subsets[i]
666 numInSubset := len(subset.Addresses)
667 if !truncateReady {
668 numInSubset = len(subset.NotReadyAddresses)
669 }
670
671
672
673
674
675 toBeAdded := int(math.Ceil((float64(numInSubset) / float64(numTotal)) * float64(max)))
676
677
678 if toBeAdded > canBeAdded {
679 toBeAdded = canBeAdded
680 }
681
682 if truncateReady {
683
684
685 subset.Addresses = addressSubset(subset.Addresses, toBeAdded)
686 subset.NotReadyAddresses = []v1.EndpointAddress{}
687 canBeAdded -= len(subset.Addresses)
688 } else {
689
690 subset.NotReadyAddresses = addressSubset(subset.NotReadyAddresses, toBeAdded)
691 canBeAdded -= len(subset.NotReadyAddresses)
692 }
693 endpoints.Subsets[i] = subset
694 }
695 return true
696 }
697
698
699
700 func addressSubset(addresses []v1.EndpointAddress, maxNum int) []v1.EndpointAddress {
701 if len(addresses) <= maxNum {
702 return addresses
703 }
704 return addresses[0:maxNum]
705 }
706
707
708
709
710
711 var semanticIgnoreResourceVersion = conversion.EqualitiesOrDie(
712 func(a, b v1.ObjectReference) bool {
713 a.ResourceVersion = ""
714 b.ResourceVersion = ""
715 return a == b
716 },
717 )
718
719
720
721 func endpointSubsetsEqualIgnoreResourceVersion(subsets1, subsets2 []v1.EndpointSubset) bool {
722 return semanticIgnoreResourceVersion.DeepEqual(subsets1, subsets2)
723 }
724
View as plain text