...

Source file src/edge-infra.dev/pkg/f8n/warehouse/forwarder/integration/subscriber_test.go

Documentation: edge-infra.dev/pkg/f8n/warehouse/forwarder/integration

     1  package forwarder
     2  
     3  import (
     4  	"context"
     5  	"database/sql"
     6  	"encoding/json"
     7  	"fmt"
     8  	"strings"
     9  	"testing"
    10  
    11  	"cloud.google.com/go/pubsub"
    12  	"github.com/google/go-containerregistry/pkg/name"
    13  	v1 "github.com/google/go-containerregistry/pkg/v1"
    14  	_ "github.com/jackc/pgx/v4/stdlib" // nolint necessary for db driver
    15  
    16  	ksql "edge-infra.dev/pkg/f8n/kinform/sql"
    17  	sovereign "edge-infra.dev/pkg/f8n/sovereign/model"
    18  	"edge-infra.dev/pkg/f8n/warehouse/cluster"
    19  	"edge-infra.dev/pkg/f8n/warehouse/forwarder"
    20  	"edge-infra.dev/pkg/f8n/warehouse/oci/layer"
    21  	"edge-infra.dev/pkg/f8n/warehouse/oci/remote"
    22  	"edge-infra.dev/pkg/f8n/warehouse/pallet"
    23  	"edge-infra.dev/test/f2"
    24  	"edge-infra.dev/test/f2/x/postgres"
    25  	"edge-infra.dev/test/f2/x/warehouse"
    26  )
    27  
