1 package controllers
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "time"
8
9 "github.com/go-logr/logr"
10 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
12 "k8s.io/apimachinery/pkg/types"
13 kuberecorder "k8s.io/client-go/tools/record"
14 ctrl "sigs.k8s.io/controller-runtime"
15 "sigs.k8s.io/controller-runtime/pkg/builder"
16 "sigs.k8s.io/controller-runtime/pkg/client"
17 "sigs.k8s.io/controller-runtime/pkg/predicate"
18
19 "edge-infra.dev/pkg/k8s/meta/status"
20 "edge-infra.dev/pkg/k8s/runtime/conditions"
21 edgereconcile "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
22 "edge-infra.dev/pkg/k8s/runtime/events"
23 inventory "edge-infra.dev/pkg/k8s/runtime/inventory"
24 "edge-infra.dev/pkg/k8s/runtime/patch"
25
26 linkerd "edge-infra.dev/pkg/edge/linkerd"
27 l5dv1alpha1 "edge-infra.dev/pkg/edge/linkerd/k8s/apis/linkerd/v1alpha1"
28 l5dmetrics "edge-infra.dev/pkg/edge/linkerd/k8s/controllers/metrics"
29 "edge-infra.dev/pkg/edge/linkerd/k8s/controllers/workloadinjection/restart"
30 "edge-infra.dev/pkg/edge/linkerd/k8s/controllers/workloadinjection/workloads"
31 "edge-infra.dev/pkg/k8s/runtime/controller/metrics"
32 )
33
var (
	// ErrGettingLinkerdObject is joined into the error returned by Reconcile
	// when the Linkerd object cannot be fetched.
	ErrGettingLinkerdObject = errors.New("unable to get the linkerd object")

	// ErrGettingRestartStates is joined into the error returned by Reconcile
	// when workloads.RestartStates fails.
	ErrGettingRestartStates = errors.New("failed to get workload restart states")

	// ErrReinjectingWorkload wraps every error reported by
	// restart.AttemptWorkloadInjection for an individual workload.
	ErrReinjectingWorkload = errors.New("failed workload reinjection")
)
42
43
// L5dWorkloadInjectionReconciler reconciles LinkerdWorkloadInjection objects:
// it gathers workload restart states, attempts workload injection for each of
// them and records the outcome on the object's status.
type L5dWorkloadInjectionReconciler struct {
	// Client is used to fetch the reconciled objects and is handed to the
	// workload and restart helpers.
	client.Client
	// EventRecorder is passed to the summarizer when patching the object.
	kuberecorder.EventRecorder
	// Log is the controller-scoped logger set by the constructor.
	Log logr.Logger
	// Name is the controller name; the summarizer uses it as the field owner.
	Name string
	// RequeueTime is set to 30 seconds by the constructor.
	// NOTE(review): not referenced by the code visible in this file.
	RequeueTime time.Duration
	// Conditions is the condition configuration given to the summarizer.
	Conditions edgereconcile.Conditions
	// Metrics holds the controller metrics supplied to the constructor.
	// NOTE(review): not referenced by the code visible in this file.
	Metrics metrics.Metrics
}
53
54 func newL5dWorkloadInjectionReconciler(mgr ctrl.Manager, metrics metrics.Metrics, eventRecorder *events.Recorder) *L5dWorkloadInjectionReconciler {
55 return &L5dWorkloadInjectionReconciler{
56 Client: mgr.GetClient(),
57 Log: ctrl.Log.WithName(linkerd.WorkloadInjectionControllerName),
58 Name: linkerd.WorkloadInjectionControllerName,
59 RequeueTime: 30 * time.Second,
60 Conditions: LinkerdWorkloadInjectionConditions,
61 Metrics: metrics,
62 EventRecorder: eventRecorder,
63 }
64 }
65
// LinkerdWorkloadInjectionConditions is the condition configuration used by the
// workload injection reconciler when summarizing and patching status.
// NOTE(review): the exact meaning of each field is defined by
// edgereconcile.Conditions — confirm against that package.
var LinkerdWorkloadInjectionConditions = edgereconcile.Conditions{
	// Ready is the target condition.
	Target: status.ReadyCondition,
	// Conditions listed as owned by this controller.
	Owned: []string{
		status.ReconcilingCondition,
		l5dv1alpha1.Restart,
		l5dv1alpha1.Workloads,
	},
	// Conditions listed for summarization: the target plus all owned conditions.
	Summarize: []string{
		status.ReadyCondition,
		status.ReconcilingCondition,
		l5dv1alpha1.Restart,
		l5dv1alpha1.Workloads,
	},
	// Reconciling is the only condition listed with negative polarity.
	NegativePolarity: []string{
		status.ReconcilingCondition,
	},
}
83
84
85 func (r *L5dWorkloadInjectionReconciler) SetupWithManager(mgr ctrl.Manager) error {
86 return ctrl.NewControllerManagedBy(mgr).
87 For(&l5dv1alpha1.LinkerdWorkloadInjection{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
88 WithEventFilter(createEventFilter()).
89 Complete(r)
90 }
91
92 func (r *L5dWorkloadInjectionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (result ctrl.Result, err error) {
93
94 l5djob := &l5dv1alpha1.LinkerdWorkloadInjection{}
95 if err = r.Get(ctx, req.NamespacedName, l5djob); err != nil {
96 return ctrl.Result{}, err
97 }
98
99
100 if l5djob.IsCompleted() {
101 return ctrl.Result{}, nil
102 }
103
104
105 l5d := &l5dv1alpha1.Linkerd{}
106 if err = r.Get(ctx, types.NamespacedName{Name: l5dv1alpha1.Name}, l5d); err != nil {
107 return ctrl.Result{}, errors.Join(ErrGettingLinkerdObject, err)
108 }
109
110
111 if l5djob.Name == linkerd.Namespace && l5d.IsWorkloadInjectionDisabled() {
112 return ctrl.Result{}, nil
113 }
114
115 patcher := patch.NewSerialPatcher(l5djob, r.Client)
116 defer func() {
117 rs := edgereconcile.ResultSuccess
118 if err != nil {
119 rs = edgereconcile.ResultEmpty
120 }
121 result, err = r.summarizer(ctx, patcher, l5djob, rs, err)
122 }()
123
124
125 conditions.MarkTrue(l5djob, status.ReconcilingCondition, l5dv1alpha1.WorkloadInjectionProgressing, "reconciliation in progress")
126 l5djob.Status.StartAt = metav1.Now()
127
128
129 var states map[string]*workloads.Workload
130 states, err = workloads.RestartStates(ctx, r.Client, l5d, l5djob)
131 if err != nil {
132 conditions.MarkFalse(l5djob, l5dv1alpha1.Workloads, l5dv1alpha1.RestartStatesFailed, "failed to get workload restart states: %v", err)
133 return ctrl.Result{}, errors.Join(ErrGettingRestartStates, err)
134 }
135 conditions.MarkTrue(l5djob, l5dv1alpha1.Workloads, l5dv1alpha1.SucceededReason, "successfully gathered workload restart states")
136
137
138 var failed []*unstructured.Unstructured
139 var restarted []*unstructured.Unstructured
140 failed, restarted, err = r.processWorkloadInjections(ctx, states)
141 if err != nil {
142 conditions.MarkFalse(l5djob, l5dv1alpha1.Restart, l5dv1alpha1.WorkloadInjectionFailed, "failed workload reinjection: %v", err)
143 return ctrl.Result{}, err
144 }
145 conditions.MarkTrue(l5djob, l5dv1alpha1.Restart, l5dv1alpha1.SucceededReason, "completed workload injection job")
146
147
148 updateStatus(l5djob, failed, restarted)
149
150 log := ctrl.LoggerFrom(ctx)
151 log.Info("completed workload injection job")
152 return ctrl.Result{}, nil
153 }
154
155 func (r *L5dWorkloadInjectionReconciler) processWorkloadInjections(ctx context.Context, states map[string]*workloads.Workload) (allFailed, allRestarted []*unstructured.Unstructured, err error) {
156 for _, workload := range states {
157 failed, restarted, injErr := restart.AttemptWorkloadInjection(ctx, r.Client, workload)
158 if injErr != nil {
159 err = errors.Join(err, fmt.Errorf("%w for %s/%s: %w", ErrReinjectingWorkload, workload.Owner.GetNamespace(), workload.Owner.GetName(), injErr))
160 }
161 allFailed = append(allFailed, failed...)
162 allRestarted = append(allRestarted, restarted...)
163 }
164 return allFailed, allRestarted, err
165 }
166
167 func updateStatus(l5djob *l5dv1alpha1.LinkerdWorkloadInjection, failed, restarted []*unstructured.Unstructured) {
168 restartedInventory := inventory.New(inventory.FromUnstructured(restarted...))
169 if restartedInventory != nil {
170 l5djob.Status.Inventory = restartedInventory
171 }
172
173 failedInventory := inventory.New(inventory.FromUnstructured(failed...))
174 if failedInventory != nil {
175 l5djob.Status.FailedInventory = failedInventory
176 }
177
178 l5djob.Status.CompletedAt = metav1.Now()
179 conditions.Delete(l5djob, status.ReconcilingCondition)
180 }
181
182 func (r *L5dWorkloadInjectionReconciler) summarizer(ctx context.Context, patcher *patch.SerialPatcher, l5djob *l5dv1alpha1.LinkerdWorkloadInjection, result edgereconcile.Result, err error) (res ctrl.Result, recErr error) {
183 s := edgereconcile.NewSummarizer(patcher)
184 res, recErr = s.SummarizeAndPatch(ctx, l5djob,
185 edgereconcile.WithConditions(r.Conditions),
186 edgereconcile.WithResult(result),
187 edgereconcile.WithError(err),
188 edgereconcile.WithIgnoreNotFound(),
189 edgereconcile.WithProcessors(
190 edgereconcile.RecordReconcileReq,
191 edgereconcile.RecordResult,
192 ),
193 edgereconcile.WithFieldOwner(r.Name),
194 edgereconcile.WithEventRecorder(r.EventRecorder),
195 )
196
197 l5dmetrics.RecordWorkloadInjectionReadiness(l5djob)
198
199 return res, recErr
200 }
201
View as plain text