package cloudtoedge

import (
	"encoding/base64"
	"fmt"
	"net/http"
	"strings"
	"testing"

	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
	"edge-infra.dev/pkg/edge/datasync/couchdb"
	"edge-infra.dev/pkg/k8s/testing/kmp"
	"edge-infra.dev/test/f2"
	"edge-infra.dev/test/f2/x/ktest"

	"cloud.google.com/go/pubsub"
	"gotest.tools/v3/assert"
	"gotest.tools/v3/assert/cmp"
	"gotest.tools/v3/poll"
	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// TestSendMessage publishes a message to the datasync pub/sub topic and then
// polls CouchDB until a document whose ID is the test run ID can be fetched,
// first from the URI in the store replication secret and then from every
// CouchDBServer through its port-forward.
//
// NOTE(review): the feature is named "E2C" while the package and the log
// message refer to C2E; the name is left untouched in case it is used for
// test selection — confirm which is intended.
func TestSendMessage(t *testing.T) {
	feature := f2.NewFeature("Datasync Message E2C").
		Test("Publish Message To Datasync", func(ctx f2.Context, t *testing.T) f2.Context {
			result := pubSubClient.Topic(pubSubTopic).Publish(ctx, &pubsub.Message{
				Attributes: map[string]string{
					"tenant_id":   bslInfo.OrganizationID,
					"db_name":     dbName,
					"entity_id":   ctx.RunID,
					"entity_type": "json",
				},
				Data: tlog,
			})
			msgID, err := result.Get(ctx)
			assert.NilError(t, err, "fail to send C2E message to pub/sub")
			t.Log("Sent pub/sub message Id:", msgID)
			return ctx
		}).
		Test("Verify Message In CouchDB Master", func(ctx f2.Context, t *testing.T) f2.Context {
			k := ktest.FromContextT(ctx, t)

			secret := corev1.Secret{}
			key := client.ObjectKey{Namespace: "couchctl", Name: couchdb.StoreReplicationSecretName}
			err := k.Client.Get(ctx, key, &secret)
			assert.NilError(t, err, "fail to get couchdb master secret")

			req, err := couchDBRequest(secret, string(secret.Data["uri"]), dbName, ctx.RunID)
			assert.NilError(t, err, "fail to create couchdb request")

			poll.WaitOn(t, docAvailable(req, "master"),
				poll.WithDelay(k.Tick), poll.WithTimeout(k.Timeout*timeoutMultiplier))
			return ctx
		}).
		Test("Verify CouchDBServer in ready state", func(ctx f2.Context, t *testing.T) f2.Context {
			k := ktest.FromContextT(ctx, t)

			servers := dsapi.CouchDBServerList{}
			err := k.Client.List(ctx, &servers, client.InNamespace(couchNamespace))
			assert.NilError(t, err, "fail to list couchdb servers")
			assert.Check(t, cmp.Len(servers.Items, len(nodes.Items)), "number of couchdb servers not equal to number of nodes")

			// Index into the slice so each check gets a pointer to a distinct
			// element without needing a per-iteration copy.
			for i := range servers.Items {
				k.WaitOn(t, k.Check(&servers.Items[i], kmp.IsReady()))
			}
			return ctx
		}).
		Test("Verify CouchDBReplicationSet in ready state", func(ctx f2.Context, t *testing.T) f2.Context {
			k := ktest.FromContextT(ctx, t)

			replications := dsapi.CouchDBReplicationSetList{}
			err := k.Client.List(ctx, &replications, client.InNamespace(couchNamespace))
			assert.NilError(t, err, "fail to list couchdb replications")
			assert.Check(t, cmp.Len(replications.Items, len(nodes.Items)), "number of couchdb replications not equal to number of nodes")

			for i := range replications.Items {
				k.WaitOn(t, k.Check(&replications.Items[i], kmp.IsReady()))
			}
			return ctx
		}).
		Test("Verify Message In CouchDB Store", func(ctx f2.Context, t *testing.T) f2.Context {
			k := ktest.FromContextT(ctx, t)

			secret := corev1.Secret{}
			key := client.ObjectKey{Namespace: couchNamespace, Name: couchdb.StoreSecretName}
			err := k.Client.Get(ctx, key, &secret)
			assert.NilError(t, err, "fail to get couchdb store secret")

			// NOTE(review): unlike the readiness test above, this List is not
			// restricted to couchNamespace — confirm that is intentional.
			servers := dsapi.CouchDBServerList{}
			assert.NilError(t, k.Client.List(ctx, &servers), "fail to list couchdb servers")

			for _, server := range servers.Items {
				// portMapping is keyed by the first dot-separated label of the server URI.
				pod := strings.Split(server.Spec.URI, ".")[0]
				portForward := portMapping[pod]
				if portForward == nil {
					t.Errorf("portForward is nil for pod %s", pod)
					continue
				}

				addr := portForward.Retrieve(t)
				req, err := couchDBRequest(secret, fmt.Sprintf("http://%s", addr), dbName, ctx.RunID)
				assert.NilError(t, err, "fail to create couchdb request")

				poll.WaitOn(t, docAvailable(req, "Store"),
					poll.WithDelay(k.Tick), poll.WithTimeout(k.Timeout*timeoutMultiplier))
			}
			return ctx
		}).
		Feature()

	f.Test(t, feature)
}

// docAvailable returns a poll.Check that sends req with http.DefaultClient
// and succeeds once the response status is 200 OK. Any other outcome asks the
// poller to continue, with a message that names target and includes the
// details produced by errMessage.
//
// The response body is closed on every attempt so retries do not leak
// connections, and the check keeps its response and error in locals rather
// than in variables shared with the calling test.
func docAvailable(req *http.Request, target string) poll.Check {
	return func(_ poll.LogT) poll.Result {
		resp, err := http.DefaultClient.Do(req)
		if err == nil {
			defer resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return poll.Success()
			}
		}
		return poll.Continue("fail to get doc from couchdb %s: %s", target, errMessage(resp, req, err))
	}
}

// couchDBRequest builds a GET request for the document docID in database db
// at the CouchDB base address url. The request asks for JSON and carries a
// Basic Authorization header built from the "username" and "password" entries
// of the secret's Data.
func couchDBRequest(s corev1.Secret, url, db, docID string) (*http.Request, error) {
	docURL := fmt.Sprintf("%s/%s/%s", url, db, docID)
	req, err := http.NewRequest(http.MethodGet, docURL, nil)
	if err != nil {
		return nil, err
	}

	req.Header.Add("Content-Type", "application/json")
	req.Header.Add("Accept", "application/json")

	auth := fmt.Sprintf("%s:%s", string(s.Data["username"]), string(s.Data["password"]))
	token := base64.StdEncoding.EncodeToString([]byte(auth))
	req.Header.Add("Authorization", fmt.Sprintf("Basic %s", token))
	return req, nil
}

// errMessage renders whichever of err, resp and req are non-nil as a single
// comma-separated diagnostic string: the error text, the response status code
// and status line, and the request URL. It returns "" when all three are nil.
func errMessage(resp *http.Response, req *http.Request, err error) string {
	var parts []string
	if err != nil {
		parts = append(parts, fmt.Sprintf("error: %s", err))
	}
	if resp != nil {
		parts = append(parts,
			fmt.Sprintf("status code: %d", resp.StatusCode),
			fmt.Sprintf("status: %s", resp.Status))
	}
	if req != nil {
		parts = append(parts, fmt.Sprintf("URL: %s", req.URL.String()))
	}
	// Separate the fields; previously they were written back to back.
	return strings.Join(parts, ", ")
}