...

Source file src/github.com/linkerd/linkerd2/multicluster/service-mirror/cluster_watcher.go

Documentation: github.com/linkerd/linkerd2/multicluster/service-mirror

     1  package servicemirror
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"net"
     8  	"sort"
     9  	"strings"
    10  	"time"
    11  
    12  	"github.com/linkerd/linkerd2/controller/k8s"
    13  	consts "github.com/linkerd/linkerd2/pkg/k8s"
    14  	"github.com/linkerd/linkerd2/pkg/multicluster"
    15  	"github.com/prometheus/client_golang/prometheus"
    16  	logging "github.com/sirupsen/logrus"
    17  	corev1 "k8s.io/api/core/v1"
    18  	kerrors "k8s.io/apimachinery/pkg/api/errors"
    19  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    20  	"k8s.io/apimachinery/pkg/labels"
    21  	"k8s.io/client-go/kubernetes/scheme"
    22  	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    23  	"k8s.io/client-go/rest"
    24  	"k8s.io/client-go/tools/cache"
    25  	"k8s.io/client-go/tools/record"
    26  	"k8s.io/client-go/util/workqueue"
    27  )
    28  
const (
	// eventTypeSkipped is the reason attached to the Kubernetes event that is
	// recorded on a remote service when mirroring it is skipped.
	eventTypeSkipped = "ServiceMirroringSkipped"
)
    32  
