1 package watcher
2
3 import (
4 "context"
5 "fmt"
6 "net"
7 "sort"
8 "strconv"
9 "strings"
10 "sync"
11 "time"
12
13 ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
14 "github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta2"
15 "github.com/linkerd/linkerd2/controller/k8s"
16 consts "github.com/linkerd/linkerd2/pkg/k8s"
17 "github.com/prometheus/client_golang/prometheus"
18 logging "github.com/sirupsen/logrus"
19 corev1 "k8s.io/api/core/v1"
20 discovery "k8s.io/api/discovery/v1"
21 apierrors "k8s.io/apimachinery/pkg/api/errors"
22 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
23 "k8s.io/apimachinery/pkg/labels"
24 "k8s.io/apimachinery/pkg/util/intstr"
25 "k8s.io/client-go/tools/cache"
26 )
27
const (
	// Keys of the label map built by metricLabels.
	service                = "service"
	namespace              = "namespace"
	targetCluster          = "target_cluster"
	targetService          = "target_service"
	targetServiceNamespace = "target_service_namespace"

	// NOTE(review): not referenced in this part of the file; presumably the
	// protocol hint value for opaque transport — confirm against its users.
	opaqueProtocol = "opaque"
)
38
// endpointTargetRefPod is the TargetRef.Kind value that marks an endpoint as
// being backed by a Pod.
const endpointTargetRefPod = "Pod"

// endpointTargetRefExternalWorkload is the TargetRef.Kind value that marks an
// endpoint as being backed by an ExternalWorkload.
const endpointTargetRefExternalWorkload = "ExternalWorkload"
41
type (
	// Address is a single endpoint address derived from an Endpoints or
	// EndpointSlice resource. Pod is set for endpoints whose TargetRef is a
	// Pod and ExternalWorkload for those whose TargetRef is an
	// ExternalWorkload; both are nil for endpoints without a TargetRef.
	Address struct {
		IP   string
		Port Port
		// Pod is the backing pod as returned by the pod lister, if any.
		Pod *corev1.Pod
		// ExternalWorkload is the backing external workload, if any.
		ExternalWorkload *ewv1beta1.ExternalWorkload
		// OwnerName and OwnerKind describe the owner of the backing workload.
		OwnerName string
		OwnerKind string
		// Identity and AuthorityOverride are populated from the resource's
		// remote-gateway annotations for endpoints without a TargetRef.
		Identity          string
		AuthorityOverride string
		// Zone and ForZones are copied from the EndpointSlice endpoint's Zone
		// and Hints; they are left unset for addresses built from Endpoints.
		Zone     *string
		ForZones []discovery.ForZone
		// NOTE(review): presumably filled in by SetToServerProtocol /
		// SetToServerProtocolExternalWorkload (defined elsewhere) — confirm.
		OpaqueProtocol bool
	}

	// AddressSet is a collection of Addresses keyed by ID, together with the
	// labels computed by metricLabels for the resource the addresses came
	// from and the publisher's local-traffic-policy flag.
	AddressSet struct {
		Addresses          map[ID]Address
		Labels             map[string]string
		LocalTrafficPolicy bool
	}

	// portAndHostname is the key under which a servicePublisher stores its
	// portPublishers: one publisher per (service port, hostname) pair.
	portAndHostname struct {
		port     Port
		hostname string
	}

	// EndpointsWatcher registers event handlers on the Service, Server and
	// either the EndpointSlice or the Endpoints informer (depending on
	// enableEndpointSlices) and routes those events to one servicePublisher
	// per ServiceID. The embedded RWMutex guards the publishers map.
	EndpointsWatcher struct {
		publishers  map[ServiceID]*servicePublisher
		k8sAPI      *k8s.API
		metadataAPI *k8s.MetadataAPI

		cluster              string
		log                  *logging.Entry
		enableEndpointSlices bool
		sync.RWMutex

		informerHandlers
	}

	// informerHandlers holds the registrations returned when the watcher adds
	// its event handlers, so that removeHandlers can unregister them later.
	informerHandlers struct {
		epHandle  cache.ResourceEventHandlerRegistration
		svcHandle cache.ResourceEventHandlerRegistration
		srvHandle cache.ResourceEventHandlerRegistration
	}

	// servicePublisher holds the state for a single service: a portPublisher
	// for every (port, hostname) pair that currently has subscribers, plus
	// the service's local-traffic-policy flag. Its event-handling and
	// subscription methods take the embedded Mutex before touching ports.
	servicePublisher struct {
		id                   ServiceID
		log                  *logging.Entry
		k8sAPI               *k8s.API
		metadataAPI          *k8s.MetadataAPI
		enableEndpointSlices bool
		localTrafficPolicy   bool
		cluster              string
		ports                map[portAndHostname]*portPublisher

		// Guards ports and the portPublishers stored in it.
		sync.Mutex
	}

	// portPublisher tracks the current AddressSet for one service port and
	// hostname and notifies its listeners when that set changes. It has no
	// lock of its own; in the code shown here its methods are invoked from
	// servicePublisher methods that hold the servicePublisher's Mutex.
	portPublisher struct {
		id         ServiceID
		targetPort namedPort
		srcPort    Port
		hostname   string
		log        *logging.Entry
		k8sAPI     *k8s.API
		// metadataAPI is used to look up the owner of a backing pod.
		metadataAPI          *k8s.MetadataAPI
		enableEndpointSlices bool
		// exists is set to true whenever endpoint data is processed; at
		// creation it records whether the Service was found in the lister.
		exists             bool
		addresses          AddressSet
		listeners          []EndpointUpdateListener
		metrics            endpointsMetrics
		localTrafficPolicy bool
	}

	// EndpointUpdateListener is the interface subscribers implement to be
	// told about address changes for a service port.
	EndpointUpdateListener interface {
		// Add is called with the addresses that became available.
		Add(set AddressSet)
		// Remove is called with the addresses that are no longer available.
		Remove(set AddressSet)
		// NoEndpoints is called when there are no addresses to report; the
		// Endpoints update path passes exists=true when the resource yielded
		// an empty address set.
		NoEndpoints(exists bool)
	}
)
158
// endpointsVecs is the package-wide set of metric vectors from which each
// portPublisher obtains its metrics and from which they are unregistered when
// the last listener for a port goes away.
var endpointsVecs = newEndpointsMetricsVecs()

