1 package integration
2
3 import (
4 "context"
5 _ "embed"
6 "fmt"
7 "os"
8 "strings"
9 "testing"
10 "time"
11
12 "cloud.google.com/go/pubsub"
13
14 "edge-infra.dev/pkg/edge/chariot"
15 dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
16 "edge-infra.dev/pkg/edge/datasync/couchdb"
17 "edge-infra.dev/pkg/edge/datasync/cushion"
18 "edge-infra.dev/pkg/lib/fog"
19 epubsub "edge-infra.dev/pkg/lib/gcp/pubsub"
20 "edge-infra.dev/test/f2"
21 "edge-infra.dev/test/f2/integration"
22 "edge-infra.dev/test/f2/x/ktest"
23
24 kouchdb "github.com/go-kivik/kivik/v4/couchdb"
25 "github.com/go-logr/logr"
26 "github.com/google/uuid"
27 "k8s.io/apimachinery/pkg/runtime"
28 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
29 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
30 "sigs.k8s.io/controller-runtime/pkg/client"
31 )
32
33 const cushionNS = "cushion"
34
35 var (
36 log logr.Logger
37 f f2.Framework
38 _ chariot.IPubSubReceiver = (*PubSubMock)(nil)
39 _ epubsub.Publisher = (*PubSubMock)(nil)
40
41 cfg *cushion.Config
42 cl client.WithWatch
43 c *couchdb.CouchDB
44 opt []cushion.Option
45
46 psm *PubSubMock
47 publisher epubsub.Publisher
48
49
50 tlog []byte
51 )
52
53 func TestMain(m *testing.M) {
54 log = fog.New().WithName("cushion_integration_test")
55
56 f = f2.New(
57 context.Background(),
58 f2.WithExtensions(
59 ktest.New(
60 ktest.WithScheme(createScheme()),
61 ),
62 ),
63 )
64
65 f.Setup(func(ctx f2.Context) (f2.Context, error) {
66 k, err := ktest.FromContext(ctx)
67 if err != nil {
68 log.Error(err, "fail to get k8s from context")
69 return ctx, err
70 }
71
72 log.Info("Initial setup done")
73
74 cl, err = client.NewWithWatch(k.Env.Config, client.Options{Scheme: createScheme()})
75 if err != nil {
76 log.Error(err, "fail to create client")
77 return ctx, err
78 }
79
80 if integration.IsL1() {
81 err = mockCushionDaemonOptions()
82 } else {
83 err = buildCushionDaemonOptions(ctx)
84 }
85
86 if err != nil {
87 log.Error(err, "fail to get cushion daemon options", "isL1", integration.IsL1())
88 return ctx, err
89 }
90
91 return ctx, nil
92 }).Teardown(func(ctx f2.Context) (f2.Context, error) {
93 if psm != nil {
94 psm.Close()
95 }
96 return ctx, nil
97 })
98
99 os.Exit(f.Run(m))
100 }
101
102 type PubSubMock struct{ msg chan chariot.IPubSubMessage }
103
104 func NewPubSubReceiver() *PubSubMock {
105 return &PubSubMock{
106 msg: make(chan chariot.IPubSubMessage),
107 }
108 }
109
110 func (l PubSubMock) Receive(ctx context.Context, f func(context.Context, chariot.IPubSubMessage)) error {
111 for message := range l.msg {
112 f(ctx, message)
113 }
114 return nil
115 }
116
117
118 func (l PubSubMock) Send(_ context.Context, _ string, message []byte, attributes map[string]string) error {
119 l.msg <- chariot.NewPubSubMessageFromMessage(&pubsub.Message{
120 ID: attributes["entity_id"],
121 Data: message,
122 Attributes: attributes,
123 })
124 return nil
125 }
126
127 func (l PubSubMock) Close() {
128 close(l.msg)
129 }
130
131 func mockCushionDaemonOptions() error {
132 couchServer := NewMockCouchDBServer()
133 cfg = mockConfig(couchServer.Server.URL)
134 psm = NewPubSubReceiver()
135 publisher = psm
136 c = &couchdb.CouchDB{}
137 err := c.NewFromURL(cfg.CouchUsername, cfg.CouchPassword, cfg.CouchURL, kouchdb.OptionNoRequestCompression())
138 if err != nil {
139 return err
140 }
141
142 opt = []cushion.Option{
143 cushion.OptionLogger(log),
144 cushion.OptionPubSubReceiver(psm),
145 cushion.OptionKubeClient(cl),
146 cushion.OptionCouchDBStorage(c.Client),
147 cushion.OptionDatabaseStatus(cushion.DatabaseStatusFunc(
148 func(_ context.Context, k8sDB *dsapi.CouchDBDatabase, _ time.Duration) error {
149 log.Info("database status called", "k8sDB", k8sDB.Namespace, "db", k8sDB.Spec.Name)
150 couchServer.MockDB(k8sDB.Spec.Name)
151 return c.CreateDB(context.Background(), k8sDB.Spec.Name)
152 })),
153 }
154 return nil
155 }
156
157 func buildCushionDaemonOptions(ctx context.Context) error {
158 var err error
159 cfg, err = cushion.NewConfig()
160 if err != nil {
161 log.Error(err, "fail to create new config")
162 return err
163 }
164 opt, err = cushion.DaemonOptions(cfg, log)
165 if err != nil {
166 log.Error(err, "fail to create daemon options")
167 return err
168 }
169 publisher, err = epubsub.New(ctx, cfg.ProjectID)
170 if err != nil {
171 log.Error(err, "fail to create new pubsub sender")
172 return err
173 }
174 c = &couchdb.CouchDB{}
175 couchURL := fmt.Sprintf("http://%s:%s", cfg.CouchURL, cfg.CouchPort)
176 return c.NewFromURL(cfg.CouchUsername, cfg.CouchPassword, couchURL)
177 }
178
179 func createScheme() *runtime.Scheme {
180 scheme := runtime.NewScheme()
181 utilruntime.Must(clientgoscheme.AddToScheme(scheme))
182 utilruntime.Must(dsapi.AddToScheme(scheme))
183 return scheme
184 }
185
186 func mockConfig(url string) *cushion.Config {
187 uid := uuid.NewString()
188 id := strings.ReplaceAll(uid, "-", "")
189 return &cushion.Config{
190 Port: 8080,
191 BannerBSLID: id,
192 BannerEdgeID: uid,
193 ProjectID: "ret-edge-test-foreman",
194 Topic: "data-sync-c2e",
195 Subscription: id,
196 MaxMessages: 10,
197 CouchURL: url,
198 CouchPort: "5984",
199 CouchUsername: cushionNS,
200 CouchPassword: "randompassword",
201 BulkSize: 1,
202 MaxWaitInterval: 1 * time.Second,
203 }
204 }
205
View as plain text