package forwarder

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"strings"
	"testing"

	"cloud.google.com/go/pubsub"
	"github.com/google/go-containerregistry/pkg/name"
	v1 "github.com/google/go-containerregistry/pkg/v1"
	_ "github.com/jackc/pgx/v4/stdlib" // nolint necessary for db driver

	ksql "edge-infra.dev/pkg/f8n/kinform/sql"
	sovereign "edge-infra.dev/pkg/f8n/sovereign/model"
	"edge-infra.dev/pkg/f8n/warehouse/cluster"
	"edge-infra.dev/pkg/f8n/warehouse/forwarder"
	"edge-infra.dev/pkg/f8n/warehouse/oci/layer"
	"edge-infra.dev/pkg/f8n/warehouse/oci/remote"
	"edge-infra.dev/pkg/f8n/warehouse/pallet"
	"edge-infra.dev/test/f2"
	"edge-infra.dev/test/f2/x/postgres"
	"edge-infra.dev/test/f2/x/warehouse"
)

// f is the f2 framework shared by every test in this file. It is assigned in
// TestMain before any test runs.
var f f2.Framework

// TestMain sets up an embedded database to test ingestion. It builds the f2
// framework with the warehouse registry and postgres extensions and hands
// control of the test run to it.
func TestMain(m *testing.M) {
	// TODO(dk185217): Only register registry ext. if integration? Need to create
	// f2 first to bind integration flag... Future: Integration level will change
	f = f2.New(context.Background(),
		f2.WithExtensions(
			&warehouse.Registry{},
			postgres.New(
				postgres.SkipSchemaIsolation(),
			),
		)).
		Disruptive(). // Disruptive as skipping schema isolation in postgres
		Component("forwarder")
	f.Run(m)
}

// TestForward pushes a test pallet to the test registry and then sends
// insertion messages through Fwder.HandleMsg, checking afterwards that the
// expected digest and tag references can be fetched from the registry.
func TestForward(t *testing.T) {
	var reg *warehouse.Registry
	var host string
	var projectID string
	var hash v1.Hash
	pkgName := "subscriber-test"

	ft := f2.NewFeature("forward").
		Setup("forwarder setup", func(ctx f2.Context, t *testing.T) f2.Context {
			reg = warehouse.FromContextT(ctx, t)

			// forwarder parseRef assumes 3 part URL because GAR. ours will have
			// some random path eg 127.0.0.1:54195/7c53b013. just use that as
			// placeholder for projectid
			urlPts := strings.Split(reg.URL, "/")
			if len(urlPts) < 2 {
				// Fail with a readable message instead of panicking on urlPts[1].
				t.Fatalf("registry URL %q has no path segment to use as project id", reg.URL)
			}
			host = urlPts[0]
			projectID = urlPts[1]

			// create and push an artifact to the test registry
			l, err := layer.New(layer.Runtime, []byte("hello"))
			if err != nil {
				t.Fatal(err)
			}
			layers := []layer.Layer{l}

			a, err := pallet.Image(pallet.Options{
				Metadata: pallet.Metadata{
					Name: pkgName,
					Team: "f8n",
					BuildInfo: pallet.BuildInfo{
						Created:  "yesterday",
						Source:   "https://gothub.com/ncrvoyix-swt-retail/edge-infra",
						Revision: "d34db33f",
						Version:  "1.2.3",
					},
				},
				ClusterProviders: cluster.BuiltInProviders(),
			}, layers...)
			if err != nil {
				t.Fatal("failed to create test pallet", pkgName, err)
			}

			hash, err = a.Digest()
			if err != nil {
				t.Fatal(err)
			}

			tag := "late-test"
			pushPath := fmt.Sprintf("%s/%s", "f2repo", pkgName)
			if err := reg.Push(a, pushPath, tag, remote.WithoutAuth()); err != nil {
				t.Fatal("failed to push test pallet", err)
			}

			return ctx
		}).
		Test("default destinations", func(ctx f2.Context, t *testing.T) f2.Context {
			fwd := &forwarder.Fwder{
				DST: []forwarder.Destination{
					{
						ProjectID:  projectID,
						Repository: "f2repo",
					},
				},
				SourceRepository: "",
			}

			data, err := json.Marshal(forwarder.Message{
				Action: forwarder.Insertion,
				Digest: fmt.Sprintf(
					"%s/%s/f2repo/%s@%s",
					host, projectID, pkgName, hash.String(),
				),
			})
			if err != nil {
				t.Fatal(err)
			}

			// Errors from HandleMsg are only logged; the registry lookup below
			// is what decides whether the test passes.
			if err := fwd.HandleMsg(ctx, &pubsub.Message{Data: data}); err != nil {
				t.Logf("HandleMsg returned non-fatal error: %v", err)
			}

			// check if image was pushed to registry
			refStr := fmt.Sprintf("%s/%s/%s/%s@%s", host, projectID, "f2repo", pkgName, hash.String())
			ref, err := name.ParseReference(refStr)
			if err != nil {
				t.Fatal(err)
			}
			if _, err := remote.Get(ref); err != nil {
				t.Fatal(err)
			}

			return ctx
		}).
		Test("trims -rc suffix", func(ctx f2.Context, t *testing.T) f2.Context {
			fwd := &forwarder.Fwder{
				DST: []forwarder.Destination{
					{
						ProjectID:  projectID,
						Repository: "f2repo",
					},
				},
				SourceRepository: "",
			}

			data, err := json.Marshal(forwarder.Message{
				Action: forwarder.Insertion,
				Digest: fmt.Sprintf(
					"%s/%s/f2repo/%s@%s",
					host, projectID, pkgName, hash.String(),
				),
				Tag: fmt.Sprintf(
					"%s/%s/f2repo/%s:%s",
					host, projectID, pkgName, "0.0.0-rc",
				),
			})
			if err != nil {
				t.Fatal(err)
			}

			if err := fwd.HandleMsg(ctx, &pubsub.Message{Data: data}); err != nil {
				t.Logf("HandleMsg returned non-fatal error: %v", err)
			}

			// check if image was pushed to registry, -rc suffix should be trimmed from tag
			refStr := fmt.Sprintf("%s/%s/%s/%s:%s", host, projectID, "f2repo", pkgName, "0.0.0")
			ref, err := name.ParseReference(refStr)
			if err != nil {
				t.Fatal(err)
			}
			if _, err := remote.Get(ref); err != nil {
				t.Fatal(err)
			}

			return ctx
		}).
		Feature()

	f.Test(t, ft)
}

