1 package watcher
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "sort"
8 "sync"
9 "testing"
10 "time"
11
12 "github.com/linkerd/linkerd2/controller/k8s"
13 consts "github.com/linkerd/linkerd2/pkg/k8s"
14 "github.com/linkerd/linkerd2/testutil"
15 logging "github.com/sirupsen/logrus"
16 corev1 "k8s.io/api/core/v1"
17 dv1 "k8s.io/api/discovery/v1"
18 kerrors "k8s.io/apimachinery/pkg/api/errors"
19 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20 )
21
22 type bufferingEndpointListener struct {
23 added []string
24 removed []string
25 localTrafficPolicy bool
26 noEndpointsCalled bool
27 noEndpointsExist bool
28 sync.Mutex
29 }
30
31 func newBufferingEndpointListener() *bufferingEndpointListener {
32 return &bufferingEndpointListener{
33 added: []string{},
34 removed: []string{},
35 Mutex: sync.Mutex{},
36 }
37 }
38
39 func addressString(address Address) string {
40 addressString := fmt.Sprintf("%s:%d", address.IP, address.Port)
41 if address.Identity != "" {
42 addressString = fmt.Sprintf("%s/%s", addressString, address.Identity)
43 }
44 if address.AuthorityOverride != "" {
45 addressString = fmt.Sprintf("%s/%s", addressString, address.AuthorityOverride)
46 }
47 return addressString
48 }
49
50 func (bel *bufferingEndpointListener) ExpectAdded(expected []string, t *testing.T) {
51 bel.Lock()
52 defer bel.Unlock()
53 t.Helper()
54 sort.Strings(bel.added)
55 testCompare(t, expected, bel.added)
56 }
57
58 func (bel *bufferingEndpointListener) ExpectRemoved(expected []string, t *testing.T) {
59 bel.Lock()
60 defer bel.Unlock()
61 t.Helper()
62 sort.Strings(bel.removed)
63 testCompare(t, expected, bel.removed)
64 }
65
66 func (bel *bufferingEndpointListener) endpointsAreNotCalled() bool {
67 bel.Lock()
68 defer bel.Unlock()
69 return bel.noEndpointsCalled
70 }
71
72 func (bel *bufferingEndpointListener) endpointsDoNotExist() bool {
73 bel.Lock()
74 defer bel.Unlock()
75 return bel.noEndpointsExist
76 }
77
78 func (bel *bufferingEndpointListener) Add(set AddressSet) {
79 bel.Lock()
80 defer bel.Unlock()
81 for _, address := range set.Addresses {
82 bel.added = append(bel.added, addressString(address))
83 }
84 bel.localTrafficPolicy = set.LocalTrafficPolicy
85 }
86
87 func (bel *bufferingEndpointListener) Remove(set AddressSet) {
88 bel.Lock()
89 defer bel.Unlock()
90 for _, address := range set.Addresses {
91 bel.removed = append(bel.removed, addressString(address))
92 }
93 bel.localTrafficPolicy = set.LocalTrafficPolicy
94 }
95
96 func (bel *bufferingEndpointListener) NoEndpoints(exists bool) {
97 bel.Lock()
98 defer bel.Unlock()
99 bel.noEndpointsCalled = true
100 bel.noEndpointsExist = exists
101 }
102
103 type bufferingEndpointListenerWithResVersion struct {
104 added []string
105 removed []string
106 sync.Mutex
107 }
108
109 func newBufferingEndpointListenerWithResVersion() *bufferingEndpointListenerWithResVersion {
110 return &bufferingEndpointListenerWithResVersion{
111 added: []string{},
112 removed: []string{},
113 Mutex: sync.Mutex{},
114 }
115 }
116
117 func addressStringWithResVersion(address Address) string {
118 return fmt.Sprintf("%s:%d:%s", address.IP, address.Port, address.Pod.ResourceVersion)
119 }
120
121 func (bel *bufferingEndpointListenerWithResVersion) ExpectAdded(expected []string, t *testing.T) {
122 bel.Lock()
123 defer bel.Unlock()
124 sort.Strings(bel.added)
125 testCompare(t, expected, bel.added)
126 }
127
128 func (bel *bufferingEndpointListenerWithResVersion) ExpectRemoved(expected []string, t *testing.T) {
129 bel.Lock()
130 defer bel.Unlock()
131 sort.Strings(bel.removed)
132 testCompare(t, expected, bel.removed)
133 }
134
135 func (bel *bufferingEndpointListenerWithResVersion) Add(set AddressSet) {
136 bel.Lock()
137 defer bel.Unlock()
138 for _, address := range set.Addresses {
139 bel.added = append(bel.added, addressStringWithResVersion(address))
140 }
141 }
142
143 func (bel *bufferingEndpointListenerWithResVersion) Remove(set AddressSet) {
144 bel.Lock()
145 defer bel.Unlock()
146 for _, address := range set.Addresses {
147 bel.removed = append(bel.removed, addressStringWithResVersion(address))
148 }
149 }
150
151 func (bel *bufferingEndpointListenerWithResVersion) NoEndpoints(exists bool) {}
152
153 func TestEndpointsWatcher(t *testing.T) {
154 for _, tt := range []struct {
155 serviceType string
156 k8sConfigs []string
157 id ServiceID
158 hostname string
159 port Port
160 expectedAddresses []string
161 expectedNoEndpoints bool
162 expectedNoEndpointsServiceExists bool
163 expectedError bool
164 }{
165 {
166 serviceType: "local services",
167 k8sConfigs: []string{`
168 apiVersion: v1
169 kind: Service
170 metadata:
171 name: name1
172 namespace: ns
173 spec:
174 type: LoadBalancer
175 ports:
176 - port: 8989`,
177 `
178 apiVersion: v1
179 kind: Endpoints
180 metadata:
181 name: name1
182 namespace: ns
183 subsets:
184 - addresses:
185 - ip: 172.17.0.12
186 targetRef:
187 kind: Pod
188 name: name1-1
189 namespace: ns
190 - ip: 172.17.0.19
191 targetRef:
192 kind: Pod
193 name: name1-2
194 namespace: ns
195 - ip: 172.17.0.20
196 targetRef:
197 kind: Pod
198 name: name1-3
199 namespace: ns
200 - ip: 172.17.0.21
201 ports:
202 - port: 8989`,
203 `
204 apiVersion: v1
205 kind: Pod
206 metadata:
207 name: name1-1
208 namespace: ns
209 ownerReferences:
210 - kind: ReplicaSet
211 name: rs-1
212 status:
213 phase: Running
214 podIP: 172.17.0.12`,
215 `
216 apiVersion: v1
217 kind: Pod
218 metadata:
219 name: name1-2
220 namespace: ns
221 ownerReferences:
222 - kind: ReplicaSet
223 name: rs-1
224 status:
225 phase: Running
226 podIP: 172.17.0.19`,
227 `
228 apiVersion: v1
229 kind: Pod
230 metadata:
231 name: name1-3
232 namespace: ns
233 ownerReferences:
234 - kind: ReplicaSet
235 name: rs-1
236 status:
237 phase: Running
238 podIP: 172.17.0.20`,
239 },
240 id: ServiceID{Name: "name1", Namespace: "ns"},
241 port: 8989,
242 expectedAddresses: []string{
243 "172.17.0.12:8989",
244 "172.17.0.19:8989",
245 "172.17.0.20:8989",
246 "172.17.0.21:8989",
247 },
248 expectedNoEndpoints: false,
249 expectedNoEndpointsServiceExists: false,
250 expectedError: false,
251 },
252 {
253
254 serviceType: "local NodePort service with unnamed port",
255 k8sConfigs: []string{`
256 apiVersion: v1
257 kind: Service
258 metadata:
259 name: name1
260 namespace: ns
261 spec:
262 type: NodePort
263 ports:
264 - port: 8989
265 targetPort: port1`,
266 `
267 apiVersion: v1
268 kind: Endpoints
269 metadata:
270 name: name1
271 namespace: ns
272 subsets:
273 - addresses:
274 - ip: 10.233.66.239
275 targetRef:
276 kind: Pod
277 name: name1-f748fb6b4-hpwpw
278 namespace: ns
279 - ip: 10.233.88.244
280 targetRef:
281 kind: Pod
282 name: name1-f748fb6b4-6vcmw
283 namespace: ns
284 ports:
285 - port: 8990
286 protocol: TCP`,
287 `
288 apiVersion: v1
289 kind: Pod
290 metadata:
291 name: name1-f748fb6b4-hpwpw
292 namespace: ns
293 ownerReferences:
294 - kind: ReplicaSet
295 name: rs-1
296 status:
297 podIp: 10.233.66.239
298 phase: Running`,
299 `
300 apiVersion: v1
301 kind: Pod
302 metadata:
303 name: name1-f748fb6b4-6vcmw
304 namespace: ns
305 ownerReferences:
306 - kind: ReplicaSet
307 name: rs-1
308 status:
309 podIp: 10.233.88.244
310 phase: Running`,
311 },
312 id: ServiceID{Name: "name1", Namespace: "ns"},
313 port: 8989,
314 expectedAddresses: []string{
315 "10.233.66.239:8990",
316 "10.233.88.244:8990",
317 },
318 expectedNoEndpoints: false,
319 expectedNoEndpointsServiceExists: false,
320 expectedError: false,
321 },
322 {
323
324 serviceType: "local service with named target port and differently-named service port",
325 k8sConfigs: []string{`
326 apiVersion: v1
327 kind: Service
328 metadata:
329 name: world
330 namespace: ns
331 spec:
332 type: ClusterIP
333 ports:
334 - name: app
335 port: 7778
336 targetPort: http`,
337 `
338 apiVersion: v1
339 kind: Endpoints
340 metadata:
341 name: world
342 namespace: ns
343 subsets:
344 - addresses:
345 - ip: 10.1.30.135
346 targetRef:
347 kind: Pod
348 name: world-575bf846b4-tp4hw
349 namespace: ns
350 ports:
351 - name: app
352 port: 7779
353 protocol: TCP`,
354 `
355 apiVersion: v1
356 kind: Pod
357 metadata:
358 name: world-575bf846b4-tp4hw
359 namespace: ns
360 ownerReferences:
361 - kind: ReplicaSet
362 name: rs-1
363 status:
364 podIp: 10.1.30.135
365 phase: Running`,
366 },
367 id: ServiceID{Name: "world", Namespace: "ns"},
368 port: 7778,
369 expectedAddresses: []string{
370 "10.1.30.135:7779",
371 },
372 expectedNoEndpoints: false,
373 expectedNoEndpointsServiceExists: false,
374 expectedError: false,
375 },
376 {
377 serviceType: "local services with missing addresses",
378 k8sConfigs: []string{`
379 apiVersion: v1
380 kind: Service
381 metadata:
382 name: name1
383 namespace: ns
384 spec:
385 type: LoadBalancer
386 ports:
387 - port: 8989`,
388 `
389 apiVersion: v1
390 kind: Endpoints
391 metadata:
392 name: name1
393 namespace: ns
394 subsets:
395 - addresses:
396 - ip: 172.17.0.23
397 targetRef:
398 kind: Pod
399 name: name1-1
400 namespace: ns
401 - ip: 172.17.0.24
402 targetRef:
403 kind: Pod
404 name: name1-2
405 namespace: ns
406 - ip: 172.17.0.25
407 targetRef:
408 kind: Pod
409 name: name1-3
410 namespace: ns
411 ports:
412 - port: 8989`,
413 `
414 apiVersion: v1
415 kind: Pod
416 metadata:
417 name: name1-3
418 namespace: ns
419 ownerReferences:
420 - kind: ReplicaSet
421 name: rs-1
422 status:
423 phase: Running
424 podIP: 172.17.0.25`,
425 },
426 id: ServiceID{Name: "name1", Namespace: "ns"},
427 port: 8989,
428 expectedAddresses: []string{
429 "172.17.0.25:8989",
430 },
431 expectedNoEndpoints: false,
432 expectedNoEndpointsServiceExists: false,
433 expectedError: false,
434 },
435 {
436 serviceType: "local services with no endpoints",
437 k8sConfigs: []string{`
438 apiVersion: v1
439 kind: Service
440 metadata:
441 name: name2
442 namespace: ns
443 spec:
444 type: LoadBalancer
445 ports:
446 - port: 7979`,
447 },
448 id: ServiceID{Name: "name2", Namespace: "ns"},
449 port: 7979,
450 expectedAddresses: []string{},
451 expectedNoEndpoints: true,
452 expectedNoEndpointsServiceExists: true,
453 expectedError: false,
454 },
455 {
456 serviceType: "external name services",
457 k8sConfigs: []string{`
458 apiVersion: v1
459 kind: Service
460 metadata:
461 name: name3
462 namespace: ns
463 spec:
464 type: ExternalName
465 externalName: foo`,
466 },
467 id: ServiceID{Name: "name3", Namespace: "ns"},
468 port: 6969,
469 expectedAddresses: []string{},
470 expectedNoEndpoints: false,
471 expectedNoEndpointsServiceExists: false,
472 expectedError: true,
473 },
474 {
475 serviceType: "services that do not yet exist",
476 k8sConfigs: []string{},
477 id: ServiceID{Name: "name4", Namespace: "ns"},
478 port: 5959,
479 expectedAddresses: []string{},
480 expectedNoEndpoints: true,
481 expectedNoEndpointsServiceExists: false,
482 expectedError: false,
483 },
484 {
485 serviceType: "stateful sets",
486 k8sConfigs: []string{`
487 apiVersion: v1
488 kind: Service
489 metadata:
490 name: name1
491 namespace: ns
492 spec:
493 type: LoadBalancer
494 ports:
495 - port: 8989`,
496 `
497 apiVersion: v1
498 kind: Endpoints
499 metadata:
500 name: name1
501 namespace: ns
502 subsets:
503 - addresses:
504 - ip: 172.17.0.12
505 hostname: name1-1
506 targetRef:
507 kind: Pod
508 name: name1-1
509 namespace: ns
510 - ip: 172.17.0.19
511 hostname: name1-2
512 targetRef:
513 kind: Pod
514 name: name1-2
515 namespace: ns
516 - ip: 172.17.0.20
517 hostname: name1-3
518 targetRef:
519 kind: Pod
520 name: name1-3
521 namespace: ns
522 ports:
523 - port: 8989`,
524 `
525 apiVersion: v1
526 kind: Pod
527 metadata:
528 name: name1-1
529 namespace: ns
530 ownerReferences:
531 - kind: ReplicaSet
532 name: rs-1
533 status:
534 phase: Running
535 podIP: 172.17.0.12`,
536 `
537 apiVersion: v1
538 kind: Pod
539 metadata:
540 name: name1-2
541 namespace: ns
542 ownerReferences:
543 - kind: ReplicaSet
544 name: rs-1
545 status:
546 phase: Running
547 podIP: 172.17.0.19`,
548 `
549 apiVersion: v1
550 kind: Pod
551 metadata:
552 name: name1-3
553 namespace: ns
554 ownerReferences:
555 - kind: ReplicaSet
556 name: rs-1
557 status:
558 phase: Running
559 podIP: 172.17.0.20`,
560 },
561 id: ServiceID{Name: "name1", Namespace: "ns"},
562 hostname: "name1-3",
563 port: 5959,
564 expectedAddresses: []string{"172.17.0.20:5959"},
565 expectedNoEndpoints: false,
566 expectedNoEndpointsServiceExists: false,
567 },
568 {
569 serviceType: "local service with new named port mid rollout and two subsets but only first subset is relevant",
570 k8sConfigs: []string{`
571 apiVersion: v1
572 kind: Service
573 metadata:
574 name: name1
575 namespace: ns
576 spec:
577 type: ClusterIP
578 ports:
579 - name: port1
580 port: 8989
581 targetPort: port1
582 - name: port2
583 port: 9999
584 targetPort: port2`,
585 `
586 apiVersion: v1
587 kind: Endpoints
588 metadata:
589 labels:
590 app: name1
591 name: name1
592 namespace: ns
593 subsets:
594 - addresses:
595 - ip: 172.17.0.1
596 nodeName: name1-1
597 targetRef:
598 kind: Pod
599 name: name1-1
600 namespace: ns
601 - ip: 172.17.0.2
602 nodeName: name1-2
603 targetRef:
604 kind: Pod
605 name: name1-2
606 namespace: ns
607 ports:
608 - name: port1
609 port: 8989
610 protocol: TCP
611 - addresses:
612 - ip: 172.17.0.1
613 nodeName: name1-1
614 targetRef:
615 kind: Pod
616 name: name1-1
617 namespace: ns
618 notReadyAddresses:
619 - ip: 172.17.0.2
620 nodeName: name1-2
621 targetRef:
622 kind: Pod
623 name: name1-2
624 namespace: ns
625 ports:
626 - name: port2
627 port: 9999
628 protocol: TCP
629 `,
630 `
631 apiVersion: v1
632 kind: Pod
633 metadata:
634 name: name1-1
635 namespace: ns
636 ownerReferences:
637 - kind: ReplicaSet
638 name: rs-1
639 status:
640 phase: Running
641 podIP: 172.17.0.1`,
642 `
643 apiVersion: v1
644 kind: Pod
645 metadata:
646 name: name1-2
647 namespace: ns
648 ownerReferences:
649 - kind: ReplicaSet
650 name: rs-2
651 status:
652 phase: Running
653 podIP: 172.17.0.2`,
654 },
655 id: ServiceID{Name: "name1", Namespace: "ns"},
656 port: 8989,
657 expectedAddresses: []string{
658 "172.17.0.1:8989",
659 "172.17.0.2:8989",
660 },
661 expectedNoEndpoints: false,
662 expectedNoEndpointsServiceExists: false,
663 expectedError: false,
664 },
665 } {
666 tt := tt
667 t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
668 k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
669 if err != nil {
670 t.Fatalf("NewFakeAPI returned an error: %s", err)
671 }
672
673 metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
674 if err != nil {
675 t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
676 }
677
678 watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local")
679 if err != nil {
680 t.Fatalf("can't create Endpoints watcher: %s", err)
681 }
682
683 k8sAPI.Sync(nil)
684 metadataAPI.Sync(nil)
685
686 listener := newBufferingEndpointListener()
687
688 err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
689 if tt.expectedError && err == nil {
690 t.Fatal("Expected error but was ok")
691 }
692 if !tt.expectedError && err != nil {
693 t.Fatalf("Expected no error, got [%s]", err)
694 }
695
696 listener.ExpectAdded(tt.expectedAddresses, t)
697
698 if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints {
699 t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
700 tt.expectedNoEndpoints, listener.endpointsAreNotCalled())
701 }
702
703 if listener.endpointsDoNotExist() != tt.expectedNoEndpointsServiceExists {
704 t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]",
705 tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist())
706 }
707 })
708 }
709 }
710
711 func TestEndpointsWatcherWithEndpointSlices(t *testing.T) {
712 for _, tt := range []struct {
713 serviceType string
714 k8sConfigs []string
715 id ServiceID
716 hostname string
717 port Port
718 expectedAddresses []string
719 expectedNoEndpoints bool
720 expectedNoEndpointsServiceExists bool
721 expectedError bool
722 expectedLocalTrafficPolicy bool
723 }{
724 {
725 serviceType: "local services with EndpointSlice",
726 k8sConfigs: []string{`
727 kind: APIResourceList
728 apiVersion: v1
729 groupVersion: discovery.k8s.io/v1
730 resources:
731 - name: endpointslices
732 singularName: endpointslice
733 namespaced: true
734 kind: EndpointSlice
735 verbs:
736 - delete
737 - deletecollection
738 - get
739 - list
740 - patch
741 - create
742 - update
743 - watch
744 `, `
745 apiVersion: v1
746 kind: Service
747 metadata:
748 name: name-1
749 namespace: ns
750 spec:
751 type: LoadBalancer
752 ports:
753 - port: 8989
754 internalTrafficPolicy: Local`,
755 `
756 addressType: IPv4
757 apiVersion: discovery.k8s.io/v1
758 endpoints:
759 - addresses:
760 - 172.17.0.12
761 conditions:
762 ready: true
763 targetRef:
764 kind: Pod
765 name: name-1-1
766 namespace: ns
767 topology:
768 kubernetes.io/hostname: node-1
769 - addresses:
770 - 172.17.0.19
771 conditions:
772 ready: true
773 targetRef:
774 kind: Pod
775 name: name-1-2
776 namespace: ns
777 topology:
778 kubernetes.io/hostname: node-1
779 - addresses:
780 - 172.17.0.20
781 conditions:
782 ready: true
783 targetRef:
784 kind: Pod
785 name: name-1-3
786 namespace: ns
787 topology:
788 kubernetes.io/hostname: node-2
789 - addresses:
790 - 172.17.0.21
791 conditions:
792 ready: true
793 topology:
794 kubernetes.io/hostname: node-2
795 kind: EndpointSlice
796 metadata:
797 labels:
798 kubernetes.io/service-name: name-1
799 name: name-1-bhnqh
800 namespace: ns
801 ports:
802 - name: ""
803 port: 8989`,
804 `
805 apiVersion: v1
806 kind: Pod
807 metadata:
808 name: name-1-1
809 namespace: ns
810 ownerReferences:
811 - kind: ReplicaSet
812 name: rs-1
813 status:
814 phase: Running
815 podIP: 172.17.0.12`,
816 `
817 apiVersion: v1
818 kind: Pod
819 metadata:
820 name: name-1-2
821 namespace: ns
822 ownerReferences:
823 - kind: ReplicaSet
824 name: rs-1
825 status:
826 phase: Running
827 podIP: 172.17.0.19`,
828 `
829 apiVersion: v1
830 kind: Pod
831 metadata:
832 name: name-1-3
833 namespace: ns
834 ownerReferences:
835 - kind: ReplicaSet
836 name: rs-1
837 status:
838 phase: Running
839 podIP: 172.17.0.20`,
840 },
841 id: ServiceID{Name: "name-1", Namespace: "ns"},
842 port: 8989,
843 expectedAddresses: []string{
844 "172.17.0.12:8989",
845 "172.17.0.19:8989",
846 "172.17.0.20:8989",
847 "172.17.0.21:8989",
848 },
849 expectedNoEndpoints: false,
850 expectedNoEndpointsServiceExists: false,
851 expectedError: false,
852 expectedLocalTrafficPolicy: true,
853 },
854 {
855 serviceType: "local services with missing addresses and EndpointSlice",
856 k8sConfigs: []string{`
857 kind: APIResourceList
858 apiVersion: v1
859 groupVersion: discovery.k8s.io/v1
860 resources:
861 - name: endpointslices
862 singularName: endpointslice
863 namespaced: true
864 kind: EndpointSlice
865 verbs:
866 - delete
867 - deletecollection
868 - get
869 - list
870 - patch
871 - create
872 - update
873 - watch
874 `, `
875 apiVersion: v1
876 kind: Service
877 metadata:
878 name: name-1
879 namespace: ns
880 spec:
881 type: LoadBalancer
882 ports:
883 - port: 8989`, `
884 addressType: IPv4
885 apiVersion: discovery.k8s.io/v1
886 endpoints:
887 - addresses:
888 - 172.17.0.23
889 conditions:
890 ready: true
891 targetRef:
892 kind: Pod
893 name: name-1-1
894 namespace: ns
895 topology:
896 kubernetes.io/hostname: node-1
897 - addresses:
898 - 172.17.0.24
899 conditions:
900 ready: true
901 targetRef:
902 kind: Pod
903 name: name-1-2
904 namespace: ns
905 topology:
906 kubernetes.io/hostname: node-1
907 - addresses:
908 - 172.17.0.25
909 conditions:
910 ready: true
911 targetRef:
912 kind: Pod
913 name: name-1-3
914 namespace: ns
915 topology:
916 kubernetes.io/hostname: node-2
917 kind: EndpointSlice
918 metadata:
919 labels:
920 kubernetes.io/service-name: name-1
921 name: name1-f5fad
922 namespace: ns
923 ports:
924 - name: ""
925 port: 8989`, `
926 apiVersion: v1
927 kind: Pod
928 metadata:
929 name: name-1-3
930 namespace: ns
931 ownerReferences:
932 - kind: ReplicaSet
933 name: rs-1
934 status:
935 podIP: 172.17.0.25
936 phase: Running`,
937 },
938 id: ServiceID{Name: "name-1", Namespace: "ns"},
939 port: 8989,
940 expectedAddresses: []string{"172.17.0.25:8989"},
941 expectedNoEndpoints: false,
942 expectedNoEndpointsServiceExists: false,
943 expectedError: false,
944 },
945 {
946 serviceType: "local services with no EndpointSlices",
947 k8sConfigs: []string{`
948 kind: APIResourceList
949 apiVersion: v1
950 groupVersion: discovery.k8s.io/v1
951 resources:
952 - name: endpointslices
953 singularName: endpointslice
954 namespaced: true
955 kind: EndpointSlice
956 verbs:
957 - delete
958 - deletecollection
959 - get
960 - list
961 - patch
962 - create
963 - update
964 - watch
965 `, `
966 apiVersion: v1
967 kind: Service
968 metadata:
969 name: name-2
970 namespace: ns
971 spec:
972 type: LoadBalancer
973 ports:
974 - port: 7979`,
975 },
976 id: ServiceID{Name: "name-2", Namespace: "ns"},
977 port: 7979,
978 expectedAddresses: []string{},
979 expectedNoEndpoints: true,
980 expectedNoEndpointsServiceExists: true,
981 expectedError: false,
982 },
983 {
984 serviceType: "external name services with EndpointSlices",
985 k8sConfigs: []string{`
986 kind: APIResourceList
987 apiVersion: v1
988 groupVersion: discovery.k8s.io/v1
989 resources:
990 - name: endpointslices
991 singularName: endpointslice
992 namespaced: true
993 kind: EndpointSlice
994 verbs:
995 - delete
996 - deletecollection
997 - get
998 - list
999 - patch
1000 - create
1001 - update
1002 - watch
1003 `, `
1004 apiVersion: v1
1005 kind: Service
1006 metadata:
1007 name: name-3-external-svc
1008 namespace: ns
1009 spec:
1010 type: ExternalName
1011 externalName: foo`,
1012 },
1013 id: ServiceID{Name: "name-3-external-svc", Namespace: "ns"},
1014 port: 7777,
1015 expectedAddresses: []string{},
1016 expectedNoEndpoints: false,
1017 expectedNoEndpointsServiceExists: false,
1018 expectedError: true,
1019 },
1020 {
1021 serviceType: "services that do not exist",
1022 k8sConfigs: []string{},
1023 id: ServiceID{Name: "name-4-inexistent-svc", Namespace: "ns"},
1024 port: 5555,
1025 expectedAddresses: []string{},
1026 expectedNoEndpoints: true,
1027 expectedNoEndpointsServiceExists: false,
1028 expectedError: false,
1029 },
1030 {
1031 serviceType: "stateful sets with EndpointSlices",
1032 k8sConfigs: []string{`
1033 kind: APIResourceList
1034 apiVersion: v1
1035 groupVersion: discovery.k8s.io/v1
1036 resources:
1037 - name: endpointslices
1038 singularName: endpointslice
1039 namespaced: true
1040 kind: EndpointSlice
1041 verbs:
1042 - delete
1043 - deletecollection
1044 - get
1045 - list
1046 - patch
1047 - create
1048 - update
1049 - watch
1050 `, `
1051 apiVersion: v1
1052 kind: Service
1053 metadata:
1054 name: name-1
1055 namespace: ns
1056 spec:
1057 type: LoadBalancer
1058 ports:
1059 - port: 8989`, `
1060 addressType: IPv4
1061 apiVersion: discovery.k8s.io/v1
1062 endpoints:
1063 - addresses:
1064 - 172.17.0.12
1065 conditions:
1066 ready: true
1067 hostname: name-1-1
1068 targetRef:
1069 kind: Pod
1070 name: name-1-1
1071 namespace: ns
1072 topology:
1073 kubernetes.io/hostname: node-1
1074 - addresses:
1075 - 172.17.0.19
1076 hostname: name-1-2
1077 conditions:
1078 ready: true
1079 targetRef:
1080 kind: Pod
1081 name: name-1-2
1082 namespace: ns
1083 topology:
1084 kubernetes.io/hostname: node-1
1085 - addresses:
1086 - 172.17.0.20
1087 hostname: name-1-3
1088 conditions:
1089 ready: true
1090 targetRef:
1091 kind: Pod
1092 name: name-1-3
1093 namespace: ns
1094 topology:
1095 kubernetes.io/hostname: node-2
1096 kind: EndpointSlice
1097 metadata:
1098 labels:
1099 kubernetes.io/service-name: name-1
1100 name: name-1-f5fad
1101 namespace: ns
1102 ports:
1103 - name: ""
1104 port: 8989`, `
1105 apiVersion: v1
1106 kind: Pod
1107 metadata:
1108 name: name-1-1
1109 namespace: ns
1110 ownerReferences:
1111 - kind: ReplicaSet
1112 name: rs-1
1113 status:
1114 phase: Running
1115 podIP: 172.17.0.12`,
1116 `
1117 apiVersion: v1
1118 kind: Pod
1119 metadata:
1120 name: name-1-2
1121 namespace: ns
1122 ownerReferences:
1123 - kind: ReplicaSet
1124 name: rs-1
1125 status:
1126 phase: Running
1127 podIP: 172.17.0.19`,
1128 `
1129 apiVersion: v1
1130 kind: Pod
1131 metadata:
1132 name: name-1-3
1133 namespace: ns
1134 ownerReferences:
1135 - kind: ReplicaSet
1136 name: rs-1
1137 status:
1138 phase: Running
1139 podIP: 172.17.0.20`,
1140 },
1141 id: ServiceID{Name: "name-1", Namespace: "ns"},
1142 hostname: "name-1-3",
1143 port: 6000,
1144 expectedAddresses: []string{"172.17.0.20:6000"},
1145 expectedNoEndpoints: false,
1146 expectedNoEndpointsServiceExists: false,
1147 expectedError: false,
1148 },
1149 {
1150 serviceType: "service with EndpointSlice without labels",
1151 k8sConfigs: []string{`
1152 kind: APIResourceList
1153 apiVersion: v1
1154 groupVersion: discovery.k8s.io/v1
1155 resources:
1156 - name: endpointslices
1157 singularName: endpointslice
1158 namespaced: true
1159 kind: EndpointSlice
1160 verbs:
1161 - delete
1162 - deletecollection
1163 - get
1164 - list
1165 - patch
1166 - create
1167 - update
1168 - watch
1169 `, `
1170 apiVersion: v1
1171 kind: Service
1172 metadata:
1173 name: name-5
1174 namespace: ns
1175 spec:
1176 type: LoadBalancer
1177 ports:
1178 - port: 8989`, `
1179 addressType: IPv4
1180 apiVersion: discovery.k8s.io/v1
1181 endpoints:
1182 - addresses:
1183 - 172.17.0.12
1184 conditions:
1185 ready: true
1186 hostname: name-1-1
1187 targetRef:
1188 kind: Pod
1189 name: name-1-1
1190 namespace: ns
1191 topology:
1192 kubernetes.io/hostname: node-1
1193 kind: EndpointSlice
1194 metadata:
1195 labels:
1196 name: name-1-f5fad
1197 namespace: ns
1198 ports:
1199 - name: ""
1200 port: 8989`, `
1201 apiVersion: v1
1202 kind: Pod
1203 metadata:
1204 name: name-1-1
1205 namespace: ns
1206 ownerReferences:
1207 - kind: ReplicaSet
1208 name: rs-1
1209 status:
1210 phase: Running
1211 podIP: 172.17.0.12`,
1212 },
1213 id: ServiceID{Name: "name-5", Namespace: "ns"},
1214 port: 8989,
1215 expectedAddresses: []string{},
1216 expectedNoEndpoints: true,
1217 expectedNoEndpointsServiceExists: true,
1218 expectedError: false,
1219 },
1220 {
1221 serviceType: "service with IPv6 address type EndpointSlice",
1222 k8sConfigs: []string{`
1223 kind: APIResourceList
1224 apiVersion: v1
1225 groupVersion: discovery.k8s.io/v1
1226 resources:
1227 - name: endpointslices
1228 singularName: endpointslice
1229 namespaced: true
1230 kind: EndpointSlice
1231 verbs:
1232 - delete
1233 - deletecollection
1234 - get
1235 - list
1236 - patch
1237 - create
1238 - update
1239 - watch
1240 `, `
1241 apiVersion: v1
1242 kind: Service
1243 metadata:
1244 name: name-5
1245 namespace: ns
1246 spec:
1247 type: LoadBalancer
1248 ports:
1249 - port: 9000`, `
1250 addressType: IPv6
1251 apiVersion: discovery.k8s.io/v1
1252 endpoints:
1253 - addresses:
1254 - 0:0:0:0:0:0:0:1
1255 conditions:
1256 ready: true
1257 targetRef:
1258 kind: Pod
1259 name: name-5-1
1260 namespace: ns
1261 topology:
1262 kubernetes.io/hostname: node-1
1263 kind: EndpointSlice
1264 metadata:
1265 labels:
1266 name: name-5-f65dv
1267 namespace: ns
1268 ownerReferences:
1269 - apiVersion: v1
1270 kind: Service
1271 name: name-5
1272 ports:
1273 - name: ""
1274 port: 9000`, `
1275 apiVersion: v1
1276 kind: Pod
1277 metadata:
1278 name: name-5-1
1279 namespace: ns
1280 ownerReferences:
1281 - kind: ReplicaSet
1282 name: rs-1
1283 status:
1284 phase: Running
1285 podIP: 0:0:0:0:0:0:0:1`,
1286 },
1287 id: ServiceID{Name: "name-5", Namespace: "ns"},
1288 port: 9000,
1289 expectedAddresses: []string{},
1290 expectedNoEndpoints: true,
1291 expectedNoEndpointsServiceExists: true,
1292 expectedError: false,
1293 }} {
1294 tt := tt
1295 t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
1296 k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
1297 if err != nil {
1298 t.Fatalf("NewFakeAPI returned an error: %s", err)
1299 }
1300
1301 metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
1302 if err != nil {
1303 t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
1304 }
1305
1306 watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
1307 if err != nil {
1308 t.Fatalf("can't create Endpoints watcher: %s", err)
1309 }
1310
1311 k8sAPI.Sync(nil)
1312 metadataAPI.Sync(nil)
1313
1314 listener := newBufferingEndpointListener()
1315
1316 err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
1317 if tt.expectedError && err == nil {
1318 t.Fatal("Expected error but was ok")
1319 }
1320 if !tt.expectedError && err != nil {
1321 t.Fatalf("Expected no error, got [%s]", err)
1322 }
1323
1324 if listener.localTrafficPolicy != tt.expectedLocalTrafficPolicy {
1325 t.Fatalf("Expected localTrafficPolicy [%v], got [%v]", tt.expectedLocalTrafficPolicy, listener.localTrafficPolicy)
1326 }
1327
1328 listener.ExpectAdded(tt.expectedAddresses, t)
1329
1330 if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints {
1331 t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
1332 tt.expectedNoEndpoints, listener.endpointsAreNotCalled())
1333 }
1334
1335 if listener.endpointsDoNotExist() != tt.expectedNoEndpointsServiceExists {
1336 t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]",
1337 tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist())
1338 }
1339 })
1340 }
1341 }
1342
1343 func TestEndpointsWatcherWithEndpointSlicesExternalWorkload(t *testing.T) {
1344 for _, tt := range []struct {
1345 serviceType string
1346 k8sConfigs []string
1347 id ServiceID
1348 hostname string
1349 port Port
1350 expectedAddresses []string
1351 expectedNoEndpoints bool
1352 expectedNoEndpointsServiceExists bool
1353 expectedError bool
1354 expectedLocalTrafficPolicy bool
1355 }{
1356 {
1357 serviceType: "local services with EndpointSlice",
1358 k8sConfigs: []string{`
1359 kind: APIResourceList
1360 apiVersion: v1
1361 groupVersion: discovery.k8s.io/v1
1362 resources:
1363 - name: endpointslices
1364 singularName: endpointslice
1365 namespaced: true
1366 kind: EndpointSlice
1367 verbs:
1368 - delete
1369 - deletecollection
1370 - get
1371 - list
1372 - patch
1373 - create
1374 - update
1375 - watch
1376 `, `
1377 apiVersion: v1
1378 kind: Service
1379 metadata:
1380 name: name-1
1381 namespace: ns
1382 spec:
1383 type: LoadBalancer
1384 ports:
1385 - port: 8989
1386 internalTrafficPolicy: Local`,
1387 `
1388 addressType: IPv4
1389 apiVersion: discovery.k8s.io/v1
1390 endpoints:
1391 - addresses:
1392 - 172.17.0.12
1393 conditions:
1394 ready: true
1395 targetRef:
1396 kind: ExternalWorkload
1397 name: name-1-1
1398 namespace: ns
1399 topology:
1400 kubernetes.io/hostname: node-1
1401 - addresses:
1402 - 172.17.0.19
1403 conditions:
1404 ready: true
1405 targetRef:
1406 kind: ExternalWorkload
1407 name: name-1-2
1408 namespace: ns
1409 topology:
1410 kubernetes.io/hostname: node-1
1411 - addresses:
1412 - 172.17.0.20
1413 conditions:
1414 ready: true
1415 targetRef:
1416 kind: ExternalWorkload
1417 name: name-1-3
1418 namespace: ns
1419 topology:
1420 kubernetes.io/hostname: node-2
1421 - addresses:
1422 - 172.17.0.21
1423 conditions:
1424 ready: true
1425 topology:
1426 kubernetes.io/hostname: node-2
1427 kind: EndpointSlice
1428 metadata:
1429 labels:
1430 kubernetes.io/service-name: name-1
1431 name: name-1-bhnqh
1432 namespace: ns
1433 ports:
1434 - name: ""
1435 port: 8989`,
1436 `
1437 apiVersion: workload.linkerd.io/v1beta1
1438 kind: ExternalWorkload
1439 metadata:
1440 name: name-1-1
1441 namespace: ns
1442 status:
1443 conditions:
1444 ready: true`,
1445 `
1446 apiVersion: workload.linkerd.io/v1beta1
1447 kind: ExternalWorkload
1448 metadata:
1449 name: name-1-2
1450 namespace: ns
1451 status:
1452 conditions:
1453 ready: true`,
1454 `
1455 apiVersion: workload.linkerd.io/v1beta1
1456 kind: ExternalWorkload
1457 metadata:
1458 name: name-1-3
1459 namespace: ns
1460 status:
1461 conditions:
1462 ready: true`,
1463 },
1464 id: ExternalWorkloadID{Name: "name-1", Namespace: "ns"},
1465 port: 8989,
1466 expectedAddresses: []string{
1467 "172.17.0.12:8989",
1468 "172.17.0.19:8989",
1469 "172.17.0.20:8989",
1470 "172.17.0.21:8989",
1471 },
1472 expectedNoEndpoints: false,
1473 expectedNoEndpointsServiceExists: false,
1474 expectedError: false,
1475 expectedLocalTrafficPolicy: true,
1476 },
1477 {
1478 serviceType: "local services with missing addresses and EndpointSlice",
1479 k8sConfigs: []string{`
1480 kind: APIResourceList
1481 apiVersion: v1
1482 groupVersion: discovery.k8s.io/v1
1483 resources:
1484 - name: endpointslices
1485 singularName: endpointslice
1486 namespaced: true
1487 kind: EndpointSlice
1488 verbs:
1489 - delete
1490 - deletecollection
1491 - get
1492 - list
1493 - patch
1494 - create
1495 - update
1496 - watch
1497 `, `
1498 apiVersion: v1
1499 kind: Service
1500 metadata:
1501 name: name-1
1502 namespace: ns
1503 spec:
1504 type: LoadBalancer
1505 ports:
1506 - port: 8989`, `
1507 addressType: IPv4
1508 apiVersion: discovery.k8s.io/v1
1509 endpoints:
1510 - addresses:
1511 - 172.17.0.23
1512 conditions:
1513 ready: true
1514 targetRef:
1515 kind: ExternalWorkload
1516 name: name-1-1
1517 namespace: ns
1518 topology:
1519 kubernetes.io/hostname: node-1
1520 - addresses:
1521 - 172.17.0.24
1522 conditions:
1523 ready: true
1524 targetRef:
1525 kind: ExternalWorkload
1526 name: name-1-2
1527 namespace: ns
1528 topology:
1529 kubernetes.io/hostname: node-1
1530 - addresses:
1531 - 172.17.0.25
1532 conditions:
1533 ready: true
1534 targetRef:
1535 kind: ExternalWorkload
1536 name: name-1-3
1537 namespace: ns
1538 topology:
1539 kubernetes.io/hostname: node-2
1540 kind: EndpointSlice
1541 metadata:
1542 labels:
1543 kubernetes.io/service-name: name-1
1544 name: name1-f5fad
1545 namespace: ns
1546 ports:
1547 - name: ""
1548 port: 8989`, `
1549 apiVersion: workload.linkerd.io/v1beta1
1550 kind: ExternalWorkload
1551 metadata:
1552 name: name-1-3
1553 namespace: ns
1554 status:
1555 conditions:
1556 ready: true`,
1557 },
1558 id: ServiceID{Name: "name-1", Namespace: "ns"},
1559 port: 8989,
1560 expectedAddresses: []string{"172.17.0.25:8989"},
1561 expectedNoEndpoints: false,
1562 expectedNoEndpointsServiceExists: false,
1563 expectedError: false,
1564 },
1565 {
1566 serviceType: "service with EndpointSlice without labels",
1567 k8sConfigs: []string{`
1568 kind: APIResourceList
1569 apiVersion: v1
1570 groupVersion: discovery.k8s.io/v1
1571 resources:
1572 - name: endpointslices
1573 singularName: endpointslice
1574 namespaced: true
1575 kind: EndpointSlice
1576 verbs:
1577 - delete
1578 - deletecollection
1579 - get
1580 - list
1581 - patch
1582 - create
1583 - update
1584 - watch
1585 `, `
1586 apiVersion: v1
1587 kind: Service
1588 metadata:
1589 name: name-5
1590 namespace: ns
1591 spec:
1592 type: LoadBalancer
1593 ports:
1594 - port: 8989`, `
1595 addressType: IPv4
1596 apiVersion: discovery.k8s.io/v1
1597 endpoints:
1598 - addresses:
1599 - 172.17.0.12
1600 conditions:
1601 ready: true
1602 hostname: name-1-1
1603 targetRef:
1604 kind: ExternalWorkload
1605 name: name-1-1
1606 namespace: ns
1607 topology:
1608 kubernetes.io/hostname: node-1
1609 kind: EndpointSlice
1610 metadata:
1611 labels:
1612 name: name-1-f5fad
1613 namespace: ns
1614 ports:
1615 - name: ""
1616 port: 8989`, `
1617 apiVersion: workload.linkerd.io/v1beta1
1618 kind: ExternalWorkload
1619 metadata:
1620 name: name-1-1
1621 namespace: ns
1622 status:
1623 conditions:
1624 ready: true`,
1625 },
1626 id: ServiceID{Name: "name-5", Namespace: "ns"},
1627 port: 8989,
1628 expectedAddresses: []string{},
1629 expectedNoEndpoints: true,
1630 expectedNoEndpointsServiceExists: true,
1631 expectedError: false,
1632 },
1633
1634 {
1635 serviceType: "service with IPv6 address type EndpointSlice",
1636 k8sConfigs: []string{`
1637 kind: APIResourceList
1638 apiVersion: v1
1639 groupVersion: discovery.k8s.io/v1
1640 resources:
1641 - name: endpointslices
1642 singularName: endpointslice
1643 namespaced: true
1644 kind: EndpointSlice
1645 verbs:
1646 - delete
1647 - deletecollection
1648 - get
1649 - list
1650 - patch
1651 - create
1652 - update
1653 - watch
1654 `, `
1655 apiVersion: v1
1656 kind: Service
1657 metadata:
1658 name: name-5
1659 namespace: ns
1660 spec:
1661 type: LoadBalancer
1662 ports:
1663 - port: 9000`, `
1664 addressType: IPv6
1665 apiVersion: discovery.k8s.io/v1
1666 endpoints:
1667 - addresses:
1668 - 0:0:0:0:0:0:0:1
1669 conditions:
1670 ready: true
1671 targetRef:
1672 kind: ExternalWorkload
1673 name: name-5-1
1674 namespace: ns
1675 topology:
1676 kubernetes.io/hostname: node-1
1677 kind: EndpointSlice
1678 metadata:
1679 labels:
1680 name: name-5-f65dv
1681 namespace: ns
1682 ownerReferences:
1683 - apiVersion: v1
1684 kind: Service
1685 name: name-5
1686 ports:
1687 - name: ""
1688 port: 9000`, `
1689 apiVersion: workload.linkerd.io/v1beta1
1690 kind: ExternalWorkload
1691 metadata:
1692 name: name-5-1
1693 namespace: ns
1694 status:
1695 conditions:
1696 ready: true`,
1697 },
1698 id: ServiceID{Name: "name-5", Namespace: "ns"},
1699 port: 9000,
1700 expectedAddresses: []string{},
1701 expectedNoEndpoints: true,
1702 expectedNoEndpointsServiceExists: true,
1703 expectedError: false,
1704 },
1705 } {
1706 tt := tt
1707 t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
1708 k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
1709 if err != nil {
1710 t.Fatalf("NewFakeAPI returned an error: %s", err)
1711 }
1712
1713 metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
1714 if err != nil {
1715 t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
1716 }
1717
1718 watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
1719 if err != nil {
1720 t.Fatalf("can't create Endpoints watcher: %s", err)
1721 }
1722
1723 k8sAPI.Sync(nil)
1724 metadataAPI.Sync(nil)
1725
1726 listener := newBufferingEndpointListener()
1727
1728 err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
1729 if tt.expectedError && err == nil {
1730 t.Fatal("Expected error but was ok")
1731 }
1732 if !tt.expectedError && err != nil {
1733 t.Fatalf("Expected no error, got [%s]", err)
1734 }
1735
1736 if listener.localTrafficPolicy != tt.expectedLocalTrafficPolicy {
1737 t.Fatalf("Expected localTrafficPolicy [%v], got [%v]", tt.expectedLocalTrafficPolicy, listener.localTrafficPolicy)
1738 }
1739
1740 listener.ExpectAdded(tt.expectedAddresses, t)
1741
1742 if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints {
1743 t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
1744 tt.expectedNoEndpoints, listener.endpointsAreNotCalled())
1745 }
1746
1747 if listener.endpointsDoNotExist() != tt.expectedNoEndpointsServiceExists {
1748 t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]",
1749 tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist())
1750 }
1751 })
1752 }
1753 }
1754
1755 func TestEndpointsWatcherDeletion(t *testing.T) {
1756 k8sConfigs := []string{`
1757 apiVersion: v1
1758 kind: Service
1759 metadata:
1760 name: name1
1761 namespace: ns
1762 spec:
1763 type: LoadBalancer
1764 ports:
1765 - port: 8989`,
1766 `
1767 apiVersion: v1
1768 kind: Endpoints
1769 metadata:
1770 name: name1
1771 namespace: ns
1772 subsets:
1773 - addresses:
1774 - ip: 172.17.0.12
1775 targetRef:
1776 kind: Pod
1777 name: name1-1
1778 namespace: ns
1779 ports:
1780 - port: 8989`,
1781 `
1782 apiVersion: v1
1783 kind: Pod
1784 metadata:
1785 name: name1-1
1786 namespace: ns
1787 status:
1788 phase: Running
1789 podIP: 172.17.0.12`}
1790
1791 for _, tt := range []struct {
1792 serviceType string
1793 k8sConfigs []string
1794 id ServiceID
1795 hostname string
1796 port Port
1797 objectToDelete interface{}
1798 deletingServices bool
1799 }{
1800 {
1801 serviceType: "can delete endpoints",
1802 k8sConfigs: k8sConfigs,
1803 id: ServiceID{Name: "name1", Namespace: "ns"},
1804 port: 8989,
1805 hostname: "name1-1",
1806 objectToDelete: &corev1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}},
1807 },
1808 {
1809 serviceType: "can delete endpoints when wrapped in a DeletedFinalStateUnknown",
1810 k8sConfigs: k8sConfigs,
1811 id: ServiceID{Name: "name1", Namespace: "ns"},
1812 port: 8989,
1813 hostname: "name1-1",
1814 objectToDelete: &corev1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}},
1815 },
1816 {
1817 serviceType: "can delete services",
1818 k8sConfigs: k8sConfigs,
1819 id: ServiceID{Name: "name1", Namespace: "ns"},
1820 port: 8989,
1821 hostname: "name1-1",
1822 objectToDelete: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}},
1823 deletingServices: true,
1824 },
1825 {
1826 serviceType: "can delete services when wrapped in a DeletedFinalStateUnknown",
1827 k8sConfigs: k8sConfigs,
1828 id: ServiceID{Name: "name1", Namespace: "ns"},
1829 port: 8989,
1830 hostname: "name1-1",
1831 objectToDelete: &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}},
1832 deletingServices: true,
1833 },
1834 } {
1835
1836 tt := tt
1837 t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
1838 k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
1839 if err != nil {
1840 t.Fatalf("NewFakeAPI returned an error: %s", err)
1841 }
1842
1843 metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
1844 if err != nil {
1845 t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
1846 }
1847
1848 watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local")
1849 if err != nil {
1850 t.Fatalf("can't create Endpoints watcher: %s", err)
1851 }
1852
1853 k8sAPI.Sync(nil)
1854 metadataAPI.Sync(nil)
1855
1856 listener := newBufferingEndpointListener()
1857
1858 err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
1859 if err != nil {
1860 t.Fatal(err)
1861 }
1862
1863 if tt.deletingServices {
1864 watcher.deleteService(tt.objectToDelete)
1865 } else {
1866 watcher.deleteEndpoints(tt.objectToDelete)
1867 }
1868
1869 if !listener.endpointsAreNotCalled() {
1870 t.Fatal("Expected NoEndpoints to be Called")
1871 }
1872 })
1873
1874 }
1875 }
1876
1877 func TestEndpointsWatcherDeletionWithEndpointSlices(t *testing.T) {
1878 k8sConfigsWithES := []string{`
1879 kind: APIResourceList
1880 apiVersion: v1
1881 groupVersion: discovery.k8s.io/v1
1882 resources:
1883 - name: endpointslices
1884 singularName: endpointslice
1885 namespaced: true
1886 kind: EndpointSlice
1887 verbs:
1888 - delete
1889 - deletecollection
1890 - get
1891 - list
1892 - patch
1893 - create
1894 - update
1895 - watch
1896 `, `
1897 apiVersion: v1
1898 kind: Service
1899 metadata:
1900 name: name1
1901 namespace: ns
1902 spec:
1903 type: LoadBalancer
1904 ports:
1905 - port: 8989`, `
1906 addressType: IPv4
1907 apiVersion: discovery.k8s.io/v1
1908 endpoints:
1909 - addresses:
1910 - 172.17.0.12
1911 conditions:
1912 ready: true
1913 targetRef:
1914 kind: Pod
1915 name: name1-1
1916 namespace: ns
1917 topology:
1918 kubernetes.io/hostname: node-1
1919 kind: EndpointSlice
1920 metadata:
1921 labels:
1922 kubernetes.io/service-name: name1
1923 name: name1-del
1924 namespace: ns
1925 ports:
1926 - name: ""
1927 port: 8989`, `
1928 apiVersion: v1
1929 kind: Pod
1930 metadata:
1931 name: name1-1
1932 namespace: ns
1933 status:
1934 phase: Running
1935 podIP: 172.17.0.12`}
1936
1937 k8sConfigWithMultipleES := append(k8sConfigsWithES, []string{`
1938 addressType: IPv4
1939 apiVersion: discovery.k8s.io/v1
1940 endpoints:
1941 - addresses:
1942 - 172.17.0.13
1943 conditions:
1944 ready: true
1945 targetRef:
1946 kind: Pod
1947 name: name1-2
1948 namespace: ns
1949 topology:
1950 kubernetes.io/hostname: node-1
1951 kind: EndpointSlice
1952 metadata:
1953 labels:
1954 kubernetes.io/service-name: name1
1955 name: name1-live
1956 namespace: ns
1957 ports:
1958 - name: ""
1959 port: 8989`, `apiVersion: v1
1960 kind: Pod
1961 metadata:
1962 name: name1-2
1963 namespace: ns
1964 status:
1965 phase: Running
1966 podIP: 172.17.0.13`}...)
1967
1968 for _, tt := range []struct {
1969 serviceType string
1970 k8sConfigs []string
1971 id ServiceID
1972 hostname string
1973 port Port
1974 objectToDelete interface{}
1975 deletingServices bool
1976 hasSliceAccess bool
1977 noEndpointsCalled bool
1978 }{
1979 {
1980 serviceType: "can delete an EndpointSlice",
1981 k8sConfigs: k8sConfigsWithES,
1982 id: ServiceID{Name: "name1", Namespace: "ns"},
1983 port: 8989,
1984 hostname: "name1-1",
1985 objectToDelete: createTestEndpointSlice(consts.PodKind),
1986 hasSliceAccess: true,
1987 noEndpointsCalled: true,
1988 },
1989 {
1990 serviceType: "can delete an EndpointSlice when wrapped in a DeletedFinalStateUnknown",
1991 k8sConfigs: k8sConfigsWithES,
1992 id: ServiceID{Name: "name1", Namespace: "ns"},
1993 port: 8989,
1994 hostname: "name1-1",
1995 objectToDelete: createTestEndpointSlice(consts.PodKind),
1996 hasSliceAccess: true,
1997 noEndpointsCalled: true,
1998 },
1999 {
2000 serviceType: "can delete an EndpointSlice when there are multiple ones",
2001 k8sConfigs: k8sConfigWithMultipleES,
2002 id: ServiceID{Name: "name1", Namespace: "ns"},
2003 port: 8989,
2004 hostname: "name1-1",
2005 objectToDelete: createTestEndpointSlice(consts.PodKind),
2006 hasSliceAccess: true,
2007 noEndpointsCalled: false,
2008 },
2009 } {
2010 tt := tt
2011 t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
2012 k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
2013 if err != nil {
2014 t.Fatalf("NewFakeAPI returned an error: %s", err)
2015 }
2016
2017 metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
2018 if err != nil {
2019 t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
2020 }
2021
2022 watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
2023 if err != nil {
2024 t.Fatalf("can't create Endpoints watcher: %s", err)
2025 }
2026
2027 k8sAPI.Sync(nil)
2028 metadataAPI.Sync(nil)
2029
2030 listener := newBufferingEndpointListener()
2031
2032 err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
2033 if err != nil {
2034 t.Fatal(err)
2035 }
2036
2037 watcher.deleteEndpointSlice(tt.objectToDelete)
2038
2039 if listener.endpointsAreNotCalled() != tt.noEndpointsCalled {
2040 t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
2041 tt.noEndpointsCalled, listener.endpointsAreNotCalled())
2042 }
2043 })
2044 }
2045 }
2046
2047 func TestEndpointsWatcherDeletionWithEndpointSlicesExternalWorkload(t *testing.T) {
2048 k8sConfigsWithES := []string{`
2049 kind: APIResourceList
2050 apiVersion: v1
2051 groupVersion: discovery.k8s.io/v1
2052 resources:
2053 - name: endpointslices
2054 singularName: endpointslice
2055 namespaced: true
2056 kind: EndpointSlice
2057 verbs:
2058 - delete
2059 - deletecollection
2060 - get
2061 - list
2062 - patch
2063 - create
2064 - update
2065 - watch
2066 `, `
2067 apiVersion: v1
2068 kind: Service
2069 metadata:
2070 name: name1
2071 namespace: ns
2072 spec:
2073 type: LoadBalancer
2074 ports:
2075 - port: 8989`, `
2076 addressType: IPv4
2077 apiVersion: discovery.k8s.io/v1
2078 endpoints:
2079 - addresses:
2080 - 172.17.0.12
2081 conditions:
2082 ready: true
2083 targetRef:
2084 kind: ExternalWorkload
2085 name: name1-1
2086 namespace: ns
2087 topology:
2088 kubernetes.io/hostname: node-1
2089 kind: EndpointSlice
2090 metadata:
2091 labels:
2092 kubernetes.io/service-name: name1
2093 name: name1-del
2094 namespace: ns
2095 ports:
2096 - name: ""
2097 port: 8989`, `
2098 apiVersion: workload.linkerd.io/v1beta1
2099 kind: ExternalWorkload
2100 metadata:
2101 name: name1-1
2102 namespace: ns
2103 status:
2104 conditions:
2105 ready: true`}
2106
2107 k8sConfigWithMultipleES := append(k8sConfigsWithES, []string{`
2108 addressType: IPv4
2109 apiVersion: discovery.k8s.io/v1
2110 endpoints:
2111 - addresses:
2112 - 172.17.0.13
2113 conditions:
2114 ready: true
2115 targetRef:
2116 kind: ExternalWorkload
2117 name: name1-2
2118 namespace: ns
2119 topology:
2120 kubernetes.io/hostname: node-1
2121 kind: EndpointSlice
2122 metadata:
2123 labels:
2124 kubernetes.io/service-name: name1
2125 name: name1-live
2126 namespace: ns
2127 ports:
2128 - name: ""
2129 port: 8989`, `apiVersion: workload.linkerd.io/v1beta1
2130 kind: ExternalWorkload
2131 metadata:
2132 name: name1-2
2133 namespace: ns
2134 status:
2135 conditions:
2136 ready: true`}...)
2137
2138 for _, tt := range []struct {
2139 serviceType string
2140 k8sConfigs []string
2141 id ServiceID
2142 hostname string
2143 port Port
2144 objectToDelete interface{}
2145 deletingServices bool
2146 hasSliceAccess bool
2147 noEndpointsCalled bool
2148 }{
2149 {
2150 serviceType: "can delete an EndpointSlice",
2151 k8sConfigs: k8sConfigsWithES,
2152 id: ServiceID{Name: "name1", Namespace: "ns"},
2153 port: 8989,
2154 hostname: "name1-1",
2155 objectToDelete: createTestEndpointSlice(consts.ExtWorkloadKind),
2156 hasSliceAccess: true,
2157 noEndpointsCalled: true,
2158 },
2159 {
2160 serviceType: "can delete an EndpointSlice when wrapped in a DeletedFinalStateUnknown",
2161 k8sConfigs: k8sConfigsWithES,
2162 id: ServiceID{Name: "name1", Namespace: "ns"},
2163 port: 8989,
2164 hostname: "name1-1",
2165 objectToDelete: createTestEndpointSlice(consts.ExtWorkloadKind),
2166 hasSliceAccess: true,
2167 noEndpointsCalled: true,
2168 },
2169 {
2170 serviceType: "can delete an EndpointSlice when there are multiple ones",
2171 k8sConfigs: k8sConfigWithMultipleES,
2172 id: ServiceID{Name: "name1", Namespace: "ns"},
2173 port: 8989,
2174 hostname: "name1-1",
2175 objectToDelete: createTestEndpointSlice(consts.ExtWorkloadKind),
2176 hasSliceAccess: true,
2177 noEndpointsCalled: false,
2178 },
2179 } {
2180 tt := tt
2181 t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
2182 k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
2183 if err != nil {
2184 t.Fatalf("NewFakeAPI returned an error: %s", err)
2185 }
2186
2187 metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
2188 if err != nil {
2189 t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
2190 }
2191
2192 watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
2193 if err != nil {
2194 t.Fatalf("can't create Endpoints watcher: %s", err)
2195 }
2196
2197 k8sAPI.Sync(nil)
2198 metadataAPI.Sync(nil)
2199
2200 listener := newBufferingEndpointListener()
2201
2202 err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
2203 if err != nil {
2204 t.Fatal(err)
2205 }
2206
2207 watcher.deleteEndpointSlice(tt.objectToDelete)
2208
2209 if listener.endpointsAreNotCalled() != tt.noEndpointsCalled {
2210 t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
2211 tt.noEndpointsCalled, listener.endpointsAreNotCalled())
2212 }
2213 })
2214 }
2215 }
2216
2217 func TestEndpointsWatcherServiceMirrors(t *testing.T) {
2218 for _, tt := range []struct {
2219 serviceType string
2220 k8sConfigs []string
2221 id ServiceID
2222 hostname string
2223 port Port
2224 expectedAddresses []string
2225 expectedNoEndpoints bool
2226 expectedNoEndpointsServiceExists bool
2227 enableEndpointSlices bool
2228 }{
2229 {
2230 k8sConfigs: []string{`
2231 apiVersion: v1
2232 kind: Service
2233 metadata:
2234 name: name1-remote
2235 namespace: ns
2236 spec:
2237 type: LoadBalancer
2238 ports:
2239 - port: 8989`,
2240 `
2241 apiVersion: v1
2242 kind: Endpoints
2243 metadata:
2244 name: name1-remote
2245 namespace: ns
2246 annotations:
2247 mirror.linkerd.io/remote-gateway-identity: "gateway-identity-1"
2248 mirror.linkerd.io/remote-svc-fq-name: "name1-remote-fq"
2249 labels:
2250 mirror.linkerd.io/mirrored-service: "true"
2251 subsets:
2252 - addresses:
2253 - ip: 172.17.0.12
2254 ports:
2255 - port: 8989`,
2256 },
2257 serviceType: "mirrored service with identity",
2258 id: ServiceID{Name: "name1-remote", Namespace: "ns"},
2259 port: 8989,
2260 expectedAddresses: []string{
2261 "172.17.0.12:8989/gateway-identity-1/name1-remote-fq:8989",
2262 },
2263 expectedNoEndpoints: false,
2264 expectedNoEndpointsServiceExists: false,
2265 },
2266 {
2267 k8sConfigs: []string{`
2268 apiVersion: v1
2269 kind: Service
2270 metadata:
2271 name: name1-remote
2272 namespace: ns
2273 spec:
2274 type: LoadBalancer
2275 ports:
2276 - port: 8989`,
2277 `
2278 apiVersion: discovery.k8s.io/v1
2279 kind: EndpointSlice
2280 metadata:
2281 name: name1-remote-xxxx
2282 namespace: ns
2283 annotations:
2284 mirror.linkerd.io/remote-gateway-identity: "gateway-identity-1"
2285 mirror.linkerd.io/remote-svc-fq-name: "name1-remote-fq"
2286 labels:
2287 mirror.linkerd.io/mirrored-service: "true"
2288 kubernetes.io/service-name: name1-remote
2289 endpoints:
2290 - addresses:
2291 - 172.17.0.12
2292 ports:
2293 - port: 8989`,
2294 },
2295 serviceType: "mirrored service with identity and endpoint slices",
2296 id: ServiceID{Name: "name1-remote", Namespace: "ns"},
2297 port: 8989,
2298 expectedAddresses: []string{
2299 "172.17.0.12:8989/gateway-identity-1/name1-remote-fq:8989",
2300 },
2301 expectedNoEndpoints: false,
2302 expectedNoEndpointsServiceExists: false,
2303 enableEndpointSlices: true,
2304 },
2305 {
2306 k8sConfigs: []string{`
2307 apiVersion: v1
2308 kind: Service
2309 metadata:
2310 name: name1-remote
2311 namespace: ns
2312 spec:
2313 type: LoadBalancer
2314 ports:
2315 - port: 8989`,
2316 `
2317 apiVersion: v1
2318 kind: Endpoints
2319 metadata:
2320 name: name1-remote
2321 namespace: ns
2322 annotations:
2323 mirror.linkerd.io/remote-svc-fq-name: "name1-remote-fq"
2324 labels:
2325 mirror.linkerd.io/mirrored-service: "true"
2326 subsets:
2327 - addresses:
2328 - ip: 172.17.0.12
2329 ports:
2330 - port: 8989`,
2331 },
2332 serviceType: "mirrored service without identity",
2333 id: ServiceID{Name: "name1-remote", Namespace: "ns"},
2334 port: 8989,
2335 expectedAddresses: []string{
2336 "172.17.0.12:8989/name1-remote-fq:8989",
2337 },
2338 expectedNoEndpoints: false,
2339 expectedNoEndpointsServiceExists: false,
2340 },
2341
2342 {
2343 k8sConfigs: []string{`
2344 apiVersion: v1
2345 kind: Service
2346 metadata:
2347 name: name1-remote
2348 namespace: ns
2349 spec:
2350 type: LoadBalancer
2351 ports:
2352 - port: 8989`,
2353 `
2354 apiVersion: v1
2355 kind: Endpoints
2356 metadata:
2357 name: name1-remote
2358 namespace: ns
2359 annotations:
2360 mirror.linkerd.io/remote-gateway-identity: "gateway-identity-1"
2361 mirror.linkerd.io/remote-svc-fq-name: "name1-remote-fq"
2362 labels:
2363 mirror.linkerd.io/mirrored-service: "true"
2364 subsets:
2365 - addresses:
2366 - ip: 172.17.0.12
2367 ports:
2368 - port: 9999`,
2369 },
2370 serviceType: "mirrored service with remapped port in endpoints",
2371 id: ServiceID{Name: "name1-remote", Namespace: "ns"},
2372 port: 8989,
2373 expectedAddresses: []string{
2374 "172.17.0.12:9999/gateway-identity-1/name1-remote-fq:8989",
2375 },
2376 expectedNoEndpoints: false,
2377 expectedNoEndpointsServiceExists: false,
2378 },
2379 {
2380 k8sConfigs: []string{`
2381 apiVersion: v1
2382 kind: Service
2383 metadata:
2384 name: name1-remote
2385 namespace: ns
2386 spec:
2387 type: LoadBalancer
2388 ports:
2389 - port: 8989`,
2390 `
2391 apiVersion: v1
2392 kind: Endpoints
2393 metadata:
2394 name: name1-remote
2395 namespace: ns
2396 annotations:
2397 mirror.linkerd.io/remote-gateway-identity: ""
2398 mirror.linkerd.io/remote-svc-fq-name: "name1-remote-fq"
2399 labels:
2400 mirror.linkerd.io/mirrored-service: "true"
2401 subsets:
2402 - addresses:
2403 - ip: 172.17.0.12
2404 ports:
2405 - port: 9999`,
2406 },
2407 serviceType: "mirrored service with empty identity and remapped port in endpoints",
2408 id: ServiceID{Name: "name1-remote", Namespace: "ns"},
2409 port: 8989,
2410 expectedAddresses: []string{
2411 "172.17.0.12:9999/name1-remote-fq:8989",
2412 },
2413 expectedNoEndpoints: false,
2414 expectedNoEndpointsServiceExists: false,
2415 },
2416 } {
2417 tt := tt
2418 t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
2419 k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
2420 if err != nil {
2421 t.Fatalf("NewFakeAPI returned an error: %s", err)
2422 }
2423
2424 metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
2425 if err != nil {
2426 t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
2427 }
2428
2429 watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices, "local")
2430 if err != nil {
2431 t.Fatalf("can't create Endpoints watcher: %s", err)
2432 }
2433
2434 k8sAPI.Sync(nil)
2435 metadataAPI.Sync(nil)
2436
2437 listener := newBufferingEndpointListener()
2438
2439 err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
2440
2441 if err != nil {
2442 t.Fatalf("NewFakeAPI returned an error: %s", err)
2443 }
2444
2445 listener.ExpectAdded(tt.expectedAddresses, t)
2446
2447 if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints {
2448 t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
2449 tt.expectedNoEndpoints, listener.endpointsAreNotCalled())
2450 }
2451
2452 if listener.endpointsDoNotExist() != tt.expectedNoEndpointsServiceExists {
2453 t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]",
2454 tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist())
2455 }
2456 })
2457 }
2458 }
2459
2460 func testPod(resVersion string) *corev1.Pod {
2461 return &corev1.Pod{
2462 TypeMeta: metav1.TypeMeta{
2463 Kind: "Pod",
2464 APIVersion: "v1",
2465 },
2466 ObjectMeta: metav1.ObjectMeta{
2467 ResourceVersion: resVersion,
2468 Name: "name1-1",
2469 Namespace: "ns",
2470 },
2471 Status: corev1.PodStatus{
2472 Phase: corev1.PodRunning,
2473 PodIP: "172.17.0.12",
2474 },
2475 }
2476 }
2477
2478 func endpoints(identity string) *corev1.Endpoints {
2479 return &corev1.Endpoints{
2480 TypeMeta: metav1.TypeMeta{
2481 Kind: "Endpoints",
2482 APIVersion: "v1",
2483 },
2484 ObjectMeta: metav1.ObjectMeta{
2485 Name: "remote-service",
2486 Namespace: "ns",
2487 Annotations: map[string]string{
2488 consts.RemoteGatewayIdentity: identity,
2489 consts.RemoteServiceFqName: "remote-service.svc.default.cluster.local",
2490 },
2491 Labels: map[string]string{
2492 consts.MirroredResourceLabel: "true",
2493 },
2494 },
2495 Subsets: []corev1.EndpointSubset{
2496 {
2497 Addresses: []corev1.EndpointAddress{
2498 {
2499 IP: "1.2.3.4",
2500 },
2501 },
2502 Ports: []corev1.EndpointPort{
2503 {
2504 Port: 80,
2505 },
2506 },
2507 },
2508 },
2509 }
2510 }
2511
2512 func createTestEndpointSlice(targetRefKind string) *dv1.EndpointSlice {
2513 return &dv1.EndpointSlice{
2514 AddressType: "IPv4",
2515 ObjectMeta: metav1.ObjectMeta{Name: "name1-del", Namespace: "ns", Labels: map[string]string{dv1.LabelServiceName: "name1"}},
2516 Endpoints: []dv1.Endpoint{
2517 {
2518 Addresses: []string{"172.17.0.12"},
2519 Conditions: dv1.EndpointConditions{Ready: func(b bool) *bool { return &b }(true)},
2520 TargetRef: &corev1.ObjectReference{Name: "name1-1", Namespace: "ns", Kind: targetRefKind},
2521 },
2522 },
2523 Ports: []dv1.EndpointPort{
2524 {
2525 Name: func(s string) *string { return &s }(""),
2526 Port: func(i int32) *int32 { return &i }(8989),
2527 },
2528 },
2529 }
2530 }
2531
2532 func TestEndpointsChangeDetection(t *testing.T) {
2533
2534 k8sConfigs := []string{`
2535 apiVersion: v1
2536 kind: Service
2537 metadata:
2538 name: remote-service
2539 namespace: ns
2540 spec:
2541 ports:
2542 - port: 80
2543 targetPort: 80`,
2544 `
2545 apiVersion: v1
2546 kind: Endpoints
2547 metadata:
2548 name: remote-service
2549 namespace: ns
2550 annotations:
2551 mirror.linkerd.io/remote-gateway-identity: "gateway-identity-1"
2552 mirror.linkerd.io/remote-svc-fq-name: "remote-service.svc.default.cluster.local"
2553 labels:
2554 mirror.linkerd.io/mirrored-service: "true"
2555 subsets:
2556 - addresses:
2557 - ip: 1.2.3.4
2558 ports:
2559 - port: 80`,
2560 }
2561
2562 for _, tt := range []struct {
2563 serviceType string
2564 id ServiceID
2565 port Port
2566 newEndpoints *corev1.Endpoints
2567 expectedAddresses []string
2568 }{
2569 {
2570 serviceType: "will update endpoints if identity is different",
2571 id: ServiceID{Name: "remote-service", Namespace: "ns"},
2572 port: 80,
2573 newEndpoints: endpoints("gateway-identity-2"),
2574 expectedAddresses: []string{"1.2.3.4:80/gateway-identity-1/remote-service.svc.default.cluster.local:80", "1.2.3.4:80/gateway-identity-2/remote-service.svc.default.cluster.local:80"},
2575 },
2576
2577 {
2578 serviceType: "will not update endpoints if identity is the same",
2579 id: ServiceID{Name: "remote-service", Namespace: "ns"},
2580 port: 80,
2581 newEndpoints: endpoints("gateway-identity-1"),
2582 expectedAddresses: []string{"1.2.3.4:80/gateway-identity-1/remote-service.svc.default.cluster.local:80"},
2583 },
2584 } {
2585
2586 tt := tt
2587 t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
2588 k8sAPI, err := k8s.NewFakeAPI(k8sConfigs...)
2589 if err != nil {
2590 t.Fatalf("NewFakeAPI returned an error: %s", err)
2591 }
2592
2593 metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
2594 if err != nil {
2595 t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
2596 }
2597
2598 watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local")
2599 if err != nil {
2600 t.Fatalf("can't create Endpoints watcher: %s", err)
2601 }
2602
2603 k8sAPI.Sync(nil)
2604 metadataAPI.Sync(nil)
2605
2606 listener := newBufferingEndpointListener()
2607
2608 err = watcher.Subscribe(tt.id, tt.port, "", listener)
2609 if err != nil {
2610 t.Fatal(err)
2611 }
2612
2613 k8sAPI.Sync(nil)
2614
2615 watcher.addEndpoints(tt.newEndpoints)
2616
2617 listener.ExpectAdded(tt.expectedAddresses, t)
2618 })
2619 }
2620 }
2621
2622 func TestPodChangeDetection(t *testing.T) {
2623 endpoints := &corev1.Endpoints{
2624 TypeMeta: metav1.TypeMeta{
2625 Kind: "Endpoints",
2626 APIVersion: "v1",
2627 },
2628 ObjectMeta: metav1.ObjectMeta{
2629 Name: "name1",
2630 Namespace: "ns",
2631 },
2632 Subsets: []corev1.EndpointSubset{
2633 {
2634 Addresses: []corev1.EndpointAddress{
2635 {
2636 IP: "172.17.0.12",
2637 Hostname: "name1-1",
2638 TargetRef: &corev1.ObjectReference{
2639 Kind: "Pod",
2640 Namespace: "ns",
2641 Name: "name1-1",
2642 },
2643 },
2644 },
2645 Ports: []corev1.EndpointPort{
2646 {
2647 Port: 8989,
2648 },
2649 },
2650 },
2651 },
2652 }
2653
2654 k8sConfigs := []string{`
2655 apiVersion: v1
2656 kind: Service
2657 metadata:
2658 name: name1
2659 namespace: ns
2660 spec:
2661 type: LoadBalancer
2662 ports:
2663 - port: 8989`,
2664 `
2665 apiVersion: v1
2666 kind: Endpoints
2667 metadata:
2668 name: name1
2669 namespace: ns
2670 subsets:
2671 - addresses:
2672 - ip: 172.17.0.12
2673 hostname: name1-1
2674 targetRef:
2675 kind: Pod
2676 name: name1-1
2677 namespace: ns
2678 ports:
2679 - port: 8989`,
2680 `
2681 apiVersion: v1
2682 kind: Pod
2683 metadata:
2684 name: name1-1
2685 namespace: ns
2686 resourceVersion: "1"
2687 status:
2688 phase: Running
2689 podIP: 172.17.0.12`}
2690
2691 for _, tt := range []struct {
2692 serviceType string
2693 id ServiceID
2694 hostname string
2695 port Port
2696 newPod *corev1.Pod
2697 expectedAddresses []string
2698 }{
2699 {
2700 serviceType: "will update pod if resource version is different",
2701 id: ServiceID{Name: "name1", Namespace: "ns"},
2702 port: 8989,
2703 hostname: "name1-1",
2704 newPod: testPod("2"),
2705
2706 expectedAddresses: []string{"172.17.0.12:8989:1", "172.17.0.12:8989:2"},
2707 },
2708 {
2709 serviceType: "will not update pod if resource version is the same",
2710 id: ServiceID{Name: "name1", Namespace: "ns"},
2711 port: 8989,
2712 hostname: "name1-1",
2713 newPod: testPod("1"),
2714
2715 expectedAddresses: []string{"172.17.0.12:8989:1"},
2716 },
2717 } {
2718 tt := tt
2719 t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
2720 k8sAPI, err := k8s.NewFakeAPI(k8sConfigs...)
2721 if err != nil {
2722 t.Fatalf("NewFakeAPI returned an error: %s", err)
2723 }
2724
2725 metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
2726 if err != nil {
2727 t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
2728 }
2729
2730 watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local")
2731 if err != nil {
2732 t.Fatalf("can't create Endpoints watcher: %s", err)
2733 }
2734
2735 k8sAPI.Sync(nil)
2736 metadataAPI.Sync(nil)
2737
2738 listener := newBufferingEndpointListenerWithResVersion()
2739
2740 err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
2741 if err != nil {
2742 t.Fatal(err)
2743 }
2744
2745 err = k8sAPI.Pod().Informer().GetStore().Add(tt.newPod)
2746 if err != nil {
2747 t.Fatal(err)
2748 }
2749 k8sAPI.Sync(nil)
2750
2751 watcher.addEndpoints(endpoints)
2752 listener.ExpectAdded(tt.expectedAddresses, t)
2753 })
2754 }
2755 }
2756
2757
2758
2759
2760 func TestEndpointSliceScaleDown(t *testing.T) {
2761 k8sConfigsWithES := []string{`
2762 kind: APIResourceList
2763 apiVersion: v1
2764 groupVersion: discovery.k8s.io/v1
2765 resources:
2766 - name: endpointslices
2767 singularName: endpointslice
2768 namespaced: true
2769 kind: EndpointSlice
2770 verbs:
2771 - delete
2772 - deletecollection
2773 - get
2774 - list
2775 - patch
2776 - create
2777 - update
2778 - watch
2779 `, `
2780 apiVersion: v1
2781 kind: Service
2782 metadata:
2783 name: name1
2784 namespace: ns
2785 spec:
2786 type: LoadBalancer
2787 ports:
2788 - port: 8989`, `
2789 addressType: IPv4
2790 apiVersion: discovery.k8s.io/v1
2791 endpoints:
2792 - addresses:
2793 - 172.17.0.12
2794 conditions:
2795 ready: true
2796 targetRef:
2797 kind: Pod
2798 name: name1-1
2799 namespace: ns
2800 topology:
2801 kubernetes.io/hostname: node-1
2802 kind: EndpointSlice
2803 metadata:
2804 labels:
2805 kubernetes.io/service-name: name1
2806 name: name1-es
2807 namespace: ns
2808 ports:
2809 - name: ""
2810 port: 8989`, `
2811 apiVersion: v1
2812 kind: Pod
2813 metadata:
2814 name: name1-1
2815 namespace: ns
2816 status:
2817 phase: Running
2818 podIP: 172.17.0.12`}
2819
2820
2821
2822 k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
2823 if err != nil {
2824 t.Fatalf("NewFakeAPI returned an error: %s", err)
2825 }
2826
2827 metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
2828 if err != nil {
2829 t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
2830 }
2831
2832 watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
2833 if err != nil {
2834 t.Fatalf("can't create Endpoints watcher: %s", err)
2835 }
2836
2837 k8sAPI.Sync(nil)
2838 metadataAPI.Sync(nil)
2839
2840 listener := newBufferingEndpointListener()
2841
2842 err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
2843 if err != nil {
2844 t.Fatal(err)
2845 }
2846
2847 k8sAPI.Sync(nil)
2848
2849 listener.ExpectAdded([]string{"172.17.0.12:8989"}, t)
2850
2851
2852
2853 err = k8sAPI.Client.CoreV1().Pods("ns").Delete(context.Background(), "name1-1", metav1.DeleteOptions{})
2854 if err != nil {
2855 t.Fatal(err)
2856 }
2857
2858
2859
2860 err = testutil.RetryFor(time.Second*30, func() error {
2861 _, err := k8sAPI.Pod().Lister().Pods("ns").Get("name1-1")
2862 if kerrors.IsNotFound(err) {
2863 return nil
2864 }
2865 if err == nil {
2866 return errors.New("pod should be deleted, but still exists in lister")
2867 }
2868 return err
2869 })
2870 if err != nil {
2871 t.Fatal(err)
2872 }
2873
2874 ES, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
2875 if err != nil {
2876 t.Fatal(err)
2877 }
2878
2879 emptyES := &dv1.EndpointSlice{
2880 AddressType: "IPv4",
2881 ObjectMeta: metav1.ObjectMeta{
2882 Name: "name1-es", Namespace: "ns",
2883 Labels: map[string]string{dv1.LabelServiceName: "name1"},
2884 },
2885 Endpoints: []dv1.Endpoint{},
2886 Ports: []dv1.EndpointPort{},
2887 }
2888
2889 watcher.updateEndpointSlice(ES, emptyES)
2890
2891
2892
2893 listener.ExpectRemoved([]string{"172.17.0.12:8989"}, t)
2894 }
2895
2896
2897
2898 func TestEndpointSliceChangeNotReady(t *testing.T) {
2899 k8sConfigsWithES := []string{`
2900 kind: APIResourceList
2901 apiVersion: v1
2902 groupVersion: discovery.k8s.io/v1
2903 resources:
2904 - name: endpointslices
2905 singularName: endpointslice
2906 namespaced: true
2907 kind: EndpointSlice
2908 verbs:
2909 - delete
2910 - deletecollection
2911 - get
2912 - list
2913 - patch
2914 - create
2915 - update
2916 - watch
2917 `, `
2918 apiVersion: v1
2919 kind: Service
2920 metadata:
2921 name: name1
2922 namespace: ns
2923 spec:
2924 type: LoadBalancer
2925 ports:
2926 - port: 8989`, `
2927 addressType: IPv4
2928 apiVersion: discovery.k8s.io/v1
2929 endpoints:
2930 - addresses:
2931 - 172.17.0.12
2932 conditions:
2933 ready: true
2934 targetRef:
2935 kind: Pod
2936 name: name1-1
2937 namespace: ns
2938 - addresses:
2939 - 192.0.2.0
2940 conditions:
2941 ready: true
2942 targetRef:
2943 kind: ExternalWorkload
2944 name: wlkd1
2945 namespace: ns
2946 topology:
2947 kubernetes.io/hostname: node-1
2948 kind: EndpointSlice
2949 metadata:
2950 labels:
2951 kubernetes.io/service-name: name1
2952 name: name1-es
2953 namespace: ns
2954 ports:
2955 - name: ""
2956 port: 8989`, `
2957 apiVersion: v1
2958 kind: Pod
2959 metadata:
2960 name: name1-1
2961 namespace: ns
2962 status:
2963 phase: Running
2964 podIP: 172.17.0.12`, `
2965 apiVersion: workload.linkerd.io/v1beta1
2966 kind: ExternalWorkload
2967 metadata:
2968 name: wlkd1
2969 namespace: ns
2970 spec:
2971 meshTLS:
2972 identity: foo
2973 serverName: foo
2974 ports:
2975 - port: 8989
2976 workloadIPs:
2977 - ip: 192.0.2.0
2978 status:
2979 conditions:
2980 - type: Ready
2981 status: "True"
2982 `,
2983 }
2984
2985 k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
2986 if err != nil {
2987 t.Fatalf("NewFakeAPI returned an error: %s", err)
2988 }
2989
2990 metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
2991 if err != nil {
2992 t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
2993 }
2994
2995 watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
2996 if err != nil {
2997 t.Fatalf("can't create Endpoints watcher: %s", err)
2998 }
2999
3000 k8sAPI.Sync(nil)
3001 metadataAPI.Sync(nil)
3002
3003 listener := newBufferingEndpointListener()
3004
3005 err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
3006 if err != nil {
3007 t.Fatal(err)
3008 }
3009
3010 k8sAPI.Sync(nil)
3011 metadataAPI.Sync(nil)
3012
3013 listener.ExpectAdded([]string{"172.17.0.12:8989", "192.0.2.0:8989"}, t)
3014
3015
3016 es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
3017 if err != nil {
3018 t.Fatal(err)
3019 }
3020
3021 unready := false
3022 es.Endpoints[0].Conditions.Ready = &unready
3023 es.Endpoints[1].Conditions.Ready = &unready
3024
3025 _, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{})
3026 if err != nil {
3027 t.Fatal(err)
3028 }
3029
3030 k8sAPI.Sync(nil)
3031 metadataAPI.Sync(nil)
3032
3033
3034 time.Sleep(50 * time.Millisecond)
3035
3036 listener.ExpectRemoved([]string{"172.17.0.12:8989", "192.0.2.0:8989"}, t)
3037 }
3038
3039
3040
3041 func TestEndpointSliceChangeToReady(t *testing.T) {
3042 k8sConfigsWithES := []string{`
3043 kind: APIResourceList
3044 apiVersion: v1
3045 groupVersion: discovery.k8s.io/v1
3046 resources:
3047 - name: endpointslices
3048 singularName: endpointslice
3049 namespaced: true
3050 kind: EndpointSlice
3051 verbs:
3052 - delete
3053 - deletecollection
3054 - get
3055 - list
3056 - patch
3057 - create
3058 - update
3059 - watch
3060 `, `
3061 apiVersion: v1
3062 kind: Service
3063 metadata:
3064 name: name1
3065 namespace: ns
3066 spec:
3067 type: LoadBalancer
3068 ports:
3069 - port: 8989`, `
3070 addressType: IPv4
3071 apiVersion: discovery.k8s.io/v1
3072 endpoints:
3073 - addresses:
3074 - 172.17.0.12
3075 conditions:
3076 ready: true
3077 targetRef:
3078 kind: Pod
3079 name: name1-1
3080 namespace: ns
3081 - addresses:
3082 - 172.17.0.13
3083 conditions:
3084 ready: false
3085 targetRef:
3086 kind: Pod
3087 name: name1-2
3088 namespace: ns
3089 - addresses:
3090 - 192.0.2.0
3091 conditions:
3092 ready: true
3093 targetRef:
3094 kind: ExternalWorkload
3095 name: wlkd1
3096 namespace: ns
3097 topology:
3098 kubernetes.io/hostname: node-1
3099 - addresses:
3100 - 192.0.2.1
3101 conditions:
3102 ready: false
3103 targetRef:
3104 kind: ExternalWorkload
3105 name: wlkd2
3106 namespace: ns
3107 topology:
3108 kubernetes.io/hostname: node-1
3109 kind: EndpointSlice
3110 metadata:
3111 labels:
3112 kubernetes.io/service-name: name1
3113 name: name1-es
3114 namespace: ns
3115 ports:
3116 - name: ""
3117 port: 8989`, `
3118 apiVersion: v1
3119 kind: Pod
3120 metadata:
3121 name: name1-1
3122 namespace: ns
3123 status:
3124 phase: Running
3125 podIP: 172.17.0.12`, `
3126 apiVersion: v1
3127 kind: Pod
3128 metadata:
3129 name: name1-2
3130 namespace: ns
3131 status:
3132 phase: Running
3133 podIP: 172.17.0.13`, `
3134 apiVersion: workload.linkerd.io/v1beta1
3135 kind: ExternalWorkload
3136 metadata:
3137 name: wlkd1
3138 namespace: ns
3139 spec:
3140 meshTLS:
3141 identity: foo
3142 serverName: foo
3143 ports:
3144 - port: 8989
3145 workloadIPs:
3146 - ip: 192.0.2.0
3147 status:
3148 conditions:
3149 - type: Ready
3150 status: "True"
3151 `, `
3152 apiVersion: workload.linkerd.io/v1beta1
3153 kind: ExternalWorkload
3154 metadata:
3155 name: wlkd2
3156 namespace: ns
3157 spec:
3158 meshTLS:
3159 identity: foo
3160 serverName: foo
3161 ports:
3162 - port: 8989
3163 workloadIPs:
3164 - ip: 192.0.2.1
3165 status:
3166 conditions:
3167 - type: Ready
3168 status: "True"
3169 `,
3170 }
3171
3172 k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
3173 if err != nil {
3174 t.Fatalf("NewFakeAPI returned an error: %s", err)
3175 }
3176
3177 metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
3178 if err != nil {
3179 t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
3180 }
3181
3182 watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
3183 if err != nil {
3184 t.Fatalf("can't create Endpoints watcher: %s", err)
3185 }
3186
3187 k8sAPI.Sync(nil)
3188 metadataAPI.Sync(nil)
3189
3190 listener := newBufferingEndpointListener()
3191
3192 err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
3193 if err != nil {
3194 t.Fatal(err)
3195 }
3196
3197 k8sAPI.Sync(nil)
3198 metadataAPI.Sync(nil)
3199
3200
3201 listener.ExpectAdded([]string{"172.17.0.12:8989", "192.0.2.0:8989"}, t)
3202
3203 es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
3204 if err != nil {
3205 t.Fatal(err)
3206 }
3207
3208
3209
3210 rdy := true
3211 es.Endpoints[1].Conditions.Ready = &rdy
3212 es.Endpoints[3].Conditions.Ready = &rdy
3213
3214 _, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{})
3215 if err != nil {
3216 t.Fatal(err)
3217 }
3218
3219 k8sAPI.Sync(nil)
3220 metadataAPI.Sync(nil)
3221
3222
3223 time.Sleep(50 * time.Millisecond)
3224
3225 listener.ExpectAdded([]string{"172.17.0.12:8989", "172.17.0.13:8989", "192.0.2.0:8989", "192.0.2.1:8989"}, t)
3226
3227 }
3228
3229
3230 func TestEndpointSliceAddHints(t *testing.T) {
3231 k8sConfigsWithES := []string{`
3232 kind: APIResourceList
3233 apiVersion: v1
3234 groupVersion: discovery.k8s.io/v1
3235 resources:
3236 - name: endpointslices
3237 singularName: endpointslice
3238 namespaced: true
3239 kind: EndpointSlice
3240 verbs:
3241 - delete
3242 - deletecollection
3243 - get
3244 - list
3245 - patch
3246 - create
3247 - update
3248 - watch
3249 `, `
3250 apiVersion: v1
3251 kind: Service
3252 metadata:
3253 name: name1
3254 namespace: ns
3255 spec:
3256 type: LoadBalancer
3257 ports:
3258 - port: 8989`, `
3259 addressType: IPv4
3260 apiVersion: discovery.k8s.io/v1
3261 endpoints:
3262 - addresses:
3263 - 172.17.0.12
3264 conditions:
3265 ready: true
3266 targetRef:
3267 kind: Pod
3268 name: name1-1
3269 namespace: ns
3270 topology:
3271 kubernetes.io/hostname: node-1
3272 kind: EndpointSlice
3273 metadata:
3274 labels:
3275 kubernetes.io/service-name: name1
3276 name: name1-es
3277 namespace: ns
3278 ports:
3279 - name: ""
3280 port: 8989`, `
3281 apiVersion: v1
3282 kind: Pod
3283 metadata:
3284 name: name1-1
3285 namespace: ns
3286 status:
3287 phase: Running
3288 podIP: 172.17.0.12`}
3289
3290
3291
3292 k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
3293 if err != nil {
3294 t.Fatalf("NewFakeAPI returned an error: %s", err)
3295 }
3296
3297 metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
3298 if err != nil {
3299 t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
3300 }
3301
3302 watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
3303 if err != nil {
3304 t.Fatalf("can't create Endpoints watcher: %s", err)
3305 }
3306
3307 k8sAPI.Sync(nil)
3308 metadataAPI.Sync(nil)
3309
3310 listener := newBufferingEndpointListener()
3311
3312 err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
3313 if err != nil {
3314 t.Fatal(err)
3315 }
3316
3317 k8sAPI.Sync(nil)
3318 metadataAPI.Sync(nil)
3319
3320 listener.ExpectAdded([]string{"172.17.0.12:8989"}, t)
3321
3322
3323 es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
3324 if err != nil {
3325 t.Fatal(err)
3326 }
3327
3328 es.Endpoints[0].Hints = &dv1.EndpointHints{
3329 ForZones: []dv1.ForZone{{Name: "zone1"}},
3330 }
3331
3332 _, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{})
3333 if err != nil {
3334 t.Fatal(err)
3335 }
3336
3337 k8sAPI.Sync(nil)
3338 metadataAPI.Sync(nil)
3339
3340
3341 time.Sleep(50 * time.Millisecond)
3342
3343 listener.ExpectAdded([]string{"172.17.0.12:8989", "172.17.0.12:8989"}, t)
3344 }
3345
3346
3347 func TestEndpointSliceRemoveHints(t *testing.T) {
3348 k8sConfigsWithES := []string{`
3349 kind: APIResourceList
3350 apiVersion: v1
3351 groupVersion: discovery.k8s.io/v1
3352 resources:
3353 - name: endpointslices
3354 singularName: endpointslice
3355 namespaced: true
3356 kind: EndpointSlice
3357 verbs:
3358 - delete
3359 - deletecollection
3360 - get
3361 - list
3362 - patch
3363 - create
3364 - update
3365 - watch
3366 `, `
3367 apiVersion: v1
3368 kind: Service
3369 metadata:
3370 name: name1
3371 namespace: ns
3372 spec:
3373 type: LoadBalancer
3374 ports:
3375 - port: 8989`, `
3376 addressType: IPv4
3377 apiVersion: discovery.k8s.io/v1
3378 endpoints:
3379 - addresses:
3380 - 172.17.0.12
3381 conditions:
3382 hints:
3383 forZones:
3384 - name: zone1
3385 ready: true
3386 targetRef:
3387 kind: Pod
3388 name: name1-1
3389 namespace: ns
3390 topology:
3391 kubernetes.io/hostname: node-1
3392 kind: EndpointSlice
3393 metadata:
3394 labels:
3395 kubernetes.io/service-name: name1
3396 name: name1-es
3397 namespace: ns
3398 ports:
3399 - name: ""
3400 port: 8989`, `
3401 apiVersion: v1
3402 kind: Pod
3403 metadata:
3404 name: name1-1
3405 namespace: ns
3406 status:
3407 phase: Running
3408 podIP: 172.17.0.12`}
3409
3410
3411
3412 k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
3413 if err != nil {
3414 t.Fatalf("NewFakeAPI returned an error: %s", err)
3415 }
3416
3417 metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
3418 if err != nil {
3419 t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
3420 }
3421
3422 watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
3423 if err != nil {
3424 t.Fatalf("can't create Endpoints watcher: %s", err)
3425 }
3426
3427 k8sAPI.Sync(nil)
3428 metadataAPI.Sync(nil)
3429
3430 listener := newBufferingEndpointListener()
3431
3432 err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
3433 if err != nil {
3434 t.Fatal(err)
3435 }
3436
3437 k8sAPI.Sync(nil)
3438 metadataAPI.Sync(nil)
3439
3440 listener.ExpectAdded([]string{"172.17.0.12:8989"}, t)
3441
3442
3443 es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
3444 if err != nil {
3445 t.Fatal(err)
3446 }
3447
3448 es.Endpoints[0].Hints = &dv1.EndpointHints{
3449
3450 }
3451
3452 _, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{})
3453 if err != nil {
3454 t.Fatal(err)
3455 }
3456
3457 k8sAPI.Sync(nil)
3458 metadataAPI.Sync(nil)
3459
3460
3461 time.Sleep(50 * time.Millisecond)
3462
3463 listener.ExpectAdded([]string{"172.17.0.12:8989", "172.17.0.12:8989"}, t)
3464 }
3465
View as plain text