1 package controllers
2
3 import (
4 "context"
5 "time"
6
7 certmgr "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
8 "github.com/fluxcd/pkg/ssa"
9 "github.com/go-logr/logr"
10 corev1 "k8s.io/api/core/v1"
11 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
12 kuberecorder "k8s.io/client-go/tools/record"
13 "sigs.k8s.io/cli-utils/pkg/kstatus/polling"
14 ctrl "sigs.k8s.io/controller-runtime"
15 "sigs.k8s.io/controller-runtime/pkg/builder"
16 "sigs.k8s.io/controller-runtime/pkg/client"
17 "sigs.k8s.io/controller-runtime/pkg/event"
18 "sigs.k8s.io/controller-runtime/pkg/handler"
19 "sigs.k8s.io/controller-runtime/pkg/predicate"
20 "sigs.k8s.io/controller-runtime/pkg/reconcile"
21
22 "edge-infra.dev/pkg/edge/component/build"
23 linkerd "edge-infra.dev/pkg/edge/linkerd"
24 "edge-infra.dev/pkg/edge/linkerd/certs/identity"
25 "edge-infra.dev/pkg/edge/linkerd/certs/trustanchor"
26 l5dv1alpha1 "edge-infra.dev/pkg/edge/linkerd/k8s/apis/linkerd/v1alpha1"
27 l5dconfig "edge-infra.dev/pkg/edge/linkerd/k8s/controllers/linkerd/config"
28 "edge-infra.dev/pkg/edge/linkerd/k8s/controllers/linkerd/install"
29 "edge-infra.dev/pkg/edge/linkerd/k8s/controllers/linkerd/proxyinjection"
30 l5dmetrics "edge-infra.dev/pkg/edge/linkerd/k8s/controllers/metrics"
31 "edge-infra.dev/pkg/k8s/meta/status"
32 "edge-infra.dev/pkg/k8s/runtime/conditions"
33 "edge-infra.dev/pkg/k8s/runtime/controller/metrics"
34 edgereconcile "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
35 "edge-infra.dev/pkg/k8s/runtime/events"
36 "edge-infra.dev/pkg/k8s/runtime/inventory"
37 "edge-infra.dev/pkg/k8s/runtime/patch"
38 "edge-infra.dev/pkg/sds/ien/topology"
39 l5dchart "edge-infra.dev/third_party/k8s/linkerd/helm"
40 )
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// L5dReconciler reconciles Linkerd resources: it installs the Linkerd control
// plane, manages its certificates and keeps the resource status up to date.
type L5dReconciler struct {
	// Client is the Kubernetes client used to read and write cluster objects.
	client.Client
	// EventRecorder is handed to the status summarizer so that reconcile
	// outcomes can be emitted as events.
	kuberecorder.EventRecorder
	// Log is the base logger for this controller.
	Log logr.Logger
	// Manifests is not referenced by the methods in this file.
	// NOTE(review): presumably the rendered Linkerd manifests — confirm usage.
	Manifests []*unstructured.Unstructured
	// Name identifies the controller; it is used as the field owner for
	// server-side apply and for status patches.
	Name string
	// ResourceManager applies and prunes objects. It is created lazily by
	// setResourceManager when nil.
	ResourceManager *ssa.ResourceManager
	// Conditions describes the status conditions this controller manages and
	// is passed to the status summarizer.
	Conditions edgereconcile.Conditions
	// Metrics is the controller metrics handle supplied at construction time.
	Metrics metrics.Metrics
	// Config holds the install configuration built from the registry and the
	// l5d directory path.
	Config *l5dconfig.Config
}
92
93 func newL5dReconciler(mgr ctrl.Manager, l5dcfg Config, metrics metrics.Metrics, eventRecorder *events.Recorder) *L5dReconciler {
94 l5dctl := &L5dReconciler{
95 Client: mgr.GetClient(),
96 Log: ctrl.Log.WithName(linkerd.LinkerdControllerName),
97 Name: linkerd.LinkerdControllerName,
98 Conditions: LinkerdConditions,
99 Metrics: metrics,
100 EventRecorder: eventRecorder,
101 }
102 var registry, l5dDir string = build.DefaultPublicContainerRegistry, "/etc/l5d"
103 if l5dcfg.Registry != nil {
104 registry = *l5dcfg.Registry
105 }
106 if l5dcfg.L5dDirPath != nil {
107 l5dDir = *l5dcfg.L5dDirPath
108 }
109 l5dctl.Config = l5dconfig.NewConfig(registry, l5dDir)
110 return l5dctl
111 }
112
// LinkerdConditions is the condition set used when summarizing the status of
// a Linkerd resource. It is assigned to L5dReconciler.Conditions by
// newL5dReconciler.
var LinkerdConditions = edgereconcile.Conditions{
	// Target is the condition the summary is written to.
	// NOTE(review): exact semantics are defined by edgereconcile.Conditions — confirm.
	Target: status.ReadyCondition,
	// Owned lists the conditions this controller manages.
	Owned: []string{
		status.ReconcilingCondition,
		status.StalledCondition,
		l5dv1alpha1.Apply,
		l5dv1alpha1.ProxyInjection,
		l5dv1alpha1.CertManager,
		l5dv1alpha1.TrustAnchor,
	},
	// Summarize lists the conditions folded into the target condition; it
	// currently mirrors Owned.
	Summarize: []string{
		status.ReconcilingCondition,
		status.StalledCondition,
		l5dv1alpha1.Apply,
		l5dv1alpha1.ProxyInjection,
		l5dv1alpha1.CertManager,
		l5dv1alpha1.TrustAnchor,
	},
	// NegativePolarity presumably marks conditions whose presence signals a
	// problem rather than health — confirm against edgereconcile.
	NegativePolarity: []string{
		status.ReconcilingCondition,
		status.StalledCondition,
	},
}
136
137
138
139
140 func (r *L5dReconciler) SetupWithManager(mgr ctrl.Manager) error {
141 return ctrl.NewControllerManagedBy(mgr).
142 For(&l5dv1alpha1.Linkerd{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).
143 Owns(&certmgr.Certificate{}, builder.WithPredicates(predicate.NewPredicateFuncs(isLinkerdIssuer))).
144 Owns(&certmgr.Issuer{}, builder.WithPredicates(predicate.NewPredicateFuncs(isLinkerdCertificate))).
145 WithEventFilter(createEventFilter()).
146 Watches(
147 &corev1.ConfigMap{},
148 handler.EnqueueRequestsFromMapFunc(r.createReconcileRequests),
149 builder.WithPredicates(topology.PredicateFilter()),
150 ).
151 Complete(r)
152 }
153
154
155 func isLinkerdIssuer(obj client.Object) bool {
156 return obj.GetName() == linkerd.IssuerName && obj.GetNamespace() == linkerd.Namespace
157 }
158
159
160 func isLinkerdCertificate(obj client.Object) bool {
161 return obj.GetName() == linkerd.TrustAnchorName && obj.GetNamespace() == linkerd.Namespace
162 }
163
164 func createEventFilter() predicate.Predicate {
165 return predicate.Funcs{
166 CreateFunc: func(_ event.CreateEvent) bool {
167 return true
168 },
169 UpdateFunc: func(e event.UpdateEvent) bool {
170 if node, ok := e.ObjectNew.(*corev1.Node); ok {
171
172 return !node.DeletionTimestamp.IsZero()
173 }
174 return true
175 },
176 DeleteFunc: func(_ event.DeleteEvent) bool {
177 return false
178 },
179 }
180 }
181
182
183 func (r *L5dReconciler) createReconcileRequests(_ context.Context, _ client.Object) []reconcile.Request {
184 linkerdList := &l5dv1alpha1.LinkerdList{}
185 if err := r.Client.List(context.Background(), linkerdList); err != nil {
186 return nil
187 }
188
189 reconcileRequests := []reconcile.Request{}
190 for _, linkerd := range linkerdList.Items {
191 reconcileRequests = append(reconcileRequests, reconcile.Request{
192 NamespacedName: client.ObjectKey{
193 Namespace: linkerd.GetNamespace(),
194 Name: linkerd.GetName(),
195 },
196 })
197 }
198
199 return reconcileRequests
200 }
201
202
203
204
205
206
207
208
209
210
211
212
213 func (r *L5dReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
214 r.setResourceManager()
215
216 l5d := l5dv1alpha1.Linkerd{}
217 if err := r.Get(ctx, req.NamespacedName, &l5d); err != nil {
218 return ctrl.Result{}, err
219 }
220
221 patcher := patch.NewSerialPatcher(&l5d, r.Client)
222
223
224 if len(l5d.Status.Conditions) == 0 || l5d.Status.Version != l5dchart.Version {
225 conditions.MarkReconciling(&l5d, status.ProgressingReason, "reconciliation in progress")
226 if _, err := r.summarizer(ctx, patcher, &l5d); err != nil {
227 return ctrl.Result{RequeueAfter: 1 * time.Minute}, err
228 }
229 }
230
231
232 l5d, reconcileErr := r.reconcile(ctx, *l5d.DeepCopy())
233 if reconcileErr != nil {
234
235 if _, err := r.summarizer(ctx, patcher, &l5d); err != nil {
236 r.Log.Error(reconcileErr, "Reconcile error")
237 return ctrl.Result{Requeue: true}, err
238 }
239 return ctrl.Result{RequeueAfter: 1 * time.Minute}, reconcileErr
240 }
241
242
243
244
245 conditions.Delete(&l5d, status.ReconcilingCondition)
246 conditions.Delete(&l5d, status.StalledCondition)
247 _, err := r.summarizer(ctx, patcher, &l5d)
248 return ctrl.Result{RequeueAfter: 1 * time.Minute}, err
249 }
250
251
252
253
254
255
256
257
258
259
260 func (r *L5dReconciler) reconcile(ctx context.Context, l5d l5dv1alpha1.Linkerd) (l5dv1alpha1.Linkerd, error) {
261 var err error
262 log := r.Log.WithName("Reconciler")
263
264
265 oldStatus := l5d.Status.DeepCopy()
266 l5d.Status.Inventory = inventory.New()
267
268 if err := install.Namespace(ctx, r.Client, &l5d); err != nil {
269 return l5d, err
270 }
271
272 _, err = trustanchor.CreateIfNotExists(ctx, r.Client, &l5d)
273 if err != nil {
274 return l5d, err
275 }
276
277 if err := r.Config.UpdateTopologyInfo(ctx, r.Client); err != nil {
278 return l5d, err
279 }
280
281 if err := identity.Create(ctx, r.Client, &l5d, r.Config.IdentityIssuer); err != nil {
282 return l5d, err
283 }
284
285 if err := install.Apply(ctx, r.Client, r.ResourceManager, &l5d, r.Config); err != nil {
286 log.Error(err, "unable to apply the linkerd control plane manifests")
287 return l5d, err
288 }
289
290 l5d, err = proxyinjection.Start(ctx, r.Client, r.ResourceManager, l5d)
291 if err != nil {
292 log.Error(err, "unable to start the proxy injection job")
293 return l5d, err
294 }
295
296 if trustanchor.IsRotated(ctx, r.Client) {
297 if err := identity.DeleteIdentity(ctx, r.Client); err != nil {
298 return l5d, err
299 }
300 if err := trustanchor.RemoveRotationAnnotations(ctx, r.Client, &l5d); err != nil {
301 log.Error(err, "unable to remove trust anchor rotation annotations")
302 }
303 }
304
305
306 if oldStatus.Inventory != nil {
307 diff, err := inventory.Diff(oldStatus.Inventory, l5d.GetInventory())
308 if err != nil {
309 return l5d, err
310 }
311 if len(diff) > 0 {
312 changeSet, err := r.ResourceManager.DeleteAll(ctx, diff, ssa.DefaultDeleteOptions())
313 if err != nil {
314 return l5d, err
315 }
316 log.Info("pruned objects", "changeset", linkerd.FilterChanged(changeSet))
317 }
318 }
319
320 return l5d, nil
321 }
322
323 func (r *L5dReconciler) setResourceManager() {
324 if r.ResourceManager == nil {
325 mgr := ssa.NewResourceManager(
326 r.Client,
327
328
329 polling.NewStatusPoller(r.Client, r.Client.RESTMapper(), polling.Options{}), ssa.Owner{Field: r.Name},
330 )
331 r.ResourceManager = mgr
332 }
333 }
334
335 func (r *L5dReconciler) summarizer(ctx context.Context, patcher *patch.SerialPatcher, l5d *l5dv1alpha1.Linkerd) (res ctrl.Result, recErr error) {
336 s := edgereconcile.NewSummarizer(patcher)
337 res, recErr = s.SummarizeAndPatch(ctx, l5d,
338 edgereconcile.WithConditions(r.Conditions),
339 edgereconcile.WithResult(edgereconcile.ResultEmpty),
340 edgereconcile.WithError(recErr),
341 edgereconcile.WithIgnoreNotFound(),
342 edgereconcile.WithProcessors(
343 edgereconcile.RecordReconcileReq,
344 edgereconcile.RecordResult,
345 ),
346 edgereconcile.WithFieldOwner(r.Name),
347 edgereconcile.WithEventRecorder(r.EventRecorder),
348 )
349
350 l5dmetrics.RecordLinkerdReadiness(l5d)
351
352 return res, recErr
353 }
354
View as plain text