// TestIngestMessage_InsertsArtifactVersion sends insertion messages (with and
// without a tag) through Fwder.HandleMsg with no forwarding destinations and
// checks the resulting row in the artifact_versions table.
func TestIngestMessage_InsertsArtifactVersion(t *testing.T) {
	ft := f2.NewFeature("ingest").
		Test("inserts artifact_version", func(ctx f2.Context, t *testing.T) f2.Context {
			pg := postgres.FromContextT(ctx, t)
			handle, err := ksql.FromDSN(pg.DSN(), pg.MaxConns, pg.MaxIdleConns)
			if err != nil {
				t.Fatal(err)
			}

			// empty forwarding options, only ingestion into sql should occur
			fwd := &forwarder.Fwder{
				DST:              []forwarder.Destination{},
				SourceRepository: "",
				SQL:              handle,
			}

			expectImage := "registryforwarder"
			expectDigest := "0ce1415bb217137919be34d8e9b55081e0a379c5c979f49fb63a8870208e5fc7"
			expectTag := "other-docker.pkg.dev/ret-edge-stage1-foreman/warehouse/distributed-storage:expected"

			data, err := json.Marshal(forwarder.Message{
				Action: forwarder.Insertion,
				Digest: fmt.Sprintf(
					"us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
					expectImage, expectDigest,
				),
				Tag: expectTag,
			})
			if err != nil {
				t.Fatal(err)
			}

			if err := fwd.HandleMsg(ctx, &pubsub.Message{Data: data}); err != nil {
				t.Logf("HandleMsg returned non-fatal error: %v", err)
			}

			tq := `
			SELECT image, tag, sha256_digest
			FROM artifact_versions
			WHERE sha256_digest = $1`
			row := fwd.SQL.QueryRowContext(ctx, tq, expectDigest)

			var image string
			var tag string
			var digest string
			if err := row.Scan(&image, &tag, &digest); err != nil {
				t.Fatal(err)
			}

			if expectImage != image {
				t.Errorf("expected artifact_version.image not found. expected '%s', got: '%s'",
					expectImage, image,
				)
			}
			if expectDigest != digest {
				// Report the digest that was actually read, not the image name.
				t.Errorf("expected artifact_version.sha256_digest not found. expected '%s', got: '%s'",
					expectDigest, digest,
				)
			}
			if expectTag != tag {
				t.Errorf("expected artifact_version.tag not found. expected '%s', got: '%s'",
					expectTag, tag,
				)
			}

			return ctx
		}).
		Test("inserts artifact_version no tag", func(ctx f2.Context, t *testing.T) f2.Context {
			pg := postgres.FromContextT(ctx, t)
			handle, err := ksql.FromDSN(pg.DSN(), pg.MaxConns, pg.MaxIdleConns)
			if err != nil {
				t.Fatal(err)
			}

			// empty forwarding options, only ingestion into sql should occur
			fwd := &forwarder.Fwder{
				DST:              []forwarder.Destination{},
				SourceRepository: "",
				SQL:              handle,
			}

			expectImage := "registryforwarder"
			expectDigest := "0ce1415bb217137919be34d8e9b55081e0a379c5c979f49fb63a8870208e5fc7"

			data, err := json.Marshal(forwarder.Message{
				Action: forwarder.Insertion,
				Digest: fmt.Sprintf(
					"us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
					expectImage, expectDigest,
				),
			})
			if err != nil {
				t.Fatal(err)
			}

			if err := fwd.HandleMsg(ctx, &pubsub.Message{Data: data}); err != nil {
				t.Logf("HandleMsg returned non-fatal error: %v", err)
			}

			tq := `
			SELECT image, tag, sha256_digest
			FROM artifact_versions
			WHERE sha256_digest = $1`
			row := fwd.SQL.QueryRowContext(ctx, tq, expectDigest)

			var image string
			var tag string
			var digest string
			if err := row.Scan(&image, &tag, &digest); err != nil {
				t.Fatal(err)
			}

			if expectImage != image {
				t.Errorf("expected artifact_version.image not found. expected '%s', got: '%s'",
					expectImage, image,
				)
			}
			if expectDigest != digest {
				// Report the digest that was actually read, not the image name.
				t.Errorf("expected artifact_version.sha256_digest not found. expected '%s', got: '%s'",
					expectDigest, digest,
				)
			}

			return ctx
		}).
		Feature()

	f.Test(t, ft)
}

