package forwarder

import (
	"context"
	"encoding/json"
	"fmt"
	"path"
	"strings"

	"cloud.google.com/go/pubsub"
	"github.com/google/go-containerregistry/pkg/name"
	"go.uber.org/multierr"

	"edge-infra.dev/pkg/f8n/kinform/sql"
	sovereign "edge-infra.dev/pkg/f8n/sovereign/model"
	"edge-infra.dev/pkg/f8n/warehouse/oci/remote"
	"edge-infra.dev/pkg/lib/fog"
)

// Values of Message.Action that the forwarder acts on.
const (
	// Insertion is the action reported when an artifact version is added.
	Insertion = "INSERT"
	// Deletion is the action reported when an artifact version is removed.
	Deletion = "DELETE"
)

// Fwder consumes registry event messages, records them in the database on a
// best-effort basis, and copies the referenced artifact to its destinations.
type Fwder struct {
	// DST holds the default destinations used when a message does not carry
	// complete destination information of its own.
	DST []Destination
	// SourceRepository is an optional registry repository name that is used to filter
	// messages
	SourceRepository string
	// SQL is the database handle used by Ingest and by promotion bookkeeping.
	// When it is nil those steps return an error, which HandleMsg only logs.
	SQL *sql.DBHandle
}

// Message defines the structure of payloads that the forwarder understands. It
// is a superset of the payloads sent by Google Artifact Registry that allows
// for other message producers to forward packages on-demand to specific projects
// and registry instances.
type Message struct {
	// Fields from Google Artifact Registry events.
	// https://cloud.google.com/artifact-registry/docs/configure-notifications#examples

	// Action is the operation that was taken (INSERT or DELETE).
	// Other message producers can omit the action field to simply forward a digest
	// from one registry to another on-demand.
	Action string `json:"action,omitempty"`
	// Digest is the full reference, including digest.
	// e.g., pkg.dev/ret-edge-dev-infra/warehouse/foo@sha256:...
	Digest string `json:"digest"`
	// Tag is an optional alias included in the payload from Google Artifact
	// Registry.
	Tag string `json:"tag"`

	// Optional destination information

	// ProjectID is the project ID where the destination registry exists. If
	// not provided, it defaults to the source project ID.
	ProjectID string `json:"projectID,omitempty"`
	// Repository is the name of the destination Google Artifact Registry repository
	// e.g. warehouse
	// TODO(dk185217): "registry" field tag retained for compatibility with external clients. This
	// should eventually be updated to "repository"
	Repository string `json:"registry,omitempty"`
}

// HandleMsg decodes a pub/sub message into a Message and forwards the artifact
// it references.
//
// The message is skipped (nil is returned) when SourceRepository is set and
// does not match the repository parsed from the digest, or when Action is
// present and is not INSERT. Database ingestion and promotion bookkeeping are
// best effort: their failures are logged and do not stop forwarding. The
// returned error combines the failures of every destination that could not be
// forwarded to.
func (f *Fwder) HandleMsg(ctx context.Context, msg *pubsub.Message) error {
	log := fog.FromContext(ctx)

	m := &Message{}
	if err := json.Unmarshal(msg.Data, m); err != nil {
		IncPackageForwardErrs(*m)
		return err
	}
	log = log.WithValues("ref", m.Digest)

	ref, err := name.NewDigest(m.Digest, name.StrictValidation)
	if err != nil {
		IncPackageForwardErrs(*m)
		return fmt.Errorf("failed to parse reference from digest %s: %w", m.Digest, err)
	}

	// e.g., us-east1-docker.pkg.dev, warehouse, registryforwarder
	registry, srcrepository, image := parseRef(ref)

	if m.Tag != "" {
		dstTag, err := name.NewTag(m.Tag, name.StrictValidation)
		if err != nil {
			IncPackageForwardErrs(*m)
			return fmt.Errorf("failed to parse reference from tag %s: %w", m.Tag, err)
		}
		// parse the expected registry from tag (used for promotions to non us locations)
		registry, _, _ = parseRef(dstTag)
	}

	if f.SourceRepository != "" && f.SourceRepository != srcrepository {
		log.Info("ignoring because repository doesn't match configured source",
			"repository", srcrepository,
			"source", f.SourceRepository,
		)
		IncPackageForwardSkips(*m)
		return nil
	}

	// TODO(dk185217): best effort update db. nothing depends on the data yet so erroring
	// out doesnt make sense
	if err := f.Ingest(ctx, m.Action, image, m.Tag, ref); err != nil {
		log.Info("failed to ingest registry event. forwarding anyways", "error", err)
	}

	var destinations []Destination
	// If this message contains explicit forwarding destination information, use
	// it; otherwise forward to all default destinations.
	if m.ProjectID != "" && m.Repository != "" {
		destinations = append(destinations, Destination{ProjectID: m.ProjectID, Repository: m.Repository})
	} else {
		if m.ProjectID != "" || m.Repository != "" {
			log.Info("partial forwarder config given. ignoring and using defaults",
				"projectID", m.ProjectID,
				"registry", m.Repository,
			)
		}
		destinations = append(destinations, f.DST...)
	}

	// Only insertions (or on-demand messages with no action) are forwarded. This
	// check comes after Ingest so that other actions are still recorded.
	if m.Action != "" && m.Action != Insertion {
		log.Info("ignoring because action is present and not INSERT", "action", m.Action)
		IncPackageForwardSkips(*m)
		return nil
	}

	isPromotion := msg.Attributes["promotion"] == "true"

	var fwdErrs []error
	for _, d := range destinations {
		// Create a copy of the message, filling in destination info
		fm := &Message{
			ProjectID:  d.ProjectID,
			Repository: d.Repository,
			Digest:     m.Digest,
			Tag:        m.Tag,
		}

		if isPromotion {
			if err := f.promote(ctx, fm, registry, image, ref); err != nil {
				log.Info("failed to insert promotion record. forwarding anyways", "error", err)
			}
		}

		if err := f.forward(ctx, fm, registry, image, ref); err != nil {
			fwdErrs = append(fwdErrs, err)
		}
	}

	return multierr.Combine(fwdErrs...)
}

