1 package servicemirror
2
3 import (
4 "context"
5 "fmt"
6 "log"
7 "strings"
8 "time"
9
10 "github.com/go-test/deep"
11 "github.com/linkerd/linkerd2/controller/k8s"
12 consts "github.com/linkerd/linkerd2/pkg/k8s"
13 "github.com/linkerd/linkerd2/pkg/multicluster"
14 logging "github.com/sirupsen/logrus"
15 corev1 "k8s.io/api/core/v1"
16 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17 "k8s.io/client-go/util/workqueue"
18 "sigs.k8s.io/yaml"
19 )
20
21 const (
22 clusterName = "remote"
23 clusterDomain = "cluster.local"
24 defaultProbePath = "/probe"
25 defaultProbePort = 12345
26 defaultProbePeriod = 60
27 )
28
29 var (
30 defaultProbeSpec = multicluster.ProbeSpec{
31 Path: defaultProbePath,
32 Port: defaultProbePort,
33 Period: time.Duration(defaultProbePeriod) * time.Second,
34 }
35 defaultSelector, _ = metav1.ParseToLabelSelector(consts.DefaultExportedServiceSelector + "=true")
36 defaultRemoteDiscoverySelector, _ = metav1.ParseToLabelSelector(consts.DefaultExportedServiceSelector + "=remote-discovery")
37 )
38
39 type testEnvironment struct {
40 events []interface{}
41 remoteResources []string
42 localResources []string
43 link multicluster.Link
44 }
45
46 func (te *testEnvironment) runEnvironment(watcherQueue workqueue.RateLimitingInterface) (*k8s.API, error) {
47 remoteAPI, err := k8s.NewFakeAPI(te.remoteResources...)
48 if err != nil {
49 return nil, err
50 }
51 localAPI, err := k8s.NewFakeAPI(te.localResources...)
52 if err != nil {
53 return nil, err
54 }
55 remoteAPI.Sync(nil)
56 localAPI.Sync(nil)
57
58 watcher := RemoteClusterServiceWatcher{
59 link: &te.link,
60 remoteAPIClient: remoteAPI,
61 localAPIClient: localAPI,
62 stopper: nil,
63 log: logging.WithFields(logging.Fields{"cluster": clusterName}),
64 eventsQueue: watcherQueue,
65 requeueLimit: 0,
66 gatewayAlive: true,
67 headlessServicesEnabled: true,
68 }
69
70 for _, ev := range te.events {
71 watcherQueue.Add(ev)
72 }
73
74 for range te.events {
75 watcher.processNextEvent(context.Background())
76 }
77
78 localAPI.Sync(nil)
79 remoteAPI.Sync(nil)
80
81 return localAPI, nil
82 }
83
84 var createExportedService = &testEnvironment{
85 events: []interface{}{
86 &RemoteServiceCreated{
87 service: remoteService("service-one", "ns1", "111", map[string]string{
88 consts.DefaultExportedServiceSelector: "true",
89 }, []corev1.ServicePort{
90 {
91 Name: "port1",
92 Protocol: "TCP",
93 Port: 555,
94 },
95 {
96 Name: "port2",
97 Protocol: "TCP",
98 Port: 666,
99 },
100 }),
101 },
102 },
103 remoteResources: []string{
104 gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "mc-gateway", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod),
105 endpointsAsYaml("service-one", "ns1", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{}),
106 },
107 localResources: []string{
108 namespaceAsYaml("ns1"),
109 },
110 link: multicluster.Link{
111 TargetClusterName: clusterName,
112 TargetClusterDomain: clusterDomain,
113 GatewayIdentity: "gateway-identity",
114 GatewayAddress: "192.0.2.127",
115 GatewayPort: 888,
116 ProbeSpec: defaultProbeSpec,
117 Selector: *defaultSelector,
118 RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
119 },
120 }
121
122 var createRemoteDiscoveryService = &testEnvironment{
123 events: []interface{}{
124 &RemoteServiceCreated{
125 service: remoteService("service-one", "ns1", "111", map[string]string{
126 consts.DefaultExportedServiceSelector: "remote-discovery",
127 }, []corev1.ServicePort{
128 {
129 Name: "port1",
130 Protocol: "TCP",
131 Port: 555,
132 },
133 {
134 Name: "port2",
135 Protocol: "TCP",
136 Port: 666,
137 },
138 }),
139 },
140 },
141 remoteResources: []string{
142 endpointsAsYaml("service-one", "ns1", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{}),
143 },
144 localResources: []string{
145 namespaceAsYaml("ns1"),
146 },
147 link: multicluster.Link{
148 TargetClusterName: clusterName,
149 TargetClusterDomain: clusterDomain,
150 GatewayIdentity: "gateway-identity",
151 GatewayAddress: "192.0.2.127",
152 GatewayPort: 888,
153 ProbeSpec: defaultProbeSpec,
154 Selector: *defaultSelector,
155 RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
156 },
157 }
158
159 var createExportedHeadlessService = &testEnvironment{
160 events: []interface{}{
161 &RemoteServiceCreated{
162 service: remoteHeadlessService("service-one", "ns2", "111", map[string]string{
163 consts.DefaultExportedServiceSelector: "true",
164 }, []corev1.ServicePort{
165 {
166 Name: "port1",
167 Protocol: "TCP",
168 Port: 555,
169 },
170 {
171 Name: "port2",
172 Protocol: "TCP",
173 Port: 666,
174 },
175 }),
176 },
177 &OnAddEndpointsCalled{
178 ep: remoteHeadlessEndpoints("service-one", "ns2", "112", "192.0.0.1", []corev1.EndpointPort{
179 {
180 Name: "port1",
181 Protocol: "TCP",
182 Port: 555,
183 },
184 {
185 Name: "port2",
186 Protocol: "TCP",
187 Port: 666,
188 },
189 }),
190 },
191 },
192 remoteResources: []string{
193 gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.129", "gateway", 889, "gateway-identity", 123456, "/probe1", 120),
194 remoteHeadlessSvcAsYaml("service-one", "ns2", "111",
195 []corev1.ServicePort{
196 {
197 Name: "port1",
198 Protocol: "TCP",
199 Port: 555,
200 },
201 {
202 Name: "port2",
203 Protocol: "TCP",
204 Port: 666,
205 },
206 }),
207 remoteHeadlessEndpointsAsYaml("service-one", "ns2", "112", "192.0.0.1", []corev1.EndpointPort{
208 {
209 Name: "port1",
210 Protocol: "TCP",
211 Port: 555,
212 },
213 {
214 Name: "port2",
215 Protocol: "TCP",
216 Port: 666,
217 },
218 }),
219 },
220 localResources: []string{
221 namespaceAsYaml("ns2"),
222 },
223 link: multicluster.Link{
224 TargetClusterName: clusterName,
225 TargetClusterDomain: clusterDomain,
226 GatewayIdentity: "gateway-identity",
227 GatewayAddress: "192.0.2.129",
228 GatewayPort: 889,
229 ProbeSpec: multicluster.ProbeSpec{
230 Port: 123456,
231 Path: "/probe1",
232 Period: 120,
233 },
234 Selector: *defaultSelector,
235 RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
236 },
237 }
238
239 var deleteMirrorService = &testEnvironment{
240 events: []interface{}{
241 &RemoteServiceDeleted{
242 Name: "test-service-remote-to-delete",
243 Namespace: "test-namespace-to-delete",
244 },
245 },
246 localResources: []string{
247 mirrorServiceAsYaml("test-service-remote-to-delete-remote", "test-namespace-to-delete", "", nil),
248 endpointsAsYaml("test-service-remote-to-delete-remote", "test-namespace-to-delete", "", "gateway-identity", nil),
249 },
250 link: multicluster.Link{
251 TargetClusterName: clusterName,
252 TargetClusterDomain: clusterDomain,
253 GatewayIdentity: "gateway-identity",
254 GatewayAddress: "192.0.2.127",
255 GatewayPort: 888,
256 ProbeSpec: defaultProbeSpec,
257 Selector: *defaultSelector,
258 RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
259 },
260 }
261
262 var updateServiceWithChangedPorts = &testEnvironment{
263 events: []interface{}{
264 &RemoteServiceUpdated{
265 remoteUpdate: remoteService("test-service", "test-namespace", "currentServiceResVersion", map[string]string{
266 consts.DefaultExportedServiceSelector: "true",
267 }, []corev1.ServicePort{
268 {
269 Name: "port1",
270 Protocol: "TCP",
271 Port: 111,
272 },
273 {
274 Name: "port3",
275 Protocol: "TCP",
276 Port: 333,
277 },
278 }),
279 localService: mirrorService("test-service-remote", "test-namespace", "pastServiceResVersion", []corev1.ServicePort{
280 {
281 Name: "port1",
282 Protocol: "TCP",
283 Port: 111,
284 },
285 {
286 Name: "port2",
287 Protocol: "TCP",
288 Port: 222,
289 },
290 }),
291 localEndpoints: endpoints("test-service-remote", "test-namespace", "192.0.2.127", "", []corev1.EndpointPort{
292 {
293 Name: "port1",
294 Port: 888,
295 Protocol: "TCP",
296 },
297 {
298 Name: "port2",
299 Port: 888,
300 Protocol: "TCP",
301 },
302 }),
303 },
304 },
305 remoteResources: []string{
306 gatewayAsYaml("gateway", "gateway-ns", "currentGatewayResVersion", "192.0.2.127", "mc-gateway", 888, "", defaultProbePort, defaultProbePath, defaultProbePeriod),
307 },
308 localResources: []string{
309 mirrorServiceAsYaml("test-service-remote", "test-namespace", "past", []corev1.ServicePort{
310 {
311 Name: "port1",
312 Protocol: "TCP",
313 Port: 111,
314 },
315 {
316 Name: "port2",
317 Protocol: "TCP",
318 Port: 222,
319 },
320 {
321 Name: "port3",
322 Protocol: "TCP",
323 Port: 333,
324 },
325 }),
326 endpointsAsYaml("test-service-remote", "test-namespace", "192.0.2.127", "", []corev1.EndpointPort{
327 {
328 Name: "port1",
329 Port: 888,
330 Protocol: "TCP",
331 },
332 {
333 Name: "port2",
334 Port: 888,
335 Protocol: "TCP",
336 },
337 {
338 Name: "port3",
339 Port: 888,
340 Protocol: "TCP",
341 },
342 }),
343 },
344 link: multicluster.Link{
345 TargetClusterName: clusterName,
346 TargetClusterDomain: clusterDomain,
347 GatewayIdentity: "gateway-identity",
348 GatewayAddress: "192.0.2.127",
349 GatewayPort: 888,
350 ProbeSpec: defaultProbeSpec,
351 Selector: *defaultSelector,
352 RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
353 },
354 }
355
356 var updateEndpointsWithChangedHosts = &testEnvironment{
357 events: []interface{}{
358 &OnUpdateEndpointsCalled{
359 ep: remoteHeadlessEndpointsUpdate("service-two", "eptest", "112", "192.0.0.1", []corev1.EndpointPort{
360 {
361 Name: "port1",
362 Protocol: "TCP",
363 Port: 555,
364 },
365 {
366 Name: "port2",
367 Protocol: "TCP",
368 Port: 666,
369 },
370 }),
371 },
372 },
373 remoteResources: []string{
374 gatewayAsYaml("gateway", "gateway-ns", "currentGatewayResVersion", "192.0.2.127", "mc-gateway", 888, "", defaultProbePort, defaultProbePath, defaultProbePeriod),
375 remoteHeadlessSvcAsYaml("service-two", "eptest", "222",
376 []corev1.ServicePort{
377 {
378 Name: "port1",
379 Protocol: "TCP",
380 Port: 555,
381 },
382 {
383
384 Name: "port2",
385 Protocol: "TCP",
386 Port: 666,
387 },
388 }),
389 },
390 localResources: []string{
391 headlessMirrorAsYaml("service-two-remote", "eptest", "222",
392 []corev1.ServicePort{
393 {
394 Name: "port1",
395 Protocol: "TCP",
396 Port: 555,
397 },
398 {
399 Name: "port2",
400 Protocol: "TCP",
401 Port: 666,
402 },
403 }),
404 endpointMirrorAsYaml("pod-0", "service-two-remote", "eptest", "333", []corev1.ServicePort{
405 {
406 Name: "port1",
407 Protocol: "TCP",
408 Port: 555,
409 },
410 {
411 Name: "port2",
412 Protocol: "TCP",
413 Port: 666,
414 },
415 }),
416 headlessMirrorEndpointsAsYaml(
417 "service-two-remote",
418 "eptest",
419 "pod-0",
420 "",
421 "gateway-identity",
422 []corev1.EndpointPort{
423 {
424 Name: "port1",
425 Protocol: "TCP",
426 Port: 555,
427 },
428 {
429 Name: "port2",
430 Protocol: "TCP",
431 Port: 666,
432 },
433 }),
434 endpointMirrorEndpointsAsYaml(
435 "service-two-remote",
436 "eptest",
437 "pod-0",
438 "192.0.2.127",
439 "gateway-identity",
440 []corev1.EndpointPort{
441 {
442 Name: "port1",
443 Protocol: "TCP",
444 Port: 888,
445 },
446 {
447 Name: "port2",
448 Protocol: "TCP",
449 Port: 888,
450 },
451 }),
452 },
453 link: multicluster.Link{
454 TargetClusterName: clusterName,
455 TargetClusterDomain: clusterDomain,
456 GatewayIdentity: "gateway-identity",
457 GatewayAddress: "192.0.2.127",
458 GatewayPort: 888,
459 ProbeSpec: defaultProbeSpec,
460 Selector: *defaultSelector,
461 RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
462 },
463 }
464 var clusterUnregistered = &testEnvironment{
465 events: []interface{}{
466 &ClusterUnregistered{},
467 },
468 localResources: []string{
469 mirrorServiceAsYaml("test-service-1-remote", "test-namespace", "", nil),
470 endpointsAsYaml("test-service-1-remote", "test-namespace", "", "", nil),
471 mirrorServiceAsYaml("test-service-2-remote", "test-namespace", "", nil),
472 endpointsAsYaml("test-service-2-remote", "test-namespace", "", "", nil),
473 },
474 link: multicluster.Link{
475 TargetClusterName: clusterName,
476 },
477 }
478
479 var gcTriggered = &testEnvironment{
480 events: []interface{}{
481 &OrphanedServicesGcTriggered{},
482 },
483 localResources: []string{
484 mirrorServiceAsYaml("test-service-1-remote", "test-namespace", "", nil),
485 endpointsAsYaml("test-service-1-remote", "test-namespace", "", "", nil),
486 mirrorServiceAsYaml("test-service-2-remote", "test-namespace", "", nil),
487 endpointsAsYaml("test-service-2-remote", "test-namespace", "", "", nil),
488 headlessMirrorAsYaml("test-headless-service-remote", "test-namespace", "", nil),
489 endpointMirrorAsYaml("pod-0", "test-headless-service-remote", "test-namespace", "", nil),
490 headlessMirrorEndpointsAsYaml("test-headless-service-remote", "test-namespace", "pod-0", "", "", nil),
491 endpointMirrorEndpointsAsYaml("test-headless-service-remote", "test-namespace", "pod-0", "", "", nil),
492 },
493 remoteResources: []string{
494 remoteServiceAsYaml("test-service-1", "test-namespace", "", nil),
495 remoteHeadlessSvcAsYaml("test-headless-service", "test-namespace", "", nil),
496 },
497 link: multicluster.Link{
498 TargetClusterName: clusterName,
499 },
500 }
501
502 var noGatewayLink = &testEnvironment{
503 events: []interface{}{
504 &RemoteServiceCreated{
505 service: remoteService("service-one", "ns1", "111", map[string]string{
506 consts.DefaultExportedServiceSelector: "remote-discovery",
507 }, []corev1.ServicePort{
508 {
509 Name: "port1",
510 Protocol: "TCP",
511 Port: 555,
512 },
513 {
514 Name: "port2",
515 Protocol: "TCP",
516 Port: 666,
517 },
518 }),
519 },
520 &RemoteServiceCreated{
521 service: remoteService("service-two", "ns1", "111", map[string]string{
522 consts.DefaultExportedServiceSelector: "true",
523 }, []corev1.ServicePort{
524 {
525 Name: "port1",
526 Protocol: "TCP",
527 Port: 555,
528 },
529 {
530 Name: "port2",
531 Protocol: "TCP",
532 Port: 666,
533 },
534 }),
535 },
536 },
537 localResources: []string{
538 namespaceAsYaml("ns1"),
539 },
540 remoteResources: []string{
541 endpointsAsYaml("service-one", "ns1", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{}),
542 endpointsAsYaml("service-two", "ns1", "192.0.2.128", "gateway-identity", []corev1.EndpointPort{}),
543 },
544 link: multicluster.Link{
545 TargetClusterName: clusterName,
546 TargetClusterDomain: clusterDomain,
547 GatewayIdentity: "",
548 GatewayAddress: "",
549 GatewayPort: 0,
550 ProbeSpec: multicluster.ProbeSpec{
551 Path: "",
552 Port: 0,
553 Period: time.Duration(0) * time.Second,
554 },
555 Selector: metav1.LabelSelector{},
556 RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
557 },
558 }
559
560 func onAddOrUpdateExportedSvc(isAdd bool) *testEnvironment {
561 return &testEnvironment{
562 events: []interface{}{
563 onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "resVersion", map[string]string{
564 consts.DefaultExportedServiceSelector: "true",
565 }, nil)),
566 },
567 link: multicluster.Link{
568 TargetClusterName: clusterName,
569 TargetClusterDomain: clusterDomain,
570 GatewayIdentity: "gateway-identity",
571 GatewayAddress: "192.0.2.127",
572 GatewayPort: 888,
573 ProbeSpec: defaultProbeSpec,
574 Selector: *defaultSelector,
575 RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
576 },
577 }
578
579 }
580
581 func onAddOrUpdateRemoteServiceUpdated(isAdd bool) *testEnvironment {
582 return &testEnvironment{
583 events: []interface{}{
584 onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "currentResVersion", map[string]string{
585 consts.DefaultExportedServiceSelector: "true",
586 }, nil)),
587 },
588 localResources: []string{
589 mirrorServiceAsYaml("test-service-remote", "test-namespace", "pastResourceVersion", nil),
590 endpointsAsYaml("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
591 },
592 link: multicluster.Link{
593 TargetClusterName: clusterName,
594 TargetClusterDomain: clusterDomain,
595 GatewayIdentity: "gateway-identity",
596 GatewayAddress: "192.0.2.127",
597 GatewayPort: 888,
598 ProbeSpec: defaultProbeSpec,
599 Selector: *defaultSelector,
600 RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
601 },
602 }
603 }
604
605 func onAddOrUpdateSameResVersion(isAdd bool) *testEnvironment {
606 return &testEnvironment{
607 events: []interface{}{
608 onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "currentResVersion", map[string]string{
609 consts.DefaultExportedServiceSelector: "true",
610 }, nil)),
611 },
612 localResources: []string{
613 mirrorServiceAsYaml("test-service-remote", "test-namespace", "currentResVersion", nil),
614 endpointsAsYaml("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
615 },
616 link: multicluster.Link{
617 TargetClusterName: clusterName,
618 TargetClusterDomain: clusterDomain,
619 GatewayIdentity: "gateway-identity",
620 GatewayAddress: "192.0.2.127",
621 GatewayPort: 888,
622 ProbeSpec: defaultProbeSpec,
623 Selector: *defaultSelector,
624 RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
625 },
626 }
627 }
628
629 func serviceNotExportedAnymore(isAdd bool) *testEnvironment {
630 return &testEnvironment{
631 events: []interface{}{
632 onAddOrUpdateEvent(isAdd, remoteService("test-service", "test-namespace", "currentResVersion", map[string]string{}, nil)),
633 },
634 localResources: []string{
635 mirrorServiceAsYaml("test-service-remote", "test-namespace", "currentResVersion", nil),
636 endpointsAsYaml("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
637 },
638 link: multicluster.Link{
639 TargetClusterName: clusterName,
640 TargetClusterDomain: clusterDomain,
641 GatewayIdentity: "gateway-identity",
642 GatewayAddress: "192.0.2.127",
643 GatewayPort: 888,
644 ProbeSpec: defaultProbeSpec,
645 Selector: *defaultSelector,
646 RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
647 },
648 }
649 }
650
651 var onDeleteExportedService = &testEnvironment{
652 events: []interface{}{
653 &OnDeleteCalled{
654 svc: remoteService("test-service", "test-namespace", "currentResVersion", map[string]string{
655 consts.DefaultExportedServiceSelector: "true",
656 }, nil),
657 },
658 },
659 link: multicluster.Link{
660 TargetClusterName: clusterName,
661 TargetClusterDomain: clusterDomain,
662 GatewayIdentity: "gateway-identity",
663 GatewayAddress: "192.0.2.127",
664 GatewayPort: 888,
665 ProbeSpec: defaultProbeSpec,
666 Selector: *defaultSelector,
667 RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
668 },
669 }
670
671 var onDeleteNonExportedService = &testEnvironment{
672 events: []interface{}{
673 &OnDeleteCalled{
674 svc: remoteService("gateway", "test-namespace", "currentResVersion", map[string]string{}, nil),
675 },
676 },
677 link: multicluster.Link{
678 TargetClusterName: clusterName,
679 TargetClusterDomain: clusterDomain,
680 GatewayIdentity: "gateway-identity",
681 GatewayAddress: "192.0.2.127",
682 GatewayPort: 888,
683 ProbeSpec: defaultProbeSpec,
684 Selector: *defaultSelector,
685 RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
686 },
687 }
688
689
690
691
692 func onAddOrUpdateEvent(isAdd bool, svc *corev1.Service) interface{} {
693 if isAdd {
694 return &OnAddCalled{svc: svc}
695 }
696 return &OnUpdateCalled{svc: svc}
697 }
698
699 func diffServices(expected, actual *corev1.Service) error {
700 if expected.Name != actual.Name {
701 return fmt.Errorf("was expecting service with name %s but was %s", expected.Name, actual.Name)
702 }
703
704 if expected.Namespace != actual.Namespace {
705 return fmt.Errorf("was expecting service with namespace %s but was %s", expected.Namespace, actual.Namespace)
706 }
707
708 if diff := deep.Equal(expected.Annotations, actual.Annotations); diff != nil {
709 return fmt.Errorf("annotation mismatch %+v", diff)
710 }
711
712 if diff := deep.Equal(expected.Labels, actual.Labels); diff != nil {
713 return fmt.Errorf("label mismatch %+v", diff)
714 }
715
716 return nil
717 }
718
719 func diffEndpoints(expected, actual *corev1.Endpoints) error {
720 if expected.Name != actual.Name {
721 return fmt.Errorf("was expecting endpoints with name %s but was %s", expected.Name, actual.Name)
722 }
723
724 if expected.Namespace != actual.Namespace {
725 return fmt.Errorf("was expecting endpoints with namespace %s but was %s", expected.Namespace, actual.Namespace)
726 }
727
728 if diff := deep.Equal(expected.Annotations, actual.Annotations); diff != nil {
729 return fmt.Errorf("annotation mismatch %+v", diff)
730 }
731
732 if diff := deep.Equal(expected.Labels, actual.Labels); diff != nil {
733 return fmt.Errorf("label mismatch %+v", diff)
734 }
735
736 if diff := deep.Equal(expected.Subsets, actual.Subsets); diff != nil {
737 return fmt.Errorf("subsets mismatch %+v", diff)
738 }
739
740 return nil
741 }
742
743 func remoteService(name, namespace, resourceVersion string, labels map[string]string, ports []corev1.ServicePort) *corev1.Service {
744 return &corev1.Service{
745 TypeMeta: metav1.TypeMeta{
746 Kind: "Service",
747 APIVersion: "v1",
748 },
749 ObjectMeta: metav1.ObjectMeta{
750 Name: name,
751 Namespace: namespace,
752 ResourceVersion: resourceVersion,
753 Labels: labels,
754 },
755 Spec: corev1.ServiceSpec{
756 Ports: ports,
757 },
758 }
759 }
760
761 func remoteHeadlessService(name, namespace, resourceVersion string, labels map[string]string, ports []corev1.ServicePort) *corev1.Service {
762 return &corev1.Service{
763 TypeMeta: metav1.TypeMeta{
764 Kind: "Service",
765 APIVersion: "v1",
766 },
767 ObjectMeta: metav1.ObjectMeta{
768 Name: name,
769 Namespace: namespace,
770 ResourceVersion: resourceVersion,
771 Labels: labels,
772 },
773 Spec: corev1.ServiceSpec{
774 ClusterIP: "None",
775 Ports: ports,
776 },
777 }
778 }
779
780 func remoteHeadlessEndpoints(name, namespace, resourceVersion, address string, ports []corev1.EndpointPort) *corev1.Endpoints {
781 return &corev1.Endpoints{
782 TypeMeta: metav1.TypeMeta{
783 Kind: "Endpoints",
784 APIVersion: "v1",
785 },
786 ObjectMeta: metav1.ObjectMeta{
787 Name: name,
788 Namespace: namespace,
789 ResourceVersion: resourceVersion,
790 Labels: map[string]string{
791 "service.kubernetes.io/headless": "",
792 consts.DefaultExportedServiceSelector: "true",
793 },
794 },
795 Subsets: []corev1.EndpointSubset{
796 {
797 Addresses: []corev1.EndpointAddress{
798 {
799 Hostname: "pod-0",
800 IP: address,
801 TargetRef: &corev1.ObjectReference{
802 Name: "pod-0",
803 ResourceVersion: resourceVersion,
804 },
805 },
806 },
807 Ports: ports,
808 },
809 },
810 }
811 }
812
813 func remoteHeadlessEndpointsUpdate(name, namespace, resourceVersion, address string, ports []corev1.EndpointPort) *corev1.Endpoints {
814 return &corev1.Endpoints{
815 TypeMeta: metav1.TypeMeta{
816 Kind: "Endpoints",
817 APIVersion: "v1",
818 },
819 ObjectMeta: metav1.ObjectMeta{
820 Name: name,
821 Namespace: namespace,
822 ResourceVersion: resourceVersion,
823 Labels: map[string]string{
824 "service.kubernetes.io/headless": "",
825 consts.DefaultExportedServiceSelector: "true",
826 },
827 },
828 Subsets: []corev1.EndpointSubset{
829 {
830 Addresses: []corev1.EndpointAddress{
831 {
832 Hostname: "pod-0",
833 IP: address,
834 TargetRef: &corev1.ObjectReference{
835 Name: "pod-0",
836 ResourceVersion: resourceVersion,
837 },
838 },
839 {
840 Hostname: "pod-1",
841 IP: address,
842 TargetRef: &corev1.ObjectReference{
843 Name: "pod-1",
844 ResourceVersion: resourceVersion,
845 },
846 },
847 },
848 Ports: ports,
849 },
850 },
851 }
852 }
853
854 func remoteServiceAsYaml(name, namespace, resourceVersion string, ports []corev1.ServicePort) string {
855 svc := remoteService(name, namespace, resourceVersion, map[string]string{
856 consts.DefaultExportedServiceSelector: "true",
857 }, ports)
858
859 bytes, err := yaml.Marshal(svc)
860 if err != nil {
861 log.Fatal(err)
862 }
863 return string(bytes)
864 }
865
866 func remoteHeadlessSvcAsYaml(name, namespace, resourceVersion string, ports []corev1.ServicePort) string {
867 svc := remoteHeadlessService(name, namespace, resourceVersion, nil, ports)
868
869 bytes, err := yaml.Marshal(svc)
870 if err != nil {
871 log.Fatal(err)
872 }
873 return string(bytes)
874 }
875
876 func remoteHeadlessEndpointsAsYaml(name, namespace, resourceVersion, address string, ports []corev1.EndpointPort) string {
877 ep := remoteHeadlessEndpoints(name, namespace, resourceVersion, address, ports)
878
879 bytes, err := yaml.Marshal(ep)
880 if err != nil {
881 log.Fatal(err)
882 }
883 return string(bytes)
884 }
885 func mirrorService(name, namespace, resourceVersion string, ports []corev1.ServicePort) *corev1.Service {
886 annotations := make(map[string]string)
887 annotations[consts.RemoteResourceVersionAnnotation] = resourceVersion
888 annotations[consts.RemoteServiceFqName] = fmt.Sprintf("%s.%s.svc.cluster.local", strings.Replace(name, "-remote", "", 1), namespace)
889
890 return &corev1.Service{
891 TypeMeta: metav1.TypeMeta{
892 Kind: "Service",
893 APIVersion: "v1",
894 },
895 ObjectMeta: metav1.ObjectMeta{
896 Name: name,
897 Namespace: namespace,
898 Labels: map[string]string{
899 consts.RemoteClusterNameLabel: clusterName,
900 consts.MirroredResourceLabel: "true",
901 },
902 Annotations: annotations,
903 },
904 Spec: corev1.ServiceSpec{
905 Ports: ports,
906 },
907 }
908 }
909
910 func headlessMirrorService(name, namespace, resourceVersion string, ports []corev1.ServicePort) *corev1.Service {
911 svc := mirrorService(name, namespace, resourceVersion, ports)
912 svc.Spec.ClusterIP = "None"
913 return svc
914 }
915
916 func endpointMirrorService(hostname, rootName, namespace, resourceVersion string, ports []corev1.ServicePort) *corev1.Service {
917 annotations := make(map[string]string)
918 annotations[consts.RemoteResourceVersionAnnotation] = resourceVersion
919 annotations[consts.RemoteServiceFqName] = fmt.Sprintf("%s.%s.%s.svc.cluster.local", hostname, strings.Replace(rootName, "-remote", "", 1), namespace)
920
921 return &corev1.Service{
922 TypeMeta: metav1.TypeMeta{
923 Kind: "Service",
924 APIVersion: "v1",
925 },
926 ObjectMeta: metav1.ObjectMeta{
927 Name: fmt.Sprintf("%s-%s", hostname, clusterName),
928 Namespace: namespace,
929 Labels: map[string]string{
930
931 consts.MirroredHeadlessSvcNameLabel: rootName,
932 consts.RemoteClusterNameLabel: clusterName,
933 consts.MirroredResourceLabel: "true",
934 },
935 Annotations: annotations,
936 },
937 Spec: corev1.ServiceSpec{
938 Ports: ports,
939 },
940 }
941 }
942
943 func remoteDiscoveryMirrorService(name, namespace, resourceVersion string, ports []corev1.ServicePort) *corev1.Service {
944 annotations := make(map[string]string)
945 annotations[consts.RemoteResourceVersionAnnotation] = resourceVersion
946 annotations[consts.RemoteServiceFqName] = fmt.Sprintf("%s.%s.svc.cluster.local", name, namespace)
947
948 return &corev1.Service{
949 TypeMeta: metav1.TypeMeta{
950 Kind: "Service",
951 APIVersion: "v1",
952 },
953 ObjectMeta: metav1.ObjectMeta{
954 Name: fmt.Sprintf("%s-%s", name, clusterName),
955 Namespace: namespace,
956 Labels: map[string]string{
957 consts.RemoteClusterNameLabel: clusterName,
958 consts.MirroredResourceLabel: "true",
959 consts.RemoteDiscoveryLabel: clusterName,
960 consts.RemoteServiceLabel: name,
961 },
962 Annotations: annotations,
963 },
964 Spec: corev1.ServiceSpec{
965 Ports: ports,
966 },
967 }
968 }
969
970 func mirrorServiceAsYaml(name, namespace, resourceVersion string, ports []corev1.ServicePort) string {
971 svc := mirrorService(name, namespace, resourceVersion, ports)
972
973 bytes, err := yaml.Marshal(svc)
974 if err != nil {
975 log.Fatal(err)
976 }
977 return string(bytes)
978 }
979
980 func headlessMirrorAsYaml(name, namespace, resourceVersion string, ports []corev1.ServicePort) string {
981 svc := headlessMirrorService(name, namespace, resourceVersion, ports)
982
983 bytes, err := yaml.Marshal(svc)
984 if err != nil {
985 log.Fatal(err)
986 }
987 return string(bytes)
988 }
989
990 func endpointMirrorAsYaml(hostname, rootName, namespace, resourceVersion string, ports []corev1.ServicePort) string {
991 svc := endpointMirrorService(hostname, rootName, namespace, resourceVersion, ports)
992
993 bytes, err := yaml.Marshal(svc)
994 if err != nil {
995 log.Fatal(err)
996 }
997
998 return string(bytes)
999 }
1000
1001 func gateway(name, namespace, resourceVersion, ip, hostname, portName string, port int32, identity string, probePort int32, probePath string, probePeriod int) *corev1.Service {
1002 svc := corev1.Service{
1003 TypeMeta: metav1.TypeMeta{
1004 Kind: "Service",
1005 APIVersion: "v1",
1006 },
1007 ObjectMeta: metav1.ObjectMeta{
1008 Name: name,
1009 Namespace: namespace,
1010 ResourceVersion: resourceVersion,
1011 Annotations: map[string]string{
1012 consts.GatewayIdentity: identity,
1013 consts.GatewayProbePath: probePath,
1014 consts.GatewayProbePeriod: fmt.Sprint(probePeriod),
1015 },
1016 },
1017 Spec: corev1.ServiceSpec{
1018 Ports: []corev1.ServicePort{
1019 {
1020 Name: portName,
1021 Protocol: "TCP",
1022 Port: port,
1023 },
1024 {
1025 Name: consts.ProbePortName,
1026 Protocol: "TCP",
1027 Port: probePort,
1028 },
1029 },
1030 },
1031 }
1032
1033 if ip != "" {
1034 svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, corev1.LoadBalancerIngress{IP: ip})
1035 }
1036 if hostname != "" {
1037 svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, corev1.LoadBalancerIngress{Hostname: hostname})
1038 }
1039 return &svc
1040 }
1041
1042 func gatewayAsYaml(name, namespace, resourceVersion, ip, portName string, port int32, identity string, probePort int32, probePath string, probePeriod int) string {
1043 gtw := gateway(name, namespace, resourceVersion, ip, "", portName, port, identity, probePort, probePath, probePeriod)
1044
1045 bytes, err := yaml.Marshal(gtw)
1046 if err != nil {
1047 log.Fatal(err)
1048 }
1049 return string(bytes)
1050 }
1051
1052 func endpoints(name, namespace, gatewayIP string, gatewayIdentity string, ports []corev1.EndpointPort) *corev1.Endpoints {
1053 var subsets []corev1.EndpointSubset
1054 if gatewayIP != "" {
1055 subsets = []corev1.EndpointSubset{
1056 {
1057 Addresses: []corev1.EndpointAddress{
1058 {
1059 IP: gatewayIP,
1060 },
1061 },
1062 Ports: ports,
1063 },
1064 }
1065 }
1066
1067 endpoints := &corev1.Endpoints{
1068 TypeMeta: metav1.TypeMeta{
1069 Kind: "Endpoints",
1070 APIVersion: "v1",
1071 },
1072 ObjectMeta: metav1.ObjectMeta{
1073 Name: name,
1074 Namespace: namespace,
1075 Labels: map[string]string{
1076 consts.RemoteClusterNameLabel: clusterName,
1077 consts.MirroredResourceLabel: "true",
1078 },
1079 Annotations: map[string]string{
1080 consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.cluster.local", strings.Replace(name, "-remote", "", 1), namespace),
1081 },
1082 },
1083 Subsets: subsets,
1084 }
1085
1086 if gatewayIdentity != "" {
1087 endpoints.Annotations[consts.RemoteGatewayIdentity] = gatewayIdentity
1088 }
1089
1090 return endpoints
1091 }
1092
1093 func endpointMirrorEndpoints(rootName, namespace, hostname, gatewayIP, gatewayIdentity string, ports []corev1.EndpointPort) *corev1.Endpoints {
1094 localName := fmt.Sprintf("%s-%s", hostname, clusterName)
1095 ep := endpoints(localName, namespace, gatewayIP, gatewayIdentity, ports)
1096
1097 ep.Annotations[consts.RemoteServiceFqName] = fmt.Sprintf("%s.%s.%s.svc.cluster.local", hostname, strings.Replace(rootName, "-remote", "", 1), namespace)
1098 ep.Labels[consts.MirroredHeadlessSvcNameLabel] = rootName
1099
1100 return ep
1101 }
1102
1103 func headlessMirrorEndpoints(name, namespace, hostname, hostIP, gatewayIdentity string, ports []corev1.EndpointPort) *corev1.Endpoints {
1104 endpoints := &corev1.Endpoints{
1105 TypeMeta: metav1.TypeMeta{
1106 Kind: "Endpoints",
1107 APIVersion: "v1",
1108 },
1109 ObjectMeta: metav1.ObjectMeta{
1110 Name: name,
1111 Namespace: namespace,
1112 Labels: map[string]string{
1113 consts.RemoteClusterNameLabel: clusterName,
1114 consts.MirroredResourceLabel: "true",
1115 },
1116 Annotations: map[string]string{
1117 consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.cluster.local", strings.Replace(name, "-remote", "", 1), namespace),
1118 },
1119 },
1120 Subsets: []corev1.EndpointSubset{
1121 {
1122 Addresses: []corev1.EndpointAddress{
1123 {
1124 Hostname: hostname,
1125 IP: hostIP,
1126 },
1127 },
1128 Ports: ports,
1129 },
1130 },
1131 }
1132
1133 if gatewayIdentity != "" {
1134 endpoints.Annotations[consts.RemoteGatewayIdentity] = gatewayIdentity
1135 }
1136
1137 return endpoints
1138 }
1139
1140 func headlessMirrorEndpointsUpdated(name, namespace string, hostnames, hostIPs []string, gatewayIdentity string, ports []corev1.EndpointPort) *corev1.Endpoints {
1141 endpoints := &corev1.Endpoints{
1142 TypeMeta: metav1.TypeMeta{
1143 Kind: "Endpoints",
1144 APIVersion: "v1",
1145 },
1146 ObjectMeta: metav1.ObjectMeta{
1147 Name: name,
1148 Namespace: namespace,
1149 Labels: map[string]string{
1150 consts.RemoteClusterNameLabel: clusterName,
1151 consts.MirroredResourceLabel: "true",
1152 },
1153 Annotations: map[string]string{
1154 consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.cluster.local", strings.Replace(name, "-remote", "", 1), namespace),
1155 },
1156 },
1157 Subsets: []corev1.EndpointSubset{
1158 {
1159 Addresses: []corev1.EndpointAddress{
1160 {
1161 Hostname: hostnames[0],
1162 IP: hostIPs[0],
1163 },
1164 {
1165 Hostname: hostnames[1],
1166 IP: hostIPs[1],
1167 },
1168 },
1169 Ports: ports,
1170 },
1171 },
1172 }
1173
1174 if gatewayIdentity != "" {
1175 endpoints.Annotations[consts.RemoteGatewayIdentity] = gatewayIdentity
1176 }
1177
1178 return endpoints
1179 }
1180
1181 func namespaceAsYaml(name string) string {
1182 ns := &corev1.Namespace{
1183 TypeMeta: metav1.TypeMeta{
1184 Kind: "Namespace",
1185 APIVersion: "v1",
1186 },
1187 ObjectMeta: metav1.ObjectMeta{
1188 Name: name,
1189 },
1190 }
1191
1192 bytes, err := yaml.Marshal(ns)
1193 if err != nil {
1194 log.Fatal(err)
1195 }
1196 return string(bytes)
1197 }
1198
1199 func endpointsAsYaml(name, namespace, gatewayIP, gatewayIdentity string, ports []corev1.EndpointPort) string {
1200 ep := endpoints(name, namespace, gatewayIP, gatewayIdentity, ports)
1201
1202 bytes, err := yaml.Marshal(ep)
1203 if err != nil {
1204 log.Fatal(err)
1205 }
1206 return string(bytes)
1207 }
1208
1209 func headlessMirrorEndpointsAsYaml(name, namespace, hostname, hostIP, gatewayIdentity string, ports []corev1.EndpointPort) string {
1210 ep := headlessMirrorEndpoints(name, namespace, hostname, hostIP, gatewayIdentity, ports)
1211
1212 bytes, err := yaml.Marshal(ep)
1213 if err != nil {
1214 log.Fatal(err)
1215 }
1216
1217 return string(bytes)
1218 }
1219
1220 func endpointMirrorEndpointsAsYaml(name, namespace, hostname, gatewayIP, gatewayIdentity string, ports []corev1.EndpointPort) string {
1221 ep := endpointMirrorEndpoints(name, namespace, hostname, gatewayIP, gatewayIdentity, ports)
1222
1223 bytes, err := yaml.Marshal(ep)
1224 if err != nil {
1225 log.Fatal(err)
1226 }
1227
1228 return string(bytes)
1229 }
1230
1231
1232
1233
1234
1235 func createEnvWithSelector(defaultSelector, remoteSelector *metav1.LabelSelector) *testEnvironment {
1236 return &testEnvironment{
1237 events: []interface{}{
1238 &OnAddCalled{
1239 svc: remoteService("service-one", "ns1", "111", map[string]string{
1240 consts.DefaultExportedServiceSelector: "true",
1241 }, []corev1.ServicePort{
1242 {
1243 Name: "default1",
1244 Protocol: "TCP",
1245 Port: 555,
1246 },
1247 {
1248 Name: "default2",
1249 Protocol: "TCP",
1250 Port: 666,
1251 },
1252 }),
1253 },
1254 &OnAddCalled{
1255 svc: remoteService("service-two", "ns1", "111", map[string]string{
1256 consts.DefaultExportedServiceSelector: "remote-discovery",
1257 }, []corev1.ServicePort{
1258 {
1259 Name: "remote1",
1260 Protocol: "TCP",
1261 Port: 777,
1262 },
1263 {
1264 Name: "remote2",
1265 Protocol: "TCP",
1266 Port: 888,
1267 },
1268 }),
1269 },
1270 },
1271 localResources: []string{
1272 namespaceAsYaml("ns1"),
1273 },
1274 remoteResources: []string{
1275 endpointsAsYaml("service-one", "ns1", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{}),
1276 endpointsAsYaml("service-two", "ns1", "192.0.3.127", "gateway-identity", []corev1.EndpointPort{}),
1277 },
1278 link: multicluster.Link{
1279 TargetClusterName: clusterName,
1280 TargetClusterDomain: clusterDomain,
1281 GatewayIdentity: "",
1282 GatewayAddress: "",
1283 GatewayPort: 0,
1284 ProbeSpec: multicluster.ProbeSpec{
1285 Path: "",
1286 Port: 0,
1287 Period: time.Duration(0) * time.Second,
1288 },
1289 Selector: *defaultSelector,
1290 RemoteDiscoverySelector: *remoteSelector,
1291 },
1292 }
1293 }
1294
View as plain text