1 package edgetocloud
2
3 import (
4 "bytes"
5 "context"
6 "encoding/base64"
7 "encoding/json"
8 "fmt"
9 "io"
10 "net/http"
11 "testing"
12 "time"
13
14 "cloud.google.com/go/pubsub"
15
16 "edge-infra.dev/pkg/edge/bsl"
17 "edge-infra.dev/pkg/edge/info"
18 "edge-infra.dev/test/f2"
19 "edge-infra.dev/test/f2/x/ktest"
20
21 "github.com/google/uuid"
22
23 "gotest.tools/v3/assert"
24 "gotest.tools/v3/assert/cmp"
25 )
26
27 func TestSendMessage(t *testing.T) {
28 var k *ktest.K8s
29 var client *pubsub.Client
30 var sub *pubsub.Subscription
31 portForward := ktest.PortForward{Namespace: messagingNamespace}
32 msgID := uuid.NewString()
33 feature := f2.NewFeature("Datasync Message E2C").
34 Setup("Initial setup", func(ctx f2.Context, t *testing.T) f2.Context {
35 k = ktest.FromContextT(ctx, t)
36
37 ei, err := info.FromClient(ctx, k.Client)
38 assert.NilError(t, err, "fail to get edge-info configmap")
39
40 bslInfo, err := bsl.FromClient(ctx, k.Client)
41 assert.NilError(t, err, "fail to get bsl-info configmap")
42
43 client, err = pubsub.NewClient(ctx, ei.ForemanProjectID)
44 assert.NilError(t, err, "fail to create pub/sub client: %s", ei.ForemanProjectID)
45
46 subIFD := fmt.Sprintf("datasync-e2c-%s", ctx.RunID)
47 sub, err = client.CreateSubscription(ctx, subIFD, pubsub.SubscriptionConfig{
48 Topic: client.Topic(pubSubTopic),
49 Filter: fmt.Sprintf("attributes.site_id = %q", bslInfo.ID),
50 })
51
52 assert.NilError(t, err, "fail to create subscription: %s in project: %s", subIFD, ei.ForemanProjectID)
53 t.Log("Creating subscription", subIFD)
54 return ctx
55 }).
56 Setup("Datasync Messaging PortForwarding", portForward.Forward("data-sync-messaging-leader", 80)).
57 Test("Send Message To Chirp", func(ctx f2.Context, t *testing.T) f2.Context {
58 data := map[string]string{
59 "ID": msgID,
60 "Type": dataType,
61 "Payload": base64.StdEncoding.EncodeToString(tlog),
62 }
63 body, err := json.Marshal(data)
64 assert.NilError(t, err, "fail to marshal message data")
65
66 addr := portForward.Retrieve(t)
67 messagingURL := fmt.Sprintf("http://%s/send-message", addr)
68 t.Logf("Sending message to chirp at %s", messagingURL)
69
70 req, err := http.NewRequest(http.MethodPost, messagingURL, bytes.NewReader(body))
71 assert.NilError(t, err, "fail to create http request")
72 req.Header.Add("Content-Type", "application/json")
73 req.Header.Add("Accept", "application/json")
74 resp, err := http.DefaultClient.Do(req)
75 assert.NilError(t, err, "fail to send message to chirp")
76
77 jsonResp := make(map[string]interface{})
78 respData, err := io.ReadAll(resp.Body)
79 assert.NilError(t, err, "fail to read response body")
80 err = json.Unmarshal(respData, &jsonResp)
81 assert.NilError(t, err, "fail to unmarshal response body")
82 isError, ok := jsonResp["IsError"]
83 assert.Check(t, cmp.Equal(ok, true), "invalid response: %v", jsonResp)
84 if ok {
85 assert.Check(t, cmp.Equal(isError.(bool), false), "error response: %v", jsonResp)
86 }
87 return ctx
88 }).
89 Test("Verify Message In Pub/Sub", func(ctx f2.Context, t *testing.T) f2.Context {
90 ch := make(chan *pubsub.Message)
91 go func() {
92 _ = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
93 t.Log("Received pub/sub message:", msg.Attributes["id"])
94 ch <- msg
95 })
96 }()
97 outer:
98 for timeout := time.After(k.Timeout); ; {
99 select {
100 case msg := <-ch:
101 t.Log("Received pub/sub message:", msg.Attributes)
102 if msg.Attributes["id"] != msgID {
103 continue
104 }
105 assert.Equal(t, dataType, msg.Attributes["type"], "message type mismatch")
106 assert.Equal(t, string(tlog), string(msg.Data), "message data mismatch")
107 break outer
108 case <-timeout:
109 t.Errorf("Timeout waiting for pub/sub message Id: %s", msgID)
110 break outer
111 }
112 }
113 return ctx
114 }).
115 Teardown("Tear down", func(ctx f2.Context, t *testing.T) f2.Context {
116 assert.NilError(t, sub.Delete(ctx), "fail to delete subscription")
117 assert.NilError(t, client.Close(), "fail to close pub/sub client")
118 return ctx
119 }).
120 Feature()
121 f.Test(t, feature)
122 }
123
View as plain text