...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/watcher/endpoints_watcher.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination/watcher

     1  package watcher
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"net"
     7  	"sort"
     8  	"strconv"
     9  	"strings"
    10  	"sync"
    11  	"time"
    12  
    13  	ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
    14  	"github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta2"
    15  	"github.com/linkerd/linkerd2/controller/k8s"
    16  	consts "github.com/linkerd/linkerd2/pkg/k8s"
    17  	"github.com/prometheus/client_golang/prometheus"
    18  	logging "github.com/sirupsen/logrus"
    19  	corev1 "k8s.io/api/core/v1"
    20  	discovery "k8s.io/api/discovery/v1"
    21  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    22  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    23  	"k8s.io/apimachinery/pkg/labels"
    24  	"k8s.io/apimachinery/pkg/util/intstr"
    25  	"k8s.io/client-go/tools/cache"
    26  )
    27  
const (
	// metrics labels: keys of the label map built by metricLabels.
	service                = "service"
	namespace              = "namespace"
	targetCluster          = "target_cluster"
	targetService          = "target_service"
	targetServiceNamespace = "target_service_namespace"

	// opaqueProtocol is the literal string "opaque".
	// NOTE(review): presumably compared against a Server's proxy protocol;
	// its use is not visible in this part of the file — confirm.
	opaqueProtocol = "opaque"
)

// Kind names for the objects an endpoint may reference.
// NOTE(review): presumably matched against an endpoint's TargetRef.Kind;
// the comparison is not visible in this part of the file — confirm.
const endpointTargetRefPod = "Pod"
const endpointTargetRefExternalWorkload = "ExternalWorkload"
    41  
type (
	// Address represents an individual port on a specific endpoint.
	// This endpoint might be the result of the existence of a pod
	// that is targeted by this service; alternatively it can be the
	// case that this endpoint is not associated with a pod and maps
	// to some other IP (i.e. a remote service gateway).
	Address struct {
		IP                string
		Port              Port
		Pod               *corev1.Pod
		ExternalWorkload  *ewv1beta1.ExternalWorkload
		OwnerName         string
		OwnerKind         string
		Identity          string
		AuthorityOverride string
		Zone              *string
		ForZones          []discovery.ForZone
		OpaqueProtocol    bool
	}

	// AddressSet is a set of Address, indexed by ID.
	// The ID can be either:
	// 1) A reference to service: id.Name contains both the service name and
	// the target IP and port (see newServiceRefAddress)
	// 2) A reference to a pod: id.Name refers to the pod's name, and
	// id.IPFamily refers to the ES AddressType (see newPodRefAddress).
	// 3) A reference to an ExternalWorkload: id.Name refers to the EW's name.
	AddressSet struct {
		Addresses          map[ID]Address
		Labels             map[string]string
		LocalTrafficPolicy bool
	}

	// portAndHostname is the key under which a servicePublisher stores its
	// portPublishers: one entry per requested port and hostname pair.
	portAndHostname struct {
		port     Port
		hostname string
	}

	// EndpointsWatcher watches all endpoints and services in the Kubernetes
	// cluster.  Listeners can subscribe to a particular service and port and
	// EndpointsWatcher will publish the address set and all future changes for
	// that service:port.
	EndpointsWatcher struct {
		publishers  map[ServiceID]*servicePublisher
		k8sAPI      *k8s.API
		metadataAPI *k8s.MetadataAPI

		cluster              string
		log                  *logging.Entry
		enableEndpointSlices bool
		sync.RWMutex         // This mutex protects modification of the map itself.

		informerHandlers
	}

	// informerHandlers holds a registration handle for each informer handler
	// that has been registered for the EndpointsWatcher. The registration
	// handles are used to deregister informer handlers when the
	// EndpointsWatcher stops.
	informerHandlers struct {
		// epHandle is registered on either the EndpointSlice or the Endpoints
		// informer, depending on enableEndpointSlices.
		epHandle  cache.ResourceEventHandlerRegistration
		svcHandle cache.ResourceEventHandlerRegistration
		srvHandle cache.ResourceEventHandlerRegistration
	}

	// servicePublisher represents a service.  It keeps a map of portPublishers
	// keyed by port and hostname.  This is because each watch on a service
	// will have a port and optionally may specify a hostname.  The port
	// and hostname will influence the endpoint set which is why a separate
	// portPublisher is required for each port and hostname combination.  The
	// service's port mapping will be applied to the requested port and the
	// mapped port will be used in the addresses set.  If a hostname is
	// requested, the address set will be filtered to only include addresses
	// with the requested hostname.
	servicePublisher struct {
		id                   ServiceID
		log                  *logging.Entry
		k8sAPI               *k8s.API
		metadataAPI          *k8s.MetadataAPI
		enableEndpointSlices bool
		localTrafficPolicy   bool
		cluster              string
		ports                map[portAndHostname]*portPublisher
		// All access to the servicePublisher and its portPublishers is explicitly synchronized by
		// this mutex.
		sync.Mutex
	}

	// portPublisher represents a service along with a port and optionally a
	// hostname.  Multiple listeners may be subscribed to a portPublisher.
	// portPublisher maintains the current state of the address set and
	// publishes diffs to all listeners when updates come from either the
	// endpoints API or the service API.
	portPublisher struct {
		id                   ServiceID
		targetPort           namedPort
		srcPort              Port
		hostname             string
		log                  *logging.Entry
		k8sAPI               *k8s.API
		metadataAPI          *k8s.MetadataAPI
		enableEndpointSlices bool
		exists               bool
		addresses            AddressSet
		listeners            []EndpointUpdateListener
		metrics              endpointsMetrics
		localTrafficPolicy   bool
	}

	// EndpointUpdateListener is the interface that subscribers must implement.
	EndpointUpdateListener interface {
		// Add is called with the addresses that appeared since the last
		// published state.
		Add(set AddressSet)
		// Remove is called with the addresses that disappeared since the
		// last published state.
		Remove(set AddressSet)
		// NoEndpoints is called with exists=true when an Endpoints update
		// resolves to an empty address set.
		NoEndpoints(exists bool)
	}
)
   158  
// endpointsVecs holds the package-wide metric vectors from which every
// portPublisher obtains (and on teardown unregisters) its metrics.
var endpointsVecs = newEndpointsMetricsVecs()

