...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/external-workload/endpoints_reconciler.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination/external-workload

     1  package externalworkload
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"sort"
     7  
     8  	ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
     9  	"github.com/linkerd/linkerd2/controller/k8s"
    10  	logging "github.com/sirupsen/logrus"
    11  	corev1 "k8s.io/api/core/v1"
    12  	discoveryv1 "k8s.io/api/discovery/v1"
    13  	"k8s.io/apimachinery/pkg/api/errors"
    14  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    15  	"k8s.io/apimachinery/pkg/runtime/schema"
    16  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    17  	"k8s.io/apimachinery/pkg/util/intstr"
    18  	epsliceutil "k8s.io/endpointslice/util"
    19  	utilnet "k8s.io/utils/net"
    20  )
    21  
    22  // endpointsReconciler is a subcomponent of the EndpointsController.
    23  //
    24  // Its main responsibility is to reconcile a service's endpoints (by diffing
    25  // states) and to keep track of any drift between the informer cache and what
    26  // has been written to the API Server
    27  type endpointsReconciler struct {
    28  	k8sAPI         *k8s.API
    29  	log            *logging.Entry
    30  	controllerName string
    31  	// Upstream utility component that will internally track the most recent
    32  	// resourceVersion observed for an EndpointSlice
    33  	endpointTracker *epsliceutil.EndpointSliceTracker
    34  	maxEndpoints    int
    35  	// TODO (matei): add metrics around events
    36  }
    37  
    38  // endpointMeta is a helper struct that includes the attributes slices will be
    39  // grouped by (i.e. ports and the address family supported).
    40  //
    41  // Note: this is inspired by the upstream EndpointSlice controller impl.
    42  type endpointMeta struct {
    43  	ports       []discoveryv1.EndpointPort
    44  	addressType discoveryv1.AddressType
    45  }
    46  
    47  // newEndpointsReconciler takes an API client and returns a reconciler with
    48  // logging and a tracker set up
    49  func newEndpointsReconciler(k8sAPI *k8s.API, controllerName string, maxEndpoints int) *endpointsReconciler {
    50  	return &endpointsReconciler{
    51  		k8sAPI,
    52  		logging.WithFields(logging.Fields{
    53  			"component": "external-endpoints-reconciler",
    54  		}),
    55  		controllerName,
    56  		epsliceutil.NewEndpointSliceTracker(),
    57  		maxEndpoints,
    58  	}
    59  
    60  }
    61  
    62  // === Reconciler ===
    63  
    64  // reconcile is the main entry-point for the reconciler's work.
    65  //
    66  // It accepts a slice of external workloads and their corresponding service.
    67  // Optionally, if the controller has previously created any slices for this
    68  // service, these will also be passed in. The reconciler will:
    69  //
    70  // * Determine what address types the service supports
    71  // * For each address type, it will determine which slices to process (an
    72  // EndpointSlice is specialised and supports only one type)
    73  func (r *endpointsReconciler) reconcile(svc *corev1.Service, ews []*ewv1beta1.ExternalWorkload, existingSlices []*discoveryv1.EndpointSlice) error {
    74  	toDelete := []*discoveryv1.EndpointSlice{}
    75  	slicesByAddrType := make(map[discoveryv1.AddressType][]*discoveryv1.EndpointSlice)
    76  	errs := []error{}
    77  
    78  	// Get the list of supported address types for the service
    79  	supportedAddrTypes := getSupportedAddressTypes(svc)
    80  	for _, slice := range existingSlices {
    81  		// If a slice has an address type that the service does not support, then
    82  		// it should be deleted
    83  		_, supported := supportedAddrTypes[slice.AddressType]
    84  		if !supported {
    85  			toDelete = append(toDelete, slice)
    86  			continue
    87  		}
    88  
    89  		// If this is the first time we see this address type, create the list
    90  		// in the set.
    91  		if _, ok := slicesByAddrType[slice.AddressType]; !ok {
    92  			slicesByAddrType[slice.AddressType] = []*discoveryv1.EndpointSlice{}
    93  		}
    94  
    95  		slicesByAddrType[slice.AddressType] = append(slicesByAddrType[slice.AddressType], slice)
    96  	}
    97  
    98  	// For each supported address type, reconcile endpoint slices that match the
    99  	// given type
   100  	for addrType := range supportedAddrTypes {
   101  		existingSlices := slicesByAddrType[addrType]
   102  		err := r.reconcileByAddressType(svc, ews, existingSlices, addrType)
   103  		if err != nil {
   104  			errs = append(errs, err)
   105  		}
   106  	}
   107  
   108  	// delete slices whose address type is no longer supported by the service
   109  	for _, slice := range toDelete {
   110  		err := r.k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).Delete(context.TODO(), slice.Name, metav1.DeleteOptions{})
   111  		if err != nil {
   112  			errs = append(errs, err)
   113  		}
   114  	}
   115  
   116  	return utilerrors.NewAggregate(errs)
   117  }
   118  
   119  // reconcileByAddressType operates on a set of external workloads, their
   120  // service, and any endpointslices that have been created by the controller. It
   121  // will compute the diff that needs to be written to the API Server.
   122  func (r *endpointsReconciler) reconcileByAddressType(svc *corev1.Service, extWorkloads []*ewv1beta1.ExternalWorkload, existingSlices []*discoveryv1.EndpointSlice, addrType discoveryv1.AddressType) error {
   123  	slicesToCreate := []*discoveryv1.EndpointSlice{}
   124  	slicesToUpdate := []*discoveryv1.EndpointSlice{}
   125  	slicesToDelete := []*discoveryv1.EndpointSlice{}
   126  
   127  	// We start the reconciliation by checking ownerRefs
   128  	//
   129  	// We follow the upstream implementation here and look at our existing
   130  	// slices, segmenting them by ports.
   131  	existingSlicesByPorts := map[epsliceutil.PortMapKey][]*discoveryv1.EndpointSlice{}
   132  	for _, slice := range existingSlices {
   133  		// Loop through the endpointslices and figure out which ones do not
   134  		// have an ownerRef set to the service. If a slice has been
   135  		// selected but does not point to the service, we delete it.
   136  		if ownedBy(slice, svc) {
   137  			hash := epsliceutil.NewPortMapKey(slice.Ports)
   138  			existingSlicesByPorts[hash] = append(existingSlicesByPorts[hash], slice)
   139  		} else {
   140  			slicesToDelete = append(slicesToDelete, slice)
   141  		}
   142  	}
   143  
   144  	// desiredEndpointsByPortMap represents a set of endpoints grouped together
   145  	// by the list of ports they use. These are the endpoints that we will keep
   146  	// and write to the API server.
   147  	desiredEndpointsByPortMap := map[epsliceutil.PortMapKey]epsliceutil.EndpointSet{}
   148  	// desiredMetaByPortMap represents grouping metadata keyed by the same
   149  	// hashed port list as the endpoints.
   150  	desiredMetaByPortMap := map[epsliceutil.PortMapKey]*endpointMeta{}
   151  
   152  	for _, extWorkload := range extWorkloads {
   153  		// We skip workloads with no IPs.
   154  		//
   155  		// Note: workloads only have a 'Ready' status so we do not care about
   156  		// other status conditions.
   157  		if len(extWorkload.Spec.WorkloadIPs) == 0 {
   158  			continue
   159  		}
   160  
   161  		// Find which ports a service selects (or maps to) on an external workload.
   162  		// Note: we require that all workload ports be documented. Pods do not have
   163  		// to document all of their container ports.
   164  		ports := r.findEndpointPorts(svc, extWorkload)
   165  		portHash := epsliceutil.NewPortMapKey(ports)
   166  		if _, ok := desiredMetaByPortMap[portHash]; !ok {
   167  			desiredMetaByPortMap[portHash] = &endpointMeta{ports, addrType}
   168  		}
   169  
   170  		if _, ok := desiredEndpointsByPortMap[portHash]; !ok {
   171  			desiredEndpointsByPortMap[portHash] = epsliceutil.EndpointSet{}
   172  		}
   173  
   174  		ep := externalWorkloadToEndpoint(addrType, extWorkload, svc)
   175  		if len(ep.Addresses) > 0 {
   176  			desiredEndpointsByPortMap[portHash].Insert(&ep)
   177  		}
   178  	}
   179  
   180  	for portKey, desiredEndpoints := range desiredEndpointsByPortMap {
   181  		create, update, del := r.reconcileEndpointsByPortMap(svc, existingSlicesByPorts[portKey], desiredEndpoints, desiredMetaByPortMap[portKey])
   182  		slicesToCreate = append(slicesToCreate, create...)
   183  		slicesToUpdate = append(slicesToUpdate, update...)
   184  		slicesToDelete = append(slicesToDelete, del...)
   185  	}
   186  
   187  	// If there are any slices whose ports no longer match what we want in our
   188  	// current reconciliation, delete them
   189  	for portHash, existingSlices := range existingSlicesByPorts {
   190  		if _, ok := desiredEndpointsByPortMap[portHash]; !ok {
   191  			slicesToDelete = append(slicesToDelete, existingSlices...)
   192  		}
   193  	}
   194  
   195  	return r.finalize(svc, slicesToCreate, slicesToUpdate, slicesToDelete)
   196  }
   197  
   198  // reconcileEndpointsByPortMap will compute the state diff to be written to the
   199  // API Server for a service. The function takes into account any existing
   200  // endpoint slices and any external workloads matched by the service.
   201  // The function works on slices and workloads that have already been grouped by
   202  // a common set of ports.
   203  func (r *endpointsReconciler) reconcileEndpointsByPortMap(svc *corev1.Service, existingSlices []*discoveryv1.EndpointSlice, desiredEps epsliceutil.EndpointSet, desiredMeta *endpointMeta) ([]*discoveryv1.EndpointSlice, []*discoveryv1.EndpointSlice, []*discoveryv1.EndpointSlice) {
   204  	slicesByName := map[string]*discoveryv1.EndpointSlice{}
   205  	sliceNamesUnchanged := map[string]struct{}{}
   206  	sliceNamesToUpdate := map[string]struct{}{}
   207  	sliceNamesToDelete := map[string]struct{}{}
   208  
   209  	// 1. Figure out which endpoints are no longer required in the existing
   210  	// slices, and update endpoints that have changed
   211  	for _, existingSlice := range existingSlices {
   212  		slicesByName[existingSlice.Name] = existingSlice
   213  		keepEndpoints := []discoveryv1.Endpoint{}
   214  		epUpdated := false
   215  		for _, endpoint := range existingSlice.Endpoints {
   216  			endpoint := endpoint // pin
   217  			found := desiredEps.Get(&endpoint)
   218  			// If the endpoint is desired (i.e. a workload exists with an IP and
   219  			// we want to add it to the service's endpoints), then we should
   220  			// keep it.
   221  			if found != nil {
   222  				keepEndpoints = append(keepEndpoints, *found)
   223  				// We know the slice already contains an endpoint we want, but
   224  				// has the endpoint changed? If yes, we need to persist it
   225  				if !epsliceutil.EndpointsEqualBeyondHash(found, &endpoint) {
   226  					epUpdated = true
   227  				}
   228  
   229  				// Once an endpoint has been found in a slice, we can remove it from the desired set
   230  				desiredEps.Delete(&endpoint)
   231  			}
   232  		}
   233  
   234  		// Re-generate labels and see whether the service's labels have changed
   235  		labels, labelsChanged := setEndpointSliceLabels(existingSlice, svc, r.controllerName)
   236  
   237  		// Consider what kind of reconciliation we should proceed with:
   238  		//
   239  		// 1. We can have a set of endpoints that have changed; this can either
   240  		// mean we need to update the endpoints, or that we have no endpoints
   241  		// left to keep.
   242  		// 2. We need to update the slice's metadata because its labels have
   243  		// changed.
   244  		// 3. The slice remains unchanged, so we have a no-op on our hands
   245  		if epUpdated || len(existingSlice.Endpoints) != len(keepEndpoints) {
   246  			if len(keepEndpoints) == 0 {
   247  				// When there are no endpoints to keep, then the slice should be
   248  				// deleted
   249  				sliceNamesToDelete[existingSlice.Name] = struct{}{}
   250  			} else {
   251  				// There is at least one endpoint to keep / update
   252  				slice := existingSlice.DeepCopy()
   253  				slice.Labels = labels
   254  				slice.Endpoints = keepEndpoints
   255  				sliceNamesToUpdate[slice.Name] = struct{}{}
   256  				slicesByName[slice.Name] = slice
   257  			}
   258  		} else if labelsChanged {
   259  			slice := existingSlice.DeepCopy()
   260  			slice.Labels = labels
   261  			sliceNamesToUpdate[slice.Name] = struct{}{}
   262  			slicesByName[slice.Name] = slice
   263  		} else {
   264  			// Unchanged, we save it for later.
   265  			// unchanged slices may receive new endpoints that are leftover if
   266  			// they're not past their quota
   267  			sliceNamesUnchanged[existingSlice.Name] = struct{}{}
   268  		}
   269  	}
   270  
   271  	// 2. If we still have desired endpoints left, but they haven't matched any
   272  	// endpoint that already exists in a slice, we need to add them somewhere.
   273  	//
   274  	// We start by adding our leftover endpoints to the slices we
   275  	// will update anyway (to save a write).
   276  	if desiredEps.Len() > 0 && len(sliceNamesToUpdate) > 0 {
   277  		slices := []*discoveryv1.EndpointSlice{}
   278  		for sliceName := range sliceNamesToUpdate {
   279  			slices = append(slices, slicesByName[sliceName])
   280  		}
   281  
   282  		// Sort in descending order of endpoint count; fullest first.
   283  		sort.Slice(slices, func(i, j int) bool {
   284  			return len(slices[i].Endpoints) > len(slices[j].Endpoints)
   285  		})
   286  
   287  		// Iterate and fill up the slices
   288  		for _, slice := range slices {
   289  			for desiredEps.Len() > 0 && len(slice.Endpoints) < r.maxEndpoints {
   290  				ep, _ := desiredEps.PopAny()
   291  				slice.Endpoints = append(slice.Endpoints, *ep)
   292  			}
   293  		}
   294  	}
   295  
   296  	// If we have remaining endpoints, we need to deal with them
   297  	// by using unchanged slices or creating new ones
   298  	slicesToCreate := []*discoveryv1.EndpointSlice{}
   299  	for desiredEps.Len() > 0 {
   300  		var sliceToFill *discoveryv1.EndpointSlice
   301  
   302  		// Deal with any remaining endpoints by:
   303  		// (a) adding to unchanged slices first
   304  		if desiredEps.Len() < r.maxEndpoints && len(sliceNamesUnchanged) > 0 {
   305  			unchangedSlices := []*discoveryv1.EndpointSlice{}
   306  			for unchangedSlice := range sliceNamesUnchanged {
   307  				unchangedSlices = append(unchangedSlices, slicesByName[unchangedSlice])
   308  			}
   309  
   310  			sliceToFill = getSliceToFill(unchangedSlices, desiredEps.Len(), r.maxEndpoints)
   311  		}
   312  
   313  		// If we have no unchanged slice to fill, then
   314  		// (b) create a new slice
   315  		if sliceToFill == nil {
   316  			sliceToFill = newEndpointSlice(svc, desiredMeta, r.controllerName)
   317  		} else {
   318  			// deep copy required to mutate slice
   319  			sliceToFill = sliceToFill.DeepCopy()
   320  			slicesByName[sliceToFill.Name] = sliceToFill
   321  		}
   322  
   323  		// Fill out the slice
   324  		for desiredEps.Len() > 0 && len(sliceToFill.Endpoints) < r.maxEndpoints {
   325  			ep, _ := desiredEps.PopAny()
   326  			sliceToFill.Endpoints = append(sliceToFill.Endpoints, *ep)
   327  		}
   328  
   329  		// Figure out what kind of slice we just filled: an existing slice has a
   330  		// name set, whereas a newly created one only has a GenerateName.
   331  		if sliceToFill.Name != "" {
   332  			sliceNamesToUpdate[sliceToFill.Name] = struct{}{}
   333  			delete(sliceNamesUnchanged, sliceToFill.Name)
   334  		} else {
   335  			slicesToCreate = append(slicesToCreate, sliceToFill)
   336  		}
   337  	}
   338  
   339  	slicesToUpdate := []*discoveryv1.EndpointSlice{}
   340  	for name := range sliceNamesToUpdate {
   341  		slicesToUpdate = append(slicesToUpdate, slicesByName[name])
   342  	}
   343  
   344  	slicesToDelete := []*discoveryv1.EndpointSlice{}
   345  	for name := range sliceNamesToDelete {
   346  		slicesToDelete = append(slicesToDelete, slicesByName[name])
   347  	}
   348  
   349  	return slicesToCreate, slicesToUpdate, slicesToDelete
   350  }
   351  
   352  // finalize performs writes to the API Server to update the state after it's
   353  // been diffed.
   354  func (r *endpointsReconciler) finalize(svc *corev1.Service, slicesToCreate, slicesToUpdate, slicesToDelete []*discoveryv1.EndpointSlice) error {
   355  	// If there are slices to create and delete, change the creates to updates
   356  	// of the slices that would otherwise be deleted.
   357  	for i := 0; i < len(slicesToDelete); {
   358  		if len(slicesToCreate) == 0 {
   359  			break
   360  		}
   361  		sliceToDelete := slicesToDelete[i]
   362  		slice := slicesToCreate[len(slicesToCreate)-1]
   363  		// Only update EndpointSlices that are owned by this Service and have
   364  		// the same AddressType. We need to avoid updating EndpointSlices that
   365  		// are being garbage collected for an old Service with the same name.
   366  		// The AddressType field is immutable. Since Services also consider
   367  		// IPFamily immutable, the only case where this should matter will be
   368  		// the migration from IP to IPv4 and IPv6 AddressTypes, where there's a
   369  		// chance EndpointSlices with an IP AddressType would otherwise be
   370  		// updated to IPv4 or IPv6 without this check.
   371  		if sliceToDelete.AddressType == slice.AddressType && ownedBy(sliceToDelete, svc) {
   372  			slice.Name = sliceToDelete.Name
   373  			slicesToCreate = slicesToCreate[:len(slicesToCreate)-1]
   374  			slicesToUpdate = append(slicesToUpdate, slice)
   375  			slicesToDelete = append(slicesToDelete[:i], slicesToDelete[i+1:]...)
   376  		} else {
   377  			i++
   378  		}
   379  	}
   380  
   381  	r.log.Debugf("reconciliation result for %s/%s: %d to add, %d to update, %d to remove", svc.Namespace, svc.Name, len(slicesToCreate), len(slicesToUpdate), len(slicesToDelete))
   382  
   383  	// Create EndpointSlices only if the service has not been marked for
   384  	// deletion; according to the upstream implementation, creating slices for a
   385  	// service that is being deleted has the potential to cause race conditions
   386  	if svc.DeletionTimestamp == nil {
   387  		// TODO: context with timeout
   388  		for _, slice := range slicesToCreate {
   389  			r.log.Tracef("starting create: %s/%s", slice.Namespace, slice.Name)
   390  			createdSlice, err := r.k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).Create(context.TODO(), slice, metav1.CreateOptions{})
   391  			if err != nil {
   392  				// If the namespace is terminating, operations will not
   393  				// succeed. Drop the entire reconciliation effort
   394  				if errors.HasStatusCause(err, corev1.NamespaceTerminatingCause) {
   395  					return nil
   396  				}
   397  
   398  				return err
   399  			}
   400  			r.endpointTracker.Update(createdSlice)
   401  			r.log.Tracef("finished creating: %s/%s", createdSlice.Namespace, createdSlice.Name)
   402  		}
   403  	}
   404  
   405  	for _, slice := range slicesToUpdate {
   406  		r.log.Tracef("starting update: %s/%s", slice.Namespace, slice.Name)
   407  		updatedSlice, err := r.k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).Update(context.TODO(), slice, metav1.UpdateOptions{})
   408  		if err != nil {
   409  			return err
   410  		}
   411  		r.endpointTracker.Update(updatedSlice)
   412  		r.log.Tracef("finished updating: %s/%s", updatedSlice.Namespace, updatedSlice.Name)
   413  	}
   414  
   415  	for _, slice := range slicesToDelete {
   416  		r.log.Tracef("starting delete: %s/%s", slice.Namespace, slice.Name)
   417  		err := r.k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).Delete(context.TODO(), slice.Name, metav1.DeleteOptions{})
   418  		if err != nil {
   419  			return err
   420  		}
   421  		r.endpointTracker.ExpectDeletion(slice)
   422  		r.log.Tracef("finished deleting: %s/%s", slice.Namespace, slice.Name)
   423  	}
   424  
   425  	return nil
   426  }
   427  
   428  // === Utility ===
   429  
   430  // newEndpointSlice creates a new EndpointSlice object owned by the given service
   431  func newEndpointSlice(svc *corev1.Service, meta *endpointMeta, controllerName string) *discoveryv1.EndpointSlice {
   432  	// We need an ownerRef to point to our service
   433  	ownerRef := metav1.NewControllerRef(svc, schema.GroupVersionKind{Version: "v1", Kind: "Service"})
   434  	slice := &discoveryv1.EndpointSlice{
   435  		ObjectMeta: metav1.ObjectMeta{
   436  			GenerateName:    fmt.Sprintf("linkerd-external-%s-", svc.Name),
   437  			Namespace:       svc.Namespace,
   438  			Labels:          map[string]string{},
   439  			OwnerReferences: []metav1.OwnerReference{*ownerRef},
   440  		},
   441  		AddressType: meta.addressType,
   442  		Endpoints:   []discoveryv1.Endpoint{},
   443  		Ports:       meta.ports,
   444  	}
   445  	labels, _ := setEndpointSliceLabels(slice, svc, controllerName)
   446  	slice.Labels = labels
   447  	return slice
   448  }
   449  
   450  // getSliceToFill will return an endpoint slice from a list of endpoint slices
   451  // whose capacity is closest to being full when numEndpoints are added. If no
   452  // slice fits the criteria, a nil pointer is returned
   453  func getSliceToFill(slices []*discoveryv1.EndpointSlice, numEndpoints, maxEndpoints int) *discoveryv1.EndpointSlice {
   454  	closestDiff := maxEndpoints
   455  	var closestSlice *discoveryv1.EndpointSlice
   456  	for _, slice := range slices {
   457  		diff := maxEndpoints - (numEndpoints + len(slice.Endpoints))
   458  		if diff >= 0 && diff < closestDiff {
   459  			closestDiff = diff
   460  			closestSlice = slice
   461  			if closestDiff == 0 {
   462  				return closestSlice
   463  			}
   464  		}
   465  	}
   466  	return closestSlice
   467  }
   468  
   469  // setEndpointSliceLabels returns a new map with the new endpoint slice labels,
   470  // and returns true if there was an update.
   471  //
   472  // Slice labels should always be equivalent to Service labels, except for the
   473  // reserved IsHeadlessService, LabelServiceName, and LabelManagedBy labels. If
   474  // any reserved labels have changed on the service, they are not copied over.
   475  //
   476  // copied from https://github.com/kubernetes/endpointslice/commit/a09c1c9580d13f5020248d25c7fd11f5dde6dd9b
   477  // copyright 2019 The Kubernetes Authors
   478  func setEndpointSliceLabels(es *discoveryv1.EndpointSlice, service *corev1.Service, controllerName string) (map[string]string, bool) {
   479  	isReserved := func(label string) bool {
   480  		if label == discoveryv1.LabelServiceName ||
   481  			label == discoveryv1.LabelManagedBy ||
   482  			label == corev1.IsHeadlessService {
   483  			return true
   484  		}
   485  		return false
   486  	}
   487  
   488  	updated := false
   489  	epLabels := make(map[string]string)
   490  	svcLabels := make(map[string]string)
   491  
   492  	// check if the endpoint slice and the service have the same labels
   493  	// clone current slice labels except the reserved labels
   494  	for key, value := range es.Labels {
   495  		if isReserved(key) {
   496  			continue
   497  		}
   498  		// copy endpoint slice labels
   499  		epLabels[key] = value
   500  	}
   501  
   502  	for key, value := range service.Labels {
   503  		if isReserved(key) {
   504  			continue
   505  		}
   506  		// copy service labels
   507  		svcLabels[key] = value
   508  	}
   509  
   510  	// if the labels are not identical, update the slice with the corresponding service labels
   511  	for svcLabelKey, svcLabelVal := range svcLabels {
   512  		epLabelVal, found := epLabels[svcLabelKey]
   513  		if !found {
   514  			updated = true
   515  			break
   516  		}
   517  
   518  		if svcLabelVal != epLabelVal {
   519  			updated = true
   520  			break
   521  		}
   522  	}
   523  
   524  	// add or remove the headless label depending on whether the service is headless
   525  	if service.Spec.ClusterIP == corev1.ClusterIPNone {
   526  		svcLabels[corev1.IsHeadlessService] = ""
   527  	} else {
   528  		delete(svcLabels, corev1.IsHeadlessService)
   529  	}
   530  
   531  	// override the endpoint slice's reserved labels
   532  	svcLabels[discoveryv1.LabelServiceName] = service.Name
   533  	svcLabels[discoveryv1.LabelManagedBy] = controllerName
   534  
   535  	return svcLabels, updated
   536  }
   537  
   538  func externalWorkloadToEndpoint(addrType discoveryv1.AddressType, ew *ewv1beta1.ExternalWorkload, svc *corev1.Service) discoveryv1.Endpoint {
   539  	// Note: an ExternalWorkload does not have the same lifecycle as a pod; we
   540  	// do not mark a workload as "Terminating". Because of that, our code is
   541  	// simpler than the upstream and we never have to consider:
   542  	// * publishNotReadyAddresses (found on a service)
   543  	// * deletionTimestamps (found normally on a pod)
   544  	// * or a terminating flag on the endpoint
   545  	serving := IsEwReady(ew)
   546  
   547  	addresses := []string{}
   548  	// We assume the workload has been validated beforehand and contains a valid
   549  	// IP address regardless of its address family.
   550  	for _, addr := range ew.Spec.WorkloadIPs {
   551  		ip := addr.Ip
   552  		isIPv6 := utilnet.IsIPv6String(ip)
   553  		if isIPv6 && addrType == discoveryv1.AddressTypeIPv6 {
   554  			addresses = append(addresses, ip)
   555  		} else if !isIPv6 && addrType == discoveryv1.AddressTypeIPv4 {
   556  			addresses = append(addresses, ip)
   557  		}
   558  	}
   559  
   560  	terminating := false
   561  	ep := discoveryv1.Endpoint{
   562  		Addresses: addresses,
   563  		Conditions: discoveryv1.EndpointConditions{
   564  			Ready:       &serving,
   565  			Serving:     &serving,
   566  			Terminating: &terminating,
   567  		},
   568  		TargetRef: &corev1.ObjectReference{
   569  			Kind:      "ExternalWorkload",
   570  			Namespace: ew.Namespace,
   571  			Name:      ew.Name,
   572  			UID:       ew.UID,
   573  		},
   574  	}
   575  
   576  	zone, ok := ew.Labels[corev1.LabelTopologyZone]
   577  	if ok {
   578  		ep.Zone = &zone
   579  	}
   580  
   581  	// Add a hostname conditionally
   582  	// Note: upstream does this a bit differently; pods may include a hostname
   583  	// as part of their spec. We only set a hostname when the service is
   584  	// headless, since that's what we would use a hostname for when routing in
   585  	// linkerd (we care about DNS record creation)
   586  	if svc.Spec.ClusterIP == corev1.ClusterIPNone && ew.Namespace == svc.Namespace {
   587  		ep.Hostname = &ew.Name
   588  	}
   589  
   590  	return ep
   591  }
   592  
   593  func ownedBy(slice *discoveryv1.EndpointSlice, svc *corev1.Service) bool {
   594  	for _, o := range slice.OwnerReferences {
   595  		if o.UID == svc.UID && o.Kind == "Service" && o.APIVersion == "v1" {
   596  			return true
   597  		}
   598  	}
   599  	return false
   600  }
   601  
   602  // findEndpointPorts is a utility function that will return a list of ports
   603  // that are documented on an external workload and selected by a service
   604  func (r *endpointsReconciler) findEndpointPorts(svc *corev1.Service, ew *ewv1beta1.ExternalWorkload) []discoveryv1.EndpointPort {
   605  	epPorts := []discoveryv1.EndpointPort{}
   606  	// If we are dealing with a headless service, the upstream implementation
   607  	// allows the service not to have any ports
   608  	if len(svc.Spec.Ports) == 0 && svc.Spec.ClusterIP == corev1.ClusterIPNone {
   609  		return epPorts
   610  	}
   611  
   612  	for _, svcPort := range svc.Spec.Ports {
   613  		svcPort := svcPort // pin
   614  		portNum, err := findWorkloadPort(ew, &svcPort)
   615  		if err != nil {
   616  			r.log.Errorf("failed to find port for service %s/%s: %v", svc.Namespace, svc.Name, err)
   617  			continue
   618  		}
   619  
   620  		portName := &svcPort.Name
   621  		if *portName == "" {
   622  			portName = nil
   623  		}
   624  		portProto := &svcPort.Protocol
   625  		if *portProto == "" {
   626  			portProto = nil
   627  		}
   628  		epPorts = append(epPorts, discoveryv1.EndpointPort{
   629  			Name:     portName,
   630  			Port:     &portNum,
   631  			Protocol: portProto,
   632  		})
   633  	}
   634  
   635  	return epPorts
   636  }
   637  
   638  // findWorkloadPort is provided a service port and an external workload and
   639  // checks whether the workload's spec documents the target port referenced
   640  // by the service.
   641  //
   642  // adapted from k8s.io/kubernetes/pkg/api/v1/pod
   643  func findWorkloadPort(ew *ewv1beta1.ExternalWorkload, svcPort *corev1.ServicePort) (int32, error) {
   644  	targetPort := svcPort.TargetPort
   645  	switch targetPort.Type {
   646  	case intstr.String:
   647  		name := targetPort.StrVal
   648  		for _, wPort := range ew.Spec.Ports {
   649  			if wPort.Name == name && wPort.Protocol == svcPort.Protocol {
   650  				return wPort.Port, nil
   651  			}
   652  		}
   653  	case intstr.Int:
   654  		// Ensure the port is documented in the workload spec, since we
   655  		// require it.
   656  		// The upstream version allows for undocumented container ports here (i.e.
   657  		// it returns the int value).
   658  		for _, wPort := range ew.Spec.Ports {
   659  			port := int32(targetPort.IntValue())
   660  			if wPort.Port == port && wPort.Protocol == svcPort.Protocol {
   661  				return port, nil
   662  			}
   663  		}
   664  	}
   665  	return 0, fmt.Errorf("no suitable port for targetPort %s on workload %s/%s", targetPort.String(), ew.Namespace, ew.Name)
   666  }
   667  
   668  // getSupportedAddressTypes will return a set of address families (AF) supported
   669  // by this service. A service may be IPv4 or IPv6 only, or it may be dual-stack.
   670  func getSupportedAddressTypes(svc *corev1.Service) map[discoveryv1.AddressType]struct{} {
   671  	afs := map[discoveryv1.AddressType]struct{}{}
   672  	// The field only applies to LoadBalancer, ClusterIP and NodePort services. A
   673  	// headless service will not receive any IP families; the field may hold at
   674  	// most 2 entries and can be mutated (although the 'primary' choice is never
   675  	// removed).
   676  	// See the client-go type documentation for more info.
   677  	for _, af := range svc.Spec.IPFamilies {
   678  		if af == corev1.IPv4Protocol {
   679  			afs[discoveryv1.AddressTypeIPv4] = struct{}{}
   680  		} else if af == corev1.IPv6Protocol {
   681  			afs[discoveryv1.AddressTypeIPv6] = struct{}{}
   682  		}
   683  	}
   684  
   685  	if len(afs) > 0 {
   686  		// If we appended at least one address family, it means we didn't have
   687  		// to deal with a headless service.
   688  		return afs
   689  	}
   690  
   691  	// Note: our logic will differ from the upstream Kubernetes controller.
   692  	// Specifically, our minimum k8s version is greater than v1.20. The upstream
   693  	// controller needs to handle an upgrade path from v1.19 to newer APIs,
   694  	// which we disregard since we can assume all services will contain the
   695  	// `IPFamilies` field
   696  	//
   697  	// Our only other option is to have a headless service. Our ExternalWorkload
   698  	// CRD is generic over the AF used so we may create slices for both AF_INET
   699  	// and AF_INET6
   700  	afs[discoveryv1.AddressTypeIPv4] = struct{}{}
   701  	afs[discoveryv1.AddressTypeIPv6] = struct{}{}
   702  	return afs
   703  }
   704  
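For reference, a minimal sketch of how the EndpointsController might drive this reconciler during a single pass. It is not part of the file above: the helper name and the use of a label selector to fetch the service's existing slices are illustrative assumptions, and it additionally assumes an import of k8s.io/apimachinery/pkg/labels.

	// reconcileServiceOnce is a hypothetical helper showing one reconciliation
	// pass: list the slices currently labelled as belonging to the service,
	// then let the reconciler diff them against the selected workloads.
	func reconcileServiceOnce(ctx context.Context, r *endpointsReconciler, k8sAPI *k8s.API, svc *corev1.Service, ews []*ewv1beta1.ExternalWorkload) error {
		// Slices written by this reconciler carry the standard
		// kubernetes.io/service-name label (see setEndpointSliceLabels).
		selector := labels.Set{discoveryv1.LabelServiceName: svc.Name}.AsSelector().String()
		sliceList, err := k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).List(ctx, metav1.ListOptions{LabelSelector: selector})
		if err != nil {
			return err
		}

		existing := make([]*discoveryv1.EndpointSlice, 0, len(sliceList.Items))
		for i := range sliceList.Items {
			existing = append(existing, &sliceList.Items[i])
		}

		// reconcile computes the slice diff for each supported address family
		// and writes any creates, updates, and deletes to the API server.
		return r.reconcile(svc, ews, existing)
	}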
