1
16
17 package endpointslice
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 "golang.org/x/time/rate"
25
26 v1 "k8s.io/api/core/v1"
27 discovery "k8s.io/api/discovery/v1"
28 apierrors "k8s.io/apimachinery/pkg/api/errors"
29 "k8s.io/apimachinery/pkg/labels"
30 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
31 "k8s.io/apimachinery/pkg/util/wait"
32 utilfeature "k8s.io/apiserver/pkg/util/feature"
33 coreinformers "k8s.io/client-go/informers/core/v1"
34 discoveryinformers "k8s.io/client-go/informers/discovery/v1"
35 clientset "k8s.io/client-go/kubernetes"
36 "k8s.io/client-go/kubernetes/scheme"
37 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
38 corelisters "k8s.io/client-go/listers/core/v1"
39 discoverylisters "k8s.io/client-go/listers/discovery/v1"
40 "k8s.io/client-go/tools/cache"
41 "k8s.io/client-go/tools/record"
42 "k8s.io/client-go/util/workqueue"
43 endpointslicerec "k8s.io/endpointslice"
44 endpointslicemetrics "k8s.io/endpointslice/metrics"
45 "k8s.io/endpointslice/topologycache"
46 endpointsliceutil "k8s.io/endpointslice/util"
47 "k8s.io/klog/v2"
48 "k8s.io/kubernetes/pkg/controller"
49 endpointslicepkg "k8s.io/kubernetes/pkg/controller/util/endpointslice"
50 "k8s.io/kubernetes/pkg/features"
51 )
52
const (
	// maxRetries is the number of times a Service key may be requeued after a
	// failed sync. Once the queue reports this many requeues for a key,
	// handleErr drops the key instead of retrying it again.
	maxRetries = 15

	// endpointSliceChangeMinSyncDelay is the minimum delay applied before a
	// Service is queued in response to an EndpointSlice change. The larger of
	// this value and the controller's endpointUpdatesBatchPeriod is used.
	endpointSliceChangeMinSyncDelay = 1 * time.Second

	// defaultSyncBackOff is the base delay given to the per-item exponential
	// failure rate limiter of the controller's work queue.
	defaultSyncBackOff = 1 * time.Second
	// maxSyncBackOff is the maximum delay given to the per-item exponential
	// failure rate limiter of the controller's work queue.
	maxSyncBackOff = 1000 * time.Second

	// controllerName is the name passed to the reconciler when it is
	// constructed in NewController.
	controllerName = "endpointslice-controller.k8s.io"
)
79
80
81 func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
82 serviceInformer coreinformers.ServiceInformer,
83 nodeInformer coreinformers.NodeInformer,
84 endpointSliceInformer discoveryinformers.EndpointSliceInformer,
85 maxEndpointsPerSlice int32,
86 client clientset.Interface,
87 endpointUpdatesBatchPeriod time.Duration,
88 ) *Controller {
89 broadcaster := record.NewBroadcaster(record.WithContext(ctx))
90 recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-controller"})
91
92 endpointslicemetrics.RegisterMetrics()
93
94 c := &Controller{
95 client: client,
96
97
98
99
100
101
102 queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
103 workqueue.NewItemExponentialFailureRateLimiter(defaultSyncBackOff, maxSyncBackOff),
104
105
106 &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
107 ), "endpoint_slice"),
108 workerLoopPeriod: time.Second,
109 }
110
111 serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
112 AddFunc: c.onServiceUpdate,
113 UpdateFunc: func(old, cur interface{}) {
114 c.onServiceUpdate(cur)
115 },
116 DeleteFunc: c.onServiceDelete,
117 })
118 c.serviceLister = serviceInformer.Lister()
119 c.servicesSynced = serviceInformer.Informer().HasSynced
120
121 podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
122 AddFunc: c.addPod,
123 UpdateFunc: c.updatePod,
124 DeleteFunc: c.deletePod,
125 })
126 c.podLister = podInformer.Lister()
127 c.podsSynced = podInformer.Informer().HasSynced
128
129 c.nodeLister = nodeInformer.Lister()
130 c.nodesSynced = nodeInformer.Informer().HasSynced
131
132 logger := klog.FromContext(ctx)
133 endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
134 AddFunc: c.onEndpointSliceAdd,
135 UpdateFunc: func(oldObj, newObj interface{}) {
136 c.onEndpointSliceUpdate(logger, oldObj, newObj)
137 },
138 DeleteFunc: c.onEndpointSliceDelete,
139 })
140
141 c.endpointSliceLister = endpointSliceInformer.Lister()
142 c.endpointSlicesSynced = endpointSliceInformer.Informer().HasSynced
143 c.endpointSliceTracker = endpointsliceutil.NewEndpointSliceTracker()
144
145 c.maxEndpointsPerSlice = maxEndpointsPerSlice
146
147 c.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker()
148
149 c.eventBroadcaster = broadcaster
150 c.eventRecorder = recorder
151
152 c.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
153
154 if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
155 nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
156 AddFunc: func(obj interface{}) {
157 c.addNode(logger, obj)
158 },
159 UpdateFunc: func(oldObj, newObj interface{}) {
160 c.updateNode(logger, oldObj, newObj)
161 },
162 DeleteFunc: func(obj interface{}) {
163 c.deleteNode(logger, obj)
164 },
165 })
166
167 c.topologyCache = topologycache.NewTopologyCache()
168 }
169
170 c.reconciler = endpointslicerec.NewReconciler(
171 c.client,
172 c.nodeLister,
173 c.maxEndpointsPerSlice,
174 c.endpointSliceTracker,
175 c.topologyCache,
176 c.eventRecorder,
177 controllerName,
178 endpointslicerec.WithTrafficDistributionEnabled(utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution)),
179 )
180
181 return c
182 }
183
184
// Controller keeps EndpointSlices in sync with Services and Pods. It watches
// Services, Pods, EndpointSlices and (optionally) Nodes, queues the affected
// Service keys, and hands each queued Service to the reconciler.
type Controller struct {
	// client is the API client; it is passed to the reconciler and used to
	// build the event sink in Run.
	client clientset.Interface
	// eventBroadcaster is started in Run and shut down when Run returns.
	eventBroadcaster record.EventBroadcaster
	// eventRecorder emits warning events on Services when a sync step fails.
	eventRecorder record.EventRecorder

	// serviceLister reads Services from the Service informer.
	serviceLister corelisters.ServiceLister
	// servicesSynced reports whether the Service informer has synced; Run
	// waits on it before starting workers.
	servicesSynced cache.InformerSynced

	// podLister reads Pods from the Pod informer.
	podLister corelisters.PodLister
	// podsSynced reports whether the Pod informer has synced; Run waits on it
	// before starting workers.
	podsSynced cache.InformerSynced

	// endpointSliceLister reads EndpointSlices from the EndpointSlice informer.
	endpointSliceLister discoverylisters.EndpointSliceLister
	// endpointSlicesSynced reports whether the EndpointSlice informer has
	// synced; Run waits on it before starting workers.
	endpointSlicesSynced cache.InformerSynced

	// endpointSliceTracker is consulted by the EndpointSlice event handlers
	// (ShouldSync, Has, HandleDeletion) and by syncService (StaleSlices) to
	// decide whether a Service must be synced or the informer cache is stale.
	endpointSliceTracker *endpointsliceutil.EndpointSliceTracker

	// nodeLister reads Nodes from the Node informer.
	nodeLister corelisters.NodeLister
	// nodesSynced reports whether the Node informer has synced; Run waits on
	// it before starting workers.
	nodesSynced cache.InformerSynced

	// reconciler performs the actual EndpointSlice reconciliation for a
	// Service.
	reconciler *endpointslicerec.Reconciler

	// triggerTimeTracker computes the last change trigger time handed to the
	// reconciler for each sync.
	triggerTimeTracker *endpointsliceutil.TriggerTimeTracker

	// queue holds the keys of Services that need to be synced. It is rate
	// limited so that failed syncs are retried with backoff.
	queue workqueue.RateLimitingInterface

	// maxEndpointsPerSlice is the per-slice endpoint limit passed to the
	// reconciler.
	maxEndpointsPerSlice int32

	// workerLoopPeriod is the period passed to wait.Until for each worker
	// goroutine started by Run.
	workerLoopPeriod time.Duration

	// endpointUpdatesBatchPeriod is the delay applied when queuing Services
	// in response to Pod changes, and a lower bound candidate for the delay
	// applied on EndpointSlice changes.
	endpointUpdatesBatchPeriod time.Duration

	// topologyCache is only set when the TopologyAwareHints feature gate is
	// enabled; it is nil otherwise.
	topologyCache *topologycache.TopologyCache
}
252
253
254 func (c *Controller) Run(ctx context.Context, workers int) {
255 defer utilruntime.HandleCrash()
256
257
258 c.eventBroadcaster.StartLogging(klog.Infof)
259 c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
260 defer c.eventBroadcaster.Shutdown()
261
262 defer c.queue.ShutDown()
263
264 logger := klog.FromContext(ctx)
265 logger.Info("Starting endpoint slice controller")
266 defer logger.Info("Shutting down endpoint slice controller")
267
268 if !cache.WaitForNamedCacheSync("endpoint_slice", ctx.Done(), c.podsSynced, c.servicesSynced, c.endpointSlicesSynced, c.nodesSynced) {
269 return
270 }
271
272 logger.V(2).Info("Starting worker threads", "total", workers)
273 for i := 0; i < workers; i++ {
274 go wait.Until(func() { c.worker(logger) }, c.workerLoopPeriod, ctx.Done())
275 }
276
277 <-ctx.Done()
278 }
279
280
281
282
283
284 func (c *Controller) worker(logger klog.Logger) {
285 for c.processNextWorkItem(logger) {
286 }
287 }
288
289 func (c *Controller) processNextWorkItem(logger klog.Logger) bool {
290 cKey, quit := c.queue.Get()
291 if quit {
292 return false
293 }
294 defer c.queue.Done(cKey)
295
296 err := c.syncService(logger, cKey.(string))
297 c.handleErr(logger, err, cKey)
298
299 return true
300 }
301
302 func (c *Controller) handleErr(logger klog.Logger, err error, key interface{}) {
303 trackSync(err)
304
305 if err == nil {
306 c.queue.Forget(key)
307 return
308 }
309
310 if c.queue.NumRequeues(key) < maxRetries {
311 logger.Info("Error syncing endpoint slices for service, retrying", "key", key, "err", err)
312 c.queue.AddRateLimited(key)
313 return
314 }
315
316 logger.Info("Retry budget exceeded, dropping service out of the queue", "key", key, "err", err)
317 c.queue.Forget(key)
318 utilruntime.HandleError(err)
319 }
320
321 func (c *Controller) syncService(logger klog.Logger, key string) error {
322 startTime := time.Now()
323 defer func() {
324 logger.V(4).Info("Finished syncing service endpoint slices", "key", key, "elapsedTime", time.Since(startTime))
325 }()
326
327 namespace, name, err := cache.SplitMetaNamespaceKey(key)
328 if err != nil {
329 return err
330 }
331
332 service, err := c.serviceLister.Services(namespace).Get(name)
333 if err != nil {
334 if !apierrors.IsNotFound(err) {
335 return err
336 }
337
338 c.triggerTimeTracker.DeleteService(namespace, name)
339 c.reconciler.DeleteService(namespace, name)
340 c.endpointSliceTracker.DeleteService(namespace, name)
341
342 return nil
343 }
344
345 if service.Spec.Type == v1.ServiceTypeExternalName {
346
347
348 return nil
349 }
350
351 if service.Spec.Selector == nil {
352
353
354 return nil
355 }
356
357 logger.V(5).Info("About to update endpoint slices for service", "key", key)
358
359 podLabelSelector := labels.Set(service.Spec.Selector).AsSelectorPreValidated()
360 pods, err := c.podLister.Pods(service.Namespace).List(podLabelSelector)
361 if err != nil {
362
363
364 c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToListPods",
365 "Error listing Pods for Service %s/%s: %v", service.Namespace, service.Name, err)
366 return err
367 }
368
369 esLabelSelector := labels.Set(map[string]string{
370 discovery.LabelServiceName: service.Name,
371 discovery.LabelManagedBy: c.reconciler.GetControllerName(),
372 }).AsSelectorPreValidated()
373 endpointSlices, err := c.endpointSliceLister.EndpointSlices(service.Namespace).List(esLabelSelector)
374
375 if err != nil {
376
377
378 c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToListEndpointSlices",
379 "Error listing Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err)
380 return err
381 }
382
383
384 endpointSlices = dropEndpointSlicesPendingDeletion(endpointSlices)
385
386 if c.endpointSliceTracker.StaleSlices(service, endpointSlices) {
387 return endpointslicepkg.NewStaleInformerCache("EndpointSlice informer cache is out of date")
388 }
389
390
391
392
393 lastChangeTriggerTime := c.triggerTimeTracker.
394 ComputeEndpointLastChangeTriggerTime(namespace, service, pods)
395
396 err = c.reconciler.Reconcile(logger, service, pods, endpointSlices, lastChangeTriggerTime)
397 if err != nil {
398 c.eventRecorder.Eventf(service, v1.EventTypeWarning, "FailedToUpdateEndpointSlices",
399 "Error updating Endpoint Slices for Service %s/%s: %v", service.Namespace, service.Name, err)
400 return err
401 }
402
403 return nil
404 }
405
406
407 func (c *Controller) onServiceUpdate(obj interface{}) {
408 key, err := controller.KeyFunc(obj)
409 if err != nil {
410 utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
411 return
412 }
413
414 c.queue.Add(key)
415 }
416
417
418 func (c *Controller) onServiceDelete(obj interface{}) {
419 key, err := controller.KeyFunc(obj)
420 if err != nil {
421 utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
422 return
423 }
424
425 c.queue.Add(key)
426 }
427
428
429
430
431 func (c *Controller) onEndpointSliceAdd(obj interface{}) {
432 endpointSlice := obj.(*discovery.EndpointSlice)
433 if endpointSlice == nil {
434 utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceAdd()"))
435 return
436 }
437 if c.reconciler.ManagedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice) {
438 c.queueServiceForEndpointSlice(endpointSlice)
439 }
440 }
441
442
443
444
445
446 func (c *Controller) onEndpointSliceUpdate(logger klog.Logger, prevObj, obj interface{}) {
447 prevEndpointSlice := prevObj.(*discovery.EndpointSlice)
448 endpointSlice := obj.(*discovery.EndpointSlice)
449 if endpointSlice == nil || prevEndpointSlice == nil {
450 utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceUpdate()"))
451 return
452 }
453
454
455
456 svcName := endpointSlice.Labels[discovery.LabelServiceName]
457 prevSvcName := prevEndpointSlice.Labels[discovery.LabelServiceName]
458 if svcName != prevSvcName {
459 logger.Info("label changed", "label", discovery.LabelServiceName, "oldService", prevSvcName, "newService", svcName, "endpointslice", klog.KObj(endpointSlice))
460 c.queueServiceForEndpointSlice(endpointSlice)
461 c.queueServiceForEndpointSlice(prevEndpointSlice)
462 return
463 }
464 if c.reconciler.ManagedByChanged(prevEndpointSlice, endpointSlice) || (c.reconciler.ManagedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice)) {
465 c.queueServiceForEndpointSlice(endpointSlice)
466 }
467 }
468
469
470
471
472 func (c *Controller) onEndpointSliceDelete(obj interface{}) {
473 endpointSlice := getEndpointSliceFromDeleteAction(obj)
474 if endpointSlice != nil && c.reconciler.ManagedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) {
475
476
477 if !c.endpointSliceTracker.HandleDeletion(endpointSlice) {
478 c.queueServiceForEndpointSlice(endpointSlice)
479 }
480 }
481 }
482
483
484
485 func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.EndpointSlice) {
486 key, err := endpointslicerec.ServiceControllerKey(endpointSlice)
487 if err != nil {
488 utilruntime.HandleError(fmt.Errorf("Couldn't get key for EndpointSlice %+v: %v", endpointSlice, err))
489 return
490 }
491
492
493
494 delay := endpointSliceChangeMinSyncDelay
495 if c.endpointUpdatesBatchPeriod > delay {
496 delay = c.endpointUpdatesBatchPeriod
497 }
498 c.queue.AddAfter(key, delay)
499 }
500
501 func (c *Controller) addPod(obj interface{}) {
502 pod := obj.(*v1.Pod)
503 services, err := endpointsliceutil.GetPodServiceMemberships(c.serviceLister, pod)
504 if err != nil {
505 utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))
506 return
507 }
508 for key := range services {
509 c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
510 }
511 }
512
513 func (c *Controller) updatePod(old, cur interface{}) {
514 services := endpointsliceutil.GetServicesToUpdateOnPodChange(c.serviceLister, old, cur)
515 for key := range services {
516 c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
517 }
518 }
519
520
521
522 func (c *Controller) deletePod(obj interface{}) {
523 pod := endpointsliceutil.GetPodFromDeleteAction(obj)
524 if pod != nil {
525 c.addPod(pod)
526 }
527 }
528
529 func (c *Controller) addNode(logger klog.Logger, obj interface{}) {
530 c.checkNodeTopologyDistribution(logger)
531 }
532
533 func (c *Controller) updateNode(logger klog.Logger, old, cur interface{}) {
534 oldNode := old.(*v1.Node)
535 curNode := cur.(*v1.Node)
536
537
538
539 if isNodeReady(oldNode) != isNodeReady(curNode) ||
540 oldNode.Labels[v1.LabelTopologyZone] != curNode.Labels[v1.LabelTopologyZone] {
541 c.checkNodeTopologyDistribution(logger)
542 }
543 }
544
545 func (c *Controller) deleteNode(logger klog.Logger, obj interface{}) {
546 c.checkNodeTopologyDistribution(logger)
547 }
548
549
550
551 func (c *Controller) checkNodeTopologyDistribution(logger klog.Logger) {
552 if c.topologyCache == nil {
553 return
554 }
555 nodes, err := c.nodeLister.List(labels.Everything())
556 if err != nil {
557 logger.Error(err, "Error listing Nodes")
558 return
559 }
560 c.topologyCache.SetNodes(logger, nodes)
561 serviceKeys := c.topologyCache.GetOverloadedServices()
562 for _, serviceKey := range serviceKeys {
563 logger.V(2).Info("Queuing Service after Node change due to overloading", "key", serviceKey)
564 c.queue.Add(serviceKey)
565 }
566 }
567
568
569 func trackSync(err error) {
570 metricLabel := "success"
571 if err != nil {
572 if endpointslicepkg.IsStaleInformerCacheErr(err) {
573 metricLabel = "stale"
574 } else {
575 metricLabel = "error"
576 }
577 }
578 endpointslicemetrics.EndpointSliceSyncs.WithLabelValues(metricLabel).Inc()
579 }
580
581 func dropEndpointSlicesPendingDeletion(endpointSlices []*discovery.EndpointSlice) []*discovery.EndpointSlice {
582 n := 0
583 for _, endpointSlice := range endpointSlices {
584 if endpointSlice.DeletionTimestamp == nil {
585 endpointSlices[n] = endpointSlice
586 n++
587 }
588 }
589 return endpointSlices[:n]
590 }
591
592
593 func getEndpointSliceFromDeleteAction(obj interface{}) *discovery.EndpointSlice {
594 if endpointSlice, ok := obj.(*discovery.EndpointSlice); ok {
595
596
597 return endpointSlice
598 }
599
600 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
601 if !ok {
602 utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
603 return nil
604 }
605 endpointSlice, ok := tombstone.Obj.(*discovery.EndpointSlice)
606 if !ok {
607 utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a EndpointSlice: %#v", obj))
608 return nil
609 }
610 return endpointSlice
611 }
612
613
614 func isNodeReady(node *v1.Node) bool {
615 for _, c := range node.Status.Conditions {
616 if c.Type == v1.NodeReady {
617 return c.Status == v1.ConditionTrue
618 }
619 }
620 return false
621 }
622
View as plain text