...

Source file src/github.com/linkerd/linkerd2/multicluster/service-mirror/cluster_watcher_headless.go

Documentation: github.com/linkerd/linkerd2/multicluster/service-mirror

     1  package servicemirror
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  
     7  	consts "github.com/linkerd/linkerd2/pkg/k8s"
     8  	logging "github.com/sirupsen/logrus"
     9  	corev1 "k8s.io/api/core/v1"
    10  	kerrors "k8s.io/apimachinery/pkg/api/errors"
    11  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    12  	"k8s.io/apimachinery/pkg/labels"
    13  )
    14  
    15  // createOrUpdateHeadlessEndpoints processes endpoints objects for exported
    16  // headless services. When an endpoints object is created or updated in the
    17  // remote cluster, it will be processed here in order to reconcile the local
    18  // cluster state with the remote cluster state.
    19  //
    20  // createOrUpdateHeadlessEndpoints is also responsible for creating the service
    21  // mirror in the source cluster. In order for an exported headless service to be
    22  // mirrored as headless, it must have at least one port defined and at least one
    23  // named address in its endpoints object (e.g a deployment would not work since
    24  // pods may not have arbitrary hostnames). As such, when an endpoints object is
    25  // first processed, if there is no mirror service, we create one, by looking at
    26  // the endpoints object itself. If the exported service is deemed to be valid
    27  // for headless mirroring, then the function will create the headless mirror and
    28  // then create an endpoints object for it in the source cluster. If it is not
    29  // valid, the exported service will be mirrored as clusterIP and its endpoints
    30  // will point to the gateway.
    31  //
    32  // When creating endpoints for a headless mirror, we also create an endpoint
    33  // mirror (clusterIP) service for each of the endpoints' named addresses. If the
    34  // headless mirror exists and has an endpoints object, we simply update by
    35  // either creating or deleting endpoint mirror services.
    36  func (rcsw *RemoteClusterServiceWatcher) createOrUpdateHeadlessEndpoints(ctx context.Context, exportedEndpoints *corev1.Endpoints) error {
    37  	exportedService, err := rcsw.remoteAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(exportedEndpoints.Name)
    38  	if err != nil {
    39  		rcsw.log.Debugf("failed to retrieve exported service %s/%s when updating its headless mirror endpoints: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err)
    40  		return fmt.Errorf("error retrieving exported service %s/%s: %w", exportedEndpoints.Namespace, exportedEndpoints.Name, err)
    41  	}
    42  
    43  	// Check whether the endpoints should be processed for a headless exported
    44  	// service. If the exported service does not have any ports exposed, then
    45  	// neither will its corresponding endpoint mirrors, it should not be created
    46  	// as a headless mirror. If the service does not have any named addresses in
    47  	// its Endpoints object, then the endpoints should not be processed.
    48  	if len(exportedService.Spec.Ports) == 0 {
    49  		rcsw.recorder.Event(exportedService, corev1.EventTypeNormal, eventTypeSkipped, "Skipped mirroring service: object spec has no exposed ports")
    50  		rcsw.log.Infof("Skipped creating headless mirror for %s/%s: service object spec has no exposed ports", exportedService.Namespace, exportedService.Name)
    51  		return nil
    52  	}
    53  
    54  	mirrorServiceName := rcsw.mirroredResourceName(exportedService.Name)
    55  	mirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedService.Namespace).Get(mirrorServiceName)
    56  	if err != nil {
    57  		if !kerrors.IsNotFound(err) {
    58  			return err
    59  		}
    60  
    61  		// If the mirror service does not exist, create it, either as clusterIP
    62  		// or as headless.
    63  		mirrorService, err = rcsw.createRemoteHeadlessService(ctx, exportedService, exportedEndpoints)
    64  		if err != nil {
    65  			return err
    66  		}
    67  	}
    68  
    69  	headlessMirrorEpName := rcsw.mirroredResourceName(exportedEndpoints.Name)
    70  	headlessMirrorEndpoints, err := rcsw.localAPIClient.Endpoint().Lister().Endpoints(exportedEndpoints.Namespace).Get(headlessMirrorEpName)
    71  	if err != nil {
    72  		if !kerrors.IsNotFound(err) {
    73  			return err
    74  		}
    75  
    76  		if mirrorService.Spec.ClusterIP != corev1.ClusterIPNone {
    77  			return rcsw.createGatewayEndpoints(ctx, exportedService)
    78  		}
    79  
    80  		// Create endpoint mirrors for headless mirror
    81  		if err := rcsw.createHeadlessMirrorEndpoints(ctx, exportedService, exportedEndpoints); err != nil {
    82  			rcsw.log.Debugf("failed to create headless mirrors for endpoints %s/%s: %v", exportedEndpoints.Namespace, exportedEndpoints.Name, err)
    83  			return err
    84  		}
    85  
    86  		return nil
    87  	}
    88  
    89  	// Past this point, we do not want to process a mirror service that is not
    90  	// headless. We want to process only endpoints for headless mirrors; before
    91  	// this point it would have been possible to have a clusterIP mirror, since
    92  	// we are creating the mirror service in the scope of the function.
    93  	if mirrorService.Spec.ClusterIP != corev1.ClusterIPNone {
    94  		return nil
    95  	}
    96  
    97  	mirrorEndpoints := headlessMirrorEndpoints.DeepCopy()
    98  	endpointMirrors := make(map[string]struct{})
    99  	newSubsets := make([]corev1.EndpointSubset, 0, len(exportedEndpoints.Subsets))
   100  	for _, subset := range exportedEndpoints.Subsets {
   101  		newAddresses := make([]corev1.EndpointAddress, 0, len(subset.Addresses))
   102  		for _, address := range subset.Addresses {
   103  			if address.Hostname == "" {
   104  				continue
   105  			}
   106  
   107  			endpointMirrorName := rcsw.mirroredResourceName(address.Hostname)
   108  			endpointMirrorService, err := rcsw.localAPIClient.Svc().Lister().Services(exportedEndpoints.Namespace).Get(endpointMirrorName)
   109  			if err != nil {
   110  				if !kerrors.IsNotFound(err) {
   111  					return err
   112  				}
   113  				// If the error is 'NotFound' then the Endpoint Mirror service
   114  				// does not exist, so create it.
   115  				endpointMirrorService, err = rcsw.createEndpointMirrorService(ctx, address.Hostname, exportedEndpoints.ResourceVersion, endpointMirrorName, exportedService)
   116  				if err != nil {
   117  					return err
   118  				}
   119  			}
   120  
   121  			endpointMirrors[endpointMirrorName] = struct{}{}
   122  			newAddresses = append(newAddresses, corev1.EndpointAddress{
   123  				Hostname: address.Hostname,
   124  				IP:       endpointMirrorService.Spec.ClusterIP,
   125  			})
   126  		}
   127  
   128  		if len(newAddresses) == 0 {
   129  			continue
   130  		}
   131  
   132  		// copy ports, create subset
   133  		newSubsets = append(newSubsets, corev1.EndpointSubset{
   134  			Addresses: newAddresses,
   135  			Ports:     subset.DeepCopy().Ports,
   136  		})
   137  	}
   138  
   139  	headlessMirrorName := rcsw.mirroredResourceName(exportedService.Name)
   140  	matchLabels := map[string]string{
   141  		consts.MirroredHeadlessSvcNameLabel: headlessMirrorName,
   142  	}
   143  
   144  	// Fetch all Endpoint Mirror services that belong to the same Headless Mirror
   145  	endpointMirrorServices, err := rcsw.localAPIClient.Svc().Lister().List(labels.Set(matchLabels).AsSelector())
   146  	if err != nil {
   147  		return err
   148  	}
   149  
   150  	var errors []error
   151  	for _, service := range endpointMirrorServices {
   152  		// If the service's name does not show up in the up-to-date map of
   153  		// Endpoint Mirror names, then we should delete it.
   154  		if _, found := endpointMirrors[service.Name]; found {
   155  			continue
   156  		}
   157  		err := rcsw.localAPIClient.Client.CoreV1().Services(service.Namespace).Delete(ctx, service.Name, metav1.DeleteOptions{})
   158  		if err != nil {
   159  			if !kerrors.IsNotFound(err) {
   160  				errors = append(errors, fmt.Errorf("error deleting Endpoint Mirror service %s/%s: %w", service.Namespace, service.Name, err))
   161  			}
   162  		}
   163  	}
   164  
   165  	if len(errors) > 0 {
   166  		return RetryableError{errors}
   167  	}
   168  
   169  	// Update endpoints
   170  	mirrorEndpoints.Subsets = newSubsets
   171  	err = rcsw.updateMirrorEndpoints(ctx, mirrorEndpoints)
   172  	if err != nil {
   173  		return RetryableError{[]error{err}}
   174  	}
   175  
   176  	return nil
   177  }
   178  
   179  // createRemoteHeadlessService creates a mirror service for an exported headless
   180  // service. Whether the mirror will be created as a headless or clusterIP
   181  // service depends on the endpoints object associated with the exported service.
   182  // If there is at least one named address, then the service will be mirrored as
   183  // headless.
   184  //
   185  // Note: we do not check for any exposed ports because it was previously done
   186  // when the service was picked up by the service mirror. We also do not need to
   187  // check if the exported service is headless; its endpoints will be processed
   188  // only if it is headless so we are certain at this point that is the case.
   189  func (rcsw *RemoteClusterServiceWatcher) createRemoteHeadlessService(ctx context.Context, exportedService *corev1.Service, exportedEndpoints *corev1.Endpoints) (*corev1.Service, error) {
   190  	// If we don't have any subsets to process then avoid creating the service.
   191  	// We need at least one address to be make a decision (whether we should
   192  	// create as clusterIP or headless), rely on the endpoints being eventually
   193  	// consistent, will likely receive an update with subsets.
   194  	if len(exportedEndpoints.Subsets) == 0 {
   195  		return &corev1.Service{}, nil
   196  	}
   197  
   198  	remoteService := exportedService.DeepCopy()
   199  	serviceInfo := fmt.Sprintf("%s/%s", remoteService.Namespace, remoteService.Name)
   200  	localServiceName := rcsw.mirroredResourceName(remoteService.Name)
   201  
   202  	// Ensure the namespace exists, and skip mirroring if it doesn't
   203  	if _, err := rcsw.localAPIClient.NS().Lister().Get(remoteService.Namespace); err != nil {
   204  		if kerrors.IsNotFound(err) {
   205  			rcsw.log.Warnf("Skipping mirroring of service %s: namespace %s does not exist", serviceInfo, remoteService.Namespace)
   206  			return &corev1.Service{}, nil
   207  		}
   208  		// something else went wrong, so we can just retry
   209  		return nil, RetryableError{[]error{err}}
   210  	}
   211  
   212  	serviceToCreate := &corev1.Service{
   213  		ObjectMeta: metav1.ObjectMeta{
   214  			Name:        localServiceName,
   215  			Namespace:   remoteService.Namespace,
   216  			Annotations: rcsw.getMirroredServiceAnnotations(remoteService),
   217  			Labels:      rcsw.getMirroredServiceLabels(remoteService),
   218  		},
   219  		Spec: corev1.ServiceSpec{
   220  			Ports: remapRemoteServicePorts(remoteService.Spec.Ports),
   221  		},
   222  	}
   223  
   224  	if shouldExportAsHeadlessService(exportedEndpoints, rcsw.log) {
   225  		serviceToCreate.Spec.ClusterIP = corev1.ClusterIPNone
   226  		rcsw.log.Infof("Creating a new headless service mirror for %s", serviceInfo)
   227  	} else {
   228  		rcsw.log.Infof("Creating a new service mirror for %s", serviceInfo)
   229  	}
   230  
   231  	svc, err := rcsw.localAPIClient.Client.CoreV1().Services(remoteService.Namespace).Create(ctx, serviceToCreate, metav1.CreateOptions{})
   232  	if err != nil {
   233  		if !kerrors.IsAlreadyExists(err) {
   234  			// we might have created it during earlier attempt, if that is not the case, we retry
   235  			return &corev1.Service{}, RetryableError{[]error{err}}
   236  		}
   237  	}
   238  
   239  	return svc, err
   240  }
   241  
   242  // createHeadlessMirrorEndpoints creates an endpoints object for a Headless
   243  // Mirror service. The endpoints object will contain the same subsets and hosts
   244  // as the endpoints object of the exported headless service. Each host in the
   245  // Headless Mirror's endpoints object will point to an Endpoint Mirror service.
   246  func (rcsw *RemoteClusterServiceWatcher) createHeadlessMirrorEndpoints(ctx context.Context, exportedService *corev1.Service, exportedEndpoints *corev1.Endpoints) error {
   247  	exportedServiceInfo := fmt.Sprintf("%s/%s", exportedService.Namespace, exportedService.Name)
   248  	endpointsHostnames := make(map[string]struct{})
   249  	subsetsToCreate := make([]corev1.EndpointSubset, 0, len(exportedEndpoints.Subsets))
   250  	for _, subset := range exportedEndpoints.Subsets {
   251  		newAddresses := make([]corev1.EndpointAddress, 0, len(subset.Addresses))
   252  		for _, addr := range subset.Addresses {
   253  			if addr.Hostname == "" {
   254  				continue
   255  			}
   256  
   257  			endpointMirrorName := rcsw.mirroredResourceName(addr.Hostname)
   258  			createdService, err := rcsw.createEndpointMirrorService(ctx, addr.Hostname, exportedEndpoints.ResourceVersion, endpointMirrorName, exportedService)
   259  			if err != nil {
   260  				rcsw.log.Errorf("error creating endpoint mirror service %s/%s for exported headless service %s: %v", endpointMirrorName, exportedService.Namespace, exportedServiceInfo, err)
   261  				continue
   262  			}
   263  
   264  			endpointsHostnames[addr.Hostname] = struct{}{}
   265  			newAddresses = append(newAddresses, corev1.EndpointAddress{
   266  				Hostname: addr.TargetRef.Name,
   267  				IP:       createdService.Spec.ClusterIP,
   268  			})
   269  
   270  		}
   271  
   272  		if len(newAddresses) == 0 {
   273  			continue
   274  		}
   275  
   276  		subsetsToCreate = append(subsetsToCreate, corev1.EndpointSubset{
   277  			Addresses: newAddresses,
   278  			Ports:     subset.DeepCopy().Ports,
   279  		})
   280  	}
   281  
   282  	headlessMirrorServiceName := rcsw.mirroredResourceName(exportedService.Name)
   283  	headlessMirrorEndpoints := &corev1.Endpoints{
   284  		ObjectMeta: metav1.ObjectMeta{
   285  			Name:      headlessMirrorServiceName,
   286  			Namespace: exportedService.Namespace,
   287  			Labels: map[string]string{
   288  				consts.MirroredResourceLabel:  "true",
   289  				consts.RemoteClusterNameLabel: rcsw.link.TargetClusterName,
   290  			},
   291  			Annotations: map[string]string{
   292  				consts.RemoteServiceFqName: fmt.Sprintf("%s.%s.svc.%s", exportedService.Name, exportedService.Namespace, rcsw.link.TargetClusterDomain),
   293  			},
   294  		},
   295  		Subsets: subsetsToCreate,
   296  	}
   297  
   298  	if rcsw.link.GatewayIdentity != "" {
   299  		headlessMirrorEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
   300  	}
   301  
   302  	rcsw.log.Infof("Creating a new headless mirror endpoints object for headless mirror %s/%s", headlessMirrorServiceName, exportedService.Namespace)
   303  	// The addresses for the headless mirror service point to the Cluster IPs
   304  	// of auxiliary services that are tied to gateway liveness. Therefore,
   305  	// these addresses should always be considered ready.
   306  	_, err := rcsw.localAPIClient.Client.CoreV1().Endpoints(exportedService.Namespace).Create(ctx, headlessMirrorEndpoints, metav1.CreateOptions{})
   307  	if err != nil {
   308  		if svcErr := rcsw.localAPIClient.Client.CoreV1().Services(exportedService.Namespace).Delete(ctx, headlessMirrorServiceName, metav1.DeleteOptions{}); svcErr != nil {
   309  			rcsw.log.Errorf("failed to delete Service %s after Endpoints creation failed: %s", headlessMirrorServiceName, svcErr)
   310  		}
   311  		return RetryableError{[]error{err}}
   312  	}
   313  
   314  	return nil
   315  }
   316  
   317  // createEndpointMirrorService creates a new Endpoint Mirror service and its
   318  // corresponding endpoints object. It returns the newly created Endpoint Mirror
   319  // service object. When a headless service is exported, we create a Headless
   320  // Mirror service in the source cluster and then for each hostname in the
   321  // exported service's endpoints object, we also create an Endpoint Mirror
   322  // service (and its corresponding endpoints object).
   323  func (rcsw *RemoteClusterServiceWatcher) createEndpointMirrorService(ctx context.Context, endpointHostname, resourceVersion, endpointMirrorName string, exportedService *corev1.Service) (*corev1.Service, error) {
   324  	gatewayAddresses, err := rcsw.resolveGatewayAddress()
   325  	if err != nil {
   326  		return nil, err
   327  	}
   328  
   329  	endpointMirrorAnnotations := map[string]string{
   330  		consts.RemoteResourceVersionAnnotation: resourceVersion, // needed to detect real changes
   331  		consts.RemoteServiceFqName:             fmt.Sprintf("%s.%s.%s.svc.%s", endpointHostname, exportedService.Name, exportedService.Namespace, rcsw.link.TargetClusterDomain),
   332  	}
   333  
   334  	endpointMirrorLabels := rcsw.getMirroredServiceLabels(nil)
   335  	mirrorServiceName := rcsw.mirroredResourceName(exportedService.Name)
   336  	endpointMirrorLabels[consts.MirroredHeadlessSvcNameLabel] = mirrorServiceName
   337  
   338  	// Create service spec, clusterIP
   339  	endpointMirrorService := &corev1.Service{
   340  		ObjectMeta: metav1.ObjectMeta{
   341  			Name:        endpointMirrorName,
   342  			Namespace:   exportedService.Namespace,
   343  			Annotations: endpointMirrorAnnotations,
   344  			Labels:      endpointMirrorLabels,
   345  		},
   346  		Spec: corev1.ServiceSpec{
   347  			Ports: remapRemoteServicePorts(exportedService.Spec.Ports),
   348  		},
   349  	}
   350  	endpointMirrorEndpoints := &corev1.Endpoints{
   351  		ObjectMeta: metav1.ObjectMeta{
   352  			Name:      endpointMirrorService.Name,
   353  			Namespace: endpointMirrorService.Namespace,
   354  			Labels:    endpointMirrorLabels,
   355  			Annotations: map[string]string{
   356  				consts.RemoteServiceFqName: endpointMirrorService.Annotations[consts.RemoteServiceFqName],
   357  			},
   358  		},
   359  		Subsets: []corev1.EndpointSubset{
   360  			{
   361  				Addresses: gatewayAddresses,
   362  				Ports:     rcsw.getEndpointsPorts(exportedService),
   363  			},
   364  		},
   365  	}
   366  
   367  	if rcsw.link.GatewayIdentity != "" {
   368  		endpointMirrorEndpoints.Annotations[consts.RemoteGatewayIdentity] = rcsw.link.GatewayIdentity
   369  	}
   370  
   371  	exportedServiceInfo := fmt.Sprintf("%s/%s", exportedService.Namespace, exportedService.Name)
   372  	endpointMirrorInfo := fmt.Sprintf("%s/%s", endpointMirrorService.Namespace, endpointMirrorName)
   373  	rcsw.log.Infof("Creating a new endpoint mirror service %s for exported headless service %s", endpointMirrorInfo, exportedServiceInfo)
   374  	createdService, err := rcsw.localAPIClient.Client.CoreV1().Services(endpointMirrorService.Namespace).Create(ctx, endpointMirrorService, metav1.CreateOptions{})
   375  	if err != nil {
   376  		if !kerrors.IsAlreadyExists(err) {
   377  			// we might have created it during earlier attempt, if that is not the case, we retry
   378  			return createdService, RetryableError{[]error{err}}
   379  		}
   380  	}
   381  
   382  	rcsw.log.Infof("Creating a new endpoints object for endpoint mirror service %s", endpointMirrorInfo)
   383  	err = rcsw.createMirrorEndpoints(ctx, endpointMirrorEndpoints)
   384  	if err != nil {
   385  		if svcErr := rcsw.localAPIClient.Client.CoreV1().Services(endpointMirrorService.Namespace).Delete(ctx, endpointMirrorName, metav1.DeleteOptions{}); svcErr != nil {
   386  			rcsw.log.Errorf("Failed to delete service %s after endpoints creation failed: %s", endpointMirrorName, svcErr)
   387  		}
   388  		return createdService, RetryableError{[]error{err}}
   389  	}
   390  	return createdService, nil
   391  }
   392  
   393  // shouldExportAsHeadlessService checks if an exported service should be
   394  // mirrored as a headless service or as a clusterIP service, based on its
   395  // endpoints object. For an exported service to be a headless mirror, it needs
   396  // to have at least one named address in its endpoints (that is, a pod with a
   397  // `hostname`). If the endpoints object does not contain at least one named
   398  // address, it should be exported as clusterIP.
   399  func shouldExportAsHeadlessService(endpoints *corev1.Endpoints, log *logging.Entry) bool {
   400  	for _, subset := range endpoints.Subsets {
   401  		for _, addr := range subset.Addresses {
   402  			if addr.Hostname != "" {
   403  				return true
   404  			}
   405  		}
   406  
   407  		for _, addr := range subset.NotReadyAddresses {
   408  			if addr.Hostname != "" {
   409  				return true
   410  			}
   411  		}
   412  	}
   413  	log.Infof("Service %s/%s should not be exported as headless: no named addresses in its endpoints object", endpoints.Namespace, endpoints.Name)
   414  	return false
   415  }
   416  
   417  // isHeadlessEndpoints checks if an endpoints object belongs to a
   418  // headless service.
   419  func isHeadlessEndpoints(ep *corev1.Endpoints, log *logging.Entry) bool {
   420  	if _, found := ep.Labels[corev1.IsHeadlessService]; !found {
   421  		// Not an Endpoints object for a headless service? Then we likely don't want
   422  		// to update anything.
   423  		log.Debugf("skipped processing endpoints object %s/%s: missing %s label", ep.Namespace, ep.Name, corev1.IsHeadlessService)
   424  		return false
   425  	}
   426  
   427  	return true
   428  }
   429  

View as plain text