1 package externalworkload
2
3 import (
4 "context"
5 "fmt"
6 "sort"
7
8 ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
9 "github.com/linkerd/linkerd2/controller/k8s"
10 logging "github.com/sirupsen/logrus"
11 corev1 "k8s.io/api/core/v1"
12 discoveryv1 "k8s.io/api/discovery/v1"
13 "k8s.io/apimachinery/pkg/api/errors"
14 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
15 "k8s.io/apimachinery/pkg/runtime/schema"
16 utilerrors "k8s.io/apimachinery/pkg/util/errors"
17 "k8s.io/apimachinery/pkg/util/intstr"
18 epsliceutil "k8s.io/endpointslice/util"
19 utilnet "k8s.io/utils/net"
20 )
21
22
23
24
25
26
// endpointsReconciler holds the dependencies and settings used to create,
// update and delete the EndpointSlices of a Service that selects
// ExternalWorkloads.
type endpointsReconciler struct {
	// k8sAPI provides the Kubernetes client used to write EndpointSlices.
	k8sAPI *k8s.API
	// log is the logger used for reconciliation messages.
	log *logging.Entry
	// controllerName is written as the managed-by label value on every
	// slice this reconciler produces.
	controllerName string

	// endpointTracker records the slices written by the reconciler: it is
	// updated after creates and updates, and told to expect deletions.
	endpointTracker *epsliceutil.EndpointSliceTracker
	// maxEndpoints is the maximum number of endpoints placed in one slice.
	maxEndpoints int
}
37
38
39
40
41
// endpointMeta carries the slice-level attributes shared by every endpoint
// that is grouped into the same EndpointSlice.
type endpointMeta struct {
	// ports is the port list copied into a newly created slice.
	ports []discoveryv1.EndpointPort
	// addressType is the address family assigned to a newly created slice.
	addressType discoveryv1.AddressType
}
46
47
48
49 func newEndpointsReconciler(k8sAPI *k8s.API, controllerName string, maxEndpoints int) *endpointsReconciler {
50 return &endpointsReconciler{
51 k8sAPI,
52 logging.WithFields(logging.Fields{
53 "component": "external-endpoints-reconciler",
54 }),
55 controllerName,
56 epsliceutil.NewEndpointSliceTracker(),
57 maxEndpoints,
58 }
59
60 }
61
62
63
64
65
66
67
68
69
70
71
72
73 func (r *endpointsReconciler) reconcile(svc *corev1.Service, ews []*ewv1beta1.ExternalWorkload, existingSlices []*discoveryv1.EndpointSlice) error {
74 toDelete := []*discoveryv1.EndpointSlice{}
75 slicesByAddrType := make(map[discoveryv1.AddressType][]*discoveryv1.EndpointSlice)
76 errs := []error{}
77
78
79 supportedAddrTypes := getSupportedAddressTypes(svc)
80 for _, slice := range existingSlices {
81
82
83 _, supported := supportedAddrTypes[slice.AddressType]
84 if !supported {
85 toDelete = append(toDelete, slice)
86 continue
87 }
88
89
90
91 if _, ok := slicesByAddrType[slice.AddressType]; !ok {
92 slicesByAddrType[slice.AddressType] = []*discoveryv1.EndpointSlice{}
93 }
94
95 slicesByAddrType[slice.AddressType] = append(slicesByAddrType[slice.AddressType], slice)
96 }
97
98
99
100 for addrType := range supportedAddrTypes {
101 existingSlices := slicesByAddrType[addrType]
102 err := r.reconcileByAddressType(svc, ews, existingSlices, addrType)
103 if err != nil {
104 errs = append(errs, err)
105 }
106 }
107
108
109 for _, slice := range toDelete {
110 err := r.k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).Delete(context.TODO(), slice.Name, metav1.DeleteOptions{})
111 if err != nil {
112 errs = append(errs, err)
113 }
114 }
115
116 return utilerrors.NewAggregate(errs)
117 }
118
119
120
121
122 func (r *endpointsReconciler) reconcileByAddressType(svc *corev1.Service, extWorkloads []*ewv1beta1.ExternalWorkload, existingSlices []*discoveryv1.EndpointSlice, addrType discoveryv1.AddressType) error {
123 slicesToCreate := []*discoveryv1.EndpointSlice{}
124 slicesToUpdate := []*discoveryv1.EndpointSlice{}
125 slicesToDelete := []*discoveryv1.EndpointSlice{}
126
127
128
129
130
131 existingSlicesByPorts := map[epsliceutil.PortMapKey][]*discoveryv1.EndpointSlice{}
132 for _, slice := range existingSlices {
133
134
135
136 if ownedBy(slice, svc) {
137 hash := epsliceutil.NewPortMapKey(slice.Ports)
138 existingSlicesByPorts[hash] = append(existingSlicesByPorts[hash], slice)
139 } else {
140 slicesToDelete = append(slicesToDelete, slice)
141 }
142 }
143
144
145
146
147 desiredEndpointsByPortMap := map[epsliceutil.PortMapKey]epsliceutil.EndpointSet{}
148
149
150 desiredMetaByPortMap := map[epsliceutil.PortMapKey]*endpointMeta{}
151
152 for _, extWorkload := range extWorkloads {
153
154
155
156
157 if len(extWorkload.Spec.WorkloadIPs) == 0 {
158 continue
159 }
160
161
162
163
164 ports := r.findEndpointPorts(svc, extWorkload)
165 portHash := epsliceutil.NewPortMapKey(ports)
166 if _, ok := desiredMetaByPortMap[portHash]; !ok {
167 desiredMetaByPortMap[portHash] = &endpointMeta{ports, addrType}
168 }
169
170 if _, ok := desiredEndpointsByPortMap[portHash]; !ok {
171 desiredEndpointsByPortMap[portHash] = epsliceutil.EndpointSet{}
172 }
173
174 ep := externalWorkloadToEndpoint(addrType, extWorkload, svc)
175 if len(ep.Addresses) > 0 {
176 desiredEndpointsByPortMap[portHash].Insert(&ep)
177 }
178 }
179
180 for portKey, desiredEndpoints := range desiredEndpointsByPortMap {
181 create, update, del := r.reconcileEndpointsByPortMap(svc, existingSlicesByPorts[portKey], desiredEndpoints, desiredMetaByPortMap[portKey])
182 slicesToCreate = append(slicesToCreate, create...)
183 slicesToUpdate = append(slicesToUpdate, update...)
184 slicesToDelete = append(slicesToDelete, del...)
185 }
186
187
188
189 for portHash, existingSlices := range existingSlicesByPorts {
190 if _, ok := desiredEndpointsByPortMap[portHash]; !ok {
191 slicesToDelete = append(slicesToDelete, existingSlices...)
192 }
193 }
194
195 return r.finalize(svc, slicesToCreate, slicesToUpdate, slicesToDelete)
196 }
197
198
199
200
201
202
203 func (r *endpointsReconciler) reconcileEndpointsByPortMap(svc *corev1.Service, existingSlices []*discoveryv1.EndpointSlice, desiredEps epsliceutil.EndpointSet, desiredMeta *endpointMeta) ([]*discoveryv1.EndpointSlice, []*discoveryv1.EndpointSlice, []*discoveryv1.EndpointSlice) {
204 slicesByName := map[string]*discoveryv1.EndpointSlice{}
205 sliceNamesUnchanged := map[string]struct{}{}
206 sliceNamesToUpdate := map[string]struct{}{}
207 sliceNamesToDelete := map[string]struct{}{}
208
209
210
211 for _, existingSlice := range existingSlices {
212 slicesByName[existingSlice.Name] = existingSlice
213 keepEndpoints := []discoveryv1.Endpoint{}
214 epUpdated := false
215 for _, endpoint := range existingSlice.Endpoints {
216 endpoint := endpoint
217 found := desiredEps.Get(&endpoint)
218
219
220
221 if found != nil {
222 keepEndpoints = append(keepEndpoints, *found)
223
224
225 if !epsliceutil.EndpointsEqualBeyondHash(found, &endpoint) {
226 epUpdated = true
227 }
228
229
230 desiredEps.Delete(&endpoint)
231 }
232 }
233
234
235 labels, labelsChanged := setEndpointSliceLabels(existingSlice, svc, r.controllerName)
236
237
238
239
240
241
242
243
244
245 if epUpdated || len(existingSlice.Endpoints) != len(keepEndpoints) {
246 if len(keepEndpoints) == 0 {
247
248
249 sliceNamesToDelete[existingSlice.Name] = struct{}{}
250 } else {
251
252 slice := existingSlice.DeepCopy()
253 slice.Labels = labels
254 slice.Endpoints = keepEndpoints
255 sliceNamesToUpdate[slice.Name] = struct{}{}
256 slicesByName[slice.Name] = slice
257 }
258 } else if labelsChanged {
259 slice := existingSlice.DeepCopy()
260 slice.Labels = labels
261 sliceNamesToUpdate[slice.Name] = struct{}{}
262 slicesByName[slice.Name] = slice
263 } else {
264
265
266
267 sliceNamesUnchanged[existingSlice.Name] = struct{}{}
268 }
269 }
270
271
272
273
274
275
276 if desiredEps.Len() > 0 && len(sliceNamesToUpdate) > 0 {
277 slices := []*discoveryv1.EndpointSlice{}
278 for sliceName := range sliceNamesToUpdate {
279 slices = append(slices, slicesByName[sliceName])
280 }
281
282
283 sort.Slice(slices, func(i, j int) bool {
284 return len(slices[i].Endpoints) > len(slices[j].Endpoints)
285 })
286
287
288 for _, slice := range slices {
289 for desiredEps.Len() > 0 && len(slice.Endpoints) < r.maxEndpoints {
290 ep, _ := desiredEps.PopAny()
291 slice.Endpoints = append(slice.Endpoints, *ep)
292 }
293 }
294 }
295
296
297
298 slicesToCreate := []*discoveryv1.EndpointSlice{}
299 for desiredEps.Len() > 0 {
300 var sliceToFill *discoveryv1.EndpointSlice
301
302
303
304 if desiredEps.Len() < r.maxEndpoints && len(sliceNamesUnchanged) > 0 {
305 unchangedSlices := []*discoveryv1.EndpointSlice{}
306 for unchangedSlice := range sliceNamesUnchanged {
307 unchangedSlices = append(unchangedSlices, slicesByName[unchangedSlice])
308 }
309
310 sliceToFill = getSliceToFill(unchangedSlices, desiredEps.Len(), r.maxEndpoints)
311 }
312
313
314
315 if sliceToFill == nil {
316 sliceToFill = newEndpointSlice(svc, desiredMeta, r.controllerName)
317 } else {
318
319 sliceToFill = sliceToFill.DeepCopy()
320 slicesByName[sliceToFill.Name] = sliceToFill
321 }
322
323
324 for desiredEps.Len() > 0 && len(sliceToFill.Endpoints) < r.maxEndpoints {
325 ep, _ := desiredEps.PopAny()
326 sliceToFill.Endpoints = append(sliceToFill.Endpoints, *ep)
327 }
328
329
330
331 if sliceToFill.Name != "" {
332 sliceNamesToUpdate[sliceToFill.Name] = struct{}{}
333 delete(sliceNamesUnchanged, sliceToFill.Name)
334 } else {
335 slicesToCreate = append(slicesToCreate, sliceToFill)
336 }
337 }
338
339 slicesToUpdate := []*discoveryv1.EndpointSlice{}
340 for name := range sliceNamesToUpdate {
341 slicesToUpdate = append(slicesToUpdate, slicesByName[name])
342 }
343
344 slicesToDelete := []*discoveryv1.EndpointSlice{}
345 for name := range sliceNamesToDelete {
346 slicesToDelete = append(slicesToDelete, slicesByName[name])
347 }
348
349 return slicesToCreate, slicesToUpdate, slicesToDelete
350 }
351
352
353
354 func (r *endpointsReconciler) finalize(svc *corev1.Service, slicesToCreate, slicesToUpdate, slicesToDelete []*discoveryv1.EndpointSlice) error {
355
356
357 for i := 0; i < len(slicesToDelete); {
358 if len(slicesToCreate) == 0 {
359 break
360 }
361 sliceToDelete := slicesToDelete[i]
362 slice := slicesToCreate[len(slicesToCreate)-1]
363
364
365
366
367
368
369
370
371 if sliceToDelete.AddressType == slice.AddressType && ownedBy(sliceToDelete, svc) {
372 slice.Name = sliceToDelete.Name
373 slicesToCreate = slicesToCreate[:len(slicesToCreate)-1]
374 slicesToUpdate = append(slicesToUpdate, slice)
375 slicesToDelete = append(slicesToDelete[:i], slicesToDelete[i+1:]...)
376 } else {
377 i++
378 }
379 }
380
381 r.log.Debugf("reconciliation result for %s/%s: %d to add, %d to update, %d to remove", svc.Namespace, svc.Name, len(slicesToCreate), len(slicesToUpdate), len(slicesToDelete))
382
383
384
385
386 if svc.DeletionTimestamp == nil {
387
388 for _, slice := range slicesToCreate {
389 r.log.Tracef("starting create: %s/%s", slice.Namespace, slice.Name)
390 createdSlice, err := r.k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).Create(context.TODO(), slice, metav1.CreateOptions{})
391 if err != nil {
392
393
394 if errors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
395 return nil
396 }
397
398 return err
399 }
400 r.endpointTracker.Update(createdSlice)
401 r.log.Tracef("finished creating: %s/%s", createdSlice.Namespace, createdSlice.Name)
402 }
403 }
404
405 for _, slice := range slicesToUpdate {
406 r.log.Tracef("starting update: %s/%s", slice.Namespace, slice.Name)
407 updatedSlice, err := r.k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).Update(context.TODO(), slice, metav1.UpdateOptions{})
408 if err != nil {
409 return err
410 }
411 r.endpointTracker.Update(updatedSlice)
412 r.log.Tracef("finished updating: %s/%s", updatedSlice.Namespace, updatedSlice.Name)
413 }
414
415 for _, slice := range slicesToDelete {
416 r.log.Tracef("starting delete: %s/%s", slice.Namespace, slice.Name)
417 err := r.k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).Delete(context.TODO(), slice.Name, metav1.DeleteOptions{})
418 if err != nil {
419 return err
420 }
421 r.endpointTracker.ExpectDeletion(slice)
422 r.log.Tracef("finished deleting: %s/%s", slice.Namespace, slice.Name)
423 }
424
425 return nil
426 }
427
428
429
430
431 func newEndpointSlice(svc *corev1.Service, meta *endpointMeta, controllerName string) *discoveryv1.EndpointSlice {
432
433 ownerRef := metav1.NewControllerRef(svc, schema.GroupVersionKind{Version: "v1", Kind: "Service"})
434 slice := &discoveryv1.EndpointSlice{
435 ObjectMeta: metav1.ObjectMeta{
436 GenerateName: fmt.Sprintf("linkerd-external-%s-", svc.Name),
437 Namespace: svc.Namespace,
438 Labels: map[string]string{},
439 OwnerReferences: []metav1.OwnerReference{*ownerRef},
440 },
441 AddressType: meta.addressType,
442 Endpoints: []discoveryv1.Endpoint{},
443 Ports: meta.ports,
444 }
445 labels, _ := setEndpointSliceLabels(slice, svc, controllerName)
446 slice.Labels = labels
447 return slice
448 }
449
450
451
452
453 func getSliceToFill(slices []*discoveryv1.EndpointSlice, numEndpoints, maxEndpoints int) *discoveryv1.EndpointSlice {
454 closestDiff := maxEndpoints
455 var closestSlice *discoveryv1.EndpointSlice
456 for _, slice := range slices {
457 diff := maxEndpoints - (numEndpoints + len(slice.Endpoints))
458 if diff >= 0 && diff < closestDiff {
459 closestDiff = diff
460 closestSlice = slice
461 if closestDiff == 0 {
462 return closestSlice
463 }
464 }
465 }
466 return closestSlice
467 }
468
469
470
471
472
473
474
475
476
477
478 func setEndpointSliceLabels(es *discoveryv1.EndpointSlice, service *corev1.Service, controllerName string) (map[string]string, bool) {
479 isReserved := func(label string) bool {
480 if label == discoveryv1.LabelServiceName ||
481 label == discoveryv1.LabelManagedBy ||
482 label == corev1.IsHeadlessService {
483 return true
484 }
485 return false
486 }
487
488 updated := false
489 epLabels := make(map[string]string)
490 svcLabels := make(map[string]string)
491
492
493
494 for key, value := range es.Labels {
495 if isReserved(key) {
496 continue
497 }
498
499 epLabels[key] = value
500 }
501
502 for key, value := range service.Labels {
503 if isReserved(key) {
504 continue
505 }
506
507 svcLabels[key] = value
508 }
509
510
511 for svcLabelKey, svcLabelVal := range svcLabels {
512 epLabelVal, found := epLabels[svcLabelKey]
513 if !found {
514 updated = true
515 break
516 }
517
518 if svcLabelVal != epLabelVal {
519 updated = true
520 break
521 }
522 }
523
524
525 if service.Spec.ClusterIP == corev1.ClusterIPNone {
526 svcLabels[corev1.IsHeadlessService] = ""
527 } else {
528 delete(svcLabels, corev1.IsHeadlessService)
529 }
530
531
532 svcLabels[discoveryv1.LabelServiceName] = service.Name
533 svcLabels[discoveryv1.LabelManagedBy] = controllerName
534
535 return svcLabels, updated
536 }
537
538 func externalWorkloadToEndpoint(addrType discoveryv1.AddressType, ew *ewv1beta1.ExternalWorkload, svc *corev1.Service) discoveryv1.Endpoint {
539
540
541
542
543
544
545 serving := IsEwReady(ew)
546
547 addresses := []string{}
548
549
550 for _, addr := range ew.Spec.WorkloadIPs {
551 ip := addr.Ip
552 isIPv6 := utilnet.IsIPv6String(ip)
553 if isIPv6 && addrType == discoveryv1.AddressTypeIPv6 {
554 addresses = append(addresses, ip)
555 } else if !isIPv6 && addrType == discoveryv1.AddressTypeIPv4 {
556 addresses = append(addresses, ip)
557 }
558 }
559
560 terminating := false
561 ep := discoveryv1.Endpoint{
562 Addresses: addresses,
563 Conditions: discoveryv1.EndpointConditions{
564 Ready: &serving,
565 Serving: &serving,
566 Terminating: &terminating,
567 },
568 TargetRef: &corev1.ObjectReference{
569 Kind: "ExternalWorkload",
570 Namespace: ew.Namespace,
571 Name: ew.Name,
572 UID: ew.UID,
573 },
574 }
575
576 zone, ok := ew.Labels[corev1.LabelTopologyZone]
577 if ok {
578 ep.Zone = &zone
579 }
580
581
582
583
584
585
586 if svc.Spec.ClusterIP == corev1.ClusterIPNone && ew.Namespace == svc.Namespace {
587 ep.Hostname = &ew.Name
588 }
589
590 return ep
591 }
592
593 func ownedBy(slice *discoveryv1.EndpointSlice, svc *corev1.Service) bool {
594 for _, o := range slice.OwnerReferences {
595 if o.UID == svc.UID && o.Kind == "Service" && o.APIVersion == "v1" {
596 return true
597 }
598 }
599 return false
600 }
601
602
603
604 func (r *endpointsReconciler) findEndpointPorts(svc *corev1.Service, ew *ewv1beta1.ExternalWorkload) []discoveryv1.EndpointPort {
605 epPorts := []discoveryv1.EndpointPort{}
606
607
608 if len(svc.Spec.Ports) == 0 && svc.Spec.ClusterIP == corev1.ClusterIPNone {
609 return epPorts
610 }
611
612 for _, svcPort := range svc.Spec.Ports {
613 svcPort := svcPort
614 portNum, err := findWorkloadPort(ew, &svcPort)
615 if err != nil {
616 r.log.Errorf("failed to find port for service %s/%s: %v", svc.Namespace, svc.Name, err)
617 continue
618 }
619
620 portName := &svcPort.Name
621 if *portName == "" {
622 portName = nil
623 }
624 portProto := &svcPort.Protocol
625 if *portProto == "" {
626 portProto = nil
627 }
628 epPorts = append(epPorts, discoveryv1.EndpointPort{
629 Name: portName,
630 Port: &portNum,
631 Protocol: portProto,
632 })
633 }
634
635 return epPorts
636 }
637
638
639
640
641
642
643 func findWorkloadPort(ew *ewv1beta1.ExternalWorkload, svcPort *corev1.ServicePort) (int32, error) {
644 targetPort := svcPort.TargetPort
645 switch targetPort.Type {
646 case intstr.String:
647 name := targetPort.StrVal
648 for _, wPort := range ew.Spec.Ports {
649 if wPort.Name == name && wPort.Protocol == svcPort.Protocol {
650 return wPort.Port, nil
651 }
652 }
653 case intstr.Int:
654
655
656
657
658 for _, wPort := range ew.Spec.Ports {
659 port := int32(targetPort.IntValue())
660 if wPort.Port == port && wPort.Protocol == svcPort.Protocol {
661 return port, nil
662 }
663 }
664 }
665 return 0, fmt.Errorf("no suitable port for targetPort %s on workload %s/%s", targetPort.String(), ew.Namespace, ew.Name)
666 }
667
668
669
670 func getSupportedAddressTypes(svc *corev1.Service) map[discoveryv1.AddressType]struct{} {
671 afs := map[discoveryv1.AddressType]struct{}{}
672
673
674
675
676
677 for _, af := range svc.Spec.IPFamilies {
678 if af == corev1.IPv4Protocol {
679 afs[discoveryv1.AddressTypeIPv4] = struct{}{}
680 } else if af == corev1.IPv6Protocol {
681 afs[discoveryv1.AddressTypeIPv6] = struct{}{}
682 }
683 }
684
685 if len(afs) > 0 {
686
687
688 return afs
689 }
690
691
692
693
694
695
696
697
698
699
700 afs[discoveryv1.AddressTypeIPv4] = struct{}{}
701 afs[discoveryv1.AddressTypeIPv6] = struct{}{}
702 return afs
703 }
704
View as plain text