...

Source file src/edge-infra.dev/pkg/f8n/warehouse/forwarder/subscriber.go

Documentation: edge-infra.dev/pkg/f8n/warehouse/forwarder

     1  package forwarder
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"path"
     8  	"strings"
     9  
    10  	"cloud.google.com/go/pubsub"
    11  	"github.com/google/go-containerregistry/pkg/name"
    12  	"go.uber.org/multierr"
    13  
    14  	"edge-infra.dev/pkg/f8n/kinform/sql"
    15  	sovereign "edge-infra.dev/pkg/f8n/sovereign/model"
    16  	"edge-infra.dev/pkg/f8n/warehouse/oci/remote"
    17  	"edge-infra.dev/pkg/lib/fog"
    18  )
    19  
    20  const (
    21  	Insertion = "INSERT"
    22  	Deletion  = "DELETE"
    23  )
    24  
    25  type Fwder struct {
    26  	DST []Destination
    27  	// SourceRepository is an optional registry repository name that is used to filter
    28  	// messages
    29  	SourceRepository string
    30  	SQL              *sql.DBHandle
    31  }
    32  
    33  // Message defines the structure of payloads that the forwarder understands. It
    34  // is a superset of the payloads sent by Google Artifact Registry that allows
    35  // for other message producers to forward packages on-demand to specific projects
    36  // and registry instances.
    37  type Message struct {
    38  	// Fields from Google Artifact Registry events.
    39  	// https://cloud.google.com/artifact-registry/docs/configure-notifications#examples
    40  
    41  	// Action is the operation that was taken (INSERT or DELETE).
    42  	// Other message produces can omit the action field to simply forward a digest
    43  	// from one registry to another on-demand.
    44  	Action string `json:"action,omitempty"`
    45  
    46  	// Digest is the full reference, including digest.
    47  	// e.g., pkg.dev/ret-edge-dev-infra/warehouse/foo@sha256:...
    48  	Digest string `json:"digest"`
    49  
    50  	// Tag is an optional alias included in the payload from Google Artifact
    51  	// Registry.
    52  	Tag string `json:"tag"`
    53  
    54  	// Optional destination information
    55  
    56  	// ProjectID is the project ID where the destination registry exists. If
    57  	// not provided, it defaults to the source project ID.
    58  	ProjectID string `json:"projectID,omitempty"`
    59  
    60  	// Repository is the name of the destination Google Artifact Registry repository
    61  	// e.g. warehouse
    62  	// TODO(dk185217): "registry" field tag retained for compatibility with external clients. This
    63  	// should eventually be updated to "repository"
    64  	Repository string `json:"registry,omitempty"`
    65  }
    66  
    67  func (f *Fwder) HandleMsg(ctx context.Context, msg *pubsub.Message) error {
    68  	log := fog.FromContext(ctx)
    69  
    70  	m := &Message{}
    71  	if err := json.Unmarshal(msg.Data, m); err != nil {
    72  		IncPackageForwardErrs(*m)
    73  		return err
    74  	}
    75  	log = log.WithValues("ref", m.Digest)
    76  
    77  	ref, err := name.NewDigest(m.Digest, name.StrictValidation)
    78  	if err != nil {
    79  		IncPackageForwardErrs(*m)
    80  		return fmt.Errorf("failed to parse reference from digest %s: %w", m.Digest, err)
    81  	}
    82  
    83  	// e.g., us-east1-docker.pkg.dev, warehouse, registryforwarder
    84  	registry, srcrepository, image := parseRef(ref)
    85  
    86  	if m.Tag != "" {
    87  		dstTag, err := name.NewTag(m.Tag, name.StrictValidation)
    88  		if err != nil {
    89  			IncPackageForwardErrs(*m)
    90  			return fmt.Errorf("failed to parse reference from tag %s: %w", m.Tag, err)
    91  		}
    92  
    93  		// parse the expected registry from tag (used for promotions to non us locations)
    94  		registry, _, _ = parseRef(dstTag)
    95  	}
    96  
    97  	if f.SourceRepository != "" && f.SourceRepository != srcrepository {
    98  		log.Info("ignoring because repository doesn't match configured source",
    99  			"repository", srcrepository,
   100  			"source", f.SourceRepository,
   101  		)
   102  		IncPackageForwardSkips(*m)
   103  		return nil
   104  	}
   105  
   106  	// TODO(dk185217): best effort update db. nothing depends on the data yet so erroring
   107  	// out doesnt make sense
   108  	if err := f.Ingest(ctx, m.Action, image, m.Tag, ref); err != nil {
   109  		log.Info("failed to ingest registry event. forwarding anyways", "error", err)
   110  	}
   111  
   112  	var destinations []Destination
   113  	// If this message contain explicits forwarding destination information, use, otherwise forward
   114  	// to all default destinations
   115  	if m.ProjectID != "" && m.Repository != "" {
   116  		destinations = append(destinations, Destination{ProjectID: m.ProjectID, Repository: m.Repository})
   117  	} else {
   118  		if m.ProjectID != "" || m.Repository != "" {
   119  			log.Info("partial forwarder config given. ignoring and using defaults",
   120  				"projectID", m.ProjectID, "registry", m.Repository,
   121  			)
   122  		}
   123  		destinations = append(destinations, f.DST...)
   124  	}
   125  
   126  	if m.Action != "" && m.Action != Insertion {
   127  		log.Info("ignoring because action is present and not INSERT", "action", m.Action)
   128  		IncPackageForwardSkips(*m)
   129  		return nil
   130  	}
   131  
   132  	isPromotion := msg.Attributes["promotion"] == "true"
   133  	var fwdErrs []error
   134  	for _, d := range destinations {
   135  		// Create a copy of the message, filling in destination info
   136  		fm := &Message{
   137  			ProjectID:  d.ProjectID,
   138  			Repository: d.Repository,
   139  			Digest:     m.Digest,
   140  			Tag:        m.Tag,
   141  		}
   142  		if isPromotion {
   143  			if err := f.promote(ctx, fm, registry, image, ref); err != nil {
   144  				log.Info("failed to insert promotion record. forwarding anyways",
   145  					"error", err)
   146  			}
   147  		}
   148  		if err := f.forward(ctx, fm, registry, image, ref); err != nil {
   149  			fwdErrs = append(fwdErrs, err)
   150  		}
   151  	}
   152  	return multierr.Combine(fwdErrs...)
   153  }
   154  
   155  func (f *Fwder) forward(ctx context.Context, m *Message, registry, image string, ref name.Digest) error {
   156  	log := fog.FromContext(ctx)
   157  
   158  	// compute base path for artifact, without tag or digest
   159  	// e.g. us-east1-docker.pkg.dev/red-edge-env/warehouse/registryforwarder
   160  	dstRoot := path.Join(registry, m.ProjectID, m.Repository, image)
   161  
   162  	dst, err := name.ParseReference(fmt.Sprintf("%s@%s", dstRoot, ref.Identifier()))
   163  	if err != nil {
   164  		return fmt.Errorf("failed to parse new destination reference: %w", err)
   165  	}
   166  	log = log.WithValues("dst", dst.String())
   167  	log.Info("forwarding")
   168  
   169  	a, err := remote.Get(ref)
   170  	if err != nil {
   171  		IncPackageForwardErrs(*m)
   172  		return fmt.Errorf("failed to fetch %s: %w", m.Digest, err)
   173  	}
   174  
   175  	if err := remote.Write(a, dst); err != nil {
   176  		IncPackageForwardErrs(*m)
   177  		return fmt.Errorf("failed to write %s: %w", dst, err)
   178  	}
   179  
   180  	log.Info("forwarded")
   181  
   182  	// copy tags from source to destination artifact
   183  	if m.Tag != "" {
   184  		ref, err := name.NewTag(m.Tag, name.StrictValidation)
   185  		if err != nil {
   186  			return fmt.Errorf("failed to parse reference from tag %s: %w", m.Tag, err)
   187  		}
   188  		dstTag := fmt.Sprintf("%s:%s", dstRoot, ref.Identifier())
   189  		tag, err := name.NewTag(dstTag, name.StrictValidation)
   190  		if err != nil {
   191  			return fmt.Errorf("failed to parse updated tag reference %s: %W", tag, err)
   192  		}
   193  
   194  		// master builds are tagged x.y.z-rc. being forwarded "releases" an rc, so trim
   195  		// the -rc suffix leaving only the actual version
   196  		trimmed, found := strings.CutSuffix(tag.TagStr(), "-rc")
   197  		if found {
   198  			tag = tag.Tag(trimmed)
   199  		}
   200  		if err := remote.Tag(tag, a); err != nil {
   201  			return fmt.Errorf("failed to tag %s: %w", dstTag, err)
   202  		}
   203  		log.Info("tagged forwarded artifact", "tag", ref.Identifier())
   204  	}
   205  
   206  	IncPackageForwards(*m)
   207  	return nil
   208  }
   209  
   210  func (f *Fwder) Ingest(ctx context.Context, action, image, tag string, ref name.Digest) error {
   211  	if f.SQL == nil {
   212  		return fmt.Errorf("no database connection, cant ingest artifact")
   213  	}
   214  	digestPts := strings.Split(ref.DigestStr(), ":")
   215  	if len(digestPts) != 2 && digestPts[0] != "sha256" {
   216  		return fmt.Errorf("choked on digest string")
   217  	}
   218  	digest := digestPts[1]
   219  
   220  	switch action {
   221  	case Insertion:
   222  		_, err := f.SQL.InsertArtifactVersion(ctx, image, tag, digest)
   223  		if err != nil {
   224  			return fmt.Errorf("failed to insert artifact_version. err: %v", err)
   225  		}
   226  	case Deletion:
   227  		if err := f.SQL.DeleteArtifactVersion(ctx, image, digest); err != nil {
   228  			return fmt.Errorf("failed to delete artifact_version. err: %v", err)
   229  		}
   230  	}
   231  
   232  	return nil
   233  }
   234  
   235  func (f *Fwder) promote(ctx context.Context, m *Message, registry, image string, ref name.Digest) error {
   236  	if f.SQL == nil {
   237  		return fmt.Errorf("no database connection, cant promote artifact")
   238  	}
   239  	// TODO(dk185217): refactor. copied from ingest
   240  	digestPts := strings.Split(ref.DigestStr(), ":")
   241  	if len(digestPts) != 2 && digestPts[0] != "sha256" {
   242  		return fmt.Errorf("choked on digest string")
   243  	}
   244  	digest := digestPts[1]
   245  
   246  	// Make sure artifact_version exists before allowing promotion
   247  	artifactVersion, err := f.SQL.QueryArtifactVersion(ctx, image, digest)
   248  	if err != nil {
   249  		return fmt.Errorf("artifact_version not found, cant promote. err: %v", err)
   250  	}
   251  
   252  	repository := path.Join(registry, m.ProjectID, m.Repository, image)
   253  	a := sovereign.Artifact{
   254  		ProjectID:       m.ProjectID,
   255  		Repository:      repository,
   256  		ArtifactVersion: artifactVersion.ID,
   257  	}
   258  	_, err = f.SQL.InsertArtifact(ctx, a)
   259  	if err != nil {
   260  		return fmt.Errorf("failed to insert artifact. err: %v", err)
   261  	}
   262  	return nil
   263  }
   264  

View as plain text