package edgetocloud import ( "bytes" "context" "encoding/base64" "encoding/json" "fmt" "io" "net/http" "testing" "time" "cloud.google.com/go/pubsub" "edge-infra.dev/pkg/edge/bsl" "edge-infra.dev/pkg/edge/info" "edge-infra.dev/test/f2" "edge-infra.dev/test/f2/x/ktest" "github.com/google/uuid" "gotest.tools/v3/assert" "gotest.tools/v3/assert/cmp" ) func TestSendMessage(t *testing.T) { var k *ktest.K8s var client *pubsub.Client var sub *pubsub.Subscription portForward := ktest.PortForward{Namespace: messagingNamespace} msgID := uuid.NewString() // must be a valid UUID feature := f2.NewFeature("Datasync Message E2C"). Setup("Initial setup", func(ctx f2.Context, t *testing.T) f2.Context { k = ktest.FromContextT(ctx, t) ei, err := info.FromClient(ctx, k.Client) assert.NilError(t, err, "fail to get edge-info configmap") bslInfo, err := bsl.FromClient(ctx, k.Client) assert.NilError(t, err, "fail to get bsl-info configmap") client, err = pubsub.NewClient(ctx, ei.ForemanProjectID) assert.NilError(t, err, "fail to create pub/sub client: %s", ei.ForemanProjectID) subIFD := fmt.Sprintf("datasync-e2c-%s", ctx.RunID) sub, err = client.CreateSubscription(ctx, subIFD, pubsub.SubscriptionConfig{ Topic: client.Topic(pubSubTopic), Filter: fmt.Sprintf("attributes.site_id = %q", bslInfo.ID), }) // this is required, otherwise there will be a nil pointer exception assert.NilError(t, err, "fail to create subscription: %s in project: %s", subIFD, ei.ForemanProjectID) t.Log("Creating subscription", subIFD) return ctx }). Setup("Datasync Messaging PortForwarding", portForward.Forward("data-sync-messaging-leader", 80)). Test("Send Message To Chirp", func(ctx f2.Context, t *testing.T) f2.Context { data := map[string]string{ "ID": msgID, "Type": dataType, "Payload": base64.StdEncoding.EncodeToString(tlog), } body, err := json.Marshal(data) assert.NilError(t, err, "fail to marshal message data") addr := portForward.Retrieve(t) messagingURL := fmt.Sprintf("http://%s/send-message", addr) t.Logf("Sending message to chirp at %s", messagingURL) req, err := http.NewRequest(http.MethodPost, messagingURL, bytes.NewReader(body)) assert.NilError(t, err, "fail to create http request") req.Header.Add("Content-Type", "application/json") req.Header.Add("Accept", "application/json") resp, err := http.DefaultClient.Do(req) assert.NilError(t, err, "fail to send message to chirp") jsonResp := make(map[string]interface{}) respData, err := io.ReadAll(resp.Body) assert.NilError(t, err, "fail to read response body") err = json.Unmarshal(respData, &jsonResp) assert.NilError(t, err, "fail to unmarshal response body") isError, ok := jsonResp["IsError"] assert.Check(t, cmp.Equal(ok, true), "invalid response: %v", jsonResp) if ok { assert.Check(t, cmp.Equal(isError.(bool), false), "error response: %v", jsonResp) } return ctx }). Test("Verify Message In Pub/Sub", func(ctx f2.Context, t *testing.T) f2.Context { ch := make(chan *pubsub.Message) go func() { _ = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) { t.Log("Received pub/sub message:", msg.Attributes["id"]) ch <- msg }) }() outer: for timeout := time.After(k.Timeout); ; { select { case msg := <-ch: t.Log("Received pub/sub message:", msg.Attributes) if msg.Attributes["id"] != msgID { continue } assert.Equal(t, dataType, msg.Attributes["type"], "message type mismatch") assert.Equal(t, string(tlog), string(msg.Data), "message data mismatch") break outer case <-timeout: t.Errorf("Timeout waiting for pub/sub message Id: %s", msgID) break outer } } return ctx }). Teardown("Tear down", func(ctx f2.Context, t *testing.T) f2.Context { assert.NilError(t, sub.Delete(ctx), "fail to delete subscription") assert.NilError(t, client.Close(), "fail to close pub/sub client") return ctx }). Feature() f.Test(t, feature) }