1 package forwarder
2
3 import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 "strings"
9 "testing"
10
11 "cloud.google.com/go/pubsub"
12 "github.com/google/go-containerregistry/pkg/name"
13 v1 "github.com/google/go-containerregistry/pkg/v1"
14 _ "github.com/jackc/pgx/v4/stdlib"
15
16 ksql "edge-infra.dev/pkg/f8n/kinform/sql"
17 sovereign "edge-infra.dev/pkg/f8n/sovereign/model"
18 "edge-infra.dev/pkg/f8n/warehouse/cluster"
19 "edge-infra.dev/pkg/f8n/warehouse/forwarder"
20 "edge-infra.dev/pkg/f8n/warehouse/oci/layer"
21 "edge-infra.dev/pkg/f8n/warehouse/oci/remote"
22 "edge-infra.dev/pkg/f8n/warehouse/pallet"
23 "edge-infra.dev/test/f2"
24 "edge-infra.dev/test/f2/x/postgres"
25 "edge-infra.dev/test/f2/x/warehouse"
26 )
27
// f is the package-wide f2 test framework. TestMain assigns it, and every test
// in this file runs its feature through f.Test.
var f f2.Framework
29
30
31 func TestMain(m *testing.M) {
32
33
34 f = f2.New(context.Background(), f2.WithExtensions(
35 &warehouse.Registry{},
36 postgres.New(
37 postgres.SkipSchemaIsolation(),
38 ),
39 )).
40 Disruptive().
41 Component("forwarder")
42
43 f.Run(m)
44 }
45
46 func TestForward(t *testing.T) {
47 var reg *warehouse.Registry
48 var host string
49 var projectID string
50 var hash v1.Hash
51 pkgName := "subscriber-test"
52 ft := f2.NewFeature("forward").
53 Setup("forwarder setup", func(ctx f2.Context, t *testing.T) f2.Context {
54 reg = warehouse.FromContextT(ctx, t)
55
56
57
58 urlPts := strings.Split(reg.URL, "/")
59 host = urlPts[0]
60 projectID = urlPts[1]
61
62
63 l, err := layer.New(layer.Runtime, []byte("hello"))
64 if err != nil {
65 t.Fatal(err)
66 }
67 layers := []layer.Layer{l}
68 a, err := pallet.Image(pallet.Options{
69 Metadata: pallet.Metadata{
70 Name: "subscriber-test",
71 Team: "f8n",
72 BuildInfo: pallet.BuildInfo{
73 Created: "yesterday",
74 Source: "https://gothub.com/ncrvoyix-swt-retail/edge-infra",
75 Revision: "d34db33f",
76 Version: "1.2.3",
77 },
78 },
79 ClusterProviders: cluster.BuiltInProviders(),
80 }, layers...)
81 if err != nil {
82 t.Fatal("failed to create test pallet", "subscriber-test", err)
83 }
84 hash, err = a.Digest()
85 if err != nil {
86 t.Fatal(err)
87 }
88
89 tag := "late-test"
90 pushPath := fmt.Sprintf("%s/%s", "f2repo", pkgName)
91 if err := reg.Push(a, pushPath, tag, remote.WithoutAuth()); err != nil {
92 t.Fatal("failed to push test pallet", err)
93 }
94 return ctx
95 }).
96 Test("default destinations", func(ctx f2.Context, t *testing.T) f2.Context {
97 f := &forwarder.Fwder{
98 DST: []forwarder.Destination{
99 {
100 ProjectID: projectID,
101 Repository: "f2repo",
102 },
103 },
104 SourceRepository: "",
105 }
106 expectImage := "subscriber-test"
107 data, err := json.Marshal(forwarder.Message{
108 Action: forwarder.Insertion,
109 Digest: fmt.Sprintf(
110 "%s/%s/f2repo/%s@%s",
111 host,
112 projectID,
113 expectImage,
114 hash.String(),
115 ),
116 })
117 if err != nil {
118 t.Fatal(err)
119 }
120 if err := f.HandleMsg(ctx, &pubsub.Message{Data: data}); err != nil {
121 t.Logf("HandleMsg returned non-fatal error: %v", err)
122 }
123
124
125 refStr := fmt.Sprintf("%s/%s/%s/%s@%s", host, projectID, "f2repo", pkgName, hash.String())
126 ref, err := name.ParseReference(refStr)
127 if err != nil {
128 t.Fatal(err)
129 }
130 _, err = remote.Get(ref)
131 if err != nil {
132 t.Fatal(err)
133 }
134
135 return ctx
136 }).
137 Test("trims -rc suffix", func(ctx f2.Context, t *testing.T) f2.Context {
138 f := &forwarder.Fwder{
139 DST: []forwarder.Destination{
140 {
141 ProjectID: projectID,
142 Repository: "f2repo",
143 },
144 },
145 SourceRepository: "",
146 }
147 data, err := json.Marshal(forwarder.Message{
148 Action: forwarder.Insertion,
149 Digest: fmt.Sprintf(
150 "%s/%s/f2repo/%s@%s",
151 host,
152 projectID,
153 pkgName,
154 hash.String(),
155 ),
156 Tag: fmt.Sprintf(
157 "%s/%s/f2repo/%s:%s",
158 host,
159 projectID,
160 pkgName,
161 "0.0.0-rc",
162 ),
163 })
164 if err != nil {
165 t.Fatal(err)
166 }
167 if err := f.HandleMsg(ctx, &pubsub.Message{Data: data}); err != nil {
168 t.Logf("HandleMsg returned non-fatal error: %v", err)
169 }
170
171
172 refStr := fmt.Sprintf("%s/%s/%s/%s:%s", host, projectID, "f2repo", pkgName, "0.0.0")
173 ref, err := name.ParseReference(refStr)
174 if err != nil {
175 t.Fatal(err)
176 }
177 _, err = remote.Get(ref)
178 if err != nil {
179 t.Fatal(err)
180 }
181 return ctx
182 }).
183 Feature()
184
185 f.Test(t, ft)
186 }
187
188 func TestIngestMessage_InsertsArtifactVersion(t *testing.T) {
189 ft := f2.NewFeature("ingest").
190 Test("inserts artifact_version", func(ctx f2.Context, t *testing.T) f2.Context {
191 pg := postgres.FromContextT(ctx, t)
192 handle, err := ksql.FromDSN(pg.DSN(), pg.MaxConns, pg.MaxIdleConns)
193 if err != nil {
194 t.Fatal(err)
195 }
196
197
198 f := &forwarder.Fwder{
199 DST: []forwarder.Destination{},
200 SourceRepository: "",
201 SQL: handle,
202 }
203
204 expectImage := "registryforwarder"
205 expectDigest := "0ce1415bb217137919be34d8e9b55081e0a379c5c979f49fb63a8870208e5fc7"
206 expectTag := "other-docker.pkg.dev/ret-edge-stage1-foreman/warehouse/distributed-storage:expected"
207 data, err := json.Marshal(forwarder.Message{
208 Action: forwarder.Insertion,
209 Digest: fmt.Sprintf(
210 "us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
211 expectImage,
212 expectDigest,
213 ),
214 Tag: expectTag,
215 })
216 if err != nil {
217 t.Fatal(err)
218 }
219
220 if err := f.HandleMsg(ctx, &pubsub.Message{Data: data}); err != nil {
221 t.Logf("HandleMsg returned non-fatal error: %v", err)
222 }
223
224 tq := `
225 SELECT image, tag, sha256_digest
226 FROM artifact_versions
227 WHERE sha256_digest = $1`
228 row := f.SQL.QueryRowContext(ctx, tq, expectDigest)
229 var image string
230 var tag string
231 var digest string
232 err = row.Scan(&image, &tag, &digest)
233 if err != nil {
234 t.Fatal(err)
235 }
236 if expectImage != image {
237 t.Errorf("expected artifact_version.image not found. expected '%s', got: '%s'",
238 expectImage,
239 image,
240 )
241 }
242 if expectDigest != digest {
243 t.Errorf("expected artifact_version.sha256_digest not found. expected '%s', got: '%s'",
244 expectDigest,
245 image,
246 )
247 }
248 if expectTag != tag {
249 t.Errorf("expected artifact_version.tag not found. expected '%s', got: '%s'",
250 expectTag,
251 tag,
252 )
253 }
254
255 return ctx
256 }).
257 Test("inserts artifact_version no tag", func(ctx f2.Context, t *testing.T) f2.Context {
258 pg := postgres.FromContextT(ctx, t)
259 handle, err := ksql.FromDSN(pg.DSN(), pg.MaxConns, pg.MaxIdleConns)
260 if err != nil {
261 t.Fatal(err)
262 }
263
264
265 f := &forwarder.Fwder{
266 DST: []forwarder.Destination{},
267 SourceRepository: "",
268 SQL: handle,
269 }
270
271 expectImage := "registryforwarder"
272 expectDigest := "0ce1415bb217137919be34d8e9b55081e0a379c5c979f49fb63a8870208e5fc7"
273 data, err := json.Marshal(forwarder.Message{
274 Action: forwarder.Insertion,
275 Digest: fmt.Sprintf(
276 "us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
277 expectImage,
278 expectDigest,
279 ),
280 })
281 if err != nil {
282 t.Fatal(err)
283 }
284
285 if err := f.HandleMsg(ctx, &pubsub.Message{Data: data}); err != nil {
286 t.Logf("HandleMsg returned non-fatal error: %v", err)
287 }
288
289 tq := `
290 SELECT image, tag, sha256_digest
291 FROM artifact_versions
292 WHERE sha256_digest = $1`
293 row := f.SQL.QueryRowContext(ctx, tq, expectDigest)
294 var image string
295 var tag string
296 var digest string
297 err = row.Scan(&image, &tag, &digest)
298 if err != nil {
299 t.Fatal(err)
300 }
301 if expectImage != image {
302 t.Errorf("expected artifact_version.image not found. expected '%s', got: '%s'",
303 expectImage,
304 image,
305 )
306 }
307 if expectDigest != digest {
308 t.Errorf("expected artifact_version.sha256_digest not found. expected '%s', got: '%s'",
309 expectDigest,
310 image,
311 )
312 }
313
314 return ctx
315 }).
316 Feature()
317 f.Test(t, ft)
318 }
319
320 func TestIngestMessage_DeletesArtifactVersion(t *testing.T) {
321 ft := f2.NewFeature("ingest").
322 Test("deletes artifact_versions", func(ctx f2.Context, t *testing.T) f2.Context {
323 pg := postgres.FromContextT(ctx, t)
324 handle, err := ksql.FromDSN(pg.DSN(), pg.MaxConns, pg.MaxIdleConns)
325 if err != nil {
326 t.Fatal(err)
327 }
328
329
330 f := &forwarder.Fwder{
331 DST: []forwarder.Destination{},
332 SourceRepository: "",
333 SQL: handle,
334 }
335
336 expectImage := "lumper-controller"
337 expectDigest := "dc0fe68c3ff914b4d97c2f7d4425cc1ff1ce5b9102c941fc70894738eb670272"
338 expectTag := "other-docker.pkg.dev/ret-edge-stage1-foreman/warehouse/distributed-storage:expected"
339 insertData, err := json.Marshal(forwarder.Message{
340 Action: forwarder.Insertion,
341 Digest: fmt.Sprintf(
342 "us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
343 expectImage,
344 expectDigest,
345 ),
346 Tag: expectTag,
347 })
348 if err != nil {
349 t.Fatal(err)
350 }
351
352 if err := f.HandleMsg(ctx, &pubsub.Message{Data: insertData}); err != nil {
353 t.Fatal(err)
354 }
355
356 deleteData, err := json.Marshal(forwarder.Message{
357 Action: forwarder.Deletion,
358 Digest: fmt.Sprintf(
359 "us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
360 expectImage,
361 expectDigest,
362 ),
363 Tag: expectTag,
364 })
365 if err != nil {
366 t.Fatal(err)
367 }
368 msg := &pubsub.Message{
369 Data: deleteData,
370 }
371 if err := f.HandleMsg(ctx, msg); err != nil {
372 t.Logf("HandleMsg returned non-fatal error: %v", err)
373 }
374
375 tq := `
376 SELECT image, sha256_digest
377 FROM artifact_versions
378 WHERE sha256_digest = $1`
379 row := f.SQL.QueryRowContext(ctx, tq, expectDigest)
380 var image string
381 var digest string
382 err = row.Scan(&image, &digest)
383 if err != sql.ErrNoRows {
384 t.Fatalf("artifact_version not deleted. found row with image: %s, sha256_digest: %s", image, digest)
385 }
386
387 return ctx
388 }).Feature()
389 f.Test(t, ft)
390 }
391
392 func TestIngest_Actions(t *testing.T) {
393 var (
394 fwd *forwarder.Fwder
395 image string
396 ref name.Digest
397 )
398 ft := f2.NewFeature("ingest").
399 Setup("ingest", func(ctx f2.Context, t *testing.T) f2.Context {
400 pg := postgres.FromContextT(ctx, t)
401 handle, err := ksql.FromDSN(pg.DSN(), pg.MaxConns, pg.MaxIdleConns)
402 if err != nil {
403 t.Fatal(err)
404 }
405
406
407 fwd = &forwarder.Fwder{
408 DST: []forwarder.Destination{},
409 SourceRepository: "",
410 SQL: handle,
411 }
412
413 image = "kinform-client"
414 digest := fmt.Sprintf(
415 "us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
416 image,
417 "857339c60ea1dcd4b11567bfc4bc58e4c04bc742dacdaca7d9b01b452e1894b7",
418 )
419 ref, err = name.NewDigest(digest, name.StrictValidation)
420 if err != nil {
421 t.Fatal(err)
422 }
423
424 return ctx
425 }).
426 Test("INSERT", func(ctx f2.Context, t *testing.T) f2.Context {
427 err := fwd.Ingest(ctx, forwarder.Insertion, image, "test-ingest-tag", ref)
428 if err != nil {
429 t.Error(err)
430 }
431
432 return ctx
433 }).
434 Test("DELETE", func(ctx f2.Context, t *testing.T) f2.Context {
435 err := fwd.Ingest(ctx, forwarder.Deletion, image, "", ref)
436 if err != nil {
437 t.Error(err)
438 }
439 return ctx
440 }).
441 Feature()
442 f.Test(t, ft)
443 }
444
445 func TestPromote_DefaultDest(t *testing.T) {
446 ft := f2.NewFeature("promote").
447 Test("inserts artifacts", func(ctx f2.Context, t *testing.T) f2.Context {
448 pg := postgres.FromContextT(ctx, t)
449 handle, err := ksql.FromDSN(pg.DSN(), pg.MaxConns, pg.MaxIdleConns)
450 if err != nil {
451 t.Fatal(err)
452 }
453
454 expectProjectID := "ret-edge-testing-gang"
455 f := &forwarder.Fwder{
456 DST: []forwarder.Destination{
457 {
458 ProjectID: expectProjectID,
459 Repository: "warehouse",
460 },
461 },
462 SourceRepository: "",
463 SQL: handle,
464 }
465
466 promoteImage := "promote-me"
467 promoteDigest := strings.Repeat("0", 64)
468 promoteTag := "us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/distributed-storage:expected"
469 data, err := json.Marshal(forwarder.Message{
470 Action: forwarder.Insertion,
471 Digest: fmt.Sprintf(
472 "us-east1-docker.pkg.dev/ret-edge-pltf-infra/warehouse/%s@sha256:%s",
473 promoteImage,
474 promoteDigest,
475 ),
476 Tag: promoteTag,
477 })
478 if err != nil {
479 t.Fatal(err)
480 }
481
482 if err := f.HandleMsg(ctx, &pubsub.Message{
483 Data: data,
484 Attributes: map[string]string{
485 "promotion": "true",
486 },
487 }); err != nil {
488 t.Logf("HandleMsg returned non-fatal error: %v", err)
489 }
490
491 expectRepository := fmt.Sprintf(
492 "us-east1-docker.pkg.dev/%s/warehouse/%s",
493 expectProjectID,
494 promoteImage,
495 )
496
497 tq := `
498 SELECT project, repository
499 FROM artifacts
500 WHERE project = $1 AND repository = $2`
501 rows, err := f.SQL.QueryContext(ctx, tq, expectProjectID, expectRepository)
502 if err != nil {
503 t.Fatal(err)
504 }
505 var artifacts []sovereign.Artifact
506 for rows.Next() {
507 var project string
508 var repository string
509 err = rows.Scan(&project, &repository)
510 if err != nil {
511 t.Fatal(err)
512 }
513 artifacts = append(artifacts, sovereign.Artifact{
514 ProjectID: project,
515 Repository: repository,
516 })
517 }
518
519 if len(artifacts) != 1 {
520 t.Fatalf("expected a single artifact. found %d", len(artifacts))
521 }
522
523 if expectProjectID != artifacts[0].ProjectID {
524 t.Errorf("expected artifact.project not found. expected '%s', got: '%s'",
525 expectProjectID,
526 artifacts[0].ProjectID,
527 )
528 }
529 if expectRepository != artifacts[0].Repository {
530 t.Errorf("expected artifact.repository not found. expected '%s', got: '%s'",
531 expectRepository,
532 artifacts[0].Repository,
533 )
534 }
535 return ctx
536 }).Feature()
537 f.Test(t, ft)
538 }
539
View as plain text