...

Source file src/edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl/unpacked_pallet_controller.go

Documentation: edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl

     1  package lumperctl
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"time"
     7  
     8  	"github.com/google/go-containerregistry/pkg/name"
     9  	"github.com/google/go-containerregistry/pkg/v1/remote"
    10  	"go.uber.org/multierr"
    11  	ctrl "sigs.k8s.io/controller-runtime"
    12  	"sigs.k8s.io/controller-runtime/pkg/builder"
    13  	"sigs.k8s.io/controller-runtime/pkg/client"
    14  	"sigs.k8s.io/controller-runtime/pkg/controller"
    15  	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    16  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    17  
    18  	whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2"
    19  	"edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl/internal"
    20  	"edge-infra.dev/pkg/f8n/warehouse/lift/unpack"
    21  	"edge-infra.dev/pkg/f8n/warehouse/oci/layer"
    22  	"edge-infra.dev/pkg/f8n/warehouse/pallet"
    23  	"edge-infra.dev/pkg/k8s/meta/status"
    24  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    25  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    26  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
    27  	"edge-infra.dev/pkg/k8s/runtime/inventory"
    28  	"edge-infra.dev/pkg/k8s/runtime/patch"
    29  	"edge-infra.dev/pkg/k8s/runtime/sap"
    30  )
    31  
    32  // +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=unpackedpallets,verbs=get;list;watch;create;update;patch;delete
    33  // +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=unpackedpallets/status,verbs=get;update;patch
    34  // +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=unpackedpallets/finalizers,verbs=get;create;update;patch;delete
    35  // +kubebuilder:rbac:groups="",resources=configmaps;secrets;serviceaccounts,verbs=get;list;watch
    36  // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
    37  
