...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/watcher/workload_watcher.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination/watcher

     1  package watcher
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"net"
     7  	"strconv"
     8  	"strings"
     9  	"sync"
    10  	"time"
    11  
    12  	ext "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
    13  	"github.com/linkerd/linkerd2/controller/gen/apis/server/v1beta2"
    14  	"github.com/linkerd/linkerd2/controller/k8s"
    15  	consts "github.com/linkerd/linkerd2/pkg/k8s"
    16  	"github.com/linkerd/linkerd2/pkg/util"
    17  	"github.com/prometheus/client_golang/prometheus"
    18  	logging "github.com/sirupsen/logrus"
    19  	"google.golang.org/grpc/codes"
    20  	"google.golang.org/grpc/status"
    21  	corev1 "k8s.io/api/core/v1"
    22  	discovery "k8s.io/api/discovery/v1"
    23  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    24  	"k8s.io/apimachinery/pkg/labels"
    25  	"k8s.io/client-go/tools/cache"
    26  )
    27  
    28  type (
    29  	// WorkloadWatcher watches all pods and externalworkloads in the cluster.
    30  	// It keeps a map of publishers keyed by IP and port.
    31  	WorkloadWatcher struct {
    32  		defaultOpaquePorts   map[uint32]struct{}
    33  		k8sAPI               *k8s.API
    34  		metadataAPI          *k8s.MetadataAPI
    35  		publishers           map[IPPort]*workloadPublisher
    36  		log                  *logging.Entry
    37  		enableEndpointSlices bool
    38  
    39  		mu sync.RWMutex
    40  	}
    41  
    42  	// workloadPublisher represents an address including ip:port, the backing
    43  	// pod or externalworkload (if any), and if the protocol is opaque. It keeps
    44  	// a list of listeners to be notified whenever the workload or the
    45  	// associated opaque protocol config changes.
    46  	workloadPublisher struct {
    47  		defaultOpaquePorts map[uint32]struct{}
    48  		k8sAPI             *k8s.API
    49  		metadataAPI        *k8s.MetadataAPI
    50  		addr               Address
    51  		listeners          []WorkloadUpdateListener
    52  		metrics            metrics
    53  		log                *logging.Entry
    54  
    55  		mu sync.RWMutex
    56  	}
    57  
    58  	// PodUpdateListener is the interface subscribers must implement.
    59  	WorkloadUpdateListener interface {
    60  		Update(*Address) error
    61  	}
    62  )
    63  
    64  var ipPortVecs = newMetricsVecs("ip_port", []string{"ip", "port"})
    65  
    66  func NewWorkloadWatcher(k8sAPI *k8s.API, metadataAPI *k8s.MetadataAPI, log *logging.Entry, enableEndpointSlices bool, defaultOpaquePorts map[uint32]struct{}) (*WorkloadWatcher, error) {
    67  	ww := &WorkloadWatcher{
    68  		defaultOpaquePorts: defaultOpaquePorts,
    69  		k8sAPI:             k8sAPI,
    70  		metadataAPI:        metadataAPI,
    71  		publishers:         make(map[IPPort]*workloadPublisher),
    72  		log: log.WithFields(logging.Fields{
    73  			"component": "workload-watcher",
    74  		}),
    75  		enableEndpointSlices: enableEndpointSlices,
    76  	}
    77  
    78  	_, err := k8sAPI.Pod().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    79  		AddFunc:    ww.addPod,
    80  		DeleteFunc: ww.deletePod,
    81  		UpdateFunc: ww.updatePod,
    82  	})
    83  	if err != nil {
    84  		return nil, err
    85  	}
    86  
    87  	_, err = k8sAPI.ExtWorkload().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    88  		AddFunc:    ww.addExternalWorkload,
    89  		DeleteFunc: ww.deleteExternalWorkload,
    90  		UpdateFunc: ww.updateExternalWorkload,
    91  	})
    92  	if err != nil {
    93  		return nil, err
    94  	}
    95  
    96  	_, err = k8sAPI.Srv().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    97  		AddFunc:    ww.addOrDeleteServer,
    98  		DeleteFunc: ww.addOrDeleteServer,
    99  		UpdateFunc: ww.updateServer,
   100  	})
   101  	if err != nil {
   102  		return nil, err
   103  	}
   104  
   105  	return ww, nil
   106  }
   107  
   108  // Subscribe notifies the listener on changes on any workload backing the passed
   109  // host/ip:port or changes to its associated opaque protocol config. If service
   110  // and hostname are empty then ip should be set and vice-versa. If ip is empty,
   111  // the corresponding ip is found for the given service/hostname, and returned.
   112  func (ww *WorkloadWatcher) Subscribe(service *ServiceID, hostname, ip string, port Port, listener WorkloadUpdateListener) (string, error) {
   113  	if hostname != "" {
   114  		ww.log.Debugf("Establishing watch on workload %s.%s.%s:%d", hostname, service.Name, service.Namespace, port)
   115  	} else if service != nil {
   116  		ww.log.Debugf("Establishing watch on workload %s.%s:%d", service.Name, service.Namespace, port)
   117  	} else {
   118  		ww.log.Debugf("Establishing watch on workload %s:%d", ip, port)
   119  	}
   120  	wp, err := ww.getOrNewWorkloadPublisher(service, hostname, ip, port)
   121  	if err != nil {
   122  		return "", err
   123  	}
   124  
   125  	if err = wp.subscribe(listener); err != nil {
   126  		return "", err
   127  	}
   128  
   129  	return wp.addr.IP, nil
   130  }
   131  
   132  // Subscribe stops notifying the listener on chages on any pod backing the
   133  // passed ip:port or its associated protocol config
   134  func (ww *WorkloadWatcher) Unsubscribe(ip string, port Port, listener WorkloadUpdateListener) {
   135  	ww.mu.Lock()
   136  	defer ww.mu.Unlock()
   137  
   138  	ww.log.Debugf("Stopping watch on %s:%d", ip, port)
   139  	wp, ok := ww.getWorkloadPublisher(ip, port)
   140  	if !ok {
   141  		ww.log.Errorf("Cannot unsubscribe from unknown ip:port [%s:%d]", ip, port)
   142  		return
   143  	}
   144  	wp.unsubscribe(listener)
   145  
   146  	if len(wp.listeners) == 0 {
   147  		delete(ww.publishers, IPPort{wp.addr.IP, wp.addr.Port})
   148  	}
   149  }
   150  
   151  // addPod is an event handler so it cannot block
   152  func (ww *WorkloadWatcher) addPod(obj any) {
   153  	pod := obj.(*corev1.Pod)
   154  	ww.log.Tracef("Added pod %s.%s", pod.Name, pod.Namespace)
   155  	go ww.submitPodUpdate(pod, false)
   156  }
   157  
   158  // deletePod is an event handler so it cannot block
   159  func (ww *WorkloadWatcher) deletePod(obj any) {
   160  	pod, ok := obj.(*corev1.Pod)
   161  	if !ok {
   162  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   163  		if !ok {
   164  			ww.log.Errorf("Couldn't get object from DeletedFinalStateUnknown %#v", obj)
   165  			return
   166  		}
   167  		pod, ok = tombstone.Obj.(*corev1.Pod)
   168  		if !ok {
   169  			ww.log.Errorf("DeletedFinalStateUnknown contained object that is not a Pod %#v", obj)
   170  			return
   171  		}
   172  	}
   173  	ww.log.Tracef("Deleted pod %s.%s", pod.Name, pod.Namespace)
   174  	go ww.submitPodUpdate(pod, true)
   175  }
   176  
   177  // updatePod is an event handler so it cannot block
   178  func (ww *WorkloadWatcher) updatePod(oldObj any, newObj any) {
   179  	oldPod := oldObj.(*corev1.Pod)
   180  	newPod := newObj.(*corev1.Pod)
   181  	if oldPod.DeletionTimestamp == nil && newPod.DeletionTimestamp != nil {
   182  		// this is just a mark, wait for actual deletion event
   183  		return
   184  	}
   185  
   186  	oldUpdated := latestUpdated(oldPod.ManagedFields)
   187  	updated := latestUpdated(newPod.ManagedFields)
   188  	if !updated.IsZero() && updated != oldUpdated {
   189  		delta := time.Since(updated)
   190  		podInformerLag.Observe(delta.Seconds())
   191  	}
   192  
   193  	ww.log.Tracef("Updated pod %s.%s", newPod.Name, newPod.Namespace)
   194  	go ww.submitPodUpdate(newPod, false)
   195  }
   196  
   197  // addExternalWorkload is an event handler so it cannot block
   198  func (ww *WorkloadWatcher) addExternalWorkload(obj any) {
   199  	externalWorkload := obj.(*ext.ExternalWorkload)
   200  	ww.log.Tracef("Added externalworkload %s.%s", externalWorkload.Name, externalWorkload.Namespace)
   201  	go ww.submitExternalWorkloadUpdate(externalWorkload, false)
   202  }
   203  
   204  // deleteExternalWorkload is an event handler so it cannot block
   205  func (ww *WorkloadWatcher) deleteExternalWorkload(obj any) {
   206  	externalWorkload, ok := obj.(*ext.ExternalWorkload)
   207  	if !ok {
   208  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   209  		if !ok {
   210  			ww.log.Errorf("Couldn't get object from DeletedFinalStateUnknown %#v", obj)
   211  			return
   212  		}
   213  		externalWorkload, ok = tombstone.Obj.(*ext.ExternalWorkload)
   214  		if !ok {
   215  			ww.log.Errorf("DeletedFinalStateUnknown contained object that is not an ExternalWorkload %#v", obj)
   216  			return
   217  		}
   218  	}
   219  	ww.log.Tracef("Deleted externalworklod %s.%s", externalWorkload.Name, externalWorkload.Namespace)
   220  	go ww.submitExternalWorkloadUpdate(externalWorkload, true)
   221  }
   222  
   223  // updateExternalWorkload is an event handler so it cannot block
   224  func (ww *WorkloadWatcher) updateExternalWorkload(oldObj any, newObj any) {
   225  	oldExternalWorkload := oldObj.(*ext.ExternalWorkload)
   226  	newExternalWorkload := newObj.(*ext.ExternalWorkload)
   227  	if oldExternalWorkload.DeletionTimestamp == nil && newExternalWorkload.DeletionTimestamp != nil {
   228  		// this is just a mark, wait for actual deletion event
   229  		return
   230  	}
   231  
   232  	oldUpdated := latestUpdated(oldExternalWorkload.ManagedFields)
   233  	updated := latestUpdated(newExternalWorkload.ManagedFields)
   234  	if !updated.IsZero() && updated != oldUpdated {
   235  		delta := time.Since(updated)
   236  		externalWorkloadInformerLag.Observe(delta.Seconds())
   237  	}
   238  
   239  	ww.log.Tracef("Updated pod %s.%s", newExternalWorkload.Name, newExternalWorkload.Namespace)
   240  	go ww.submitExternalWorkloadUpdate(newExternalWorkload, false)
   241  }
   242  
   243  func (ww *WorkloadWatcher) submitPodUpdate(pod *corev1.Pod, remove bool) {
   244  	ww.mu.RLock()
   245  	defer ww.mu.RUnlock()
   246  
   247  	submitPod := pod
   248  	if remove {
   249  		submitPod = nil
   250  	}
   251  
   252  	for _, container := range pod.Spec.Containers {
   253  		for _, containerPort := range container.Ports {
   254  			if containerPort.ContainerPort != 0 {
   255  				for _, pip := range pod.Status.PodIPs {
   256  					if wp, ok := ww.getWorkloadPublisher(pip.IP, Port(containerPort.ContainerPort)); ok {
   257  						wp.updatePod(submitPod)
   258  					}
   259  				}
   260  				if len(pod.Status.PodIPs) == 0 && pod.Status.PodIP != "" {
   261  					if wp, ok := ww.getWorkloadPublisher(pod.Status.PodIP, Port(containerPort.ContainerPort)); ok {
   262  						wp.updatePod(submitPod)
   263  					}
   264  				}
   265  			}
   266  
   267  			if containerPort.HostPort != 0 {
   268  				for _, hip := range pod.Status.HostIPs {
   269  					if pp, ok := ww.getWorkloadPublisher(hip.IP, Port(containerPort.HostPort)); ok {
   270  						pp.updatePod(submitPod)
   271  					}
   272  				}
   273  				if len(pod.Status.HostIPs) == 0 && pod.Status.HostIP != "" {
   274  					if pp, ok := ww.getWorkloadPublisher(pod.Status.HostIP, Port(containerPort.HostPort)); ok {
   275  						pp.updatePod(submitPod)
   276  					}
   277  				}
   278  			}
   279  		}
   280  	}
   281  }
   282  
   283  func (ww *WorkloadWatcher) submitExternalWorkloadUpdate(externalWorkload *ext.ExternalWorkload, remove bool) {
   284  	ww.mu.RLock()
   285  	defer ww.mu.RUnlock()
   286  
   287  	submitWorkload := externalWorkload
   288  	if remove {
   289  		submitWorkload = nil
   290  	}
   291  
   292  	for _, port := range externalWorkload.Spec.Ports {
   293  		for _, ip := range externalWorkload.Spec.WorkloadIPs {
   294  			if wp, ok := ww.getWorkloadPublisher(ip.Ip, Port(port.Port)); ok {
   295  				wp.updateExternalWorkload(submitWorkload)
   296  			}
   297  		}
   298  	}
   299  }
   300  
   301  func (ww *WorkloadWatcher) updateServer(oldObj interface{}, newObj interface{}) {
   302  	oldServer := oldObj.(*v1beta2.Server)
   303  	newServer := newObj.(*v1beta2.Server)
   304  
   305  	oldUpdated := latestUpdated(oldServer.ManagedFields)
   306  	updated := latestUpdated(newServer.ManagedFields)
   307  
   308  	if !updated.IsZero() && updated != oldUpdated {
   309  		delta := time.Since(updated)
   310  		serverInformerLag.Observe(delta.Seconds())
   311  	}
   312  
   313  	ww.updateServers(oldServer, newServer)
   314  }
   315  
   316  func (ww *WorkloadWatcher) addOrDeleteServer(obj interface{}) {
   317  	server, ok := obj.(*v1beta2.Server)
   318  	if !ok {
   319  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   320  		if !ok {
   321  			ww.log.Errorf("Couldn't get object from DeletedFinalStateUnknown %#v", obj)
   322  			return
   323  		}
   324  		server, ok = tombstone.Obj.(*v1beta2.Server)
   325  		if !ok {
   326  			ww.log.Errorf("DeletedFinalStateUnknown contained object that is not a Server %#v", obj)
   327  			return
   328  		}
   329  	}
   330  	ww.updateServers(server)
   331  }
   332  
   333  // updateServers triggers an Update() call to the listeners of the workloadPublishers
   334  // whose pod matches the any of the Servers' podSelector or whose
   335  // externalworkload matches any of the Servers' externalworkload selection. This
   336  // function is an event handler so it cannot block.
   337  func (ww *WorkloadWatcher) updateServers(servers ...*v1beta2.Server) {
   338  	ww.mu.RLock()
   339  	defer ww.mu.RUnlock()
   340  
   341  	for _, wp := range ww.publishers {
   342  		var opaquePorts map[uint32]struct{}
   343  		if wp.addr.Pod != nil {
   344  			if !ww.isPodSelectedByAny(wp.addr.Pod, servers...) {
   345  				continue
   346  			}
   347  			opaquePorts = GetAnnotatedOpaquePorts(wp.addr.Pod, ww.defaultOpaquePorts)
   348  		} else if wp.addr.ExternalWorkload != nil {
   349  			if !ww.isExternalWorkloadSelectedByAny(wp.addr.ExternalWorkload, servers...) {
   350  				continue
   351  			}
   352  			opaquePorts = GetAnnotatedOpaquePortsForExternalWorkload(wp.addr.ExternalWorkload, ww.defaultOpaquePorts)
   353  		} else {
   354  			continue
   355  		}
   356  
   357  		_, annotatedOpaque := opaquePorts[wp.addr.Port]
   358  		// if port is annotated to be always opaque we can disregard Server updates
   359  		if annotatedOpaque {
   360  			continue
   361  		}
   362  
   363  		opaque := wp.addr.OpaqueProtocol
   364  		name := net.JoinHostPort(wp.addr.IP, fmt.Sprintf("%d", wp.addr.Port))
   365  		if wp.addr.Pod != nil {
   366  			name = wp.addr.Pod.GetName()
   367  		} else if wp.addr.ExternalWorkload != nil {
   368  			name = wp.addr.ExternalWorkload.GetName()
   369  		}
   370  		if err := SetToServerProtocol(wp.k8sAPI, &wp.addr); err != nil {
   371  			wp.log.Errorf("Error computing opaque protocol for %s: %q", name, err)
   372  		}
   373  		if wp.addr.OpaqueProtocol == opaque {
   374  			// OpaqueProtocol has not changed. No need to update the listeners.
   375  			continue
   376  		}
   377  
   378  		go func(wp *workloadPublisher) {
   379  			wp.mu.RLock()
   380  			defer wp.mu.RUnlock()
   381  
   382  			for _, listener := range wp.listeners {
   383  				if err := listener.Update(&wp.addr); err != nil {
   384  					ww.log.Warnf("Error sending update to listener: %s", err)
   385  					continue
   386  				}
   387  			}
   388  			wp.metrics.incUpdates()
   389  		}(wp)
   390  	}
   391  }
   392  
   393  func (ww *WorkloadWatcher) isPodSelectedByAny(pod *corev1.Pod, servers ...*v1beta2.Server) bool {
   394  	for _, s := range servers {
   395  		selector, err := metav1.LabelSelectorAsSelector(s.Spec.PodSelector)
   396  		if err != nil {
   397  			ww.log.Errorf("failed to parse PodSelector of Server %s.%s: %q", s.GetName(), s.GetNamespace(), err)
   398  			continue
   399  		}
   400  		if selector.Matches(labels.Set(pod.Labels)) {
   401  			return true
   402  		}
   403  	}
   404  	return false
   405  }
   406  
   407  func (ww *WorkloadWatcher) isExternalWorkloadSelectedByAny(ew *ext.ExternalWorkload, servers ...*v1beta2.Server) bool {
   408  	for _, s := range servers {
   409  		selector, err := metav1.LabelSelectorAsSelector(s.Spec.ExternalWorkloadSelector)
   410  		if err != nil {
   411  			ww.log.Errorf("failed to parse ExternalWorkloadSelector of Server %s.%s: %q", s.GetName(), s.GetNamespace(), err)
   412  			continue
   413  		}
   414  		if selector.Matches(labels.Set(ew.Labels)) {
   415  			return true
   416  		}
   417  	}
   418  	return false
   419  }
   420  
   421  // getOrNewWorkloadPublisher returns the workloadPublisher for the given target if it
   422  // exists. Otherwise, it creates a new one and returns it.
   423  func (ww *WorkloadWatcher) getOrNewWorkloadPublisher(service *ServiceID, hostname, ip string, port Port) (*workloadPublisher, error) {
   424  	ww.mu.Lock()
   425  	defer ww.mu.Unlock()
   426  
   427  	var pod *corev1.Pod
   428  	var externalWorkload *ext.ExternalWorkload
   429  	var err error
   430  	if hostname != "" {
   431  		pod, err = ww.getEndpointByHostname(hostname, service)
   432  		if err != nil {
   433  			return nil, err
   434  		}
   435  		ip = pod.Status.PodIP
   436  	} else {
   437  		pod, err = ww.getPodByPodIP(ip, port)
   438  		if err != nil {
   439  			return nil, err
   440  		}
   441  		if pod == nil {
   442  			pod, err = ww.getPodByHostIP(ip, port)
   443  			if err != nil {
   444  				return nil, err
   445  			}
   446  		}
   447  		if pod == nil {
   448  			externalWorkload, err = ww.getExternalWorkloadByIP(ip, port)
   449  			if err != nil {
   450  				return nil, err
   451  			}
   452  		}
   453  	}
   454  
   455  	ipPort := IPPort{ip, port}
   456  	wp, ok := ww.publishers[ipPort]
   457  	if !ok {
   458  		wp = &workloadPublisher{
   459  			defaultOpaquePorts: ww.defaultOpaquePorts,
   460  			k8sAPI:             ww.k8sAPI,
   461  			metadataAPI:        ww.metadataAPI,
   462  			addr: Address{
   463  				IP:   ip,
   464  				Port: port,
   465  			},
   466  			metrics: ipPortVecs.newMetrics(prometheus.Labels{
   467  				"ip":   ip,
   468  				"port": strconv.FormatUint(uint64(port), 10),
   469  			}),
   470  			log: ww.log.WithFields(logging.Fields{
   471  				"component": "workload-publisher",
   472  				"ip":        ip,
   473  				"port":      port,
   474  			}),
   475  		}
   476  		if pod != nil {
   477  			wp.updatePod(pod)
   478  		}
   479  		if externalWorkload != nil {
   480  			wp.updateExternalWorkload(externalWorkload)
   481  		}
   482  		ww.publishers[ipPort] = wp
   483  	}
   484  	return wp, nil
   485  }
   486  
   487  func (ww *WorkloadWatcher) getWorkloadPublisher(ip string, port Port) (wp *workloadPublisher, ok bool) {
   488  	ipPort := IPPort{ip, port}
   489  	wp, ok = ww.publishers[ipPort]
   490  	return
   491  }
   492  
   493  // getPodByPodIP returns a pod that maps to the given IP address in the pod network
   494  func (ww *WorkloadWatcher) getPodByPodIP(podIP string, port uint32) (*corev1.Pod, error) {
   495  	podIPPods, err := getIndexedPods(ww.k8sAPI, PodIPIndex, podIP)
   496  	if err != nil {
   497  		return nil, status.Error(codes.Unknown, err.Error())
   498  	}
   499  	if len(podIPPods) == 1 {
   500  		ww.log.Debugf("found %s on the pod network", podIP)
   501  		return podIPPods[0], nil
   502  	}
   503  	if len(podIPPods) > 1 {
   504  		conflictingPods := []string{}
   505  		for _, pod := range podIPPods {
   506  			conflictingPods = append(conflictingPods, fmt.Sprintf("%s:%s", pod.Namespace, pod.Name))
   507  		}
   508  		ww.log.Warnf("found conflicting %s IP on the pod network: %s", podIP, strings.Join(conflictingPods, ","))
   509  		return nil, status.Errorf(codes.FailedPrecondition, "found %d pods with a conflicting pod network IP %s", len(podIPPods), podIP)
   510  	}
   511  
   512  	ww.log.Debugf("no pod found for %s:%d", podIP, port)
   513  	return nil, nil
   514  }
   515  
   516  // getPodByHostIP returns a pod that maps to the given IP address in the host
   517  // network. It must have a container port that exposes `port` as a host port.
   518  func (ww *WorkloadWatcher) getPodByHostIP(hostIP string, port uint32) (*corev1.Pod, error) {
   519  	addr := net.JoinHostPort(hostIP, fmt.Sprintf("%d", port))
   520  	hostIPPods, err := getIndexedPods(ww.k8sAPI, HostIPIndex, addr)
   521  	if err != nil {
   522  		return nil, status.Error(codes.Unknown, err.Error())
   523  	}
   524  	if len(hostIPPods) == 1 {
   525  		ww.log.Debugf("found %s:%d on the host network", hostIP, port)
   526  		return hostIPPods[0], nil
   527  	}
   528  	if len(hostIPPods) > 1 {
   529  		conflictingPods := []string{}
   530  		for _, pod := range hostIPPods {
   531  			conflictingPods = append(conflictingPods, fmt.Sprintf("%s:%s", pod.Namespace, pod.Name))
   532  		}
   533  		ww.log.Warnf("found conflicting %s:%d endpoint on the host network: %s", hostIP, port, strings.Join(conflictingPods, ","))
   534  		return nil, status.Errorf(codes.FailedPrecondition, "found %d pods with a conflicting host network endpoint %s:%d", len(hostIPPods), hostIP, port)
   535  	}
   536  
   537  	return nil, nil
   538  }
   539  
   540  // getExternalWorkloadByIP returns an externalworkload with the given IP
   541  // address.
   542  func (ww *WorkloadWatcher) getExternalWorkloadByIP(ip string, port uint32) (*ext.ExternalWorkload, error) {
   543  	addr := net.JoinHostPort(ip, fmt.Sprintf("%d", port))
   544  	workloads, err := getIndexedExternalWorkloads(ww.k8sAPI, ExternalWorkloadIPIndex, addr)
   545  	if err != nil {
   546  		return nil, status.Error(codes.Unknown, err.Error())
   547  	}
   548  	if len(workloads) == 0 {
   549  		ww.log.Debugf("no externalworkload found for %s:%d", ip, port)
   550  		return nil, nil
   551  	}
   552  	if len(workloads) == 1 {
   553  		ww.log.Debugf("found externalworkload %s:%d", ip, port)
   554  		return workloads[0], nil
   555  	}
   556  	if len(workloads) > 1 {
   557  		conflictingWorkloads := []string{}
   558  		for _, ew := range workloads {
   559  			conflictingWorkloads = append(conflictingWorkloads, fmt.Sprintf("%s:%s", ew.Namespace, ew.Name))
   560  		}
   561  		ww.log.Warnf("found conflicting %s:%d externalworkload: %s", ip, port, strings.Join(conflictingWorkloads, ","))
   562  		return nil, status.Errorf(codes.FailedPrecondition, "found %d externalworkloads with a conflicting ip %s:%d", len(workloads), ip, port)
   563  	}
   564  
   565  	return nil, nil
   566  }
   567  
   568  // getEndpointByHostname returns a pod that maps to the given hostname (or an
   569  // instanceID). The hostname is generally the prefix of the pod's DNS name;
   570  // since it may be arbitrary we need to look at the corresponding service's
   571  // Endpoints object to see whether the hostname matches a pod.
   572  func (ww *WorkloadWatcher) getEndpointByHostname(hostname string, svcID *ServiceID) (*corev1.Pod, error) {
   573  	if ww.enableEndpointSlices {
   574  		matchLabels := map[string]string{discovery.LabelServiceName: svcID.Name}
   575  		selector := labels.Set(matchLabels).AsSelector()
   576  
   577  		sliceList, err := ww.k8sAPI.ES().Lister().EndpointSlices(svcID.Namespace).List(selector)
   578  		if err != nil {
   579  			return nil, err
   580  		}
   581  		for _, slice := range sliceList {
   582  			for _, ep := range slice.Endpoints {
   583  				if hostname == *ep.Hostname {
   584  					if ep.TargetRef != nil && ep.TargetRef.Kind == "Pod" {
   585  						podName := ep.TargetRef.Name
   586  						podNamespace := ep.TargetRef.Namespace
   587  						pod, err := ww.k8sAPI.Pod().Lister().Pods(podNamespace).Get(podName)
   588  						if err != nil {
   589  							return nil, err
   590  						}
   591  						return pod, nil
   592  					}
   593  					return nil, nil
   594  				}
   595  			}
   596  		}
   597  
   598  		return nil, status.Errorf(codes.NotFound, "no pod found in EndpointSlices of Service %s/%s for hostname %s", svcID.Namespace, svcID.Name, hostname)
   599  	}
   600  
   601  	ep, err := ww.k8sAPI.Endpoint().Lister().Endpoints(svcID.Namespace).Get(svcID.Name)
   602  	if err != nil {
   603  		return nil, err
   604  	}
   605  
   606  	for _, subset := range ep.Subsets {
   607  		for _, addr := range subset.Addresses {
   608  
   609  			if hostname == addr.Hostname {
   610  				if addr.TargetRef != nil && addr.TargetRef.Kind == "Pod" {
   611  					podName := addr.TargetRef.Name
   612  					podNamespace := addr.TargetRef.Namespace
   613  					pod, err := ww.k8sAPI.Pod().Lister().Pods(podNamespace).Get(podName)
   614  					if err != nil {
   615  						return nil, err
   616  					}
   617  					return pod, nil
   618  				}
   619  				return nil, nil
   620  			}
   621  		}
   622  	}
   623  
   624  	return nil, status.Errorf(codes.NotFound, "no pod found in Endpoints %s/%s for hostname %s", svcID.Namespace, svcID.Name, hostname)
   625  }
   626  
   627  func (wp *workloadPublisher) subscribe(listener WorkloadUpdateListener) error {
   628  	wp.mu.Lock()
   629  	defer wp.mu.Unlock()
   630  
   631  	wp.listeners = append(wp.listeners, listener)
   632  	wp.metrics.setSubscribers(len(wp.listeners))
   633  
   634  	if err := listener.Update(&wp.addr); err != nil {
   635  		return fmt.Errorf("failed to send initial update: %w", err)
   636  	}
   637  	wp.metrics.incUpdates()
   638  	return nil
   639  }
   640  
   641  func (wp *workloadPublisher) unsubscribe(listener WorkloadUpdateListener) {
   642  	wp.mu.Lock()
   643  	defer wp.mu.Unlock()
   644  
   645  	for i, e := range wp.listeners {
   646  		if e == listener {
   647  			n := len(wp.listeners)
   648  			wp.listeners[i] = wp.listeners[n-1]
   649  			wp.listeners[n-1] = nil
   650  			wp.listeners = wp.listeners[:n-1]
   651  			break
   652  		}
   653  	}
   654  
   655  	wp.metrics.setSubscribers(len(wp.listeners))
   656  }
   657  
   658  // updatePod creates an Address instance for the given pod, that is passed to
   659  // the listener's Update() method, only if the pod's readiness state has
   660  // changed. If the passed pod is nil, it means the pod (still referred to in
   661  // wp.pod) has been deleted.
   662  func (wp *workloadPublisher) updatePod(pod *corev1.Pod) {
   663  	wp.mu.Lock()
   664  	defer wp.mu.Unlock()
   665  
   666  	// pod wasn't ready or there was no backing pod - check if passed pod is ready
   667  	if wp.addr.Pod == nil {
   668  		if pod == nil {
   669  			wp.log.Trace("Pod deletion event already consumed - ignore")
   670  			return
   671  		}
   672  
   673  		if !isRunningAndReady(pod) {
   674  			wp.log.Tracef("Pod %s.%s not ready - ignore", pod.Name, pod.Namespace)
   675  			return
   676  		}
   677  
   678  		wp.log.Debugf("Pod %s.%s became ready", pod.Name, pod.Namespace)
   679  		wp.addr.Pod = pod
   680  
   681  		// Fill in ownership.
   682  		if wp.addr.Pod != nil {
   683  			ownerKind, ownerName, err := wp.metadataAPI.GetOwnerKindAndName(context.Background(), wp.addr.Pod, true)
   684  			if err != nil {
   685  				wp.log.Errorf("Error getting pod owner for pod %s: %q", wp.addr.Pod.GetName(), err)
   686  			} else {
   687  				wp.addr.OwnerKind = ownerKind
   688  				wp.addr.OwnerName = ownerName
   689  			}
   690  		}
   691  
   692  		// Compute opaque protocol.
   693  		if err := SetToServerProtocol(wp.k8sAPI, &wp.addr); err != nil {
   694  			wp.log.Errorf("Error computing opaque protocol for pod %s: %q", wp.addr.Pod.GetName(), err)
   695  		}
   696  
   697  		for _, l := range wp.listeners {
   698  			if err := l.Update(&wp.addr); err != nil {
   699  				wp.log.Warnf("Error sending update to listener: %s", err)
   700  				continue
   701  			}
   702  		}
   703  		wp.metrics.incUpdates()
   704  
   705  		return
   706  	}
   707  
   708  	// backing pod becoming unready or getting deleted
   709  	if pod == nil || !isRunningAndReady(pod) {
   710  		wp.log.Debugf("Pod %s.%s deleted or it became unready - remove", wp.addr.Pod.Name, wp.addr.Pod.Namespace)
   711  		wp.addr.Pod = nil
   712  		wp.addr.OwnerKind = ""
   713  		wp.addr.OwnerName = ""
   714  		wp.addr.OpaqueProtocol = false
   715  		for _, l := range wp.listeners {
   716  			if err := l.Update(&wp.addr); err != nil {
   717  				wp.log.Warnf("Error sending update to listener: %s", err)
   718  				continue
   719  			}
   720  		}
   721  		wp.metrics.incUpdates()
   722  
   723  		return
   724  	}
   725  
   726  	wp.log.Tracef("Ignored event on pod %s.%s", pod.Name, pod.Namespace)
   727  }
   728  
   729  // updateExternalWorkload creates an Address instance for the given externalworkload,
   730  // that is passed to the listener's Update() method, only if the workload's
   731  // readiness state has changed. If the passed workload is nil, it means the
   732  // workload (still referred to in wp.externalWorkload) has been deleted.
   733  func (wp *workloadPublisher) updateExternalWorkload(externalWorkload *ext.ExternalWorkload) {
   734  	wp.mu.Lock()
   735  	defer wp.mu.Unlock()
   736  
   737  	wp.addr.ExternalWorkload = externalWorkload
   738  
   739  	// Fill in ownership.
   740  	if wp.addr.ExternalWorkload != nil && len(wp.addr.ExternalWorkload.GetOwnerReferences()) == 1 {
   741  		wp.addr.OwnerKind = wp.addr.ExternalWorkload.GetOwnerReferences()[0].Kind
   742  		wp.addr.OwnerName = wp.addr.ExternalWorkload.GetOwnerReferences()[0].Name
   743  	}
   744  
   745  	// Compute opaque protocol.
   746  	if err := SetToServerProtocolExternalWorkload(wp.k8sAPI, &wp.addr); err != nil {
   747  		wp.log.Errorf("Error computing opaque protocol for externalworkload %s: %q", wp.addr.ExternalWorkload.GetName(), err)
   748  	}
   749  
   750  	for _, l := range wp.listeners {
   751  		if err := l.Update(&wp.addr); err != nil {
   752  			wp.log.Warnf("Error sending update to listener: %s", err)
   753  			continue
   754  		}
   755  	}
   756  	wp.metrics.incUpdates()
   757  }
   758  
   759  // GetAnnotatedOpaquePorts returns the opaque ports for the pod given its
   760  // annotations, or the default opaque ports if it's not annotated
   761  func GetAnnotatedOpaquePorts(pod *corev1.Pod, defaultPorts map[uint32]struct{}) map[uint32]struct{} {
   762  	if pod == nil {
   763  		return defaultPorts
   764  	}
   765  	annotation, ok := pod.Annotations[consts.ProxyOpaquePortsAnnotation]
   766  	if !ok {
   767  		return defaultPorts
   768  	}
   769  	opaquePorts := make(map[uint32]struct{})
   770  	namedPorts := util.GetNamedPorts(pod.Spec.Containers)
   771  	if annotation != "" {
   772  		for _, pr := range util.ParseContainerOpaquePorts(annotation, namedPorts) {
   773  			for _, port := range pr.Ports() {
   774  				opaquePorts[uint32(port)] = struct{}{}
   775  			}
   776  		}
   777  	}
   778  	return opaquePorts
   779  }
   780  
   781  // GetAnnotatedOpaquePortsForExternalWorkload returns the opaque ports for the external workload given its
   782  // annotations, or the default opaque ports if it's not annotated
   783  func GetAnnotatedOpaquePortsForExternalWorkload(ew *ext.ExternalWorkload, defaultPorts map[uint32]struct{}) map[uint32]struct{} {
   784  	if ew == nil {
   785  		return defaultPorts
   786  	}
   787  	annotation, ok := ew.Annotations[consts.ProxyOpaquePortsAnnotation]
   788  	if !ok {
   789  		return defaultPorts
   790  	}
   791  	opaquePorts := make(map[uint32]struct{})
   792  	if annotation != "" {
   793  		for _, pr := range parseExternalWorkloadOpaquePorts(annotation, ew) {
   794  			for _, port := range pr.Ports() {
   795  				opaquePorts[uint32(port)] = struct{}{}
   796  			}
   797  		}
   798  	}
   799  	return opaquePorts
   800  }
   801  
   802  func parseExternalWorkloadOpaquePorts(override string, ew *ext.ExternalWorkload) []util.PortRange {
   803  	portRanges := util.GetPortRanges(override)
   804  	var values []util.PortRange
   805  	for _, pr := range portRanges {
   806  		port, named := isNamedInExternalWorkload(pr, ew)
   807  		if named {
   808  			values = append(values, util.PortRange{UpperBound: int(port), LowerBound: int(port)})
   809  		} else {
   810  			pr, err := util.ParsePortRange(pr)
   811  			if err != nil {
   812  				logging.Warnf("Invalid port range [%v]: %s", pr, err)
   813  				continue
   814  			}
   815  			values = append(values, pr)
   816  		}
   817  	}
   818  	return values
   819  }
   820  
   821  func isNamedInExternalWorkload(pr string, ew *ext.ExternalWorkload) (int32, bool) {
   822  	for _, p := range ew.Spec.Ports {
   823  		if p.Name == pr {
   824  			return p.Port, true
   825  		}
   826  	}
   827  
   828  	return 0, false
   829  }
   830  
   831  func isRunningAndReady(pod *corev1.Pod) bool {
   832  	if pod == nil || pod.Status.Phase != corev1.PodRunning {
   833  		return false
   834  	}
   835  	for _, condition := range pod.Status.Conditions {
   836  		if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
   837  			return true
   838  		}
   839  	}
   840  
   841  	return false
   842  }
   843  

View as plain text