...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/watcher/k8s.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination/watcher

     1  package watcher
     2  
     3  import (
     4  	"errors"
     5  	"fmt"
     6  	"net"
     7  
     8  	ext "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
     9  	"github.com/linkerd/linkerd2/controller/k8s"
    10  	"github.com/prometheus/client_golang/prometheus"
    11  	corev1 "k8s.io/api/core/v1"
    12  	"k8s.io/apimachinery/pkg/util/intstr"
    13  	"k8s.io/client-go/tools/cache"
    14  )
    15  
    16  const (
    17  	// PodIPIndex is the key for the index based on Pod IPs
    18  	PodIPIndex = "ip"
    19  	// HostIPIndex is the key for the index based on Host IP of pods with host network enabled
    20  	HostIPIndex = "hostIP"
    21  	// ExternalWorkloadIPIndex is the key for the index based on IP of externalworkloads
    22  	ExternalWorkloadIPIndex = "externalWorkloadIP"
    23  )
    24  
    25  type (
    26  	// IPPort holds the IP and port for some destination
    27  	IPPort struct {
    28  		IP   string
    29  		Port Port
    30  	}
    31  
    32  	// ID is a namespace-qualified name.
    33  	ID struct {
    34  		Namespace string
    35  		Name      string
    36  
    37  		// Only used for PodID
    38  		IPFamily corev1.IPFamily
    39  	}
    40  	// ServiceID is the namespace-qualified name of a service.
    41  	ServiceID = ID
    42  	// PodID is the namespace-qualified name of a pod.
    43  	PodID = ID
    44  	// ProfileID is the namespace-qualified name of a service profile.
    45  	ProfileID = ID
    46  	// PodID is the namespace-qualified name of an ExternalWorkload.
    47  	ExternalWorkloadID = ID
    48  
    49  	// Port is a numeric port.
    50  	Port      = uint32
    51  	namedPort = intstr.IntOrString
    52  
    53  	// InvalidService is an error which indicates that the authority is not a
    54  	// valid service.
    55  	InvalidService struct {
    56  		authority string
    57  	}
    58  )
    59  
    60  // Labels returns the labels for prometheus metrics associated to the service
    61  func (id ServiceID) Labels() prometheus.Labels {
    62  	return prometheus.Labels{"namespace": id.Namespace, "name": id.Name}
    63  }
    64  
    65  func (is InvalidService) Error() string {
    66  	return fmt.Sprintf("Invalid k8s service %s", is.authority)
    67  }
    68  
    69  func invalidService(authority string) InvalidService {
    70  	return InvalidService{authority}
    71  }
    72  
    73  func (i ID) String() string {
    74  	return fmt.Sprintf("%s/%s", i.Namespace, i.Name)
    75  }
    76  
    77  // InitializeIndexers is used to initialize indexers on k8s informers, to be used across watchers
    78  func InitializeIndexers(k8sAPI *k8s.API) error {
    79  	err := k8sAPI.Svc().Informer().AddIndexers(cache.Indexers{PodIPIndex: func(obj interface{}) ([]string, error) {
    80  		svc, ok := obj.(*corev1.Service)
    81  		if !ok {
    82  			return nil, errors.New("object is not a service")
    83  		}
    84  
    85  		if len(svc.Spec.ClusterIPs) != 0 {
    86  			return svc.Spec.ClusterIPs, nil
    87  		}
    88  
    89  		if svc.Spec.ClusterIP != "" {
    90  			return []string{svc.Spec.ClusterIP}, nil
    91  		}
    92  
    93  		return nil, nil
    94  	}})
    95  
    96  	if err != nil {
    97  		return fmt.Errorf("could not create an indexer for services: %w", err)
    98  	}
    99  
   100  	err = k8sAPI.Pod().Informer().AddIndexers(cache.Indexers{PodIPIndex: func(obj interface{}) ([]string, error) {
   101  		if pod, ok := obj.(*corev1.Pod); ok {
   102  			// Pods that run in the host network are indexed by the host IP
   103  			// indexer in the IP watcher; they should be skipped by the pod
   104  			// IP indexer which is responsible only for indexing pod network
   105  			// pods.
   106  			if pod.Spec.HostNetwork {
   107  				return nil, nil
   108  			}
   109  			ips := []string{}
   110  			for _, pip := range pod.Status.PodIPs {
   111  				if pip.IP != "" {
   112  					ips = append(ips, pip.IP)
   113  				}
   114  			}
   115  			if len(ips) == 0 && pod.Status.PodIP != "" {
   116  				ips = append(ips, pod.Status.PodIP)
   117  			}
   118  			return ips, nil
   119  		}
   120  		return nil, fmt.Errorf("object is not a pod")
   121  	}})
   122  
   123  	if err != nil {
   124  		return fmt.Errorf("could not create an indexer for pods: %w", err)
   125  	}
   126  
   127  	err = k8sAPI.Pod().Informer().AddIndexers(cache.Indexers{HostIPIndex: func(obj interface{}) ([]string, error) {
   128  		pod, ok := obj.(*corev1.Pod)
   129  		if !ok {
   130  			return nil, errors.New("object is not a pod")
   131  		}
   132  
   133  		ips := []string{}
   134  		for _, hip := range pod.Status.HostIPs {
   135  			ips = append(ips, hip.IP)
   136  		}
   137  		if len(ips) == 0 && pod.Status.HostIP != "" {
   138  			ips = append(ips, pod.Status.HostIP)
   139  		}
   140  		if len(ips) == 0 {
   141  			return []string{}, nil
   142  		}
   143  
   144  		// If the pod is reachable from the host network, then for
   145  		// each of its containers' ports that exposes a host port, add
   146  		// that hostIP:hostPort endpoint to the indexer.
   147  		addrs := []string{}
   148  		for _, c := range pod.Spec.Containers {
   149  			for _, p := range c.Ports {
   150  				if p.HostPort == 0 {
   151  					continue
   152  				}
   153  				for _, ip := range ips {
   154  					addrs = append(addrs, net.JoinHostPort(ip, fmt.Sprintf("%d", p.HostPort)))
   155  				}
   156  			}
   157  		}
   158  		return addrs, nil
   159  	}})
   160  
   161  	if err != nil {
   162  		return fmt.Errorf("could not create an indexer for pods: %w", err)
   163  	}
   164  
   165  	err = k8sAPI.ExtWorkload().Informer().AddIndexers(cache.Indexers{ExternalWorkloadIPIndex: func(obj interface{}) ([]string, error) {
   166  		ew, ok := obj.(*ext.ExternalWorkload)
   167  		if !ok {
   168  			return nil, errors.New("object is not an externalworkload")
   169  		}
   170  
   171  		addrs := []string{}
   172  		for _, ip := range ew.Spec.WorkloadIPs {
   173  			for _, port := range ew.Spec.Ports {
   174  				addrs = append(addrs, net.JoinHostPort(ip.Ip, fmt.Sprintf("%d", port.Port)))
   175  			}
   176  		}
   177  		return addrs, nil
   178  	}})
   179  
   180  	if err != nil {
   181  		return fmt.Errorf("could not create an indexer for externalworkloads: %w", err)
   182  	}
   183  
   184  	return nil
   185  }
   186  
   187  func getIndexedPods(k8sAPI *k8s.API, indexName string, key string) ([]*corev1.Pod, error) {
   188  	objs, err := k8sAPI.Pod().Informer().GetIndexer().ByIndex(indexName, key)
   189  	if err != nil {
   190  		return nil, fmt.Errorf("failed getting %s indexed pods: %w", indexName, err)
   191  	}
   192  	pods := make([]*corev1.Pod, 0)
   193  	for _, obj := range objs {
   194  		pod := obj.(*corev1.Pod)
   195  		if !podNotTerminating(pod) {
   196  			continue
   197  		}
   198  		pods = append(pods, pod)
   199  	}
   200  	return pods, nil
   201  }
   202  
   203  func getIndexedExternalWorkloads(k8sAPI *k8s.API, indexName string, key string) ([]*ext.ExternalWorkload, error) {
   204  	objs, err := k8sAPI.ExtWorkload().Informer().GetIndexer().ByIndex(indexName, key)
   205  	if err != nil {
   206  		return nil, fmt.Errorf("failed getting %s indexed externalworkloads: %w", indexName, err)
   207  	}
   208  	workloads := make([]*ext.ExternalWorkload, 0)
   209  	for _, obj := range objs {
   210  		workload := obj.(*ext.ExternalWorkload)
   211  		workloads = append(workloads, workload)
   212  	}
   213  	return workloads, nil
   214  }
   215  
   216  func podNotTerminating(pod *corev1.Pod) bool {
   217  	phase := pod.Status.Phase
   218  	podTerminated := phase == corev1.PodSucceeded || phase == corev1.PodFailed
   219  	podTerminating := pod.DeletionTimestamp != nil
   220  	return !podTerminating && !podTerminated
   221  }
   222  

View as plain text