type (
	// RemoteClusterServiceWatcher is a watcher instantiated for every cluster that is being watched.
	// Its main job is to listen to events coming from the remote cluster and react accordingly, keeping
	// the state of the mirrored services in sync. This is achieved by maintaining a SharedInformer
	// on the remote cluster. The basic add/update/delete operations are mapped to more domain-specific
	// events, put onto a work queue and handled by the processing loop. In case processing an event fails
	// it can be requeued up to N times, to ensure that the failure is not due to some temporary network
	// problems or general glitch in the Matrix.
	//
	// The link field describes the target cluster: its name, cluster domain,
	// gateway port and gateway identity are all read from it. remoteAPIClient
	// talks to the target cluster and localAPIClient to the cluster in which
	// the mirrors are created. gatewayAlive is initialized to true by the
	// constructor.
	RemoteClusterServiceWatcher struct {
		serviceMirrorNamespace  string
		link                    *multicluster.Link
		remoteAPIClient         *k8s.API
		localAPIClient          *k8s.API
		stopper                 chan struct{}
		eventBroadcaster        record.EventBroadcaster
		recorder                record.EventRecorder
		log                     *logging.Entry
		eventsQueue             workqueue.RateLimitingInterface
		requeueLimit            int
		repairPeriod            time.Duration
		gatewayAlive            bool
		liveness                chan bool
		headlessServicesEnabled bool

		informerHandlers
	}

	// informerHandlers groups the event handler registrations held by the
	// watcher (presumably for the Service, Endpoints and Namespace informers,
	// judging by the field names; confirm against the registration code).
	informerHandlers struct {
		svcHandler cache.ResourceEventHandlerRegistration
		epHandler  cache.ResourceEventHandlerRegistration
		nsHandler  cache.ResourceEventHandlerRegistration
	}

	// RemoteServiceCreated is generated whenever a remote service is created. Observing
	// this event means that the service in question is not mirrored at the moment.
	// createOrUpdateService enqueues it when no local mirror exists yet.
	RemoteServiceCreated struct {
		service *corev1.Service
	}

	// RemoteServiceUpdated is generated when we see something about an already
	// mirrored service change on the remote cluster. In that case we need to
	// reconcile. Most importantly we need to keep track of exposed ports
	// and gateway association changes.
	//
	// localEndpoints is nil when no Endpoints object exists for the mirror.
	RemoteServiceUpdated struct {
		localService   *corev1.Service
		localEndpoints *corev1.Endpoints
		remoteUpdate   *corev1.Service
	}

	// RemoteServiceDeleted is generated when a remote service is going away or
	// is not considered mirrored anymore. Name and Namespace identify the
	// remote service, not its local mirror.
	RemoteServiceDeleted struct {
		Name      string
		Namespace string
	}

	// ClusterUnregistered is issued when this ClusterWatcher is shut down.
	ClusterUnregistered struct{}

	// OrphanedServicesGcTriggered is a self-triggered event which aims to delete any
	// orphaned services that are no longer on the remote cluster. It is emitted every
	// time a new remote cluster is registered for monitoring. The need for this arises
	// because the following might happen.
	//
	// 1. A cluster is registered for monitoring
	// 2. Services A,B,C are created and mirrored
	// 3. Then this component crashes, leaving the mirrors around
	// 4. In the meantime services B and C are deleted on the remote cluster
	// 5. When the controller starts up again it registers to listen for mirrored services
	// 6. It receives an ADD for A but not a DELETE for B and C
	//
	// This event indicates that we need to make a diff with all services on the remote
	// cluster, ensuring that we do not keep any mirrors that are not relevant anymore
	OrphanedServicesGcTriggered struct{}

	// OnAddCalled is issued when the onAdd function of the
	// shared informer is called. It is also enqueued for every remote service
	// of a namespace when that namespace is added locally.
	OnAddCalled struct {
		svc *corev1.Service
	}

	// OnAddEndpointsCalled is issued when the onAdd function of the Endpoints
	// shared informer is called
	OnAddEndpointsCalled struct {
		ep *corev1.Endpoints
	}

	// OnUpdateCalled is issued when the onUpdate function of the
	// shared informer is called
	OnUpdateCalled struct {
		svc *corev1.Service
	}

	// OnUpdateEndpointsCalled is issued when the onUpdate function of the
	// shared Endpoints informer is called
	OnUpdateEndpointsCalled struct {
		ep *corev1.Endpoints
	}
	// OnDeleteCalled is issued when the onDelete function of the
	// shared informer is called
	OnDeleteCalled struct {
		svc *corev1.Service
	}

	// RepairEndpoints is issued when the service mirror and mirror gateway
	// endpoints should be resolved based on the remote gateway and updated.
	RepairEndpoints struct{}

	// OnLocalNamespaceAdded is issued when a new namespace is added to
	// the local cluster. This means that we should check the remote cluster
	// for exported services in that namespace.
	OnLocalNamespaceAdded struct {
		ns *corev1.Namespace
	}

	// RetryableError is an error that should be retried through requeuing events.
	// Inner holds the individual errors that caused the failure.
	RetryableError struct{ Inner []error }
)
   151  
   152  func (re RetryableError) Error() string {
   153  	var errorStrings []string
   154  	for _, err := range re.Inner {
   155  		errorStrings = append(errorStrings, err.Error())
   156  	}
   157  	return fmt.Sprintf("Inner errors:\n\t%s", strings.Join(errorStrings, "\n\t"))
   158  }
   159  
   160  // NewRemoteClusterServiceWatcher constructs a new cluster watcher
   161  func NewRemoteClusterServiceWatcher(
   162  	ctx context.Context,
   163  	serviceMirrorNamespace string,
   164  	localAPI *k8s.API,
   165  	cfg *rest.Config,
   166  	link *multicluster.Link,
   167  	requeueLimit int,
   168  	repairPeriod time.Duration,
   169  	liveness chan bool,
   170  	enableHeadlessSvc bool,
   171  ) (*RemoteClusterServiceWatcher, error) {
   172  	remoteAPI, err := k8s.InitializeAPIForConfig(ctx, cfg, false, clusterName, k8s.Svc, k8s.Endpoint)
   173  	if err != nil {
   174  		return nil, fmt.Errorf("cannot initialize api for target cluster %s: %w", clusterName, err)
   175  	}
   176  	_, err = remoteAPI.Client.Discovery().ServerVersion()
   177  	if err != nil {
   178  		remoteAPI.UnregisterGauges()
   179  		return nil, fmt.Errorf("cannot connect to api for target cluster %s: %w", clusterName, err)
   180  	}
   181  
   182  	// Create k8s event recorder
   183  	eventBroadcaster := record.NewBroadcaster()
   184  	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
   185  		Interface: remoteAPI.Client.CoreV1().Events(""),
   186  	})
   187  	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{
   188  		Component: fmt.Sprintf("linkerd-service-mirror-%s", clusterName),
   189  	})
   190  
   191  	stopper := make(chan struct{})
   192  	return &RemoteClusterServiceWatcher{
   193  		serviceMirrorNamespace: serviceMirrorNamespace,
   194  		link:                   link,
   195  		remoteAPIClient:        remoteAPI,
   196  		localAPIClient:         localAPI,
   197  		stopper:                stopper,
   198  		eventBroadcaster:       eventBroadcaster,
   199  		recorder:               recorder,
   200  		log: logging.WithFields(logging.Fields{
   201  			"cluster":    clusterName,
   202  			"apiAddress": cfg.Host,
   203  		}),
   204  		eventsQueue:             workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
   205  		requeueLimit:            requeueLimit,
   206  		repairPeriod:            repairPeriod,
   207  		liveness:                liveness,
   208  		headlessServicesEnabled: enableHeadlessSvc,
   209  		// always instantiate the gatewayAlive=true to prevent unexpected service fail fast
   210  		gatewayAlive: true,
   211  	}, nil
   212  }
   213  
   214  func (rcsw *RemoteClusterServiceWatcher) mirroredResourceName(remoteName string) string {
   215  	return fmt.Sprintf("%s-%s", remoteName, rcsw.link.TargetClusterName)
   216  }
   217  
   218  func (rcsw *RemoteClusterServiceWatcher) targetResourceName(mirrorName string) string {
   219  	return strings.TrimSuffix(mirrorName, "-"+rcsw.link.TargetClusterName)
   220  }
   221  
   222  func (rcsw *RemoteClusterServiceWatcher) originalResourceName(mirroredName string) string {
   223  	return strings.TrimSuffix(mirroredName, fmt.Sprintf("-%s", rcsw.link.TargetClusterName))
   224  }
   225  
   226  // Provides labels for mirrored service.
   227  // "remoteService" is an optional parameter. If provided, copies all labels
   228  // from the remote service to mirrored service (except labels with the
   229  // "SvcMirrorPrefix").
   230  func (rcsw *RemoteClusterServiceWatcher) getMirroredServiceLabels(remoteService *corev1.Service) map[string]string {
   231  	labels := map[string]string{
   232  		consts.MirroredResourceLabel:  "true",
   233  		consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
   234  	}
   235  
   236  	if remoteService == nil {
   237  		return labels
   238  	}
   239  
   240  	if rcsw.isRemoteDiscovery(remoteService.Labels) {
   241  		labels[consts.RemoteDiscoveryLabel] = rcsw.link.TargetClusterName
   242  		labels[consts.RemoteServiceLabel] = remoteService.GetName()
   243  	}
   244  
   245  	for key, value := range remoteService.ObjectMeta.Labels {
   246  		if strings.HasPrefix(key, consts.SvcMirrorPrefix) {
   247  			continue
   248  		}
   249  		labels[key] = value
   250  	}
   251  
   252  	return labels
   253  }
   254  
   255  // Provides annotations for mirrored service
   256  func (rcsw *RemoteClusterServiceWatcher) getMirroredServiceAnnotations(remoteService *corev1.Service) map[string]string {
   257  	annotations := map[string]string{
   258  		consts.RemoteResourceVersionAnnotation: remoteService.ResourceVersion, // needed to detect real changes
   259  		consts.RemoteServiceFqName:             fmt.Sprintf("%s.%s.svc.%s", remoteService.Name, remoteService.Namespace, rcsw.link.TargetClusterDomain),
   260  	}
   261  
   262  	for key, value := range remoteService.ObjectMeta.Annotations {
   263  		// Topology aware hints are not multicluster aware.
   264  		if key == "service.kubernetes.io/topology-aware-hints" || key == "service.kubernetes.io/topology-mode" {
   265  			continue
   266  		}
   267  		annotations[key] = value
   268  	}
   269  
   270  	value, ok := remoteService.GetAnnotations()[consts.ProxyOpaquePortsAnnotation]
   271  	if ok {
   272  		annotations[consts.ProxyOpaquePortsAnnotation] = value
   273  	}
   274  
   275  	return annotations
   276  }
   277  
   278  // This method takes care of port remapping. What it does essentially is get the one gateway port
   279  // that we should send traffic to and create endpoint ports that bind to the mirrored service ports
   280  // (same name, etc) but send traffic to the gateway port. This way we do not need to do any remapping
   281  // on the service side of things. It all happens in the endpoints.
   282  func (rcsw *RemoteClusterServiceWatcher) getEndpointsPorts(service *corev1.Service) []corev1.EndpointPort {
   283  	var endpointsPorts []corev1.EndpointPort
   284  	for _, remotePort := range service.Spec.Ports {
   285  		endpointsPorts = append(endpointsPorts, corev1.EndpointPort{
   286  			Name:     remotePort.Name,
   287  			Protocol: remotePort.Protocol,
   288  			Port:     int32(rcsw.link.GatewayPort),
   289  		})
   290  	}
   291  	return endpointsPorts
   292  }
   293  
   294  func (rcsw *RemoteClusterServiceWatcher) cleanupOrphanedServices(ctx context.Context) error {
   295  	matchLabels := map[string]string{
   296  		consts.MirroredResourceLabel:  "true",
   297  		consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
   298  	}
   299  
   300  	servicesOnLocalCluster, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector())
   301  	if err != nil {
   302  		innerErr := fmt.Errorf("failed to list services while cleaning up mirror services: %w", err)
   303  		if kerrors.IsNotFound(err) {
   304  			return innerErr
   305  		}
   306  		// if it is something else, we can just retry
   307  		return RetryableError{[]error{innerErr}}
   308  	}
   309  
   310  	var errors []error
   311  	for _, srv := range servicesOnLocalCluster {
   312  		mirroredName := srv.Name
   313  		// For headless services with cluster IPs representing the backing pods, the mirrored service name
   314  		// is the root headless service in the source cluster
   315  		if remoteHeadlessSvcName, headlessMirror := srv.Labels[consts.MirroredHeadlessSvcNameLabel]; headlessMirror {
   316  			mirroredName = remoteHeadlessSvcName
   317  		}
   318  		remoteServiceName := rcsw.originalResourceName(mirroredName)
   319  		_, err := rcsw.remoteAPIClient.Svc().Lister().Services(srv.Namespace).Get(remoteServiceName)
   320  		if err != nil {
   321  			if kerrors.IsNotFound(err) {
   322  				// service does not exist anymore. Need to delete
   323  				if err := rcsw.localAPIClient.Client.CoreV1().Services(srv.Namespace).Delete(ctx, srv.Name, metav1.DeleteOptions{}); err != nil {
   324  					// something went wrong with deletion, we need to retry
   325  					errors = append(errors, err)
   326  				} else {
   327  					rcsw.log.Infof("Deleted service %s/%s while cleaning up mirror services", srv.Namespace, srv.Name)
   328  				}
   329  			} else {
   330  				// something went wrong getting the service, we can retry
   331  				errors = append(errors, err)
   332  			}
   333  		}
   334  	}
   335  	if len(errors) > 0 {
   336  		return RetryableError{errors}
   337  	}
   338  
   339  	return nil
   340  }
   341  
   342  // Whenever we stop watching a cluster, we need to cleanup everything that we have
   343  // created. This piece of code is responsible for doing just that. It takes care of
   344  // services, endpoints and namespaces (if needed)
   345  func (rcsw *RemoteClusterServiceWatcher) cleanupMirroredResources(ctx context.Context) error {
   346  	matchLabels := rcsw.getMirroredServiceLabels(nil)
   347  
   348  	services, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector())
   349  	if err != nil {
   350  		innerErr := fmt.Errorf("could not retrieve mirrored services that need cleaning up: %w", err)
   351  		if kerrors.IsNotFound(err) {
   352  			return innerErr
   353  		}
   354  		// if its not notFound then something else went wrong, so we can retry
   355  		return RetryableError{[]error{innerErr}}
   356  	}
   357  
   358  	var errors []error
   359  	for _, svc := range services {
   360  		if err := rcsw.localAPIClient.Client.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}); err != nil {
   361  			if kerrors.IsNotFound(err) {
   362  				continue
   363  			}
   364  			errors = append(errors, fmt.Errorf("Could not delete service %s/%s: %w", svc.Namespace, svc.Name, err))
   365  		} else {
   366  			rcsw.log.Infof("Deleted service %s/%s", svc.Namespace, svc.Name)
   367  		}
   368  	}
   369  
   370  	endpoints, err := rcsw.localAPIClient.Endpoint().Lister().List(labels.Set(matchLabels).AsSelector())
   371  	if err != nil {
   372  		innerErr := fmt.Errorf("could not retrieve endpoints that need cleaning up: %w", err)
   373  		if kerrors.IsNotFound(err) {
   374  			return innerErr
   375  		}
   376  		return RetryableError{[]error{innerErr}}
   377  	}
   378  
   379  	for _, endpoint := range endpoints {
   380  		if err := rcsw.localAPIClient.Client.CoreV1().Endpoints(endpoint.Namespace).Delete(ctx, endpoint.Name, metav1.DeleteOptions{}); err != nil {
   381  			if kerrors.IsNotFound(err) {
   382  				continue
   383  			}
   384  			errors = append(errors, fmt.Errorf("Could not delete endpoints %s/%s: %w", endpoint.Namespace, endpoint.Name, err))
   385  		} else {
   386  			rcsw.log.Infof("Deleted endpoints %s/%s", endpoint.Namespace, endpoint.Name)
   387  		}
   388  	}
   389  
   390  	if len(errors) > 0 {
   391  		return RetryableError{errors}
   392  	}
   393  	return nil
   394  }
   395  
   396  // Deletes a locally mirrored service as it is not present on the remote cluster anymore
   397  func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceDeleted(ctx context.Context, ev *RemoteServiceDeleted) error {
   398  	localServiceName := rcsw.mirroredResourceName(ev.Name)
   399  	localService, err := rcsw.localAPIClient.Svc().Lister().Services(ev.Namespace).Get(localServiceName)
   400  	var errors []error
   401  	if err != nil {
   402  		if kerrors.IsNotFound(err) {
   403  			rcsw.log.Debugf("Failed to delete mirror service %s/%s: %v", ev.Namespace, ev.Name, err)
   404  			return nil
   405  		}
   406  		errors = append(errors, fmt.Errorf("could not fetch service %s/%s: %w", ev.Namespace, localServiceName, err))
   407  	}
   408  
   409  	// If the mirror service is headless, also delete its endpoint mirror
   410  	// services.
   411  	if localService.Spec.ClusterIP == corev1.ClusterIPNone {
   412  		matchLabels := map[string]string{
   413  			consts.MirroredHeadlessSvcNameLabel: localServiceName,
   414  		}
   415  		endpointMirrorServices, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector())
   416  		if err != nil {
   417  			if !kerrors.IsNotFound(err) {
   418  				errors = append(errors, fmt.Errorf("could not fetch endpoint mirrors for mirror service %s/%s: %w", ev.Namespace, localServiceName, err))
   419  			}
   420  		}
   421  
   422  		for _, endpointMirror := range endpointMirrorServices {
   423  			err = rcsw.localAPIClient.Client.CoreV1().Services(endpointMirror.Namespace).Delete(ctx, endpointMirror.Name, metav1.DeleteOptions{})
   424  			if err != nil {
   425  				if !kerrors.IsNotFound(err) {
   426  					errors = append(errors, fmt.Errorf("could not delete endpoint mirror %s/%s: %w", endpointMirror.Namespace, endpointMirror.Name, err))
   427  				}
   428  			}
   429  		}
   430  	}
   431  
   432  	rcsw.log.Infof("Deleting mirrored service %s/%s", ev.Namespace, localServiceName)
   433  	if err := rcsw.localAPIClient.Client.CoreV1().Services(ev.Namespace).Delete(ctx, localServiceName, metav1.DeleteOptions{}); err != nil {
   434  		if !kerrors.IsNotFound(err) {
   435  			errors = append(errors, fmt.Errorf("could not delete service: %s/%s: %w", ev.Namespace, localServiceName, err))
   436  		}
   437  	}
   438  
   439  	if len(errors) > 0 {
   440  		return RetryableError{errors}
   441  	}
   442  
   443  	rcsw.log.Infof("Successfully deleted service: %s/%s", ev.Namespace, localServiceName)
   444  	return nil
   445  }
   446  
   447  // Updates a locally mirrored service. There might have been some pretty fundamental changes such as
   448  // new gateway being assigned or additional ports exposed. This method takes care of that.
   449  func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceUpdated(ctx context.Context, ev *RemoteServiceUpdated) error {
   450  	rcsw.log.Infof("Updating mirror service %s/%s", ev.localService.Namespace, ev.localService.Name)
   451  
   452  	if rcsw.isRemoteDiscovery(ev.remoteUpdate.Labels) {
   453  		// The service is mirrored in remote discovery mode and any local
   454  		// endpoints for it should be deleted if they exist.
   455  		if ev.localEndpoints != nil {
   456  			err := rcsw.localAPIClient.Client.CoreV1().Endpoints(ev.localService.Namespace).Delete(ctx, ev.localService.Name, metav1.DeleteOptions{})
   457  			if err != nil {
   458  				return RetryableError{[]error{
   459  					fmt.Errorf("failed to delete mirror endpoints for %s/%s: %w", ev.localService.Namespace, ev.localService.Name, err),
   460  				}}
   461  			}
   462  		}
   463  	} else if ev.localEndpoints == nil {
   464  		// The service is mirrored in gateway mode and gateway endpoints should
   465  		// be created for it.
   466  		err := rcsw.createGatewayEndpoints(ctx, ev.remoteUpdate)
   467  		if err != nil {
   468  			return err
   469  		}
   470  	} else {
   471  		// The service is mirrored in gateway mode and gateway endpoints already
   472  		// exist for it but may need to be updated.
   473  		gatewayAddresses, err := rcsw.resolveGatewayAddress()
   474  		if err != nil {
   475  			return err
   476  		}
   477  
   478  		copiedEndpoints := ev.localEndpoints.DeepCopy()
   479  		copiedEndpoints.Subsets = []corev1.EndpointSubset{
   480  			{
   481  				Addresses: gatewayAddresses,
   482  				Ports:     rcsw.getEndpointsPorts(ev.remoteUpdate),
   483  			},
   484  		}
   485  
   486  		if copiedEndpoints.Annotations == nil {
   487  			copiedEndpoints.Annotations = make(map[string]string)
   488  		}
   489  		copiedEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
   490  
   491  		err = rcsw.updateMirrorEndpoints(ctx, copiedEndpoints)
   492  		if err != nil {
   493  			return RetryableError{[]error{err}}
   494  		}
   495  	}
   496  
   497  	ev.localService.Labels = rcsw.getMirroredServiceLabels(ev.remoteUpdate)
   498  	ev.localService.Annotations = rcsw.getMirroredServiceAnnotations(ev.remoteUpdate)
   499  	ev.localService.Spec.Ports = remapRemoteServicePorts(ev.remoteUpdate.Spec.Ports)
   500  
   501  	if _, err := rcsw.localAPIClient.Client.CoreV1().Services(ev.localService.Namespace).Update(ctx, ev.localService, metav1.UpdateOptions{}); err != nil {
   502  		return RetryableError{[]error{err}}
   503  	}
   504  	return nil
   505  }
   506  
   507  func remapRemoteServicePorts(ports []corev1.ServicePort) []corev1.ServicePort {
   508  	// We ignore the NodePort here as its not relevant
   509  	// to the local cluster
   510  	var newPorts []corev1.ServicePort
   511  	for _, port := range ports {
   512  		newPorts = append(newPorts, corev1.ServicePort{
   513  			Name:       port.Name,
   514  			Protocol:   port.Protocol,
   515  			Port:       port.Port,
   516  			TargetPort: port.TargetPort,
   517  		})
   518  	}
   519  	return newPorts
   520  }
   521  
   522  func (rcsw *RemoteClusterServiceWatcher) handleRemoteServiceCreated(ctx context.Context, ev *RemoteServiceCreated) error {
   523  	remoteService := ev.service.DeepCopy()
   524  	if rcsw.headlessServicesEnabled && remoteService.Spec.ClusterIP == corev1.ClusterIPNone {
   525  		return nil
   526  	}
   527  
   528  	serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name)
   529  	localServiceName := rcsw.mirroredResourceName(remoteService.Name)
   530  
   531  	// Ensure the namespace exists, and skip mirroring if it doesn't
   532  	if _, err := rcsw.localAPIClient.Client.CoreV1().Namespaces().Get(ctx, remoteService.Namespace, metav1.GetOptions{}); err != nil {
   533  		if kerrors.IsNotFound(err) {
   534  			rcsw.recorder.Event(remoteService, corev1.EventTypeNormal, eventTypeSkipped, "Skipped mirroring service: namespace does not exist")
   535  			rcsw.log.Warnf("Skipping mirroring of service %s: namespace %s does not exist", serviceInfo, remoteService.Namespace)
   536  			return nil
   537  		}
   538  		// something else went wrong, so we can just retry
   539  		return RetryableError{[]error{err}}
   540  	}
   541  
   542  	serviceToCreate := &corev1.Service{
   543  		ObjectMeta: metav1.ObjectMeta{
   544  			Name:        localServiceName,
   545  			Namespace:   remoteService.Namespace,
   546  			Annotations: rcsw.getMirroredServiceAnnotations(remoteService),
   547  			Labels:      rcsw.getMirroredServiceLabels(remoteService),
   548  		},
   549  		Spec: corev1.ServiceSpec{
   550  			Ports: remapRemoteServicePorts(remoteService.Spec.Ports),
   551  		},
   552  	}
   553  
   554  	rcsw.log.Infof("Creating a new service mirror for %s", serviceInfo)
   555  	if _, err := rcsw.localAPIClient.Client.CoreV1().Services(remoteService.Namespace).Create(ctx, serviceToCreate, metav1.CreateOptions{}); err != nil {
   556  		if !kerrors.IsAlreadyExists(err) {
   557  			// we might have created it during earlier attempt, if that is not the case, we retry
   558  			return RetryableError{[]error{err}}
   559  		}
   560  	}
   561  
   562  	if rcsw.isRemoteDiscovery(remoteService.Labels) {
   563  		// For remote discovery services, skip creating gateway endpoints.
   564  		return nil
   565  	}
   566  	return rcsw.createGatewayEndpoints(ctx, remoteService)
   567  }
   568  
   569  func (rcsw *RemoteClusterServiceWatcher) handleLocalNamespaceAdded(ns *corev1.Namespace) error {
   570  	// When a local namespace is added, we issue a create event for all the services in the corresponding namespace in
   571  	// case any of them are exported and need to be mirrored.
   572  	svcs, err := rcsw.remoteAPIClient.Svc().Lister().Services(ns.Name).List(labels.Everything())
   573  	if err != nil {
   574  		return RetryableError{[]error{err}}
   575  	}
   576  	for _, svc := range svcs {
   577  		rcsw.eventsQueue.Add(&OnAddCalled{
   578  			svc: svc,
   579  		})
   580  	}
   581  	return nil
   582  }
   583  
   584  // isEmptyService returns true if any of these conditions are true:
   585  // - svc's Endpoint is not found
   586  // - svc's Endpoint has no Subsets (happens when there's no associated Pod)
   587  // - svc's Endpoint has Subsets, but none have addresses (only notReadyAddresses,
   588  // when the pod is not ready yet)
   589  func (rcsw *RemoteClusterServiceWatcher) isEmptyService(svc *corev1.Service) (bool, error) {
   590  	ep, err := rcsw.remoteAPIClient.Endpoint().Lister().Endpoints(svc.Namespace).Get(svc.Name)
   591  	if err != nil {
   592  		if kerrors.IsNotFound(err) {
   593  			rcsw.log.Debugf("target endpoint %s/%s not found", svc.Namespace, svc.Name)
   594  			return true, nil
   595  		}
   596  
   597  		return true, err
   598  	}
   599  	return rcsw.isEmptyEndpoints(ep), nil
   600  }
   601  
   602  // isEmptyEndpoints returns true if any of these conditions are true:
   603  // - The Endpoint is not found
   604  // - The Endpoint has no Subsets (happens when there's no associated Pod)
   605  // - The Endpoint has Subsets, but none have addresses (only notReadyAddresses,
   606  // when the pod is not ready yet)
   607  func (rcsw *RemoteClusterServiceWatcher) isEmptyEndpoints(ep *corev1.Endpoints) bool {
   608  	if len(ep.Subsets) == 0 {
   609  		rcsw.log.Debugf("endpoint %s/%s has no Subsets", ep.Namespace, ep.Name)
   610  		return true
   611  	}
   612  	for _, subset := range ep.Subsets {
   613  		if len(subset.Addresses) > 0 {
   614  			return false
   615  		}
   616  	}
   617  	rcsw.log.Debugf("endpoint %s/%s has no ready addresses", ep.Namespace, ep.Name)
   618  	return true
   619  }
   620  
   621  func (rcsw *RemoteClusterServiceWatcher) createGatewayEndpoints(ctx context.Context, exportedService *corev1.Service) error {
   622  	empty, err := rcsw.isEmptyService(exportedService)
   623  	if err != nil {
   624  		return RetryableError{[]error{err}}
   625  	}
   626  
   627  	gatewayAddresses, err := rcsw.resolveGatewayAddress()
   628  	if err != nil {
   629  		return err
   630  	}
   631  
   632  	localServiceName := rcsw.mirroredResourceName(exportedService.Name)
   633  	serviceInfo := fmt.Sprintf("%s/%s", exportedService.Namespace, exportedService.Name)
   634  	endpointsToCreate := &corev1.Endpoints{
   635  		ObjectMeta: metav1.ObjectMeta{
   636  			Name:      localServiceName,
   637  			Namespace: exportedService.Namespace,
   638  			Labels: map[string]string{
   639  				consts.MirroredResourceLabel:  "true",
   640  				consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
   641  			},
   642  			Annotations: map[string]string{
   643  				consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", exportedService.Name, exportedService.Namespace, rcsw.link.TargetClusterDomain),
   644  			},
   645  		},
   646  	}
   647  
   648  	rcsw.log.Infof("Resolved gateway [%v:%d] for %s", gatewayAddresses, rcsw.link.GatewayPort, serviceInfo)
   649  
   650  	if !empty && len(gatewayAddresses) > 0 {
   651  		endpointsToCreate.Subsets = []corev1.EndpointSubset{
   652  			{
   653  				Addresses: gatewayAddresses,
   654  				Ports:     rcsw.getEndpointsPorts(exportedService),
   655  			},
   656  		}
   657  	} else if !empty {
   658  		endpointsToCreate.Subsets = []corev1.EndpointSubset{
   659  			{
   660  				NotReadyAddresses: gatewayAddresses,
   661  				Ports:             rcsw.getEndpointsPorts(exportedService),
   662  			},
   663  		}
   664  		rcsw.log.Warnf("could not resolve gateway addresses for %s; setting endpoint subsets to not ready", serviceInfo)
   665  	} else {
   666  		rcsw.log.Warnf("exported service %s is empty", serviceInfo)
   667  	}
   668  
   669  	if rcsw.link.GatewayIdentity != "" {
   670  		endpointsToCreate.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
   671  	}
   672  
   673  	rcsw.log.Infof("Creating a new endpoints for %s", serviceInfo)
   674  	err = rcsw.createMirrorEndpoints(ctx, endpointsToCreate)
   675  	if err != nil {
   676  		if svcErr := rcsw.localAPIClient.Client.CoreV1().Services(exportedService.Namespace).Delete(ctx, localServiceName, metav1.DeleteOptions{}); svcErr != nil {
   677  			rcsw.log.Errorf("Failed to delete service %s after endpoints creation failed: %s", localServiceName, svcErr)
   678  		}
   679  		return RetryableError{[]error{err}}
   680  	}
   681  	return nil
   682  }
   683  
   684  // this method is common to both CREATE and UPDATE because if we have been
   685  // offline for some time due to a crash a CREATE for a service that we have
   686  // observed before is simply a case of UPDATE
   687  func (rcsw *RemoteClusterServiceWatcher) createOrUpdateService(service *corev1.Service) error {
   688  	localName := rcsw.mirroredResourceName(service.Name)
   689  
   690  	if rcsw.isExported(service.Labels) || rcsw.isRemoteDiscovery(service.Labels) {
   691  		localService, err := rcsw.localAPIClient.Svc().Lister().Services(service.Namespace).Get(localName)
   692  		if err != nil {
   693  			if kerrors.IsNotFound(err) {
   694  				rcsw.eventsQueue.Add(&RemoteServiceCreated{
   695  					service: service,
   696  				})
   697  				return nil
   698  			}
   699  			return RetryableError{[]error{err}}
   700  		}
   701  		// if we have the local service present, we need to issue an update
   702  		lastMirroredRemoteVersion, ok := localService.Annotations[consts.RemoteResourceVersionAnnotation]
   703  		if ok && lastMirroredRemoteVersion != service.ResourceVersion {
   704  			endpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(service.Namespace).Get(localName)
   705  			if err != nil {
   706  				if kerrors.IsNotFound(err) {
   707  					endpoints = nil
   708  				} else {
   709  					return RetryableError{[]error{err}}
   710  				}
   711  			}
   712  			rcsw.eventsQueue.Add(&RemoteServiceUpdated{
   713  				localService:   localService,
   714  				localEndpoints: endpoints,
   715  				remoteUpdate:   service,
   716  			})
   717  			return nil
   718  		}
   719  
   720  		return nil
   721  	}
   722  	localSvc, err := rcsw.localAPIClient.Svc().Lister().Services(service.Namespace).Get(localName)
   723  	if err == nil {
   724  		if localSvc.Labels != nil {
   725  			_, isMirroredRes := localSvc.Labels[consts.MirroredResourceLabel]
   726  			clusterName := localSvc.Labels[consts.RemoteClusterNameLabel]
   727  			if isMirroredRes && (clusterName == rcsw.link.TargetClusterName) {
   728  				rcsw.eventsQueue.Add(&RemoteServiceDeleted{
   729  					Name:      service.Name,
   730  					Namespace: service.Namespace,
   731  				})
   732  			}
   733  		}
   734  	}
   735  	return nil
   736  }
   737  
   738  func (rcsw *RemoteClusterServiceWatcher) getMirrorServices() (*corev1.ServiceList, error) {
   739  	matchLabels := map[string]string{
   740  		consts.MirroredResourceLabel:  "true",
   741  		consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
   742  	}
   743  	services, err := rcsw.localAPIClient.Client.CoreV1().Services("").List(context.Background(), metav1.ListOptions{LabelSelector: labels.SelectorFromSet(matchLabels).String()})
   744  	if err != nil {
   745  		return nil, err
   746  	}
   747  	return services, nil
   748  }
   749  
   750  func (rcsw *RemoteClusterServiceWatcher) handleOnDelete(service *corev1.Service) {
   751  	if rcsw.isExported(service.Labels) || rcsw.isRemoteDiscovery(service.Labels) {
   752  		rcsw.eventsQueue.Add(&RemoteServiceDeleted{
   753  			Name:      service.Name,
   754  			Namespace: service.Namespace,
   755  		})
   756  	} else {
   757  		rcsw.log.Infof("Skipping OnDelete for service %s", service)
   758  	}
   759  }
   760  
   761  func (rcsw *RemoteClusterServiceWatcher) processNextEvent(ctx context.Context) (bool, interface{}, error) {
   762  	event, done := rcsw.eventsQueue.Get()
   763  	if event != nil {
   764  		rcsw.log.Infof("Received: %s", event)
   765  	} else if done {
   766  		rcsw.log.Infof("Received: Stop")
   767  	}
   768  
   769  	var err error
   770  	switch ev := event.(type) {
   771  	case *OnAddCalled:
   772  		err = rcsw.createOrUpdateService(ev.svc)
   773  	case *OnAddEndpointsCalled:
   774  		err = rcsw.handleCreateOrUpdateEndpoints(ctx, ev.ep)
   775  	case *OnUpdateCalled:
   776  		err = rcsw.createOrUpdateService(ev.svc)
   777  	case *OnUpdateEndpointsCalled:
   778  		err = rcsw.handleCreateOrUpdateEndpoints(ctx, ev.ep)
   779  	case *OnDeleteCalled:
   780  		rcsw.handleOnDelete(ev.svc)
   781  	case *RemoteServiceCreated:
   782  		err = rcsw.handleRemoteServiceCreated(ctx, ev)
   783  	case *RemoteServiceUpdated:
   784  		err = rcsw.handleRemoteServiceUpdated(ctx, ev)
   785  	case *RemoteServiceDeleted:
   786  		err = rcsw.handleRemoteServiceDeleted(ctx, ev)
   787  	case *ClusterUnregistered:
   788  		err = rcsw.cleanupMirroredResources(ctx)
   789  	case *OrphanedServicesGcTriggered:
   790  		err = rcsw.cleanupOrphanedServices(ctx)
   791  	case *RepairEndpoints:
   792  		err = rcsw.repairEndpoints(ctx)
   793  	case *OnLocalNamespaceAdded:
   794  		err = rcsw.handleLocalNamespaceAdded(ev.ns)
   795  	default:
   796  		if ev != nil || !done { // we get a nil in case we are shutting down...
   797  			rcsw.log.Warnf("Received unknown event: %v", ev)
   798  		}
   799  	}
   800  
   801  	return done, event, err
   802  
   803  }
   804  
   805  // the main processing loop in which we handle more domain specific events
   806  // and deal with retries
   807  func (rcsw *RemoteClusterServiceWatcher) processEvents(ctx context.Context) {
   808  	for {
   809  		done, event, err := rcsw.processNextEvent(ctx)
   810  		rcsw.eventsQueue.Done(event)
   811  		// the logic here is that there might have been an API
   812  		// connectivity glitch or something. So its not a bad idea to requeue
   813  		// the event and try again up to a number of limits, just to ensure
   814  		// that we are not diverging in states due to bad luck...
   815  		if err == nil {
   816  			rcsw.eventsQueue.Forget(event)
   817  		} else {
   818  			var re RetryableError
   819  			if errors.As(err, &re) {
   820  				rcsw.log.Warnf("Requeues: %d, Limit: %d for event %s", rcsw.eventsQueue.NumRequeues(event), rcsw.requeueLimit, event)
   821  				if (rcsw.eventsQueue.NumRequeues(event) < rcsw.requeueLimit) && !done {
   822  					rcsw.log.Errorf("Error processing %s (will retry): %s", event, re)
   823  					rcsw.eventsQueue.AddRateLimited(event)
   824  				} else {
   825  					rcsw.log.Errorf("Error processing %s (giving up): %s", event, re)
   826  					rcsw.eventsQueue.Forget(event)
   827  				}
   828  			} else {
   829  				rcsw.log.Errorf("Error processing %s (will not retry): %s", event, err)
   830  			}
   831  		}
   832  		if done {
   833  			rcsw.log.Infof("Shutting down events processor")
   834  			return
   835  		}
   836  	}
   837  }
   838  
   839  // Start starts watching the remote cluster
   840  func (rcsw *RemoteClusterServiceWatcher) Start(ctx context.Context) error {
   841  	rcsw.remoteAPIClient.Sync(rcsw.stopper)
   842  	rcsw.eventsQueue.Add(&OrphanedServicesGcTriggered{})
   843  	var err error
   844  	rcsw.svcHandler, err = rcsw.remoteAPIClient.Svc().Informer().AddEventHandler(
   845  		cache.ResourceEventHandlerFuncs{
   846  			AddFunc: func(svc interface{}) {
   847  				rcsw.eventsQueue.Add(&OnAddCalled{svc.(*corev1.Service)})
   848  			},
   849  			DeleteFunc: func(obj interface{}) {
   850  				service, ok := obj.(*corev1.Service)
   851  				if !ok {
   852  					tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   853  					if !ok {
   854  						rcsw.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
   855  						return
   856  					}
   857  					service, ok = tombstone.Obj.(*corev1.Service)
   858  					if !ok {
   859  						rcsw.log.Errorf("DeletedFinalStateUnknown contained object that is not a Service %#v", obj)
   860  						return
   861  					}
   862  				}
   863  				rcsw.eventsQueue.Add(&OnDeleteCalled{service})
   864  			},
   865  			UpdateFunc: func(_, new interface{}) {
   866  				rcsw.eventsQueue.Add(&OnUpdateCalled{new.(*corev1.Service)})
   867  			},
   868  		},
   869  	)
   870  	if err != nil {
   871  		return err
   872  	}
   873  
   874  	rcsw.epHandler, err = rcsw.remoteAPIClient.Endpoint().Informer().AddEventHandler(
   875  		cache.ResourceEventHandlerFuncs{
   876  			// AddFunc only relevant for exported headless endpoints
   877  			AddFunc: func(obj interface{}) {
   878  				ep, ok := obj.(*corev1.Endpoints)
   879  				if !ok {
   880  					rcsw.log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", ep)
   881  					return
   882  				}
   883  
   884  				if !rcsw.isExported(ep.Labels) {
   885  					rcsw.log.Debugf("skipped processing endpoints object %s/%s: missing %s label", ep.Namespace, ep.Name, consts.DefaultExportedServiceSelector)
   886  					return
   887  				}
   888  
   889  				if !isHeadlessEndpoints(ep, rcsw.log) {
   890  					return
   891  				}
   892  
   893  				rcsw.eventsQueue.Add(&OnAddEndpointsCalled{obj.(*corev1.Endpoints)})
   894  			},
   895  			// AddFunc relevant for all kind of exported endpoints
   896  			UpdateFunc: func(_, new interface{}) {
   897  				epNew, ok := new.(*corev1.Endpoints)
   898  				if !ok {
   899  					rcsw.log.Errorf("error processing endpoints object: got %#v, expected *corev1.Endpoints", epNew)
   900  					return
   901  				}
   902  				if !rcsw.isExported(epNew.Labels) {
   903  					rcsw.log.Debugf("skipped processing endpoints object %s/%s: missing %s label", epNew.Namespace, epNew.Name, consts.DefaultExportedServiceSelector)
   904  					return
   905  				}
   906  				if rcsw.isRemoteDiscovery(epNew.Labels) {
   907  					rcsw.log.Debugf("skipped processing endpoints object %s/%s (service labeled for remote-discovery mode)", epNew.Namespace, epNew.Name)
   908  					return
   909  				}
   910  				rcsw.eventsQueue.Add(&OnUpdateEndpointsCalled{epNew})
   911  			},
   912  		},
   913  	)
   914  	if err != nil {
   915  		return err
   916  	}
   917  
   918  	rcsw.nsHandler, err = rcsw.localAPIClient.NS().Informer().AddEventHandler(
   919  		cache.ResourceEventHandlerFuncs{
   920  			AddFunc: func(obj interface{}) {
   921  				rcsw.eventsQueue.Add(&OnLocalNamespaceAdded{obj.(*corev1.Namespace)})
   922  			},
   923  		},
   924  	)
   925  	if err != nil {
   926  		return err
   927  	}
   928  
   929  	go rcsw.processEvents(ctx)
   930  
   931  	// If no gateway address is present, do not repair endpoints
   932  	if rcsw.link.GatewayAddress == "" {
   933  		return nil
   934  	}
   935  
   936  	// We need to issue a RepairEndpoints immediately to populate the gateway
   937  	// mirror endpoints.
   938  	ev := RepairEndpoints{}
   939  	rcsw.eventsQueue.Add(&ev)
   940  
   941  	go func() {
   942  		ticker := time.NewTicker(rcsw.repairPeriod)
   943  		for {
   944  			select {
   945  			case <-ticker.C:
   946  				ev := RepairEndpoints{}
   947  				rcsw.eventsQueue.Add(&ev)
   948  			case alive := <-rcsw.liveness:
   949  				rcsw.log.Debugf("gateway liveness change from %t to %t", rcsw.gatewayAlive, alive)
   950  				rcsw.gatewayAlive = alive
   951  				ev := RepairEndpoints{}
   952  				rcsw.eventsQueue.Add(&ev)
   953  			case <-rcsw.stopper:
   954  				return
   955  			}
   956  		}
   957  	}()
   958  
   959  	return nil
   960  }
   961  
   962  // Stop stops watching the cluster and cleans up all mirrored resources
   963  func (rcsw *RemoteClusterServiceWatcher) Stop(cleanupState bool) {
   964  	close(rcsw.stopper)
   965  	if cleanupState {
   966  		rcsw.eventsQueue.Add(&ClusterUnregistered{})
   967  	}
   968  	rcsw.eventsQueue.ShutDown()
   969  	rcsw.eventBroadcaster.Shutdown()
   970  
   971  	if rcsw.svcHandler != nil {
   972  		if err := rcsw.remoteAPIClient.Svc().Informer().RemoveEventHandler(rcsw.svcHandler); err != nil {
   973  			rcsw.log.Warnf("error removing service informer handler: %s", err)
   974  		}
   975  	}
   976  	if rcsw.epHandler != nil {
   977  		if err := rcsw.remoteAPIClient.Endpoint().Informer().RemoveEventHandler(rcsw.epHandler); err != nil {
   978  			rcsw.log.Warnf("error removing service informer handler: %s", err)
   979  		}
   980  	}
   981  	if rcsw.nsHandler != nil {
   982  		if err := rcsw.localAPIClient.NS().Informer().RemoveEventHandler(rcsw.nsHandler); err != nil {
   983  			rcsw.log.Warnf("error removing service informer handler: %s", err)
   984  		}
   985  	}
   986  
   987  	if rcsw.remoteAPIClient != nil {
   988  		rcsw.remoteAPIClient.UnregisterGauges()
   989  	}
   990  }
   991  
   992  func (rcsw *RemoteClusterServiceWatcher) resolveGatewayAddress() ([]corev1.EndpointAddress, error) {
   993  	var gatewayEndpoints []corev1.EndpointAddress
   994  	var errors []error
   995  	for _, addr := range strings.Split(rcsw.link.GatewayAddress, ",") {
   996  		ipAddrs, err := net.LookupIP(addr)
   997  		if err != nil {
   998  			err = fmt.Errorf("Error resolving '%s': %w", addr, err)
   999  			rcsw.log.Warn(err)
  1000  			errors = append(errors, err)
  1001  			continue
  1002  		}
  1003  
  1004  		for _, ipAddr := range ipAddrs {
  1005  			gatewayEndpoints = append(gatewayEndpoints, corev1.EndpointAddress{
  1006  				IP: ipAddr.String(),
  1007  			})
  1008  		}
  1009  	}
  1010  
  1011  	if len(gatewayEndpoints) == 0 {
  1012  		return nil, RetryableError{errors}
  1013  	}
  1014  
  1015  	sort.SliceStable(gatewayEndpoints, func(i, j int) bool {
  1016  		return gatewayEndpoints[i].IP < gatewayEndpoints[j].IP
  1017  	})
  1018  	return gatewayEndpoints, nil
  1019  }
  1020  
  1021  func (rcsw *RemoteClusterServiceWatcher) repairEndpoints(ctx context.Context) error {
  1022  	endpointRepairCounter.With(prometheus.Labels{
  1023  		gatewayClusterName: rcsw.link.TargetClusterName,
  1024  	}).Inc()
  1025  
  1026  	// Create or update the gateway mirror endpoints responsible for driving
  1027  	// the cluster watcher's gateway liveness status.
  1028  	gatewayAddresses, err := rcsw.resolveGatewayAddress()
  1029  	if err != nil {
  1030  		return err
  1031  	}
  1032  	err = rcsw.createOrUpdateGatewayEndpoints(ctx, gatewayAddresses)
  1033  	if err != nil {
  1034  		rcsw.log.Errorf("Failed to create/update gateway mirror endpoints: %s", err)
  1035  	}
  1036  
  1037  	// Repair mirror service endpoints.
  1038  	mirrorServices, err := rcsw.getMirrorServices()
  1039  	if err != nil {
  1040  		return RetryableError{[]error{fmt.Errorf("Failed to list mirror services: %w", err)}}
  1041  	}
  1042  	for _, svc := range mirrorServices.Items {
  1043  		svc := svc
  1044  
  1045  		// Mirrors for headless services are also headless, and their
  1046  		// Endpoints point to auxiliary services instead of pointing to
  1047  		// the gateway, so they're skipped.
  1048  		if svc.Spec.ClusterIP == corev1.ClusterIPNone {
  1049  			rcsw.log.Debugf("Skipped repairing endpoints for headless mirror %s/%s", svc.Namespace, svc.Name)
  1050  			continue
  1051  		}
  1052  
  1053  		if _, ok := svc.Labels[consts.RemoteDiscoveryLabel]; ok {
  1054  			rcsw.log.Debugf("Skipped repairing endpoints for service in remote-discovery mode %s/%s", svc.Namespace, svc.Name)
  1055  			continue
  1056  		}
  1057  
  1058  		endpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(svc.Namespace).Get(svc.Name)
  1059  		if err != nil {
  1060  			if !kerrors.IsNotFound(err) {
  1061  				rcsw.log.Errorf("Failed to list local endpoints: %s", err)
  1062  				continue
  1063  			}
  1064  			endpoints, err = rcsw.localAPIClient.Client.CoreV1().Endpoints(svc.Namespace).Get(ctx, svc.Name, metav1.GetOptions{})
  1065  			if err != nil {
  1066  				rcsw.log.Errorf("Failed to get local endpoints %s/%s: %s", svc.Namespace, svc.Name, err)
  1067  				continue
  1068  			}
  1069  		}
  1070  		updatedEndpoints := endpoints.DeepCopy()
  1071  		updatedEndpoints.Subsets = []corev1.EndpointSubset{
  1072  			{
  1073  				Addresses: gatewayAddresses,
  1074  				Ports:     rcsw.getEndpointsPorts(&svc),
  1075  			},
  1076  		}
  1077  
  1078  		// We want to skip this service empty check for auxiliary services --
  1079  		// services which are not headless but do belong to a headless
  1080  		// mirrored service. This is because they do not have a corresponding
  1081  		// endpoint on the target cluster, only a pod. If we attempt to find
  1082  		// endpoints for services like this, they'll always be set to empty.
  1083  		if _, found := svc.Labels[consts.MirroredHeadlessSvcNameLabel]; !found {
  1084  			targetService := svc.DeepCopy()
  1085  			targetService.Name = rcsw.targetResourceName(svc.Name)
  1086  			empty, err := rcsw.isEmptyService(targetService)
  1087  			if err != nil {
  1088  				rcsw.log.Errorf("Could not check service emptiness: %s", err)
  1089  				continue
  1090  			}
  1091  			if empty {
  1092  				rcsw.log.Warnf("Exported service %s/%s is empty", targetService.Namespace, targetService.Name)
  1093  				updatedEndpoints.Subsets = []corev1.EndpointSubset{}
  1094  			}
  1095  		}
  1096  
  1097  		if updatedEndpoints.Annotations == nil {
  1098  			updatedEndpoints.Annotations = make(map[string]string)
  1099  		}
  1100  		updatedEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
  1101  
  1102  		err = rcsw.updateMirrorEndpoints(ctx, updatedEndpoints)
  1103  		if err != nil {
  1104  			rcsw.log.Error(err)
  1105  		}
  1106  	}
  1107  
  1108  	return nil
  1109  }
  1110  
  1111  // createOrUpdateGatewayEndpoints will create or update the gateway mirror
  1112  // endpoints for a remote cluster. These endpoints are required for the probe
  1113  // worker responsible for probing gateway liveness, so these endpoints are
  1114  // never in a not ready state.
  1115  func (rcsw *RemoteClusterServiceWatcher) createOrUpdateGatewayEndpoints(ctx context.Context, addressses []corev1.EndpointAddress) error {
  1116  	gatewayMirrorName := fmt.Sprintf("probe-gateway-%s", rcsw.link.TargetClusterName)
  1117  	endpoints := &corev1.Endpoints{
  1118  		ObjectMeta: metav1.ObjectMeta{
  1119  			Name:      gatewayMirrorName,
  1120  			Namespace: rcsw.serviceMirrorNamespace,
  1121  			Labels: map[string]string{
  1122  				consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
  1123  			},
  1124  			Annotations: map[string]string{
  1125  				consts.RemoteGatewayIdentity: rcsw.link.GatewayIdentity,
  1126  			},
  1127  		},
  1128  		Subsets: []corev1.EndpointSubset{
  1129  			{
  1130  				Addresses: addressses,
  1131  				Ports: []corev1.EndpointPort{
  1132  					{
  1133  						Name:     "mc-probe",
  1134  						Port:     int32(rcsw.link.ProbeSpec.Port),
  1135  						Protocol: "TCP",
  1136  					},
  1137  				},
  1138  			},
  1139  		},
  1140  	}
  1141  	_, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(endpoints.Namespace).Get(ctx, endpoints.Name, metav1.GetOptions{})
  1142  	if err != nil {
  1143  		if !kerrors.IsNotFound(err) {
  1144  			return err
  1145  		}
  1146  
  1147  		// Mirror endpoints for the gateway do not exist so they need to be
  1148  		// created. As mentioned above, these endpoints are required for the
  1149  		// probe worker and therefore should never be put in a not ready
  1150  		// state.
  1151  		_, err = rcsw.localAPIClient.Client.CoreV1().Endpoints(endpoints.Namespace).Create(ctx, endpoints, metav1.CreateOptions{})
  1152  		if err != nil {
  1153  			return err
  1154  		}
  1155  		return nil
  1156  	}
  1157  
  1158  	// Mirror endpoints for the gateway already exist so they need to be
  1159  	// updated. As mentioned above, these endpoints are required for the probe
  1160  	// worker and therefore should never be put in a not ready state.
  1161  	_, err = rcsw.localAPIClient.Client.CoreV1().Endpoints(endpoints.Namespace).Update(ctx, endpoints, metav1.UpdateOptions{})
  1162  	return err
  1163  }
  1164  
  1165  // handleCreateOrUpdateEndpoints forwards the call to
  1166  // createOrUpdateHeadlessEndpoints when adding/updating exported headless
  1167  // endpoints. Otherwise, it handles updates to endpoints to check if they've
  1168  // become empty/filled since their creation, in order to empty/fill the
  1169  // mirrored endpoints as well
  1170  func (rcsw *RemoteClusterServiceWatcher) handleCreateOrUpdateEndpoints(
  1171  	ctx context.Context,
  1172  	exportedEndpoints *corev1.Endpoints,
  1173  ) error {
  1174  	if isHeadlessEndpoints(exportedEndpoints, rcsw.log) {
  1175  		if rcsw.headlessServicesEnabled {
  1176  			return rcsw.createOrUpdateHeadlessEndpoints(ctx, exportedEndpoints)
  1177  		}
  1178  		return nil
  1179  	}
  1180  
  1181  	localServiceName := rcsw.mirroredResourceName(exportedEndpoints.Name)
  1182  	ep, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(exportedEndpoints.Namespace).Get(localServiceName)
  1183  	if err != nil {
  1184  		return RetryableError{[]error{err}}
  1185  	}
  1186  
  1187  	if (rcsw.isEmptyEndpoints(ep) && rcsw.isEmptyEndpoints(exportedEndpoints)) ||
  1188  		(!rcsw.isEmptyEndpoints(ep) && !rcsw.isEmptyEndpoints(exportedEndpoints)) {
  1189  		return nil
  1190  	}
  1191  
  1192  	rcsw.log.Infof("Updating subsets for mirror endpoint %s/%s", exportedEndpoints.Namespace, exportedEndpoints.Name)
  1193  	if rcsw.isEmptyEndpoints(exportedEndpoints) {
  1194  		ep.Subsets = []corev1.EndpointSubset{}
  1195  	} else {
  1196  		exportedService, err := rcsw.remoteAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(exportedEndpoints.Name)
  1197  		if err != nil {
  1198  			return RetryableError{[]error{
  1199  				fmt.Errorf("error retrieving exported service %s/%s: %w", exportedEndpoints.Namespace, exportedEndpoints.Name, err),
  1200  			}}
  1201  		}
  1202  		gatewayAddresses, err := rcsw.resolveGatewayAddress()
  1203  		if err != nil {
  1204  			return err
  1205  		}
  1206  		ep.Subsets = []corev1.EndpointSubset{
  1207  			{
  1208  				Addresses: gatewayAddresses,
  1209  				Ports:     rcsw.getEndpointsPorts(exportedService),
  1210  			},
  1211  		}
  1212  	}
  1213  	return rcsw.updateMirrorEndpoints(ctx, ep)
  1214  }
  1215  
  1216  // createMirrorEndpoints will create endpoints based off gateway liveness. If
  1217  // the gateway is not alive, then the addresses in each subset will be set to
  1218  // not ready.
  1219  func (rcsw *RemoteClusterServiceWatcher) createMirrorEndpoints(ctx context.Context, endpoints *corev1.Endpoints) error {
  1220  	rcsw.updateReadiness(endpoints)
  1221  	_, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(endpoints.Namespace).Create(ctx, endpoints, metav1.CreateOptions{})
  1222  	if err != nil {
  1223  		return fmt.Errorf("failed to create mirror endpoints for %s/%s: %w", endpoints.Namespace, endpoints.Name, err)
  1224  	}
  1225  	return nil
  1226  }
  1227  
  1228  // updateMirrorEndpoints will update endpoints based off gateway liveness. If
  1229  // the gateway is not alive, then the addresses in each subset will be set to
  1230  // not ready. Future calls to updateMirrorEndpoints can set the addresses back
  1231  // to ready if the gateway is alive.
  1232  func (rcsw *RemoteClusterServiceWatcher) updateMirrorEndpoints(ctx context.Context, endpoints *corev1.Endpoints) error {
  1233  	rcsw.updateReadiness(endpoints)
  1234  	_, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(endpoints.Namespace).Update(ctx, endpoints, metav1.UpdateOptions{})
  1235  	if err != nil {
  1236  		return fmt.Errorf("failed to update mirror endpoints for %s/%s: %w", endpoints.Namespace, endpoints.Name, err)
  1237  	}
  1238  	return err
  1239  }
  1240  
  1241  func (rcsw *RemoteClusterServiceWatcher) updateReadiness(endpoints *corev1.Endpoints) {
  1242  	if !rcsw.gatewayAlive {
  1243  		rcsw.log.Warnf("gateway for %s/%s does not have ready addresses; setting addresses to not ready", endpoints.Namespace, endpoints.Name)
  1244  		for i := range endpoints.Subsets {
  1245  			endpoints.Subsets[i].NotReadyAddresses = append(endpoints.Subsets[i].NotReadyAddresses, endpoints.Subsets[i].Addresses...)
  1246  			endpoints.Subsets[i].Addresses = nil
  1247  		}
  1248  	}
  1249  }
  1250  
  1251  func (rcsw *RemoteClusterServiceWatcher) isExported(l map[string]string) bool {
  1252  	// Treat an empty selector as "Nothing" instead of "Everything" so that
  1253  	// when the selector field is unset, we don't export all Services.
  1254  	if len(rcsw.link.Selector.MatchExpressions)+len(rcsw.link.Selector.MatchLabels) == 0 {
  1255  		return false
  1256  	}
  1257  	selector, err := metav1.LabelSelectorAsSelector(&rcsw.link.Selector)
  1258  	if err != nil {
  1259  		rcsw.log.Errorf("Invalid selector: %s", err)
  1260  		return false
  1261  	}
  1262  	return selector.Matches(labels.Set(l))
  1263  }
  1264  
  1265  func (rcsw *RemoteClusterServiceWatcher) isRemoteDiscovery(l map[string]string) bool {
  1266  	// Treat an empty remoteDiscoverySelector as "Nothing" instead of
  1267  	// "Everything" so that when the remoteDiscoverySelector field is unset, we
  1268  	// don't export all Services.
  1269  	if len(rcsw.link.RemoteDiscoverySelector.MatchExpressions)+len(rcsw.link.RemoteDiscoverySelector.MatchLabels) == 0 {
  1270  		return false
  1271  	}
  1272  	remoteDiscoverySelector, err := metav1.LabelSelectorAsSelector(&rcsw.link.RemoteDiscoverySelector)
  1273  	if err != nil {
  1274  		rcsw.log.Errorf("Invalid selector: %s", err)
  1275  		return false
  1276  	}
  1277  
  1278  	return remoteDiscoverySelector.Matches(labels.Set(l))
  1279  }
  1280  

View as plain text