...

Source file src/edge-infra.dev/pkg/edge/linkerd/k8s/controllers/workloadinjection_controller.go

Documentation: edge-infra.dev/pkg/edge/linkerd/k8s/controllers

     1  package controllers
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"time"
     8  
     9  	"github.com/go-logr/logr"
    10  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    11  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    12  	"k8s.io/apimachinery/pkg/types"
    13  	kuberecorder "k8s.io/client-go/tools/record"
    14  	ctrl "sigs.k8s.io/controller-runtime"
    15  	"sigs.k8s.io/controller-runtime/pkg/builder"
    16  	"sigs.k8s.io/controller-runtime/pkg/client"
    17  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    18  
    19  	"edge-infra.dev/pkg/k8s/meta/status"
    20  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    21  	edgereconcile "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    22  	"edge-infra.dev/pkg/k8s/runtime/events"
    23  	inventory "edge-infra.dev/pkg/k8s/runtime/inventory"
    24  	"edge-infra.dev/pkg/k8s/runtime/patch"
    25  
    26  	linkerd "edge-infra.dev/pkg/edge/linkerd"
    27  	l5dv1alpha1 "edge-infra.dev/pkg/edge/linkerd/k8s/apis/linkerd/v1alpha1"
    28  	l5dmetrics "edge-infra.dev/pkg/edge/linkerd/k8s/controllers/metrics"
    29  	"edge-infra.dev/pkg/edge/linkerd/k8s/controllers/workloadinjection/restart"
    30  	"edge-infra.dev/pkg/edge/linkerd/k8s/controllers/workloadinjection/workloads"
    31  	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
    32  )
    33  
    34  var (
    35  	// ErrGettingLinkerdObject is returned when the Linkerd CR cannot be retrieved
    36  	ErrGettingLinkerdObject = errors.New("unable to get the linkerd object")
    37  	// ErrGettingRestartStates is returned when the restart states of workload pods cannot be retrieved
    38  	ErrGettingRestartStates = errors.New("failed to get workload restart states")
    39  	// ErrReinjectingWorkload is returned when a workload fails to be reinjected
    40  	ErrReinjectingWorkload = errors.New("failed workload reinjection")
    41  )
    42  
    43  // L5dWorkloadInjection reconciles LinkerdWorkloadInjection objects and filters workloads to restart
    44  type L5dWorkloadInjectionReconciler struct {
    45  	client.Client
    46  	kuberecorder.EventRecorder
    47  	Log         logr.Logger
    48  	Name        string
    49  	RequeueTime time.Duration
    50  	Conditions  edgereconcile.Conditions
    51  	Metrics     metrics.Metrics
    52  }
    53  
    54  func newL5dWorkloadInjectionReconciler(mgr ctrl.Manager, metrics metrics.Metrics, eventRecorder *events.Recorder) *L5dWorkloadInjectionReconciler {
    55  	return &L5dWorkloadInjectionReconciler{
    56  		Client:        mgr.GetClient(),
    57  		Log:           ctrl.Log.WithName(linkerd.WorkloadInjectionControllerName),
    58  		Name:          linkerd.WorkloadInjectionControllerName,
    59  		RequeueTime:   30 * time.Second,
    60  		Conditions:    LinkerdWorkloadInjectionConditions,
    61  		Metrics:       metrics,
    62  		EventRecorder: eventRecorder,
    63  	}
    64  }
    65  
    66  var LinkerdWorkloadInjectionConditions = edgereconcile.Conditions{
    67  	Target: status.ReadyCondition,
    68  	Owned: []string{
    69  		status.ReconcilingCondition,
    70  		l5dv1alpha1.Restart,
    71  		l5dv1alpha1.Workloads,
    72  	},
    73  	Summarize: []string{
    74  		status.ReadyCondition,
    75  		status.ReconcilingCondition,
    76  		l5dv1alpha1.Restart,
    77  		l5dv1alpha1.Workloads,
    78  	},
    79  	NegativePolarity: []string{
    80  		status.ReconcilingCondition,
    81  	},
    82  }
    83  
    84  // SetupWithManager sets up L5dReinjectionReconciler with the manager.
    85  func (r *L5dWorkloadInjectionReconciler) SetupWithManager(mgr ctrl.Manager) error {
    86  	return ctrl.NewControllerManagedBy(mgr).
    87  		For(&l5dv1alpha1.LinkerdWorkloadInjection{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
    88  		WithEventFilter(createEventFilter()).
    89  		Complete(r)
    90  }
    91  
    92  func (r *L5dWorkloadInjectionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
    93  	// get the workloadinjection
    94  	l5djob := &l5dv1alpha1.LinkerdWorkloadInjection{}
    95  	if err = r.Get(ctx, req.NamespacedName, l5djob); err != nil {
    96  		return ctrl.Result{}, err
    97  	}
    98  
    99  	// exit early if job is already completed
   100  	if l5djob.IsCompleted() {
   101  		return ctrl.Result{}, nil
   102  	}
   103  
   104  	// get the linkerd object
   105  	l5d := &l5dv1alpha1.Linkerd{}
   106  	if err = r.Get(ctx, types.NamespacedName{Name: l5dv1alpha1.Name}, l5d); err != nil {
   107  		return ctrl.Result{}, errors.Join(ErrGettingLinkerdObject, err)
   108  	}
   109  
   110  	// exit early if we are reconciling LinkerdWorkloadInjection/linkerd and it is disabled
   111  	if l5djob.Name == linkerd.Namespace && l5d.IsWorkloadInjectionDisabled() {
   112  		return ctrl.Result{}, nil
   113  	}
   114  
   115  	patcher := patch.NewSerialPatcher(l5djob, r.Client)
   116  	defer func() {
   117  		rs := edgereconcile.ResultSuccess
   118  		if err != nil {
   119  			rs = edgereconcile.ResultEmpty
   120  		}
   121  		result, err = r.summarizer(ctx, patcher, l5djob, rs, err)
   122  	}()
   123  
   124  	// mark workloadinjection as reconciling
   125  	conditions.MarkTrue(l5djob, status.ReconcilingCondition, l5dv1alpha1.WorkloadInjectionProgressing, "reconciliation in progress")
   126  	l5djob.Status.StartAt = metav1.Now()
   127  
   128  	// gather workload restart states
   129  	var states map[string]*workloads.Workload
   130  	states, err = workloads.RestartStates(ctx, r.Client, l5d, l5djob)
   131  	if err != nil {
   132  		conditions.MarkFalse(l5djob, l5dv1alpha1.Workloads, l5dv1alpha1.RestartStatesFailed, "failed to get workload restart states: %v", err)
   133  		return ctrl.Result{}, errors.Join(ErrGettingRestartStates, err)
   134  	}
   135  	conditions.MarkTrue(l5djob, l5dv1alpha1.Workloads, l5dv1alpha1.SucceededReason, "successfully gathered workload restart states")
   136  
   137  	// process workload injections
   138  	var failed []*unstructured.Unstructured
   139  	var restarted []*unstructured.Unstructured
   140  	failed, restarted, err = r.processWorkloadInjections(ctx, states)
   141  	if err != nil {
   142  		conditions.MarkFalse(l5djob, l5dv1alpha1.Restart, l5dv1alpha1.WorkloadInjectionFailed, "failed workload reinjection: %v", err)
   143  		return ctrl.Result{}, err
   144  	}
   145  	conditions.MarkTrue(l5djob, l5dv1alpha1.Restart, l5dv1alpha1.SucceededReason, "completed workload injection job")
   146  
   147  	// update inventory and status on results
   148  	updateStatus(l5djob, failed, restarted)
   149  
   150  	log := ctrl.LoggerFrom(ctx)
   151  	log.Info("completed workload injection job")
   152  	return ctrl.Result{}, nil
   153  }
   154  
   155  func (r *L5dWorkloadInjectionReconciler) processWorkloadInjections(ctx context.Context, states map[string]*workloads.Workload) (allFailed, allRestarted []*unstructured.Unstructured, err error) {
   156  	for _, workload := range states {
   157  		failed, restarted, injErr := restart.AttemptWorkloadInjection(ctx, r.Client, workload)
   158  		if injErr != nil {
   159  			err = errors.Join(err, fmt.Errorf("%w for %s/%s: %w", ErrReinjectingWorkload, workload.Owner.GetNamespace(), workload.Owner.GetName(), injErr))
   160  		}
   161  		allFailed = append(allFailed, failed...)
   162  		allRestarted = append(allRestarted, restarted...)
   163  	}
   164  	return allFailed, allRestarted, err
   165  }
   166  
   167  func updateStatus(l5djob *l5dv1alpha1.LinkerdWorkloadInjection, failed, restarted []*unstructured.Unstructured) {
   168  	restartedInventory := inventory.New(inventory.FromUnstructured(restarted...))
   169  	if restartedInventory != nil {
   170  		l5djob.Status.Inventory = restartedInventory
   171  	}
   172  
   173  	failedInventory := inventory.New(inventory.FromUnstructured(failed...))
   174  	if failedInventory != nil {
   175  		l5djob.Status.FailedInventory = failedInventory
   176  	}
   177  
   178  	l5djob.Status.CompletedAt = metav1.Now()
   179  	conditions.Delete(l5djob, status.ReconcilingCondition)
   180  }
   181  
   182  func (r *L5dWorkloadInjectionReconciler) summarizer(ctx context.Context, patcher *patch.SerialPatcher, l5djob *l5dv1alpha1.LinkerdWorkloadInjection, result edgereconcile.Result, err error) (res ctrl.Result, recErr error) {
   183  	s := edgereconcile.NewSummarizer(patcher)
   184  	res, recErr = s.SummarizeAndPatch(ctx, l5djob,
   185  		edgereconcile.WithConditions(r.Conditions),
   186  		edgereconcile.WithResult(result),
   187  		edgereconcile.WithError(err),
   188  		edgereconcile.WithIgnoreNotFound(),
   189  		edgereconcile.WithProcessors(
   190  			edgereconcile.RecordReconcileReq,
   191  			edgereconcile.RecordResult,
   192  		),
   193  		edgereconcile.WithFieldOwner(r.Name),
   194  		edgereconcile.WithEventRecorder(r.EventRecorder),
   195  	)
   196  
   197  	l5dmetrics.RecordWorkloadInjectionReadiness(l5djob)
   198  
   199  	return res, recErr
   200  }
   201  

View as plain text