...

Source file src/edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl/shipment_walker.go

Documentation: edge-infra.dev/pkg/f8n/warehouse/k8s/controllers/lumperctl

     1  package lumperctl
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"strings"
     8  
     9  	v1 "github.com/google/go-containerregistry/pkg/v1"
    10  	"github.com/google/go-containerregistry/pkg/v1/remote"
    11  	ctrl "sigs.k8s.io/controller-runtime"
    12  
    13  	"edge-infra.dev/pkg/f8n/warehouse"
    14  	"edge-infra.dev/pkg/f8n/warehouse/cluster"
    15  	whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2"
    16  	"edge-infra.dev/pkg/f8n/warehouse/lift/unpack"
    17  	"edge-infra.dev/pkg/f8n/warehouse/oci"
    18  	"edge-infra.dev/pkg/f8n/warehouse/oci/walk"
    19  	"edge-infra.dev/pkg/f8n/warehouse/pallet"
    20  	"edge-infra.dev/pkg/k8s/meta"
    21  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
    22  )
    23  
    24  type shipmentWalker struct {
    25  	visited      map[string]v1.Hash
    26  	emptyPallets map[string]bool
    27  	result       *walkResult
    28  }
    29  
    30  // walkResult contains return values for resolvePallets
    31  type walkResult struct {
    32  	// unpackedPallets contains the K8s objects that should be applied as a result
    33  	// of resolution, i.e. all non-empty images which contain at least one Image manifest
    34  	unpackedPallets map[string]*whv1.UnpackedPallet
    35  	// resolved is a slice of digests for all images in the graph
    36  	// the ResolvedDigest may be different from the Digest if the image is an
    37  	// ImageIndex, in which case ResolvedDigest will be the digest of the provider
    38  	// specific Image
    39  	resolved []whv1.ResolvedArtifact
    40  }
    41  
    42  // resolvePallets loops over the pallets in the shipment and walks each one,
    43  // producing a resolved graph for all
    44  func (r *ShipmentReconciler) resolvePallets(
    45  	ctx context.Context,
    46  	s *whv1.Shipment,
    47  	parameters map[string]string,
    48  ) (*walkResult, recerr.Error) {
    49  	walkCtx := &shipmentWalker{
    50  		visited:      make(map[string]v1.Hash, 0),
    51  		emptyPallets: make(map[string]bool, 0),
    52  		result: &walkResult{
    53  			unpackedPallets: make(map[string]*whv1.UnpackedPallet, 0),
    54  			resolved:        []whv1.ResolvedArtifact{},
    55  		},
    56  	}
    57  
    58  	// Create keychain that is scoped to this reconcile loop, to ensure that we
    59  	// get up-to-date handles on our auth context (e.g., API key rotation)
    60  	keychain, recErr := r.Keychain(ctx, s)
    61  	if recErr != nil {
    62  		return nil, recErr
    63  	}
    64  
    65  	puller, err := remote.NewPuller(
    66  		remote.WithContext(ctx),
    67  		remote.WithAuthFromKeychain(keychain),
    68  	)
    69  	if err != nil {
    70  		recErr = recerr.New(
    71  			fmt.Errorf("failed to instantiate registry puller: %w", err),
    72  			whv1.FetchFailedReason,
    73  		)
    74  		recErr.ToCondition(s, whv1.FetchedArtifactCondition)
    75  		return nil, recErr
    76  	}
    77  
    78  	// for each artifact, build a walker and walk that dog
    79  	for _, p := range s.Spec.Pallets {
    80  		// fetch all dependent pallets from repo of pallet they are included in
    81  		fetchRepo := fmt.Sprintf("%s/%s", s.Spec.Repository, p.Name)
    82  		walker := &walk.Fns{
    83  			Index: walkCtx.bindIndexWalker(s, fetchRepo, parameters),
    84  			Image: walkCtx.bindImageWalker(s, fetchRepo, parameters),
    85  		}
    86  
    87  		// Fetch artifact
    88  		ref, err := parseRef(p.WithRepo(fetchRepo))
    89  		if err != nil {
    90  			return nil, err
    91  		}
    92  		a, err := r.Fetch(
    93  			ctrl.LoggerInto(ctx, logWithRef(ctrl.LoggerFrom(ctx), ref, p.Name)),
    94  			s,
    95  			ref,
    96  			s.Spec.PackagePullOptions.PackagePullPolicy,
    97  			remote.Reuse(puller),
    98  		)
    99  		if err != nil {
   100  			return nil, err
   101  		}
   102  
   103  		// perform walk
   104  		if err := walk.Walk(a, walker); err != nil {
   105  			return nil, recerr.New(
   106  				fmt.Errorf("failed to resolve graph for pallets: %w", err),
   107  				whv1.UnpackFailedReason,
   108  			)
   109  		}
   110  	}
   111  
   112  	if len(walkCtx.result.unpackedPallets) == 0 {
   113  		return nil, recerr.NewStalled(
   114  			fmt.Errorf("all pallets are empty for current unpacking options"),
   115  			whv1.UnpackFailedReason,
   116  		)
   117  	}
   118  
   119  	return walkCtx.result, nil
   120  }
   121  
   122  // IsPalletEmpty returns true if a pallet contains no manifests for the input
   123  // provider and infra options.
   124  // TODO: make this work for any oci.Artifact and make public?
   125  // TODO: refactor/clean up
   126  func isPalletEmpty(img v1.Image, unpackOptions whv1.UnpackOptions) (bool, error) {
   127  	layers, err := unpack.Layers(img, unpack.ForLayerKeys(unpackOptions.Layers()...))
   128  	if err != nil {
   129  		return false, err
   130  	}
   131  	return len(layers) == 0, nil
   132  }
   133  
   134  func (walker *shipmentWalker) bindIndexWalker(s *whv1.Shipment, fetchRepo string, parameters map[string]string) func(v1.ImageIndex, v1.ImageIndex) error {
   135  	return func(idx v1.ImageIndex, _ v1.ImageIndex) error {
   136  		// capture package name and digest for comparison on conflicts and keying
   137  		// our result
   138  		plt, err := pallet.New(idx)
   139  		if err != nil {
   140  			return err
   141  		}
   142  		name := plt.Name()
   143  		pltDigest, err := plt.Digest()
   144  		if err != nil {
   145  			return err
   146  		}
   147  
   148  		// check if we have encountered this package before
   149  		if digest, ok := walker.visited[name]; ok && pltDigest != digest {
   150  			// conflict error can only happen if have already found a value, so if
   151  			// acceptFirst is set to true, there is no conflict
   152  			if s.Spec.Resolution.AcceptFirst {
   153  				return nil
   154  			}
   155  
   156  			return oci.NewConflictErr(name, digest, pltDigest)
   157  		}
   158  
   159  		// if this is a composite package, there is nothing to resolve. continue with processing its children
   160  		isComposite, err := oci.IsComposite(idx)
   161  		if err != nil {
   162  			return err
   163  		} else if isComposite {
   164  			walker.result.resolved = append(walker.result.resolved, whv1.ResolvedArtifact{
   165  				Name:           name,
   166  				Digest:         pltDigest.String(),
   167  				ResolvedDigest: pltDigest.String(),
   168  				Version:        plt.Metadata().Version,
   169  			})
   170  			return nil
   171  		}
   172  
   173  		// resolve provider-specific variant so we can check for scenarios that
   174  		// produce empty pallets. we can also reuse the image to reduce the amount
   175  		// that needs to be walked when unpacking the package later
   176  		img, err := plt.Image(s.Spec.Provider)
   177  		switch {
   178  		case errors.Is(err, cluster.ErrUnsupportedProvider):
   179  			walker.emptyPallets[name] = true
   180  			return nil
   181  		case err != nil:
   182  			return err
   183  		}
   184  
   185  		l, err := unpack.Layers(img,
   186  			unpack.ForLayerKeys(s.Spec.UnpackOptions.Layers()...),
   187  		)
   188  		switch {
   189  		case err != nil:
   190  			return err
   191  		case len(l) == 0:
   192  			walker.emptyPallets[name] = true
   193  			return nil
   194  		}
   195  
   196  		// now that we have verified the package isn't empty based on our current
   197  		// unpacking options, begin constructing UnpackedPallet to schedule
   198  
   199  		// get digest from the provider-specific variant we resolved earlier
   200  		// to reduce the amount that needs to be walked when unpacking the package
   201  		digest, err := img.Digest()
   202  		if err != nil {
   203  			return err
   204  		}
   205  
   206  		up := s.UnpackedPallet(
   207  			whv1.NewArtifact(name, digest.String(), fetchRepo),
   208  			parameters,
   209  			plt.Metadata().K8sAnnotations(),
   210  		)
   211  		// TODO(dk185217): this is a hack- sets force+prune to false for lumper-controller pallet. Prevents CRD
   212  		// changes from deleting + recreating Shipment/UnpackedPallet CRDs, which causes fatal garabage collection.
   213  		// Remove if there is a better way to specify per-package apply options
   214  		if strings.Contains(name, "lumper-controller") {
   215  			up.Spec.Force = false
   216  			up.Spec.Prune = false
   217  		}
   218  
   219  		// loop over dependencies to compute spec.dependsOn
   220  		// TODO(aw185176): use match.FindManifests() here
   221  		manifest, err := idx.IndexManifest()
   222  		if err != nil {
   223  			return err
   224  		}
   225  		for _, d := range manifest.Manifests {
   226  			childName := d.Annotations[warehouse.AnnotationRefName]
   227  			// if the name is the same, this is a provider variant, not a dependency
   228  			// only process dependencies that aren't identified as being empty
   229  			if childName == name || walker.emptyPallets[childName] {
   230  				continue
   231  			}
   232  
   233  			// TODO(aw185176): restructure the walker so that we don't need to parse
   234  			// this information multiple times per pallet potentially
   235  			child, err := oci.ArtifactFromIdx(idx, d)
   236  			if err != nil {
   237  				return fmt.Errorf("failed to read child artifact for %s: %w", plt.Name(), err)
   238  			}
   239  			isComposite, err := oci.IsComposite(child)
   240  			if err != nil {
   241  				return fmt.Errorf("failed to check if child is for %s: %w", plt.Name(), err)
   242  			}
   243  			if isComposite {
   244  				continue
   245  			}
   246  
   247  			up.Spec.DependsOn = append(up.Spec.DependsOn, meta.LocalObjectReference{
   248  				Name: s.ChildResourceName(childName),
   249  			})
   250  		}
   251  
   252  		walker.result.unpackedPallets[name] = up
   253  		walker.visited[name] = pltDigest
   254  		walker.result.resolved = append(walker.result.resolved, whv1.ResolvedArtifact{
   255  			Name:           name,
   256  			Digest:         pltDigest.String(),
   257  			ResolvedDigest: digest.String(),
   258  			Version:        plt.Metadata().Version,
   259  		})
   260  		return nil
   261  	}
   262  }
   263  
   264  func (walker *shipmentWalker) bindImageWalker(s *whv1.Shipment, fetchRepo string, parameters map[string]string) func(v1.Image, v1.ImageIndex) error {
   265  	return func(img v1.Image, parent v1.ImageIndex) error {
   266  		// determine if this image is a standalone package or a provider
   267  		// variant of an index that we have already traversed.
   268  		isPkg, err := oci.IsPackage(img, parent)
   269  		// either exit early because its not a standalone package, or if we
   270  		// encountered an error while analyzing the artifact
   271  		if !isPkg || err != nil {
   272  			return err
   273  		}
   274  
   275  		plt, err := pallet.New(img)
   276  		if err != nil {
   277  			return err
   278  		}
   279  		name := plt.Name()
   280  		pltDigest, err := plt.Digest()
   281  		if err != nil {
   282  			return err
   283  		}
   284  
   285  		// check if we have encountered this package before
   286  		if digest, ok := walker.visited[name]; ok && pltDigest != digest {
   287  			// conflict error can only happen if have already found a value, so if
   288  			// acceptFirst is set to true, there is no conflict
   289  			if s.Spec.Resolution.AcceptFirst {
   290  				return nil
   291  			}
   292  
   293  			return oci.NewConflictErr(name, digest, pltDigest)
   294  		}
   295  
   296  		// check for empty pallet
   297  		empty, err := isPalletEmpty(img, s.Spec.UnpackOptions)
   298  		if err != nil {
   299  			return err
   300  		}
   301  		if empty || !plt.Supports(s.Spec.Provider) {
   302  			walker.emptyPallets[name] = true
   303  			return nil
   304  		}
   305  
   306  		up := s.UnpackedPallet(
   307  			whv1.NewArtifact(name, pltDigest.String(), fetchRepo),
   308  			parameters,
   309  			plt.Metadata().K8sAnnotations(),
   310  		)
   311  		// TODO(dk185217): this is a hack- sets force+prune to false for lumper-controller pallet. Prevents CRD
   312  		// changes from deleting + recreating Shipment/UnpackedPallet CRDs, which causes fatal garabage collection.
   313  		// Remove if there is a better way to specify per-package apply options
   314  		if strings.Contains(name, "lumper-controller") {
   315  			up.Spec.Force = false
   316  			up.Spec.Prune = false
   317  		}
   318  
   319  		// images are leaf nodes, cant have dependencies, so insert into our map
   320  		// and return
   321  		walker.result.unpackedPallets[name] = up
   322  		walker.visited[name] = pltDigest
   323  		walker.result.resolved = append(walker.result.resolved, whv1.ResolvedArtifact{
   324  			Name:           name,
   325  			Digest:         pltDigest.String(),
   326  			ResolvedDigest: pltDigest.String(),
   327  			Version:        plt.Metadata().Version,
   328  		})
   329  		return nil
   330  	}
   331  }
   332  

View as plain text