...

Source file src/edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl/shipment_controller.go

Documentation: edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl

     1  package lumperctl
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"time"
     8  
     9  	ctrl "sigs.k8s.io/controller-runtime"
    10  	"sigs.k8s.io/controller-runtime/pkg/client"
    11  	"sigs.k8s.io/controller-runtime/pkg/controller"
    12  	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    13  
    14  	whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2"
    15  	"edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl/internal"
    16  	"edge-infra.dev/pkg/k8s/meta/status"
    17  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    18  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    19  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
    20  	"edge-infra.dev/pkg/k8s/runtime/inventory"
    21  	"edge-infra.dev/pkg/k8s/runtime/patch"
    22  	"edge-infra.dev/pkg/k8s/runtime/sap"
    23  	"edge-infra.dev/pkg/k8s/unstructured"
    24  )
    25  
    26  // +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=unpackedpallets,verbs=get;list;watch;create;update;patch;delete
    27  // +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=unpackedpallets/status,verbs=get;update;patch
    28  // +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=unpackedpallets/finalizers,verbs=get;create;update;patch;delete
    29  // +kubebuilder:rbac:groups=warehouse.edge.ncr.com,resources=shipments/finalizers,verbs=get;create;update;patch;delete
    30  // +kubebuilder:rbac:groups="",resources=configmaps;secrets;serviceaccounts,verbs=get;list;watch
    31  // +kubebuilder:rbac:groups="",resources=events,verbs=create;patch
    32  
    33  type ShipmentReconciler struct {
    34  	*internal.Reconciler
    35  }
    36  
    37  var shipmentConditions = reconcile.Conditions{
    38  	Target: status.ReadyCondition,
    39  	Owned: []string{
    40  		whv1.HealthyCondition,
    41  		whv1.FetchedArtifactCondition,
    42  		status.ReadyCondition,
    43  		status.ReconcilingCondition,
    44  		status.StalledCondition,
    45  	},
    46  	Summarize: []string{
    47  		whv1.HealthyCondition,
    48  		whv1.FetchedArtifactCondition,
    49  		status.StalledCondition,
    50  	},
    51  	NegativePolarity: []string{
    52  		status.ReconcilingCondition,
    53  		status.StalledCondition,
    54  	},
    55  }
    56  
    57  func (r *ShipmentReconciler) SetupWithManager(mgr ctrl.Manager, cfg ShipmentCfg) error {
    58  	if err := r.Reconciler.SetupWithManager(mgr); err != nil {
    59  		return err
    60  	}
    61  
    62  	return ctrl.NewControllerManagedBy(mgr).
    63  		For(&whv1.Shipment{}).
    64  		WithOptions(controller.Options{
    65  			MaxConcurrentReconciles: cfg.Concurrent,
    66  		}).
    67  		Owns(&whv1.UnpackedPallet{}).
    68  		Complete(r)
    69  }
    70  
    71  func (r *ShipmentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
    72  	var (
    73  		reconcileStart = time.Now()
    74  		log            = ctrl.LoggerFrom(ctx)
    75  		result         = reconcile.ResultEmpty
    76  		s              = &whv1.Shipment{}
    77  	)
    78  
    79  	if err := r.Get(ctx, req.NamespacedName, s); err != nil {
    80  		return ctrl.Result{}, client.IgnoreNotFound(err)
    81  	}
    82  
    83  	patcher := patch.NewSerialPatcher(s, r.Client)
    84  
    85  	defer func() {
    86  		summarizer := reconcile.NewSummarizer(patcher)
    87  		res, recErr = summarizer.SummarizeAndPatch(ctx, s, []reconcile.SummarizeOption{
    88  			reconcile.WithConditions(r.Conditions),
    89  			reconcile.WithResult(result),
    90  			reconcile.WithError(recErr),
    91  			reconcile.WithIgnoreNotFound(),
    92  			reconcile.WithProcessors(
    93  				reconcile.RecordReconcileReq,
    94  				reconcile.RecordResult,
    95  			),
    96  			reconcile.WithFieldOwner(r.Name),
    97  		}...)
    98  
    99  		r.Metrics.RecordConditionsWithReason(ctx, s)
   100  		r.Metrics.RecordDuration(ctx, s, reconcileStart)
   101  		r.Metrics.RecordSuspend(ctx, s, s.Spec.Suspend)
   102  		r.LivenessChecker.AddStatus(s, recErr)
   103  
   104  		internal.RecordCacheLen(float64(r.Cache.Len()))
   105  	}()
   106  
   107  	// Check if finalizer exists
   108  	if !controllerutil.ContainsFinalizer(s, whv1.WarehouseFinalizer) {
   109  		controllerutil.AddFinalizer(s, whv1.WarehouseFinalizer)
   110  		// Return immediately so that we requeue and reconcile object with finalizer
   111  		// added.
   112  		result = reconcile.ResultRequeue
   113  		return
   114  	}
   115  
   116  	// If a deletion timestamp has been set, execute finalizer logic
   117  	if !s.ObjectMeta.DeletionTimestamp.IsZero() {
   118  		log.Info("detected deletion, executing finalizer")
   119  		// TODO: use serial patcher to incrementally patch terminating status
   120  		finalizer := Finalizer{
   121  			Name:            whv1.WarehouseFinalizer,
   122  			ResourceManager: r.ResourceManager,
   123  		}
   124  
   125  		if err := finalizer.Finalize(ctx, s, s.Spec.Prune); err != nil {
   126  			result, recErr = reconcile.ResultEmpty, err
   127  			return
   128  		}
   129  
   130  		controllerutil.RemoveFinalizer(s, whv1.WarehouseFinalizer)
   131  		log.Info("finalizer executed")
   132  		return
   133  	}
   134  
   135  	if s.Spec.Suspend {
   136  		log.Info("reconciliation is suspended", "suspended", "true")
   137  		return
   138  	}
   139  
   140  	log.Info("reconciling")
   141  	recErr = r.reconcile(ctx, patcher, s)
   142  	if recErr == nil {
   143  		result = reconcile.ResultSuccess
   144  	}
   145  	return
   146  }
   147  
   148  func (r *ShipmentReconciler) reconcile(
   149  	ctx context.Context,
   150  	patcher *patch.SerialPatcher,
   151  	s *whv1.Shipment,
   152  ) recerr.Error {
   153  	log := ctrl.LoggerFrom(ctx)
   154  
   155  	if s.Spec.UnpackOptions.Provider == "" {
   156  		s.Spec.UnpackOptions.Provider = r.Provider
   157  	}
   158  
   159  	if err := s.Validate(); err != nil {
   160  		return recerr.NewStalled(fmt.Errorf("invalid spec: %w", err),
   161  			whv1.InvalidSpecReason)
   162  	}
   163  
   164  	if err := reconcile.Progressing(ctx, s, patcher, r.PatchOpts()...); err != nil {
   165  		return recerr.New(err, whv1.ReconcileFailedReason)
   166  	}
   167  
   168  	parameters, err := r.resolveParameters(ctx, s)
   169  	if err != nil {
   170  		return err
   171  	}
   172  	resolveResult, err := r.resolvePallets(ctx, s, parameters)
   173  	if err != nil {
   174  		return err
   175  	}
   176  
   177  	// Ensure that the shipment runtime capability pallet exists in the set of
   178  	// packages that will be scheduled by the shipment.
   179  	if !s.Spec.UnpackOptions.IsEmpty() {
   180  		for _, capability := range s.Spec.UnpackOptions.Capabilities {
   181  			found := false
   182  			for name := range resolveResult.unpackedPallets {
   183  				if capability == name {
   184  					found = true
   185  				}
   186  			}
   187  			if !found {
   188  				return recerr.NewStalled(
   189  					fmt.Errorf(
   190  						"runtime capability provider %s not included in shipment packages",
   191  						capability,
   192  					),
   193  					whv1.InvalidArtifactReason,
   194  				)
   195  			}
   196  		}
   197  	}
   198  
   199  	objs := make([]*unstructured.Unstructured, len(resolveResult.unpackedPallets))
   200  	i := 0
   201  	for _, v := range resolveResult.unpackedPallets {
   202  		u, err := unstructured.ToUnstructured(v)
   203  		if err != nil {
   204  			return recerr.New(err, whv1.ApplyFailedReason)
   205  		}
   206  		objs[i] = u
   207  		i++
   208  	}
   209  
   210  	// Update lastAttempted to record whatever craziness we are about to try
   211  	s.Status.LastAttempted = resolveResult.resolved
   212  
   213  	if err := r.apply(ctx, s, objs); err != nil {
   214  		err.ToCondition(s, whv1.HealthyCondition) // Reflect error on HealthyCondition
   215  		return err
   216  	}
   217  	log.Info("applied artifacts")
   218  
   219  	conditions.MarkTrue(s, whv1.HealthyCondition, status.SucceededReason,
   220  		"Applied %d pallets", len(resolveResult.unpackedPallets))
   221  	return nil
   222  }
   223  
   224  func (r *ShipmentReconciler) apply(
   225  	ctx context.Context,
   226  	s *whv1.Shipment,
   227  	objs []*unstructured.Unstructured,
   228  ) recerr.Error {
   229  	var (
   230  		mgr = r.ResourceManager
   231  		log = ctrl.LoggerFrom(ctx).WithName("apply")
   232  	)
   233  
   234  	mgr.SetOwnerLabels(objs, s.Name, "")
   235  
   236  	changeSet, err := mgr.ApplyAll(ctx, objs, sap.ApplyOptions{
   237  		WaitTimeout: s.Spec.Timeout.Duration,
   238  	})
   239  	if err != nil {
   240  		return recerr.New(err, whv1.ApplyFailedReason)
   241  	}
   242  	log.Info("applied", "changeset", changeSet.ToMap())
   243  
   244  	newInventory := inventory.New(inventory.FromSapChangeSet(changeSet))
   245  	if err := r.prune(ctx, s, newInventory); err != nil {
   246  		return recerr.New(err, whv1.PruneFailedReason)
   247  	}
   248  
   249  	// Replace old inventory with new inventory now that we have diff'd and pruned
   250  	// any resources
   251  	s.Status.Inventory = newInventory
   252  	s.Status.LastApplied = s.Status.LastAttempted
   253  
   254  	if err := mgr.WaitForSet(ctx, changeSet.ToObjMetadataSet(), sap.WaitOptions{
   255  		Timeout: s.Spec.Timeout.Duration,
   256  	}); err != nil {
   257  		err := recerr.NewWait(err, whv1.ReconcileFailedReason, s.RetryInterval())
   258  		return err
   259  	}
   260  
   261  	return nil
   262  }
   263  
   264  func (r *ShipmentReconciler) prune(
   265  	ctx context.Context,
   266  	s *whv1.Shipment,
   267  	newInventory *inventory.ResourceInventory,
   268  ) error {
   269  	log := ctrl.LoggerFrom(ctx).WithName("prune")
   270  
   271  	switch {
   272  	case !s.Spec.Prune:
   273  		log.Info("pruning is turned off for this shipment")
   274  		return nil
   275  	case s.Status.Inventory == nil:
   276  		return nil
   277  	default:
   278  		diff, err := s.Status.Inventory.Diff(newInventory)
   279  		switch {
   280  		case err != nil:
   281  			return err
   282  		case len(diff) == 0:
   283  			return nil
   284  		}
   285  
   286  		// TODO: emit event with pruned resources?
   287  		deleted, err := r.ResourceManager.DeleteAll(ctx, diff, sap.DefaultDeleteOptions())
   288  		if err != nil {
   289  			return err
   290  		}
   291  		log.Info("pruned", "changeset", deleted.ToMap())
   292  		return nil
   293  	}
   294  }
   295  
   296  func (r *ShipmentReconciler) resolveParameters(
   297  	ctx context.Context,
   298  	s *whv1.Shipment,
   299  ) (map[string]string, recerr.Error) {
   300  	result := make(map[string]string, 0)
   301  	for _, p := range s.Spec.Rendering {
   302  		vars, err := p.Resolve(ctx, r.Client)
   303  		switch {
   304  		case errors.Is(err, whv1.ErrInvalidParameterMapping):
   305  			// TODO: only return ErrInvalidParameterMapping if the invalid params are
   306  			// inlined, because we should reconcile and pick up external updates in
   307  			// configmaps eventually
   308  			return nil, recerr.NewStalled(err, whv1.InvalidResourceReason)
   309  		case err != nil:
   310  			return nil, recerr.New(err, whv1.RenderingParameterParsingFailedReason)
   311  		}
   312  
   313  		for k, v := range vars {
   314  			result[k] = v
   315  		}
   316  	}
   317  
   318  	return result, nil
   319  }
   320  

View as plain text