1 package integration
2
3 import (
4 "fmt"
5 "testing"
6 "time"
7
8 dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
9 "edge-infra.dev/pkg/edge/datasync/cushion"
10 "edge-infra.dev/test/f2"
11 "edge-infra.dev/test/f2/x/ktest"
12
13 "github.com/go-kivik/kivik/v4"
14 "gotest.tools/v3/assert"
15
16 corev1 "k8s.io/api/core/v1"
17 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18 "sigs.k8s.io/controller-runtime/pkg/client"
19 )
20
21 func TestPubSubReceiverHandle(t *testing.T) {
22 var daemon *cushion.Daemon
23
24 feat := f2.NewFeature("Daemon Test").
25 Setup("Daemon Setup", func(ctx f2.Context, t *testing.T) f2.Context {
26 log.Info("Daemon Setup")
27
28 err := cl.Create(ctx, &corev1.Namespace{
29 TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Namespace"},
30 ObjectMeta: metav1.ObjectMeta{Name: cushionNS},
31 })
32 assert.NilError(t, client.IgnoreAlreadyExists(err), "fail to create daemon")
33
34 daemon, err = cushion.NewDaemon(cfg, opt...)
35 assert.NilError(t, err, "fail to create daemon")
36
37 return ctx
38 }).
39 Test("Verify PubSubReceiverHandle", func(ctx f2.Context, t *testing.T) f2.Context {
40 log.Info("Verifying PubSubReceiverHandle")
41
42 go func() {
43 err := daemon.Run(ctx)
44 assert.NilError(t, err, "fail to run daemon")
45 }()
46
47 return ctx
48 }).
49 Test("Send Datasync PubSub Message", func(ctx f2.Context, t *testing.T) f2.Context {
50 log.Info("Send PubSub Message")
51
52 err := publisher.Send(ctx, cfg.Topic, tlog, map[string]string{
53 "tenant_id": cfg.BannerBSLID,
54 "db_name": dbName(ctx),
55 "entity_id": ctx.RunID,
56 })
57 assert.NilError(t, err, "failed to send message")
58
59 return ctx
60 }).
61 Test("CouchDBDatabase Created", func(ctx f2.Context, t *testing.T) f2.Context {
62 log.Info("CouchDBDatabase Created")
63
64 k := ktest.FromContextT(ctx, t)
65 couchDB := &dsapi.CouchDBDatabase{
66 TypeMeta: metav1.TypeMeta{
67 APIVersion: dsapi.GroupVersion.String(),
68 Kind: "CouchDBDatabase",
69 },
70 ObjectMeta: metav1.ObjectMeta{
71 Name: cushion.K8sDBName(dbName(ctx)),
72 Namespace: cushionNS,
73 },
74 }
75 k.WaitOn(t, k.ObjExists(couchDB))
76
77 return ctx
78 }).
79 Test("Verify Data In CouchDB", func(ctx f2.Context, t *testing.T) f2.Context {
80 log.Info("Verify Data In CouchDB")
81
82 k := ktest.FromContextT(ctx, t)
83
84 db := c.Client.DB(dbName(ctx))
85
86 timeout := time.After(k.Timeout)
87
88 var row *kivik.Document
89 loop:
90 for {
91 select {
92 case <-timeout:
93 log.Info("timeout waiting for DB", "timeout", k.Timeout)
94 break loop
95 default:
96 row = db.Get(ctx, ctx.RunID)
97 if row.Err() == nil {
98 break loop
99 }
100 time.Sleep(k.Tick)
101 }
102 }
103
104 assert.NilError(t, row.Err(), "invalid response from couchdb")
105
106
107 return ctx
108 }).
109 Feature()
110 f.Test(t, feat)
111 }
112
113 func dbName(ctx f2.Context) string {
114 return fmt.Sprintf("db%s", ctx.RunID)
115 }
116
View as plain text