package controllers

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/go-logr/logr"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/types"
	kuberecorder "k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

	"edge-infra.dev/pkg/k8s/meta/status"
	"edge-infra.dev/pkg/k8s/runtime/conditions"
	edgereconcile "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
	"edge-infra.dev/pkg/k8s/runtime/events"
	inventory "edge-infra.dev/pkg/k8s/runtime/inventory"
	"edge-infra.dev/pkg/k8s/runtime/patch"

	linkerd "edge-infra.dev/pkg/edge/linkerd"
	l5dv1alpha1 "edge-infra.dev/pkg/edge/linkerd/k8s/apis/linkerd/v1alpha1"
	l5dmetrics "edge-infra.dev/pkg/edge/linkerd/k8s/controllers/metrics"
	"edge-infra.dev/pkg/edge/linkerd/k8s/controllers/workloadinjection/restart"
	"edge-infra.dev/pkg/edge/linkerd/k8s/controllers/workloadinjection/workloads"

	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
)

var (
	// ErrGettingLinkerdObject is returned when the Linkerd CR cannot be retrieved.
	ErrGettingLinkerdObject = errors.New("unable to get the linkerd object")
	// ErrGettingRestartStates is returned when the restart states of workload
	// pods cannot be retrieved.
	ErrGettingRestartStates = errors.New("failed to get workload restart states")
	// ErrReinjectingWorkload is returned when a workload fails to be reinjected.
	ErrReinjectingWorkload = errors.New("failed workload reinjection")
)

// L5dWorkloadInjectionReconciler reconciles LinkerdWorkloadInjection objects
// and filters workloads to restart.
//
// It embeds the Kubernetes client used for all API access and the event
// recorder handed to the status summarizer. Name is also used as the field
// owner when the object's status is patched, and Conditions describes how the
// object's conditions are summarized at the end of each reconcile.
type L5dWorkloadInjectionReconciler struct {
	client.Client
	kuberecorder.EventRecorder
	Log         logr.Logger
	Name        string
	RequeueTime time.Duration
	Conditions  edgereconcile.Conditions
	Metrics     metrics.Metrics
}

// newL5dWorkloadInjectionReconciler builds a reconciler that uses the
// manager's client, the supplied metrics and event recorder, the package-level
// LinkerdWorkloadInjectionConditions, and a 30 second RequeueTime.
func newL5dWorkloadInjectionReconciler(mgr ctrl.Manager, metrics metrics.Metrics, eventRecorder *events.Recorder) *L5dWorkloadInjectionReconciler {
	return &L5dWorkloadInjectionReconciler{
		Client:        mgr.GetClient(),
		Log:           ctrl.Log.WithName(linkerd.WorkloadInjectionControllerName),
		Name:          linkerd.WorkloadInjectionControllerName,
		RequeueTime:   30 * time.Second,
		Conditions:    LinkerdWorkloadInjectionConditions,
		Metrics:       metrics,
		EventRecorder: eventRecorder,
	}
}

// LinkerdWorkloadInjectionConditions declares the status conditions handled by
// the workload injection reconciler: Ready is the target condition, and the
// Reconciling, Restart and Workloads conditions are owned by the controller
// and summarized into it. Reconciling is the only negative-polarity condition.
var LinkerdWorkloadInjectionConditions = edgereconcile.Conditions{
	Target: status.ReadyCondition,
	Owned: []string{
		status.ReconcilingCondition,
		l5dv1alpha1.Restart,
		l5dv1alpha1.Workloads,
	},
	Summarize: []string{
		status.ReadyCondition,
		status.ReconcilingCondition,
		l5dv1alpha1.Restart,
		l5dv1alpha1.Workloads,
	},
	NegativePolarity: []string{
		status.ReconcilingCondition,
	},
}

