package integration import ( "context" _ "embed" "fmt" "os" "strings" "testing" "time" "cloud.google.com/go/pubsub" "edge-infra.dev/pkg/edge/chariot" dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1" "edge-infra.dev/pkg/edge/datasync/couchdb" "edge-infra.dev/pkg/edge/datasync/cushion" "edge-infra.dev/pkg/lib/fog" epubsub "edge-infra.dev/pkg/lib/gcp/pubsub" "edge-infra.dev/test/f2" "edge-infra.dev/test/f2/integration" "edge-infra.dev/test/f2/x/ktest" kouchdb "github.com/go-kivik/kivik/v4/couchdb" "github.com/go-logr/logr" "github.com/google/uuid" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" ) const cushionNS = "cushion" var ( log logr.Logger f f2.Framework _ chariot.IPubSubReceiver = (*PubSubMock)(nil) _ epubsub.Publisher = (*PubSubMock)(nil) cfg *cushion.Config cl client.WithWatch c *couchdb.CouchDB opt []cushion.Option psm *PubSubMock publisher epubsub.Publisher //go:embed testdata/tlog.json tlog []byte ) func TestMain(m *testing.M) { log = fog.New().WithName("cushion_integration_test") f = f2.New( context.Background(), f2.WithExtensions( ktest.New( ktest.WithScheme(createScheme()), ), ), ) f.Setup(func(ctx f2.Context) (f2.Context, error) { k, err := ktest.FromContext(ctx) if err != nil { log.Error(err, "fail to get k8s from context") return ctx, err } log.Info("Initial setup done") cl, err = client.NewWithWatch(k.Env.Config, client.Options{Scheme: createScheme()}) if err != nil { log.Error(err, "fail to create client") return ctx, err } if integration.IsL1() { err = mockCushionDaemonOptions() } else { err = buildCushionDaemonOptions(ctx) } if err != nil { log.Error(err, "fail to get cushion daemon options", "isL1", integration.IsL1()) return ctx, err } return ctx, nil }).Teardown(func(ctx f2.Context) (f2.Context, error) { if psm != nil { psm.Close() } return ctx, nil }) os.Exit(f.Run(m)) } type PubSubMock struct{ msg chan chariot.IPubSubMessage } func NewPubSubReceiver() *PubSubMock { return &PubSubMock{ msg: make(chan chariot.IPubSubMessage), } } func (l PubSubMock) Receive(ctx context.Context, f func(context.Context, chariot.IPubSubMessage)) error { for message := range l.msg { f(ctx, message) } return nil } // Send pub/sub message func (l PubSubMock) Send(_ context.Context, _ string, message []byte, attributes map[string]string) error { l.msg <- chariot.NewPubSubMessageFromMessage(&pubsub.Message{ ID: attributes["entity_id"], Data: message, Attributes: attributes, }) return nil } func (l PubSubMock) Close() { close(l.msg) } func mockCushionDaemonOptions() error { couchServer := NewMockCouchDBServer() cfg = mockConfig(couchServer.Server.URL) psm = NewPubSubReceiver() publisher = psm c = &couchdb.CouchDB{} err := c.NewFromURL(cfg.CouchUsername, cfg.CouchPassword, cfg.CouchURL, kouchdb.OptionNoRequestCompression()) if err != nil { return err } opt = []cushion.Option{ cushion.OptionLogger(log), cushion.OptionPubSubReceiver(psm), cushion.OptionKubeClient(cl), cushion.OptionCouchDBStorage(c.Client), cushion.OptionDatabaseStatus(cushion.DatabaseStatusFunc( func(_ context.Context, k8sDB *dsapi.CouchDBDatabase, _ time.Duration) error { log.Info("database status called", "k8sDB", k8sDB.Namespace, "db", k8sDB.Spec.Name) couchServer.MockDB(k8sDB.Spec.Name) return c.CreateDB(context.Background(), k8sDB.Spec.Name) })), } return nil } func buildCushionDaemonOptions(ctx context.Context) error { var err error cfg, err = cushion.NewConfig() if err != nil { log.Error(err, "fail to create new config") return err } opt, err = cushion.DaemonOptions(cfg, log) if err != nil { log.Error(err, "fail to create daemon options") return err } publisher, err = epubsub.New(ctx, cfg.ProjectID) if err != nil { log.Error(err, "fail to create new pubsub sender") return err } c = &couchdb.CouchDB{} couchURL := fmt.Sprintf("http://%s:%s", cfg.CouchURL, cfg.CouchPort) return c.NewFromURL(cfg.CouchUsername, cfg.CouchPassword, couchURL) } func createScheme() *runtime.Scheme { scheme := runtime.NewScheme() utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(dsapi.AddToScheme(scheme)) return scheme } func mockConfig(url string) *cushion.Config { uid := uuid.NewString() id := strings.ReplaceAll(uid, "-", "") return &cushion.Config{ Port: 8080, BannerBSLID: id, BannerEdgeID: uid, ProjectID: "ret-edge-test-foreman", Topic: "data-sync-c2e", Subscription: id, MaxMessages: 10, CouchURL: url, CouchPort: "5984", CouchUsername: cushionNS, CouchPassword: "randompassword", BulkSize: 1, MaxWaitInterval: 1 * time.Second, } }