1 package externalworkload
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "sync"
8 "time"
9
10 ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
11 "github.com/linkerd/linkerd2/controller/k8s"
12 logging "github.com/sirupsen/logrus"
13 corev1 "k8s.io/api/core/v1"
14 discoveryv1 "k8s.io/api/discovery/v1"
15 kerrors "k8s.io/apimachinery/pkg/api/errors"
16 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17 "k8s.io/apimachinery/pkg/labels"
18 "k8s.io/client-go/tools/cache"
19 "k8s.io/client-go/tools/leaderelection"
20 "k8s.io/client-go/tools/leaderelection/resourcelock"
21 "k8s.io/client-go/util/workqueue"
22 endpointslicerec "k8s.io/endpointslice"
23 epsliceutil "k8s.io/endpointslice/util"
24 )
25
const (
	// leaseName is the name of the Lease object used as the leader election
	// lock by the controller.
	leaseName = "linkerd-destination-endpoint-write"

	// leaseDuration is the value handed to the leader election config as its
	// LeaseDuration.
	leaseDuration = 30 * time.Second

	// leaseRenewDeadline is the value handed to the leader election config as
	// its RenewDeadline.
	leaseRenewDeadline = 10 * time.Second

	// leaseRetryPeriod is the value handed to the leader election config as
	// its RetryPeriod.
	leaseRetryPeriod = 2 * time.Second

	// managedBy is the identifier of this controller. It is passed to the
	// reconciler and used as the value of the managed-by label when selecting
	// the EndpointSlices that belong to a Service.
	managedBy = "linkerd-external-workloads-controller"

	// maxEndpointsQuota is passed to the endpoints reconciler on construction.
	// NOTE(review): presumably the maximum number of endpoints written to a
	// single EndpointSlice -- confirm against the reconciler.
	maxEndpointsQuota = 100

	// maxRetryBudget is the number of requeues a Service key may accumulate
	// before it is dropped from the work queue.
	maxRetryBudget = 15
)
52
53
54
55
// EndpointsController reacts to Service, EndpointSlice and ExternalWorkload
// events by enqueueing Service keys, and reconciles each dequeued Service
// through its endpointsReconciler. Event handlers are only registered while
// this instance holds the leader election lease.
type EndpointsController struct {
	// k8sAPI gives access to the informers, listers and client used by the
	// controller.
	k8sAPI *k8s.API
	log    *logging.Entry
	// queue holds the "namespace/name" keys of Services pending a sync.
	queue      workqueue.RateLimitingInterface
	reconciler *endpointsReconciler
	// stop is read once by Start; receiving from it triggers shutdown.
	stop chan struct{}

	// lec is the leader election configuration run by Start.
	lec leaderelection.LeaderElectionConfig
	// informerHandlers holds the handler registrations and the mutex that
	// guards them.
	informerHandlers
	// dropsMetric is incremented every time a key exhausts its retry budget
	// and is dropped from the queue.
	dropsMetric workqueue.CounterMetric
}
67
68
69
70
71
72
// informerHandlers groups the registrations returned when the controller adds
// its event handlers to the ExternalWorkload, EndpointSlice and Service
// informers. The registrations are kept so that the handlers can be removed
// again later.
type informerHandlers struct {
	ewHandle  cache.ResourceEventHandlerRegistration
	esHandle  cache.ResourceEventHandlerRegistration
	svcHandle cache.ResourceEventHandlerRegistration

	// The embedded mutex is held by addHandlers and removeHandlers while they
	// register or de-register handlers.
	sync.Mutex
}
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 func NewEndpointsController(k8sAPI *k8s.API, hostname, controllerNs string, stopCh chan struct{}, exportQueueMetrics bool) (*EndpointsController, error) {
97 queueName := "endpoints_controller_workqueue"
98 workQueueConfig := workqueue.RateLimitingQueueConfig{
99 Name: queueName,
100 }
101
102 var dropsMetric workqueue.CounterMetric = &noopCounterMetric{}
103 if exportQueueMetrics {
104 provider := newWorkQueueMetricsProvider()
105 workQueueConfig.MetricsProvider = provider
106 dropsMetric = provider.NewDropsMetric(queueName)
107 }
108
109 ec := &EndpointsController{
110 k8sAPI: k8sAPI,
111 reconciler: newEndpointsReconciler(k8sAPI, managedBy, maxEndpointsQuota),
112 queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workQueueConfig),
113 stop: stopCh,
114 log: logging.WithFields(logging.Fields{
115 "component": "external-endpoints-controller",
116 }),
117 dropsMetric: dropsMetric,
118 }
119
120
121
122
123
124 ec.lec = leaderelection.LeaderElectionConfig{
125
126
127 ReleaseOnCancel: true,
128 Lock: &resourcelock.LeaseLock{
129 LeaseMeta: metav1.ObjectMeta{
130 Name: leaseName,
131 Namespace: controllerNs,
132 },
133 Client: k8sAPI.Client.CoordinationV1(),
134 LockConfig: resourcelock.ResourceLockConfig{
135 Identity: hostname,
136 },
137 },
138 LeaseDuration: leaseDuration,
139 RenewDeadline: leaseRenewDeadline,
140 RetryPeriod: leaseRetryPeriod,
141 Callbacks: leaderelection.LeaderCallbacks{
142 OnStartedLeading: func(ctx context.Context) {
143 err := ec.addHandlers()
144 if err != nil {
145
146
147
148 panic(fmt.Sprintf("failed to register event handlers: %v", err))
149 }
150 },
151 OnStoppedLeading: func() {
152 err := ec.removeHandlers()
153 if err != nil {
154
155
156
157 panic(fmt.Sprintf("failed to de-register event handlers: %v", err))
158 }
159 ec.log.Infof("%s released lease", hostname)
160 },
161 OnNewLeader: func(identity string) {
162 if identity == hostname {
163 ec.log.Infof("%s acquired lease", hostname)
164 }
165 },
166 },
167 }
168
169 return ec, nil
170 }
171
172
173
174 func (ec *EndpointsController) addHandlers() error {
175 var err error
176 ec.Lock()
177 defer ec.Unlock()
178
179
180
181 ec.reconciler.endpointTracker = epsliceutil.NewEndpointSliceTracker()
182
183 ec.svcHandle, err = ec.k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
184 AddFunc: ec.onServiceUpdate,
185 DeleteFunc: ec.onServiceUpdate,
186 UpdateFunc: func(_, newObj interface{}) {
187 ec.onServiceUpdate(newObj)
188 },
189 })
190
191 if err != nil {
192 return err
193 }
194
195 ec.esHandle, err = ec.k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
196 AddFunc: ec.onEndpointSliceAdd,
197 UpdateFunc: ec.onEndpointSliceUpdate,
198 DeleteFunc: ec.onEndpointSliceDelete,
199 })
200
201 if err != nil {
202 return err
203 }
204
205 ec.ewHandle, err = ec.k8sAPI.ExtWorkload().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
206 AddFunc: ec.onAddExternalWorkload,
207 DeleteFunc: ec.onDeleteExternalWorkload,
208 UpdateFunc: ec.onUpdateExternalWorkload,
209 })
210
211 if err != nil {
212 return err
213 }
214
215 return nil
216 }
217
218
219 func (ec *EndpointsController) removeHandlers() error {
220 var err error
221 ec.Lock()
222 defer ec.Unlock()
223 if ec.svcHandle != nil {
224 if err = ec.k8sAPI.Svc().Informer().RemoveEventHandler(ec.svcHandle); err != nil {
225 return err
226 }
227 }
228
229 if ec.ewHandle != nil {
230 if err = ec.k8sAPI.ExtWorkload().Informer().RemoveEventHandler(ec.ewHandle); err != nil {
231 return err
232 }
233 }
234
235 if ec.esHandle != nil {
236 if err = ec.k8sAPI.ES().Informer().RemoveEventHandler(ec.esHandle); err != nil {
237 return err
238 }
239 }
240
241 return nil
242 }
243
244
245
246
247
248
249
250
251
252 func (ec *EndpointsController) Start() {
253
254
255
256
257
258
259 ctx, cancel := context.WithCancel(context.Background())
260 go func() {
261 for {
262
263 leaderelection.RunOrDie(ctx, ec.lec)
264
265
266 select {
267 case <-ctx.Done():
268 ec.log.Trace("leader election client received shutdown signal")
269 return
270 default:
271 }
272 }
273 }()
274
275
276
277
278 go func() {
279
280 <-ec.stop
281
282 ec.queue.ShutDownWithDrain()
283
284 cancel()
285 ec.log.Infof("received shutdown signal")
286 }()
287
288
289 go ec.processQueue()
290 }
291
292
293
294
295
296
297
298 func (ec *EndpointsController) processQueue() {
299 for {
300 item, quit := ec.queue.Get()
301 if quit {
302 ec.log.Trace("queue received shutdown signal")
303 return
304 }
305
306 key, ok := item.(string)
307 if !ok {
308 ec.log.Errorf("Found queue element of type %T, was expecting a string", item)
309 continue
310 }
311 err := ec.syncService(key)
312 ec.handleError(err, key)
313
314
315
316
317
318 ec.queue.Done(key)
319 }
320 }
321
322
323
324
325
326
327
328 func (ec *EndpointsController) handleError(err error, key string) {
329 if err == nil {
330
331
332
333 ec.queue.Forget(key)
334 return
335 }
336
337 if ec.queue.NumRequeues(key) < maxRetryBudget {
338 ec.queue.AddRateLimited(key)
339 return
340 }
341
342 ec.queue.Forget(key)
343 ec.dropsMetric.Inc()
344 ec.log.Errorf("dropped Service %s out of update queue: %v", key, err)
345 }
346
347
348
349 func (ec *EndpointsController) syncService(update string) error {
350 namespace, name, err := cache.SplitMetaNamespaceKey(update)
351 if err != nil {
352 return err
353 }
354
355 svc, err := ec.k8sAPI.Svc().Lister().Services(namespace).Get(name)
356 if err != nil {
357
358
359
360 if !kerrors.IsNotFound(err) {
361 return err
362 }
363
364 ec.reconciler.endpointTracker.DeleteService(namespace, name)
365
366 return nil
367 }
368
369 if svc.Spec.Type == corev1.ServiceTypeExternalName {
370
371 return nil
372 }
373
374 if svc.Spec.Selector == nil {
375
376
377 return nil
378 }
379
380 ewSelector := labels.Set(svc.Spec.Selector).AsSelectorPreValidated()
381 ews, err := ec.k8sAPI.ExtWorkload().Lister().List(ewSelector)
382 if err != nil {
383
384
385
386 return err
387 }
388
389 esSelector := labels.Set(map[string]string{
390 discoveryv1.LabelServiceName: svc.Name,
391 discoveryv1.LabelManagedBy: managedBy,
392 }).AsSelectorPreValidated()
393 epSlices, err := ec.k8sAPI.ES().Lister().List(esSelector)
394 if err != nil {
395 return err
396 }
397
398 epSlices = dropEndpointSlicesPendingDeletion(epSlices)
399 if ec.reconciler.endpointTracker.StaleSlices(svc, epSlices) {
400 ec.log.Warnf("detected EndpointSlice informer cache is out of date when processing %s", update)
401 return errors.New("EndpointSlice informer cache is out of date")
402 }
403 err = ec.reconciler.reconcile(svc, ews, epSlices)
404 if err != nil {
405 return err
406 }
407
408 return nil
409 }
410
411
412
413
414 func (ec *EndpointsController) onServiceUpdate(obj interface{}) {
415 key, err := cache.MetaNamespaceKeyFunc(obj)
416 if err != nil {
417 ec.log.Infof("failed to get key for object %+v: %v", obj, err)
418 return
419 }
420
421 namespace, _, err := cache.SplitMetaNamespaceKey(key)
422 if err != nil {
423 ec.log.Infof("failed to get namespace from key %s: %v", key, err)
424 }
425
426
427 if namespace == "kube-system" {
428 return
429 }
430
431 ec.queue.Add(key)
432 }
433
434
435
436
437 func (ec *EndpointsController) onEndpointSliceAdd(obj interface{}) {
438 es := obj.(*discoveryv1.EndpointSlice)
439 if es == nil {
440 ec.log.Info("Invalid EndpointSlice provided to onEndpointSliceAdd()")
441 return
442 }
443
444 if managedByController(es) && ec.reconciler.endpointTracker.ShouldSync(es) {
445 ec.queueServiceForEndpointSlice(es)
446 }
447 }
448
449
450
451
452
453 func (ec *EndpointsController) onEndpointSliceUpdate(prevObj, obj interface{}) {
454 prevEndpointSlice := prevObj.(*discoveryv1.EndpointSlice)
455 endpointSlice := obj.(*discoveryv1.EndpointSlice)
456 if endpointSlice == nil || prevEndpointSlice == nil {
457 ec.log.Info("Invalid EndpointSlice provided to onEndpointSliceUpdate()")
458 return
459 }
460
461
462
463
464 svcName := endpointSlice.Labels[discoveryv1.LabelServiceName]
465 prevSvcName := prevEndpointSlice.Labels[discoveryv1.LabelServiceName]
466 if svcName != prevSvcName {
467 ec.log.Infof("label changed label: %s, oldService: %s, newService: %s, endpointsliece: %s", discoveryv1.LabelServiceName, prevSvcName, svcName, endpointSlice.Name)
468 ec.queueServiceForEndpointSlice(endpointSlice)
469 ec.queueServiceForEndpointSlice(prevEndpointSlice)
470 return
471 }
472 if managedByChanged(prevEndpointSlice, endpointSlice) ||
473 (managedByController(endpointSlice) && ec.reconciler.endpointTracker.ShouldSync(endpointSlice)) {
474 ec.queueServiceForEndpointSlice(endpointSlice)
475 }
476 }
477
478
479
480
481 func (ec *EndpointsController) onEndpointSliceDelete(obj interface{}) {
482 endpointSlice := ec.getEndpointSliceFromDeleteAction(obj)
483 if endpointSlice != nil && managedByController(endpointSlice) && ec.reconciler.endpointTracker.Has(endpointSlice) {
484
485
486 if !ec.reconciler.endpointTracker.HandleDeletion(endpointSlice) {
487 ec.queueServiceForEndpointSlice(endpointSlice)
488 }
489 }
490 }
491
492
493
494 func (ec *EndpointsController) queueServiceForEndpointSlice(endpointSlice *discoveryv1.EndpointSlice) {
495 key, err := endpointslicerec.ServiceControllerKey(endpointSlice)
496 if err != nil {
497 ec.log.Errorf("Couldn't get key for EndpointSlice %+v: %v", endpointSlice, err)
498 return
499 }
500
501 ec.queue.Add(key)
502 }
503
504 func (ec *EndpointsController) onAddExternalWorkload(obj interface{}) {
505 ew, ok := obj.(*ewv1beta1.ExternalWorkload)
506 if !ok {
507 ec.log.Errorf("couldn't get ExternalWorkload from object %#v", obj)
508 return
509 }
510
511 services, err := ec.getExternalWorkloadSvcMembership(ew)
512 if err != nil {
513 ec.log.Errorf("failed to get service membership for %s/%s: %v", ew.Namespace, ew.Name, err)
514 return
515 }
516
517 for svc := range services {
518 ec.queue.Add(svc)
519 }
520 }
521
522 func (ec *EndpointsController) onUpdateExternalWorkload(old, cur interface{}) {
523 services := ec.getServicesToUpdateOnExternalWorkloadChange(old, cur)
524
525 for svc := range services {
526 ec.queue.Add(svc)
527 }
528 }
529
530 func (ec *EndpointsController) onDeleteExternalWorkload(obj interface{}) {
531 ew := ec.getExternalWorkloadFromDeleteAction(obj)
532 if ew != nil {
533 ec.onAddExternalWorkload(ew)
534 }
535 }
536
537 func dropEndpointSlicesPendingDeletion(endpointSlices []*discoveryv1.EndpointSlice) []*discoveryv1.EndpointSlice {
538 n := 0
539 for _, endpointSlice := range endpointSlices {
540 if endpointSlice.DeletionTimestamp == nil {
541 endpointSlices[n] = endpointSlice
542 n++
543 }
544 }
545 return endpointSlices[:n]
546 }
547
View as plain text