// undefinedEndpointPort is the zero Port; endpointSliceToAddresses compares
// the resolved EndpointSlice target port against it.
var undefinedEndpointPort = Port(0)
   162  
   163  // shallowCopy returns a shallow copy of addr, in the sense that the Pod and
   164  // ExternalWorkload fields of the Addresses map values still point to the
   165  // locations of the original variable
   166  func (addr AddressSet) shallowCopy() AddressSet {
   167  	addresses := make(map[ID]Address)
   168  	for k, v := range addr.Addresses {
   169  		addresses[k] = v
   170  	}
   171  
   172  	labels := make(map[string]string)
   173  	for k, v := range addr.Labels {
   174  		labels[k] = v
   175  	}
   176  
   177  	return AddressSet{
   178  		Addresses:          addresses,
   179  		Labels:             labels,
   180  		LocalTrafficPolicy: addr.LocalTrafficPolicy,
   181  	}
   182  }
   183  
   184  // NewEndpointsWatcher creates an EndpointsWatcher and begins watching the
   185  // k8sAPI for pod, service, and endpoint changes. An EndpointsWatcher will
   186  // watch on Endpoints or EndpointSlice resources, depending on cluster configuration.
   187  func NewEndpointsWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, enableEndpointSlices bool, cluster string) (*EndpointsWatcher, error) {
   188  	ew := &EndpointsWatcher{
   189  		publishers:           make(map[ServiceID]*servicePublisher),
   190  		k8sAPI:               k8sAPI,
   191  		metadataAPI:          metadataAPI,
   192  		enableEndpointSlices: enableEndpointSlices,
   193  		cluster:              cluster,
   194  		log: log.WithFields(logging.Fields{
   195  			"component": "endpoints-watcher",
   196  		}),
   197  	}
   198  
   199  	var err error
   200  	ew.svcHandle, err = k8sAPI.Svc().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   201  		AddFunc:    ew.addService,
   202  		DeleteFunc: ew.deleteService,
   203  		UpdateFunc: ew.updateService,
   204  	})
   205  	if err != nil {
   206  		return nil, err
   207  	}
   208  
   209  	ew.srvHandle, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   210  		AddFunc:    ew.addServer,
   211  		DeleteFunc: ew.deleteServer,
   212  		UpdateFunc: ew.updateServer,
   213  	})
   214  	if err != nil {
   215  		return nil, err
   216  	}
   217  
   218  	if ew.enableEndpointSlices {
   219  		ew.log.Debugf("Watching EndpointSlice resources")
   220  		ew.epHandle, err = k8sAPI.ES().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   221  			AddFunc:    ew.addEndpointSlice,
   222  			DeleteFunc: ew.deleteEndpointSlice,
   223  			UpdateFunc: ew.updateEndpointSlice,
   224  		})
   225  		if err != nil {
   226  			return nil, err
   227  		}
   228  
   229  	} else {
   230  		ew.log.Debugf("Watching Endpoints resources")
   231  		ew.epHandle, err = k8sAPI.Endpoint().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   232  			AddFunc:    ew.addEndpoints,
   233  			DeleteFunc: ew.deleteEndpoints,
   234  			UpdateFunc: ew.updateEndpoints,
   235  		})
   236  		if err != nil {
   237  			return nil, err
   238  		}
   239  	}
   240  	return ew, nil
   241  }
   242  
   243  ////////////////////////
   244  /// EndpointsWatcher ///
   245  ////////////////////////
   246  
   247  // Subscribe to an authority.
   248  // The provided listener will be updated each time the address set for the
   249  // given authority is changed.
   250  func (ew *EndpointsWatcher) Subscribe(id ServiceID, port Port, hostname string, listener EndpointUpdateListener) error {
   251  	svc, _ := ew.k8sAPI.Svc().Lister().Services(id.Namespace).Get(id.Name)
   252  	if svc != nil && svc.Spec.Type == corev1.ServiceTypeExternalName {
   253  		return invalidService(id.String())
   254  	}
   255  
   256  	if hostname == "" {
   257  		ew.log.Debugf("Establishing watch on endpoint [%s:%d]", id, port)
   258  	} else {
   259  		ew.log.Debugf("Establishing watch on endpoint [%s.%s:%d]", hostname, id, port)
   260  	}
   261  
   262  	sp := ew.getOrNewServicePublisher(id)
   263  
   264  	sp.subscribe(port, hostname, listener)
   265  	return nil
   266  }
   267  
   268  // Unsubscribe removes a listener from the subscribers list for this authority.
   269  func (ew *EndpointsWatcher) Unsubscribe(id ServiceID, port Port, hostname string, listener EndpointUpdateListener) {
   270  	if hostname == "" {
   271  		ew.log.Debugf("Stopping watch on endpoint [%s:%d]", id, port)
   272  	} else {
   273  		ew.log.Debugf("Stopping watch on endpoint [%s.%s:%d]", hostname, id, port)
   274  	}
   275  
   276  	sp, ok := ew.getServicePublisher(id)
   277  	if !ok {
   278  		ew.log.Errorf("Cannot unsubscribe from unknown service [%s:%d]", id, port)
   279  		return
   280  	}
   281  	sp.unsubscribe(port, hostname, listener)
   282  }
   283  
   284  // removeHandlers will de-register any event handlers used by the
   285  // EndpointsWatcher's informers.
   286  func (ew *EndpointsWatcher) removeHandlers() {
   287  	ew.Lock()
   288  	defer ew.Unlock()
   289  	if ew.svcHandle != nil {
   290  		if err := ew.k8sAPI.Svc().Informer().RemoveEventHandler(ew.svcHandle); err != nil {
   291  			ew.log.Errorf("Failed to remove Service informer event handlers: %s", err)
   292  		}
   293  	}
   294  
   295  	if ew.srvHandle != nil {
   296  		if err := ew.k8sAPI.Srv().Informer().RemoveEventHandler(ew.srvHandle); err != nil {
   297  			ew.log.Errorf("Failed to remove Server informer event handlers: %s", err)
   298  		}
   299  	}
   300  
   301  	if ew.epHandle != nil {
   302  		if ew.enableEndpointSlices {
   303  			if err := ew.k8sAPI.ES().Informer().RemoveEventHandler(ew.epHandle); err != nil {
   304  
   305  				ew.log.Errorf("Failed to remove EndpointSlice informer event handlers: %s", err)
   306  			}
   307  		} else {
   308  			if err := ew.k8sAPI.Endpoint().Informer().RemoveEventHandler(ew.epHandle); err != nil {
   309  				ew.log.Errorf("Failed to remove Endpoints informer event handlers: %s", err)
   310  			}
   311  		}
   312  	}
   313  }
   314  
   315  func (ew *EndpointsWatcher) addService(obj interface{}) {
   316  	service := obj.(*corev1.Service)
   317  	id := ServiceID{
   318  		Namespace: service.Namespace,
   319  		Name:      service.Name,
   320  	}
   321  
   322  	sp := ew.getOrNewServicePublisher(id)
   323  
   324  	sp.updateService(service)
   325  }
   326  
   327  func (ew *EndpointsWatcher) updateService(oldObj interface{}, newObj interface{}) {
   328  	oldService := oldObj.(*corev1.Service)
   329  	newService := newObj.(*corev1.Service)
   330  
   331  	oldUpdated := latestUpdated(oldService.ManagedFields)
   332  	updated := latestUpdated(newService.ManagedFields)
   333  	if !updated.IsZero() && updated != oldUpdated {
   334  		delta := time.Since(updated)
   335  		serviceInformerLag.Observe(delta.Seconds())
   336  	}
   337  
   338  	ew.addService(newObj)
   339  }
   340  
   341  func (ew *EndpointsWatcher) deleteService(obj interface{}) {
   342  	service, ok := obj.(*corev1.Service)
   343  	if !ok {
   344  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   345  		if !ok {
   346  			ew.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
   347  			return
   348  		}
   349  		service, ok = tombstone.Obj.(*corev1.Service)
   350  		if !ok {
   351  			ew.log.Errorf("DeletedFinalStateUnknown contained object that is not a Service %#v", obj)
   352  			return
   353  		}
   354  	}
   355  
   356  	id := ServiceID{
   357  		Namespace: service.Namespace,
   358  		Name:      service.Name,
   359  	}
   360  
   361  	sp, ok := ew.getServicePublisher(id)
   362  	if ok {
   363  		sp.deleteEndpoints()
   364  	}
   365  }
   366  
   367  func (ew *EndpointsWatcher) addEndpoints(obj interface{}) {
   368  	endpoints, ok := obj.(*corev1.Endpoints)
   369  	if !ok {
   370  		ew.log.Errorf("error processing endpoints resource, got %#v expected *corev1.Endpoints", obj)
   371  		return
   372  	}
   373  
   374  	id := ServiceID{Namespace: endpoints.Namespace, Name: endpoints.Name}
   375  	sp := ew.getOrNewServicePublisher(id)
   376  	sp.updateEndpoints(endpoints)
   377  }
   378  
   379  func (ew *EndpointsWatcher) updateEndpoints(oldObj interface{}, newObj interface{}) {
   380  	oldEndpoints, ok := oldObj.(*corev1.Endpoints)
   381  	if !ok {
   382  		ew.log.Errorf("error processing endpoints resource, got %#v expected *corev1.Endpoints", oldObj)
   383  		return
   384  	}
   385  	newEndpoints, ok := newObj.(*corev1.Endpoints)
   386  	if !ok {
   387  		ew.log.Errorf("error processing endpoints resource, got %#v expected *corev1.Endpoints", newObj)
   388  		return
   389  	}
   390  
   391  	oldUpdated := latestUpdated(oldEndpoints.ManagedFields)
   392  	updated := latestUpdated(newEndpoints.ManagedFields)
   393  	if !updated.IsZero() && updated != oldUpdated {
   394  		delta := time.Since(updated)
   395  		endpointsInformerLag.Observe(delta.Seconds())
   396  	}
   397  
   398  	id := ServiceID{Namespace: newEndpoints.Namespace, Name: newEndpoints.Name}
   399  	sp := ew.getOrNewServicePublisher(id)
   400  	sp.updateEndpoints(newEndpoints)
   401  }
   402  
   403  func (ew *EndpointsWatcher) deleteEndpoints(obj interface{}) {
   404  	endpoints, ok := obj.(*corev1.Endpoints)
   405  	if !ok {
   406  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   407  		if !ok {
   408  			ew.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
   409  			return
   410  		}
   411  		endpoints, ok = tombstone.Obj.(*corev1.Endpoints)
   412  		if !ok {
   413  			ew.log.Errorf("DeletedFinalStateUnknown contained object that is not an Endpoints %#v", obj)
   414  			return
   415  		}
   416  	}
   417  
   418  	id := ServiceID{
   419  		Namespace: endpoints.Namespace,
   420  		Name:      endpoints.Name,
   421  	}
   422  
   423  	sp, ok := ew.getServicePublisher(id)
   424  	if ok {
   425  		sp.deleteEndpoints()
   426  	}
   427  }
   428  
   429  func (ew *EndpointsWatcher) addEndpointSlice(obj interface{}) {
   430  	newSlice, ok := obj.(*discovery.EndpointSlice)
   431  	if !ok {
   432  		ew.log.Errorf("error processing EndpointSlice resource, got %#v expected *discovery.EndpointSlice", obj)
   433  		return
   434  	}
   435  
   436  	id, err := getEndpointSliceServiceID(newSlice)
   437  	if err != nil {
   438  		ew.log.Errorf("Could not fetch resource service name:%v", err)
   439  		return
   440  	}
   441  
   442  	sp := ew.getOrNewServicePublisher(id)
   443  	sp.addEndpointSlice(newSlice)
   444  }
   445  
   446  func (ew *EndpointsWatcher) updateEndpointSlice(oldObj interface{}, newObj interface{}) {
   447  	oldSlice, ok := oldObj.(*discovery.EndpointSlice)
   448  	if !ok {
   449  		ew.log.Errorf("error processing EndpointSlice resource, got %#v expected *discovery.EndpointSlice", oldObj)
   450  		return
   451  	}
   452  	newSlice, ok := newObj.(*discovery.EndpointSlice)
   453  	if !ok {
   454  		ew.log.Errorf("error processing EndpointSlice resource, got %#v expected *discovery.EndpointSlice", newObj)
   455  		return
   456  	}
   457  	oldUpdated := latestUpdated(oldSlice.ManagedFields)
   458  	updated := latestUpdated(newSlice.ManagedFields)
   459  	if !updated.IsZero() && updated != oldUpdated {
   460  		delta := time.Since(updated)
   461  		endpointsliceInformerLag.Observe(delta.Seconds())
   462  	}
   463  
   464  	id, err := getEndpointSliceServiceID(newSlice)
   465  	if err != nil {
   466  		ew.log.Errorf("Could not fetch resource service name:%v", err)
   467  		return
   468  	}
   469  
   470  	sp, ok := ew.getServicePublisher(id)
   471  	if ok {
   472  		sp.updateEndpointSlice(oldSlice, newSlice)
   473  	}
   474  }
   475  
   476  func (ew *EndpointsWatcher) deleteEndpointSlice(obj interface{}) {
   477  	es, ok := obj.(*discovery.EndpointSlice)
   478  	if !ok {
   479  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   480  		if !ok {
   481  			ew.log.Errorf("couldn't get object from DeletedFinalStateUnknown %#v", obj)
   482  		}
   483  		es, ok = tombstone.Obj.(*discovery.EndpointSlice)
   484  		if !ok {
   485  			ew.log.Errorf("DeletedFinalStateUnknown contained object that is not an EndpointSlice %#v", obj)
   486  			return
   487  		}
   488  	}
   489  
   490  	id, err := getEndpointSliceServiceID(es)
   491  	if err != nil {
   492  		ew.log.Errorf("Could not fetch resource service name:%v", err)
   493  	}
   494  
   495  	sp, ok := ew.getServicePublisher(id)
   496  	if ok {
   497  		sp.deleteEndpointSlice(es)
   498  	}
   499  }
   500  
   501  // Returns the servicePublisher for the given id if it exists.  Otherwise,
   502  // create a new one and return it.
   503  func (ew *EndpointsWatcher) getOrNewServicePublisher(id ServiceID) *servicePublisher {
   504  	ew.Lock()
   505  	defer ew.Unlock()
   506  
   507  	// If the service doesn't yet exist, create a stub for it so the listener can
   508  	// be registered.
   509  	sp, ok := ew.publishers[id]
   510  	if !ok {
   511  		sp = &servicePublisher{
   512  			id: id,
   513  			log: ew.log.WithFields(logging.Fields{
   514  				"component": "service-publisher",
   515  				"ns":        id.Namespace,
   516  				"svc":       id.Name,
   517  			}),
   518  			k8sAPI:               ew.k8sAPI,
   519  			metadataAPI:          ew.metadataAPI,
   520  			cluster:              ew.cluster,
   521  			ports:                make(map[portAndHostname]*portPublisher),
   522  			enableEndpointSlices: ew.enableEndpointSlices,
   523  		}
   524  		ew.publishers[id] = sp
   525  	}
   526  	return sp
   527  }
   528  
   529  func (ew *EndpointsWatcher) getServicePublisher(id ServiceID) (sp *servicePublisher, ok bool) {
   530  	ew.RLock()
   531  	defer ew.RUnlock()
   532  	sp, ok = ew.publishers[id]
   533  	return
   534  }
   535  
   536  func (ew *EndpointsWatcher) addServer(obj interface{}) {
   537  	ew.Lock()
   538  	defer ew.Unlock()
   539  	server := obj.(*v1beta2.Server)
   540  	for _, sp := range ew.publishers {
   541  		sp.updateServer(nil, server)
   542  	}
   543  }
   544  
   545  func (ew *EndpointsWatcher) updateServer(oldObj interface{}, newObj interface{}) {
   546  	ew.Lock()
   547  	defer ew.Unlock()
   548  
   549  	oldServer := oldObj.(*v1beta2.Server)
   550  	newServer := newObj.(*v1beta2.Server)
   551  	if oldServer != nil && newServer != nil {
   552  		oldUpdated := latestUpdated(oldServer.ManagedFields)
   553  		updated := latestUpdated(newServer.ManagedFields)
   554  		if !updated.IsZero() && updated != oldUpdated {
   555  			delta := time.Since(updated)
   556  			serverInformerLag.Observe(delta.Seconds())
   557  		}
   558  	}
   559  
   560  	namespace := ""
   561  	if oldServer != nil {
   562  		namespace = oldServer.GetNamespace()
   563  	}
   564  	if newServer != nil {
   565  		namespace = newServer.GetNamespace()
   566  	}
   567  
   568  	for id, sp := range ew.publishers {
   569  		// Servers may only select workloads in their namespace.
   570  		if id.Namespace == namespace {
   571  			sp.updateServer(oldServer, newServer)
   572  		}
   573  	}
   574  }
   575  
   576  func (ew *EndpointsWatcher) deleteServer(obj interface{}) {
   577  	ew.Lock()
   578  	defer ew.Unlock()
   579  	server := obj.(*v1beta2.Server)
   580  	for _, sp := range ew.publishers {
   581  		sp.updateServer(server, nil)
   582  	}
   583  }
   584  
   585  ////////////////////////
   586  /// servicePublisher ///
   587  ////////////////////////
   588  
   589  func (sp *servicePublisher) updateEndpoints(newEndpoints *corev1.Endpoints) {
   590  	sp.Lock()
   591  	defer sp.Unlock()
   592  	sp.log.Debugf("Updating endpoints for %s", sp.id)
   593  	for _, port := range sp.ports {
   594  		port.updateEndpoints(newEndpoints)
   595  	}
   596  }
   597  
   598  func (sp *servicePublisher) deleteEndpoints() {
   599  	sp.Lock()
   600  	defer sp.Unlock()
   601  	sp.log.Debugf("Deleting endpoints for %s", sp.id)
   602  	for _, port := range sp.ports {
   603  		port.noEndpoints(false)
   604  	}
   605  }
   606  
   607  func (sp *servicePublisher) addEndpointSlice(newSlice *discovery.EndpointSlice) {
   608  	sp.Lock()
   609  	defer sp.Unlock()
   610  
   611  	sp.log.Debugf("Adding ES %s/%s", newSlice.Namespace, newSlice.Name)
   612  	for _, port := range sp.ports {
   613  		port.addEndpointSlice(newSlice)
   614  	}
   615  }
   616  
   617  func (sp *servicePublisher) updateEndpointSlice(oldSlice *discovery.EndpointSlice, newSlice *discovery.EndpointSlice) {
   618  	sp.Lock()
   619  	defer sp.Unlock()
   620  
   621  	sp.log.Debugf("Updating ES %s/%s", oldSlice.Namespace, oldSlice.Name)
   622  	for _, port := range sp.ports {
   623  		port.updateEndpointSlice(oldSlice, newSlice)
   624  	}
   625  }
   626  
   627  func (sp *servicePublisher) deleteEndpointSlice(es *discovery.EndpointSlice) {
   628  	sp.Lock()
   629  	defer sp.Unlock()
   630  
   631  	sp.log.Debugf("Deleting ES %s/%s", es.Namespace, es.Name)
   632  	for _, port := range sp.ports {
   633  		port.deleteEndpointSlice(es)
   634  	}
   635  }
   636  
   637  func (sp *servicePublisher) updateService(newService *corev1.Service) {
   638  	sp.Lock()
   639  	defer sp.Unlock()
   640  	sp.log.Debugf("Updating service for %s", sp.id)
   641  
   642  	// set localTrafficPolicy to true if InternalTrafficPolicy is set to local
   643  	if newService.Spec.InternalTrafficPolicy != nil {
   644  		sp.localTrafficPolicy = *newService.Spec.InternalTrafficPolicy == corev1.ServiceInternalTrafficPolicyLocal
   645  	} else {
   646  		sp.localTrafficPolicy = false
   647  	}
   648  
   649  	for key, port := range sp.ports {
   650  		newTargetPort := getTargetPort(newService, key.port)
   651  		if newTargetPort != port.targetPort {
   652  			port.updatePort(newTargetPort)
   653  		}
   654  		// update service endpoints with new localTrafficPolicy
   655  		if port.localTrafficPolicy != sp.localTrafficPolicy {
   656  			port.updateLocalTrafficPolicy(sp.localTrafficPolicy)
   657  		}
   658  	}
   659  
   660  }
   661  
   662  func (sp *servicePublisher) subscribe(srcPort Port, hostname string, listener EndpointUpdateListener) {
   663  	sp.Lock()
   664  	defer sp.Unlock()
   665  
   666  	key := portAndHostname{
   667  		port:     srcPort,
   668  		hostname: hostname,
   669  	}
   670  	port, ok := sp.ports[key]
   671  	if !ok {
   672  		port = sp.newPortPublisher(srcPort, hostname)
   673  		sp.ports[key] = port
   674  	}
   675  	port.subscribe(listener)
   676  }
   677  
   678  func (sp *servicePublisher) unsubscribe(srcPort Port, hostname string, listener EndpointUpdateListener) {
   679  	sp.Lock()
   680  	defer sp.Unlock()
   681  
   682  	key := portAndHostname{
   683  		port:     srcPort,
   684  		hostname: hostname,
   685  	}
   686  	port, ok := sp.ports[key]
   687  	if ok {
   688  		port.unsubscribe(listener)
   689  		if len(port.listeners) == 0 {
   690  			endpointsVecs.unregister(sp.metricsLabels(srcPort, hostname))
   691  			delete(sp.ports, key)
   692  		}
   693  	}
   694  }
   695  
   696  func (sp *servicePublisher) newPortPublisher(srcPort Port, hostname string) *portPublisher {
   697  	targetPort := intstr.FromInt(int(srcPort))
   698  	svc, err := sp.k8sAPI.Svc().Lister().Services(sp.id.Namespace).Get(sp.id.Name)
   699  	if err != nil && !apierrors.IsNotFound(err) {
   700  		sp.log.Errorf("error getting service: %s", err)
   701  	}
   702  	exists := false
   703  	if err == nil {
   704  		targetPort = getTargetPort(svc, srcPort)
   705  		exists = true
   706  	}
   707  
   708  	log := sp.log.WithField("port", srcPort)
   709  
   710  	port := &portPublisher{
   711  		listeners:            []EndpointUpdateListener{},
   712  		targetPort:           targetPort,
   713  		srcPort:              srcPort,
   714  		hostname:             hostname,
   715  		exists:               exists,
   716  		k8sAPI:               sp.k8sAPI,
   717  		metadataAPI:          sp.metadataAPI,
   718  		log:                  log,
   719  		metrics:              endpointsVecs.newEndpointsMetrics(sp.metricsLabels(srcPort, hostname)),
   720  		enableEndpointSlices: sp.enableEndpointSlices,
   721  		localTrafficPolicy:   sp.localTrafficPolicy,
   722  	}
   723  
   724  	if port.enableEndpointSlices {
   725  		matchLabels := map[string]string{discovery.LabelServiceName: sp.id.Name}
   726  		selector := labels.Set(matchLabels).AsSelector()
   727  
   728  		sliceList, err := sp.k8sAPI.ES().Lister().EndpointSlices(sp.id.Namespace).List(selector)
   729  		if err != nil && !apierrors.IsNotFound(err) {
   730  			sp.log.Errorf("error getting endpointSlice list: %s", err)
   731  		}
   732  		if err == nil {
   733  			for _, slice := range sliceList {
   734  				port.addEndpointSlice(slice)
   735  			}
   736  		}
   737  	} else {
   738  		endpoints, err := sp.k8sAPI.Endpoint().Lister().Endpoints(sp.id.Namespace).Get(sp.id.Name)
   739  		if err != nil && !apierrors.IsNotFound(err) {
   740  			sp.log.Errorf("error getting endpoints: %s", err)
   741  		}
   742  		if err == nil {
   743  			port.updateEndpoints(endpoints)
   744  		}
   745  	}
   746  
   747  	return port
   748  }
   749  
   750  func (sp *servicePublisher) metricsLabels(port Port, hostname string) prometheus.Labels {
   751  	return endpointsLabels(sp.cluster, sp.id.Namespace, sp.id.Name, strconv.Itoa(int(port)), hostname)
   752  }
   753  
   754  func (sp *servicePublisher) updateServer(oldServer, newServer *v1beta2.Server) {
   755  	sp.Lock()
   756  	defer sp.Unlock()
   757  
   758  	for _, pp := range sp.ports {
   759  		pp.updateServer(oldServer, newServer)
   760  	}
   761  }
   762  
   763  /////////////////////
   764  /// portPublisher ///
   765  /////////////////////
   766  
   767  // Note that portPublishers methods are generally NOT thread-safe.  You should
   768  // hold the parent servicePublisher's mutex before calling methods on a
   769  // portPublisher.
   770  
   771  func (pp *portPublisher) updateEndpoints(endpoints *corev1.Endpoints) {
   772  	newAddressSet := pp.endpointsToAddresses(endpoints)
   773  	if len(newAddressSet.Addresses) == 0 {
   774  		for _, listener := range pp.listeners {
   775  			listener.NoEndpoints(true)
   776  		}
   777  	} else {
   778  		add, remove := diffAddresses(pp.addresses, newAddressSet)
   779  		for _, listener := range pp.listeners {
   780  			if len(remove.Addresses) > 0 {
   781  				listener.Remove(remove)
   782  			}
   783  			if len(add.Addresses) > 0 {
   784  				listener.Add(add)
   785  			}
   786  		}
   787  	}
   788  	pp.addresses = newAddressSet
   789  	pp.exists = true
   790  	pp.metrics.incUpdates()
   791  	pp.metrics.setPods(len(pp.addresses.Addresses))
   792  	pp.metrics.setExists(true)
   793  }
   794  
   795  func (pp *portPublisher) addEndpointSlice(slice *discovery.EndpointSlice) {
   796  	newAddressSet := pp.endpointSliceToAddresses(slice)
   797  	for id, addr := range pp.addresses.Addresses {
   798  		if _, ok := newAddressSet.Addresses[id]; !ok {
   799  			newAddressSet.Addresses[id] = addr
   800  		}
   801  	}
   802  
   803  	add, _ := diffAddresses(pp.addresses, newAddressSet)
   804  	if len(add.Addresses) > 0 {
   805  		for _, listener := range pp.listeners {
   806  			listener.Add(add)
   807  		}
   808  	}
   809  
   810  	// even if the ES doesn't have addresses yet we need to create a new
   811  	// pp.addresses entry with the appropriate Labels and LocalTrafficPolicy,
   812  	// which isn't going to be captured during the ES update event when
   813  	// addresses get added
   814  
   815  	pp.addresses = newAddressSet
   816  	pp.exists = true
   817  	pp.metrics.incUpdates()
   818  	pp.metrics.setPods(len(pp.addresses.Addresses))
   819  	pp.metrics.setExists(true)
   820  }
   821  
   822  func (pp *portPublisher) updateEndpointSlice(oldSlice *discovery.EndpointSlice, newSlice *discovery.EndpointSlice) {
   823  	updatedAddressSet := AddressSet{
   824  		Addresses:          make(map[ID]Address),
   825  		Labels:             pp.addresses.Labels,
   826  		LocalTrafficPolicy: pp.localTrafficPolicy,
   827  	}
   828  
   829  	for id, address := range pp.addresses.Addresses {
   830  		updatedAddressSet.Addresses[id] = address
   831  	}
   832  
   833  	for _, id := range pp.endpointSliceToIDs(oldSlice) {
   834  		delete(updatedAddressSet.Addresses, id)
   835  	}
   836  
   837  	newAddressSet := pp.endpointSliceToAddresses(newSlice)
   838  	for id, address := range newAddressSet.Addresses {
   839  		updatedAddressSet.Addresses[id] = address
   840  	}
   841  
   842  	add, remove := diffAddresses(pp.addresses, updatedAddressSet)
   843  	for _, listener := range pp.listeners {
   844  		if len(remove.Addresses) > 0 {
   845  			listener.Remove(remove)
   846  		}
   847  		if len(add.Addresses) > 0 {
   848  			listener.Add(add)
   849  		}
   850  	}
   851  
   852  	pp.addresses = updatedAddressSet
   853  	pp.exists = true
   854  	pp.metrics.incUpdates()
   855  	pp.metrics.setPods(len(pp.addresses.Addresses))
   856  	pp.metrics.setExists(true)
   857  }
   858  
   859  func metricLabels(resource interface{}) map[string]string {
   860  	var serviceName, ns string
   861  	var resLabels, resAnnotations map[string]string
   862  	switch res := resource.(type) {
   863  	case *corev1.Endpoints:
   864  		{
   865  			serviceName, ns = res.Name, res.Namespace
   866  			resLabels, resAnnotations = res.Labels, res.Annotations
   867  		}
   868  	case *discovery.EndpointSlice:
   869  		{
   870  			serviceName, ns = res.Labels[discovery.LabelServiceName], res.Namespace
   871  			resLabels, resAnnotations = res.Labels, res.Annotations
   872  		}
   873  	}
   874  
   875  	labels := map[string]string{service: serviceName, namespace: ns}
   876  
   877  	remoteClusterName, hasRemoteClusterName := resLabels[consts.RemoteClusterNameLabel]
   878  	serviceFqn, hasServiceFqn := resAnnotations[consts.RemoteServiceFqName]
   879  
   880  	if hasRemoteClusterName {
   881  		// this means we are looking at Endpoints created for the purpose of mirroring
   882  		// an out of cluster service.
   883  		labels[targetCluster] = remoteClusterName
   884  		if hasServiceFqn {
   885  			fqParts := strings.Split(serviceFqn, ".")
   886  			if len(fqParts) >= 2 {
   887  				labels[targetService] = fqParts[0]
   888  				labels[targetServiceNamespace] = fqParts[1]
   889  			}
   890  		}
   891  	}
   892  	return labels
   893  }
   894  
   895  func (pp *portPublisher) endpointSliceToAddresses(es *discovery.EndpointSlice) AddressSet {
   896  	resolvedPort := pp.resolveESTargetPort(es.Ports)
   897  	if resolvedPort == undefinedEndpointPort {
   898  		return AddressSet{
   899  			Labels:             metricLabels(es),
   900  			Addresses:          make(map[ID]Address),
   901  			LocalTrafficPolicy: pp.localTrafficPolicy,
   902  		}
   903  	}
   904  
   905  	serviceID, err := getEndpointSliceServiceID(es)
   906  	if err != nil {
   907  		pp.log.Errorf("Could not fetch resource service name:%v", err)
   908  	}
   909  
   910  	addresses := make(map[ID]Address)
   911  	for _, endpoint := range es.Endpoints {
   912  		if endpoint.Hostname != nil {
   913  			if pp.hostname != "" && pp.hostname != *endpoint.Hostname {
   914  				continue
   915  			}
   916  		}
   917  		if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
   918  			continue
   919  		}
   920  
   921  		if endpoint.TargetRef == nil {
   922  			for _, IPAddr := range endpoint.Addresses {
   923  				var authorityOverride string
   924  				if fqName, ok := es.Annotations[consts.RemoteServiceFqName]; ok {
   925  					authorityOverride = net.JoinHostPort(fqName, fmt.Sprintf("%d", pp.srcPort))
   926  				}
   927  
   928  				identity := es.Annotations[consts.RemoteGatewayIdentity]
   929  				address, id := pp.newServiceRefAddress(resolvedPort, IPAddr, serviceID.Name, es.Namespace)
   930  				address.Identity, address.AuthorityOverride = identity, authorityOverride
   931  
   932  				if endpoint.Hints != nil {
   933  					zones := make([]discovery.ForZone, len(endpoint.Hints.ForZones))
   934  					copy(zones, endpoint.Hints.ForZones)
   935  					address.ForZones = zones
   936  				}
   937  				addresses[id] = address
   938  			}
   939  			continue
   940  		}
   941  
   942  		if endpoint.TargetRef.Kind == endpointTargetRefPod {
   943  			for _, IPAddr := range endpoint.Addresses {
   944  				address, id, err := pp.newPodRefAddress(
   945  					resolvedPort,
   946  					es.AddressType,
   947  					IPAddr,
   948  					endpoint.TargetRef.Name,
   949  					endpoint.TargetRef.Namespace,
   950  				)
   951  				if err != nil {
   952  					pp.log.Errorf("Unable to create new address:%v", err)
   953  					continue
   954  				}
   955  				err = SetToServerProtocol(pp.k8sAPI, &address)
   956  				if err != nil {
   957  					pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
   958  					continue
   959  				}
   960  
   961  				address.Zone = endpoint.Zone
   962  				if endpoint.Hints != nil {
   963  					zones := make([]discovery.ForZone, len(endpoint.Hints.ForZones))
   964  					copy(zones, endpoint.Hints.ForZones)
   965  					address.ForZones = zones
   966  				}
   967  				addresses[id] = address
   968  			}
   969  		}
   970  
   971  		if endpoint.TargetRef.Kind == endpointTargetRefExternalWorkload {
   972  			for _, IPAddr := range endpoint.Addresses {
   973  				address, id, err := pp.newExtRefAddress(resolvedPort, IPAddr, endpoint.TargetRef.Name, es.Namespace)
   974  				if err != nil {
   975  					pp.log.Errorf("Unable to create new address: %v", err)
   976  					continue
   977  				}
   978  
   979  				err = SetToServerProtocolExternalWorkload(pp.k8sAPI, &address)
   980  				if err != nil {
   981  					pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
   982  					continue
   983  				}
   984  
   985  				address.Zone = endpoint.Zone
   986  				if endpoint.Hints != nil {
   987  					zones := make([]discovery.ForZone, len(endpoint.Hints.ForZones))
   988  					copy(zones, endpoint.Hints.ForZones)
   989  					address.ForZones = zones
   990  				}
   991  
   992  				addresses[id] = address
   993  			}
   994  
   995  		}
   996  
   997  	}
   998  	return AddressSet{
   999  		Addresses:          addresses,
  1000  		Labels:             metricLabels(es),
  1001  		LocalTrafficPolicy: pp.localTrafficPolicy,
  1002  	}
  1003  }
  1004  
  1005  // endpointSliceToIDs is similar to endpointSliceToAddresses but instead returns
  1006  // only the IDs of the endpoints rather than the addresses themselves.
  1007  func (pp *portPublisher) endpointSliceToIDs(es *discovery.EndpointSlice) []ID {
  1008  	resolvedPort := pp.resolveESTargetPort(es.Ports)
  1009  	if resolvedPort == undefinedEndpointPort {
  1010  		return []ID{}
  1011  	}
  1012  
  1013  	serviceID, err := getEndpointSliceServiceID(es)
  1014  	if err != nil {
  1015  		pp.log.Errorf("Could not fetch resource service name:%v", err)
  1016  	}
  1017  
  1018  	ids := []ID{}
  1019  	for _, endpoint := range es.Endpoints {
  1020  		if endpoint.Hostname != nil {
  1021  			if pp.hostname != "" && pp.hostname != *endpoint.Hostname {
  1022  				continue
  1023  			}
  1024  		}
  1025  		if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
  1026  			continue
  1027  		}
  1028  
  1029  		if endpoint.TargetRef == nil {
  1030  			for _, IPAddr := range endpoint.Addresses {
  1031  				ids = append(ids, ServiceID{
  1032  					Name: strings.Join([]string{
  1033  						serviceID.Name,
  1034  						IPAddr,
  1035  						fmt.Sprint(resolvedPort),
  1036  					}, "-"),
  1037  					Namespace: es.Namespace,
  1038  				})
  1039  			}
  1040  			continue
  1041  		}
  1042  
  1043  		if endpoint.TargetRef.Kind == endpointTargetRefPod {
  1044  			ids = append(ids, PodID{
  1045  				Name:      endpoint.TargetRef.Name,
  1046  				Namespace: endpoint.TargetRef.Namespace,
  1047  				IPFamily:  corev1.IPFamily(es.AddressType),
  1048  			})
  1049  		} else if endpoint.TargetRef.Kind == endpointTargetRefExternalWorkload {
  1050  			ids = append(ids, ExternalWorkloadID{
  1051  				Name:      endpoint.TargetRef.Name,
  1052  				Namespace: endpoint.TargetRef.Namespace,
  1053  			})
  1054  		}
  1055  
  1056  	}
  1057  	return ids
  1058  }
  1059  
  1060  func (pp *portPublisher) endpointsToAddresses(endpoints *corev1.Endpoints) AddressSet {
  1061  	addresses := make(map[ID]Address)
  1062  	for _, subset := range endpoints.Subsets {
  1063  		resolvedPort := pp.resolveTargetPort(subset)
  1064  		if resolvedPort == undefinedEndpointPort {
  1065  			continue
  1066  		}
  1067  		for _, endpoint := range subset.Addresses {
  1068  			if pp.hostname != "" && pp.hostname != endpoint.Hostname {
  1069  				continue
  1070  			}
  1071  
  1072  			if endpoint.TargetRef == nil {
  1073  				var authorityOverride string
  1074  				if fqName, ok := endpoints.Annotations[consts.RemoteServiceFqName]; ok {
  1075  					authorityOverride = fmt.Sprintf("%s:%d", fqName, pp.srcPort)
  1076  				}
  1077  
  1078  				identity := endpoints.Annotations[consts.RemoteGatewayIdentity]
  1079  				address, id := pp.newServiceRefAddress(resolvedPort, endpoint.IP, endpoints.Name, endpoints.Namespace)
  1080  				address.Identity, address.AuthorityOverride = identity, authorityOverride
  1081  
  1082  				addresses[id] = address
  1083  				continue
  1084  			}
  1085  
  1086  			if endpoint.TargetRef.Kind == endpointTargetRefPod {
  1087  				address, id, err := pp.newPodRefAddress(
  1088  					resolvedPort,
  1089  					"",
  1090  					endpoint.IP,
  1091  					endpoint.TargetRef.Name,
  1092  					endpoint.TargetRef.Namespace,
  1093  				)
  1094  				if err != nil {
  1095  					pp.log.Errorf("Unable to create new address:%v", err)
  1096  					continue
  1097  				}
  1098  				err = SetToServerProtocol(pp.k8sAPI, &address)
  1099  				if err != nil {
  1100  					pp.log.Errorf("failed to set address OpaqueProtocol: %s", err)
  1101  					continue
  1102  				}
  1103  				addresses[id] = address
  1104  			}
  1105  		}
  1106  	}
  1107  	return AddressSet{
  1108  		Addresses: addresses,
  1109  		Labels:    metricLabels(endpoints),
  1110  	}
  1111  }
  1112  
  1113  func (pp *portPublisher) newServiceRefAddress(endpointPort Port, endpointIP, serviceName, serviceNamespace string) (Address, ServiceID) {
  1114  	id := ServiceID{
  1115  		Name: strings.Join([]string{
  1116  			serviceName,
  1117  			endpointIP,
  1118  			fmt.Sprint(endpointPort),
  1119  		}, "-"),
  1120  		Namespace: serviceNamespace,
  1121  	}
  1122  
  1123  	return Address{IP: endpointIP, Port: endpointPort}, id
  1124  }
  1125  
  1126  func (pp *portPublisher) newPodRefAddress(
  1127  	endpointPort Port,
  1128  	ipFamily discovery.AddressType,
  1129  	endpointIP,
  1130  	podName,
  1131  	podNamespace string,
  1132  ) (Address, PodID, error) {
  1133  	id := PodID{
  1134  		Name:      podName,
  1135  		Namespace: podNamespace,
  1136  		IPFamily:  corev1.IPFamily(ipFamily),
  1137  	}
  1138  	pod, err := pp.k8sAPI.Pod().Lister().Pods(id.Namespace).Get(id.Name)
  1139  	if err != nil {
  1140  		return Address{}, PodID{}, fmt.Errorf("unable to fetch pod %v: %w", id, err)
  1141  	}
  1142  	ownerKind, ownerName, err := pp.metadataAPI.GetOwnerKindAndName(context.Background(), pod, false)
  1143  	if err != nil {
  1144  		return Address{}, PodID{}, err
  1145  	}
  1146  	addr := Address{
  1147  		IP:        endpointIP,
  1148  		Port:      endpointPort,
  1149  		Pod:       pod,
  1150  		OwnerName: ownerName,
  1151  		OwnerKind: ownerKind,
  1152  	}
  1153  
  1154  	return addr, id, nil
  1155  }
  1156  
  1157  func (pp *portPublisher) newExtRefAddress(endpointPort Port, endpointIP, externalWorkloadName, externalWorkloadNamespace string) (Address, ExternalWorkloadID, error) {
  1158  	id := ExternalWorkloadID{
  1159  		Name:      externalWorkloadName,
  1160  		Namespace: externalWorkloadNamespace,
  1161  	}
  1162  
  1163  	ew, err := pp.k8sAPI.ExtWorkload().Lister().ExternalWorkloads(id.Namespace).Get(id.Name)
  1164  	if err != nil {
  1165  		return Address{}, ExternalWorkloadID{}, fmt.Errorf("unable to fetch ExternalWorkload %v: %w", id, err)
  1166  	}
  1167  
  1168  	addr := Address{
  1169  		IP:               endpointIP,
  1170  		Port:             endpointPort,
  1171  		ExternalWorkload: ew,
  1172  	}
  1173  
  1174  	ownerRefs := ew.GetOwnerReferences()
  1175  	if len(ownerRefs) == 1 {
  1176  		parent := ownerRefs[0]
  1177  		addr.OwnerName = parent.Name
  1178  		addr.OwnerName = strings.ToLower(parent.Kind)
  1179  	}
  1180  
  1181  	return addr, id, nil
  1182  }
  1183  
  1184  func (pp *portPublisher) resolveESTargetPort(slicePorts []discovery.EndpointPort) Port {
  1185  	if slicePorts == nil {
  1186  		return undefinedEndpointPort
  1187  	}
  1188  
  1189  	switch pp.targetPort.Type {
  1190  	case intstr.Int:
  1191  		return Port(pp.targetPort.IntVal)
  1192  	case intstr.String:
  1193  		for _, p := range slicePorts {
  1194  			name := ""
  1195  			if p.Name != nil {
  1196  				name = *p.Name
  1197  			}
  1198  			if name == pp.targetPort.StrVal {
  1199  				return Port(*p.Port)
  1200  			}
  1201  		}
  1202  	}
  1203  	return undefinedEndpointPort
  1204  }
  1205  
  1206  func (pp *portPublisher) resolveTargetPort(subset corev1.EndpointSubset) Port {
  1207  	switch pp.targetPort.Type {
  1208  	case intstr.Int:
  1209  		return Port(pp.targetPort.IntVal)
  1210  	case intstr.String:
  1211  		for _, p := range subset.Ports {
  1212  			if p.Name == pp.targetPort.StrVal {
  1213  				return Port(p.Port)
  1214  			}
  1215  		}
  1216  	}
  1217  	return undefinedEndpointPort
  1218  }
  1219  
  1220  func (pp *portPublisher) updateLocalTrafficPolicy(localTrafficPolicy bool) {
  1221  	pp.localTrafficPolicy = localTrafficPolicy
  1222  	pp.addresses.LocalTrafficPolicy = localTrafficPolicy
  1223  	for _, listener := range pp.listeners {
  1224  		listener.Add(pp.addresses.shallowCopy())
  1225  	}
  1226  }
  1227  
  1228  func (pp *portPublisher) updatePort(targetPort namedPort) {
  1229  	pp.targetPort = targetPort
  1230  
  1231  	if pp.enableEndpointSlices {
  1232  		matchLabels := map[string]string{discovery.LabelServiceName: pp.id.Name}
  1233  		selector := labels.Set(matchLabels).AsSelector()
  1234  
  1235  		endpointSlices, err := pp.k8sAPI.ES().Lister().EndpointSlices(pp.id.Namespace).List(selector)
  1236  		if err == nil {
  1237  			pp.addresses = AddressSet{}
  1238  			for _, slice := range endpointSlices {
  1239  				pp.addEndpointSlice(slice)
  1240  			}
  1241  		} else {
  1242  			pp.log.Errorf("Unable to get EndpointSlices during port update: %s", err)
  1243  		}
  1244  	} else {
  1245  		endpoints, err := pp.k8sAPI.Endpoint().Lister().Endpoints(pp.id.Namespace).Get(pp.id.Name)
  1246  		if err == nil {
  1247  			pp.updateEndpoints(endpoints)
  1248  		} else {
  1249  			pp.log.Errorf("Unable to get endpoints during port update: %s", err)
  1250  		}
  1251  	}
  1252  }
  1253  
  1254  func (pp *portPublisher) deleteEndpointSlice(es *discovery.EndpointSlice) {
  1255  	addrSet := pp.endpointSliceToAddresses(es)
  1256  	for id := range addrSet.Addresses {
  1257  		delete(pp.addresses.Addresses, id)
  1258  	}
  1259  
  1260  	for _, listener := range pp.listeners {
  1261  		listener.Remove(addrSet)
  1262  	}
  1263  
  1264  	if len(pp.addresses.Addresses) == 0 {
  1265  		pp.noEndpoints(false)
  1266  	} else {
  1267  		pp.exists = true
  1268  		pp.metrics.incUpdates()
  1269  		pp.metrics.setPods(len(pp.addresses.Addresses))
  1270  		pp.metrics.setExists(true)
  1271  	}
  1272  }
  1273  
  1274  func (pp *portPublisher) noEndpoints(exists bool) {
  1275  	pp.exists = exists
  1276  	pp.addresses = AddressSet{}
  1277  	for _, listener := range pp.listeners {
  1278  		listener.NoEndpoints(exists)
  1279  	}
  1280  
  1281  	pp.metrics.incUpdates()
  1282  	pp.metrics.setExists(exists)
  1283  	pp.metrics.setPods(0)
  1284  }
  1285  
  1286  func (pp *portPublisher) subscribe(listener EndpointUpdateListener) {
  1287  	if pp.exists {
  1288  		if len(pp.addresses.Addresses) > 0 {
  1289  			listener.Add(pp.addresses.shallowCopy())
  1290  		} else {
  1291  			listener.NoEndpoints(true)
  1292  		}
  1293  	} else {
  1294  		listener.NoEndpoints(false)
  1295  	}
  1296  	pp.listeners = append(pp.listeners, listener)
  1297  
  1298  	pp.metrics.setSubscribers(len(pp.listeners))
  1299  }
  1300  
  1301  func (pp *portPublisher) unsubscribe(listener EndpointUpdateListener) {
  1302  	for i, e := range pp.listeners {
  1303  		if e == listener {
  1304  			n := len(pp.listeners)
  1305  			pp.listeners[i] = pp.listeners[n-1]
  1306  			pp.listeners[n-1] = nil
  1307  			pp.listeners = pp.listeners[:n-1]
  1308  			break
  1309  		}
  1310  	}
  1311  
  1312  	pp.metrics.setSubscribers(len(pp.listeners))
  1313  }
  1314  func (pp *portPublisher) updateServer(oldServer, newServer *v1beta2.Server) {
  1315  	updated := false
  1316  	for id, address := range pp.addresses.Addresses {
  1317  
  1318  		if pp.isAddressSelected(address, oldServer) || pp.isAddressSelected(address, newServer) {
  1319  			if newServer != nil && pp.isAddressSelected(address, newServer) && newServer.Spec.ProxyProtocol == opaqueProtocol {
  1320  				address.OpaqueProtocol = true
  1321  			} else {
  1322  				address.OpaqueProtocol = false
  1323  			}
  1324  			if pp.addresses.Addresses[id].OpaqueProtocol != address.OpaqueProtocol {
  1325  				pp.addresses.Addresses[id] = address
  1326  				updated = true
  1327  			}
  1328  		}
  1329  	}
  1330  	if updated {
  1331  		for _, listener := range pp.listeners {
  1332  			listener.Add(pp.addresses.shallowCopy())
  1333  		}
  1334  		pp.metrics.incUpdates()
  1335  	}
  1336  }
  1337  
  1338  func (pp *portPublisher) isAddressSelected(address Address, server *v1beta2.Server) bool {
  1339  	if server == nil {
  1340  		return false
  1341  	}
  1342  
  1343  	if address.Pod != nil {
  1344  		selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
  1345  		if err != nil {
  1346  			pp.log.Errorf("failed to create Selector: %s", err)
  1347  			return false
  1348  		}
  1349  
  1350  		if !selector.Matches(labels.Set(address.Pod.Labels)) {
  1351  			return false
  1352  		}
  1353  
  1354  		switch server.Spec.Port.Type {
  1355  		case intstr.Int:
  1356  			if server.Spec.Port.IntVal == int32(address.Port) {
  1357  				return true
  1358  			}
  1359  		case intstr.String:
  1360  			for _, c := range address.Pod.Spec.Containers {
  1361  				for _, p := range c.Ports {
  1362  					if p.ContainerPort == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
  1363  						return true
  1364  					}
  1365  				}
  1366  			}
  1367  		}
  1368  
  1369  	} else if address.ExternalWorkload != nil {
  1370  		selector, err := metav1.LabelSelectorAsSelector(server.Spec.ExternalWorkloadSelector)
  1371  		if err != nil {
  1372  			pp.log.Errorf("failed to create Selector: %s", err)
  1373  			return false
  1374  		}
  1375  
  1376  		if !selector.Matches(labels.Set(address.ExternalWorkload.Labels)) {
  1377  			return false
  1378  		}
  1379  
  1380  		switch server.Spec.Port.Type {
  1381  		case intstr.Int:
  1382  			if server.Spec.Port.IntVal == int32(address.Port) {
  1383  				return true
  1384  			}
  1385  		case intstr.String:
  1386  			for _, p := range address.ExternalWorkload.Spec.Ports {
  1387  				if p.Port == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
  1388  					return true
  1389  				}
  1390  			}
  1391  		}
  1392  	}
  1393  	return false
  1394  }
  1395  
  1396  ////////////
  1397  /// util ///
  1398  ////////////
  1399  
  1400  // getTargetPort returns the port specified as an argument if no service is
  1401  // present. If the service is present and it has a port spec matching the
  1402  // specified port, it returns the name of the service's port (not the name
  1403  // of the target pod port), so that it can be looked up in the endpoints API
  1404  // response, which uses service port names.
  1405  func getTargetPort(service *corev1.Service, port Port) namedPort {
  1406  	// Use the specified port as the target port by default
  1407  	targetPort := intstr.FromInt(int(port))
  1408  
  1409  	if service == nil {
  1410  		return targetPort
  1411  	}
  1412  
  1413  	// If a port spec exists with a port matching the specified port use that
  1414  	// port spec's name as the target port
  1415  	for _, portSpec := range service.Spec.Ports {
  1416  		if portSpec.Port == int32(port) {
  1417  			return intstr.FromString(portSpec.Name)
  1418  		}
  1419  	}
  1420  
  1421  	return targetPort
  1422  }
  1423  
  1424  func addressChanged(oldAddress Address, newAddress Address) bool {
  1425  
  1426  	if oldAddress.Identity != newAddress.Identity {
  1427  		// in this case the identity could have changed; this can happen when for
  1428  		// example a mirrored service is reassigned to a new gateway with a different
  1429  		// identity and the service mirroring controller picks that and updates the
  1430  		// identity
  1431  		return true
  1432  	}
  1433  
  1434  	// If the zone hints have changed, then the address has changed
  1435  	if len(newAddress.ForZones) != len(oldAddress.ForZones) {
  1436  		return true
  1437  	}
  1438  
  1439  	// Sort the zone information so that we can compare them accurately
  1440  	// We can't use `sort.StringSlice` because these are arrays of structs and not just strings
  1441  	sort.Slice(oldAddress.ForZones, func(i, j int) bool {
  1442  		return oldAddress.ForZones[i].Name < (oldAddress.ForZones[j].Name)
  1443  	})
  1444  	sort.Slice(newAddress.ForZones, func(i, j int) bool {
  1445  		return newAddress.ForZones[i].Name < (newAddress.ForZones[j].Name)
  1446  	})
  1447  
  1448  	// Both old and new addresses have the same number of zones, so we can just compare them directly
  1449  	for k := range oldAddress.ForZones {
  1450  		if oldAddress.ForZones[k].Name != newAddress.ForZones[k].Name {
  1451  			return true
  1452  		}
  1453  	}
  1454  
  1455  	if oldAddress.Pod != nil && newAddress.Pod != nil {
  1456  		// if these addresses are owned by pods we can check the resource versions
  1457  		return oldAddress.Pod.ResourceVersion != newAddress.Pod.ResourceVersion
  1458  	}
  1459  	return false
  1460  }
  1461  
  1462  func diffAddresses(oldAddresses, newAddresses AddressSet) (add, remove AddressSet) {
  1463  	// TODO: this detects pods which have been added or removed, but does not
  1464  	// detect addresses which have been modified.  A modified address should trigger
  1465  	// an add of the new version.
  1466  	addAddresses := make(map[ID]Address)
  1467  	removeAddresses := make(map[ID]Address)
  1468  	for id, newAddress := range newAddresses.Addresses {
  1469  		if oldAddress, ok := oldAddresses.Addresses[id]; ok {
  1470  			if addressChanged(oldAddress, newAddress) {
  1471  				addAddresses[id] = newAddress
  1472  			}
  1473  		} else {
  1474  			// this is a new address, we need to add it
  1475  			addAddresses[id] = newAddress
  1476  		}
  1477  	}
  1478  	for id, address := range oldAddresses.Addresses {
  1479  		if _, ok := newAddresses.Addresses[id]; !ok {
  1480  			removeAddresses[id] = address
  1481  		}
  1482  	}
  1483  	add = AddressSet{
  1484  		Addresses:          addAddresses,
  1485  		Labels:             newAddresses.Labels,
  1486  		LocalTrafficPolicy: newAddresses.LocalTrafficPolicy,
  1487  	}
  1488  	remove = AddressSet{
  1489  		Addresses: removeAddresses,
  1490  	}
  1491  	return add, remove
  1492  }
  1493  
  1494  func getEndpointSliceServiceID(es *discovery.EndpointSlice) (ServiceID, error) {
  1495  	if !isValidSlice(es) {
  1496  		return ServiceID{}, fmt.Errorf("EndpointSlice [%s/%s] is invalid", es.Namespace, es.Name)
  1497  	}
  1498  
  1499  	if svc, ok := es.Labels[discovery.LabelServiceName]; ok {
  1500  		return ServiceID{Namespace: es.Namespace, Name: svc}, nil
  1501  	}
  1502  
  1503  	for _, ref := range es.OwnerReferences {
  1504  		if ref.Kind == "Service" && ref.Name != "" {
  1505  			return ServiceID{Namespace: es.Namespace, Name: ref.Name}, nil
  1506  		}
  1507  	}
  1508  
  1509  	return ServiceID{}, fmt.Errorf("EndpointSlice [%s/%s] is invalid", es.Namespace, es.Name)
  1510  }
  1511  
  1512  func isValidSlice(es *discovery.EndpointSlice) bool {
  1513  	serviceName, ok := es.Labels[discovery.LabelServiceName]
  1514  	if !ok && len(es.OwnerReferences) == 0 {
  1515  		return false
  1516  	} else if len(es.OwnerReferences) == 0 && serviceName == "" {
  1517  		return false
  1518  	}
  1519  
  1520  	return true
  1521  }
  1522  
  1523  // SetToServerProtocol sets the address's OpaqueProtocol field based off any
  1524  // Servers that select it and override the expected protocol.
  1525  func SetToServerProtocol(k8sAPI *k8s.API, address *Address) error {
  1526  	if address.Pod == nil {
  1527  		return fmt.Errorf("endpoint not backed by Pod: %s:%d", address.IP, address.Port)
  1528  	}
  1529  	servers, err := k8sAPI.Srv().Lister().Servers("").List(labels.Everything())
  1530  	if err != nil {
  1531  		return fmt.Errorf("failed to list Servers: %w", err)
  1532  	}
  1533  	for _, server := range servers {
  1534  		selector, err := metav1.LabelSelectorAsSelector(server.Spec.PodSelector)
  1535  		if err != nil {
  1536  			return fmt.Errorf("failed to create Selector: %w", err)
  1537  		}
  1538  		if server.Spec.ProxyProtocol == opaqueProtocol && selector.Matches(labels.Set(address.Pod.Labels)) {
  1539  			var portMatch bool
  1540  			switch server.Spec.Port.Type {
  1541  			case intstr.Int:
  1542  				if server.Spec.Port.IntVal == int32(address.Port) {
  1543  					portMatch = true
  1544  				}
  1545  			case intstr.String:
  1546  				for _, c := range address.Pod.Spec.Containers {
  1547  					for _, p := range c.Ports {
  1548  						if (p.ContainerPort == int32(address.Port) || p.HostPort == int32(address.Port)) &&
  1549  							p.Name == server.Spec.Port.StrVal {
  1550  							portMatch = true
  1551  						}
  1552  					}
  1553  				}
  1554  			default:
  1555  				continue
  1556  			}
  1557  			if portMatch {
  1558  				address.OpaqueProtocol = true
  1559  				return nil
  1560  			}
  1561  		}
  1562  	}
  1563  	return nil
  1564  }
  1565  
  1566  // setToServerProtocolExternalWorkload sets the address's OpaqueProtocol field based off any
  1567  // Servers that select it and override the expected protocol for ExternalWorkloads.
  1568  func SetToServerProtocolExternalWorkload(k8sAPI *k8s.API, address *Address) error {
  1569  	if address.ExternalWorkload == nil {
  1570  		return fmt.Errorf("endpoint not backed by ExternalWorkload: %s:%d", address.IP, address.Port)
  1571  	}
  1572  	servers, err := k8sAPI.Srv().Lister().Servers("").List(labels.Everything())
  1573  	if err != nil {
  1574  		return fmt.Errorf("failed to list Servers: %w", err)
  1575  	}
  1576  	for _, server := range servers {
  1577  		selector, err := metav1.LabelSelectorAsSelector(server.Spec.ExternalWorkloadSelector)
  1578  		if err != nil {
  1579  			return fmt.Errorf("failed to create Selector: %w", err)
  1580  		}
  1581  		if server.Spec.ProxyProtocol == opaqueProtocol && selector.Matches(labels.Set(address.ExternalWorkload.Labels)) {
  1582  			var portMatch bool
  1583  			switch server.Spec.Port.Type {
  1584  			case intstr.Int:
  1585  				if server.Spec.Port.IntVal == int32(address.Port) {
  1586  					portMatch = true
  1587  				}
  1588  			case intstr.String:
  1589  				for _, p := range address.ExternalWorkload.Spec.Ports {
  1590  					if p.Port == int32(address.Port) && p.Name == server.Spec.Port.StrVal {
  1591  						portMatch = true
  1592  					}
  1593  
  1594  				}
  1595  			default:
  1596  				continue
  1597  			}
  1598  			if portMatch {
  1599  				address.OpaqueProtocol = true
  1600  				return nil
  1601  			}
  1602  		}
  1603  	}
  1604  	return nil
  1605  }
  1606  
  1607  func latestUpdated(managedFields []metav1.ManagedFieldsEntry) time.Time {
  1608  	var latest time.Time
  1609  	for _, field := range managedFields {
  1610  		if field.Operation == metav1.ManagedFieldsOperationUpdate {
  1611  			if latest.IsZero() || field.Time.After(latest) {
  1612  				latest = field.Time.Time
  1613  			}
  1614  		}
  1615  	}
  1616  	return latest
  1617  }
  1618  

View as plain text