// SetupWithManager sets up L5dWorkloadInjectionReconciler with the manager.
//
// The controller watches LinkerdWorkloadInjection objects, reacting only to
// generation changes, and additionally applies the event filter returned by
// createEventFilter.
func (r *L5dWorkloadInjectionReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&l5dv1alpha1.LinkerdWorkloadInjection{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
		WithEventFilter(createEventFilter()).
		Complete(r)
}

// Reconcile runs a single LinkerdWorkloadInjection job: it gathers the restart
// states of the workloads, attempts injection for each of them, and records
// the outcome on the job's status.
//
// It returns without doing any work when the job no longer exists, when it is
// already completed, or when the job is named after the linkerd namespace and
// workload injection is disabled on the Linkerd object. Once the job is being
// processed, the deferred summarizer patches the object and produces the final
// result and error for every return path.
func (r *L5dWorkloadInjectionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
	// get the workloadinjection
	l5djob := &l5dv1alpha1.LinkerdWorkloadInjection{}
	if err = r.Get(ctx, req.NamespacedName, l5djob); err != nil {
		// The object may have been deleted after the request was queued.
		// There is nothing left to reconcile then, so do not report an error
		// (which would only cause a pointless requeue).
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// exit early if job is already completed
	if l5djob.IsCompleted() {
		return ctrl.Result{}, nil
	}

	// get the linkerd object
	l5d := &l5dv1alpha1.Linkerd{}
	if err = r.Get(ctx, types.NamespacedName{Name: l5dv1alpha1.Name}, l5d); err != nil {
		return ctrl.Result{}, errors.Join(ErrGettingLinkerdObject, err)
	}

	// exit early if we are reconciling LinkerdWorkloadInjection/linkerd and it is disabled
	if l5djob.Name == linkerd.Namespace && l5d.IsWorkloadInjectionDisabled() {
		return ctrl.Result{}, nil
	}

	patcher := patch.NewSerialPatcher(l5djob, r.Client)
	// From here on every return goes through the summarizer, which overwrites
	// the named results with its own.
	defer func() {
		rs := edgereconcile.ResultSuccess
		if err != nil {
			rs = edgereconcile.ResultEmpty
		}
		result, err = r.summarizer(ctx, patcher, l5djob, rs, err)
	}()

	// mark workloadinjection as reconciling
	conditions.MarkTrue(l5djob, status.ReconcilingCondition, l5dv1alpha1.WorkloadInjectionProgressing, "reconciliation in progress")
	l5djob.Status.StartAt = metav1.Now()

	// gather workload restart states
	var states map[string]*workloads.Workload
	states, err = workloads.RestartStates(ctx, r.Client, l5d, l5djob)
	if err != nil {
		conditions.MarkFalse(l5djob, l5dv1alpha1.Workloads, l5dv1alpha1.RestartStatesFailed, "failed to get workload restart states: %v", err)
		return ctrl.Result{}, errors.Join(ErrGettingRestartStates, err)
	}
	conditions.MarkTrue(l5djob, l5dv1alpha1.Workloads, l5dv1alpha1.SucceededReason, "successfully gathered workload restart states")

	// process workload injections
	var failed []*unstructured.Unstructured
	var restarted []*unstructured.Unstructured
	failed, restarted, err = r.processWorkloadInjections(ctx, states)
	if err != nil {
		conditions.MarkFalse(l5djob, l5dv1alpha1.Restart, l5dv1alpha1.WorkloadInjectionFailed, "failed workload reinjection: %v", err)
		return ctrl.Result{}, err
	}
	conditions.MarkTrue(l5djob, l5dv1alpha1.Restart, l5dv1alpha1.SucceededReason, "completed workload injection job")

	// update inventory and status on results
	updateStatus(l5djob, failed, restarted)

	log := ctrl.LoggerFrom(ctx)
	log.Info("completed workload injection job")

	return ctrl.Result{}, nil
}

// processWorkloadInjections attempts workload injection for every entry in
// states and collects the failed and restarted objects reported by each
// attempt.
//
// A failing workload does not stop the loop: its error is wrapped with
// ErrReinjectingWorkload and the owner's namespace/name, and joined into the
// returned error. The returned slices contain the results of all attempts,
// including those that returned an error.
func (r *L5dWorkloadInjectionReconciler) processWorkloadInjections(ctx context.Context, states map[string]*workloads.Workload) (allFailed, allRestarted []*unstructured.Unstructured, err error) {
	for _, workload := range states {
		failed, restarted, injErr := restart.AttemptWorkloadInjection(ctx, r.Client, workload)
		if injErr != nil {
			err = errors.Join(err, fmt.Errorf("%w for %s/%s: %w", ErrReinjectingWorkload, workload.Owner.GetNamespace(), workload.Owner.GetName(), injErr))
		}
		allFailed = append(allFailed, failed...)
		allRestarted = append(allRestarted, restarted...)
	}
	return allFailed, allRestarted, err
}

// updateStatus records the outcome of a finished job on its status: the
// inventories built from the restarted and failed objects (each assigned only
// when the built inventory is non-nil) and the completion time. It also
// removes the Reconciling condition.
func updateStatus(l5djob *l5dv1alpha1.LinkerdWorkloadInjection, failed, restarted []*unstructured.Unstructured) {
	restartedInventory := inventory.New(inventory.FromUnstructured(restarted...))
	if restartedInventory != nil {
		l5djob.Status.Inventory = restartedInventory
	}

	failedInventory := inventory.New(inventory.FromUnstructured(failed...))
	if failedInventory != nil {
		l5djob.Status.FailedInventory = failedInventory
	}

	l5djob.Status.CompletedAt = metav1.Now()
	conditions.Delete(l5djob, status.ReconcilingCondition)
}

// summarizer summarizes the job's conditions and patches the object through
// the given patcher, passing along the reconcile result and error, then
// records the workload injection readiness metric for the job.
//
// It returns the result and error produced by SummarizeAndPatch, which is
// configured to ignore not-found errors and to use the reconciler's name as
// field owner.
func (r *L5dWorkloadInjectionReconciler) summarizer(ctx context.Context, patcher *patch.SerialPatcher, l5djob *l5dv1alpha1.LinkerdWorkloadInjection, result edgereconcile.Result, err error) (res ctrl.Result, recErr error) {
	s := edgereconcile.NewSummarizer(patcher)
	res, recErr = s.SummarizeAndPatch(ctx, l5djob,
		edgereconcile.WithConditions(r.Conditions),
		edgereconcile.WithResult(result),
		edgereconcile.WithError(err),
		edgereconcile.WithIgnoreNotFound(),
		edgereconcile.WithProcessors(
			edgereconcile.RecordReconcileReq,
			edgereconcile.RecordResult,
		),
		edgereconcile.WithFieldOwner(r.Name),
		edgereconcile.WithEventRecorder(r.EventRecorder),
	)
	l5dmetrics.RecordWorkloadInjectionReadiness(l5djob)
	return res, recErr
}