// UnpackedPalletReconciler reconciles UnpackedPallet objects. It embeds the
// shared *internal.Reconciler, through which this file reaches the API client,
// ResourceManager, Metrics, LivenessChecker, Cache and other shared helpers.
type UnpackedPalletReconciler struct {
	*internal.Reconciler

	// depRequeueInterval is the requeue interval used while waiting for
	// dependencies to become ready (see reconcileDeps). It is populated from
	// UnpackedPalletCfg.DepRequeueInterval in SetupWithManager.
	depRequeueInterval time.Duration
}
    43  
    44  // unpackReadyConditions are the ready conditions that may be applied and
    45  // evaluated for an individual UnpackedPallet object's readiness
    46  var unpackReadyConditions = []string{
    47  	whv1.RuntimeReadyCondition,
    48  	whv1.InfraReadyCondition,
    49  	whv1.RuntimeCapabilitiesReadyCondition,
    50  }
    51  
    52  // unpackedPalletConditions is the reconcile summarization configuration for how
    53  // various conditions should be taken into account when the final condition is
    54  // summarized
    55  var unpackedPalletConditions = reconcile.Conditions{
    56  	Target: status.ReadyCondition,
    57  	Owned: append(unpackReadyConditions,
    58  		status.DependenciesReadyCondition,
    59  		whv1.HealthyCondition,
    60  		whv1.FetchedArtifactCondition,
    61  		status.ReadyCondition,
    62  		status.ReconcilingCondition,
    63  		status.StalledCondition,
    64  	),
    65  	Summarize: append(unpackReadyConditions,
    66  		whv1.HealthyCondition,
    67  		whv1.FetchedArtifactCondition,
    68  		status.DependenciesReadyCondition,
    69  		status.StalledCondition,
    70  	),
    71  	NegativePolarity: []string{
    72  		status.ReconcilingCondition,
    73  		status.StalledCondition,
    74  	},
    75  }
    76  
    77  func (r *UnpackedPalletReconciler) SetupWithManager(mgr ctrl.Manager, cfg UnpackedPalletCfg) error {
    78  	if err := r.Reconciler.SetupWithManager(mgr); err != nil {
    79  		return err
    80  	}
    81  
    82  	r.depRequeueInterval = cfg.DepRequeueInterval
    83  
    84  	return ctrl.NewControllerManagedBy(mgr).
    85  		For(&whv1.UnpackedPallet{}, builder.WithPredicates(
    86  			// Only re-reconcile if the spec changes, not status or metadata
    87  			predicate.GenerationChangedPredicate{},
    88  		)).
    89  		WithOptions(controller.Options{
    90  			MaxConcurrentReconciles: cfg.Concurrent,
    91  		}).
    92  		Complete(r)
    93  }
    94  
    95  func (r *UnpackedPalletReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
    96  	var (
    97  		reconcileStart = time.Now()
    98  		log            = ctrl.LoggerFrom(ctx)
    99  		result         = reconcile.ResultEmpty
   100  		u              = &whv1.UnpackedPallet{}
   101  	)
   102  
   103  	if err := r.Get(ctx, req.NamespacedName, u); err != nil {
   104  		return ctrl.Result{}, client.IgnoreNotFound(err)
   105  	}
   106  
   107  	patcher := patch.NewSerialPatcher(u, r.Client)
   108  
   109  	defer func() {
   110  		s := reconcile.NewSummarizer(patcher)
   111  		res, recErr = s.SummarizeAndPatch(ctx, u,
   112  			reconcile.WithConditions(r.Conditions),
   113  			reconcile.WithResult(result),
   114  			reconcile.WithError(recErr),
   115  			reconcile.WithIgnoreNotFound(),
   116  			reconcile.WithProcessors(
   117  				reconcile.RecordReconcileReq,
   118  				reconcile.RecordResult,
   119  			),
   120  			reconcile.WithFieldOwner(r.Name),
   121  		)
   122  
   123  		r.Metrics.RecordConditionsWithReason(ctx, u)
   124  		r.Metrics.RecordSuspend(ctx, u, u.Spec.Suspend)
   125  		r.Metrics.RecordDuration(ctx, u, reconcileStart)
   126  		r.LivenessChecker.AddStatus(u, recErr)
   127  		internal.RecordCacheLen(float64(r.Cache.Len()))
   128  		internal.RecordEdgeVersion(u.Name, u.ObjectMeta.Annotations["pallet.edge.ncr.com/version"], !u.GetDeletionTimestamp().IsZero())
   129  	}()
   130  
   131  	// Add our finalizer if it does not exist
   132  	if !controllerutil.ContainsFinalizer(u, whv1.WarehouseFinalizer) {
   133  		controllerutil.AddFinalizer(u, whv1.WarehouseFinalizer)
   134  		// Return immediately so that we requeue and reconcile object with finalizer
   135  		// added.
   136  		result = reconcile.ResultRequeue
   137  		return
   138  	}
   139  
   140  	// If a deletion timestamp has been set, execute finalizer logic
   141  	if !u.ObjectMeta.DeletionTimestamp.IsZero() {
   142  		log.Info("detected deletion, executing finalizer")
   143  		// TODO: use serial patcher to incrementally patch terminating status
   144  		// indicate that we are reconciling the termination so that the resource
   145  		// does not indicate it is Ready
   146  		finalizer := Finalizer{
   147  			Name:            whv1.WarehouseFinalizer,
   148  			ResourceManager: r.ResourceManager,
   149  		}
   150  		if err := finalizer.Finalize(ctx, u, u.Spec.Prune); err != nil {
   151  			recErr = err
   152  			return
   153  		}
   154  		controllerutil.RemoveFinalizer(u, whv1.WarehouseFinalizer)
   155  		log.Info("finalizer executed")
   156  		return
   157  	}
   158  
   159  	if u.Spec.Suspend {
   160  		log.Info("reconciliation is suspended", "suspended", "true")
   161  		return
   162  	}
   163  
   164  	log.Info("reconciling")
   165  	if recErr = r.reconcile(ctx, patcher, u); recErr == nil {
   166  		result = reconcile.ResultSuccess
   167  	}
   168  	return
   169  }
   170  
   171  func (r *UnpackedPalletReconciler) reconcile(
   172  	ctx context.Context,
   173  	patcher *patch.SerialPatcher,
   174  	u *whv1.UnpackedPallet,
   175  ) recerr.Error {
   176  	if u.Spec.UnpackOptions.Provider == "" {
   177  		u.Spec.UnpackOptions.Provider = r.Provider
   178  	}
   179  
   180  	if err := u.Validate(); err != nil {
   181  		return recerr.NewStalled(fmt.Errorf("invalid spec: %w", err),
   182  			whv1.InvalidSpecReason)
   183  	}
   184  
   185  	// Check dependencies before we mark the object as reconciling to avoid
   186  	// churn on readiness for resources that depend on the pallet we are reconciling
   187  	if err := r.reconcileDeps(ctx, u); err != nil {
   188  		err.ToCondition(u, status.DependenciesReadyCondition)
   189  		return err
   190  	}
   191  
   192  	if err := reconcile.Progressing(ctx, u, patcher, r.PatchOpts()...); err != nil {
   193  		return recerr.New(err, whv1.ReconcileFailedReason)
   194  	}
   195  
   196  	ref, err := parseRef(u.Spec.Artifact)
   197  	if err != nil {
   198  		return err
   199  	}
   200  
   201  	log := logWithRef(ctrl.LoggerFrom(ctx), ref, u.Spec.Name)
   202  	// Persist artifact-decorated logger for sub-reconcilers
   203  	ctx = ctrl.LoggerInto(ctx, log)
   204  
   205  	p, err := r.fetch(ctx, u, ref)
   206  	if err != nil {
   207  		return err
   208  	}
   209  
   210  	if !p.Supports(u.Spec.Provider) {
   211  		return recerr.NewStalled(
   212  			fmt.Errorf("%s not supported: [%s]", u.Spec.Provider, p.Providers()),
   213  			whv1.InvalidArtifactReason,
   214  		)
   215  	}
   216  
   217  	layers, unpackErr := unpack.Layers(p,
   218  		unpack.ForProvider(u.Spec.Provider),
   219  		unpack.ForLayerKeys(u.Layers()...),
   220  		unpack.RenderWith(u.Spec.Parameters),
   221  		unpack.WithInfraNamespace(u.Spec.InfraNamespace),
   222  	)
   223  	switch {
   224  	case unpackErr != nil:
   225  		return recerr.New(fmt.Errorf("failed to unpack: %w", unpackErr),
   226  			whv1.UnpackFailedReason)
   227  	case len(layers) == 0:
   228  		return recerr.NewStalled(
   229  			fmt.Errorf("Package contained no objects for the provided unpacking options"),
   230  			whv1.UnpackFailedReason,
   231  		)
   232  	}
   233  
   234  	if err := r.apply(ctx, u, layers); err != nil {
   235  		return err
   236  	}
   237  	log.Info("applied artifact")
   238  	conditions.MarkTrue(u, whv1.HealthyCondition, status.SucceededReason,
   239  		"Objects reconciled")
   240  
   241  	return nil
   242  }
   243  
   244  func (r *UnpackedPalletReconciler) reconcileDeps(
   245  	ctx context.Context,
   246  	u *whv1.UnpackedPallet,
   247  ) recerr.Error {
   248  	switch {
   249  	case len(u.Spec.DependsOn) > 0:
   250  		u.Status.Dependencies = u.Dependencies()
   251  	case len(u.Spec.DependsOn) == 0:
   252  		u.Status.Dependencies = ""
   253  		conditions.Delete(u, status.DependenciesReadyCondition)
   254  		return nil
   255  	}
   256  
   257  	var unready []string
   258  	for _, dep := range u.Spec.DependsOn {
   259  		var du whv1.UnpackedPallet
   260  		if err := r.Client.Get(ctx, client.ObjectKey{Name: dep.Name}, &du); err != nil {
   261  			// Use standard retry interval instead of dependency retry because we
   262  			// aren't waiting for readiness
   263  			return recerr.NewWait(err, status.DependencyNotFoundReason, u.RetryInterval())
   264  		}
   265  
   266  		if !du.IsUpToDate() {
   267  			unready = append(unready, du.Name)
   268  		}
   269  	}
   270  
   271  	if len(unready) == 0 {
   272  		conditions.MarkTrue(
   273  			u,
   274  			status.DependenciesReadyCondition,
   275  			status.DependencyReadyReason,
   276  			"Dependencies up to date",
   277  		)
   278  		return nil
   279  	}
   280  
   281  	return recerr.NewWait(
   282  		fmt.Errorf("%d/%d dependencies ready: waiting for %s",
   283  			len(u.Spec.DependsOn)-len(unready), len(u.Spec.DependsOn), unready),
   284  		status.DependencyNotReadyReason,
   285  		r.depRequeueInterval,
   286  	)
   287  }
   288  
   289  // fetch fetches the artifact and then either returns digest from artifact spec
   290  // or infers it from the fetched artifact (in the event that the artifact spec
   291  // is a tag)
   292  func (r *UnpackedPalletReconciler) fetch(
   293  	ctx context.Context,
   294  	u *whv1.UnpackedPallet,
   295  	ref name.Reference,
   296  ) (pallet.Pallet, recerr.Error) {
   297  	// Create keychain that is scoped to this reconcile loop, to ensure that we
   298  	// get up-to-date handles on our auth context (e.g., API key rotation)
   299  	keychain, recErr := r.Keychain(ctx, u)
   300  	if recErr != nil {
   301  		return nil, recErr
   302  	}
   303  	a, recErr := r.Fetch(
   304  		ctx,
   305  		u,
   306  		ref,
   307  		u.Spec.PackagePullOptions.PackagePullPolicy,
   308  		remote.WithAuthFromKeychain(keychain),
   309  	)
   310  	if recErr != nil {
   311  		return nil, recErr
   312  	}
   313  
   314  	// TODO: test stalled condition
   315  	p, err := pallet.New(a)
   316  	if err != nil {
   317  		return nil, recerr.NewStalled(err, whv1.InvalidArtifactReason)
   318  	}
   319  
   320  	digest := u.Spec.Artifact.Digest
   321  	if digest == "" {
   322  		d, err := a.Digest()
   323  		if err != nil {
   324  			return nil, recerr.New(fmt.Errorf("failed to compute digest: %w", err),
   325  				whv1.FetchFailedReason)
   326  		}
   327  		digest = d.String()
   328  	}
   329  	u.Status.LastAttempted = digest
   330  
   331  	return p, nil
   332  }
   333  
   334  // apply applies any layers passed in, updates inventory + handles pruning, then
   335  // waits for the resources to become ready. Dependent on each layer's type,
   336  // individual conditions are set based on the result (e.g., InfraReady,
   337  // RuntimeReady, etc).
   338  //
   339  // Inventory is handled before waiting for resource readiness to avoid orphaning
   340  // resources if objects between generations do not reconcile.
   341  //
   342  // Apply is resilient to any individual layer type failing to apply, allowing
   343  // the conditions for each layer type to move independently. This allows partial
   344  // analysis of dependency readiness in reconcileDeps so that dependencies are not
   345  // blocked waiting on runtime capabilities.
   346  func (r *UnpackedPalletReconciler) apply(
   347  	ctx context.Context,
   348  	u *whv1.UnpackedPallet,
   349  	layers []layer.Layer,
   350  ) recerr.Error {
   351  	// Reset specific Ready conditions for this Pallet so that we can re-build them.
   352  	// This ensures that if a specific Ready condition is no longer applicable,
   353  	// either due to the package contents changing or the unpack options changing,
   354  	// it is removed from the final status.
   355  	for _, c := range unpackReadyConditions {
   356  		conditions.Delete(u, c)
   357  	}
   358  
   359  	// Track changes and apply errors for each layer type by the condition they
   360  	// roll up under.
   361  	var (
   362  		changes   = make(map[string]*sap.ChangeSet)
   363  		applyErrs = make(map[string]error)
   364  	)
   365  	for _, l := range layers {
   366  		log := ctrl.LoggerFrom(ctx).WithValues("layer", l.Key())
   367  
   368  		// Resolve condition for this layer type
   369  		var condition string
   370  		switch {
   371  		case l.Key() == layer.Infra.String():
   372  			condition = whv1.InfraReadyCondition
   373  		case l.Key() == layer.Runtime.String():
   374  			condition = whv1.RuntimeReadyCondition
   375  		case l.Type() == layer.Runtime:
   376  			// All remaining runtime layers are runtime capabilities.
   377  			condition = whv1.RuntimeCapabilitiesReadyCondition
   378  		default:
   379  			// Any other combination of keys and types represents unknown behavior.
   380  			return recerr.NewStalled(
   381  				fmt.Errorf("unexpected layer: type %s with key %s", l.Type(), l.Key()),
   382  				whv1.InvalidArtifactReason,
   383  			)
   384  		}
   385  
   386  		layerChanges, err := r.applyLayer(ctx, u, l)
   387  		if err != nil {
   388  			log.Error(err, "failed to apply objects")
   389  			applyErrs[condition] = multierr.Append(applyErrs[condition], err)
   390  			switch condition {
   391  			case whv1.RuntimeCapabilitiesReadyCondition:
   392  				// Only update the runtime capability ready condition after applying
   393  				// all layers because it can have a many:1 relationship with layers.
   394  				//
   395  				// Only log at error-level for layers that are used for dependency
   396  				// analysis. Failing to apply runtime capability layers is not unusual
   397  				// and we don't want to spam low-signal error logs. Any persistent
   398  				// failures will still be reflected in the object status.
   399  				log.Info("failed to apply objects", "error", err)
   400  			default:
   401  				// Update RuntimeCapabilitiesReady after applying all layers because it can
   402  				// have a many:1 relationship with layers
   403  				log.Error(err, "failed to apply objects")
   404  				conditions.MarkFalse(u, condition, whv1.ApplyFailedReason, "%v", err)
   405  			}
   406  		}
   407  		if layerChanges != nil {
   408  			log.Info("applied objects", "changeset", layerChanges.ToMap())
   409  			if changes[condition] == nil {
   410  				changes[condition] = sap.NewChangeSet()
   411  			}
   412  			changes[condition].Append(layerChanges.Entries)
   413  		}
   414  	}
   415  
   416  	// Check if we applied runtime capabilities and update the condition appropriately
   417  	if applyErrs[whv1.RuntimeCapabilitiesReadyCondition] != nil {
   418  		conditions.MarkFalse(u,
   419  			whv1.RuntimeCapabilitiesReadyCondition,
   420  			whv1.ApplyFailedReason,
   421  			"Attempted to apply runtime capabilities: %s",
   422  			applyErrs[whv1.RuntimeCapabilitiesReadyCondition].Error(),
   423  		)
   424  	}
   425  
   426  	// If conditions that are meaningful for dependency analysis failed to apply,
   427  	// return early.
   428  	if applyErrs[whv1.RuntimeReadyCondition] != nil ||
   429  		applyErrs[whv1.InfraReadyCondition] != nil {
   430  		return recerr.New(
   431  			multierr.Append(
   432  				applyErrs[whv1.RuntimeReadyCondition],
   433  				applyErrs[whv1.InfraReadyCondition],
   434  			),
   435  			whv1.ApplyFailedReason,
   436  		)
   437  	}
   438  
   439  	// Reconcile inventory for all changes if we didn't encounter any apply errors
   440  	changeset := sap.NewChangeSet()
   441  	for _, v := range changes {
   442  		changeset.Append(v.Entries)
   443  	}
   444  
   445  	newInv := inventory.New(inventory.FromSapChangeSet(changeset))
   446  	if err := r.prune(ctx, u, newInv); err != nil {
   447  		err := recerr.New(err, whv1.PruneFailedReason)
   448  		err.ToCondition(u, whv1.HealthyCondition)
   449  		return err
   450  	}
   451  
   452  	u.Status.LastApplied = u.Status.LastAttempted
   453  	u.Status.Inventory = newInv
   454  	u.Status.ShortDigest = u.ShortDigest()
   455  
   456  	// Wait for objects that we applied independently for each condition that we
   457  	// tracked changes for.
   458  	var waitErr error
   459  	for k, v := range changes {
   460  		// If we failed to apply runtime capabilities, don't wait for them
   461  		if k == whv1.RuntimeCapabilitiesReadyCondition && applyErrs[k] != nil {
   462  			continue
   463  		}
   464  
   465  		if err := r.ResourceManager.WaitForSet(ctx,
   466  			v.ToObjMetadataSet(),
   467  			sap.WaitOptions{Timeout: u.Spec.Timeout.Duration},
   468  		); err != nil {
   469  			multierr.AppendInto(&waitErr, err)
   470  			conditions.MarkFalse(u, k, whv1.ReconcileFailedReason, "%v", err)
   471  			continue
   472  		}
   473  
   474  		conditions.MarkTrue(u, k, status.SucceededReason, "Objects reconciled")
   475  	}
   476  
   477  	if waitErr != nil {
   478  		return recerr.NewWait(waitErr, whv1.ReconcileFailedReason, u.RetryInterval())
   479  	}
   480  
   481  	// If we haven't returned due to error but failed to apply runtime capabilities,
   482  	// that means everything else is okay and we should include that error in
   483  	// the object status
   484  	if applyErrs[whv1.RuntimeCapabilitiesReadyCondition] != nil {
   485  		return recerr.New(applyErrs[whv1.RuntimeCapabilitiesReadyCondition],
   486  			whv1.ApplyFailedReason)
   487  	}
   488  
   489  	return nil
   490  }
   491  
   492  func (r *UnpackedPalletReconciler) applyLayer(
   493  	ctx context.Context,
   494  	u *whv1.UnpackedPallet,
   495  	l layer.Layer,
   496  ) (*sap.ChangeSet, error) {
   497  	mgr := r.ResourceManager
   498  
   499  	objs, err := l.Unstructured()
   500  	if err != nil {
   501  		return nil, err
   502  	}
   503  	mgr.SetOwnerLabels(objs, u.Name, "")
   504  
   505  	changeset, err := mgr.ApplyAllStaged(ctx, objs, sap.ApplyOptions{
   506  		Force:       u.Spec.Force,
   507  		WaitTimeout: u.Spec.Timeout.Duration,
   508  	})
   509  	if err != nil {
   510  		return nil, err
   511  	}
   512  
   513  	return changeset, err
   514  }
   515  
   516  func (r *UnpackedPalletReconciler) prune(ctx context.Context, u *whv1.UnpackedPallet, newInventory *inventory.ResourceInventory) error {
   517  	log := ctrl.LoggerFrom(ctx).WithName("prune")
   518  
   519  	switch {
   520  	case !u.Spec.Prune:
   521  		log.Info("pruning is disabled")
   522  		return nil
   523  	case u.Status.Inventory == nil:
   524  		return nil
   525  	default:
   526  		diff, err := u.Status.Inventory.Diff(newInventory)
   527  		if err != nil || len(diff) == 0 {
   528  			return err
   529  		}
   530  
   531  		// TODO: emit event with pruned resources
   532  		deleted, err := r.ResourceManager.DeleteAll(ctx, diff, sap.DefaultDeleteOptions())
   533  		if err != nil {
   534  			return err
   535  		}
   536  		log.Info("pruned", "changeset", deleted.ToMap())
   537  		return nil
   538  	}
   539  }
   540  

View as plain text