1 package lumperctl
2
3 import (
4 "context"
5 "fmt"
6 "time"
7
8 "github.com/google/go-containerregistry/pkg/name"
9 "github.com/google/go-containerregistry/pkg/v1/remote"
10 "go.uber.org/multierr"
11 ctrl "sigs.k8s.io/controller-runtime"
12 "sigs.k8s.io/controller-runtime/pkg/builder"
13 "sigs.k8s.io/controller-runtime/pkg/client"
14 "sigs.k8s.io/controller-runtime/pkg/controller"
15 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
16 "sigs.k8s.io/controller-runtime/pkg/predicate"
17
18 whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2"
19 "edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl/internal"
20 "edge-infra.dev/pkg/f8n/warehouse/lift/unpack"
21 "edge-infra.dev/pkg/f8n/warehouse/oci/layer"
22 "edge-infra.dev/pkg/f8n/warehouse/pallet"
23 "edge-infra.dev/pkg/k8s/meta/status"
24 "edge-infra.dev/pkg/k8s/runtime/conditions"
25 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
26 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
27 "edge-infra.dev/pkg/k8s/runtime/inventory"
28 "edge-infra.dev/pkg/k8s/runtime/patch"
29 "edge-infra.dev/pkg/k8s/runtime/sap"
30 )
31
32
33
34
35
36
37
38 type UnpackedPalletReconciler struct {
39 *internal.Reconciler
40
41 depRequeueInterval time.Duration
42 }
43
44
45
46 var unpackReadyConditions = []string{
47 whv1.RuntimeReadyCondition,
48 whv1.InfraReadyCondition,
49 whv1.RuntimeCapabilitiesReadyCondition,
50 }
51
52
53
54
55 var unpackedPalletConditions = reconcile.Conditions{
56 Target: status.ReadyCondition,
57 Owned: append(unpackReadyConditions,
58 status.DependenciesReadyCondition,
59 whv1.HealthyCondition,
60 whv1.FetchedArtifactCondition,
61 status.ReadyCondition,
62 status.ReconcilingCondition,
63 status.StalledCondition,
64 ),
65 Summarize: append(unpackReadyConditions,
66 whv1.HealthyCondition,
67 whv1.FetchedArtifactCondition,
68 status.DependenciesReadyCondition,
69 status.StalledCondition,
70 ),
71 NegativePolarity: []string{
72 status.ReconcilingCondition,
73 status.StalledCondition,
74 },
75 }
76
77 func (r *UnpackedPalletReconciler) SetupWithManager(mgr ctrl.Manager, cfg UnpackedPalletCfg) error {
78 if err := r.Reconciler.SetupWithManager(mgr); err != nil {
79 return err
80 }
81
82 r.depRequeueInterval = cfg.DepRequeueInterval
83
84 return ctrl.NewControllerManagedBy(mgr).
85 For(&whv1.UnpackedPallet{}, builder.WithPredicates(
86
87 predicate.GenerationChangedPredicate{},
88 )).
89 WithOptions(controller.Options{
90 MaxConcurrentReconciles: cfg.Concurrent,
91 }).
92 Complete(r)
93 }
94
95 func (r *UnpackedPalletReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
96 var (
97 reconcileStart = time.Now()
98 log = ctrl.LoggerFrom(ctx)
99 result = reconcile.ResultEmpty
100 u = &whv1.UnpackedPallet{}
101 )
102
103 if err := r.Get(ctx, req.NamespacedName, u); err != nil {
104 return ctrl.Result{}, client.IgnoreNotFound(err)
105 }
106
107 patcher := patch.NewSerialPatcher(u, r.Client)
108
109 defer func() {
110 s := reconcile.NewSummarizer(patcher)
111 res, recErr = s.SummarizeAndPatch(ctx, u,
112 reconcile.WithConditions(r.Conditions),
113 reconcile.WithResult(result),
114 reconcile.WithError(recErr),
115 reconcile.WithIgnoreNotFound(),
116 reconcile.WithProcessors(
117 reconcile.RecordReconcileReq,
118 reconcile.RecordResult,
119 ),
120 reconcile.WithFieldOwner(r.Name),
121 )
122
123 r.Metrics.RecordConditionsWithReason(ctx, u)
124 r.Metrics.RecordSuspend(ctx, u, u.Spec.Suspend)
125 r.Metrics.RecordDuration(ctx, u, reconcileStart)
126 r.LivenessChecker.AddStatus(u, recErr)
127 internal.RecordCacheLen(float64(r.Cache.Len()))
128 internal.RecordEdgeVersion(u.Name, u.ObjectMeta.Annotations["pallet.edge.ncr.com/version"], !u.GetDeletionTimestamp().IsZero())
129 }()
130
131
132 if !controllerutil.ContainsFinalizer(u, whv1.WarehouseFinalizer) {
133 controllerutil.AddFinalizer(u, whv1.WarehouseFinalizer)
134
135
136 result = reconcile.ResultRequeue
137 return
138 }
139
140
141 if !u.ObjectMeta.DeletionTimestamp.IsZero() {
142 log.Info("detected deletion, executing finalizer")
143
144
145
146 finalizer := Finalizer{
147 Name: whv1.WarehouseFinalizer,
148 ResourceManager: r.ResourceManager,
149 }
150 if err := finalizer.Finalize(ctx, u, u.Spec.Prune); err != nil {
151 recErr = err
152 return
153 }
154 controllerutil.RemoveFinalizer(u, whv1.WarehouseFinalizer)
155 log.Info("finalizer executed")
156 return
157 }
158
159 if u.Spec.Suspend {
160 log.Info("reconciliation is suspended", "suspended", "true")
161 return
162 }
163
164 log.Info("reconciling")
165 if recErr = r.reconcile(ctx, patcher, u); recErr == nil {
166 result = reconcile.ResultSuccess
167 }
168 return
169 }
170
171 func (r *UnpackedPalletReconciler) reconcile(
172 ctx context.Context,
173 patcher *patch.SerialPatcher,
174 u *whv1.UnpackedPallet,
175 ) recerr.Error {
176 if u.Spec.UnpackOptions.Provider == "" {
177 u.Spec.UnpackOptions.Provider = r.Provider
178 }
179
180 if err := u.Validate(); err != nil {
181 return recerr.NewStalled(fmt.Errorf("invalid spec: %w", err),
182 whv1.InvalidSpecReason)
183 }
184
185
186
187 if err := r.reconcileDeps(ctx, u); err != nil {
188 err.ToCondition(u, status.DependenciesReadyCondition)
189 return err
190 }
191
192 if err := reconcile.Progressing(ctx, u, patcher, r.PatchOpts()...); err != nil {
193 return recerr.New(err, whv1.ReconcileFailedReason)
194 }
195
196 ref, err := parseRef(u.Spec.Artifact)
197 if err != nil {
198 return err
199 }
200
201 log := logWithRef(ctrl.LoggerFrom(ctx), ref, u.Spec.Name)
202
203 ctx = ctrl.LoggerInto(ctx, log)
204
205 p, err := r.fetch(ctx, u, ref)
206 if err != nil {
207 return err
208 }
209
210 if !p.Supports(u.Spec.Provider) {
211 return recerr.NewStalled(
212 fmt.Errorf("%s not supported: [%s]", u.Spec.Provider, p.Providers()),
213 whv1.InvalidArtifactReason,
214 )
215 }
216
217 layers, unpackErr := unpack.Layers(p,
218 unpack.ForProvider(u.Spec.Provider),
219 unpack.ForLayerKeys(u.Layers()...),
220 unpack.RenderWith(u.Spec.Parameters),
221 unpack.WithInfraNamespace(u.Spec.InfraNamespace),
222 )
223 switch {
224 case unpackErr != nil:
225 return recerr.New(fmt.Errorf("failed to unpack: %w", unpackErr),
226 whv1.UnpackFailedReason)
227 case len(layers) == 0:
228 return recerr.NewStalled(
229 fmt.Errorf("Package contained no objects for the provided unpacking options"),
230 whv1.UnpackFailedReason,
231 )
232 }
233
234 if err := r.apply(ctx, u, layers); err != nil {
235 return err
236 }
237 log.Info("applied artifact")
238 conditions.MarkTrue(u, whv1.HealthyCondition, status.SucceededReason,
239 "Objects reconciled")
240
241 return nil
242 }
243
244 func (r *UnpackedPalletReconciler) reconcileDeps(
245 ctx context.Context,
246 u *whv1.UnpackedPallet,
247 ) recerr.Error {
248 switch {
249 case len(u.Spec.DependsOn) > 0:
250 u.Status.Dependencies = u.Dependencies()
251 case len(u.Spec.DependsOn) == 0:
252 u.Status.Dependencies = ""
253 conditions.Delete(u, status.DependenciesReadyCondition)
254 return nil
255 }
256
257 var unready []string
258 for _, dep := range u.Spec.DependsOn {
259 var du whv1.UnpackedPallet
260 if err := r.Client.Get(ctx, client.ObjectKey{Name: dep.Name}, &du); err != nil {
261
262
263 return recerr.NewWait(err, status.DependencyNotFoundReason, u.RetryInterval())
264 }
265
266 if !du.IsUpToDate() {
267 unready = append(unready, du.Name)
268 }
269 }
270
271 if len(unready) == 0 {
272 conditions.MarkTrue(
273 u,
274 status.DependenciesReadyCondition,
275 status.DependencyReadyReason,
276 "Dependencies up to date",
277 )
278 return nil
279 }
280
281 return recerr.NewWait(
282 fmt.Errorf("%d/%d dependencies ready: waiting for %s",
283 len(u.Spec.DependsOn)-len(unready), len(u.Spec.DependsOn), unready),
284 status.DependencyNotReadyReason,
285 r.depRequeueInterval,
286 )
287 }
288
289
290
291
292 func (r *UnpackedPalletReconciler) fetch(
293 ctx context.Context,
294 u *whv1.UnpackedPallet,
295 ref name.Reference,
296 ) (pallet.Pallet, recerr.Error) {
297
298
299 keychain, recErr := r.Keychain(ctx, u)
300 if recErr != nil {
301 return nil, recErr
302 }
303 a, recErr := r.Fetch(
304 ctx,
305 u,
306 ref,
307 u.Spec.PackagePullOptions.PackagePullPolicy,
308 remote.WithAuthFromKeychain(keychain),
309 )
310 if recErr != nil {
311 return nil, recErr
312 }
313
314
315 p, err := pallet.New(a)
316 if err != nil {
317 return nil, recerr.NewStalled(err, whv1.InvalidArtifactReason)
318 }
319
320 digest := u.Spec.Artifact.Digest
321 if digest == "" {
322 d, err := a.Digest()
323 if err != nil {
324 return nil, recerr.New(fmt.Errorf("failed to compute digest: %w", err),
325 whv1.FetchFailedReason)
326 }
327 digest = d.String()
328 }
329 u.Status.LastAttempted = digest
330
331 return p, nil
332 }
333
334
335
336
337
338
339
340
341
342
343
344
345
346 func (r *UnpackedPalletReconciler) apply(
347 ctx context.Context,
348 u *whv1.UnpackedPallet,
349 layers []layer.Layer,
350 ) recerr.Error {
351
352
353
354
355 for _, c := range unpackReadyConditions {
356 conditions.Delete(u, c)
357 }
358
359
360
361 var (
362 changes = make(map[string]*sap.ChangeSet)
363 applyErrs = make(map[string]error)
364 )
365 for _, l := range layers {
366 log := ctrl.LoggerFrom(ctx).WithValues("layer", l.Key())
367
368
369 var condition string
370 switch {
371 case l.Key() == layer.Infra.String():
372 condition = whv1.InfraReadyCondition
373 case l.Key() == layer.Runtime.String():
374 condition = whv1.RuntimeReadyCondition
375 case l.Type() == layer.Runtime:
376
377 condition = whv1.RuntimeCapabilitiesReadyCondition
378 default:
379
380 return recerr.NewStalled(
381 fmt.Errorf("unexpected layer: type %s with key %s", l.Type(), l.Key()),
382 whv1.InvalidArtifactReason,
383 )
384 }
385
386 layerChanges, err := r.applyLayer(ctx, u, l)
387 if err != nil {
388 log.Error(err, "failed to apply objects")
389 applyErrs[condition] = multierr.Append(applyErrs[condition], err)
390 switch condition {
391 case whv1.RuntimeCapabilitiesReadyCondition:
392
393
394
395
396
397
398
399 log.Info("failed to apply objects", "error", err)
400 default:
401
402
403 log.Error(err, "failed to apply objects")
404 conditions.MarkFalse(u, condition, whv1.ApplyFailedReason, "%v", err)
405 }
406 }
407 if layerChanges != nil {
408 log.Info("applied objects", "changeset", layerChanges.ToMap())
409 if changes[condition] == nil {
410 changes[condition] = sap.NewChangeSet()
411 }
412 changes[condition].Append(layerChanges.Entries)
413 }
414 }
415
416
417 if applyErrs[whv1.RuntimeCapabilitiesReadyCondition] != nil {
418 conditions.MarkFalse(u,
419 whv1.RuntimeCapabilitiesReadyCondition,
420 whv1.ApplyFailedReason,
421 "Attempted to apply runtime capabilities: %s",
422 applyErrs[whv1.RuntimeCapabilitiesReadyCondition].Error(),
423 )
424 }
425
426
427
428 if applyErrs[whv1.RuntimeReadyCondition] != nil ||
429 applyErrs[whv1.InfraReadyCondition] != nil {
430 return recerr.New(
431 multierr.Append(
432 applyErrs[whv1.RuntimeReadyCondition],
433 applyErrs[whv1.InfraReadyCondition],
434 ),
435 whv1.ApplyFailedReason,
436 )
437 }
438
439
440 changeset := sap.NewChangeSet()
441 for _, v := range changes {
442 changeset.Append(v.Entries)
443 }
444
445 newInv := inventory.New(inventory.FromSapChangeSet(changeset))
446 if err := r.prune(ctx, u, newInv); err != nil {
447 err := recerr.New(err, whv1.PruneFailedReason)
448 err.ToCondition(u, whv1.HealthyCondition)
449 return err
450 }
451
452 u.Status.LastApplied = u.Status.LastAttempted
453 u.Status.Inventory = newInv
454 u.Status.ShortDigest = u.ShortDigest()
455
456
457
458 var waitErr error
459 for k, v := range changes {
460
461 if k == whv1.RuntimeCapabilitiesReadyCondition && applyErrs[k] != nil {
462 continue
463 }
464
465 if err := r.ResourceManager.WaitForSet(ctx,
466 v.ToObjMetadataSet(),
467 sap.WaitOptions{Timeout: u.Spec.Timeout.Duration},
468 ); err != nil {
469 multierr.AppendInto(&waitErr, err)
470 conditions.MarkFalse(u, k, whv1.ReconcileFailedReason, "%v", err)
471 continue
472 }
473
474 conditions.MarkTrue(u, k, status.SucceededReason, "Objects reconciled")
475 }
476
477 if waitErr != nil {
478 return recerr.NewWait(waitErr, whv1.ReconcileFailedReason, u.RetryInterval())
479 }
480
481
482
483
484 if applyErrs[whv1.RuntimeCapabilitiesReadyCondition] != nil {
485 return recerr.New(applyErrs[whv1.RuntimeCapabilitiesReadyCondition],
486 whv1.ApplyFailedReason)
487 }
488
489 return nil
490 }
491
492 func (r *UnpackedPalletReconciler) applyLayer(
493 ctx context.Context,
494 u *whv1.UnpackedPallet,
495 l layer.Layer,
496 ) (*sap.ChangeSet, error) {
497 mgr := r.ResourceManager
498
499 objs, err := l.Unstructured()
500 if err != nil {
501 return nil, err
502 }
503 mgr.SetOwnerLabels(objs, u.Name, "")
504
505 changeset, err := mgr.ApplyAllStaged(ctx, objs, sap.ApplyOptions{
506 Force: u.Spec.Force,
507 WaitTimeout: u.Spec.Timeout.Duration,
508 })
509 if err != nil {
510 return nil, err
511 }
512
513 return changeset, err
514 }
515
516 func (r *UnpackedPalletReconciler) prune(ctx context.Context, u *whv1.UnpackedPallet, newInventory *inventory.ResourceInventory) error {
517 log := ctrl.LoggerFrom(ctx).WithName("prune")
518
519 switch {
520 case !u.Spec.Prune:
521 log.Info("pruning is disabled")
522 return nil
523 case u.Status.Inventory == nil:
524 return nil
525 default:
526 diff, err := u.Status.Inventory.Diff(newInventory)
527 if err != nil || len(diff) == 0 {
528 return err
529 }
530
531
532 deleted, err := r.ResourceManager.DeleteAll(ctx, diff, sap.DefaultDeleteOptions())
533 if err != nil {
534 return err
535 }
536 log.Info("pruned", "changeset", deleted.ToMap())
537 return nil
538 }
539 }
540
View as plain text