package lumperctl import ( "context" "errors" "fmt" "time" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2" "edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl/internal" "edge-infra.dev/pkg/k8s/meta/status" "edge-infra.dev/pkg/k8s/runtime/conditions" "edge-infra.dev/pkg/k8s/runtime/controller/reconcile" "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr" "edge-infra.dev/pkg/k8s/runtime/inventory" "edge-infra.dev/pkg/k8s/runtime/patch" "edge-infra.dev/pkg/k8s/runtime/sap" "edge-infra.dev/pkg/k8s/unstructured" ) // +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=unpackedpallets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=unpackedpallets/status,verbs=get;update;patch // +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=unpackedpallets/finalizers,verbs=get;create;update;patch;delete // +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=shipments/finalizers,verbs=get;create;update;patch;delete // +kubebuilder:rbac:groups="",resources=configmaps;secrets;serviceaccounts,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch type ShipmentReconciler struct { *internal.Reconciler } var shipmentConditions = reconcile.Conditions{ Target: status.ReadyCondition, Owned: []string{ whv1.HealthyCondition, whv1.FetchedArtifactCondition, status.ReadyCondition, status.ReconcilingCondition, status.StalledCondition, }, Summarize: []string{ whv1.HealthyCondition, whv1.FetchedArtifactCondition, status.StalledCondition, }, NegativePolarity: []string{ status.ReconcilingCondition, status.StalledCondition, }, } func (r *ShipmentReconciler) SetupWithManager(mgr ctrl.Manager, cfg ShipmentCfg) error { if err := r.Reconciler.SetupWithManager(mgr); err != nil { return err } return ctrl.NewControllerManagedBy(mgr). For(&whv1.Shipment{}). WithOptions(controller.Options{ MaxConcurrentReconciles: cfg.Concurrent, }). Owns(&whv1.UnpackedPallet{}). Complete(r) } func (r *ShipmentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) { var ( reconcileStart = time.Now() log = ctrl.LoggerFrom(ctx) result = reconcile.ResultEmpty s = &whv1.Shipment{} ) if err := r.Get(ctx, req.NamespacedName, s); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } patcher := patch.NewSerialPatcher(s, r.Client) defer func() { summarizer := reconcile.NewSummarizer(patcher) res, recErr = summarizer.SummarizeAndPatch(ctx, s, []reconcile.SummarizeOption{ reconcile.WithConditions(r.Conditions), reconcile.WithResult(result), reconcile.WithError(recErr), reconcile.WithIgnoreNotFound(), reconcile.WithProcessors( reconcile.RecordReconcileReq, reconcile.RecordResult, ), reconcile.WithFieldOwner(r.Name), }...) r.Metrics.RecordConditionsWithReason(ctx, s) r.Metrics.RecordDuration(ctx, s, reconcileStart) r.Metrics.RecordSuspend(ctx, s, s.Spec.Suspend) r.LivenessChecker.AddStatus(s, recErr) internal.RecordCacheLen(float64(r.Cache.Len())) }() // Check if finalizer exists if !controllerutil.ContainsFinalizer(s, whv1.WarehouseFinalizer) { controllerutil.AddFinalizer(s, whv1.WarehouseFinalizer) // Return immediately so that we requeue and reconcile object with finalizer // added. result = reconcile.ResultRequeue return } // If a deletion timestamp has been set, execute finalizer logic if !s.ObjectMeta.DeletionTimestamp.IsZero() { log.Info("detected deletion, executing finalizer") // TODO: use serial patcher to incrementally patch terminating status finalizer := Finalizer{ Name: whv1.WarehouseFinalizer, ResourceManager: r.ResourceManager, } if err := finalizer.Finalize(ctx, s, s.Spec.Prune); err != nil { result, recErr = reconcile.ResultEmpty, err return } controllerutil.RemoveFinalizer(s, whv1.WarehouseFinalizer) log.Info("finalizer executed") return } if s.Spec.Suspend { log.Info("reconciliation is suspended", "suspended", "true") return } log.Info("reconciling") recErr = r.reconcile(ctx, patcher, s) if recErr == nil { result = reconcile.ResultSuccess } return } func (r *ShipmentReconciler) reconcile( ctx context.Context, patcher *patch.SerialPatcher, s *whv1.Shipment, ) recerr.Error { log := ctrl.LoggerFrom(ctx) if s.Spec.UnpackOptions.Provider == "" { s.Spec.UnpackOptions.Provider = r.Provider } if err := s.Validate(); err != nil { return recerr.NewStalled(fmt.Errorf("invalid spec: %w", err), whv1.InvalidSpecReason) } if err := reconcile.Progressing(ctx, s, patcher, r.PatchOpts()...); err != nil { return recerr.New(err, whv1.ReconcileFailedReason) } parameters, err := r.resolveParameters(ctx, s) if err != nil { return err } resolveResult, err := r.resolvePallets(ctx, s, parameters) if err != nil { return err } // Ensure that the shipment runtime capability pallet exists in the set of // packages that will be scheduled by the shipment. if !s.Spec.UnpackOptions.IsEmpty() { for _, capability := range s.Spec.UnpackOptions.Capabilities { found := false for name := range resolveResult.unpackedPallets { if capability == name { found = true } } if !found { return recerr.NewStalled( fmt.Errorf( "runtime capability provider %s not included in shipment packages", capability, ), whv1.InvalidArtifactReason, ) } } } objs := make([]*unstructured.Unstructured, len(resolveResult.unpackedPallets)) i := 0 for _, v := range resolveResult.unpackedPallets { u, err := unstructured.ToUnstructured(v) if err != nil { return recerr.New(err, whv1.ApplyFailedReason) } objs[i] = u i++ } // Update lastAttempted to record whatever craziness we are about to try s.Status.LastAttempted = resolveResult.resolved if err := r.apply(ctx, s, objs); err != nil { err.ToCondition(s, whv1.HealthyCondition) // Reflect error on HealthyCondition return err } log.Info("applied artifacts") conditions.MarkTrue(s, whv1.HealthyCondition, status.SucceededReason, "Applied %d pallets", len(resolveResult.unpackedPallets)) return nil } func (r *ShipmentReconciler) apply( ctx context.Context, s *whv1.Shipment, objs []*unstructured.Unstructured, ) recerr.Error { var ( mgr = r.ResourceManager log = ctrl.LoggerFrom(ctx).WithName("apply") ) mgr.SetOwnerLabels(objs, s.Name, "") changeSet, err := mgr.ApplyAll(ctx, objs, sap.ApplyOptions{ WaitTimeout: s.Spec.Timeout.Duration, }) if err != nil { return recerr.New(err, whv1.ApplyFailedReason) } log.Info("applied", "changeset", changeSet.ToMap()) newInventory := inventory.New(inventory.FromSapChangeSet(changeSet)) if err := r.prune(ctx, s, newInventory); err != nil { return recerr.New(err, whv1.PruneFailedReason) } // Replace old inventory with new inventory now that we have diff'd and pruned // any resources s.Status.Inventory = newInventory s.Status.LastApplied = s.Status.LastAttempted if err := mgr.WaitForSet(ctx, changeSet.ToObjMetadataSet(), sap.WaitOptions{ Timeout: s.Spec.Timeout.Duration, }); err != nil { err := recerr.NewWait(err, whv1.ReconcileFailedReason, s.RetryInterval()) return err } return nil } func (r *ShipmentReconciler) prune( ctx context.Context, s *whv1.Shipment, newInventory *inventory.ResourceInventory, ) error { log := ctrl.LoggerFrom(ctx).WithName("prune") switch { case !s.Spec.Prune: log.Info("pruning is turned off for this shipment") return nil case s.Status.Inventory == nil: return nil default: diff, err := s.Status.Inventory.Diff(newInventory) switch { case err != nil: return err case len(diff) == 0: return nil } // TODO: emit event with pruned resources? deleted, err := r.ResourceManager.DeleteAll(ctx, diff, sap.DefaultDeleteOptions()) if err != nil { return err } log.Info("pruned", "changeset", deleted.ToMap()) return nil } } func (r *ShipmentReconciler) resolveParameters( ctx context.Context, s *whv1.Shipment, ) (map[string]string, recerr.Error) { result := make(map[string]string, 0) for _, p := range s.Spec.Rendering { vars, err := p.Resolve(ctx, r.Client) switch { case errors.Is(err, whv1.ErrInvalidParameterMapping): // TODO: only return ErrInvalidParameterMapping if the invalid params are // inlined, because we should reconcile and pick up external updates in // configmaps eventually return nil, recerr.NewStalled(err, whv1.InvalidResourceReason) case err != nil: return nil, recerr.New(err, whv1.RenderingParameterParsingFailedReason) } for k, v := range vars { result[k] = v } } return result, nil }