...

Source file src/edge-infra.dev/pkg/edge/linkerd/k8s/controllers/workloadinjection/workloads/workloads.go

Documentation: edge-infra.dev/pkg/edge/linkerd/k8s/controllers/workloadinjection/workloads

     1  package workloads
     2  
     3  import (
     4  	"context"
     5  	"crypto/x509"
     6  	"fmt"
     7  	"reflect"
     8  	"slices"
     9  
    10  	v1 "k8s.io/api/apps/v1"
    11  	corev1 "k8s.io/api/core/v1"
    12  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    13  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    14  	ctrl "sigs.k8s.io/controller-runtime"
    15  	"sigs.k8s.io/controller-runtime/pkg/client"
    16  
    17  	"edge-infra.dev/pkg/edge/linkerd"
    18  	"edge-infra.dev/pkg/edge/linkerd/certs/identity"
    19  	"edge-infra.dev/pkg/edge/linkerd/certs/trustanchor"
    20  	l5dv1alpha1 "edge-infra.dev/pkg/edge/linkerd/k8s/apis/linkerd/v1alpha1"
    21  	"edge-infra.dev/pkg/lib/fog"
    22  	chart "edge-infra.dev/third_party/k8s/linkerd/helm"
    23  )
    24  
    25  type Workload struct {
    26  	Owner       *unstructured.Unstructured
    27  	PodRestarts []*PodRestart
    28  }
    29  
    30  type RestartReason string
    31  
    32  const (
    33  	CertRotated   RestartReason = "cert-rotated"
    34  	CertExpired   RestartReason = "cert-expired"
    35  	ProxyMissing  RestartReason = "proxy-missing"
    36  	ProxyOutdated RestartReason = "proxy-outdated"
    37  	ProxyUnmesh   RestartReason = "proxy-unmesh"
    38  	ForceRestart  RestartReason = "force"
    39  	DoNotRestart  RestartReason = "do-not-restart"
    40  )
    41  
    42  type PodRestart struct {
    43  	RestartReason
    44  	*corev1.Pod
    45  }
    46  
    47  // RestartStates returns a list of workloads and their child pods
    48  // and the restart status (reason) of the pod
    49  func RestartStates(ctx context.Context, c client.Client, l5d *l5dv1alpha1.Linkerd, l5djob l5dv1alpha1.LinkerdWorkloadInjection) (map[string]*Workload, error) {
    50  	var workloads = map[string]*Workload{}
    51  	// if no namespaces are defined, attempt to reinject all workloads
    52  	if len(l5djob.Spec.Namespaces) == 0 {
    53  		podList := corev1.PodList{}
    54  		if err := c.List(ctx, &podList, &client.ListOptions{}); err != nil {
    55  			return nil, err
    56  		}
    57  		workloads = appendPodToOwningWorkload(ctx, c, podList, workloads, l5d, l5djob)
    58  		return workloads, nil
    59  	}
    60  	// otherwise return all pods in the specified namespaces that can be injected
    61  	for _, ns := range l5djob.Spec.Namespaces {
    62  		podList := corev1.PodList{}
    63  		if err := c.List(ctx, &podList, &client.ListOptions{Namespace: ns}); err != nil {
    64  			return nil, err
    65  		}
    66  		workloads = appendPodToOwningWorkload(ctx, c, podList, workloads, l5d, l5djob)
    67  	}
    68  	// workloads that have been unmeshed should be restarted
    69  	for _, disabledNs := range l5d.Status.DisabledNamespaces {
    70  		podList := corev1.PodList{}
    71  		if err := c.List(ctx, &podList, &client.ListOptions{Namespace: disabledNs}); err != nil {
    72  			return nil, err
    73  		}
    74  		unmeshedPods := []corev1.Pod{}
    75  		for _, pod := range podList.Items {
    76  			if value, found := pod.Annotations[linkerd.InjectionAnnotation]; found && value == "enabled" {
    77  				unmeshedPods = append(unmeshedPods, pod)
    78  			}
    79  		}
    80  		podList.Items = unmeshedPods
    81  		disabledWorkloads := appendPodToOwningWorkload(ctx, c, podList, workloads, l5d, l5djob)
    82  		for id, workload := range disabledWorkloads {
    83  			workloads[id] = workload
    84  		}
    85  	}
    86  	return workloads, nil
    87  }
    88  
    89  // appendPodToOwningWorkload will append the pod to its owning workload with the restart reason
    90  func appendPodToOwningWorkload(ctx context.Context, c client.Client, podList corev1.PodList, workloads map[string]*Workload, l5d *l5dv1alpha1.Linkerd, l5djob l5dv1alpha1.LinkerdWorkloadInjection) map[string]*Workload {
    91  	log := ctrl.LoggerFrom(ctx)
    92  	for idx, pod := range podList.Items {
    93  		owningWorkload, err := OwningWorkload(ctx, c, pod)
    94  		if err != nil {
    95  			log.Error(err, "unable to get owning workload", "pod", pod.Name, "namespace", pod.Namespace)
    96  			continue
    97  		}
    98  		if owningWorkload == nil {
    99  			continue
   100  		}
   101  		workloadID := workloadID(owningWorkload)
   102  		if workloads[workloadID] == nil {
   103  			workloads[workloadID] = &Workload{
   104  				Owner:       nil,
   105  				PodRestarts: []*PodRestart{},
   106  			}
   107  		}
   108  
   109  		certs, err := identity.GetCertificates(&podList.Items[idx])
   110  		if err != nil {
   111  			log.Error(err, "unable to get identity certificate from admin port", "pod", pod.Name, "namespace", pod.Namespace)
   112  			continue
   113  		}
   114  
   115  		workloads[workloadID].Owner = owningWorkload
   116  		workloads[workloadID].PodRestarts = append(workloads[workloadID].PodRestarts, &PodRestart{
   117  			Pod:           &podList.Items[idx],
   118  			RestartReason: PodRestartReason(ctx, pod, l5d, l5djob, certs),
   119  		})
   120  	}
   121  	return workloads
   122  }
   123  
   124  // PodRestartReason will check if we need to restart a pod. This is determined by a number of factors:
   125  //
   126  //	is the pod meshable?
   127  //	is the pod set for termination?
   128  //	is the pod using the host network?
   129  //	is the proxy version outdated?
   130  //	does the pod contain the linkerd proxy?
   131  //	is linkerd workload injection forcing a restart?
   132  //	have the linkerd identity certificates expired?
   133  func PodRestartReason(ctx context.Context, pod corev1.Pod, l5d *l5dv1alpha1.Linkerd, l5djob l5dv1alpha1.LinkerdWorkloadInjection, certs []*x509.Certificate) RestartReason {
   134  	log := fog.FromContext(ctx)
   135  	podsNsDisabled := slices.Contains(l5d.Status.DisabledNamespaces, pod.Namespace)
   136  	if inject, found := pod.GetAnnotations()[linkerd.InjectionAnnotation]; found && inject == "disabled" {
   137  		return DoNotRestart
   138  	}
   139  	// skip pods in terminating state
   140  	if pod.DeletionTimestamp != nil {
   141  		return DoNotRestart
   142  	}
   143  	// skip pod if remeshing all workloads and pod in disabled namespace
   144  	if len(l5djob.Spec.Namespaces) == 0 && podsNsDisabled {
   145  		return DoNotRestart
   146  	}
   147  	// skip pods using hostNetwork
   148  	// See: https://github.com/linkerd/linkerd2/issues/7949#issuecomment-1049099837
   149  	if pod.Spec.HostNetwork {
   150  		return DoNotRestart
   151  	}
   152  
   153  	if l5djob.Spec.Force {
   154  		return ForceRestart
   155  	}
   156  
   157  	if !linkerd.ProxyExists(&pod) && !podsNsDisabled {
   158  		return ProxyMissing
   159  	}
   160  	if linkerd.ProxyExists(&pod) && podsNsDisabled {
   161  		return ProxyUnmesh
   162  	}
   163  
   164  	if IsProxyOutdated(ctx, pod.GetAnnotations()) {
   165  		return ProxyOutdated
   166  	}
   167  
   168  	if identity.CertificateExpired(certs) {
   169  		log.Info("the identity certificate expired", "pod", pod.Name, "namespace", pod.Namespace)
   170  		return CertExpired
   171  	}
   172  
   173  	if trustanchor.HasManualRotationAnnotation(l5d) {
   174  		return CertRotated
   175  	}
   176  
   177  	return DoNotRestart
   178  }
   179  
   180  // OwningWorkload determines the top-level workload types that owns the provided pod,
   181  // fetches that workload object from the API server and returns it (e.g., Deployment/StatefulSet/DaemonSet).
   182  //
   183  // If the owner is a ReplicaSet, the owning Deployment is returned.
   184  //
   185  // If the pods owner is not a workload type that we care about, a nil object is returned.
   186  func OwningWorkload(ctx context.Context, c client.Client, pod corev1.Pod) (*unstructured.Unstructured, error) {
   187  	ownerRef := getOwnerReference(pod.ObjectMeta)
   188  	if ownerRef == (metav1.OwnerReference{}) {
   189  		return nil, nil
   190  	}
   191  	var err error
   192  	if ownerRef.Kind == reflect.TypeOf(v1.ReplicaSet{}).Name() {
   193  		ownerRef, err = getReplicaSetOwnerReference(ctx, c, pod, ownerRef)
   194  		if err != nil {
   195  			return nil, err
   196  		}
   197  	}
   198  	return getWorkloadFromOwnerReference(ctx, c, pod, ownerRef)
   199  }
   200  
   201  func IsProxyOutdated(ctx context.Context, podAnnotations map[string]string) bool {
   202  	log := fog.FromContext(ctx)
   203  	v, found := podAnnotations[linkerd.ProxyVersionAnnotation]
   204  	if !found {
   205  		log.Info("linkerd proxy version annotation not found, forcing restart")
   206  		return true
   207  	}
   208  	return v != chart.Version
   209  }
   210  
   211  func workloadID(obj *unstructured.Unstructured) string {
   212  	return fmt.Sprintf("%s/%s/%s", obj.GetKind(), obj.GetNamespace(), obj.GetName())
   213  }
   214  

View as plain text