// TestIngestMessage_DeletesArtifactVersion inserts an artifact version via
// Fwder.HandleMsg, sends a deletion message for the same digest, and checks
// that no matching row remains in artifact_versions.
func TestIngestMessage_DeletesArtifactVersion(t *testing.T) {
	ft := f2.NewFeature("ingest").
		Test("deletes artifact_versions", func(ctx f2.Context, t *testing.T) f2.Context {
			pg := postgres.FromContextT(ctx, t)
			handle, err := ksql.FromDSN(pg.DSN(), pg.MaxConns, pg.MaxIdleConns)
			if err != nil {
				t.Fatal(err)
			}

			// empty forwarding options, only ingestion into sql should occur
			fwd := &forwarder.Fwder{
				DST:              []forwarder.Destination{},
				SourceRepository: "",
				SQL:              handle,
			}

			expectImage := "lumper-controller"
			expectDigest := "dc0fe68c3ff914b4d97c2f7d4425cc1ff1ce5b9102c941fc70894738eb670272"
			expectTag := "other-docker.pkg.dev/ret-edge-stage1-foreman/warehouse/distributed-storage:expected"

			insertData, err := json.Marshal(forwarder.Message{
				Action: forwarder.Insertion,
				Digest: fmt.Sprintf(
					"us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
					expectImage, expectDigest,
				),
				Tag: expectTag,
			})
			if err != nil {
				t.Fatal(err)
			}

			// The insert must succeed, otherwise the delete check below is vacuous.
			if err := fwd.HandleMsg(ctx, &pubsub.Message{Data: insertData}); err != nil {
				t.Fatal(err)
			}

			deleteData, err := json.Marshal(forwarder.Message{
				Action: forwarder.Deletion,
				Digest: fmt.Sprintf(
					"us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
					expectImage, expectDigest,
				),
				Tag: expectTag,
			})
			if err != nil {
				t.Fatal(err)
			}

			msg := &pubsub.Message{
				Data: deleteData,
			}
			if err := fwd.HandleMsg(ctx, msg); err != nil {
				t.Logf("HandleMsg returned non-fatal error: %v", err)
			}

			tq := `
			SELECT image, sha256_digest
			FROM artifact_versions
			WHERE sha256_digest = $1`
			row := fwd.SQL.QueryRowContext(ctx, tq, expectDigest)

			var image string
			var digest string
			err = row.Scan(&image, &digest)
			if err == nil {
				t.Fatalf("artifact_version not deleted. found row with image: %s, sha256_digest: %s", image, digest)
			}
			if err != sql.ErrNoRows {
				// Any other error means the lookup itself failed; do not report
				// it as a row that survived the delete.
				t.Fatalf("unexpected error querying artifact_versions after delete: %v", err)
			}

			return ctx
		}).Feature()

	f.Test(t, ft)
}