// f is the package-wide f2 framework. TestMain initializes it and each test in
// this file runs its feature through f.Test.
var f f2.Framework
    29  
    30  // TestMain sets up an embedded database to test ingestion
    31  func TestMain(m *testing.M) {
    32  	// TODO(dk185217): Only register registry ext. if integration? Need to create
    33  	// f2 first to bind integration flag... Future: Integration level will change
    34  	f = f2.New(context.Background(), f2.WithExtensions(
    35  		&warehouse.Registry{},
    36  		postgres.New(
    37  			postgres.SkipSchemaIsolation(),
    38  		),
    39  	)).
    40  		Disruptive(). // Disruptive as skipping schema isolation in postgres
    41  		Component("forwarder")
    42  
    43  	f.Run(m)
    44  }
    45  
    46  func TestForward(t *testing.T) {
    47  	var reg *warehouse.Registry
    48  	var host string
    49  	var projectID string
    50  	var hash v1.Hash
    51  	pkgName := "subscriber-test"
    52  	ft := f2.NewFeature("forward").
    53  		Setup("forwarder setup", func(ctx f2.Context, t *testing.T) f2.Context {
    54  			reg = warehouse.FromContextT(ctx, t)
    55  			// forwarder parseRef assumes 3 part URL because GAR. ours will have
    56  			// some random path eg 127.0.0.1:54195/7c53b013. just use that as
    57  			// placeholder for projectid
    58  			urlPts := strings.Split(reg.URL, "/")
    59  			host = urlPts[0]
    60  			projectID = urlPts[1]
    61  
    62  			// create and push an artifact to the test registry
    63  			l, err := layer.New(layer.Runtime, []byte("hello"))
    64  			if err != nil {
    65  				t.Fatal(err)
    66  			}
    67  			layers := []layer.Layer{l}
    68  			a, err := pallet.Image(pallet.Options{
    69  				Metadata: pallet.Metadata{
    70  					Name: "subscriber-test",
    71  					Team: "f8n",
    72  					BuildInfo: pallet.BuildInfo{
    73  						Created:  "yesterday",
    74  						Source:   "https://gothub.com/ncrvoyix-swt-retail/edge-infra",
    75  						Revision: "d34db33f",
    76  						Version:  "1.2.3",
    77  					},
    78  				},
    79  				ClusterProviders: cluster.BuiltInProviders(),
    80  			}, layers...)
    81  			if err != nil {
    82  				t.Fatal("failed to create test pallet", "subscriber-test", err)
    83  			}
    84  			hash, err = a.Digest()
    85  			if err != nil {
    86  				t.Fatal(err)
    87  			}
    88  
    89  			tag := "late-test"
    90  			pushPath := fmt.Sprintf("%s/%s", "f2repo", pkgName)
    91  			if err := reg.Push(a, pushPath, tag, remote.WithoutAuth()); err != nil {
    92  				t.Fatal("failed to push test pallet", err)
    93  			}
    94  			return ctx
    95  		}).
    96  		Test("default destinations", func(ctx f2.Context, t *testing.T) f2.Context {
    97  			f := &forwarder.Fwder{
    98  				DST: []forwarder.Destination{
    99  					{
   100  						ProjectID:  projectID,
   101  						Repository: "f2repo",
   102  					},
   103  				},
   104  				SourceRepository: "",
   105  			}
   106  			expectImage := "subscriber-test"
   107  			data, err := json.Marshal(forwarder.Message{
   108  				Action: forwarder.Insertion,
   109  				Digest: fmt.Sprintf(
   110  					"%s/%s/f2repo/%s@%s",
   111  					host,
   112  					projectID,
   113  					expectImage,
   114  					hash.String(),
   115  				),
   116  			})
   117  			if err != nil {
   118  				t.Fatal(err)
   119  			}
   120  			if err := f.HandleMsg(ctx, &pubsub.Message{Data: data}); err != nil {
   121  				t.Logf("HandleMsg returned non-fatal error: %v", err)
   122  			}
   123  
   124  			// check if image was pushed to registry
   125  			refStr := fmt.Sprintf("%s/%s/%s/%s@%s", host, projectID, "f2repo", pkgName, hash.String())
   126  			ref, err := name.ParseReference(refStr)
   127  			if err != nil {
   128  				t.Fatal(err)
   129  			}
   130  			_, err = remote.Get(ref)
   131  			if err != nil {
   132  				t.Fatal(err)
   133  			}
   134  
   135  			return ctx
   136  		}).
   137  		Test("trims -rc suffix", func(ctx f2.Context, t *testing.T) f2.Context {
   138  			f := &forwarder.Fwder{
   139  				DST: []forwarder.Destination{
   140  					{
   141  						ProjectID:  projectID,
   142  						Repository: "f2repo",
   143  					},
   144  				},
   145  				SourceRepository: "",
   146  			}
   147  			data, err := json.Marshal(forwarder.Message{
   148  				Action: forwarder.Insertion,
   149  				Digest: fmt.Sprintf(
   150  					"%s/%s/f2repo/%s@%s",
   151  					host,
   152  					projectID,
   153  					pkgName,
   154  					hash.String(),
   155  				),
   156  				Tag: fmt.Sprintf(
   157  					"%s/%s/f2repo/%s:%s",
   158  					host,
   159  					projectID,
   160  					pkgName,
   161  					"0.0.0-rc",
   162  				),
   163  			})
   164  			if err != nil {
   165  				t.Fatal(err)
   166  			}
   167  			if err := f.HandleMsg(ctx, &pubsub.Message{Data: data}); err != nil {
   168  				t.Logf("HandleMsg returned non-fatal error: %v", err)
   169  			}
   170  
   171  			// check if image was pushed to registry, -rc suffix should be trimmed from tag
   172  			refStr := fmt.Sprintf("%s/%s/%s/%s:%s", host, projectID, "f2repo", pkgName, "0.0.0")
   173  			ref, err := name.ParseReference(refStr)
   174  			if err != nil {
   175  				t.Fatal(err)
   176  			}
   177  			_, err = remote.Get(ref)
   178  			if err != nil {
   179  				t.Fatal(err)
   180  			}
   181  			return ctx
   182  		}).
   183  		Feature()
   184  
   185  	f.Test(t, ft)
   186  }
   187  
   188  func TestIngestMessage_InsertsArtifactVersion(t *testing.T) {
   189  	ft := f2.NewFeature("ingest").
   190  		Test("inserts artifact_version", func(ctx f2.Context, t *testing.T) f2.Context {
   191  			pg := postgres.FromContextT(ctx, t)
   192  			handle, err := ksql.FromDSN(pg.DSN(), pg.MaxConns, pg.MaxIdleConns)
   193  			if err != nil {
   194  				t.Fatal(err)
   195  			}
   196  
   197  			// empty forwarding options, only ingestion into sql should occur
   198  			f := &forwarder.Fwder{
   199  				DST:              []forwarder.Destination{},
   200  				SourceRepository: "",
   201  				SQL:              handle,
   202  			}
   203  
   204  			expectImage := "registryforwarder"
   205  			expectDigest := "0ce1415bb217137919be34d8e9b55081e0a379c5c979f49fb63a8870208e5fc7"
   206  			expectTag := "other-docker.pkg.dev/ret-edge-stage1-foreman/warehouse/distributed-storage:expected"
   207  			data, err := json.Marshal(forwarder.Message{
   208  				Action: forwarder.Insertion,
   209  				Digest: fmt.Sprintf(
   210  					"us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
   211  					expectImage,
   212  					expectDigest,
   213  				),
   214  				Tag: expectTag,
   215  			})
   216  			if err != nil {
   217  				t.Fatal(err)
   218  			}
   219  
   220  			if err := f.HandleMsg(ctx, &pubsub.Message{Data: data}); err != nil {
   221  				t.Logf("HandleMsg returned non-fatal error: %v", err)
   222  			}
   223  
   224  			tq := `
   225  SELECT image, tag, sha256_digest
   226  FROM artifact_versions
   227  WHERE sha256_digest = $1`
   228  			row := f.SQL.QueryRowContext(ctx, tq, expectDigest)
   229  			var image string
   230  			var tag string
   231  			var digest string
   232  			err = row.Scan(&image, &tag, &digest)
   233  			if err != nil {
   234  				t.Fatal(err)
   235  			}
   236  			if expectImage != image {
   237  				t.Errorf("expected artifact_version.image not found. expected '%s', got: '%s'",
   238  					expectImage,
   239  					image,
   240  				)
   241  			}
   242  			if expectDigest != digest {
   243  				t.Errorf("expected artifact_version.sha256_digest not found. expected '%s', got: '%s'",
   244  					expectDigest,
   245  					image,
   246  				)
   247  			}
   248  			if expectTag != tag {
   249  				t.Errorf("expected artifact_version.tag not found. expected '%s', got: '%s'",
   250  					expectTag,
   251  					tag,
   252  				)
   253  			}
   254  
   255  			return ctx
   256  		}).
   257  		Test("inserts artifact_version no tag", func(ctx f2.Context, t *testing.T) f2.Context {
   258  			pg := postgres.FromContextT(ctx, t)
   259  			handle, err := ksql.FromDSN(pg.DSN(), pg.MaxConns, pg.MaxIdleConns)
   260  			if err != nil {
   261  				t.Fatal(err)
   262  			}
   263  
   264  			// empty forwarding options, only ingestion into sql should occur
   265  			f := &forwarder.Fwder{
   266  				DST:              []forwarder.Destination{},
   267  				SourceRepository: "",
   268  				SQL:              handle,
   269  			}
   270  
   271  			expectImage := "registryforwarder"
   272  			expectDigest := "0ce1415bb217137919be34d8e9b55081e0a379c5c979f49fb63a8870208e5fc7"
   273  			data, err := json.Marshal(forwarder.Message{
   274  				Action: forwarder.Insertion,
   275  				Digest: fmt.Sprintf(
   276  					"us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
   277  					expectImage,
   278  					expectDigest,
   279  				),
   280  			})
   281  			if err != nil {
   282  				t.Fatal(err)
   283  			}
   284  
   285  			if err := f.HandleMsg(ctx, &pubsub.Message{Data: data}); err != nil {
   286  				t.Logf("HandleMsg returned non-fatal error: %v", err)
   287  			}
   288  
   289  			tq := `
   290  SELECT image, tag, sha256_digest
   291  FROM artifact_versions
   292  WHERE sha256_digest = $1`
   293  			row := f.SQL.QueryRowContext(ctx, tq, expectDigest)
   294  			var image string
   295  			var tag string
   296  			var digest string
   297  			err = row.Scan(&image, &tag, &digest)
   298  			if err != nil {
   299  				t.Fatal(err)
   300  			}
   301  			if expectImage != image {
   302  				t.Errorf("expected artifact_version.image not found. expected '%s', got: '%s'",
   303  					expectImage,
   304  					image,
   305  				)
   306  			}
   307  			if expectDigest != digest {
   308  				t.Errorf("expected artifact_version.sha256_digest not found. expected '%s', got: '%s'",
   309  					expectDigest,
   310  					image,
   311  				)
   312  			}
   313  
   314  			return ctx
   315  		}).
   316  		Feature()
   317  	f.Test(t, ft)
   318  }
   319  
   320  func TestIngestMessage_DeletesArtifactVersion(t *testing.T) {
   321  	ft := f2.NewFeature("ingest").
   322  		Test("deletes artifact_versions", func(ctx f2.Context, t *testing.T) f2.Context {
   323  			pg := postgres.FromContextT(ctx, t)
   324  			handle, err := ksql.FromDSN(pg.DSN(), pg.MaxConns, pg.MaxIdleConns)
   325  			if err != nil {
   326  				t.Fatal(err)
   327  			}
   328  
   329  			// empty forwarding options, only ingestion into sql should occur
   330  			f := &forwarder.Fwder{
   331  				DST:              []forwarder.Destination{},
   332  				SourceRepository: "",
   333  				SQL:              handle,
   334  			}
   335  
   336  			expectImage := "lumper-controller"
   337  			expectDigest := "dc0fe68c3ff914b4d97c2f7d4425cc1ff1ce5b9102c941fc70894738eb670272"
   338  			expectTag := "other-docker.pkg.dev/ret-edge-stage1-foreman/warehouse/distributed-storage:expected"
   339  			insertData, err := json.Marshal(forwarder.Message{
   340  				Action: forwarder.Insertion,
   341  				Digest: fmt.Sprintf(
   342  					"us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
   343  					expectImage,
   344  					expectDigest,
   345  				),
   346  				Tag: expectTag,
   347  			})
   348  			if err != nil {
   349  				t.Fatal(err)
   350  			}
   351  
   352  			if err := f.HandleMsg(ctx, &pubsub.Message{Data: insertData}); err != nil {
   353  				t.Fatal(err)
   354  			}
   355  
   356  			deleteData, err := json.Marshal(forwarder.Message{
   357  				Action: forwarder.Deletion,
   358  				Digest: fmt.Sprintf(
   359  					"us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
   360  					expectImage,
   361  					expectDigest,
   362  				),
   363  				Tag: expectTag,
   364  			})
   365  			if err != nil {
   366  				t.Fatal(err)
   367  			}
   368  			msg := &pubsub.Message{
   369  				Data: deleteData,
   370  			}
   371  			if err := f.HandleMsg(ctx, msg); err != nil {
   372  				t.Logf("HandleMsg returned non-fatal error: %v", err)
   373  			}
   374  
   375  			tq := `
   376  SELECT image, sha256_digest
   377  FROM artifact_versions
   378  WHERE sha256_digest = $1`
   379  			row := f.SQL.QueryRowContext(ctx, tq, expectDigest)
   380  			var image string
   381  			var digest string
   382  			err = row.Scan(&image, &digest)
   383  			if err != sql.ErrNoRows {
   384  				t.Fatalf("artifact_version not deleted. found row with image: %s, sha256_digest: %s", image, digest)
   385  			}
   386  
   387  			return ctx
   388  		}).Feature()
   389  	f.Test(t, ft)
   390  }
   391  
   392  func TestIngest_Actions(t *testing.T) {
   393  	var (
   394  		fwd   *forwarder.Fwder
   395  		image string
   396  		ref   name.Digest
   397  	)
   398  	ft := f2.NewFeature("ingest").
   399  		Setup("ingest", func(ctx f2.Context, t *testing.T) f2.Context {
   400  			pg := postgres.FromContextT(ctx, t)
   401  			handle, err := ksql.FromDSN(pg.DSN(), pg.MaxConns, pg.MaxIdleConns)
   402  			if err != nil {
   403  				t.Fatal(err)
   404  			}
   405  
   406  			// empty options, nothing should happen
   407  			fwd = &forwarder.Fwder{
   408  				DST:              []forwarder.Destination{},
   409  				SourceRepository: "",
   410  				SQL:              handle,
   411  			}
   412  
   413  			image = "kinform-client"
   414  			digest := fmt.Sprintf(
   415  				"us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
   416  				image,
   417  				"857339c60ea1dcd4b11567bfc4bc58e4c04bc742dacdaca7d9b01b452e1894b7",
   418  			)
   419  			ref, err = name.NewDigest(digest, name.StrictValidation)
   420  			if err != nil {
   421  				t.Fatal(err)
   422  			}
   423  
   424  			return ctx
   425  		}).
   426  		Test("INSERT", func(ctx f2.Context, t *testing.T) f2.Context {
   427  			err := fwd.Ingest(ctx, forwarder.Insertion, image, "test-ingest-tag", ref)
   428  			if err != nil {
   429  				t.Error(err)
   430  			}
   431  
   432  			return ctx
   433  		}).
   434  		Test("DELETE", func(ctx f2.Context, t *testing.T) f2.Context {
   435  			err := fwd.Ingest(ctx, forwarder.Deletion, image, "", ref)
   436  			if err != nil {
   437  				t.Error(err)
   438  			}
   439  			return ctx
   440  		}).
   441  		Feature()
   442  	f.Test(t, ft)
   443  }
   444  
   445  func TestPromote_DefaultDest(t *testing.T) {
   446  	ft := f2.NewFeature("promote").
   447  		Test("inserts artifacts", func(ctx f2.Context, t *testing.T) f2.Context {
   448  			pg := postgres.FromContextT(ctx, t)
   449  			handle, err := ksql.FromDSN(pg.DSN(), pg.MaxConns, pg.MaxIdleConns)
   450  			if err != nil {
   451  				t.Fatal(err)
   452  			}
   453  
   454  			expectProjectID := "ret-edge-testing-gang"
   455  			f := &forwarder.Fwder{
   456  				DST: []forwarder.Destination{
   457  					{
   458  						ProjectID:  expectProjectID,
   459  						Repository: "warehouse",
   460  					},
   461  				},
   462  				SourceRepository: "",
   463  				SQL:              handle,
   464  			}
   465  
   466  			promoteImage := "promote-me"
   467  			promoteDigest := strings.Repeat("0", 64)
   468  			promoteTag := "us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/distributed-storage:expected"
   469  			data, err := json.Marshal(forwarder.Message{
   470  				Action: forwarder.Insertion,
   471  				Digest: fmt.Sprintf(
   472  					"us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
   473  					promoteImage,
   474  					promoteDigest,
   475  				),
   476  				Tag: promoteTag,
   477  			})
   478  			if err != nil {
   479  				t.Fatal(err)
   480  			}
   481  
   482  			if err := f.HandleMsg(ctx, &pubsub.Message{
   483  				Data: data,
   484  				Attributes: map[string]string{
   485  					"promotion": "true",
   486  				},
   487  			}); err != nil {
   488  				t.Logf("HandleMsg returned non-fatal error: %v", err)
   489  			}
   490  
   491  			expectRepository := fmt.Sprintf(
   492  				"us-east1-docker.pkg.dev/%s/warehouse/%s",
   493  				expectProjectID,
   494  				promoteImage,
   495  			)
   496  
   497  			tq := `
   498  SELECT project, repository
   499  FROM artifacts
   500  WHERE project = $1 AND repository = $2`
   501  			rows, err := f.SQL.QueryContext(ctx, tq, expectProjectID, expectRepository)
   502  			if err != nil {
   503  				t.Fatal(err)
   504  			}
   505  			var artifacts []sovereign.Artifact
   506  			for rows.Next() {
   507  				var project string
   508  				var repository string
   509  				err = rows.Scan(&project, &repository)
   510  				if err != nil {
   511  					t.Fatal(err)
   512  				}
   513  				artifacts = append(artifacts, sovereign.Artifact{
   514  					ProjectID:  project,
   515  					Repository: repository,
   516  				})
   517  			}
   518  
   519  			if len(artifacts) != 1 {
   520  				t.Fatalf("expected a single artifact. found %d", len(artifacts))
   521  			}
   522  
   523  			if expectProjectID != artifacts[0].ProjectID {
   524  				t.Errorf("expected artifact.project not found. expected '%s', got: '%s'",
   525  					expectProjectID,
   526  					artifacts[0].ProjectID,
   527  				)
   528  			}
   529  			if expectRepository != artifacts[0].Repository {
   530  				t.Errorf("expected artifact.repository not found. expected '%s', got: '%s'",
   531  					expectRepository,
   532  					artifacts[0].Repository,
   533  				)
   534  			}
   535  			return ctx
   536  		}).Feature()
   537  	f.Test(t, ft)
   538  }
   539  

View as plain text