1
16
17 package endpointslicemirroring
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 "golang.org/x/time/rate"
25
26 v1 "k8s.io/api/core/v1"
27 discovery "k8s.io/api/discovery/v1"
28 apierrors "k8s.io/apimachinery/pkg/api/errors"
29 "k8s.io/apimachinery/pkg/labels"
30 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
31 "k8s.io/apimachinery/pkg/util/wait"
32 coreinformers "k8s.io/client-go/informers/core/v1"
33 discoveryinformers "k8s.io/client-go/informers/discovery/v1"
34 clientset "k8s.io/client-go/kubernetes"
35 "k8s.io/client-go/kubernetes/scheme"
36 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
37 corelisters "k8s.io/client-go/listers/core/v1"
38 discoverylisters "k8s.io/client-go/listers/discovery/v1"
39 "k8s.io/client-go/tools/cache"
40 "k8s.io/client-go/tools/record"
41 "k8s.io/client-go/util/workqueue"
42 endpointsliceutil "k8s.io/endpointslice/util"
43 "k8s.io/klog/v2"
44 "k8s.io/kubernetes/pkg/controller"
45 "k8s.io/kubernetes/pkg/controller/endpointslicemirroring/metrics"
46 endpointslicepkg "k8s.io/kubernetes/pkg/controller/util/endpointslice"
47 )
48
49 const (
50
51
52
53
54
55
56
57
58 maxRetries = 15
59
60
61 defaultSyncBackOff = 1 * time.Second
62
63 maxSyncBackOff = 100 * time.Second
64
65
66
67 controllerName = "endpointslicemirroring-controller.k8s.io"
68 )
69
70
71 func NewController(ctx context.Context, endpointsInformer coreinformers.EndpointsInformer,
72 endpointSliceInformer discoveryinformers.EndpointSliceInformer,
73 serviceInformer coreinformers.ServiceInformer,
74 maxEndpointsPerSubset int32,
75 client clientset.Interface,
76 endpointUpdatesBatchPeriod time.Duration,
77 ) *Controller {
78 logger := klog.FromContext(ctx)
79 broadcaster := record.NewBroadcaster(record.WithContext(ctx))
80 recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-slice-mirroring-controller"})
81
82 metrics.RegisterMetrics()
83
84 c := &Controller{
85 client: client,
86
87
88
89
90
91 queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
92 workqueue.NewItemExponentialFailureRateLimiter(defaultSyncBackOff, maxSyncBackOff),
93
94
95 &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
96 ), "endpoint_slice_mirroring"),
97 workerLoopPeriod: time.Second,
98 }
99
100 endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
101 AddFunc: func(obj interface{}) {
102 c.onEndpointsAdd(logger, obj)
103 },
104 UpdateFunc: func(oldObj, newObj interface{}) {
105 c.onEndpointsUpdate(logger, oldObj, newObj)
106 },
107 DeleteFunc: func(obj interface{}) {
108 c.onEndpointsDelete(logger, obj)
109 },
110 })
111 c.endpointsLister = endpointsInformer.Lister()
112 c.endpointsSynced = endpointsInformer.Informer().HasSynced
113
114 endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
115 AddFunc: c.onEndpointSliceAdd,
116 UpdateFunc: func(oldObj, newObj interface{}) {
117 c.onEndpointSliceUpdate(logger, oldObj, newObj)
118 },
119 DeleteFunc: c.onEndpointSliceDelete,
120 })
121
122 c.endpointSliceLister = endpointSliceInformer.Lister()
123 c.endpointSlicesSynced = endpointSliceInformer.Informer().HasSynced
124 c.endpointSliceTracker = endpointsliceutil.NewEndpointSliceTracker()
125
126 c.serviceLister = serviceInformer.Lister()
127 c.servicesSynced = serviceInformer.Informer().HasSynced
128 serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
129 AddFunc: c.onServiceAdd,
130 UpdateFunc: c.onServiceUpdate,
131 DeleteFunc: c.onServiceDelete,
132 })
133
134 c.maxEndpointsPerSubset = maxEndpointsPerSubset
135
136 c.reconciler = &reconciler{
137 client: c.client,
138 maxEndpointsPerSubset: c.maxEndpointsPerSubset,
139 endpointSliceTracker: c.endpointSliceTracker,
140 metricsCache: metrics.NewCache(maxEndpointsPerSubset),
141 eventRecorder: recorder,
142 }
143
144 c.eventBroadcaster = broadcaster
145 c.eventRecorder = recorder
146
147 c.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
148
149 return c
150 }
151
152
153 type Controller struct {
154 client clientset.Interface
155 eventBroadcaster record.EventBroadcaster
156 eventRecorder record.EventRecorder
157
158
159
160 endpointsLister corelisters.EndpointsLister
161
162
163
164 endpointsSynced cache.InformerSynced
165
166
167
168 endpointSliceLister discoverylisters.EndpointSliceLister
169
170
171
172 endpointSlicesSynced cache.InformerSynced
173
174
175
176
177 endpointSliceTracker *endpointsliceutil.EndpointSliceTracker
178
179
180
181 serviceLister corelisters.ServiceLister
182
183
184
185 servicesSynced cache.InformerSynced
186
187
188 reconciler *reconciler
189
190
191
192
193
194
195 queue workqueue.RateLimitingInterface
196
197
198
199 maxEndpointsPerSubset int32
200
201
202
203 workerLoopPeriod time.Duration
204
205
206
207
208 endpointUpdatesBatchPeriod time.Duration
209 }
210
211
212 func (c *Controller) Run(ctx context.Context, workers int) {
213 defer utilruntime.HandleCrash()
214
215
216 c.eventBroadcaster.StartLogging(klog.Infof)
217 c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
218 defer c.eventBroadcaster.Shutdown()
219
220 defer c.queue.ShutDown()
221
222 logger := klog.FromContext(ctx)
223 logger.Info("Starting EndpointSliceMirroring controller")
224 defer logger.Info("Shutting down EndpointSliceMirroring controller")
225
226 if !cache.WaitForNamedCacheSync("endpoint_slice_mirroring", ctx.Done(), c.endpointsSynced, c.endpointSlicesSynced, c.servicesSynced) {
227 return
228 }
229
230 logger.V(2).Info("Starting worker threads", "total", workers)
231 for i := 0; i < workers; i++ {
232 go wait.Until(func() { c.worker(logger) }, c.workerLoopPeriod, ctx.Done())
233 }
234
235 <-ctx.Done()
236 }
237
238
239
240
241
242 func (c *Controller) worker(logger klog.Logger) {
243 for c.processNextWorkItem(logger) {
244 }
245 }
246
247 func (c *Controller) processNextWorkItem(logger klog.Logger) bool {
248 cKey, quit := c.queue.Get()
249 if quit {
250 return false
251 }
252 defer c.queue.Done(cKey)
253
254 err := c.syncEndpoints(logger, cKey.(string))
255 c.handleErr(logger, err, cKey)
256
257 return true
258 }
259
260 func (c *Controller) handleErr(logger klog.Logger, err error, key interface{}) {
261 if err == nil {
262 c.queue.Forget(key)
263 return
264 }
265
266 if c.queue.NumRequeues(key) < maxRetries {
267 logger.Info("Error mirroring EndpointSlices for Endpoints, retrying", "key", key, "err", err)
268 c.queue.AddRateLimited(key)
269 return
270 }
271
272 logger.Info("Retry budget exceeded, dropping Endpoints out of the queue", "key", key, "err", err)
273 c.queue.Forget(key)
274 utilruntime.HandleError(err)
275 }
276
277 func (c *Controller) syncEndpoints(logger klog.Logger, key string) error {
278 startTime := time.Now()
279 defer func() {
280 syncDuration := float64(time.Since(startTime).Milliseconds()) / 1000
281 metrics.EndpointsSyncDuration.WithLabelValues().Observe(syncDuration)
282 logger.V(4).Info("Finished syncing EndpointSlices for Endpoints", "key", key, "elapsedTime", time.Since(startTime))
283 }()
284
285 logger.V(4).Info("syncEndpoints", "key", key)
286
287 namespace, name, err := cache.SplitMetaNamespaceKey(key)
288 if err != nil {
289 return err
290 }
291
292 endpoints, err := c.endpointsLister.Endpoints(namespace).Get(name)
293 if err != nil {
294 if apierrors.IsNotFound(err) {
295 logger.V(4).Info("Endpoints not found, cleaning up any mirrored EndpointSlices", "endpoints", klog.KRef(namespace, name))
296 c.endpointSliceTracker.DeleteService(namespace, name)
297 return c.deleteMirroredSlices(namespace, name)
298 }
299 return err
300 }
301
302 if !c.shouldMirror(endpoints) {
303 logger.V(4).Info("Endpoints should not be mirrored, cleaning up any mirrored EndpointSlices", "endpoints", klog.KRef(namespace, name))
304 c.endpointSliceTracker.DeleteService(namespace, name)
305 return c.deleteMirroredSlices(namespace, name)
306 }
307
308 svc, err := c.serviceLister.Services(namespace).Get(name)
309 if err != nil {
310 if apierrors.IsNotFound(err) {
311 logger.V(4).Info("Service not found, cleaning up any mirrored EndpointSlices", "service", klog.KRef(namespace, name))
312 c.endpointSliceTracker.DeleteService(namespace, name)
313 return c.deleteMirroredSlices(namespace, name)
314 }
315 return err
316 }
317
318
319 if svc.Spec.Selector != nil {
320 logger.V(4).Info("Service now has selector, cleaning up any mirrored EndpointSlices", "service", klog.KRef(namespace, name))
321 c.endpointSliceTracker.DeleteService(namespace, name)
322 return c.deleteMirroredSlices(namespace, name)
323 }
324
325 endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
326 if err != nil {
327 return err
328 }
329
330 if c.endpointSliceTracker.StaleSlices(svc, endpointSlices) {
331 return endpointslicepkg.NewStaleInformerCache("EndpointSlice informer cache is out of date")
332 }
333
334 err = c.reconciler.reconcile(logger, endpoints, endpointSlices)
335 if err != nil {
336 return err
337 }
338
339 return nil
340 }
341
342
343 func (c *Controller) queueEndpoints(obj interface{}) {
344 key, err := controller.KeyFunc(obj)
345 if err != nil {
346 utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v (type %T): %v", obj, obj, err))
347 return
348 }
349
350 c.queue.Add(key)
351 }
352
353
354
355
356
357
358
359
360 func (c *Controller) shouldMirror(endpoints *v1.Endpoints) bool {
361 if endpoints == nil || skipMirror(endpoints.Labels) || hasLeaderElection(endpoints.Annotations) {
362 return false
363 }
364
365 return true
366 }
367
368
369 func (c *Controller) onServiceAdd(obj interface{}) {
370 service := obj.(*v1.Service)
371 if service == nil {
372 utilruntime.HandleError(fmt.Errorf("onServiceAdd() expected type v1.Service, got %T", obj))
373 return
374 }
375 if service.Spec.Selector == nil {
376 c.queueEndpoints(obj)
377 }
378 }
379
380
381 func (c *Controller) onServiceUpdate(prevObj, obj interface{}) {
382 service := obj.(*v1.Service)
383 prevService := prevObj.(*v1.Service)
384 if service == nil || prevService == nil {
385 utilruntime.HandleError(fmt.Errorf("onServiceUpdate() expected type v1.Service, got %T, %T", prevObj, obj))
386 return
387 }
388 if (service.Spec.Selector == nil) != (prevService.Spec.Selector == nil) {
389 c.queueEndpoints(obj)
390 }
391 }
392
393
394 func (c *Controller) onServiceDelete(obj interface{}) {
395 service := getServiceFromDeleteAction(obj)
396 if service == nil {
397 utilruntime.HandleError(fmt.Errorf("onServiceDelete() expected type v1.Service, got %T", obj))
398 return
399 }
400 if service.Spec.Selector == nil {
401 c.queueEndpoints(obj)
402 }
403 }
404
405
406 func (c *Controller) onEndpointsAdd(logger klog.Logger, obj interface{}) {
407 endpoints := obj.(*v1.Endpoints)
408 if endpoints == nil {
409 utilruntime.HandleError(fmt.Errorf("onEndpointsAdd() expected type v1.Endpoints, got %T", obj))
410 return
411 }
412 if !c.shouldMirror(endpoints) {
413 logger.V(5).Info("Skipping mirroring", "endpoints", klog.KObj(endpoints))
414 return
415 }
416 c.queueEndpoints(obj)
417 }
418
419
420 func (c *Controller) onEndpointsUpdate(logger klog.Logger, prevObj, obj interface{}) {
421 endpoints := obj.(*v1.Endpoints)
422 prevEndpoints := prevObj.(*v1.Endpoints)
423 if endpoints == nil || prevEndpoints == nil {
424 utilruntime.HandleError(fmt.Errorf("onEndpointsUpdate() expected type v1.Endpoints, got %T, %T", prevObj, obj))
425 return
426 }
427 if !c.shouldMirror(endpoints) && !c.shouldMirror(prevEndpoints) {
428 logger.V(5).Info("Skipping mirroring", "endpoints", klog.KObj(endpoints))
429 return
430 }
431 c.queueEndpoints(obj)
432 }
433
434
435 func (c *Controller) onEndpointsDelete(logger klog.Logger, obj interface{}) {
436 endpoints := getEndpointsFromDeleteAction(obj)
437 if endpoints == nil {
438 utilruntime.HandleError(fmt.Errorf("onEndpointsDelete() expected type v1.Endpoints, got %T", obj))
439 return
440 }
441 if !c.shouldMirror(endpoints) {
442 logger.V(5).Info("Skipping mirroring", "endpoints", klog.KObj(endpoints))
443 return
444 }
445 c.queueEndpoints(obj)
446 }
447
448
449
450
451 func (c *Controller) onEndpointSliceAdd(obj interface{}) {
452 endpointSlice := obj.(*discovery.EndpointSlice)
453 if endpointSlice == nil {
454 utilruntime.HandleError(fmt.Errorf("onEndpointSliceAdd() expected type discovery.EndpointSlice, got %T", obj))
455 return
456 }
457 if managedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice) {
458 c.queueEndpointsForEndpointSlice(endpointSlice)
459 }
460 }
461
462
463
464
465
466 func (c *Controller) onEndpointSliceUpdate(logger klog.Logger, prevObj, obj interface{}) {
467 prevEndpointSlice := obj.(*discovery.EndpointSlice)
468 endpointSlice := prevObj.(*discovery.EndpointSlice)
469 if endpointSlice == nil || prevEndpointSlice == nil {
470 utilruntime.HandleError(fmt.Errorf("onEndpointSliceUpdated() expected type discovery.EndpointSlice, got %T, %T", prevObj, obj))
471 return
472 }
473
474
475
476 svcName := endpointSlice.Labels[discovery.LabelServiceName]
477 prevSvcName := prevEndpointSlice.Labels[discovery.LabelServiceName]
478 if svcName != prevSvcName {
479 logger.Info("LabelServiceName changed", "labelServiceName", discovery.LabelServiceName, "oldName", prevSvcName, "newName", svcName, "endpointSlice", klog.KObj(endpointSlice))
480 c.queueEndpointsForEndpointSlice(endpointSlice)
481 c.queueEndpointsForEndpointSlice(prevEndpointSlice)
482 return
483 }
484 if managedByChanged(prevEndpointSlice, endpointSlice) || (managedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice)) {
485 c.queueEndpointsForEndpointSlice(endpointSlice)
486 }
487 }
488
489
490
491
492 func (c *Controller) onEndpointSliceDelete(obj interface{}) {
493 endpointSlice := getEndpointSliceFromDeleteAction(obj)
494 if endpointSlice == nil {
495 utilruntime.HandleError(fmt.Errorf("onEndpointSliceDelete() expected type discovery.EndpointSlice, got %T", obj))
496 return
497 }
498 if managedByController(endpointSlice) && c.endpointSliceTracker.Has(endpointSlice) {
499
500
501 if !c.endpointSliceTracker.HandleDeletion(endpointSlice) {
502 c.queueEndpointsForEndpointSlice(endpointSlice)
503 }
504 }
505 }
506
507
508
509 func (c *Controller) queueEndpointsForEndpointSlice(endpointSlice *discovery.EndpointSlice) {
510 key, err := endpointsControllerKey(endpointSlice)
511 if err != nil {
512 utilruntime.HandleError(fmt.Errorf("Couldn't get key for EndpointSlice %+v (type %T): %v", endpointSlice, endpointSlice, err))
513 return
514 }
515
516 c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
517 }
518
519
520
521 func (c *Controller) deleteMirroredSlices(namespace, name string) error {
522 endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
523 if err != nil {
524 return err
525 }
526
527 c.endpointSliceTracker.DeleteService(namespace, name)
528 return c.reconciler.deleteEndpoints(namespace, name, endpointSlices)
529 }
530
531
532
533 func endpointSlicesMirroredForService(endpointSliceLister discoverylisters.EndpointSliceLister, namespace, name string) ([]*discovery.EndpointSlice, error) {
534 esLabelSelector := labels.Set(map[string]string{
535 discovery.LabelServiceName: name,
536 discovery.LabelManagedBy: controllerName,
537 }).AsSelectorPreValidated()
538 return endpointSliceLister.EndpointSlices(namespace).List(esLabelSelector)
539 }
540
View as plain text