package integration import ( "fmt" "testing" "time" dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1" "edge-infra.dev/pkg/edge/datasync/cushion" "edge-infra.dev/test/f2" "edge-infra.dev/test/f2/x/ktest" "github.com/go-kivik/kivik/v4" "gotest.tools/v3/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" ) func TestPubSubReceiverHandle(t *testing.T) { var daemon *cushion.Daemon feat := f2.NewFeature("Daemon Test"). Setup("Daemon Setup", func(ctx f2.Context, t *testing.T) f2.Context { log.Info("Daemon Setup") err := cl.Create(ctx, &corev1.Namespace{ TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Namespace"}, ObjectMeta: metav1.ObjectMeta{Name: cushionNS}, }) assert.NilError(t, client.IgnoreAlreadyExists(err), "fail to create daemon") daemon, err = cushion.NewDaemon(cfg, opt...) assert.NilError(t, err, "fail to create daemon") return ctx }). Test("Verify PubSubReceiverHandle", func(ctx f2.Context, t *testing.T) f2.Context { log.Info("Verifying PubSubReceiverHandle") go func() { err := daemon.Run(ctx) assert.NilError(t, err, "fail to run daemon") }() return ctx }). Test("Send Datasync PubSub Message", func(ctx f2.Context, t *testing.T) f2.Context { log.Info("Send PubSub Message") err := publisher.Send(ctx, cfg.Topic, tlog, map[string]string{ "tenant_id": cfg.BannerBSLID, "db_name": dbName(ctx), "entity_id": ctx.RunID, }) assert.NilError(t, err, "failed to send message") return ctx }). Test("CouchDBDatabase Created", func(ctx f2.Context, t *testing.T) f2.Context { log.Info("CouchDBDatabase Created") k := ktest.FromContextT(ctx, t) couchDB := &dsapi.CouchDBDatabase{ TypeMeta: metav1.TypeMeta{ APIVersion: dsapi.GroupVersion.String(), Kind: "CouchDBDatabase", }, ObjectMeta: metav1.ObjectMeta{ Name: cushion.K8sDBName(dbName(ctx)), Namespace: cushionNS, }, } k.WaitOn(t, k.ObjExists(couchDB)) return ctx }). Test("Verify Data In CouchDB", func(ctx f2.Context, t *testing.T) f2.Context { log.Info("Verify Data In CouchDB") k := ktest.FromContextT(ctx, t) db := c.Client.DB(dbName(ctx)) timeout := time.After(k.Timeout) var row *kivik.Document loop: for { select { case <-timeout: log.Info("timeout waiting for DB", "timeout", k.Timeout) break loop default: row = db.Get(ctx, ctx.RunID) if row.Err() == nil { break loop } time.Sleep(k.Tick) } } assert.NilError(t, row.Err(), "invalid response from couchdb") // todo compare doc with updated ID return ctx }). Feature() f.Test(t, feat) } func dbName(ctx f2.Context) string { return fmt.Sprintf("db%s", ctx.RunID) }