package lumperctl import ( "context" "errors" "fmt" "strings" v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/remote" ctrl "sigs.k8s.io/controller-runtime" "edge-infra.dev/pkg/f8n/warehouse" "edge-infra.dev/pkg/f8n/warehouse/cluster" whv1 "edge-infra.dev/pkg/f8n/warehouse/k8s/apis/v1alpha2" "edge-infra.dev/pkg/f8n/warehouse/lift/unpack" "edge-infra.dev/pkg/f8n/warehouse/oci" "edge-infra.dev/pkg/f8n/warehouse/oci/walk" "edge-infra.dev/pkg/f8n/warehouse/pallet" "edge-infra.dev/pkg/k8s/meta" "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr" ) type shipmentWalker struct { visited map[string]v1.Hash emptyPallets map[string]bool result *walkResult } // walkResult contains return values for resolvePallets type walkResult struct { // unpackedPallets contains the K8s objects that should be applied as a result // of resolution, i.e. all non-empty images which contain at least one Image manifest unpackedPallets map[string]*whv1.UnpackedPallet // resolved is a slice of digests for all images in the graph // the ResolvedDigest may be different from the Digest if the image is an // ImageIndex, in which case ResolvedDigest will be the digest of the provider // specific Image resolved []whv1.ResolvedArtifact } // resolvePallets loops over the pallets in the shipment and walks each one, // producing a resolved graph for all func (r *ShipmentReconciler) resolvePallets( ctx context.Context, s *whv1.Shipment, parameters map[string]string, ) (*walkResult, recerr.Error) { walkCtx := &shipmentWalker{ visited: make(map[string]v1.Hash, 0), emptyPallets: make(map[string]bool, 0), result: &walkResult{ unpackedPallets: make(map[string]*whv1.UnpackedPallet, 0), resolved: []whv1.ResolvedArtifact{}, }, } // Create keychain that is scoped to this reconcile loop, to ensure that we // get up-to-date handles on our auth context (e.g., API key rotation) keychain, recErr := r.Keychain(ctx, s) if recErr != nil { return nil, recErr } puller, err := remote.NewPuller( remote.WithContext(ctx), remote.WithAuthFromKeychain(keychain), ) if err != nil { recErr = recerr.New( fmt.Errorf("failed to instantiate registry puller: %w", err), whv1.FetchFailedReason, ) recErr.ToCondition(s, whv1.FetchedArtifactCondition) return nil, recErr } // for each artifact, build a walker and walk that dog for _, p := range s.Spec.Pallets { // fetch all dependent pallets from repo of pallet they are included in fetchRepo := fmt.Sprintf("%s/%s", s.Spec.Repository, p.Name) walker := &walk.Fns{ Index: walkCtx.bindIndexWalker(s, fetchRepo, parameters), Image: walkCtx.bindImageWalker(s, fetchRepo, parameters), } // Fetch artifact ref, err := parseRef(p.WithRepo(fetchRepo)) if err != nil { return nil, err } a, err := r.Fetch( ctrl.LoggerInto(ctx, logWithRef(ctrl.LoggerFrom(ctx), ref, p.Name)), s, ref, s.Spec.PackagePullOptions.PackagePullPolicy, remote.Reuse(puller), ) if err != nil { return nil, err } // perform walk if err := walk.Walk(a, walker); err != nil { return nil, recerr.New( fmt.Errorf("failed to resolve graph for pallets: %w", err), whv1.UnpackFailedReason, ) } } if len(walkCtx.result.unpackedPallets) == 0 { return nil, recerr.NewStalled( fmt.Errorf("all pallets are empty for current unpacking options"), whv1.UnpackFailedReason, ) } return walkCtx.result, nil } // IsPalletEmpty returns true if a pallet contains no manifests for the input // provider and infra options. // TODO: make this work for any oci.Artifact and make public? // TODO: refactor/clean up func isPalletEmpty(img v1.Image, unpackOptions whv1.UnpackOptions) (bool, error) { layers, err := unpack.Layers(img, unpack.ForLayerKeys(unpackOptions.Layers()...)) if err != nil { return false, err } return len(layers) == 0, nil } func (walker *shipmentWalker) bindIndexWalker(s *whv1.Shipment, fetchRepo string, parameters map[string]string) func(v1.ImageIndex, v1.ImageIndex) error { return func(idx v1.ImageIndex, _ v1.ImageIndex) error { // capture package name and digest for comparison on conflicts and keying // our result plt, err := pallet.New(idx) if err != nil { return err } name := plt.Name() pltDigest, err := plt.Digest() if err != nil { return err } // check if we have encountered this package before if digest, ok := walker.visited[name]; ok && pltDigest != digest { // conflict error can only happen if have already found a value, so if // acceptFirst is set to true, there is no conflict if s.Spec.Resolution.AcceptFirst { return nil } return oci.NewConflictErr(name, digest, pltDigest) } // if this is a composite package, there is nothing to resolve. continue with processing its children isComposite, err := oci.IsComposite(idx) if err != nil { return err } else if isComposite { walker.result.resolved = append(walker.result.resolved, whv1.ResolvedArtifact{ Name: name, Digest: pltDigest.String(), ResolvedDigest: pltDigest.String(), Version: plt.Metadata().Version, }) return nil } // resolve provider-specific variant so we can check for scenarios that // produce empty pallets. we can also reuse the image to reduce the amount // that needs to be walked when unpacking the package later img, err := plt.Image(s.Spec.Provider) switch { case errors.Is(err, cluster.ErrUnsupportedProvider): walker.emptyPallets[name] = true return nil case err != nil: return err } l, err := unpack.Layers(img, unpack.ForLayerKeys(s.Spec.UnpackOptions.Layers()...), ) switch { case err != nil: return err case len(l) == 0: walker.emptyPallets[name] = true return nil } // now that we have verified the package isn't empty based on our current // unpacking options, begin constructing UnpackedPallet to schedule // get digest from the provider-specific variant we resolved earlier // to reduce the amount that needs to be walked when unpacking the package digest, err := img.Digest() if err != nil { return err } up := s.UnpackedPallet( whv1.NewArtifact(name, digest.String(), fetchRepo), parameters, plt.Metadata().K8sAnnotations(), ) // TODO(dk185217): this is a hack- sets force+prune to false for lumper-controller pallet. Prevents CRD // changes from deleting + recreating Shipment/UnpackedPallet CRDs, which causes fatal garabage collection. // Remove if there is a better way to specify per-package apply options if strings.Contains(name, "lumper-controller") { up.Spec.Force = false up.Spec.Prune = false } // loop over dependencies to compute spec.dependsOn // TODO(aw185176): use match.FindManifests() here manifest, err := idx.IndexManifest() if err != nil { return err } for _, d := range manifest.Manifests { childName := d.Annotations[warehouse.AnnotationRefName] // if the name is the same, this is a provider variant, not a dependency // only process dependencies that aren't identified as being empty if childName == name || walker.emptyPallets[childName] { continue } // TODO(aw185176): restructure the walker so that we don't need to parse // this information multiple times per pallet potentially child, err := oci.ArtifactFromIdx(idx, d) if err != nil { return fmt.Errorf("failed to read child artifact for %s: %w", plt.Name(), err) } isComposite, err := oci.IsComposite(child) if err != nil { return fmt.Errorf("failed to check if child is for %s: %w", plt.Name(), err) } if isComposite { continue } up.Spec.DependsOn = append(up.Spec.DependsOn, meta.LocalObjectReference{ Name: s.ChildResourceName(childName), }) } walker.result.unpackedPallets[name] = up walker.visited[name] = pltDigest walker.result.resolved = append(walker.result.resolved, whv1.ResolvedArtifact{ Name: name, Digest: pltDigest.String(), ResolvedDigest: digest.String(), Version: plt.Metadata().Version, }) return nil } } func (walker *shipmentWalker) bindImageWalker(s *whv1.Shipment, fetchRepo string, parameters map[string]string) func(v1.Image, v1.ImageIndex) error { return func(img v1.Image, parent v1.ImageIndex) error { // determine if this image is a standalone package or a provider // variant of an index that we have already traversed. isPkg, err := oci.IsPackage(img, parent) // either exit early because its not a standalone package, or if we // encountered an error while analyzing the artifact if !isPkg || err != nil { return err } plt, err := pallet.New(img) if err != nil { return err } name := plt.Name() pltDigest, err := plt.Digest() if err != nil { return err } // check if we have encountered this package before if digest, ok := walker.visited[name]; ok && pltDigest != digest { // conflict error can only happen if have already found a value, so if // acceptFirst is set to true, there is no conflict if s.Spec.Resolution.AcceptFirst { return nil } return oci.NewConflictErr(name, digest, pltDigest) } // check for empty pallet empty, err := isPalletEmpty(img, s.Spec.UnpackOptions) if err != nil { return err } if empty || !plt.Supports(s.Spec.Provider) { walker.emptyPallets[name] = true return nil } up := s.UnpackedPallet( whv1.NewArtifact(name, pltDigest.String(), fetchRepo), parameters, plt.Metadata().K8sAnnotations(), ) // TODO(dk185217): this is a hack- sets force+prune to false for lumper-controller pallet. Prevents CRD // changes from deleting + recreating Shipment/UnpackedPallet CRDs, which causes fatal garabage collection. // Remove if there is a better way to specify per-package apply options if strings.Contains(name, "lumper-controller") { up.Spec.Force = false up.Spec.Prune = false } // images are leaf nodes, cant have dependencies, so insert into our map // and return walker.result.unpackedPallets[name] = up walker.visited[name] = pltDigest walker.result.resolved = append(walker.result.resolved, whv1.ResolvedArtifact{ Name: name, Digest: pltDigest.String(), ResolvedDigest: pltDigest.String(), Version: plt.Metadata().Version, }) return nil } }