1 package watcher
2
3 import (
4 "context"
5 "fmt"
6 "net"
7 "strconv"
8 "strings"
9 "sync"
10 "time"
11
12 ext "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
13 "github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta2"
14 "github.com/linkerd/linkerd2/controller/k8s"
15 consts "github.com/linkerd/linkerd2/pkg/k8s"
16 "github.com/linkerd/linkerd2/pkg/util"
17 "github.com/prometheus/client_golang/prometheus"
18 logging "github.com/sirupsen/logrus"
19 "google.golang.org/grpc/codes"
20 "google.golang.org/grpc/status"
21 corev1 "k8s.io/api/core/v1"
22 discovery "k8s.io/api/discovery/v1"
23 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24 "k8s.io/apimachinery/pkg/labels"
25 "k8s.io/client-go/tools/cache"
26 )
27
28 type (
29
30
31 WorkloadWatcher struct {
32 defaultOpaquePorts map[uint32]struct{}
33 k8sAPI *k8s.API
34 metadataAPI *k8s.MetadataAPI
35 publishers map[IPPort]*workloadPublisher
36 log *logging.Entry
37 enableEndpointSlices bool
38
39 mu sync.RWMutex
40 }
41
42
43
44
45
46 workloadPublisher struct {
47 defaultOpaquePorts map[uint32]struct{}
48 k8sAPI *k8s.API
49 metadataAPI *k8s.MetadataAPI
50 addr Address
51 listeners []WorkloadUpdateListener
52 metrics metrics
53 log *logging.Entry
54
55 mu sync.RWMutex
56 }
57
58
59 WorkloadUpdateListener interface {
60 Update(*Address) error
61 }
62 )
63
64 var ipPortVecs = newMetricsVecs("ip_port", []string{"ip", "port"})
65
66 func NewWorkloadWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, enableEndpointSlices bool, defaultOpaquePorts map[uint32]struct{}) (*WorkloadWatcher, error) {
67 ww := &WorkloadWatcher{
68 defaultOpaquePorts: defaultOpaquePorts,
69 k8sAPI: k8sAPI,
70 metadataAPI: metadataAPI,
71 publishers: make(map[IPPort]*workloadPublisher),
72 log: log.WithFields(logging.Fields{
73 "component": "workload-watcher",
74 }),
75 enableEndpointSlices: enableEndpointSlices,
76 }
77
78 _, err := k8sAPI.Pod().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
79 AddFunc: ww.addPod,
80 DeleteFunc: ww.deletePod,
81 UpdateFunc: ww.updatePod,
82 })
83 if err != nil {
84 return nil, err
85 }
86
87 _, err = k8sAPI.ExtWorkload().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
88 AddFunc: ww.addExternalWorkload,
89 DeleteFunc: ww.deleteExternalWorkload,
90 UpdateFunc: ww.updateExternalWorkload,
91 })
92 if err != nil {
93 return nil, err
94 }
95
96 _, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
97 AddFunc: ww.addOrDeleteServer,
98 DeleteFunc: ww.addOrDeleteServer,
99 UpdateFunc: ww.updateServer,
100 })
101 if err != nil {
102 return nil, err
103 }
104
105 return ww, nil
106 }
107
108
109
110
111
112 func (ww *WorkloadWatcher) Subscribe(service *ServiceID, hostname, ip string, port Port, listener WorkloadUpdateListener) (string, error) {
113 if hostname != "" {
114 ww.log.Debugf("Establishing watch on workload %s.%s.%s:%d", hostname, service.Name, service.Namespace, port)
115 } else if service != nil {
116 ww.log.Debugf("Establishing watch on workload %s.%s:%d", service.Name, service.Namespace, port)
117 } else {
118 ww.log.Debugf("Establishing watch on workload %s:%d", ip, port)
119 }
120 wp, err := ww.getOrNewWorkloadPublisher(service, hostname, ip, port)
121 if err != nil {
122 return "", err
123 }
124
125 if err = wp.subscribe(listener); err != nil {
126 return "", err
127 }
128
129 return wp.addr.IP, nil
130 }
131
132
133
134 func (ww *WorkloadWatcher) Unsubscribe(ip string, port Port, listener WorkloadUpdateListener) {
135 ww.mu.Lock()
136 defer ww.mu.Unlock()
137
138 ww.log.Debugf("Stopping watch on %s:%d", ip, port)
139 wp, ok := ww.getWorkloadPublisher(ip, port)
140 if !ok {
141 ww.log.Errorf("Cannot unsubscribe from unknown ip:port [%s:%d]", ip, port)
142 return
143 }
144 wp.unsubscribe(listener)
145
146 if len(wp.listeners) == 0 {
147 delete(ww.publishers, IPPort{wp.addr.IP, wp.addr.Port})
148 }
149 }
150
151
152 func (ww *WorkloadWatcher) addPod(obj any) {
153 pod := obj.(*corev1.Pod)
154 ww.log.Tracef("Added pod %s.%s", pod.Name, pod.Namespace)
155 go ww.submitPodUpdate(pod, false)
156 }
157
158
159 func (ww *WorkloadWatcher) deletePod(obj any) {
160 pod, ok := obj.(*corev1.Pod)
161 if !ok {
162 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
163 if !ok {
164 ww.log.Errorf("Couldn't get object from DeletedFinalStateUnknown %#v", obj)
165 return
166 }
167 pod, ok = tombstone.Obj.(*corev1.Pod)
168 if !ok {
169 ww.log.Errorf("DeletedFinalStateUnknown contained object that is not a Pod %#v", obj)
170 return
171 }
172 }
173 ww.log.Tracef("Deleted pod %s.%s", pod.Name, pod.Namespace)
174 go ww.submitPodUpdate(pod, true)
175 }
176
177
178 func (ww *WorkloadWatcher) updatePod(oldObj any, newObj any) {
179 oldPod := oldObj.(*corev1.Pod)
180 newPod := newObj.(*corev1.Pod)
181 if oldPod.DeletionTimestamp == nil && newPod.DeletionTimestamp != nil {
182
183 return
184 }
185
186 oldUpdated := latestUpdated(oldPod.ManagedFields)
187 updated := latestUpdated(newPod.ManagedFields)
188 if !updated.IsZero() && updated != oldUpdated {
189 delta := time.Since(updated)
190 podInformerLag.Observe(delta.Seconds())
191 }
192
193 ww.log.Tracef("Updated pod %s.%s", newPod.Name, newPod.Namespace)
194 go ww.submitPodUpdate(newPod, false)
195 }
196
197
198 func (ww *WorkloadWatcher) addExternalWorkload(obj any) {
199 externalWorkload := obj.(*ext.ExternalWorkload)
200 ww.log.Tracef("Added externalworkload %s.%s", externalWorkload.Name, externalWorkload.Namespace)
201 go ww.submitExternalWorkloadUpdate(externalWorkload, false)
202 }
203
204
205 func (ww *WorkloadWatcher) deleteExternalWorkload(obj any) {
206 externalWorkload, ok := obj.(*ext.ExternalWorkload)
207 if !ok {
208 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
209 if !ok {
210 ww.log.Errorf("Couldn't get object from DeletedFinalStateUnknown %#v", obj)
211 return
212 }
213 externalWorkload, ok = tombstone.Obj.(*ext.ExternalWorkload)
214 if !ok {
215 ww.log.Errorf("DeletedFinalStateUnknown contained object that is not an ExternalWorkload %#v", obj)
216 return
217 }
218 }
219 ww.log.Tracef("Deleted externalworklod %s.%s", externalWorkload.Name, externalWorkload.Namespace)
220 go ww.submitExternalWorkloadUpdate(externalWorkload, true)
221 }
222
223
224 func (ww *WorkloadWatcher) updateExternalWorkload(oldObj any, newObj any) {
225 oldExternalWorkload := oldObj.(*ext.ExternalWorkload)
226 newExternalWorkload := newObj.(*ext.ExternalWorkload)
227 if oldExternalWorkload.DeletionTimestamp == nil && newExternalWorkload.DeletionTimestamp != nil {
228
229 return
230 }
231
232 oldUpdated := latestUpdated(oldExternalWorkload.ManagedFields)
233 updated := latestUpdated(newExternalWorkload.ManagedFields)
234 if !updated.IsZero() && updated != oldUpdated {
235 delta := time.Since(updated)
236 externalWorkloadInformerLag.Observe(delta.Seconds())
237 }
238
239 ww.log.Tracef("Updated pod %s.%s", newExternalWorkload.Name, newExternalWorkload.Namespace)
240 go ww.submitExternalWorkloadUpdate(newExternalWorkload, false)
241 }
242
243 func (ww *WorkloadWatcher) submitPodUpdate(pod *corev1.Pod, remove bool) {
244 ww.mu.RLock()
245 defer ww.mu.RUnlock()
246
247 submitPod := pod
248 if remove {
249 submitPod = nil
250 }
251
252 for _, container := range pod.Spec.Containers {
253 for _, containerPort := range container.Ports {
254 if containerPort.ContainerPort != 0 {
255 for _, pip := range pod.Status.PodIPs {
256 if wp, ok := ww.getWorkloadPublisher(pip.IP, Port(containerPort.ContainerPort)); ok {
257 wp.updatePod(submitPod)
258 }
259 }
260 if len(pod.Status.PodIPs) == 0 && pod.Status.PodIP != "" {
261 if wp, ok := ww.getWorkloadPublisher(pod.Status.PodIP, Port(containerPort.ContainerPort)); ok {
262 wp.updatePod(submitPod)
263 }
264 }
265 }
266
267 if containerPort.HostPort != 0 {
268 for _, hip := range pod.Status.HostIPs {
269 if pp, ok := ww.getWorkloadPublisher(hip.IP, Port(containerPort.HostPort)); ok {
270 pp.updatePod(submitPod)
271 }
272 }
273 if len(pod.Status.HostIPs) == 0 && pod.Status.HostIP != "" {
274 if pp, ok := ww.getWorkloadPublisher(pod.Status.HostIP, Port(containerPort.HostPort)); ok {
275 pp.updatePod(submitPod)
276 }
277 }
278 }
279 }
280 }
281 }
282
283 func (ww *WorkloadWatcher) submitExternalWorkloadUpdate(externalWorkload *ext.ExternalWorkload, remove bool) {
284 ww.mu.RLock()
285 defer ww.mu.RUnlock()
286
287 submitWorkload := externalWorkload
288 if remove {
289 submitWorkload = nil
290 }
291
292 for _, port := range externalWorkload.Spec.Ports {
293 for _, ip := range externalWorkload.Spec.WorkloadIPs {
294 if wp, ok := ww.getWorkloadPublisher(ip.Ip, Port(port.Port)); ok {
295 wp.updateExternalWorkload(submitWorkload)
296 }
297 }
298 }
299 }
300
301 func (ww *WorkloadWatcher) updateServer(oldObj interface{}, newObj interface{}) {
302 oldServer := oldObj.(*v1beta2.Server)
303 newServer := newObj.(*v1beta2.Server)
304
305 oldUpdated := latestUpdated(oldServer.ManagedFields)
306 updated := latestUpdated(newServer.ManagedFields)
307
308 if !updated.IsZero() && updated != oldUpdated {
309 delta := time.Since(updated)
310 serverInformerLag.Observe(delta.Seconds())
311 }
312
313 ww.updateServers(oldServer, newServer)
314 }
315
316 func (ww *WorkloadWatcher) addOrDeleteServer(obj interface{}) {
317 server, ok := obj.(*v1beta2.Server)
318 if !ok {
319 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
320 if !ok {
321 ww.log.Errorf("Couldn't get object from DeletedFinalStateUnknown %#v", obj)
322 return
323 }
324 server, ok = tombstone.Obj.(*v1beta2.Server)
325 if !ok {
326 ww.log.Errorf("DeletedFinalStateUnknown contained object that is not a Server %#v", obj)
327 return
328 }
329 }
330 ww.updateServers(server)
331 }
332
333
334
335
336
337 func (ww *WorkloadWatcher) updateServers(servers ...*v1beta2.Server) {
338 ww.mu.RLock()
339 defer ww.mu.RUnlock()
340
341 for _, wp := range ww.publishers {
342 var opaquePorts map[uint32]struct{}
343 if wp.addr.Pod != nil {
344 if !ww.isPodSelectedByAny(wp.addr.Pod, servers...) {
345 continue
346 }
347 opaquePorts = GetAnnotatedOpaquePorts(wp.addr.Pod, ww.defaultOpaquePorts)
348 } else if wp.addr.ExternalWorkload != nil {
349 if !ww.isExternalWorkloadSelectedByAny(wp.addr.ExternalWorkload, servers...) {
350 continue
351 }
352 opaquePorts = GetAnnotatedOpaquePortsForExternalWorkload(wp.addr.ExternalWorkload, ww.defaultOpaquePorts)
353 } else {
354 continue
355 }
356
357 _, annotatedOpaque := opaquePorts[wp.addr.Port]
358
359 if annotatedOpaque {
360 continue
361 }
362
363 opaque := wp.addr.OpaqueProtocol
364 name := net.JoinHostPort(wp.addr.IP, fmt.Sprintf("%d", wp.addr.Port))
365 if wp.addr.Pod != nil {
366 name = wp.addr.Pod.GetName()
367 } else if wp.addr.ExternalWorkload != nil {
368 name = wp.addr.ExternalWorkload.GetName()
369 }
370 if err := SetToServerProtocol(wp.k8sAPI, &wp.addr); err != nil {
371 wp.log.Errorf("Error computing opaque protocol for %s: %q", name, err)
372 }
373 if wp.addr.OpaqueProtocol == opaque {
374
375 continue
376 }
377
378 go func(wp *workloadPublisher) {
379 wp.mu.RLock()
380 defer wp.mu.RUnlock()
381
382 for _, listener := range wp.listeners {
383 if err := listener.Update(&wp.addr); err != nil {
384 ww.log.Warnf("Error sending update to listener: %s", err)
385 continue
386 }
387 }
388 wp.metrics.incUpdates()
389 }(wp)
390 }
391 }
392
393 func (ww *WorkloadWatcher) isPodSelectedByAny(pod *corev1.Pod, servers ...*v1beta2.Server) bool {
394 for _, s := range servers {
395 selector, err := metav1.LabelSelectorAsSelector(s.Spec.PodSelector)
396 if err != nil {
397 ww.log.Errorf("failed to parse PodSelector of Server %s.%s: %q", s.GetName(), s.GetNamespace(), err)
398 continue
399 }
400 if selector.Matches(labels.Set(pod.Labels)) {
401 return true
402 }
403 }
404 return false
405 }
406
407 func (ww *WorkloadWatcher) isExternalWorkloadSelectedByAny(ew *ext.ExternalWorkload, servers ...*v1beta2.Server) bool {
408 for _, s := range servers {
409 selector, err := metav1.LabelSelectorAsSelector(s.Spec.ExternalWorkloadSelector)
410 if err != nil {
411 ww.log.Errorf("failed to parse ExternalWorkloadSelector of Server %s.%s: %q", s.GetName(), s.GetNamespace(), err)
412 continue
413 }
414 if selector.Matches(labels.Set(ew.Labels)) {
415 return true
416 }
417 }
418 return false
419 }
420
421
422
423 func (ww *WorkloadWatcher) getOrNewWorkloadPublisher(service *ServiceID, hostname, ip string, port Port) (*workloadPublisher, error) {
424 ww.mu.Lock()
425 defer ww.mu.Unlock()
426
427 var pod *corev1.Pod
428 var externalWorkload *ext.ExternalWorkload
429 var err error
430 if hostname != "" {
431 pod, err = ww.getEndpointByHostname(hostname, service)
432 if err != nil {
433 return nil, err
434 }
435 ip = pod.Status.PodIP
436 } else {
437 pod, err = ww.getPodByPodIP(ip, port)
438 if err != nil {
439 return nil, err
440 }
441 if pod == nil {
442 pod, err = ww.getPodByHostIP(ip, port)
443 if err != nil {
444 return nil, err
445 }
446 }
447 if pod == nil {
448 externalWorkload, err = ww.getExternalWorkloadByIP(ip, port)
449 if err != nil {
450 return nil, err
451 }
452 }
453 }
454
455 ipPort := IPPort{ip, port}
456 wp, ok := ww.publishers[ipPort]
457 if !ok {
458 wp = &workloadPublisher{
459 defaultOpaquePorts: ww.defaultOpaquePorts,
460 k8sAPI: ww.k8sAPI,
461 metadataAPI: ww.metadataAPI,
462 addr: Address{
463 IP: ip,
464 Port: port,
465 },
466 metrics: ipPortVecs.newMetrics(prometheus.Labels{
467 "ip": ip,
468 "port": strconv.FormatUint(uint64(port), 10),
469 }),
470 log: ww.log.WithFields(logging.Fields{
471 "component": "workload-publisher",
472 "ip": ip,
473 "port": port,
474 }),
475 }
476 if pod != nil {
477 wp.updatePod(pod)
478 }
479 if externalWorkload != nil {
480 wp.updateExternalWorkload(externalWorkload)
481 }
482 ww.publishers[ipPort] = wp
483 }
484 return wp, nil
485 }
486
487 func (ww *WorkloadWatcher) getWorkloadPublisher(ip string, port Port) (wp *workloadPublisher, ok bool) {
488 ipPort := IPPort{ip, port}
489 wp, ok = ww.publishers[ipPort]
490 return
491 }
492
493
494 func (ww *WorkloadWatcher) getPodByPodIP(podIP string, port uint32) (*corev1.Pod, error) {
495 podIPPods, err := getIndexedPods(ww.k8sAPI, PodIPIndex, podIP)
496 if err != nil {
497 return nil, status.Error(codes.Unknown, err.Error())
498 }
499 if len(podIPPods) == 1 {
500 ww.log.Debugf("found %s on the pod network", podIP)
501 return podIPPods[0], nil
502 }
503 if len(podIPPods) > 1 {
504 conflictingPods := []string{}
505 for _, pod := range podIPPods {
506 conflictingPods = append(conflictingPods, fmt.Sprintf("%s:%s", pod.Namespace, pod.Name))
507 }
508 ww.log.Warnf("found conflicting %s IP on the pod network: %s", podIP, strings.Join(conflictingPods, ","))
509 return nil, status.Errorf(codes.FailedPrecondition, "found %d pods with a conflicting pod network IP %s", len(podIPPods), podIP)
510 }
511
512 ww.log.Debugf("no pod found for %s:%d", podIP, port)
513 return nil, nil
514 }
515
516
517
518 func (ww *WorkloadWatcher) getPodByHostIP(hostIP string, port uint32) (*corev1.Pod, error) {
519 addr := net.JoinHostPort(hostIP, fmt.Sprintf("%d", port))
520 hostIPPods, err := getIndexedPods(ww.k8sAPI, HostIPIndex, addr)
521 if err != nil {
522 return nil, status.Error(codes.Unknown, err.Error())
523 }
524 if len(hostIPPods) == 1 {
525 ww.log.Debugf("found %s:%d on the host network", hostIP, port)
526 return hostIPPods[0], nil
527 }
528 if len(hostIPPods) > 1 {
529 conflictingPods := []string{}
530 for _, pod := range hostIPPods {
531 conflictingPods = append(conflictingPods, fmt.Sprintf("%s:%s", pod.Namespace, pod.Name))
532 }
533 ww.log.Warnf("found conflicting %s:%d endpoint on the host network: %s", hostIP, port, strings.Join(conflictingPods, ","))
534 return nil, status.Errorf(codes.FailedPrecondition, "found %d pods with a conflicting host network endpoint %s:%d", len(hostIPPods), hostIP, port)
535 }
536
537 return nil, nil
538 }
539
540
541
542 func (ww *WorkloadWatcher) getExternalWorkloadByIP(ip string, port uint32) (*ext.ExternalWorkload, error) {
543 addr := net.JoinHostPort(ip, fmt.Sprintf("%d", port))
544 workloads, err := getIndexedExternalWorkloads(ww.k8sAPI, ExternalWorkloadIPIndex, addr)
545 if err != nil {
546 return nil, status.Error(codes.Unknown, err.Error())
547 }
548 if len(workloads) == 0 {
549 ww.log.Debugf("no externalworkload found for %s:%d", ip, port)
550 return nil, nil
551 }
552 if len(workloads) == 1 {
553 ww.log.Debugf("found externalworkload %s:%d", ip, port)
554 return workloads[0], nil
555 }
556 if len(workloads) > 1 {
557 conflictingWorkloads := []string{}
558 for _, ew := range workloads {
559 conflictingWorkloads = append(conflictingWorkloads, fmt.Sprintf("%s:%s", ew.Namespace, ew.Name))
560 }
561 ww.log.Warnf("found conflicting %s:%d externalworkload: %s", ip, port, strings.Join(conflictingWorkloads, ","))
562 return nil, status.Errorf(codes.FailedPrecondition, "found %d externalworkloads with a conflicting ip %s:%d", len(workloads), ip, port)
563 }
564
565 return nil, nil
566 }
567
568
569
570
571
572 func (ww *WorkloadWatcher) getEndpointByHostname(hostname string, svcID *ServiceID) (*corev1.Pod, error) {
573 if ww.enableEndpointSlices {
574 matchLabels := map[string]string{discovery.LabelServiceName: svcID.Name}
575 selector := labels.Set(matchLabels).AsSelector()
576
577 sliceList, err := ww.k8sAPI.ES().Lister().EndpointSlices(svcID.Namespace).List(selector)
578 if err != nil {
579 return nil, err
580 }
581 for _, slice := range sliceList {
582 for _, ep := range slice.Endpoints {
583 if hostname == *ep.Hostname {
584 if ep.TargetRef != nil && ep.TargetRef.Kind == "Pod" {
585 podName := ep.TargetRef.Name
586 podNamespace := ep.TargetRef.Namespace
587 pod, err := ww.k8sAPI.Pod().Lister().Pods(podNamespace).Get(podName)
588 if err != nil {
589 return nil, err
590 }
591 return pod, nil
592 }
593 return nil, nil
594 }
595 }
596 }
597
598 return nil, status.Errorf(codes.NotFound, "no pod found in EndpointSlices of Service %s/%s for hostname %s", svcID.Namespace, svcID.Name, hostname)
599 }
600
601 ep, err := ww.k8sAPI.Endpoint().Lister().Endpoints(svcID.Namespace).Get(svcID.Name)
602 if err != nil {
603 return nil, err
604 }
605
606 for _, subset := range ep.Subsets {
607 for _, addr := range subset.Addresses {
608
609 if hostname == addr.Hostname {
610 if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
611 podName := addr.TargetRef.Name
612 podNamespace := addr.TargetRef.Namespace
613 pod, err := ww.k8sAPI.Pod().Lister().Pods(podNamespace).Get(podName)
614 if err != nil {
615 return nil, err
616 }
617 return pod, nil
618 }
619 return nil, nil
620 }
621 }
622 }
623
624 return nil, status.Errorf(codes.NotFound, "no pod found in Endpoints %s/%s for hostname %s", svcID.Namespace, svcID.Name, hostname)
625 }
626
627 func (wp *workloadPublisher) subscribe(listener WorkloadUpdateListener) error {
628 wp.mu.Lock()
629 defer wp.mu.Unlock()
630
631 wp.listeners = append(wp.listeners, listener)
632 wp.metrics.setSubscribers(len(wp.listeners))
633
634 if err := listener.Update(&wp.addr); err != nil {
635 return fmt.Errorf("failed to send initial update: %w", err)
636 }
637 wp.metrics.incUpdates()
638 return nil
639 }
640
641 func (wp *workloadPublisher) unsubscribe(listener WorkloadUpdateListener) {
642 wp.mu.Lock()
643 defer wp.mu.Unlock()
644
645 for i, e := range wp.listeners {
646 if e == listener {
647 n := len(wp.listeners)
648 wp.listeners[i] = wp.listeners[n-1]
649 wp.listeners[n-1] = nil
650 wp.listeners = wp.listeners[:n-1]
651 break
652 }
653 }
654
655 wp.metrics.setSubscribers(len(wp.listeners))
656 }
657
658
659
660
661
662 func (wp *workloadPublisher) updatePod(pod *corev1.Pod) {
663 wp.mu.Lock()
664 defer wp.mu.Unlock()
665
666
667 if wp.addr.Pod == nil {
668 if pod == nil {
669 wp.log.Trace("Pod deletion event already consumed - ignore")
670 return
671 }
672
673 if !isRunningAndReady(pod) {
674 wp.log.Tracef("Pod %s.%s not ready - ignore", pod.Name, pod.Namespace)
675 return
676 }
677
678 wp.log.Debugf("Pod %s.%s became ready", pod.Name, pod.Namespace)
679 wp.addr.Pod = pod
680
681
682 if wp.addr.Pod != nil {
683 ownerKind, ownerName, err := wp.metadataAPI.GetOwnerKindAndName(context.Background(), wp.addr.Pod, true)
684 if err != nil {
685 wp.log.Errorf("Error getting pod owner for pod %s: %q", wp.addr.Pod.GetName(), err)
686 } else {
687 wp.addr.OwnerKind = ownerKind
688 wp.addr.OwnerName = ownerName
689 }
690 }
691
692
693 if err := SetToServerProtocol(wp.k8sAPI, &wp.addr); err != nil {
694 wp.log.Errorf("Error computing opaque protocol for pod %s: %q", wp.addr.Pod.GetName(), err)
695 }
696
697 for _, l := range wp.listeners {
698 if err := l.Update(&wp.addr); err != nil {
699 wp.log.Warnf("Error sending update to listener: %s", err)
700 continue
701 }
702 }
703 wp.metrics.incUpdates()
704
705 return
706 }
707
708
709 if pod == nil || !isRunningAndReady(pod) {
710 wp.log.Debugf("Pod %s.%s deleted or it became unready - remove", wp.addr.Pod.Name, wp.addr.Pod.Namespace)
711 wp.addr.Pod = nil
712 wp.addr.OwnerKind = ""
713 wp.addr.OwnerName = ""
714 wp.addr.OpaqueProtocol = false
715 for _, l := range wp.listeners {
716 if err := l.Update(&wp.addr); err != nil {
717 wp.log.Warnf("Error sending update to listener: %s", err)
718 continue
719 }
720 }
721 wp.metrics.incUpdates()
722
723 return
724 }
725
726 wp.log.Tracef("Ignored event on pod %s.%s", pod.Name, pod.Namespace)
727 }
728
729
730
731
732
733 func (wp *workloadPublisher) updateExternalWorkload(externalWorkload *ext.ExternalWorkload) {
734 wp.mu.Lock()
735 defer wp.mu.Unlock()
736
737 wp.addr.ExternalWorkload = externalWorkload
738
739
740 if wp.addr.ExternalWorkload != nil && len(wp.addr.ExternalWorkload.GetOwnerReferences()) == 1 {
741 wp.addr.OwnerKind = wp.addr.ExternalWorkload.GetOwnerReferences()[0].Kind
742 wp.addr.OwnerName = wp.addr.ExternalWorkload.GetOwnerReferences()[0].Name
743 }
744
745
746 if err := SetToServerProtocolExternalWorkload(wp.k8sAPI, &wp.addr); err != nil {
747 wp.log.Errorf("Error computing opaque protocol for externalworkload %s: %q", wp.addr.ExternalWorkload.GetName(), err)
748 }
749
750 for _, l := range wp.listeners {
751 if err := l.Update(&wp.addr); err != nil {
752 wp.log.Warnf("Error sending update to listener: %s", err)
753 continue
754 }
755 }
756 wp.metrics.incUpdates()
757 }
758
759
760
761 func GetAnnotatedOpaquePorts(pod *corev1.Pod, defaultPorts map[uint32]struct{}) map[uint32]struct{} {
762 if pod == nil {
763 return defaultPorts
764 }
765 annotation, ok := pod.Annotations[consts.ProxyOpaquePortsAnnotation]
766 if !ok {
767 return defaultPorts
768 }
769 opaquePorts := make(map[uint32]struct{})
770 namedPorts := util.GetNamedPorts(pod.Spec.Containers)
771 if annotation != "" {
772 for _, pr := range util.ParseContainerOpaquePorts(annotation, namedPorts) {
773 for _, port := range pr.Ports() {
774 opaquePorts[uint32(port)] = struct{}{}
775 }
776 }
777 }
778 return opaquePorts
779 }
780
781
782
783 func GetAnnotatedOpaquePortsForExternalWorkload(ew *ext.ExternalWorkload, defaultPorts map[uint32]struct{}) map[uint32]struct{} {
784 if ew == nil {
785 return defaultPorts
786 }
787 annotation, ok := ew.Annotations[consts.ProxyOpaquePortsAnnotation]
788 if !ok {
789 return defaultPorts
790 }
791 opaquePorts := make(map[uint32]struct{})
792 if annotation != "" {
793 for _, pr := range parseExternalWorkloadOpaquePorts(annotation, ew) {
794 for _, port := range pr.Ports() {
795 opaquePorts[uint32(port)] = struct{}{}
796 }
797 }
798 }
799 return opaquePorts
800 }
801
802 func parseExternalWorkloadOpaquePorts(override string, ew *ext.ExternalWorkload) []util.PortRange {
803 portRanges := util.GetPortRanges(override)
804 var values []util.PortRange
805 for _, pr := range portRanges {
806 port, named := isNamedInExternalWorkload(pr, ew)
807 if named {
808 values = append(values, util.PortRange{UpperBound: int(port), LowerBound: int(port)})
809 } else {
810 pr, err := util.ParsePortRange(pr)
811 if err != nil {
812 logging.Warnf("Invalid port range [%v]: %s", pr, err)
813 continue
814 }
815 values = append(values, pr)
816 }
817 }
818 return values
819 }
820
821 func isNamedInExternalWorkload(pr string, ew *ext.ExternalWorkload) (int32, bool) {
822 for _, p := range ew.Spec.Ports {
823 if p.Name == pr {
824 return p.Port, true
825 }
826 }
827
828 return 0, false
829 }
830
831 func isRunningAndReady(pod *corev1.Pod) bool {
832 if pod == nil || pod.Status.Phase != corev1.PodRunning {
833 return false
834 }
835 for _, condition := range pod.Status.Conditions {
836 if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
837 return true
838 }
839 }
840
841 return false
842 }
843
View as plain text