...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/external-workload/controller_util.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination/external-workload

     1  package externalworkload
     2  
     3  import (
     4  	"reflect"
     5  
     6  	ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
     7  	discoveryv1 "k8s.io/api/discovery/v1"
     8  	"k8s.io/apimachinery/pkg/labels"
     9  	"k8s.io/apimachinery/pkg/util/sets"
    10  	"k8s.io/client-go/tools/cache"
    11  )
    12  
    13  func (ec *EndpointsController) getServicesToUpdateOnExternalWorkloadChange(old, cur interface{}) sets.Set[string] {
    14  	newEw, newEwOk := cur.(*ewv1beta1.ExternalWorkload)
    15  	oldEw, oldEwOk := old.(*ewv1beta1.ExternalWorkload)
    16  
    17  	if !oldEwOk {
    18  		ec.log.Errorf("Expected (cur) to be an EndpointSlice in getServicesToUpdateOnExternalWorkloadChange(), got type: %T", cur)
    19  		return sets.Set[string]{}
    20  	}
    21  
    22  	if !newEwOk {
    23  		ec.log.Errorf("Expected (old) to be an EndpointSlice in getServicesToUpdateOnExternalWorkloadChange(), got type: %T", old)
    24  		return sets.Set[string]{}
    25  	}
    26  
    27  	if newEw.ResourceVersion == oldEw.ResourceVersion {
    28  		// Periodic resync will send update events for all known ExternalWorkloads.
    29  		// Two different versions of the same pod will always have different RVs
    30  		return sets.Set[string]{}
    31  	}
    32  
    33  	ewChanged, labelsChanged := ewEndpointsChanged(oldEw, newEw)
    34  	if !ewChanged && !labelsChanged {
    35  		ec.log.Errorf("skipping update; nothing has changed between old rv %s and new rv %s", oldEw.ResourceVersion, newEw.ResourceVersion)
    36  		return sets.Set[string]{}
    37  	}
    38  
    39  	services, err := ec.getExternalWorkloadSvcMembership(newEw)
    40  	if err != nil {
    41  		ec.log.Errorf("unable to get pod %s/%s's service memberships: %v", newEw.Namespace, newEw.Name, err)
    42  		return sets.Set[string]{}
    43  	}
    44  
    45  	if labelsChanged {
    46  		oldServices, err := ec.getExternalWorkloadSvcMembership(oldEw)
    47  		if err != nil {
    48  			ec.log.Errorf("unable to get pod %s/%s's service memberships: %v", oldEw.Namespace, oldEw.Name, err)
    49  		}
    50  		services = determineNeededServiceUpdates(oldServices, services, ewChanged)
    51  	}
    52  
    53  	return services
    54  }
    55  
    56  func determineNeededServiceUpdates(oldServices, services sets.Set[string], specChanged bool) sets.Set[string] {
    57  	if specChanged {
    58  		// if the labels and spec changed, all services need to be updated
    59  		services = services.Union(oldServices)
    60  	} else {
    61  		// if only the labels changed, services not common to both the new
    62  		// and old service set (the disjuntive union) need to be updated
    63  		services = services.Difference(oldServices).Union(oldServices.Difference(services))
    64  	}
    65  	return services
    66  }
    67  
    68  // getExternalWorkloadSvcMembership accepts a pointer to an external workload
    69  // resource and returns a set of service keys (<namespace>/<name>). The set
    70  // includes all services local to the workload's namespace that match the workload.
    71  func (ec *EndpointsController) getExternalWorkloadSvcMembership(workload *ewv1beta1.ExternalWorkload) (sets.Set[string], error) {
    72  	keys := sets.Set[string]{}
    73  	services, err := ec.k8sAPI.Svc().Lister().Services(workload.Namespace).List(labels.Everything())
    74  	if err != nil {
    75  		return keys, err
    76  	}
    77  
    78  	for _, svc := range services {
    79  		if svc.Spec.Selector == nil {
    80  			continue
    81  		}
    82  
    83  		// Taken from upstream k8s code, this checks whether a given object has
    84  		// a deleted state before returning a `namespace/name` key. This is
    85  		// important since we do not want to consider a service that has been
    86  		// deleted and is waiting for cache eviction
    87  		key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(svc)
    88  		if err != nil {
    89  			return sets.Set[string]{}, err
    90  		}
    91  
    92  		// Check if service selects our ExternalWorkload.
    93  		if labels.ValidatedSetSelector(svc.Spec.Selector).Matches(labels.Set(workload.Labels)) {
    94  			keys.Insert(key)
    95  		}
    96  	}
    97  
    98  	return keys, nil
    99  }
   100  
   101  // getEndpointSliceFromDeleteAction parses an EndpointSlice from a delete action.
   102  func (ec *EndpointsController) getEndpointSliceFromDeleteAction(obj interface{}) *discoveryv1.EndpointSlice {
   103  	if endpointSlice, ok := obj.(*discoveryv1.EndpointSlice); ok {
   104  		// Enqueue all the services that the pod used to be a member of.
   105  		// This is the same thing we do when we add a pod.
   106  		return endpointSlice
   107  	}
   108  	// If we reached here it means the pod was deleted but its final state is unrecorded.
   109  	tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   110  	if !ok {
   111  		ec.log.Errorf("Couldn't get object from tombstone")
   112  		return nil
   113  	}
   114  	endpointSlice, ok := tombstone.Obj.(*discoveryv1.EndpointSlice)
   115  	if !ok {
   116  		ec.log.Errorf("Tombstone contained object that is not a EndpointSlice")
   117  		return nil
   118  	}
   119  	return endpointSlice
   120  }
   121  
   122  // getExternalWorkloadFromDeleteAction parses an ExternalWorkload from a delete action.
   123  func (ec *EndpointsController) getExternalWorkloadFromDeleteAction(obj interface{}) *ewv1beta1.ExternalWorkload {
   124  	if ew, ok := obj.(*ewv1beta1.ExternalWorkload); ok {
   125  		return ew
   126  	}
   127  
   128  	// If we reached here it means the pod was deleted but its final state is unrecorded.
   129  	tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   130  	if !ok {
   131  		ec.log.Errorf("couldn't get object from tombstone %#v", obj)
   132  		return nil
   133  	}
   134  
   135  	ew, ok := tombstone.Obj.(*ewv1beta1.ExternalWorkload)
   136  	if !ok {
   137  		ec.log.Errorf("tombstone contained object that is not a ExternalWorkload: %#v", obj)
   138  		return nil
   139  	}
   140  	return ew
   141  }
   142  
   143  // ewEndpointsChanged returns two boolean values. The first is true if the ExternalWorkload has
   144  // changed in a way that may change existing endpoints. The second value is true if the
   145  // ExternalWorkload has changed in a way that may affect which Services it matches.
   146  func ewEndpointsChanged(oldEw, newEw *ewv1beta1.ExternalWorkload) (bool, bool) {
   147  	// Check if the ExternalWorkload labels have changed, indicating a possible
   148  	// change in the service membership
   149  	labelsChanged := false
   150  	if !reflect.DeepEqual(newEw.Labels, oldEw.Labels) {
   151  		labelsChanged = true
   152  	}
   153  
   154  	// If the ExternalWorkload's deletion timestamp is set, remove endpoint from ready address.
   155  	if newEw.DeletionTimestamp != oldEw.DeletionTimestamp {
   156  		return true, labelsChanged
   157  	}
   158  	// If the ExternalWorkload's readiness has changed, the associated endpoint address
   159  	// will move from the unready endpoints set to the ready endpoints.
   160  	// So for the purposes of an endpoint, a readiness change on an ExternalWorkload
   161  	// means we have a changed ExternalWorkload.
   162  	if IsEwReady(oldEw) != IsEwReady(newEw) {
   163  		return true, labelsChanged
   164  	}
   165  
   166  	// Check if the ExternalWorkload IPs have changed
   167  	if len(oldEw.Spec.WorkloadIPs) != len(newEw.Spec.WorkloadIPs) {
   168  		return true, labelsChanged
   169  	}
   170  	for i := range oldEw.Spec.WorkloadIPs {
   171  		if oldEw.Spec.WorkloadIPs[i].Ip != newEw.Spec.WorkloadIPs[i].Ip {
   172  			return true, labelsChanged
   173  		}
   174  	}
   175  
   176  	// Check if the Ports  have changed
   177  	if len(oldEw.Spec.Ports) != len(newEw.Spec.Ports) {
   178  		return true, labelsChanged
   179  	}
   180  
   181  	// Determine if the ports have changed between workload resources
   182  	portSet := make(map[int32]ewv1beta1.PortSpec)
   183  	for _, ps := range newEw.Spec.Ports {
   184  		portSet[ps.Port] = ps
   185  	}
   186  
   187  	for _, oldPs := range oldEw.Spec.Ports {
   188  		// If the port number is present in the new workload but not the old
   189  		// one, then we have a diff and we return early
   190  		newPs, ok := portSet[oldPs.Port]
   191  		if !ok {
   192  			return true, labelsChanged
   193  		}
   194  
   195  		// If the port is present in both workloads, we check to see if any of
   196  		// the port spec's values have changed, e.g. name or protocol
   197  		if newPs.Name != oldPs.Name || newPs.Protocol != oldPs.Protocol {
   198  			return true, labelsChanged
   199  		}
   200  	}
   201  
   202  	return false, labelsChanged
   203  }
   204  
   205  func managedByController(es *discoveryv1.EndpointSlice) bool {
   206  	esManagedBy := es.Labels[discoveryv1.LabelManagedBy]
   207  	return managedBy == esManagedBy
   208  }
   209  
   210  func managedByChanged(endpointSlice1, endpointSlice2 *discoveryv1.EndpointSlice) bool {
   211  	return managedByController(endpointSlice1) != managedByController(endpointSlice2)
   212  }
   213  
   214  func IsEwReady(ew *ewv1beta1.ExternalWorkload) bool {
   215  	if len(ew.Status.Conditions) == 0 {
   216  		return false
   217  	}
   218  
   219  	// Loop through the conditions and look at each condition in turn starting
   220  	// from the top.
   221  	for i := range ew.Status.Conditions {
   222  		cond := ew.Status.Conditions[i]
   223  		// Stop once we find a 'Ready' condition. We expect a resource to only
   224  		// have one 'Ready' type condition.
   225  		if cond.Type == ewv1beta1.WorkloadReady && cond.Status == ewv1beta1.ConditionTrue {
   226  			return true
   227  		}
   228  	}
   229  
   230  	return false
   231  }
   232  

View as plain text