// undefinedEndpointPort is the sentinel returned by the target-port resolvers
// when no matching port could be determined.
var undefinedEndpointPort = Port(0)
162
163
164
165
166 func (addr AddressSet) shallowCopy() AddressSet {
167 addresses := make(map[ID]Address)
168 for k, v := range addr.Addresses {
169 addresses[k] = v
170 }
171
172 labels := make(map[string]string)
173 for k, v := range addr.Labels {
174 labels[k] = v
175 }
176
177 return AddressSet{
178 Addresses: addresses,
179 Labels: labels,
180 LocalTrafficPolicy: addr.LocalTrafficPolicy,
181 }
182 }
183
184
185
186
187 func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, enableEndpointSlices bool, cluster string) (*EndpointsWatcher, error) {
188 ew := &EndpointsWatcher{
189 publishers: make(map[ServiceID]*servicePublisher),
190 k8sAPI: k8sAPI,
191 metadataAPI: metadataAPI,
192 enableEndpointSlices: enableEndpointSlices,
193 cluster: cluster,
194 log: log.WithFields(logging.Fields{
195 "component": "endpoints-watcher",
196 }),
197 }
198
199 var err error
200 ew.svcHandle, err = k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
201 AddFunc: ew.addService,
202 DeleteFunc: ew.deleteService,
203 UpdateFunc: ew.updateService,
204 })
205 if err != nil {
206 return nil, err
207 }
208
209 ew.srvHandle, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
210 AddFunc: ew.addServer,
211 DeleteFunc: ew.deleteServer,
212 UpdateFunc: ew.updateServer,
213 })
214 if err != nil {
215 return nil, err
216 }
217
218 if ew.enableEndpointSlices {
219 ew.log.Debugf("Watching EndpointSlice resources")
220 ew.epHandle, err = k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
221 AddFunc: ew.addEndpointSlice,
222 DeleteFunc: ew.deleteEndpointSlice,
223 UpdateFunc: ew.updateEndpointSlice,
224 })
225 if err != nil {
226 return nil, err
227 }
228
229 } else {
230 ew.log.Debugf("Watching Endpoints resources")
231 ew.epHandle, err = k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
232 AddFunc: ew.addEndpoints,
233 DeleteFunc: ew.deleteEndpoints,
234 UpdateFunc: ew.updateEndpoints,
235 })
236 if err != nil {
237 return nil, err
238 }
239 }
240 return ew, nil
241 }
242
243
244
245
246
247
248
249
250 func (ew *EndpointsWatcher) Subscribe(id ServiceID, port Port, hostname string, listener EndpointUpdateListener) error {
251 svc, _ := ew.k8sAPI.Svc().Lister().Services(id.Namespace).Get(id.Name)
252 if svc != nil && svc.Spec.Type == corev1.ServiceTypeExternalName {
253 return invalidService(id.String())
254 }
255
256 if hostname == "" {
257 ew.log.Debugf("Establishing watch on endpoint [%s:%d]", id, port)
258 } else {
259 ew.log.Debugf("Establishing watch on endpoint [%s.%s:%d]", hostname, id, port)
260 }
261
262 sp := ew.getOrNewServicePublisher(id)
263
264 sp.subscribe(port, hostname, listener)
265 return nil
266 }
267
268
269 func (ew *EndpointsWatcher) Unsubscribe(id ServiceID, port Port, hostname string, listener EndpointUpdateListener) {
270 if hostname == "" {
271 ew.log.Debugf("Stopping watch on endpoint [%s:%d]", id, port)
272 } else {
273 ew.log.Debugf("Stopping watch on endpoint [%s.%s:%d]", hostname, id, port)
274 }
275
276 sp, ok := ew.getServicePublisher(id)
277 if !ok {
278 ew.log.Errorf("Cannot unsubscribe from unknown service [%s:%d]", id, port)
279 return
280 }
281 sp.unsubscribe(port, hostname, listener)
282 }
283
284
285
286 func (ew *EndpointsWatcher) removeHandlers() {
287 ew.Lock()
288 defer ew.Unlock()
289 if ew.svcHandle != nil {
290 if err := ew.k8sAPI.Svc().Informer().RemoveEventHandler(ew.svcHandle); err != nil {
291 ew.log.Errorf("Failed to remove Service informer event handlers: %s", err)
292 }
293 }
294
295 if ew.srvHandle != nil {
296 if err := ew.k8sAPI.Srv().Informer().RemoveEventHandler(ew.srvHandle); err != nil {
297 ew.log.Errorf("Failed to remove Server informer event handlers: %s", err)
298 }
299 }
300
301 if ew.epHandle != nil {
302 if ew.enableEndpointSlices {
303 if err := ew.k8sAPI.ES().Informer().RemoveEventHandler(ew.epHandle); err != nil {
304
305 ew.log.Errorf("Failed to remove EndpointSlice informer event handlers: %s", err)
306 }
307 } else {
308 if err := ew.k8sAPI.Endpoint().Informer().RemoveEventHandler(ew.epHandle); err != nil {
309 ew.log.Errorf("Failed to remove Endpoints informer event handlers: %s", err)
310 }
311 }
312 }
313 }
314
315 func (ew *EndpointsWatcher) addService(obj interface{}) {
316 service := obj.(*corev1.Service)
317 id := ServiceID{
318 Namespace: service.Namespace,
319 Name: service.Name,
320 }
321
322 sp := ew.getOrNewServicePublisher(id)
323
324 sp.updateService(service)
325 }
326
327 func (ew *EndpointsWatcher) updateService(oldObj interface{}, newObj interface{}) {
328 oldService := oldObj.(*corev1.Service)
329 newService := newObj.(*corev1.Service)
330
331 oldUpdated := latestUpdated(oldService.ManagedFields)
332 updated := latestUpdated(newService.ManagedFields)
333 if !updated.IsZero() && updated != oldUpdated {
334 delta := time.Since(updated)
335 serviceInformerLag.Observe(delta.Seconds())
336 }
337
338 ew.addService(newObj)
339 }
340
341 func (ew *EndpointsWatcher) deleteService(obj interface{}) {
342 service, ok := obj.(*corev1.Service)
343 if !ok {
344 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
345 if !ok {
346 ew.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
347 return
348 }
349 service, ok = tombstone.Obj.(*corev1.Service)
350 if !ok {
351 ew.log.Errorf("DeletedFinalStateUnknown contained object that is not a Service %#v", obj)
352 return
353 }
354 }
355
356 id := ServiceID{
357 Namespace: service.Namespace,
358 Name: service.Name,
359 }
360
361 sp, ok := ew.getServicePublisher(id)
362 if ok {
363 sp.deleteEndpoints()
364 }
365 }
366
367 func (ew *EndpointsWatcher) addEndpoints(obj interface{}) {
368 endpoints, ok := obj.(*corev1.Endpoints)
369 if !ok {
370 ew.log.Errorf("error processing endpoints resource, got %#v expected *corev1.Endpoints", obj)
371 return
372 }
373
374 id := ServiceID{Namespace: endpoints.Namespace, Name: endpoints.Name}
375 sp := ew.getOrNewServicePublisher(id)
376 sp.updateEndpoints(endpoints)
377 }
378
379 func (ew *EndpointsWatcher) updateEndpoints(oldObj interface{}, newObj interface{}) {
380 oldEndpoints, ok := oldObj.(*corev1.Endpoints)
381 if !ok {
382 ew.log.Errorf("error processing endpoints resource, got %#v expected *corev1.Endpoints", oldObj)
383 return
384 }
385 newEndpoints, ok := newObj.(*corev1.Endpoints)
386 if !ok {
387 ew.log.Errorf("error processing endpoints resource, got %#v expected *corev1.Endpoints", newObj)
388 return
389 }
390
391 oldUpdated := latestUpdated(oldEndpoints.ManagedFields)
392 updated := latestUpdated(newEndpoints.ManagedFields)
393 if !updated.IsZero() && updated != oldUpdated {
394 delta := time.Since(updated)
395 endpointsInformerLag.Observe(delta.Seconds())
396 }
397
398 id := ServiceID{Namespace: newEndpoints.Namespace, Name: newEndpoints.Name}
399 sp := ew.getOrNewServicePublisher(id)
400 sp.updateEndpoints(newEndpoints)
401 }
402
403 func (ew *EndpointsWatcher) deleteEndpoints(obj interface{}) {
404 endpoints, ok := obj.(*corev1.Endpoints)
405 if !ok {
406 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
407 if !ok {
408 ew.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
409 return
410 }
411 endpoints, ok = tombstone.Obj.(*corev1.Endpoints)
412 if !ok {
413 ew.log.Errorf("DeletedFinalStateUnknown contained object that is not an Endpoints %#v", obj)
414 return
415 }
416 }
417
418 id := ServiceID{
419 Namespace: endpoints.Namespace,
420 Name: endpoints.Name,
421 }
422
423 sp, ok := ew.getServicePublisher(id)
424 if ok {
425 sp.deleteEndpoints()
426 }
427 }
428
429 func (ew *EndpointsWatcher) addEndpointSlice(obj interface{}) {
430 newSlice, ok := obj.(*discovery.EndpointSlice)
431 if !ok {
432 ew.log.Errorf("error processing EndpointSlice resource, got %#v expected *discovery.EndpointSlice", obj)
433 return
434 }
435
436 id, err := getEndpointSliceServiceID(newSlice)
437 if err != nil {
438 ew.log.Errorf("Could not fetch resource service name:%v", err)
439 return
440 }
441
442 sp := ew.getOrNewServicePublisher(id)
443 sp.addEndpointSlice(newSlice)
444 }
445
446 func (ew *EndpointsWatcher) updateEndpointSlice(oldObj interface{}, newObj interface{}) {
447 oldSlice, ok := oldObj.(*discovery.EndpointSlice)
448 if !ok {
449 ew.log.Errorf("error processing EndpointSlice resource, got %#v expected *discovery.EndpointSlice", oldObj)
450 return
451 }
452 newSlice, ok := newObj.(*discovery.EndpointSlice)
453 if !ok {
454 ew.log.Errorf("error processing EndpointSlice resource, got %#v expected *discovery.EndpointSlice", newObj)
455 return
456 }
457 oldUpdated := latestUpdated(oldSlice.ManagedFields)
458 updated := latestUpdated(newSlice.ManagedFields)
459 if !updated.IsZero() && updated != oldUpdated {
460 delta := time.Since(updated)
461 endpointsliceInformerLag.Observe(delta.Seconds())
462 }
463
464 id, err := getEndpointSliceServiceID(newSlice)
465 if err != nil {
466 ew.log.Errorf("Could not fetch resource service name:%v", err)
467 return
468 }
469
470 sp, ok := ew.getServicePublisher(id)
471 if ok {
472 sp.updateEndpointSlice(oldSlice, newSlice)
473 }
474 }
475
476 func (ew *EndpointsWatcher) deleteEndpointSlice(obj interface{}) {
477 es, ok := obj.(*discovery.EndpointSlice)
478 if !ok {
479 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
480 if !ok {
481 ew.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
482 }
483 es, ok = tombstone.Obj.(*discovery.EndpointSlice)
484 if !ok {
485 ew.log.Errorf("DeletedFinalStateUnknown contained object that is not an EndpointSlice %#v", obj)
486 return
487 }
488 }
489
490 id, err := getEndpointSliceServiceID(es)
491 if err != nil {
492 ew.log.Errorf("Could not fetch resource service name:%v", err)
493 }
494
495 sp, ok := ew.getServicePublisher(id)
496 if ok {
497 sp.deleteEndpointSlice(es)
498 }
499 }
500
501
502
503 func (ew *EndpointsWatcher) getOrNewServicePublisher(id ServiceID) *servicePublisher {
504 ew.Lock()
505 defer ew.Unlock()
506
507
508
509 sp, ok := ew.publishers[id]
510 if !ok {
511 sp = &servicePublisher{
512 id: id,
513 log: ew.log.WithFields(logging.Fields{
514 "component": "service-publisher",
515 "ns": id.Namespace,
516 "svc": id.Name,
517 }),
518 k8sAPI: ew.k8sAPI,
519 metadataAPI: ew.metadataAPI,
520 cluster: ew.cluster,
521 ports: make(map[portAndHostname]*portPublisher),
522 enableEndpointSlices: ew.enableEndpointSlices,
523 }
524 ew.publishers[id] = sp
525 }
526 return sp
527 }
528
529 func (ew *EndpointsWatcher) getServicePublisher(id ServiceID) (sp *servicePublisher, ok bool) {
530 ew.RLock()
531 defer ew.RUnlock()
532 sp, ok = ew.publishers[id]
533 return
534 }
535
536 func (ew *EndpointsWatcher) addServer(obj interface{}) {
537 ew.Lock()
538 defer ew.Unlock()
539 server := obj.(*v1beta2.Server)
540 for _, sp := range ew.publishers {
541 sp.updateServer(nil, server)
542 }
543 }
544
545 func (ew *EndpointsWatcher) updateServer(oldObj interface{}, newObj interface{}) {
546 ew.Lock()
547 defer ew.Unlock()
548
549 oldServer := oldObj.(*v1beta2.Server)
550 newServer := newObj.(*v1beta2.Server)
551 if oldServer != nil && newServer != nil {
552 oldUpdated := latestUpdated(oldServer.ManagedFields)
553 updated := latestUpdated(newServer.ManagedFields)
554 if !updated.IsZero() && updated != oldUpdated {
555 delta := time.Since(updated)
556 serverInformerLag.Observe(delta.Seconds())
557 }
558 }
559
560 namespace := ""
561 if oldServer != nil {
562 namespace = oldServer.GetNamespace()
563 }
564 if newServer != nil {
565 namespace = newServer.GetNamespace()
566 }
567
568 for id, sp := range ew.publishers {
569
570 if id.Namespace == namespace {
571 sp.updateServer(oldServer, newServer)
572 }
573 }
574 }
575
576 func (ew *EndpointsWatcher) deleteServer(obj interface{}) {
577 ew.Lock()
578 defer ew.Unlock()
579 server := obj.(*v1beta2.Server)
580 for _, sp := range ew.publishers {
581 sp.updateServer(server, nil)
582 }
583 }
584
585
586
587
588
589 func (sp *servicePublisher) updateEndpoints(newEndpoints *corev1.Endpoints) {
590 sp.Lock()
591 defer sp.Unlock()
592 sp.log.Debugf("Updating endpoints for %s", sp.id)
593 for _, port := range sp.ports {
594 port.updateEndpoints(newEndpoints)
595 }
596 }
597
598 func (sp *servicePublisher) deleteEndpoints() {
599 sp.Lock()
600 defer sp.Unlock()
601 sp.log.Debugf("Deleting endpoints for %s", sp.id)
602 for _, port := range sp.ports {
603 port.noEndpoints(false)
604 }
605 }
606
607 func (sp *servicePublisher) addEndpointSlice(newSlice *discovery.EndpointSlice) {
608 sp.Lock()
609 defer sp.Unlock()
610
611 sp.log.Debugf("Adding ES %s/%s", newSlice.Namespace, newSlice.Name)
612 for _, port := range sp.ports {
613 port.addEndpointSlice(newSlice)
614 }
615 }
616
617 func (sp *servicePublisher) updateEndpointSlice(oldSlice *discovery.EndpointSlice, newSlice *discovery.EndpointSlice) {
618 sp.Lock()
619 defer sp.Unlock()
620
621 sp.log.Debugf("Updating ES %s/%s", oldSlice.Namespace, oldSlice.Name)
622 for _, port := range sp.ports {
623 port.updateEndpointSlice(oldSlice, newSlice)
624 }
625 }
626
627 func (sp *servicePublisher) deleteEndpointSlice(es *discovery.EndpointSlice) {
628 sp.Lock()
629 defer sp.Unlock()
630
631 sp.log.Debugf("Deleting ES %s/%s", es.Namespace, es.Name)
632 for _, port := range sp.ports {
633 port.deleteEndpointSlice(es)
634 }
635 }
636
637 func (sp *servicePublisher) updateService(newService *corev1.Service) {
638 sp.Lock()
639 defer sp.Unlock()
640 sp.log.Debugf("Updating service for %s", sp.id)
641
642
643 if newService.Spec.InternalTrafficPolicy != nil {
644 sp.localTrafficPolicy = *newService.Spec.InternalTrafficPolicy == corev1.ServiceInternalTrafficPolicyLocal
645 } else {
646 sp.localTrafficPolicy = false
647 }
648
649 for key, port := range sp.ports {
650 newTargetPort := getTargetPort(newService, key.port)
651 if newTargetPort != port.targetPort {
652 port.updatePort(newTargetPort)
653 }
654
655 if port.localTrafficPolicy != sp.localTrafficPolicy {
656 port.updateLocalTrafficPolicy(sp.localTrafficPolicy)
657 }
658 }
659
660 }
661
662 func (sp *servicePublisher) subscribe(srcPort Port, hostname string, listener EndpointUpdateListener) {
663 sp.Lock()
664 defer sp.Unlock()
665
666 key := portAndHostname{
667 port: srcPort,
668 hostname: hostname,
669 }
670 port, ok := sp.ports[key]
671 if !ok {
672 port = sp.newPortPublisher(srcPort, hostname)
673 sp.ports[key] = port
674 }
675 port.subscribe(listener)
676 }
677
678 func (sp *servicePublisher) unsubscribe(srcPort Port, hostname string, listener EndpointUpdateListener) {
679 sp.Lock()
680 defer sp.Unlock()
681
682 key := portAndHostname{
683 port: srcPort,
684 hostname: hostname,
685 }
686 port, ok := sp.ports[key]
687 if ok {
688 port.unsubscribe(listener)
689 if len(port.listeners) == 0 {
690 endpointsVecs.unregister(sp.metricsLabels(srcPort, hostname))
691 delete(sp.ports, key)
692 }
693 }
694 }
695
696 func (sp *servicePublisher) newPortPublisher(srcPort Port, hostname string) *portPublisher {
697 targetPort := intstr.FromInt(int(srcPort))
698 svc, err := sp.k8sAPI.Svc().Lister().Services(sp.id.Namespace).Get(sp.id.Name)
699 if err != nil && !apierrors.IsNotFound(err) {
700 sp.log.Errorf("error getting service: %s", err)
701 }
702 exists := false
703 if err == nil {
704 targetPort = getTargetPort(svc, srcPort)
705 exists = true
706 }
707
708 log := sp.log.WithField("port", srcPort)
709
710 port := &portPublisher{
711 listeners: []EndpointUpdateListener{},
712 targetPort: targetPort,
713 srcPort: srcPort,
714 hostname: hostname,
715 exists: exists,
716 k8sAPI: sp.k8sAPI,
717 metadataAPI: sp.metadataAPI,
718 log: log,
719 metrics: endpointsVecs.newEndpointsMetrics(sp.metricsLabels(srcPort, hostname)),
720 enableEndpointSlices: sp.enableEndpointSlices,
721 localTrafficPolicy: sp.localTrafficPolicy,
722 }
723
724 if port.enableEndpointSlices {
725 matchLabels := map[string]string{discovery.LabelServiceName: sp.id.Name}
726 selector := labels.Set(matchLabels).AsSelector()
727
728 sliceList, err := sp.k8sAPI.ES().Lister().EndpointSlices(sp.id.Namespace).List(selector)
729 if err != nil && !apierrors.IsNotFound(err) {
730 sp.log.Errorf("error getting endpointSlice list: %s", err)
731 }
732 if err == nil {
733 for _, slice := range sliceList {
734 port.addEndpointSlice(slice)
735 }
736 }
737 } else {
738 endpoints, err := sp.k8sAPI.Endpoint().Lister().Endpoints(sp.id.Namespace).Get(sp.id.Name)
739 if err != nil && !apierrors.IsNotFound(err) {
740 sp.log.Errorf("error getting endpoints: %s", err)
741 }
742 if err == nil {
743 port.updateEndpoints(endpoints)
744 }
745 }
746
747 return port
748 }
749
750 func (sp *servicePublisher) metricsLabels(port Port, hostname string) prometheus.Labels {
751 return endpointsLabels(sp.cluster, sp.id.Namespace, sp.id.Name, strconv.Itoa(int(port)), hostname)
752 }
753
754 func (sp *servicePublisher) updateServer(oldServer, newServer *v1beta2.Server) {
755 sp.Lock()
756 defer sp.Unlock()
757
758 for _, pp := range sp.ports {
759 pp.updateServer(oldServer, newServer)
760 }
761 }
762
763
764
765
766
767
768
769
770
771 func (pp *portPublisher) updateEndpoints(endpoints *corev1.Endpoints) {
772 newAddressSet := pp.endpointsToAddresses(endpoints)
773 if len(newAddressSet.Addresses) == 0 {
774 for _, listener := range pp.listeners {
775 listener.NoEndpoints(true)
776 }
777 } else {
778 add, remove := diffAddresses(pp.addresses, newAddressSet)
779 for _, listener := range pp.listeners {
780 if len(remove.Addresses) > 0 {
781 listener.Remove(remove)
782 }
783 if len(add.Addresses) > 0 {
784 listener.Add(add)
785 }
786 }
787 }
788 pp.addresses = newAddressSet
789 pp.exists = true
790 pp.metrics.incUpdates()
791 pp.metrics.setPods(len(pp.addresses.Addresses))
792 pp.metrics.setExists(true)
793 }
794
795 func (pp *portPublisher) addEndpointSlice(slice *discovery.EndpointSlice) {
796 newAddressSet := pp.endpointSliceToAddresses(slice)
797 for id, addr := range pp.addresses.Addresses {
798 if _, ok := newAddressSet.Addresses[id]; !ok {
799 newAddressSet.Addresses[id] = addr
800 }
801 }
802
803 add, _ := diffAddresses(pp.addresses, newAddressSet)
804 if len(add.Addresses) > 0 {
805 for _, listener := range pp.listeners {
806 listener.Add(add)
807 }
808 }
809
810
811
812
813
814
815 pp.addresses = newAddressSet
816 pp.exists = true
817 pp.metrics.incUpdates()
818 pp.metrics.setPods(len(pp.addresses.Addresses))
819 pp.metrics.setExists(true)
820 }
821
822 func (pp *portPublisher) updateEndpointSlice(oldSlice *discovery.EndpointSlice, newSlice *discovery.EndpointSlice) {
823 updatedAddressSet := AddressSet{
824 Addresses: make(map[ID]Address),
825 Labels: pp.addresses.Labels,
826 LocalTrafficPolicy: pp.localTrafficPolicy,
827 }
828
829 for id, address := range pp.addresses.Addresses {
830 updatedAddressSet.Addresses[id] = address
831 }
832
833 for _, id := range pp.endpointSliceToIDs(oldSlice) {
834 delete(updatedAddressSet.Addresses, id)
835 }
836
837 newAddressSet := pp.endpointSliceToAddresses(newSlice)
838 for id, address := range newAddressSet.Addresses {
839 updatedAddressSet.Addresses[id] = address
840 }
841
842 add, remove := diffAddresses(pp.addresses, updatedAddressSet)
843 for _, listener := range pp.listeners {
844 if len(remove.Addresses) > 0 {
845 listener.Remove(remove)
846 }
847 if len(add.Addresses) > 0 {
848 listener.Add(add)
849 }
850 }
851
852 pp.addresses = updatedAddressSet
853 pp.exists = true
854 pp.metrics.incUpdates()
855 pp.metrics.setPods(len(pp.addresses.Addresses))
856 pp.metrics.setExists(true)
857 }
858
859 func metricLabels(resource interface{}) map[string]string {
860 var serviceName, ns string
861 var resLabels, resAnnotations map[string]string
862 switch res := resource.(type) {
863 case *corev1.Endpoints:
864 {
865 serviceName, ns = res.Name, res.Namespace
866 resLabels, resAnnotations = res.Labels, res.Annotations
867 }
868 case *discovery.EndpointSlice:
869 {
870 serviceName, ns = res.Labels[discovery.LabelServiceName], res.Namespace
871 resLabels, resAnnotations = res.Labels, res.Annotations
872 }
873 }
874
875 labels := map[string]string{service: serviceName, namespace: ns}
876
877 remoteClusterName, hasRemoteClusterName := resLabels[consts.RemoteClusterNameLabel]
878 serviceFqn, hasServiceFqn := resAnnotations[consts.RemoteServiceFqName]
879
880 if hasRemoteClusterName {
881
882
883 labels[targetCluster] = remoteClusterName
884 if hasServiceFqn {
885 fqParts := strings.Split(serviceFqn, ".")
886 if len(fqParts) >= 2 {
887 labels[targetService] = fqParts[0]
888 labels[targetServiceNamespace] = fqParts[1]
889 }
890 }
891 }
892 return labels
893 }
894
// endpointSliceToAddresses converts the endpoints of an EndpointSlice into an
// AddressSet for this publisher's port and hostname.
//
// If the publisher's target port cannot be resolved against the slice's
// ports, an empty set is returned. Otherwise each endpoint is considered in
// turn:
//   - endpoints whose Hostname is set and differs from a non-empty publisher
//     hostname are skipped (endpoints without a Hostname are not filtered);
//   - endpoints whose Ready condition is explicitly false are skipped;
//   - endpoints without a TargetRef yield one service-keyed address per IP,
//     with Identity and AuthorityOverride taken from the slice's annotations;
//   - endpoints targeting a Pod or an ExternalWorkload yield one address per
//     IP, keyed by the workload's ID, with Zone copied from the endpoint.
//
// Addresses that cannot be built, or whose server protocol cannot be
// determined, are logged and left out. The returned set always carries the
// slice's metric labels and the publisher's local-traffic-policy flag.
func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) AddressSet {
	resolvedPort := pp.resolveESTargetPort(es.Ports)
	if resolvedPort == undefinedEndpointPort {
		return AddressSet{
			Labels:             metricLabels(es),
			Addresses:          make(map[ID]Address),
			LocalTrafficPolicy: pp.localTrafficPolicy,
		}
	}

	// A failure here is only logged; serviceID is then whatever
	// getEndpointSliceServiceID returned alongside the error.
	serviceID, err := getEndpointSliceServiceID(es)
	if err != nil {
		pp.log.Errorf("Could not fetch resource service name:%v", err)
	}

	addresses := make(map[ID]Address)
	for _, endpoint := range es.Endpoints {
		// Hostname filter: only applied when both sides specify a hostname.
		if endpoint.Hostname != nil {
			if pp.hostname != "" && pp.hostname != *endpoint.Hostname {
				continue
			}
		}
		// A nil Ready condition is treated the same as ready.
		if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
			continue
		}

		// No TargetRef: the address is keyed by service name, IP and port.
		if endpoint.TargetRef == nil {
			for _, IPAddr := range endpoint.Addresses {
				var authorityOverride string
				if fqName, ok := es.Annotations[consts.RemoteServiceFqName]; ok {
					authorityOverride = net.JoinHostPort(fqName, fmt.Sprintf("%d", pp.srcPort))
				}

				identity := es.Annotations[consts.RemoteGatewayIdentity]
				address, id := pp.newServiceRefAddress(resolvedPort, IPAddr, serviceID.Name, es.Namespace)
				address.Identity, address.AuthorityOverride = identity, authorityOverride

				// Copy the hints so the address does not alias the slice's
				// backing array.
				if endpoint.Hints != nil {
					zones := make([]discovery.ForZone, len(endpoint.Hints.ForZones))
					copy(zones, endpoint.Hints.ForZones)
					address.ForZones = zones
				}
				addresses[id] = address
			}
			continue
		}

		// Pod-backed endpoint: the pod is looked up in the TargetRef's
		// namespace.
		if endpoint.TargetRef.Kind == endpointTargetRefPod {
			for _, IPAddr := range endpoint.Addresses {
				address, id, err := pp.newPodRefAddress(
					resolvedPort,
					es.AddressType,
					IPAddr,
					endpoint.TargetRef.Name,
					endpoint.TargetRef.Namespace,
				)
				if err != nil {
					pp.log.Errorf("Unable to create new address:%v", err)
					continue
				}
				err = SetToServerProtocol(pp.k8sAPI, &address)
				if err != nil {
					pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
					continue
				}

				address.Zone = endpoint.Zone
				if endpoint.Hints != nil {
					zones := make([]discovery.ForZone, len(endpoint.Hints.ForZones))
					copy(zones, endpoint.Hints.ForZones)
					address.ForZones = zones
				}
				addresses[id] = address
			}
		}

		// ExternalWorkload-backed endpoint: the workload is looked up in the
		// slice's namespace.
		if endpoint.TargetRef.Kind == endpointTargetRefExternalWorkload {
			for _, IPAddr := range endpoint.Addresses {
				address, id, err := pp.newExtRefAddress(resolvedPort, IPAddr, endpoint.TargetRef.Name, es.Namespace)
				if err != nil {
					pp.log.Errorf("Unable to create new address: %v", err)
					continue
				}

				err = SetToServerProtocolExternalWorkload(pp.k8sAPI, &address)
				if err != nil {
					pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
					continue
				}

				address.Zone = endpoint.Zone
				if endpoint.Hints != nil {
					zones := make([]discovery.ForZone, len(endpoint.Hints.ForZones))
					copy(zones, endpoint.Hints.ForZones)
					address.ForZones = zones
				}

				addresses[id] = address
			}

		}

	}
	return AddressSet{
		Addresses:          addresses,
		Labels:             metricLabels(es),
		LocalTrafficPolicy: pp.localTrafficPolicy,
	}
}
1004
1005
1006
1007 func (pp *portPublisher) endpointSliceToIDs(es *discovery.EndpointSlice) []ID {
1008 resolvedPort := pp.resolveESTargetPort(es.Ports)
1009 if resolvedPort == undefinedEndpointPort {
1010 return []ID{}
1011 }
1012
1013 serviceID, err := getEndpointSliceServiceID(es)
1014 if err != nil {
1015 pp.log.Errorf("Could not fetch resource service name:%v", err)
1016 }
1017
1018 ids := []ID{}
1019 for _, endpoint := range es.Endpoints {
1020 if endpoint.Hostname != nil {
1021 if pp.hostname != "" && pp.hostname != *endpoint.Hostname {
1022 continue
1023 }
1024 }
1025 if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
1026 continue
1027 }
1028
1029 if endpoint.TargetRef == nil {
1030 for _, IPAddr := range endpoint.Addresses {
1031 ids = append(ids, ServiceID{
1032 Name: strings.Join([]string{
1033 serviceID.Name,
1034 IPAddr,
1035 fmt.Sprint(resolvedPort),
1036 }, "-"),
1037 Namespace: es.Namespace,
1038 })
1039 }
1040 continue
1041 }
1042
1043 if endpoint.TargetRef.Kind == endpointTargetRefPod {
1044 ids = append(ids, PodID{
1045 Name: endpoint.TargetRef.Name,
1046 Namespace: endpoint.TargetRef.Namespace,
1047 IPFamily: corev1.IPFamily(es.AddressType),
1048 })
1049 } else if endpoint.TargetRef.Kind == endpointTargetRefExternalWorkload {
1050 ids = append(ids, ExternalWorkloadID{
1051 Name: endpoint.TargetRef.Name,
1052 Namespace: endpoint.TargetRef.Namespace,
1053 })
1054 }
1055
1056 }
1057 return ids
1058 }
1059
1060 func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) AddressSet {
1061 addresses := make(map[ID]Address)
1062 for _, subset := range endpoints.Subsets {
1063 resolvedPort := pp.resolveTargetPort(subset)
1064 if resolvedPort == undefinedEndpointPort {
1065 continue
1066 }
1067 for _, endpoint := range subset.Addresses {
1068 if pp.hostname != "" && pp.hostname != endpoint.Hostname {
1069 continue
1070 }
1071
1072 if endpoint.TargetRef == nil {
1073 var authorityOverride string
1074 if fqName, ok := endpoints.Annotations[consts.RemoteServiceFqName]; ok {
1075 authorityOverride = fmt.Sprintf("%s:%d", fqName, pp.srcPort)
1076 }
1077
1078 identity := endpoints.Annotations[consts.RemoteGatewayIdentity]
1079 address, id := pp.newServiceRefAddress(resolvedPort, endpoint.IP, endpoints.Name, endpoints.Namespace)
1080 address.Identity, address.AuthorityOverride = identity, authorityOverride
1081
1082 addresses[id] = address
1083 continue
1084 }
1085
1086 if endpoint.TargetRef.Kind == endpointTargetRefPod {
1087 address, id, err := pp.newPodRefAddress(
1088 resolvedPort,
1089 "",
1090 endpoint.IP,
1091 endpoint.TargetRef.Name,
1092 endpoint.TargetRef.Namespace,
1093 )
1094 if err != nil {
1095 pp.log.Errorf("Unable to create new address:%v", err)
1096 continue
1097 }
1098 err = SetToServerProtocol(pp.k8sAPI, &address)
1099 if err != nil {
1100 pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
1101 continue
1102 }
1103 addresses[id] = address
1104 }
1105 }
1106 }
1107 return AddressSet{
1108 Addresses: addresses,
1109 Labels: metricLabels(endpoints),
1110 }
1111 }
1112
1113 func (pp *portPublisher) newServiceRefAddress(endpointPort Port, endpointIP, serviceName, serviceNamespace string) (Address, ServiceID) {
1114 id := ServiceID{
1115 Name: strings.Join([]string{
1116 serviceName,
1117 endpointIP,
1118 fmt.Sprint(endpointPort),
1119 }, "-"),
1120 Namespace: serviceNamespace,
1121 }
1122
1123 return Address{IP: endpointIP, Port: endpointPort}, id
1124 }
1125
1126 func (pp *portPublisher) newPodRefAddress(
1127 endpointPort Port,
1128 ipFamily discovery.AddressType,
1129 endpointIP,
1130 podName,
1131 podNamespace string,
1132 ) (Address, PodID, error) {
1133 id := PodID{
1134 Name: podName,
1135 Namespace: podNamespace,
1136 IPFamily: corev1.IPFamily(ipFamily),
1137 }
1138 pod, err := pp.k8sAPI.Pod().Lister().Pods(id.Namespace).Get(id.Name)
1139 if err != nil {
1140 return Address{}, PodID{}, fmt.Errorf("unable to fetch pod %v: %w", id, err)
1141 }
1142 ownerKind, ownerName, err := pp.metadataAPI.GetOwnerKindAndName(context.Background(), pod, false)
1143 if err != nil {
1144 return Address{}, PodID{}, err
1145 }
1146 addr := Address{
1147 IP: endpointIP,
1148 Port: endpointPort,
1149 Pod: pod,
1150 OwnerName: ownerName,
1151 OwnerKind: ownerKind,
1152 }
1153
1154 return addr, id, nil
1155 }
1156
1157 func (pp *portPublisher) newExtRefAddress(endpointPort Port, endpointIP, externalWorkloadName, externalWorkloadNamespace string) (Address, ExternalWorkloadID, error) {
1158 id := ExternalWorkloadID{
1159 Name: externalWorkloadName,
1160 Namespace: externalWorkloadNamespace,
1161 }
1162
1163 ew, err := pp.k8sAPI.ExtWorkload().Lister().ExternalWorkloads(id.Namespace).Get(id.Name)
1164 if err != nil {
1165 return Address{}, ExternalWorkloadID{}, fmt.Errorf("unable to fetch ExternalWorkload %v: %w", id, err)
1166 }
1167
1168 addr := Address{
1169 IP: endpointIP,
1170 Port: endpointPort,
1171 ExternalWorkload: ew,
1172 }
1173
1174 ownerRefs := ew.GetOwnerReferences()
1175 if len(ownerRefs) == 1 {
1176 parent := ownerRefs[0]
1177 addr.OwnerName = parent.Name
1178 addr.OwnerName = strings.ToLower(parent.Kind)
1179 }
1180
1181 return addr, id, nil
1182 }
1183
1184 func (pp *portPublisher) resolveESTargetPort(slicePorts []discovery.EndpointPort) Port {
1185 if slicePorts == nil {
1186 return undefinedEndpointPort
1187 }
1188
1189 switch pp.targetPort.Type {
1190 case intstr.Int:
1191 return Port(pp.targetPort.IntVal)
1192 case intstr.String:
1193 for _, p := range slicePorts {
1194 name := ""
1195 if p.Name != nil {
1196 name = *p.Name
1197 }
1198 if name == pp.targetPort.StrVal {
1199 return Port(*p.Port)
1200 }
1201 }
1202 }
1203 return undefinedEndpointPort
1204 }
1205
1206 func (pp *portPublisher) resolveTargetPort(subset corev1.EndpointSubset) Port {
1207 switch pp.targetPort.Type {
1208 case intstr.Int:
1209 return Port(pp.targetPort.IntVal)
1210 case intstr.String:
1211 for _, p := range subset.Ports {
1212 if p.Name == pp.targetPort.StrVal {
1213 return Port(p.Port)
1214 }
1215 }
1216 }
1217 return undefinedEndpointPort
1218 }
1219
1220 func (pp *portPublisher) updateLocalTrafficPolicy(localTrafficPolicy bool) {
1221 pp.localTrafficPolicy = localTrafficPolicy
1222 pp.addresses.LocalTrafficPolicy = localTrafficPolicy
1223 for _, listener := range pp.listeners {
1224 listener.Add(pp.addresses.shallowCopy())
1225 }
1226 }
1227
1228 func (pp *portPublisher) updatePort(targetPort namedPort) {
1229 pp.targetPort = targetPort
1230
1231 if pp.enableEndpointSlices {
1232 matchLabels := map[string]string{discovery.LabelServiceName: pp.id.Name}
1233 selector := labels.Set(matchLabels).AsSelector()
1234
1235 endpointSlices, err := pp.k8sAPI.ES().Lister().EndpointSlices(pp.id.Namespace).List(selector)
1236 if err == nil {
1237 pp.addresses = AddressSet{}
1238 for _, slice := range endpointSlices {
1239 pp.addEndpointSlice(slice)
1240 }
1241 } else {
1242 pp.log.Errorf("Unable to get EndpointSlices during port update: %s", err)
1243 }
1244 } else {
1245 endpoints, err := pp.k8sAPI.Endpoint().Lister().Endpoints(pp.id.Namespace).Get(pp.id.Name)
1246 if err == nil {
1247 pp.updateEndpoints(endpoints)
1248 } else {
1249 pp.log.Errorf("Unable to get endpoints during port update: %s", err)
1250 }
1251 }
1252 }
1253
1254 func (pp *portPublisher) deleteEndpointSlice(es *discovery.EndpointSlice) {
1255 addrSet := pp.endpointSliceToAddresses(es)
1256 for id := range addrSet.Addresses {
1257 delete(pp.addresses.Addresses, id)
1258 }
1259
1260 for _, listener := range pp.listeners {
1261 listener.Remove(addrSet)
1262 }
1263
1264 if len(pp.addresses.Addresses) == 0 {
1265 pp.noEndpoints(false)
1266 } else {
1267 pp.exists = true
1268 pp.metrics.incUpdates()
1269 pp.metrics.setPods(len(pp.addresses.Addresses))
1270 pp.metrics.setExists(true)
1271 }
1272 }
1273
1274 func (pp *portPublisher) noEndpoints(exists bool) {
1275 pp.exists = exists
1276 pp.addresses = AddressSet{}
1277 for _, listener := range pp.listeners {
1278 listener.NoEndpoints(exists)
1279 }
1280
1281 pp.metrics.incUpdates()
1282 pp.metrics.setExists(exists)
1283 pp.metrics.setPods(0)
1284 }
1285
1286 func (pp *portPublisher) subscribe(listener EndpointUpdateListener) {
1287 if pp.exists {
1288 if len(pp.addresses.Addresses) > 0 {
1289 listener.Add(pp.addresses.shallowCopy())
1290 } else {
1291 listener.NoEndpoints(true)
1292 }
1293 } else {
1294 listener.NoEndpoints(false)
1295 }
1296 pp.listeners = append(pp.listeners, listener)
1297
1298 pp.metrics.setSubscribers(len(pp.listeners))
1299 }
1300
1301 func (pp *portPublisher) unsubscribe(listener EndpointUpdateListener) {
1302 for i, e := range pp.listeners {
1303 if e == listener {
1304 n := len(pp.listeners)
1305 pp.listeners[i] = pp.listeners[n-1]
1306 pp.listeners[n-1] = nil
1307 pp.listeners = pp.listeners[:n-1]
1308 break
1309 }
1310 }
1311
1312 pp.metrics.setSubscribers(len(pp.listeners))
1313 }
1314 func (pp *portPublisher) updateServer(oldServer, newServer *v1beta2.Server) {
1315 updated := false
1316 for id, address := range pp.addresses.Addresses {
1317
1318 if pp.isAddressSelected(address, oldServer) || pp.isAddressSelected(address, newServer) {
1319 if newServer != nil && pp.isAddressSelected(address, newServer) && newServer.Spec.ProxyProtocol == opaqueProtocol {
1320 address.OpaqueProtocol = true
1321 } else {
1322 address.OpaqueProtocol = false
1323 }
1324 if pp.addresses.Addresses[id].OpaqueProtocol != address.OpaqueProtocol {
1325 pp.addresses.Addresses[id] = address
1326 updated = true
1327 }
1328 }
1329 }
1330 if updated {
1331 for _, listener := range pp.listeners {
1332 listener.Add(pp.addresses.shallowCopy())
1333 }
1334 pp.metrics.incUpdates()
1335 }
1336 }
1337
1338 func (pp *portPublisher) isAddressSelected(address Address, server *v1beta2.Server) bool {
1339 if server == nil {
1340 return false
1341 }
1342
1343 if address.Pod != nil {
1344 selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
1345 if err != nil {
1346 pp.log.Errorf("failed to create Selector: %s", err)
1347 return false
1348 }
1349
1350 if !selector.Matches(labels.Set(address.Pod.Labels)) {
1351 return false
1352 }
1353
1354 switch server.Spec.Port.Type {
1355 case intstr.Int:
1356 if server.Spec.Port.IntVal == int32(address.Port) {
1357 return true
1358 }
1359 case intstr.String:
1360 for _, c := range address.Pod.Spec.Containers {
1361 for _, p := range c.Ports {
1362 if p.ContainerPort == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
1363 return true
1364 }
1365 }
1366 }
1367 }
1368
1369 } else if address.ExternalWorkload != nil {
1370 selector, err := metav1.LabelSelectorAsSelector(server.Spec.ExternalWorkloadSelector)
1371 if err != nil {
1372 pp.log.Errorf("failed to create Selector: %s", err)
1373 return false
1374 }
1375
1376 if !selector.Matches(labels.Set(address.ExternalWorkload.Labels)) {
1377 return false
1378 }
1379
1380 switch server.Spec.Port.Type {
1381 case intstr.Int:
1382 if server.Spec.Port.IntVal == int32(address.Port) {
1383 return true
1384 }
1385 case intstr.String:
1386 for _, p := range address.ExternalWorkload.Spec.Ports {
1387 if p.Port == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
1388 return true
1389 }
1390 }
1391 }
1392 }
1393 return false
1394 }
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405 func getTargetPort(service *corev1.Service, port Port) namedPort {
1406
1407 targetPort := intstr.FromInt(int(port))
1408
1409 if service == nil {
1410 return targetPort
1411 }
1412
1413
1414
1415 for _, portSpec := range service.Spec.Ports {
1416 if portSpec.Port == int32(port) {
1417 return intstr.FromString(portSpec.Name)
1418 }
1419 }
1420
1421 return targetPort
1422 }
1423
1424 func addressChanged(oldAddress Address, newAddress Address) bool {
1425
1426 if oldAddress.Identity != newAddress.Identity {
1427
1428
1429
1430
1431 return true
1432 }
1433
1434
1435 if len(newAddress.ForZones) != len(oldAddress.ForZones) {
1436 return true
1437 }
1438
1439
1440
1441 sort.Slice(oldAddress.ForZones, func(i, j int) bool {
1442 return oldAddress.ForZones[i].Name < (oldAddress.ForZones[j].Name)
1443 })
1444 sort.Slice(newAddress.ForZones, func(i, j int) bool {
1445 return newAddress.ForZones[i].Name < (newAddress.ForZones[j].Name)
1446 })
1447
1448
1449 for k := range oldAddress.ForZones {
1450 if oldAddress.ForZones[k].Name != newAddress.ForZones[k].Name {
1451 return true
1452 }
1453 }
1454
1455 if oldAddress.Pod != nil && newAddress.Pod != nil {
1456
1457 return oldAddress.Pod.ResourceVersion != newAddress.Pod.ResourceVersion
1458 }
1459 return false
1460 }
1461
1462 func diffAddresses(oldAddresses, newAddresses AddressSet) (add, remove AddressSet) {
1463
1464
1465
1466 addAddresses := make(map[ID]Address)
1467 removeAddresses := make(map[ID]Address)
1468 for id, newAddress := range newAddresses.Addresses {
1469 if oldAddress, ok := oldAddresses.Addresses[id]; ok {
1470 if addressChanged(oldAddress, newAddress) {
1471 addAddresses[id] = newAddress
1472 }
1473 } else {
1474
1475 addAddresses[id] = newAddress
1476 }
1477 }
1478 for id, address := range oldAddresses.Addresses {
1479 if _, ok := newAddresses.Addresses[id]; !ok {
1480 removeAddresses[id] = address
1481 }
1482 }
1483 add = AddressSet{
1484 Addresses: addAddresses,
1485 Labels: newAddresses.Labels,
1486 LocalTrafficPolicy: newAddresses.LocalTrafficPolicy,
1487 }
1488 remove = AddressSet{
1489 Addresses: removeAddresses,
1490 }
1491 return add, remove
1492 }
1493
1494 func getEndpointSliceServiceID(es *discovery.EndpointSlice) (ServiceID, error) {
1495 if !isValidSlice(es) {
1496 return ServiceID{}, fmt.Errorf("EndpointSlice [%s/%s] is invalid", es.Namespace, es.Name)
1497 }
1498
1499 if svc, ok := es.Labels[discovery.LabelServiceName]; ok {
1500 return ServiceID{Namespace: es.Namespace, Name: svc}, nil
1501 }
1502
1503 for _, ref := range es.OwnerReferences {
1504 if ref.Kind == "Service" && ref.Name != "" {
1505 return ServiceID{Namespace: es.Namespace, Name: ref.Name}, nil
1506 }
1507 }
1508
1509 return ServiceID{}, fmt.Errorf("EndpointSlice [%s/%s] is invalid", es.Namespace, es.Name)
1510 }
1511
1512 func isValidSlice(es *discovery.EndpointSlice) bool {
1513 serviceName, ok := es.Labels[discovery.LabelServiceName]
1514 if !ok && len(es.OwnerReferences) == 0 {
1515 return false
1516 } else if len(es.OwnerReferences) == 0 && serviceName == "" {
1517 return false
1518 }
1519
1520 return true
1521 }
1522
1523
1524
1525 func SetToServerProtocol(k8sAPI *k8s.API, address *Address) error {
1526 if address.Pod == nil {
1527 return fmt.Errorf("endpoint not backed by Pod: %s:%d", address.IP, address.Port)
1528 }
1529 servers, err := k8sAPI.Srv().Lister().Servers("").List(labels.Everything())
1530 if err != nil {
1531 return fmt.Errorf("failed to list Servers: %w", err)
1532 }
1533 for _, server := range servers {
1534 selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
1535 if err != nil {
1536 return fmt.Errorf("failed to create Selector: %w", err)
1537 }
1538 if server.Spec.ProxyProtocol == opaqueProtocol && selector.Matches(labels.Set(address.Pod.Labels)) {
1539 var portMatch bool
1540 switch server.Spec.Port.Type {
1541 case intstr.Int:
1542 if server.Spec.Port.IntVal == int32(address.Port) {
1543 portMatch = true
1544 }
1545 case intstr.String:
1546 for _, c := range address.Pod.Spec.Containers {
1547 for _, p := range c.Ports {
1548 if (p.ContainerPort == int32(address.Port) || p.HostPort == int32(address.Port)) &&
1549 p.Name == server.Spec.Port.StrVal {
1550 portMatch = true
1551 }
1552 }
1553 }
1554 default:
1555 continue
1556 }
1557 if portMatch {
1558 address.OpaqueProtocol = true
1559 return nil
1560 }
1561 }
1562 }
1563 return nil
1564 }
1565
1566
1567
1568 func SetToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address) error {
1569 if address.ExternalWorkload == nil {
1570 return fmt.Errorf("endpoint not backed by ExternalWorkload: %s:%d", address.IP, address.Port)
1571 }
1572 servers, err := k8sAPI.Srv().Lister().Servers("").List(labels.Everything())
1573 if err != nil {
1574 return fmt.Errorf("failed to list Servers: %w", err)
1575 }
1576 for _, server := range servers {
1577 selector, err := metav1.LabelSelectorAsSelector(server.Spec.ExternalWorkloadSelector)
1578 if err != nil {
1579 return fmt.Errorf("failed to create Selector: %w", err)
1580 }
1581 if server.Spec.ProxyProtocol == opaqueProtocol && selector.Matches(labels.Set(address.ExternalWorkload.Labels)) {
1582 var portMatch bool
1583 switch server.Spec.Port.Type {
1584 case intstr.Int:
1585 if server.Spec.Port.IntVal == int32(address.Port) {
1586 portMatch = true
1587 }
1588 case intstr.String:
1589 for _, p := range address.ExternalWorkload.Spec.Ports {
1590 if p.Port == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
1591 portMatch = true
1592 }
1593
1594 }
1595 default:
1596 continue
1597 }
1598 if portMatch {
1599 address.OpaqueProtocol = true
1600 return nil
1601 }
1602 }
1603 }
1604 return nil
1605 }
1606
1607 func latestUpdated(managedFields []metav1.ManagedFieldsEntry) time.Time {
1608 var latest time.Time
1609 for _, field := range managedFields {
1610 if field.Operation == metav1.ManagedFieldsOperationUpdate {
1611 if latest.IsZero() || field.Time.After(latest) {
1612 latest = field.Time.Time
1613 }
1614 }
1615 }
1616 return latest
1617 }
1618
View as plain text