// TestIngest_Actions calls Fwder.Ingest directly with the Insertion and
// Deletion actions for one digest reference and expects both to return no
// error.
func TestIngest_Actions(t *testing.T) {
	var (
		fwd   *forwarder.Fwder
		image string
		ref   name.Digest
	)

	ft := f2.NewFeature("ingest").
		Setup("ingest", func(ctx f2.Context, t *testing.T) f2.Context {
			pg := postgres.FromContextT(ctx, t)
			handle, err := ksql.FromDSN(pg.DSN(), pg.MaxConns, pg.MaxIdleConns)
			if err != nil {
				t.Fatal(err)
			}

			// empty options, nothing should happen
			fwd = &forwarder.Fwder{
				DST:              []forwarder.Destination{},
				SourceRepository: "",
				SQL:              handle,
			}

			image = "kinform-client"
			digest := fmt.Sprintf(
				"us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
				image, "857339c60ea1dcd4b11567bfc4bc58e4c04bc742dacdaca7d9b01b452e1894b7",
			)
			ref, err = name.NewDigest(digest, name.StrictValidation)
			if err != nil {
				t.Fatal(err)
			}

			return ctx
		}).
		Test("INSERT", func(ctx f2.Context, t *testing.T) f2.Context {
			if err := fwd.Ingest(ctx, forwarder.Insertion, image, "test-ingest-tag", ref); err != nil {
				t.Error(err)
			}
			return ctx
		}).
		Test("DELETE", func(ctx f2.Context, t *testing.T) f2.Context {
			if err := fwd.Ingest(ctx, forwarder.Deletion, image, "", ref); err != nil {
				t.Error(err)
			}
			return ctx
		}).
		Feature()

	f.Test(t, ft)
}

// TestPromote_DefaultDest sends an insertion message carrying the
// "promotion" attribute through Fwder.HandleMsg and checks that exactly one
// row for the destination project and repository exists in the artifacts
// table.
func TestPromote_DefaultDest(t *testing.T) {
	ft := f2.NewFeature("promote").
		Test("inserts artifacts", func(ctx f2.Context, t *testing.T) f2.Context {
			pg := postgres.FromContextT(ctx, t)
			handle, err := ksql.FromDSN(pg.DSN(), pg.MaxConns, pg.MaxIdleConns)
			if err != nil {
				t.Fatal(err)
			}

			expectProjectID := "ret-edge-testing-gang"
			fwd := &forwarder.Fwder{
				DST: []forwarder.Destination{
					{
						ProjectID:  expectProjectID,
						Repository: "warehouse",
					},
				},
				SourceRepository: "",
				SQL:              handle,
			}

			promoteImage := "promote-me"
			promoteDigest := strings.Repeat("0", 64)
			promoteTag := "us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/distributed-storage:expected"

			data, err := json.Marshal(forwarder.Message{
				Action: forwarder.Insertion,
				Digest: fmt.Sprintf(
					"us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
					promoteImage, promoteDigest,
				),
				Tag: promoteTag,
			})
			if err != nil {
				t.Fatal(err)
			}

			if err := fwd.HandleMsg(ctx, &pubsub.Message{
				Data: data,
				Attributes: map[string]string{
					"promotion": "true",
				},
			}); err != nil {
				t.Logf("HandleMsg returned non-fatal error: %v", err)
			}

			expectRepository := fmt.Sprintf(
				"us-east1-docker.pkg.dev/%s/warehouse/%s",
				expectProjectID, promoteImage,
			)

			tq := `
			SELECT project, repository
			FROM artifacts
			WHERE project = $1 AND repository = $2`
			rows, err := fwd.SQL.QueryContext(ctx, tq, expectProjectID, expectRepository)
			if err != nil {
				t.Fatal(err)
			}
			// Release the result set on every exit path, including t.Fatal.
			defer rows.Close()

			var artifacts []sovereign.Artifact
			for rows.Next() {
				var project string
				var repository string
				if err := rows.Scan(&project, &repository); err != nil {
					t.Fatal(err)
				}
				artifacts = append(artifacts, sovereign.Artifact{
					ProjectID:  project,
					Repository: repository,
				})
			}
			// Next returning false may mean an iteration error rather than the
			// end of the result set; surface it before counting rows.
			if err := rows.Err(); err != nil {
				t.Fatal(err)
			}

			if len(artifacts) != 1 {
				t.Fatalf("expected a single artifact. found %d", len(artifacts))
			}

			if expectProjectID != artifacts[0].ProjectID {
				t.Errorf("expected artifact.project not found. expected '%s', got: '%s'",
					expectProjectID, artifacts[0].ProjectID,
				)
			}
			if expectRepository != artifacts[0].Repository {
				t.Errorf("expected artifact.repository not found. expected '%s', got: '%s'",
					expectRepository, artifacts[0].Repository,
				)
			}

			return ctx
		}).Feature()

	f.Test(t, ft)
}