// forward copies the artifact identified by ref to the destination described
// by m (ProjectID and Repository) under the given registry and image name. When
// m.Tag is set, the destination artifact is also tagged with that tag's
// identifier, with any "-rc" suffix removed.
func (f *Fwder) forward(ctx context.Context, m *Message, registry, image string, ref name.Digest) error {
	log := fog.FromContext(ctx)

	// compute base path for artifact, without tag or digest
	// e.g. us-east1-docker.pkg.dev/red-edge-env/warehouse/registryforwarder
	dstRoot := path.Join(registry, m.ProjectID, m.Repository, image)

	dst, err := name.ParseReference(fmt.Sprintf("%s@%s", dstRoot, ref.Identifier()))
	if err != nil {
		return fmt.Errorf("failed to parse new destination reference: %w", err)
	}
	log = log.WithValues("dst", dst.String())

	log.Info("forwarding")
	a, err := remote.Get(ref)
	if err != nil {
		IncPackageForwardErrs(*m)
		return fmt.Errorf("failed to fetch %s: %w", m.Digest, err)
	}

	if err := remote.Write(a, dst); err != nil {
		IncPackageForwardErrs(*m)
		return fmt.Errorf("failed to write %s: %w", dst, err)
	}
	log.Info("forwarded")

	// copy tags from source to destination artifact
	if m.Tag != "" {
		srcTag, err := name.NewTag(m.Tag, name.StrictValidation)
		if err != nil {
			return fmt.Errorf("failed to parse reference from tag %s: %w", m.Tag, err)
		}

		dstTag := fmt.Sprintf("%s:%s", dstRoot, srcTag.Identifier())
		tag, err := name.NewTag(dstTag, name.StrictValidation)
		if err != nil {
			// Report the string that failed to parse; tag is the zero value here.
			return fmt.Errorf("failed to parse updated tag reference %s: %w", dstTag, err)
		}

		// master builds are tagged x.y.z-rc. being forwarded "releases" an rc, so trim
		// the -rc suffix leaving only the actual version
		if trimmed, found := strings.CutSuffix(tag.TagStr(), "-rc"); found {
			tag = tag.Tag(trimmed)
		}

		if err := remote.Tag(tag, a); err != nil {
			return fmt.Errorf("failed to tag %s: %w", dstTag, err)
		}
		log.Info("tagged forwarded artifact", "tag", srcTag.Identifier())
	}

	IncPackageForwards(*m)
	return nil
}

// digestHex returns the hex portion of ref's digest string, which must have
// the form "sha256:<hex>". Any other shape is rejected with an error.
func digestHex(ref name.Digest) (string, error) {
	parts := strings.Split(ref.DigestStr(), ":")
	// Both conditions must hold before parts[1] may be read.
	if len(parts) != 2 || parts[0] != "sha256" {
		return "", fmt.Errorf("choked on digest string")
	}
	return parts[1], nil
}

// Ingest records a registry event in the database. An INSERT action inserts an
// artifact version for image, tag and the digest of ref; a DELETE action
// deletes the artifact version for image and digest. Any other action is a
// no-op. It returns an error when no database handle is configured, when the
// digest is not of the form "sha256:<hex>", or when the database call fails.
func (f *Fwder) Ingest(ctx context.Context, action, image, tag string, ref name.Digest) error {
	if f.SQL == nil {
		return fmt.Errorf("no database connection, cant ingest artifact")
	}

	digest, err := digestHex(ref)
	if err != nil {
		return err
	}

	switch action {
	case Insertion:
		if _, err := f.SQL.InsertArtifactVersion(ctx, image, tag, digest); err != nil {
			return fmt.Errorf("failed to insert artifact_version. err: %w", err)
		}
	case Deletion:
		if err := f.SQL.DeleteArtifactVersion(ctx, image, digest); err != nil {
			return fmt.Errorf("failed to delete artifact_version. err: %w", err)
		}
	}
	return nil
}

// promote inserts an artifact record that links an existing artifact version
// (looked up by image and the digest of ref) to the destination described by m.
// It returns an error when no database handle is configured, when the digest is
// not of the form "sha256:<hex>", when the artifact version lookup fails, or
// when the insert fails.
func (f *Fwder) promote(ctx context.Context, m *Message, registry, image string, ref name.Digest) error {
	if f.SQL == nil {
		return fmt.Errorf("no database connection, cant promote artifact")
	}

	digest, err := digestHex(ref)
	if err != nil {
		return err
	}

	// Make sure artifact_version exists before allowing promotion
	artifactVersion, err := f.SQL.QueryArtifactVersion(ctx, image, digest)
	if err != nil {
		return fmt.Errorf("artifact_version not found, cant promote. err: %w", err)
	}

	a := sovereign.Artifact{
		ProjectID:       m.ProjectID,
		Repository:      path.Join(registry, m.ProjectID, m.Repository, image),
		ArtifactVersion: artifactVersion.ID,
	}
	if _, err := f.SQL.InsertArtifact(ctx, a); err != nil {
		return fmt.Errorf("failed to insert artifact. err: %w", err)
	}
	return nil
}