package integration import ( "bytes" "context" "fmt" "os" "strings" "testing" "time" "cloud.google.com/go/pubsub" "github.com/stretchr/testify/assert" "edge-infra.dev/pkg/lib/fog" "edge-infra.dev/pkg/sds/emergencyaccess/cliservice" "edge-infra.dev/pkg/sds/emergencyaccess/eaconst" "edge-infra.dev/pkg/sds/emergencyaccess/msgdata" "edge-infra.dev/pkg/sds/emergencyaccess/msgsvc" "edge-infra.dev/test/f2" "edge-infra.dev/test/f2/x/pstest" ) var ( f f2.Framework defaultProjectID = "project" defaultBannerID = "banner" defaultStoreID = "store" defaultTerminalID = "terminal" receiverTopicIDTemplate = eaconst.DefaultTopTemplate receiverSubIDTemplate = eaconst.DefaultSubTemplate defaultSenderTopicID = "sender-topic" defaultSenderSubID = "sender-sub" defaultCommand = "echo hello world" defaultRespData = ` { "type": "Output", "exitCode": 0, "output": "hello world", "timestamp": "01-01-23 00:00:00", "duration": 1 }` ) func TestMain(m *testing.M) { f = f2.New(context.Background(), f2.WithExtensions(pstest.New(pstest.WithProjectID(defaultProjectID)))). Component("remotecli") os.Exit(f.Run(m)) } func TestPubSub(t *testing.T) { var ( projectID, bannerID, storeID, terminalID = defaultProjectID, defaultBannerID, defaultStoreID, defaultTerminalID senderTopicID, senderSubID = defaultSenderTopicID, defaultSenderSubID receiverTopicID = fillTemplate(receiverTopicIDTemplate, projectID, bannerID, storeID, terminalID) receiverSubID = fillTemplate(receiverSubIDTemplate, projectID, bannerID, storeID, terminalID) gcloudDir string ms *msgsvc.MessageService buf bytes.Buffer cli cliservice.CLIService handlerErrCh, subErrCh = make(chan error), make(chan error) ) feature := f2.NewFeature("PubSub"). Setup("Create sender Topic", pstest.WithNewTopic(senderTopicID)). Setup("Create receiver Topic", pstest.WithNewTopic(receiverTopicID)). Setup("Create sender Sub", pstest.WithNewSubscription(senderSubID, receiverTopicID)). Setup("Create receiver Sub", pstest.WithNewSubscription(receiverSubID, senderTopicID)). Setup("Create Message Service", func(ctx f2.Context, t *testing.T) f2.Context { var err error ms, err = createTestMessageService(ctx, &buf) assert.NoError(t, err) return ctx }). Setup("Create CLI Service", func(ctx f2.Context, _ *testing.T) f2.Context { cli = cliservice.NewCLIService(ctx, ms) // Disable the temporary solution of creating a new subscription per session cli.DisablePerSessionSubscription() return ctx }). Setup("Retrieve Identity Mock Gcloud", func(ctx f2.Context, t *testing.T) f2.Context { var err error gcloudDir, err = mockGcloud(t) assert.NoError(t, err) err = cli.RetrieveIdentity(ctx) assert.NoError(t, err) return ctx }). Test("Successfully receive a published message", func(ctx f2.Context, t *testing.T) f2.Context { // Imports ps := pstest.FromContextT(ctx, t) // Start session err := cli.Connect(ctx, projectID, bannerID, storeID, terminalID) assert.NoError(t, err) // Start subscription on target side respData, respAttr := defaultRespData, createAttributes(bannerID, storeID, terminalID, "") subCtx, cancelFunc := context.WithCancel(ctx) handler := receiverHandler(subCtx, ps, senderTopicID, respData, respAttr, handlerErrCh) go receiverSubscribe(subCtx, ps, senderSubID, handler, subErrCh) // Send command commandID, err := cli.Send(defaultCommand) assert.NoError(t, err) // Assertions assert.Nil(t, <-handlerErrCh) received := <-cli.GetDisplayChannel() assert.NotNil(t, received) expected, err := msgdata.NewCommandResponse([]byte(respData), respAttr) assert.NoError(t, err) assert.Equal(t, received, expected) assert.NotEmpty(t, received.Attributes().ReqMsgID) assert.Equal(t, commandID, received.Attributes().ReqMsgID) // End the session assert.NoError(t, endSession(t, cli, cancelFunc, subErrCh)) return ctx }). Test("Nack response from non-target session", func(ctx f2.Context, t *testing.T) f2.Context { // Imports ps := pstest.FromContextT(ctx, t) // Start session err := cli.Connect(ctx, projectID, bannerID, storeID, terminalID) assert.NoError(t, err) // Start subscription on target side respData := defaultRespData respAttr := createAttributes(bannerID, storeID, "wrong", "") respAttr["sessionId"] = "othersessionID" subCtx, cancelFunc := context.WithCancel(ctx) handler := receiverHandler(subCtx, ps, senderTopicID, respData, respAttr, handlerErrCh) go receiverSubscribe(subCtx, ps, senderSubID, handler, subErrCh) // Send command _, err = cli.Send(defaultCommand) assert.NoError(t, err) //Assertions assert.Nil(t, <-handlerErrCh) assert.Never(t, func() bool { return len(cli.GetDisplayChannel()) > 0 }, 1*time.Second, 100*time.Millisecond) // End the session assert.NoError(t, endSession(t, cli, cancelFunc, subErrCh)) return ctx }). Teardown("Delete Mock Gcloud", func(ctx f2.Context, t *testing.T) f2.Context { // gcloudDir could potentially be nil, but this will not cause any issues err := os.RemoveAll(gcloudDir) assert.NoError(t, err) return ctx }). Feature() f.Test(t, feature) } func mockGcloud(t *testing.T) (dir string, err error) { dir, err = os.MkdirTemp("", "") if err != nil { return "", err } f, err := os.Create(fmt.Sprintf("%s/%s", dir, "gcloud")) if err != nil { return "", err } _, err = f.WriteString("#!/bin/sh\n for var in '$@'; do :; done; echo my-random-gcloud-id@ncr.com") if err != nil { return "", err } err = os.Chmod(f.Name(), 0770) if err != nil { return "", err } f.Close() // Need to close the file in order for it to be executable t.Setenv("PATH", dir+":"+os.Getenv("PATH")) return dir, nil } func fillTemplate(template, projectID, bannerID, storeID, terminalID string) (result string) { result = template result = strings.Replace(result, "", projectID, -1) result = strings.Replace(result, "", bannerID, -1) result = strings.Replace(result, "", storeID, -1) result = strings.Replace(result, "", terminalID, -1) return result } func createTestMessageService(ctx context.Context, b *bytes.Buffer) (*msgsvc.MessageService, error) { log := fog.New(fog.To(b)) subCtx := fog.IntoContext(ctx, log) ms, err := msgsvc.NewMessageService(subCtx) if err != nil { return nil, err } return ms, nil } func createAttributes(bannerID, storeID, terminalID, reqmsgID string) map[string]string { return map[string]string{ "bannerId": bannerID, "storeId": storeID, "terminalId": terminalID, "identity": "identity", "version": "1.0", "signature": "signature", "request-message-uuid": reqmsgID, } } func receiverHandler( subCtx context.Context, ps *pstest.PubSub, topicID, data string, attr map[string]string, errCh chan error, ) func(context.Context, *pubsub.Message) { return func(_ context.Context, m *pubsub.Message) { attr["request-message-uuid"] = m.Attributes["commandId"] if _, ok := attr["sessionId"]; !ok { attr["sessionId"] = m.Attributes["sessionId"] } _, err := ps.Publish(subCtx, topicID, data, attr) errCh <- err m.Ack() } } func receiverSubscribe( subCtx context.Context, ps *pstest.PubSub, subID string, handler func(context.Context, *pubsub.Message), errCh chan error, ) { err := ps.Subscribe(subCtx, subID, handler) errCh <- err } func endSession(t *testing.T, cli cliservice.CLIService, cancelFunc context.CancelFunc, subErrCh chan error) error { cancelFunc() err := cli.End() if err != nil { // It's ok for the session to not exist as it may have been previously cleaned up assert.ErrorContains(t, err, "unknown session ID") } // Ensure that all goroutines are closed before ending test return <-subErrCh }