...

Source file src/edge-infra.dev/test/e2e/datasync/edgetocloud/send_message_test.go

Documentation: edge-infra.dev/test/e2e/datasync/edgetocloud

     1  package edgetocloud
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"encoding/base64"
     7  	"encoding/json"
     8  	"fmt"
     9  	"io"
    10  	"net/http"
    11  	"testing"
    12  	"time"
    13  
    14  	"cloud.google.com/go/pubsub"
    15  
    16  	"edge-infra.dev/pkg/edge/bsl"
    17  	"edge-infra.dev/pkg/edge/info"
    18  	"edge-infra.dev/test/f2"
    19  	"edge-infra.dev/test/f2/x/ktest"
    20  
    21  	"github.com/google/uuid"
    22  
    23  	"gotest.tools/v3/assert"
    24  	"gotest.tools/v3/assert/cmp"
    25  )
    26  
    27  func TestSendMessage(t *testing.T) {
    28  	var k *ktest.K8s
    29  	var client *pubsub.Client
    30  	var sub *pubsub.Subscription
    31  	portForward := ktest.PortForward{Namespace: messagingNamespace}
    32  	msgID := uuid.NewString() // must be a valid UUID
    33  	feature := f2.NewFeature("Datasync Message E2C").
    34  		Setup("Initial setup", func(ctx f2.Context, t *testing.T) f2.Context {
    35  			k = ktest.FromContextT(ctx, t)
    36  
    37  			ei, err := info.FromClient(ctx, k.Client)
    38  			assert.NilError(t, err, "fail to get edge-info configmap")
    39  
    40  			bslInfo, err := bsl.FromClient(ctx, k.Client)
    41  			assert.NilError(t, err, "fail to get bsl-info configmap")
    42  
    43  			client, err = pubsub.NewClient(ctx, ei.ForemanProjectID)
    44  			assert.NilError(t, err, "fail to create pub/sub client: %s", ei.ForemanProjectID)
    45  
    46  			subIFD := fmt.Sprintf("datasync-e2c-%s", ctx.RunID)
    47  			sub, err = client.CreateSubscription(ctx, subIFD, pubsub.SubscriptionConfig{
    48  				Topic:  client.Topic(pubSubTopic),
    49  				Filter: fmt.Sprintf("attributes.site_id = %q", bslInfo.ID),
    50  			})
    51  			// this is required, otherwise there will be a nil pointer exception
    52  			assert.NilError(t, err, "fail to create subscription: %s in project: %s", subIFD, ei.ForemanProjectID)
    53  			t.Log("Creating subscription", subIFD)
    54  			return ctx
    55  		}).
    56  		Setup("Datasync Messaging PortForwarding", portForward.Forward("data-sync-messaging-leader", 80)).
    57  		Test("Send Message To Chirp", func(ctx f2.Context, t *testing.T) f2.Context {
    58  			data := map[string]string{
    59  				"ID":      msgID,
    60  				"Type":    dataType,
    61  				"Payload": base64.StdEncoding.EncodeToString(tlog),
    62  			}
    63  			body, err := json.Marshal(data)
    64  			assert.NilError(t, err, "fail to marshal message data")
    65  
    66  			addr := portForward.Retrieve(t)
    67  			messagingURL := fmt.Sprintf("http://%s/send-message", addr)
    68  			t.Logf("Sending message to chirp at %s", messagingURL)
    69  
    70  			req, err := http.NewRequest(http.MethodPost, messagingURL, bytes.NewReader(body))
    71  			assert.NilError(t, err, "fail to create http request")
    72  			req.Header.Add("Content-Type", "application/json")
    73  			req.Header.Add("Accept", "application/json")
    74  			resp, err := http.DefaultClient.Do(req)
    75  			assert.NilError(t, err, "fail to send message to chirp")
    76  
    77  			jsonResp := make(map[string]interface{})
    78  			respData, err := io.ReadAll(resp.Body)
    79  			assert.NilError(t, err, "fail to read response body")
    80  			err = json.Unmarshal(respData, &jsonResp)
    81  			assert.NilError(t, err, "fail to unmarshal response body")
    82  			isError, ok := jsonResp["IsError"]
    83  			assert.Check(t, cmp.Equal(ok, true), "invalid response: %v", jsonResp)
    84  			if ok {
    85  				assert.Check(t, cmp.Equal(isError.(bool), false), "error response: %v", jsonResp)
    86  			}
    87  			return ctx
    88  		}).
    89  		Test("Verify Message In Pub/Sub", func(ctx f2.Context, t *testing.T) f2.Context {
    90  			ch := make(chan *pubsub.Message)
    91  			go func() {
    92  				_ = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
    93  					t.Log("Received pub/sub message:", msg.Attributes["id"])
    94  					ch <- msg
    95  				})
    96  			}()
    97  		outer:
    98  			for timeout := time.After(k.Timeout); ; {
    99  				select {
   100  				case msg := <-ch:
   101  					t.Log("Received pub/sub message:", msg.Attributes)
   102  					if msg.Attributes["id"] != msgID {
   103  						continue
   104  					}
   105  					assert.Equal(t, dataType, msg.Attributes["type"], "message type mismatch")
   106  					assert.Equal(t, string(tlog), string(msg.Data), "message data mismatch")
   107  					break outer
   108  				case <-timeout:
   109  					t.Errorf("Timeout waiting for pub/sub message Id: %s", msgID)
   110  					break outer
   111  				}
   112  			}
   113  			return ctx
   114  		}).
   115  		Teardown("Tear down", func(ctx f2.Context, t *testing.T) f2.Context {
   116  			assert.NilError(t, sub.Delete(ctx), "fail to delete subscription")
   117  			assert.NilError(t, client.Close(), "fail to close pub/sub client")
   118  			return ctx
   119  		}).
   120  		Feature()
   121  	f.Test(t, feature)
   122  }
   123  

View as plain text