1 package servicemirror
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "net"
8 "sort"
9 "strings"
10 "time"
11
12 "github.com/linkerd/linkerd2/controller/k8s"
13 consts "github.com/linkerd/linkerd2/pkg/k8s"
14 "github.com/linkerd/linkerd2/pkg/multicluster"
15 "github.com/prometheus/client_golang/prometheus"
16 logging "github.com/sirupsen/logrus"
17 corev1 "k8s.io/api/core/v1"
18 kerrors "k8s.io/apimachinery/pkg/api/errors"
19 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20 "k8s.io/apimachinery/pkg/labels"
21 "k8s.io/client-go/kubernetes/scheme"
22 typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
23 "k8s.io/client-go/rest"
24 "k8s.io/client-go/tools/cache"
25 "k8s.io/client-go/tools/record"
26 "k8s.io/client-go/util/workqueue"
27 )
28
const (
	// eventTypeSkipped is the reason string attached to the Kubernetes event
	// that is recorded against a remote service when its mirroring is
	// skipped (see handleRemoteServiceCreated).
	eventTypeSkipped = "ServiceMirroringSkipped"
)
32
type (
	// RemoteClusterServiceWatcher mirrors services from a target (remote)
	// cluster into the local cluster. Informer callbacks and periodic
	// triggers are converted into events placed on eventsQueue; the
	// processEvents loop pops them one at a time, dispatches each to its
	// handler and requeues events whose handler returned a RetryableError.
	RemoteClusterServiceWatcher struct {
		// serviceMirrorNamespace is the local namespace in which the gateway
		// probe endpoints are created.
		serviceMirrorNamespace string
		// link describes the target cluster (name, domain and gateway
		// address, port and identity).
		link            *multicluster.Link
		remoteAPIClient *k8s.API
		localAPIClient  *k8s.API
		// stopper is closed by Stop; it is handed to the remote API sync and
		// terminates the endpoint-repair goroutine started by Start.
		stopper          chan struct{}
		eventBroadcaster record.EventBroadcaster
		recorder         record.EventRecorder
		log              *logging.Entry
		eventsQueue      workqueue.RateLimitingInterface
		// requeueLimit is the number of requeues after which a retryable
		// event is dropped.
		requeueLimit int
		// repairPeriod is the interval between periodic RepairEndpoints
		// events.
		repairPeriod time.Duration
		// gatewayAlive holds the last value received on liveness; it is
		// initialized to true by the constructor.
		gatewayAlive bool
		liveness     chan bool
		// headlessServicesEnabled makes handleRemoteServiceCreated skip
		// headless remote services.
		headlessServicesEnabled bool

		informerHandlers
	}

	// informerHandlers keeps the registrations returned when the event
	// handlers are added in Start, so that Stop can remove them again.
	informerHandlers struct {
		svcHandler cache.ResourceEventHandlerRegistration
		epHandler  cache.ResourceEventHandlerRegistration
		nsHandler  cache.ResourceEventHandlerRegistration
	}

	// RemoteServiceCreated is queued by createOrUpdateService when a remote
	// service selected for mirroring has no mirror on the local cluster yet.
	RemoteServiceCreated struct {
		service *corev1.Service
	}

	// RemoteServiceUpdated is queued by createOrUpdateService when a mirror
	// already exists and the remote resource version recorded on it differs
	// from the version of the remote service. localEndpoints is nil when the
	// mirror has no Endpoints object.
	RemoteServiceUpdated struct {
		localService   *corev1.Service
		localEndpoints *corev1.Endpoints
		remoteUpdate   *corev1.Service
	}

	// RemoteServiceDeleted asks for the mirror of the named remote service to
	// be removed. Name and Namespace identify the remote service, not the
	// mirror.
	RemoteServiceDeleted struct {
		Name      string
		Namespace string
	}

	// ClusterUnregistered is queued by Stop when state cleanup is requested;
	// it is handled by cleanupMirroredResources.
	ClusterUnregistered struct{}

	// OrphanedServicesGcTriggered is queued by Start and handled by
	// cleanupOrphanedServices, which deletes local mirrors whose remote
	// service can no longer be found.
	OrphanedServicesGcTriggered struct{}

	// OnAddCalled wraps a service reported by the remote Service informer's
	// add callback (it is also queued by handleLocalNamespaceAdded).
	OnAddCalled struct {
		svc *corev1.Service
	}

	// OnAddEndpointsCalled wraps an Endpoints object reported by the remote
	// Endpoints informer's add callback.
	OnAddEndpointsCalled struct {
		ep *corev1.Endpoints
	}

	// OnUpdateCalled wraps the new state of a service reported by the remote
	// Service informer's update callback.
	OnUpdateCalled struct {
		svc *corev1.Service
	}

	// OnUpdateEndpointsCalled wraps the new state of an Endpoints object
	// reported by the remote Endpoints informer's update callback.
	OnUpdateEndpointsCalled struct {
		ep *corev1.Endpoints
	}

	// OnDeleteCalled wraps a service reported by the remote Service
	// informer's delete callback.
	OnDeleteCalled struct {
		svc *corev1.Service
	}

	// RepairEndpoints triggers repairEndpoints. Start queues it once, then on
	// every repairPeriod tick and on every gateway liveness change.
	RepairEndpoints struct{}

	// OnLocalNamespaceAdded wraps a namespace reported by the local Namespace
	// informer's add callback.
	OnLocalNamespaceAdded struct {
		ns *corev1.Namespace
	}

	// RetryableError wraps errors for which processEvents should requeue the
	// failed event (up to the requeue limit).
	RetryableError struct{ Inner []error }
)
151
152 func (re RetryableError) Error() string {
153 var errorStrings []string
154 for _, err := range re.Inner {
155 errorStrings = append(errorStrings, err.Error())
156 }
157 return fmt.Sprintf("Inner errors:\n\t%s", strings.Join(errorStrings, "\n\t"))
158 }
159
160
161 func NewRemoteClusterServiceWatcher(
162 ctx context.Context,
163 serviceMirrorNamespace string,
164 localAPI *k8s.API,
165 cfg *rest.Config,
166 link *multicluster.Link,
167 requeueLimit int,
168 repairPeriod time.Duration,
169 liveness chan bool,
170 enableHeadlessSvc bool,
171 ) (*RemoteClusterServiceWatcher, error) {
172 remoteAPI, err := k8s.InitializeAPIForConfig(ctx, cfg, false, clusterName, k8s.Svc, k8s.Endpoint)
173 if err != nil {
174 return nil, fmt.Errorf("cannot initialize api for target cluster %s: %w", clusterName, err)
175 }
176 _, err = remoteAPI.Client.Discovery().ServerVersion()
177 if err != nil {
178 remoteAPI.UnregisterGauges()
179 return nil, fmt.Errorf("cannot connect to api for target cluster %s: %w", clusterName, err)
180 }
181
182
183 eventBroadcaster := record.NewBroadcaster()
184 eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
185 Interface: remoteAPI.Client.CoreV1().Events(""),
186 })
187 recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{
188 Component: fmt.Sprintf("linkerd-service-mirror-%s", clusterName),
189 })
190
191 stopper := make(chan struct{})
192 return &RemoteClusterServiceWatcher{
193 serviceMirrorNamespace: serviceMirrorNamespace,
194 link: link,
195 remoteAPIClient: remoteAPI,
196 localAPIClient: localAPI,
197 stopper: stopper,
198 eventBroadcaster: eventBroadcaster,
199 recorder: recorder,
200 log: logging.WithFields(logging.Fields{
201 "cluster": clusterName,
202 "apiAddress": cfg.Host,
203 }),
204 eventsQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
205 requeueLimit: requeueLimit,
206 repairPeriod: repairPeriod,
207 liveness: liveness,
208 headlessServicesEnabled: enableHeadlessSvc,
209
210 gatewayAlive: true,
211 }, nil
212 }
213
214 func (rcsw *RemoteClusterServiceWatcher) mirroredResourceName(remoteName string) string {
215 return fmt.Sprintf("%s-%s", remoteName, rcsw.link.TargetClusterName)
216 }
217
218 func (rcsw *RemoteClusterServiceWatcher) targetResourceName(mirrorName string) string {
219 return strings.TrimSuffix(mirrorName, "-"+rcsw.link.TargetClusterName)
220 }
221
222 func (rcsw *RemoteClusterServiceWatcher) originalResourceName(mirroredName string) string {
223 return strings.TrimSuffix(mirroredName, fmt.Sprintf("-%s", rcsw.link.TargetClusterName))
224 }
225
226
227
228
229
230 func (rcsw *RemoteClusterServiceWatcher) getMirroredServiceLabels(remoteService *corev1.Service) map[string]string {
231 labels := map[string]string{
232 consts.MirroredResourceLabel: "true",
233 consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
234 }
235
236 if remoteService == nil {
237 return labels
238 }
239
240 if rcsw.isRemoteDiscovery(remoteService.Labels) {
241 labels[consts.RemoteDiscoveryLabel] = rcsw.link.TargetClusterName
242 labels[consts.RemoteServiceLabel] = remoteService.GetName()
243 }
244
245 for key, value := range remoteService.ObjectMeta.Labels {
246 if strings.HasPrefix(key, consts.SvcMirrorPrefix) {
247 continue
248 }
249 labels[key] = value
250 }
251
252 return labels
253 }
254
255
256 func (rcsw *RemoteClusterServiceWatcher) getMirroredServiceAnnotations(remoteService *corev1.Service) map[string]string {
257 annotations := map[string]string{
258 consts.RemoteResourceVersionAnnotation: remoteService.ResourceVersion,
259 consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", remoteService.Name, remoteService.Namespace, rcsw.link.TargetClusterDomain),
260 }
261
262 for key, value := range remoteService.ObjectMeta.Annotations {
263
264 if key == "service.kubernetes.io/topology-aware-hints" || key == "service.kubernetes.io/topology-mode" {
265 continue
266 }
267 annotations[key] = value
268 }
269
270 value, ok := remoteService.GetAnnotations()[consts.ProxyOpaquePortsAnnotation]
271 if ok {
272 annotations[consts.ProxyOpaquePortsAnnotation] = value
273 }
274
275 return annotations
276 }
277
278
279
280
281
282 func (rcsw *RemoteClusterServiceWatcher) getEndpointsPorts(service *corev1.Service) []corev1.EndpointPort {
283 var endpointsPorts []corev1.EndpointPort
284 for _, remotePort := range service.Spec.Ports {
285 endpointsPorts = append(endpointsPorts, corev1.EndpointPort{
286 Name: remotePort.Name,
287 Protocol: remotePort.Protocol,
288 Port: int32(rcsw.link.GatewayPort),
289 })
290 }
291 return endpointsPorts
292 }
293
294 func (rcsw *RemoteClusterServiceWatcher) cleanupOrphanedServices(ctx context.Context) error {
295 matchLabels := map[string]string{
296 consts.MirroredResourceLabel: "true",
297 consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
298 }
299
300 servicesOnLocalCluster, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector())
301 if err != nil {
302 innerErr := fmt.Errorf("failed to list services while cleaning up mirror services: %w", err)
303 if kerrors.IsNotFound(err) {
304 return innerErr
305 }
306
307 return RetryableError{[]error{innerErr}}
308 }
309
310 var errors []error
311 for _, srv := range servicesOnLocalCluster {
312 mirroredName := srv.Name
313
314
315 if remoteHeadlessSvcName, headlessMirror := srv.Labels[consts.MirroredHeadlessSvcNameLabel]; headlessMirror {
316 mirroredName = remoteHeadlessSvcName
317 }
318 remoteServiceName := rcsw.originalResourceName(mirroredName)
319 _, err := rcsw.remoteAPIClient.Svc().Lister().Services(srv.Namespace).Get(remoteServiceName)
320 if err != nil {
321 if kerrors.IsNotFound(err) {
322
323 if err := rcsw.localAPIClient.Client.CoreV1().Services(srv.Namespace).Delete(ctx, srv.Name, metav1.DeleteOptions{}); err != nil {
324
325 errors = append(errors, err)
326 } else {
327 rcsw.log.Infof("Deleted service %s/%s while cleaning up mirror services", srv.Namespace, srv.Name)
328 }
329 } else {
330
331 errors = append(errors, err)
332 }
333 }
334 }
335 if len(errors) > 0 {
336 return RetryableError{errors}
337 }
338
339 return nil
340 }
341
342
343
344
345 func (rcsw *RemoteClusterServiceWatcher) cleanupMirroredResources(ctx context.Context) error {
346 matchLabels := rcsw.getMirroredServiceLabels(nil)
347
348 services, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector())
349 if err != nil {
350 innerErr := fmt.Errorf("could not retrieve mirrored services that need cleaning up: %w", err)
351 if kerrors.IsNotFound(err) {
352 return innerErr
353 }
354
355 return RetryableError{[]error{innerErr}}
356 }
357
358 var errors []error
359 for _, svc := range services {
360 if err := rcsw.localAPIClient.Client.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}); err != nil {
361 if kerrors.IsNotFound(err) {
362 continue
363 }
364 errors = append(errors, fmt.Errorf("Could not delete service %s/%s: %w", svc.Namespace, svc.Name, err))
365 } else {
366 rcsw.log.Infof("Deleted service %s/%s", svc.Namespace, svc.Name)
367 }
368 }
369
370 endpoints, err := rcsw.localAPIClient.Endpoint().Lister().List(labels.Set(matchLabels).AsSelector())
371 if err != nil {
372 innerErr := fmt.Errorf("could not retrieve endpoints that need cleaning up: %w", err)
373 if kerrors.IsNotFound(err) {
374 return innerErr
375 }
376 return RetryableError{[]error{innerErr}}
377 }
378
379 for _, endpoint := range endpoints {
380 if err := rcsw.localAPIClient.Client.CoreV1().Endpoints(endpoint.Namespace).Delete(ctx, endpoint.Name, metav1.DeleteOptions{}); err != nil {
381 if kerrors.IsNotFound(err) {
382 continue
383 }
384 errors = append(errors, fmt.Errorf("Could not delete endpoints %s/%s: %w", endpoint.Namespace, endpoint.Name, err))
385 } else {
386 rcsw.log.Infof("Deleted endpoints %s/%s", endpoint.Namespace, endpoint.Name)
387 }
388 }
389
390 if len(errors) > 0 {
391 return RetryableError{errors}
392 }
393 return nil
394 }
395
396
397 func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceDeleted(ctx context.Context, ev *RemoteServiceDeleted) error {
398 localServiceName := rcsw.mirroredResourceName(ev.Name)
399 localService, err := rcsw.localAPIClient.Svc().Lister().Services(ev.Namespace).Get(localServiceName)
400 var errors []error
401 if err != nil {
402 if kerrors.IsNotFound(err) {
403 rcsw.log.Debugf("Failed to delete mirror service %s/%s: %v", ev.Namespace, ev.Name, err)
404 return nil
405 }
406 errors = append(errors, fmt.Errorf("could not fetch service %s/%s: %w", ev.Namespace, localServiceName, err))
407 }
408
409
410
411 if localService.Spec.ClusterIP == corev1.ClusterIPNone {
412 matchLabels := map[string]string{
413 consts.MirroredHeadlessSvcNameLabel: localServiceName,
414 }
415 endpointMirrorServices, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector())
416 if err != nil {
417 if !kerrors.IsNotFound(err) {
418 errors = append(errors, fmt.Errorf("could not fetch endpoint mirrors for mirror service %s/%s: %w", ev.Namespace, localServiceName, err))
419 }
420 }
421
422 for _, endpointMirror := range endpointMirrorServices {
423 err = rcsw.localAPIClient.Client.CoreV1().Services(endpointMirror.Namespace).Delete(ctx, endpointMirror.Name, metav1.DeleteOptions{})
424 if err != nil {
425 if !kerrors.IsNotFound(err) {
426 errors = append(errors, fmt.Errorf("could not delete endpoint mirror %s/%s: %w", endpointMirror.Namespace, endpointMirror.Name, err))
427 }
428 }
429 }
430 }
431
432 rcsw.log.Infof("Deleting mirrored service %s/%s", ev.Namespace, localServiceName)
433 if err := rcsw.localAPIClient.Client.CoreV1().Services(ev.Namespace).Delete(ctx, localServiceName, metav1.DeleteOptions{}); err != nil {
434 if !kerrors.IsNotFound(err) {
435 errors = append(errors, fmt.Errorf("could not delete service: %s/%s: %w", ev.Namespace, localServiceName, err))
436 }
437 }
438
439 if len(errors) > 0 {
440 return RetryableError{errors}
441 }
442
443 rcsw.log.Infof("Successfully deleted service: %s/%s", ev.Namespace, localServiceName)
444 return nil
445 }
446
447
448
449 func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ctx context.Context, ev *RemoteServiceUpdated) error {
450 rcsw.log.Infof("Updating mirror service %s/%s", ev.localService.Namespace, ev.localService.Name)
451
452 if rcsw.isRemoteDiscovery(ev.remoteUpdate.Labels) {
453
454
455 if ev.localEndpoints != nil {
456 err := rcsw.localAPIClient.Client.CoreV1().Endpoints(ev.localService.Namespace).Delete(ctx, ev.localService.Name, metav1.DeleteOptions{})
457 if err != nil {
458 return RetryableError{[]error{
459 fmt.Errorf("failed to delete mirror endpoints for %s/%s: %w", ev.localService.Namespace, ev.localService.Name, err),
460 }}
461 }
462 }
463 } else if ev.localEndpoints == nil {
464
465
466 err := rcsw.createGatewayEndpoints(ctx, ev.remoteUpdate)
467 if err != nil {
468 return err
469 }
470 } else {
471
472
473 gatewayAddresses, err := rcsw.resolveGatewayAddress()
474 if err != nil {
475 return err
476 }
477
478 copiedEndpoints := ev.localEndpoints.DeepCopy()
479 copiedEndpoints.Subsets = []corev1.EndpointSubset{
480 {
481 Addresses: gatewayAddresses,
482 Ports: rcsw.getEndpointsPorts(ev.remoteUpdate),
483 },
484 }
485
486 if copiedEndpoints.Annotations == nil {
487 copiedEndpoints.Annotations = make(map[string]string)
488 }
489 copiedEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
490
491 err = rcsw.updateMirrorEndpoints(ctx, copiedEndpoints)
492 if err != nil {
493 return RetryableError{[]error{err}}
494 }
495 }
496
497 ev.localService.Labels = rcsw.getMirroredServiceLabels(ev.remoteUpdate)
498 ev.localService.Annotations = rcsw.getMirroredServiceAnnotations(ev.remoteUpdate)
499 ev.localService.Spec.Ports = remapRemoteServicePorts(ev.remoteUpdate.Spec.Ports)
500
501 if _, err := rcsw.localAPIClient.Client.CoreV1().Services(ev.localService.Namespace).Update(ctx, ev.localService, metav1.UpdateOptions{}); err != nil {
502 return RetryableError{[]error{err}}
503 }
504 return nil
505 }
506
507 func remapRemoteServicePorts(ports []corev1.ServicePort) []corev1.ServicePort {
508
509
510 var newPorts []corev1.ServicePort
511 for _, port := range ports {
512 newPorts = append(newPorts, corev1.ServicePort{
513 Name: port.Name,
514 Protocol: port.Protocol,
515 Port: port.Port,
516 TargetPort: port.TargetPort,
517 })
518 }
519 return newPorts
520 }
521
522 func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context.Context, ev *RemoteServiceCreated) error {
523 remoteService := ev.service.DeepCopy()
524 if rcsw.headlessServicesEnabled && remoteService.Spec.ClusterIP == corev1.ClusterIPNone {
525 return nil
526 }
527
528 serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name)
529 localServiceName := rcsw.mirroredResourceName(remoteService.Name)
530
531
532 if _, err := rcsw.localAPIClient.Client.CoreV1().Namespaces().Get(ctx, remoteService.Namespace, metav1.GetOptions{}); err != nil {
533 if kerrors.IsNotFound(err) {
534 rcsw.recorder.Event(remoteService, corev1.EventTypeNormal, eventTypeSkipped, "Skipped mirroring service: namespace does not exist")
535 rcsw.log.Warnf("Skipping mirroring of service %s: namespace %s does not exist", serviceInfo, remoteService.Namespace)
536 return nil
537 }
538
539 return RetryableError{[]error{err}}
540 }
541
542 serviceToCreate := &corev1.Service{
543 ObjectMeta: metav1.ObjectMeta{
544 Name: localServiceName,
545 Namespace: remoteService.Namespace,
546 Annotations: rcsw.getMirroredServiceAnnotations(remoteService),
547 Labels: rcsw.getMirroredServiceLabels(remoteService),
548 },
549 Spec: corev1.ServiceSpec{
550 Ports: remapRemoteServicePorts(remoteService.Spec.Ports),
551 },
552 }
553
554 rcsw.log.Infof("Creating a new service mirror for %s", serviceInfo)
555 if _, err := rcsw.localAPIClient.Client.CoreV1().Services(remoteService.Namespace).Create(ctx, serviceToCreate, metav1.CreateOptions{}); err != nil {
556 if !kerrors.IsAlreadyExists(err) {
557
558 return RetryableError{[]error{err}}
559 }
560 }
561
562 if rcsw.isRemoteDiscovery(remoteService.Labels) {
563
564 return nil
565 }
566 return rcsw.createGatewayEndpoints(ctx, remoteService)
567 }
568
569 func (rcsw *RemoteClusterServiceWatcher) handleLocalNamespaceAdded(ns *corev1.Namespace) error {
570
571
572 svcs, err := rcsw.remoteAPIClient.Svc().Lister().Services(ns.Name).List(labels.Everything())
573 if err != nil {
574 return RetryableError{[]error{err}}
575 }
576 for _, svc := range svcs {
577 rcsw.eventsQueue.Add(&OnAddCalled{
578 svc: svc,
579 })
580 }
581 return nil
582 }
583
584
585
586
587
588
589 func (rcsw *RemoteClusterServiceWatcher) isEmptyService(svc *corev1.Service) (bool, error) {
590 ep, err := rcsw.remoteAPIClient.Endpoint().Lister().Endpoints(svc.Namespace).Get(svc.Name)
591 if err != nil {
592 if kerrors.IsNotFound(err) {
593 rcsw.log.Debugf("target endpoint %s/%s not found", svc.Namespace, svc.Name)
594 return true, nil
595 }
596
597 return true, err
598 }
599 return rcsw.isEmptyEndpoints(ep), nil
600 }
601
602
603
604
605
606
607 func (rcsw *RemoteClusterServiceWatcher) isEmptyEndpoints(ep *corev1.Endpoints) bool {
608 if len(ep.Subsets) == 0 {
609 rcsw.log.Debugf("endpoint %s/%s has no Subsets", ep.Namespace, ep.Name)
610 return true
611 }
612 for _, subset := range ep.Subsets {
613 if len(subset.Addresses) > 0 {
614 return false
615 }
616 }
617 rcsw.log.Debugf("endpoint %s/%s has no ready addresses", ep.Namespace, ep.Name)
618 return true
619 }
620
621 func (rcsw *RemoteClusterServiceWatcher) createGatewayEndpoints(ctx context.Context, exportedService *corev1.Service) error {
622 empty, err := rcsw.isEmptyService(exportedService)
623 if err != nil {
624 return RetryableError{[]error{err}}
625 }
626
627 gatewayAddresses, err := rcsw.resolveGatewayAddress()
628 if err != nil {
629 return err
630 }
631
632 localServiceName := rcsw.mirroredResourceName(exportedService.Name)
633 serviceInfo := fmt.Sprintf("%s/%s", exportedService.Namespace, exportedService.Name)
634 endpointsToCreate := &corev1.Endpoints{
635 ObjectMeta: metav1.ObjectMeta{
636 Name: localServiceName,
637 Namespace: exportedService.Namespace,
638 Labels: map[string]string{
639 consts.MirroredResourceLabel: "true",
640 consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
641 },
642 Annotations: map[string]string{
643 consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", exportedService.Name, exportedService.Namespace, rcsw.link.TargetClusterDomain),
644 },
645 },
646 }
647
648 rcsw.log.Infof("Resolved gateway [%v:%d] for %s", gatewayAddresses, rcsw.link.GatewayPort, serviceInfo)
649
650 if !empty && len(gatewayAddresses) > 0 {
651 endpointsToCreate.Subsets = []corev1.EndpointSubset{
652 {
653 Addresses: gatewayAddresses,
654 Ports: rcsw.getEndpointsPorts(exportedService),
655 },
656 }
657 } else if !empty {
658 endpointsToCreate.Subsets = []corev1.EndpointSubset{
659 {
660 NotReadyAddresses: gatewayAddresses,
661 Ports: rcsw.getEndpointsPorts(exportedService),
662 },
663 }
664 rcsw.log.Warnf("could not resolve gateway addresses for %s; setting endpoint subsets to not ready", serviceInfo)
665 } else {
666 rcsw.log.Warnf("exported service %s is empty", serviceInfo)
667 }
668
669 if rcsw.link.GatewayIdentity != "" {
670 endpointsToCreate.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
671 }
672
673 rcsw.log.Infof("Creating a new endpoints for %s", serviceInfo)
674 err = rcsw.createMirrorEndpoints(ctx, endpointsToCreate)
675 if err != nil {
676 if svcErr := rcsw.localAPIClient.Client.CoreV1().Services(exportedService.Namespace).Delete(ctx, localServiceName, metav1.DeleteOptions{}); svcErr != nil {
677 rcsw.log.Errorf("Failed to delete service %s after endpoints creation failed: %s", localServiceName, svcErr)
678 }
679 return RetryableError{[]error{err}}
680 }
681 return nil
682 }
683
684
685
686
687 func (rcsw *RemoteClusterServiceWatcher) createOrUpdateService(service *corev1.Service) error {
688 localName := rcsw.mirroredResourceName(service.Name)
689
690 if rcsw.isExported(service.Labels) || rcsw.isRemoteDiscovery(service.Labels) {
691 localService, err := rcsw.localAPIClient.Svc().Lister().Services(service.Namespace).Get(localName)
692 if err != nil {
693 if kerrors.IsNotFound(err) {
694 rcsw.eventsQueue.Add(&RemoteServiceCreated{
695 service: service,
696 })
697 return nil
698 }
699 return RetryableError{[]error{err}}
700 }
701
702 lastMirroredRemoteVersion, ok := localService.Annotations[consts.RemoteResourceVersionAnnotation]
703 if ok && lastMirroredRemoteVersion != service.ResourceVersion {
704 endpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(service.Namespace).Get(localName)
705 if err != nil {
706 if kerrors.IsNotFound(err) {
707 endpoints = nil
708 } else {
709 return RetryableError{[]error{err}}
710 }
711 }
712 rcsw.eventsQueue.Add(&RemoteServiceUpdated{
713 localService: localService,
714 localEndpoints: endpoints,
715 remoteUpdate: service,
716 })
717 return nil
718 }
719
720 return nil
721 }
722 localSvc, err := rcsw.localAPIClient.Svc().Lister().Services(service.Namespace).Get(localName)
723 if err == nil {
724 if localSvc.Labels != nil {
725 _, isMirroredRes := localSvc.Labels[consts.MirroredResourceLabel]
726 clusterName := localSvc.Labels[consts.RemoteClusterNameLabel]
727 if isMirroredRes && (clusterName == rcsw.link.TargetClusterName) {
728 rcsw.eventsQueue.Add(&RemoteServiceDeleted{
729 Name: service.Name,
730 Namespace: service.Namespace,
731 })
732 }
733 }
734 }
735 return nil
736 }
737
738 func (rcsw *RemoteClusterServiceWatcher) getMirrorServices() (*corev1.ServiceList, error) {
739 matchLabels := map[string]string{
740 consts.MirroredResourceLabel: "true",
741 consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
742 }
743 services, err := rcsw.localAPIClient.Client.CoreV1().Services("").List(context.Background(), metav1.ListOptions{LabelSelector: labels.SelectorFromSet(matchLabels).String()})
744 if err != nil {
745 return nil, err
746 }
747 return services, nil
748 }
749
750 func (rcsw *RemoteClusterServiceWatcher) handleOnDelete(service *corev1.Service) {
751 if rcsw.isExported(service.Labels) || rcsw.isRemoteDiscovery(service.Labels) {
752 rcsw.eventsQueue.Add(&RemoteServiceDeleted{
753 Name: service.Name,
754 Namespace: service.Namespace,
755 })
756 } else {
757 rcsw.log.Infof("Skipping OnDelete for service %s", service)
758 }
759 }
760
761 func (rcsw *RemoteClusterServiceWatcher) processNextEvent(ctx context.Context) (bool, interface{}, error) {
762 event, done := rcsw.eventsQueue.Get()
763 if event != nil {
764 rcsw.log.Infof("Received: %s", event)
765 } else if done {
766 rcsw.log.Infof("Received: Stop")
767 }
768
769 var err error
770 switch ev := event.(type) {
771 case *OnAddCalled:
772 err = rcsw.createOrUpdateService(ev.svc)
773 case *OnAddEndpointsCalled:
774 err = rcsw.handleCreateOrUpdateEndpoints(ctx, ev.ep)
775 case *OnUpdateCalled:
776 err = rcsw.createOrUpdateService(ev.svc)
777 case *OnUpdateEndpointsCalled:
778 err = rcsw.handleCreateOrUpdateEndpoints(ctx, ev.ep)
779 case *OnDeleteCalled:
780 rcsw.handleOnDelete(ev.svc)
781 case *RemoteServiceCreated:
782 err = rcsw.handleRemoteServiceCreated(ctx, ev)
783 case *RemoteServiceUpdated:
784 err = rcsw.handleRemoteServiceUpdated(ctx, ev)
785 case *RemoteServiceDeleted:
786 err = rcsw.handleRemoteServiceDeleted(ctx, ev)
787 case *ClusterUnregistered:
788 err = rcsw.cleanupMirroredResources(ctx)
789 case *OrphanedServicesGcTriggered:
790 err = rcsw.cleanupOrphanedServices(ctx)
791 case *RepairEndpoints:
792 err = rcsw.repairEndpoints(ctx)
793 case *OnLocalNamespaceAdded:
794 err = rcsw.handleLocalNamespaceAdded(ev.ns)
795 default:
796 if ev != nil || !done {
797 rcsw.log.Warnf("Received unknown event: %v", ev)
798 }
799 }
800
801 return done, event, err
802
803 }
804
805
806
807 func (rcsw *RemoteClusterServiceWatcher) processEvents(ctx context.Context) {
808 for {
809 done, event, err := rcsw.processNextEvent(ctx)
810 rcsw.eventsQueue.Done(event)
811
812
813
814
815 if err == nil {
816 rcsw.eventsQueue.Forget(event)
817 } else {
818 var re RetryableError
819 if errors.As(err, &re) {
820 rcsw.log.Warnf("Requeues: %d, Limit: %d for event %s", rcsw.eventsQueue.NumRequeues(event), rcsw.requeueLimit, event)
821 if (rcsw.eventsQueue.NumRequeues(event) < rcsw.requeueLimit) && !done {
822 rcsw.log.Errorf("Error processing %s (will retry): %s", event, re)
823 rcsw.eventsQueue.AddRateLimited(event)
824 } else {
825 rcsw.log.Errorf("Error processing %s (giving up): %s", event, re)
826 rcsw.eventsQueue.Forget(event)
827 }
828 } else {
829 rcsw.log.Errorf("Error processing %s (will not retry): %s", event, err)
830 }
831 }
832 if done {
833 rcsw.log.Infof("Shutting down events processor")
834 return
835 }
836 }
837 }
838
839
840 func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error {
841 rcsw.remoteAPIClient.Sync(rcsw.stopper)
842 rcsw.eventsQueue.Add(&OrphanedServicesGcTriggered{})
843 var err error
844 rcsw.svcHandler, err = rcsw.remoteAPIClient.Svc().Informer().AddEventHandler(
845 cache.ResourceEventHandlerFuncs{
846 AddFunc: func(svc interface{}) {
847 rcsw.eventsQueue.Add(&OnAddCalled{svc.(*corev1.Service)})
848 },
849 DeleteFunc: func(obj interface{}) {
850 service, ok := obj.(*corev1.Service)
851 if !ok {
852 tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
853 if !ok {
854 rcsw.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
855 return
856 }
857 service, ok = tombstone.Obj.(*corev1.Service)
858 if !ok {
859 rcsw.log.Errorf("DeletedFinalStateUnknown contained object that is not a Service %#v", obj)
860 return
861 }
862 }
863 rcsw.eventsQueue.Add(&OnDeleteCalled{service})
864 },
865 UpdateFunc: func(_, new interface{}) {
866 rcsw.eventsQueue.Add(&OnUpdateCalled{new.(*corev1.Service)})
867 },
868 },
869 )
870 if err != nil {
871 return err
872 }
873
874 rcsw.epHandler, err = rcsw.remoteAPIClient.Endpoint().Informer().AddEventHandler(
875 cache.ResourceEventHandlerFuncs{
876
877 AddFunc: func(obj interface{}) {
878 ep, ok := obj.(*corev1.Endpoints)
879 if !ok {
880 rcsw.log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", ep)
881 return
882 }
883
884 if !rcsw.isExported(ep.Labels) {
885 rcsw.log.Debugf("skipped processing endpoints object %s/%s: missing %s label", ep.Namespace, ep.Name, consts.DefaultExportedServiceSelector)
886 return
887 }
888
889 if !isHeadlessEndpoints(ep, rcsw.log) {
890 return
891 }
892
893 rcsw.eventsQueue.Add(&OnAddEndpointsCalled{obj.(*corev1.Endpoints)})
894 },
895
896 UpdateFunc: func(_, new interface{}) {
897 epNew, ok := new.(*corev1.Endpoints)
898 if !ok {
899 rcsw.log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", epNew)
900 return
901 }
902 if !rcsw.isExported(epNew.Labels) {
903 rcsw.log.Debugf("skipped processing endpoints object %s/%s: missing %s label", epNew.Namespace, epNew.Name, consts.DefaultExportedServiceSelector)
904 return
905 }
906 if rcsw.isRemoteDiscovery(epNew.Labels) {
907 rcsw.log.Debugf("skipped processing endpoints object %s/%s (service labeled for remote-discovery mode)", epNew.Namespace, epNew.Name)
908 return
909 }
910 rcsw.eventsQueue.Add(&OnUpdateEndpointsCalled{epNew})
911 },
912 },
913 )
914 if err != nil {
915 return err
916 }
917
918 rcsw.nsHandler, err = rcsw.localAPIClient.NS().Informer().AddEventHandler(
919 cache.ResourceEventHandlerFuncs{
920 AddFunc: func(obj interface{}) {
921 rcsw.eventsQueue.Add(&OnLocalNamespaceAdded{obj.(*corev1.Namespace)})
922 },
923 },
924 )
925 if err != nil {
926 return err
927 }
928
929 go rcsw.processEvents(ctx)
930
931
932 if rcsw.link.GatewayAddress == "" {
933 return nil
934 }
935
936
937
938 ev := RepairEndpoints{}
939 rcsw.eventsQueue.Add(&ev)
940
941 go func() {
942 ticker := time.NewTicker(rcsw.repairPeriod)
943 for {
944 select {
945 case <-ticker.C:
946 ev := RepairEndpoints{}
947 rcsw.eventsQueue.Add(&ev)
948 case alive := <-rcsw.liveness:
949 rcsw.log.Debugf("gateway liveness change from %t to %t", rcsw.gatewayAlive, alive)
950 rcsw.gatewayAlive = alive
951 ev := RepairEndpoints{}
952 rcsw.eventsQueue.Add(&ev)
953 case <-rcsw.stopper:
954 return
955 }
956 }
957 }()
958
959 return nil
960 }
961
962
963 func (rcsw *RemoteClusterServiceWatcher) Stop(cleanupState bool) {
964 close(rcsw.stopper)
965 if cleanupState {
966 rcsw.eventsQueue.Add(&ClusterUnregistered{})
967 }
968 rcsw.eventsQueue.ShutDown()
969 rcsw.eventBroadcaster.Shutdown()
970
971 if rcsw.svcHandler != nil {
972 if err := rcsw.remoteAPIClient.Svc().Informer().RemoveEventHandler(rcsw.svcHandler); err != nil {
973 rcsw.log.Warnf("error removing service informer handler: %s", err)
974 }
975 }
976 if rcsw.epHandler != nil {
977 if err := rcsw.remoteAPIClient.Endpoint().Informer().RemoveEventHandler(rcsw.epHandler); err != nil {
978 rcsw.log.Warnf("error removing service informer handler: %s", err)
979 }
980 }
981 if rcsw.nsHandler != nil {
982 if err := rcsw.localAPIClient.NS().Informer().RemoveEventHandler(rcsw.nsHandler); err != nil {
983 rcsw.log.Warnf("error removing service informer handler: %s", err)
984 }
985 }
986
987 if rcsw.remoteAPIClient != nil {
988 rcsw.remoteAPIClient.UnregisterGauges()
989 }
990 }
991
992 func (rcsw *RemoteClusterServiceWatcher) resolveGatewayAddress() ([]corev1.EndpointAddress, error) {
993 var gatewayEndpoints []corev1.EndpointAddress
994 var errors []error
995 for _, addr := range strings.Split(rcsw.link.GatewayAddress, ",") {
996 ipAddrs, err := net.LookupIP(addr)
997 if err != nil {
998 err = fmt.Errorf("Error resolving '%s': %w", addr, err)
999 rcsw.log.Warn(err)
1000 errors = append(errors, err)
1001 continue
1002 }
1003
1004 for _, ipAddr := range ipAddrs {
1005 gatewayEndpoints = append(gatewayEndpoints, corev1.EndpointAddress{
1006 IP: ipAddr.String(),
1007 })
1008 }
1009 }
1010
1011 if len(gatewayEndpoints) == 0 {
1012 return nil, RetryableError{errors}
1013 }
1014
1015 sort.SliceStable(gatewayEndpoints, func(i, j int) bool {
1016 return gatewayEndpoints[i].IP < gatewayEndpoints[j].IP
1017 })
1018 return gatewayEndpoints, nil
1019 }
1020
1021 func (rcsw *RemoteClusterServiceWatcher) repairEndpoints(ctx context.Context) error {
1022 endpointRepairCounter.With(prometheus.Labels{
1023 gatewayClusterName: rcsw.link.TargetClusterName,
1024 }).Inc()
1025
1026
1027
1028 gatewayAddresses, err := rcsw.resolveGatewayAddress()
1029 if err != nil {
1030 return err
1031 }
1032 err = rcsw.createOrUpdateGatewayEndpoints(ctx, gatewayAddresses)
1033 if err != nil {
1034 rcsw.log.Errorf("Failed to create/update gateway mirror endpoints: %s", err)
1035 }
1036
1037
1038 mirrorServices, err := rcsw.getMirrorServices()
1039 if err != nil {
1040 return RetryableError{[]error{fmt.Errorf("Failed to list mirror services: %w", err)}}
1041 }
1042 for _, svc := range mirrorServices.Items {
1043 svc := svc
1044
1045
1046
1047
1048 if svc.Spec.ClusterIP == corev1.ClusterIPNone {
1049 rcsw.log.Debugf("Skipped repairing endpoints for headless mirror %s/%s", svc.Namespace, svc.Name)
1050 continue
1051 }
1052
1053 if _, ok := svc.Labels[consts.RemoteDiscoveryLabel]; ok {
1054 rcsw.log.Debugf("Skipped repairing endpoints for service in remote-discovery mode %s/%s", svc.Namespace, svc.Name)
1055 continue
1056 }
1057
1058 endpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(svc.Namespace).Get(svc.Name)
1059 if err != nil {
1060 if !kerrors.IsNotFound(err) {
1061 rcsw.log.Errorf("Failed to list local endpoints: %s", err)
1062 continue
1063 }
1064 endpoints, err = rcsw.localAPIClient.Client.CoreV1().Endpoints(svc.Namespace).Get(ctx, svc.Name, metav1.GetOptions{})
1065 if err != nil {
1066 rcsw.log.Errorf("Failed to get local endpoints %s/%s: %s", svc.Namespace, svc.Name, err)
1067 continue
1068 }
1069 }
1070 updatedEndpoints := endpoints.DeepCopy()
1071 updatedEndpoints.Subsets = []corev1.EndpointSubset{
1072 {
1073 Addresses: gatewayAddresses,
1074 Ports: rcsw.getEndpointsPorts(&svc),
1075 },
1076 }
1077
1078
1079
1080
1081
1082
1083 if _, found := svc.Labels[consts.MirroredHeadlessSvcNameLabel]; !found {
1084 targetService := svc.DeepCopy()
1085 targetService.Name = rcsw.targetResourceName(svc.Name)
1086 empty, err := rcsw.isEmptyService(targetService)
1087 if err != nil {
1088 rcsw.log.Errorf("Could not check service emptiness: %s", err)
1089 continue
1090 }
1091 if empty {
1092 rcsw.log.Warnf("Exported service %s/%s is empty", targetService.Namespace, targetService.Name)
1093 updatedEndpoints.Subsets = []corev1.EndpointSubset{}
1094 }
1095 }
1096
1097 if updatedEndpoints.Annotations == nil {
1098 updatedEndpoints.Annotations = make(map[string]string)
1099 }
1100 updatedEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
1101
1102 err = rcsw.updateMirrorEndpoints(ctx, updatedEndpoints)
1103 if err != nil {
1104 rcsw.log.Error(err)
1105 }
1106 }
1107
1108 return nil
1109 }
1110
1111
1112
1113
1114
1115 func (rcsw *RemoteClusterServiceWatcher) createOrUpdateGatewayEndpoints(ctx context.Context, addressses []corev1.EndpointAddress) error {
1116 gatewayMirrorName := fmt.Sprintf("probe-gateway-%s", rcsw.link.TargetClusterName)
1117 endpoints := &corev1.Endpoints{
1118 ObjectMeta: metav1.ObjectMeta{
1119 Name: gatewayMirrorName,
1120 Namespace: rcsw.serviceMirrorNamespace,
1121 Labels: map[string]string{
1122 consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
1123 },
1124 Annotations: map[string]string{
1125 consts.RemoteGatewayIdentity: rcsw.link.GatewayIdentity,
1126 },
1127 },
1128 Subsets: []corev1.EndpointSubset{
1129 {
1130 Addresses: addressses,
1131 Ports: []corev1.EndpointPort{
1132 {
1133 Name: "mc-probe",
1134 Port: int32(rcsw.link.ProbeSpec.Port),
1135 Protocol: "TCP",
1136 },
1137 },
1138 },
1139 },
1140 }
1141 _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(endpoints.Namespace).Get(ctx, endpoints.Name, metav1.GetOptions{})
1142 if err != nil {
1143 if !kerrors.IsNotFound(err) {
1144 return err
1145 }
1146
1147
1148
1149
1150
1151 _, err = rcsw.localAPIClient.Client.CoreV1().Endpoints(endpoints.Namespace).Create(ctx, endpoints, metav1.CreateOptions{})
1152 if err != nil {
1153 return err
1154 }
1155 return nil
1156 }
1157
1158
1159
1160
1161 _, err = rcsw.localAPIClient.Client.CoreV1().Endpoints(endpoints.Namespace).Update(ctx, endpoints, metav1.UpdateOptions{})
1162 return err
1163 }
1164
1165
1166
1167
1168
1169
1170 func (rcsw *RemoteClusterServiceWatcher) handleCreateOrUpdateEndpoints(
1171 ctx context.Context,
1172 exportedEndpoints *corev1.Endpoints,
1173 ) error {
1174 if isHeadlessEndpoints(exportedEndpoints, rcsw.log) {
1175 if rcsw.headlessServicesEnabled {
1176 return rcsw.createOrUpdateHeadlessEndpoints(ctx, exportedEndpoints)
1177 }
1178 return nil
1179 }
1180
1181 localServiceName := rcsw.mirroredResourceName(exportedEndpoints.Name)
1182 ep, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(exportedEndpoints.Namespace).Get(localServiceName)
1183 if err != nil {
1184 return RetryableError{[]error{err}}
1185 }
1186
1187 if (rcsw.isEmptyEndpoints(ep) && rcsw.isEmptyEndpoints(exportedEndpoints)) ||
1188 (!rcsw.isEmptyEndpoints(ep) && !rcsw.isEmptyEndpoints(exportedEndpoints)) {
1189 return nil
1190 }
1191
1192 rcsw.log.Infof("Updating subsets for mirror endpoint %s/%s", exportedEndpoints.Namespace, exportedEndpoints.Name)
1193 if rcsw.isEmptyEndpoints(exportedEndpoints) {
1194 ep.Subsets = []corev1.EndpointSubset{}
1195 } else {
1196 exportedService, err := rcsw.remoteAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(exportedEndpoints.Name)
1197 if err != nil {
1198 return RetryableError{[]error{
1199 fmt.Errorf("error retrieving exported service %s/%s: %w", exportedEndpoints.Namespace, exportedEndpoints.Name, err),
1200 }}
1201 }
1202 gatewayAddresses, err := rcsw.resolveGatewayAddress()
1203 if err != nil {
1204 return err
1205 }
1206 ep.Subsets = []corev1.EndpointSubset{
1207 {
1208 Addresses: gatewayAddresses,
1209 Ports: rcsw.getEndpointsPorts(exportedService),
1210 },
1211 }
1212 }
1213 return rcsw.updateMirrorEndpoints(ctx, ep)
1214 }
1215
1216
1217
1218
1219 func (rcsw *RemoteClusterServiceWatcher) createMirrorEndpoints(ctx context.Context, endpoints *corev1.Endpoints) error {
1220 rcsw.updateReadiness(endpoints)
1221 _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(endpoints.Namespace).Create(ctx, endpoints, metav1.CreateOptions{})
1222 if err != nil {
1223 return fmt.Errorf("failed to create mirror endpoints for %s/%s: %w", endpoints.Namespace, endpoints.Name, err)
1224 }
1225 return nil
1226 }
1227
1228
1229
1230
1231
1232 func (rcsw *RemoteClusterServiceWatcher) updateMirrorEndpoints(ctx context.Context, endpoints *corev1.Endpoints) error {
1233 rcsw.updateReadiness(endpoints)
1234 _, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(endpoints.Namespace).Update(ctx, endpoints, metav1.UpdateOptions{})
1235 if err != nil {
1236 return fmt.Errorf("failed to update mirror endpoints for %s/%s: %w", endpoints.Namespace, endpoints.Name, err)
1237 }
1238 return err
1239 }
1240
1241 func (rcsw *RemoteClusterServiceWatcher) updateReadiness(endpoints *corev1.Endpoints) {
1242 if !rcsw.gatewayAlive {
1243 rcsw.log.Warnf("gateway for %s/%s does not have ready addresses; setting addresses to not ready", endpoints.Namespace, endpoints.Name)
1244 for i := range endpoints.Subsets {
1245 endpoints.Subsets[i].NotReadyAddresses = append(endpoints.Subsets[i].NotReadyAddresses, endpoints.Subsets[i].Addresses...)
1246 endpoints.Subsets[i].Addresses = nil
1247 }
1248 }
1249 }
1250
1251 func (rcsw *RemoteClusterServiceWatcher) isExported(l map[string]string) bool {
1252
1253
1254 if len(rcsw.link.Selector.MatchExpressions)+len(rcsw.link.Selector.MatchLabels) == 0 {
1255 return false
1256 }
1257 selector, err := metav1.LabelSelectorAsSelector(&rcsw.link.Selector)
1258 if err != nil {
1259 rcsw.log.Errorf("Invalid selector: %s", err)
1260 return false
1261 }
1262 return selector.Matches(labels.Set(l))
1263 }
1264
1265 func (rcsw *RemoteClusterServiceWatcher) isRemoteDiscovery(l map[string]string) bool {
1266
1267
1268
1269 if len(rcsw.link.RemoteDiscoverySelector.MatchExpressions)+len(rcsw.link.RemoteDiscoverySelector.MatchLabels) == 0 {
1270 return false
1271 }
1272 remoteDiscoverySelector, err := metav1.LabelSelectorAsSelector(&rcsw.link.RemoteDiscoverySelector)
1273 if err != nil {
1274 rcsw.log.Errorf("Invalid selector: %s", err)
1275 return false
1276 }
1277
1278 return remoteDiscoverySelector.Matches(labels.Set(l))
1279 }
1280
View as plain text