package lumperctl

import (
	"context"
	"fmt"
	"time"

	"github.com/google/go-containerregistry/pkg/name"
	"github.com/google/go-containerregistry/pkg/v1/remote"
	"go.uber.org/multierr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/predicate"

	whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2"
	"edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl/internal"
	"edge-infra.dev/pkg/f8n/warehouse/lift/unpack"
	"edge-infra.dev/pkg/f8n/warehouse/oci/layer"
	"edge-infra.dev/pkg/f8n/warehouse/pallet"
	"edge-infra.dev/pkg/k8s/meta/status"
	"edge-infra.dev/pkg/k8s/runtime/conditions"
	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
	"edge-infra.dev/pkg/k8s/runtime/inventory"
	"edge-infra.dev/pkg/k8s/runtime/patch"
	"edge-infra.dev/pkg/k8s/runtime/sap"
)

// +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=unpackedpallets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=unpackedpallets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=unpackedpallets/finalizers,verbs=get;create;update;patch;delete
// +kubebuilder:rbac:groups="",resources=configmaps;secrets;serviceaccounts,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch

// UnpackedPalletReconciler reconciles UnpackedPallet objects.
type UnpackedPalletReconciler struct {
	*internal.Reconciler

	depRequeueInterval time.Duration
}

// unpackReadyConditions are the ready conditions that may be applied and
// evaluated for an individual UnpackedPallet object's readiness.
var unpackReadyConditions = []string{
	whv1.RuntimeReadyCondition,
	whv1.InfraReadyCondition,
	whv1.RuntimeCapabilitiesReadyCondition,
}

// unpackedPalletConditions is the reconcile summarization configuration for
// how the various conditions are taken into account when the final condition
// is summarized.
var unpackedPalletConditions = reconcile.Conditions{
	Target: status.ReadyCondition,
	Owned: append(unpackReadyConditions,
		status.DependenciesReadyCondition,
		whv1.HealthyCondition,
		whv1.FetchedArtifactCondition,
		status.ReadyCondition,
		status.ReconcilingCondition,
		status.StalledCondition,
	),
	Summarize: append(unpackReadyConditions,
		whv1.HealthyCondition,
		whv1.FetchedArtifactCondition,
		status.DependenciesReadyCondition,
		status.StalledCondition,
	),
	NegativePolarity: []string{
		status.ReconcilingCondition,
		status.StalledCondition,
	},
}

// SetupWithManager registers the reconciler with the manager and configures
// how UnpackedPallet objects are watched.
func (r *UnpackedPalletReconciler) SetupWithManager(mgr ctrl.Manager, cfg UnpackedPalletCfg) error {
	if err := r.Reconciler.SetupWithManager(mgr); err != nil {
		return err
	}
	r.depRequeueInterval = cfg.DepRequeueInterval

	return ctrl.NewControllerManagedBy(mgr).
		For(&whv1.UnpackedPallet{}, builder.WithPredicates(
			// Only re-reconcile if the spec changes, not status or metadata.
			predicate.GenerationChangedPredicate{},
		)).
		WithOptions(controller.Options{
			MaxConcurrentReconciles: cfg.Concurrent,
		}).
		Complete(r)
}

// Reconcile reconciles a single UnpackedPallet object.
func (r *UnpackedPalletReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
	var (
		reconcileStart = time.Now()
		log            = ctrl.LoggerFrom(ctx)
		result         = reconcile.ResultEmpty
		u              = &whv1.UnpackedPallet{}
	)

	if err := r.Get(ctx, req.NamespacedName, u); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	patcher := patch.NewSerialPatcher(u, r.Client)

	defer func() {
		s := reconcile.NewSummarizer(patcher)
		res, recErr = s.SummarizeAndPatch(ctx, u,
			reconcile.WithConditions(r.Conditions),
			reconcile.WithResult(result),
			reconcile.WithError(recErr),
			reconcile.WithIgnoreNotFound(),
			reconcile.WithProcessors(
				reconcile.RecordReconcileReq,
				reconcile.RecordResult,
			),
			reconcile.WithFieldOwner(r.Name),
		)
		r.Metrics.RecordConditionsWithReason(ctx, u)
		r.Metrics.RecordSuspend(ctx, u, u.Spec.Suspend)
		r.Metrics.RecordDuration(ctx, u, reconcileStart)
		r.LivenessChecker.AddStatus(u, recErr)
		internal.RecordCacheLen(float64(r.Cache.Len()))
		internal.RecordEdgeVersion(u.Name,
			u.ObjectMeta.Annotations["pallet.edge.ncr.com/version"],
			!u.GetDeletionTimestamp().IsZero())
	}()

	// Add our finalizer if it does not exist.
	if !controllerutil.ContainsFinalizer(u, whv1.WarehouseFinalizer) {
		controllerutil.AddFinalizer(u, whv1.WarehouseFinalizer)
		// Return immediately so that we requeue and reconcile the object with
		// the finalizer added.
		result = reconcile.ResultRequeue
		return
	}

	// If a deletion timestamp has been set, execute finalizer logic.
	if !u.ObjectMeta.DeletionTimestamp.IsZero() {
		log.Info("detected deletion, executing finalizer")
		// TODO: use serial patcher to incrementally patch terminating status to
		// indicate that we are reconciling the termination so that the resource
		// does not indicate it is Ready
		finalizer := Finalizer{
			Name:            whv1.WarehouseFinalizer,
			ResourceManager: r.ResourceManager,
		}
		if err := finalizer.Finalize(ctx, u, u.Spec.Prune); err != nil {
			recErr = err
			return
		}
		controllerutil.RemoveFinalizer(u, whv1.WarehouseFinalizer)
		log.Info("finalizer executed")
		return
	}

	if u.Spec.Suspend {
		log.Info("reconciliation is suspended", "suspended", "true")
		return
	}

	log.Info("reconciling")
	if recErr = r.reconcile(ctx, patcher, u); recErr == nil {
		result = reconcile.ResultSuccess
	}
	return
}

func (r *UnpackedPalletReconciler) reconcile(
	ctx context.Context,
	patcher *patch.SerialPatcher,
	u *whv1.UnpackedPallet,
) recerr.Error {
	if u.Spec.UnpackOptions.Provider == "" {
		u.Spec.UnpackOptions.Provider = r.Provider
	}
	if err := u.Validate(); err != nil {
		return recerr.NewStalled(fmt.Errorf("invalid spec: %w", err), whv1.InvalidSpecReason)
	}

	// Check dependencies before we mark the object as reconciling to avoid
	// churn on readiness for resources that depend on the pallet we are
	// reconciling.
	if err := r.reconcileDeps(ctx, u); err != nil {
		err.ToCondition(u, status.DependenciesReadyCondition)
		return err
	}

	if err := reconcile.Progressing(ctx, u, patcher, r.PatchOpts()...); err != nil {
		return recerr.New(err, whv1.ReconcileFailedReason)
	}

	ref, err := parseRef(u.Spec.Artifact)
	if err != nil {
		return err
	}
	log := logWithRef(ctrl.LoggerFrom(ctx), ref, u.Spec.Name)
	// Persist the artifact-decorated logger for sub-reconcilers.
	ctx = ctrl.LoggerInto(ctx, log)

	p, err := r.fetch(ctx, u, ref)
	if err != nil {
		return err
	}

	if !p.Supports(u.Spec.Provider) {
		return recerr.NewStalled(
			fmt.Errorf("%s not supported: [%s]", u.Spec.Provider, p.Providers()),
			whv1.InvalidArtifactReason,
		)
	}

	layers, unpackErr := unpack.Layers(p,
		unpack.ForProvider(u.Spec.Provider),
		unpack.ForLayerKeys(u.Layers()...),
		unpack.RenderWith(u.Spec.Parameters),
		unpack.WithInfraNamespace(u.Spec.InfraNamespace),
	)
	switch {
	case unpackErr != nil:
		return recerr.New(fmt.Errorf("failed to unpack: %w", unpackErr), whv1.UnpackFailedReason)
	case len(layers) == 0:
		return recerr.NewStalled(
			fmt.Errorf("package contained no objects for the provided unpacking options"),
			whv1.UnpackFailedReason,
		)
	}

	if err := r.apply(ctx, u, layers); err != nil {
		return err
	}
	log.Info("applied artifact")

	conditions.MarkTrue(u, whv1.HealthyCondition, status.SucceededReason, "Objects reconciled")
	return nil
}

func (r *UnpackedPalletReconciler) reconcileDeps(
	ctx context.Context,
	u *whv1.UnpackedPallet,
) recerr.Error {
	switch {
	case len(u.Spec.DependsOn) > 0:
		u.Status.Dependencies = u.Dependencies()
	case len(u.Spec.DependsOn) == 0:
		u.Status.Dependencies = ""
		conditions.Delete(u, status.DependenciesReadyCondition)
		return nil
	}

	var unready []string
	for _, dep := range u.Spec.DependsOn {
		var du whv1.UnpackedPallet
		if err := r.Client.Get(ctx, client.ObjectKey{Name: dep.Name}, &du); err != nil {
			// Use the standard retry interval instead of the dependency retry
			// interval because we aren't waiting for readiness.
			return recerr.NewWait(err, status.DependencyNotFoundReason, u.RetryInterval())
		}
		if !du.IsUpToDate() {
			unready = append(unready, du.Name)
		}
	}

	if len(unready) == 0 {
		conditions.MarkTrue(
			u,
			status.DependenciesReadyCondition,
			status.DependencyReadyReason,
			"Dependencies up to date",
		)
		return nil
	}

	return recerr.NewWait(
		fmt.Errorf("%d/%d dependencies ready: waiting for %s",
			len(u.Spec.DependsOn)-len(unready), len(u.Spec.DependsOn), unready),
		status.DependencyNotReadyReason,
		r.depRequeueInterval,
	)
}

// fetch fetches the artifact and then either returns the digest from the
// artifact spec or infers it from the fetched artifact (in the event that the
// artifact spec is a tag).
func (r *UnpackedPalletReconciler) fetch(
	ctx context.Context,
	u *whv1.UnpackedPallet,
	ref name.Reference,
) (pallet.Pallet, recerr.Error) {
	// Create a keychain that is scoped to this reconcile loop, to ensure that
	// we get up-to-date handles on our auth context (e.g., API key rotation).
	keychain, recErr := r.Keychain(ctx, u)
	if recErr != nil {
		return nil, recErr
	}

	a, recErr := r.Fetch(
		ctx,
		u,
		ref,
		u.Spec.PackagePullOptions.PackagePullPolicy,
		remote.WithAuthFromKeychain(keychain),
	)
	if recErr != nil {
		return nil, recErr
	}

	// TODO: test stalled condition
	p, err := pallet.New(a)
	if err != nil {
		return nil, recerr.NewStalled(err, whv1.InvalidArtifactReason)
	}

	digest := u.Spec.Artifact.Digest
	if digest == "" {
		d, err := a.Digest()
		if err != nil {
			return nil, recerr.New(fmt.Errorf("failed to compute digest: %w", err), whv1.FetchFailedReason)
		}
		digest = d.String()
	}
	u.Status.LastAttempted = digest

	return p, nil
}

// apply applies any layers passed in, updates the inventory and handles
// pruning, then waits for the resources to become ready. Depending on each
// layer's type, individual conditions are set based on the result (e.g.,
// InfraReady, RuntimeReady, etc.).
//
// Inventory is handled before waiting for resource readiness to avoid
// orphaning resources if objects between generations do not reconcile.
//
// Apply is resilient to any individual layer type failing to apply, allowing
// the conditions for each layer type to move independently. This allows
// partial analysis of dependency readiness in reconcileDeps so that dependents
// are not blocked waiting on runtime capabilities.
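//
// Layers roll up under ready conditions by key and type: the infra layer key
// maps to InfraReadyCondition, the runtime layer key maps to
// RuntimeReadyCondition, and any remaining runtime-type layers map to
// RuntimeCapabilitiesReadyCondition (see the switch at the top of the layer
// loop below).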
func (r *UnpackedPalletReconciler) apply(
	ctx context.Context,
	u *whv1.UnpackedPallet,
	layers []layer.Layer,
) recerr.Error {
	// Reset specific Ready conditions for this Pallet so that we can re-build
	// them. This ensures that if a specific Ready condition is no longer
	// applicable, either due to the package contents changing or the unpack
	// options changing, it is removed from the final status.
	for _, c := range unpackReadyConditions {
		conditions.Delete(u, c)
	}

	// Track changes and apply errors for each layer type by the condition they
	// roll up under.
	var (
		changes   = make(map[string]*sap.ChangeSet)
		applyErrs = make(map[string]error)
	)

	for _, l := range layers {
		log := ctrl.LoggerFrom(ctx).WithValues("layer", l.Key())

		// Resolve the condition for this layer type.
		var condition string
		switch {
		case l.Key() == layer.Infra.String():
			condition = whv1.InfraReadyCondition
		case l.Key() == layer.Runtime.String():
			condition = whv1.RuntimeReadyCondition
		case l.Type() == layer.Runtime:
			// All remaining runtime layers are runtime capabilities.
			condition = whv1.RuntimeCapabilitiesReadyCondition
		default:
			// Any other combination of keys and types represents unknown behavior.
			return recerr.NewStalled(
				fmt.Errorf("unexpected layer: type %s with key %s", l.Type(), l.Key()),
				whv1.InvalidArtifactReason,
			)
		}

		layerChanges, err := r.applyLayer(ctx, u, l)
		if err != nil {
			applyErrs[condition] = multierr.Append(applyErrs[condition], err)
			switch condition {
			case whv1.RuntimeCapabilitiesReadyCondition:
				// Only update the runtime capability ready condition after applying
				// all layers because it can have a many:1 relationship with layers.
				//
				// Only log at error level for layers that are used for dependency
				// analysis. Failing to apply runtime capability layers is not unusual
				// and we don't want to spam low-signal error logs. Any persistent
				// failures will still be reflected in the object status.
				log.Info("failed to apply objects", "error", err)
			default:
				log.Error(err, "failed to apply objects")
				conditions.MarkFalse(u, condition, whv1.ApplyFailedReason, "%v", err)
			}
		}

		if layerChanges != nil {
			log.Info("applied objects", "changeset", layerChanges.ToMap())
			if changes[condition] == nil {
				changes[condition] = sap.NewChangeSet()
			}
			changes[condition].Append(layerChanges.Entries)
		}
	}

	// If applying any runtime capabilities failed, update the condition
	// appropriately.
	if applyErrs[whv1.RuntimeCapabilitiesReadyCondition] != nil {
		conditions.MarkFalse(u,
			whv1.RuntimeCapabilitiesReadyCondition,
			whv1.ApplyFailedReason,
			"Attempted to apply runtime capabilities: %s",
			applyErrs[whv1.RuntimeCapabilitiesReadyCondition].Error(),
		)
	}

	// If conditions that are meaningful for dependency analysis failed to apply,
	// return early.
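	// Runtime capability apply failures are deliberately not considered here so
	// that dependents are not blocked waiting on runtime capabilities; those
	// errors are surfaced at the end of apply instead.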
	if applyErrs[whv1.RuntimeReadyCondition] != nil || applyErrs[whv1.InfraReadyCondition] != nil {
		return recerr.New(
			multierr.Append(
				applyErrs[whv1.RuntimeReadyCondition],
				applyErrs[whv1.InfraReadyCondition],
			),
			whv1.ApplyFailedReason,
		)
	}

	// Reconcile inventory for all changes if we didn't encounter any apply errors.
	changeset := sap.NewChangeSet()
	for _, v := range changes {
		changeset.Append(v.Entries)
	}
	newInv := inventory.New(inventory.FromSapChangeSet(changeset))
	if err := r.prune(ctx, u, newInv); err != nil {
		err := recerr.New(err, whv1.PruneFailedReason)
		err.ToCondition(u, whv1.HealthyCondition)
		return err
	}
	u.Status.LastApplied = u.Status.LastAttempted
	u.Status.Inventory = newInv
	u.Status.ShortDigest = u.ShortDigest()

	// Wait for the objects we applied, independently for each condition that we
	// tracked changes for.
	var waitErr error
	for k, v := range changes {
		// If we failed to apply runtime capabilities, don't wait for them.
		if k == whv1.RuntimeCapabilitiesReadyCondition && applyErrs[k] != nil {
			continue
		}
		if err := r.ResourceManager.WaitForSet(ctx, v.ToObjMetadataSet(),
			sap.WaitOptions{Timeout: u.Spec.Timeout.Duration},
		); err != nil {
			multierr.AppendInto(&waitErr, err)
			conditions.MarkFalse(u, k, whv1.ReconcileFailedReason, "%v", err)
			continue
		}
		conditions.MarkTrue(u, k, status.SucceededReason, "Objects reconciled")
	}
	if waitErr != nil {
		return recerr.NewWait(waitErr, whv1.ReconcileFailedReason, u.RetryInterval())
	}

	// If we haven't returned due to an error but failed to apply runtime
	// capabilities, everything else is okay and we should include that error in
	// the object status.
	if applyErrs[whv1.RuntimeCapabilitiesReadyCondition] != nil {
		return recerr.New(applyErrs[whv1.RuntimeCapabilitiesReadyCondition], whv1.ApplyFailedReason)
	}
	return nil
}

func (r *UnpackedPalletReconciler) applyLayer(
	ctx context.Context,
	u *whv1.UnpackedPallet,
	l layer.Layer,
) (*sap.ChangeSet, error) {
	mgr := r.ResourceManager
	objs, err := l.Unstructured()
	if err != nil {
		return nil, err
	}
	mgr.SetOwnerLabels(objs, u.Name, "")
	changeset, err := mgr.ApplyAllStaged(ctx, objs, sap.ApplyOptions{
		Force:       u.Spec.Force,
		WaitTimeout: u.Spec.Timeout.Duration,
	})
	if err != nil {
		return nil, err
	}
	return changeset, nil
}

func (r *UnpackedPalletReconciler) prune(ctx context.Context, u *whv1.UnpackedPallet, newInventory *inventory.ResourceInventory) error {
	log := ctrl.LoggerFrom(ctx).WithName("prune")
	switch {
	case !u.Spec.Prune:
		log.Info("pruning is disabled")
		return nil
	case u.Status.Inventory == nil:
		return nil
	default:
		diff, err := u.Status.Inventory.Diff(newInventory)
		if err != nil || len(diff) == 0 {
			return err
		}
		// TODO: emit event with pruned resources
		deleted, err := r.ResourceManager.DeleteAll(ctx, diff, sap.DefaultDeleteOptions())
		if err != nil {
			return err
		}
		log.Info("pruned", "changeset", deleted.ToMap())
		return nil
	}
}
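
// Example wiring of this reconciler into a controller manager (illustrative
// sketch only; the construction of the embedded internal.Reconciler and the
// cfg values shown here are assumptions, not part of this package):
//
//	r := &UnpackedPalletReconciler{Reconciler: base}
//	if err := r.SetupWithManager(mgr, UnpackedPalletCfg{
//		Concurrent:         2,
//		DepRequeueInterval: 30 * time.Second,
//	}); err != nil {
//		return err
//	}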