1 package servicemirror
2
3 import (
4 "context"
5 "fmt"
6 "testing"
7
8 "github.com/go-test/deep"
9 "github.com/linkerd/linkerd2/controller/k8s"
10 consts "github.com/linkerd/linkerd2/pkg/k8s"
11 "github.com/linkerd/linkerd2/pkg/multicluster"
12 logging "github.com/sirupsen/logrus"
13 corev1 "k8s.io/api/core/v1"
14 "k8s.io/apimachinery/pkg/api/errors"
15 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16 "k8s.io/client-go/tools/record"
17 "k8s.io/client-go/util/workqueue"
18 )
19
// mirroringTestCase is one table-driven scenario executed by its run method.
type mirroringTestCase struct {
	// description is used as the subtest name passed to t.Run.
	description string
	// environment is run against a fresh work queue and yields the local API
	// that the expectations below are checked against.
	environment *testEnvironment
	// expectedLocalServices lists the Services that must exist locally and
	// match according to diffServices. When nil, run requires that no local
	// Service exists in any namespace.
	expectedLocalServices []*corev1.Service
	// expectedLocalEndpoints lists the Endpoints that must exist locally and
	// match according to diffEndpoints. When nil, run performs no Endpoints
	// check at all.
	expectedLocalEndpoints []*corev1.Endpoints
	// expectedEventsInQueue lists, in dequeue order, the events that must be
	// left in the work queue once the environment has run.
	expectedEventsInQueue []interface{}
}
27
28 func (tc *mirroringTestCase) run(t *testing.T) {
29 t.Run(tc.description, func(t *testing.T) {
30
31 q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
32 localAPI, err := tc.environment.runEnvironment(q)
33 if err != nil {
34 t.Fatal(err)
35 }
36 if tc.expectedLocalServices == nil {
37
38 services, err := localAPI.Client.CoreV1().Services(corev1.NamespaceAll).List(context.Background(), metav1.ListOptions{})
39 if err != nil {
40 t.Fatal(err)
41 }
42
43 if len(services.Items) > 0 {
44 t.Fatalf("Was expecting no local services but instead found %v", services.Items)
45
46 }
47 } else {
48 for _, expected := range tc.expectedLocalServices {
49 actual, err := localAPI.Client.CoreV1().Services(expected.Namespace).Get(context.Background(), expected.Name, metav1.GetOptions{})
50 if err != nil {
51 t.Fatalf("Could not find mirrored service with name %s", expected.Name)
52 }
53
54 if err := diffServices(expected, actual); err != nil {
55 t.Fatal(err)
56 }
57 }
58 }
59
60 if tc.expectedLocalEndpoints == nil {
61
62
63 } else {
64 for _, expected := range tc.expectedLocalEndpoints {
65 actual, err := localAPI.Client.CoreV1().Endpoints(expected.Namespace).Get(context.Background(), expected.Name, metav1.GetOptions{})
66 if err != nil {
67 t.Fatalf("Could not find endpoints with name %s", expected.Name)
68 }
69
70 if err := diffEndpoints(expected, actual); err != nil {
71 t.Fatal(err)
72 }
73 }
74 }
75
76 expectedNumEvents := len(tc.expectedEventsInQueue)
77 actualNumEvents := q.Len()
78
79 if expectedNumEvents != actualNumEvents {
80 t.Fatalf("Was expecting %d events but got %d", expectedNumEvents, actualNumEvents)
81 }
82
83 for _, ev := range tc.expectedEventsInQueue {
84 evInQueue, _ := q.Get()
85 if diff := deep.Equal(ev, evInQueue); diff != nil {
86 t.Errorf("%v", diff)
87 }
88 }
89 })
90 }
91
92 func TestRemoteServiceCreatedMirroring(t *testing.T) {
93 for _, tt := range []mirroringTestCase{
94 {
95 description: "create service and endpoints when gateway can be resolved",
96 environment: createExportedService,
97 expectedLocalServices: []*corev1.Service{
98 mirrorService(
99 "service-one-remote",
100 "ns1",
101 "111",
102 []corev1.ServicePort{
103 {
104 Name: "port1",
105 Protocol: "TCP",
106 Port: 555,
107 },
108 {
109 Name: "port2",
110 Protocol: "TCP",
111 Port: 666,
112 },
113 }),
114 },
115 expectedLocalEndpoints: []*corev1.Endpoints{
116 endpoints("service-one-remote", "ns1", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{
117 {
118 Name: "port1",
119 Port: 888,
120 Protocol: "TCP",
121 },
122 {
123 Name: "port2",
124 Port: 888,
125 Protocol: "TCP",
126 },
127 }),
128 },
129 },
130 {
131 description: "create headless service and endpoints when gateway can be resolved",
132 environment: createExportedHeadlessService,
133 expectedLocalServices: []*corev1.Service{
134 headlessMirrorService(
135 "service-one-remote",
136 "ns2",
137 "111",
138 []corev1.ServicePort{
139 {
140 Name: "port1",
141 Protocol: "TCP",
142 Port: 555,
143 },
144 {
145 Name: "port2",
146 Protocol: "TCP",
147 Port: 666,
148 },
149 }),
150 endpointMirrorService(
151 "pod-0",
152 "service-one-remote",
153 "ns2",
154 "112",
155 []corev1.ServicePort{
156 {
157 Name: "port1",
158 Protocol: "TCP",
159 Port: 555,
160 },
161 {
162 Name: "port2",
163 Protocol: "TCP",
164 Port: 666,
165 },
166 },
167 ),
168 },
169 expectedLocalEndpoints: []*corev1.Endpoints{
170 headlessMirrorEndpoints("service-one-remote", "ns2", "pod-0", "", "gateway-identity", []corev1.EndpointPort{
171 {
172 Name: "port1",
173 Port: 555,
174 Protocol: "TCP",
175 },
176 {
177 Name: "port2",
178 Port: 666,
179 Protocol: "TCP",
180 },
181 }),
182 endpointMirrorEndpoints(
183 "service-one-remote",
184 "ns2",
185 "pod-0",
186 "192.0.2.129",
187 "gateway-identity",
188 []corev1.EndpointPort{
189 {
190 Name: "port1",
191 Port: 889,
192 Protocol: "TCP",
193 },
194 {
195 Name: "port2",
196 Port: 889,
197 Protocol: "TCP",
198 },
199 }),
200 },
201 },
202 {
203 description: "remote discovery mirroring",
204 environment: createRemoteDiscoveryService,
205 expectedLocalServices: []*corev1.Service{
206 remoteDiscoveryMirrorService(
207 "service-one",
208 "ns1",
209 "111",
210 []corev1.ServicePort{
211 {
212 Name: "port1",
213 Protocol: "TCP",
214 Port: 555,
215 },
216 {
217 Name: "port2",
218 Protocol: "TCP",
219 Port: 666,
220 },
221 }),
222 },
223 expectedLocalEndpoints: []*corev1.Endpoints{},
224 },
225 {
226 description: "link with no gateway mirrors only remote discovery",
227 environment: noGatewayLink,
228 expectedLocalServices: []*corev1.Service{
229 remoteDiscoveryMirrorService(
230 "service-one",
231 "ns1",
232 "111",
233 []corev1.ServicePort{
234 {
235 Name: "port1",
236 Protocol: "TCP",
237 Port: 555,
238 },
239 {
240 Name: "port2",
241 Protocol: "TCP",
242 Port: 666,
243 },
244 }),
245 },
246 expectedLocalEndpoints: []*corev1.Endpoints{},
247 },
248 } {
249 tc := tt
250 tc.run(t)
251 }
252 }
253
254 func TestLocalNamespaceCreatedAfterServiceExport(t *testing.T) {
255 remoteAPI, err := k8s.NewFakeAPI(
256 gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "mc-gateway", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod),
257 remoteServiceAsYaml("service-one", "ns1", "111", []corev1.ServicePort{}),
258 endpointsAsYaml("service-one", "ns1", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{}),
259 )
260 if err != nil {
261 t.Fatal(err)
262 }
263 localAPI, err := k8s.NewFakeAPI()
264 if err != nil {
265 t.Fatal(err)
266 }
267 remoteAPI.Sync(nil)
268 localAPI.Sync(nil)
269
270 q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
271 eventRecorder := record.NewFakeRecorder(100)
272
273 watcher := RemoteClusterServiceWatcher{
274 link: &multicluster.Link{
275 TargetClusterName: clusterName,
276 TargetClusterDomain: clusterDomain,
277 GatewayIdentity: "gateway-identity",
278 GatewayAddress: "192.0.2.127",
279 GatewayPort: 888,
280 ProbeSpec: defaultProbeSpec,
281 Selector: *defaultSelector,
282 RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
283 },
284 remoteAPIClient: remoteAPI,
285 localAPIClient: localAPI,
286 stopper: nil,
287 recorder: eventRecorder,
288 log: logging.WithFields(logging.Fields{"cluster": clusterName}),
289 eventsQueue: q,
290 requeueLimit: 0,
291 gatewayAlive: true,
292 headlessServicesEnabled: true,
293 }
294
295 q.Add(&RemoteServiceCreated{
296 service: remoteService("service-one", "ns1", "111", map[string]string{
297 consts.DefaultExportedServiceSelector: "true",
298 }, []corev1.ServicePort{
299 {
300 Name: "port1",
301 Protocol: "TCP",
302 Port: 555,
303 },
304 {
305 Name: "port2",
306 Protocol: "TCP",
307 Port: 666,
308 },
309 }),
310 })
311 for q.Len() > 0 {
312 watcher.processNextEvent(context.Background())
313 }
314
315 _, err = localAPI.Svc().Lister().Services("ns1").Get("service-one-remote")
316 if err == nil {
317 t.Fatalf("service-one should not exist in local cluster before namespace is created")
318 } else if !errors.IsNotFound(err) {
319 t.Fatalf("unexpected error: %v", err)
320 }
321
322 skippedEvent := <-eventRecorder.Events
323 if skippedEvent != fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, eventTypeSkipped, "Skipped mirroring service: namespace does not exist") {
324 t.Error("Expected skipped event, got:", skippedEvent)
325 }
326
327 ns, err := localAPI.Client.CoreV1().Namespaces().Create(context.Background(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "ns1"}}, metav1.CreateOptions{})
328 if err != nil {
329 t.Fatal(err)
330 }
331
332 q.Add(&OnLocalNamespaceAdded{ns})
333 for q.Len() > 0 {
334 watcher.processNextEvent(context.Background())
335 }
336
337 _, err = localAPI.Client.CoreV1().Services("ns1").Get(context.Background(), "service-one-remote", metav1.GetOptions{})
338 if err != nil {
339 t.Fatalf("error getting service-one locally: %v", err)
340 }
341 }
342
343 func TestServiceCreatedGatewayAlive(t *testing.T) {
344 remoteAPI, err := k8s.NewFakeAPI(
345 gatewayAsYaml("gateway", "gateway-ns", "1", "192.0.0.1", "gateway", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod),
346 remoteServiceAsYaml("svc", "ns", "1", []corev1.ServicePort{}),
347 endpointsAsYaml("svc", "ns", "192.0.0.1", "gateway-identity", []corev1.EndpointPort{}),
348 )
349 if err != nil {
350 t.Fatal(err)
351 }
352 localAPI, err := k8s.NewFakeAPI(
353 namespaceAsYaml("ns"),
354 )
355 if err != nil {
356 t.Fatal(err)
357 }
358 remoteAPI.Sync(nil)
359 localAPI.Sync(nil)
360
361 events := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
362 watcher := RemoteClusterServiceWatcher{
363 link: &multicluster.Link{
364 TargetClusterName: clusterName,
365 TargetClusterDomain: clusterDomain,
366 GatewayIdentity: "gateway-identity",
367 GatewayAddress: "192.0.0.1",
368 GatewayPort: 888,
369 ProbeSpec: defaultProbeSpec,
370 Selector: *defaultSelector,
371 RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
372 },
373 remoteAPIClient: remoteAPI,
374 localAPIClient: localAPI,
375 log: logging.WithFields(logging.Fields{"cluster": clusterName}),
376 eventsQueue: events,
377 requeueLimit: 0,
378 gatewayAlive: true,
379 }
380
381 events.Add(&RemoteServiceCreated{
382 service: remoteService("svc", "ns", "1", map[string]string{
383 consts.DefaultExportedServiceSelector: "true",
384 }, []corev1.ServicePort{
385 {
386 Name: "port",
387 Protocol: "TCP",
388 Port: 111,
389 },
390 }),
391 })
392 for events.Len() > 0 {
393 watcher.processNextEvent(context.Background())
394 }
395
396
397
398 _, err = localAPI.Client.CoreV1().Services("ns").Get(context.Background(), "svc-remote", metav1.GetOptions{})
399 if err != nil {
400 t.Fatalf("error getting svc-remote Service: %v", err)
401 }
402 endpoints, err := localAPI.Client.CoreV1().Endpoints("ns").Get(context.Background(), "svc-remote", metav1.GetOptions{})
403 if err != nil {
404 t.Fatalf("error getting svc-remote Endpoints: %v", err)
405 }
406 if len(endpoints.Subsets) == 0 {
407 t.Fatal("expected svc-remote Endpoints subsets")
408 }
409 for _, ss := range endpoints.Subsets {
410 if len(ss.Addresses) == 0 {
411 t.Fatal("svc-remote Endpoints should have addresses")
412 }
413 if len(ss.NotReadyAddresses) != 0 {
414 t.Fatalf("svc-remote Endpoints should not have not ready addresses: %v", ss.NotReadyAddresses)
415 }
416 }
417
418
419
420 watcher.gatewayAlive = false
421 events.Add(&RepairEndpoints{})
422 for events.Len() > 0 {
423 watcher.processNextEvent(context.Background())
424 }
425
426
427
428
429 endpoints, err = localAPI.Client.CoreV1().Endpoints("ns").Get(context.Background(), "svc-remote", metav1.GetOptions{})
430 if err != nil {
431 t.Fatalf("error getting svc-remote Endpoints locally: %v", err)
432 }
433 if len(endpoints.Subsets) == 0 {
434 t.Fatal("expected svc-remote Endpoints subsets")
435 }
436 for _, ss := range endpoints.Subsets {
437 if len(ss.NotReadyAddresses) == 0 {
438 t.Fatal("svc-remote Endpoints should have not ready addresses")
439 }
440 if len(ss.Addresses) != 0 {
441 t.Fatalf("svc-remote Endpoints should not have addresses: %v", ss.Addresses)
442 }
443 }
444
445
446
447
448
449 events.Add(&RemoteServiceUpdated{
450 localService: remoteService("svc-remote", "ns", "2", nil, nil),
451 localEndpoints: endpoints,
452 remoteUpdate: remoteService("svc", "ns", "2", map[string]string{
453 consts.DefaultExportedServiceSelector: "true",
454 "new-label": "hi",
455 }, []corev1.ServicePort{
456 {
457 Name: "port",
458 Protocol: "TCP",
459 Port: 111,
460 },
461 }),
462 })
463 for events.Len() > 0 {
464 watcher.processNextEvent(context.Background())
465 }
466 service, err := localAPI.Client.CoreV1().Services("ns").Get(context.Background(), "svc-remote", metav1.GetOptions{})
467 if err != nil {
468 t.Fatalf("error getting svc-remote Service: %v", err)
469 }
470 _, ok := service.Labels["new-label"]
471 if !ok {
472 t.Fatalf("error updating svc-remote Service: %v", err)
473 }
474 endpoints, err = localAPI.Client.CoreV1().Endpoints("ns").Get(context.Background(), "svc-remote", metav1.GetOptions{})
475 if err != nil {
476 t.Fatalf("error getting svc-remote Endpoints: %v", err)
477 }
478 if len(endpoints.Subsets) == 0 {
479 t.Fatal("expected svc-remote Endpoints subsets")
480 }
481 for _, ss := range endpoints.Subsets {
482 if len(ss.NotReadyAddresses) == 0 {
483 t.Fatal("svc-remote Endpoints should have not ready addresses")
484 }
485 if len(ss.Addresses) != 0 {
486 t.Fatalf("svc-remote Endpoints should not have addresses: %v", ss.Addresses)
487 }
488 }
489 }
490
491 func TestServiceCreatedGatewayDown(t *testing.T) {
492 remoteAPI, err := k8s.NewFakeAPI(
493 gatewayAsYaml("gateway", "gateway-ns", "1", "192.0.0.1", "gateway", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod),
494 remoteServiceAsYaml("svc", "ns", "1", []corev1.ServicePort{}),
495 endpointsAsYaml("svc", "ns", "192.0.0.1", "gateway-identity", []corev1.EndpointPort{}),
496 )
497 if err != nil {
498 t.Fatal(err)
499 }
500 localAPI, err := k8s.NewFakeAPI(
501 namespaceAsYaml("ns"),
502 )
503 if err != nil {
504 t.Fatal(err)
505 }
506 remoteAPI.Sync(nil)
507 localAPI.Sync(nil)
508
509 events := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
510 watcher := RemoteClusterServiceWatcher{
511 link: &multicluster.Link{
512 TargetClusterName: clusterName,
513 TargetClusterDomain: clusterDomain,
514 GatewayIdentity: "gateway-identity",
515 GatewayAddress: "192.0.0.1",
516 GatewayPort: 888,
517 ProbeSpec: defaultProbeSpec,
518 Selector: *defaultSelector,
519 RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
520 },
521 remoteAPIClient: remoteAPI,
522 localAPIClient: localAPI,
523 log: logging.WithFields(logging.Fields{"cluster": clusterName}),
524 eventsQueue: events,
525 requeueLimit: 0,
526 gatewayAlive: false,
527 }
528
529 events.Add(&RemoteServiceCreated{
530 service: remoteService("svc", "ns", "1", map[string]string{
531 consts.DefaultExportedServiceSelector: "true",
532 }, []corev1.ServicePort{
533 {
534 Name: "port",
535 Protocol: "TCP",
536 Port: 111,
537 },
538 }),
539 })
540 for events.Len() > 0 {
541 watcher.processNextEvent(context.Background())
542 }
543
544
545
546 _, err = localAPI.Client.CoreV1().Services("ns").Get(context.Background(), "svc-remote", metav1.GetOptions{})
547 if err != nil {
548 t.Fatalf("error getting svc-remote Service: %v", err)
549 }
550 endpoints, err := localAPI.Client.CoreV1().Endpoints("ns").Get(context.Background(), "svc-remote", metav1.GetOptions{})
551 if err != nil {
552 t.Fatalf("error getting svc-remote Endpoints: %v", err)
553 }
554 if len(endpoints.Subsets) == 0 {
555 t.Fatal("expected svc-remote Endpoints subsets")
556 }
557 for _, ss := range endpoints.Subsets {
558 if len(ss.NotReadyAddresses) == 0 {
559 t.Fatal("svc-remote Endpoints should have not ready addresses")
560 }
561 if len(ss.Addresses) != 0 {
562 t.Fatalf("svc-remote Endpoints should not have addresses: %v", ss.Addresses)
563 }
564 }
565
566
567
568 watcher.gatewayAlive = true
569 events.Add(&RepairEndpoints{})
570 for events.Len() > 0 {
571 watcher.processNextEvent(context.Background())
572 }
573 endpoints, err = localAPI.Client.CoreV1().Endpoints("ns").Get(context.Background(), "svc-remote", metav1.GetOptions{})
574 if err != nil {
575 t.Fatalf("error getting svc-remote Endpoints locally: %v", err)
576 }
577 if len(endpoints.Subsets) == 0 {
578 t.Fatal("expected svc-remote Endpoints subsets")
579 }
580 for _, ss := range endpoints.Subsets {
581 if len(ss.Addresses) == 0 {
582 t.Fatal("svc-remote Endpoints should have addresses")
583 }
584 if len(ss.NotReadyAddresses) != 0 {
585 t.Fatalf("svc-remote Service endpoints should not have not ready addresses: %v", ss.NotReadyAddresses)
586 }
587 }
588 }
589
590 func TestRemoteServiceDeletedMirroring(t *testing.T) {
591 for _, tt := range []mirroringTestCase{
592 {
593 description: "deletes locally mirrored service",
594 environment: deleteMirrorService,
595 },
596 } {
597 tc := tt
598 tc.run(t)
599 }
600 }
601
602 func TestRemoteServiceUpdatedMirroring(t *testing.T) {
603 for _, tt := range []mirroringTestCase{
604 {
605 description: "updates service ports on both service and endpoints",
606 environment: updateServiceWithChangedPorts,
607 expectedLocalServices: []*corev1.Service{
608 mirrorService("test-service-remote", "test-namespace", "currentServiceResVersion",
609 []corev1.ServicePort{
610 {
611 Name: "port1",
612 Protocol: "TCP",
613 Port: 111,
614 },
615 {
616 Name: "port3",
617 Protocol: "TCP",
618 Port: 333,
619 },
620 }),
621 },
622
623 expectedLocalEndpoints: []*corev1.Endpoints{
624 endpoints("test-service-remote", "test-namespace", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{
625 {
626 Name: "port1",
627 Port: 888,
628 Protocol: "TCP",
629 },
630 {
631 Name: "port3",
632 Port: 888,
633 Protocol: "TCP",
634 },
635 }),
636 },
637 },
638 } {
639 tc := tt
640 tc.run(t)
641 }
642 }
643
644
645
646
647
648
649 func TestEmptyRemoteSelectors(t *testing.T) {
650 for _, tt := range []mirroringTestCase{
651 {
652 description: "empty remote discovery selector does not result in exports",
653 environment: createEnvWithSelector(defaultSelector, &metav1.LabelSelector{}),
654 expectedEventsInQueue: []interface{}{&RemoteServiceCreated{
655 service: remoteService("service-one", "ns1", "111", map[string]string{
656 consts.DefaultExportedServiceSelector: "true",
657 }, []corev1.ServicePort{
658 {
659 Name: "default1",
660 Protocol: "TCP",
661 Port: 555,
662 },
663 {
664 Name: "default2",
665 Protocol: "TCP",
666 Port: 666,
667 },
668 }),
669 },
670 },
671 },
672 {
673 description: "empty default selector does not result in exports",
674 environment: createEnvWithSelector(&metav1.LabelSelector{}, defaultRemoteDiscoverySelector),
675 expectedEventsInQueue: []interface{}{&RemoteServiceCreated{
676 service: remoteService("service-two", "ns1", "111", map[string]string{
677 consts.DefaultExportedServiceSelector: "remote-discovery",
678 }, []corev1.ServicePort{
679 {
680 Name: "remote1",
681 Protocol: "TCP",
682 Port: 777,
683 },
684 {
685 Name: "remote2",
686 Protocol: "TCP",
687 Port: 888,
688 },
689 }),
690 }},
691 },
692 {
693 description: "no selector in link does not result in exports",
694 environment: createEnvWithSelector(&metav1.LabelSelector{}, &metav1.LabelSelector{}),
695 },
696 } {
697 tc := tt
698 tc.run(t)
699 }
700 }
701
702 func TestRemoteEndpointsUpdatedMirroring(t *testing.T) {
703 for _, tt := range []mirroringTestCase{
704 {
705 description: "updates headless mirror service with new remote Endpoints hosts",
706 environment: updateEndpointsWithChangedHosts,
707 expectedLocalServices: []*corev1.Service{
708 headlessMirrorService("service-two-remote", "eptest", "222", []corev1.ServicePort{
709 {
710 Name: "port1",
711 Protocol: "TCP",
712 Port: 555,
713 },
714 {
715 Name: "port2",
716 Protocol: "TCP",
717 Port: 666,
718 },
719 }),
720 endpointMirrorService("pod-0", "service-two-remote", "eptest", "333", []corev1.ServicePort{
721 {
722 Name: "port1",
723 Protocol: "TCP",
724 Port: 555,
725 },
726 {
727 Name: "port2",
728 Protocol: "TCP",
729 Port: 666,
730 },
731 }),
732 endpointMirrorService("pod-1", "service-two-remote", "eptest", "112", []corev1.ServicePort{
733 {
734 Name: "port1",
735 Protocol: "TCP",
736 Port: 555,
737 },
738 {
739 Name: "port2",
740 Protocol: "TCP",
741 Port: 666,
742 },
743 }),
744 },
745 expectedLocalEndpoints: []*corev1.Endpoints{
746 headlessMirrorEndpointsUpdated(
747 "service-two-remote",
748 "eptest",
749 []string{"pod-0", "pod-1"},
750 []string{"", ""},
751 "gateway-identity",
752 []corev1.EndpointPort{
753 {
754 Name: "port1",
755 Port: 555,
756 Protocol: "TCP",
757 },
758 {
759 Name: "port2",
760 Port: 666,
761 Protocol: "TCP",
762 },
763 }),
764 endpointMirrorEndpoints(
765 "service-two-remote",
766 "eptest",
767 "pod-0",
768 "192.0.2.127",
769 "gateway-identity",
770 []corev1.EndpointPort{
771 {
772 Name: "port1",
773 Port: 888,
774 Protocol: "TCP",
775 },
776 {
777 Name: "port2",
778 Port: 888,
779 Protocol: "TCP",
780 },
781 }),
782 endpointMirrorEndpoints(
783 "service-two-remote",
784 "eptest",
785 "pod-1",
786 "192.0.2.127",
787 "gateway-identity",
788 []corev1.EndpointPort{
789 {
790 Name: "port1",
791 Port: 888,
792 Protocol: "TCP",
793 },
794 {
795 Name: "port2",
796 Port: 888,
797 Protocol: "TCP",
798 },
799 }),
800 },
801 },
802 } {
803 tc := tt
804 tc.run(t)
805 }
806 }
807
808 func TestClusterUnregisteredMirroring(t *testing.T) {
809 for _, tt := range []mirroringTestCase{
810 {
811 description: "unregisters cluster and cleans up all mirrored resources",
812 environment: clusterUnregistered,
813 },
814 } {
815 tc := tt
816 tc.run(t)
817 }
818 }
819
820 func TestGcOrphanedServicesMirroring(t *testing.T) {
821 for _, tt := range []mirroringTestCase{
822 {
823 description: "deletes mirrored resources that are no longer present on the remote cluster",
824 environment: gcTriggered,
825 expectedLocalServices: []*corev1.Service{
826 mirrorService("test-service-1-remote", "test-namespace", "", nil),
827 headlessMirrorService("test-headless-service-remote", "test-namespace", "", nil),
828 endpointMirrorService("pod-0", "test-headless-service-remote", "test-namespace", "", nil),
829 },
830
831 expectedLocalEndpoints: []*corev1.Endpoints{
832 endpoints("test-service-1-remote", "test-namespace", "", "", nil),
833 headlessMirrorEndpoints("test-headless-service-remote", "test-namespace", "pod-0", "", "", nil),
834 endpointMirrorEndpoints("test-headless-service-remote", "test-namespace", "pod-0", "", "", nil),
835 },
836 },
837 } {
838 tc := tt
839 tc.run(t)
840 }
841 }
842
843 func onAddOrUpdateTestCases(isAdd bool) []mirroringTestCase {
844
845 testType := "ADD"
846 if !isAdd {
847 testType = "UPDATE"
848 }
849
850 return []mirroringTestCase{
851 {
852 description: fmt.Sprintf("enqueue a RemoteServiceCreated event when this is not a gateway and we have the needed annotations (%s)", testType),
853 environment: onAddOrUpdateExportedSvc(isAdd),
854 expectedEventsInQueue: []interface{}{&RemoteServiceCreated{
855 service: remoteService("test-service", "test-namespace", "resVersion", map[string]string{
856 consts.DefaultExportedServiceSelector: "true",
857 }, nil),
858 }},
859 },
860 {
861 description: fmt.Sprintf("enqueue a RemoteServiceUpdated event if this is a service that we have already mirrored and its res version is different (%s)", testType),
862 environment: onAddOrUpdateRemoteServiceUpdated(isAdd),
863 expectedEventsInQueue: []interface{}{&RemoteServiceUpdated{
864 localService: mirrorService("test-service-remote", "test-namespace", "pastResourceVersion", nil),
865 localEndpoints: endpoints("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
866 remoteUpdate: remoteService("test-service", "test-namespace", "currentResVersion", map[string]string{
867 consts.DefaultExportedServiceSelector: "true",
868 }, nil),
869 }},
870 expectedLocalServices: []*corev1.Service{
871 mirrorService("test-service-remote", "test-namespace", "pastResourceVersion", nil),
872 },
873 expectedLocalEndpoints: []*corev1.Endpoints{
874 endpoints("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
875 },
876 },
877 {
878 description: fmt.Sprintf("not enqueue any events as this update does not really tell us anything new (res version is the same...) (%s)", testType),
879 environment: onAddOrUpdateSameResVersion(isAdd),
880 expectedLocalServices: []*corev1.Service{
881 mirrorService("test-service-remote", "test-namespace", "currentResVersion", nil),
882 },
883 expectedLocalEndpoints: []*corev1.Endpoints{
884 endpoints("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
885 },
886 },
887 {
888 description: fmt.Sprintf("enqueue RemoteServiceDeleted event as this service is not mirrorable anymore (%s)", testType),
889 environment: serviceNotExportedAnymore(isAdd),
890 expectedEventsInQueue: []interface{}{&RemoteServiceDeleted{
891 Name: "test-service",
892 Namespace: "test-namespace",
893 }},
894
895 expectedLocalServices: []*corev1.Service{
896 mirrorService("test-service-remote", "test-namespace", "currentResVersion", nil),
897 },
898 expectedLocalEndpoints: []*corev1.Endpoints{
899 endpoints("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
900 },
901 },
902 }
903 }
904
905 func TestOnAdd(t *testing.T) {
906 for _, tt := range onAddOrUpdateTestCases(true) {
907 tc := tt
908 tc.run(t)
909 }
910 }
911
912 func TestOnUpdate(t *testing.T) {
913 for _, tt := range onAddOrUpdateTestCases(false) {
914 tc := tt
915 tc.run(t)
916 }
917 }
918
919 func TestOnDelete(t *testing.T) {
920 for _, tt := range []mirroringTestCase{
921 {
922 description: "enqueues a RemoteServiceDeleted because there is gateway metadata present on the service",
923 environment: onDeleteExportedService,
924 expectedEventsInQueue: []interface{}{
925 &RemoteServiceDeleted{
926 Name: "test-service",
927 Namespace: "test-namespace",
928 },
929 },
930 },
931 {
932 description: "skips because there is no gateway metadata present on the service",
933 environment: onDeleteNonExportedService,
934 expectedEventsInQueue: []interface{}{},
935 },
936 } {
937 tc := tt
938 tc.run(t)
939 }
940 }
941
View as plain text