...

Source file src/edge-infra.dev/pkg/sds/emergencyaccess/ea_integration/v1/ea_v1_integration_test.go

Documentation: edge-infra.dev/pkg/sds/emergencyaccess/ea_integration/v1

     1  package integration
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"fmt"
     7  	"os"
     8  	"strings"
     9  	"testing"
    10  	"time"
    11  
    12  	"cloud.google.com/go/pubsub"
    13  	"github.com/stretchr/testify/assert"
    14  
    15  	"edge-infra.dev/pkg/lib/fog"
    16  	"edge-infra.dev/pkg/sds/emergencyaccess/cliservice"
    17  	"edge-infra.dev/pkg/sds/emergencyaccess/eaconst"
    18  	"edge-infra.dev/pkg/sds/emergencyaccess/msgdata"
    19  	"edge-infra.dev/pkg/sds/emergencyaccess/msgsvc"
    20  	"edge-infra.dev/test/f2"
    21  	"edge-infra.dev/test/f2/x/pstest"
    22  )
    23  
    24  var (
    25  	f f2.Framework
    26  
    27  	defaultProjectID  = "project"
    28  	defaultBannerID   = "banner"
    29  	defaultStoreID    = "store"
    30  	defaultTerminalID = "terminal"
    31  
    32  	receiverTopicIDTemplate = eaconst.DefaultTopTemplate
    33  	receiverSubIDTemplate   = eaconst.DefaultSubTemplate
    34  	defaultSenderTopicID    = "sender-topic"
    35  	defaultSenderSubID      = "sender-sub"
    36  
    37  	defaultCommand  = "echo hello world"
    38  	defaultRespData = `
    39  {
    40  	"type": "Output",
    41  	"exitCode": 0,
    42  	"output": "hello world",
    43  	"timestamp": "01-01-23 00:00:00",
    44  	"duration": 1
    45  }`
    46  )
    47  
    48  func TestMain(m *testing.M) {
    49  	f = f2.New(context.Background(), f2.WithExtensions(pstest.New(pstest.WithProjectID(defaultProjectID)))).
    50  		Component("remotecli")
    51  	os.Exit(f.Run(m))
    52  }
    53  
    54  func TestPubSub(t *testing.T) {
    55  	var (
    56  		projectID, bannerID, storeID, terminalID = defaultProjectID, defaultBannerID, defaultStoreID, defaultTerminalID
    57  
    58  		senderTopicID, senderSubID = defaultSenderTopicID, defaultSenderSubID
    59  
    60  		receiverTopicID = fillTemplate(receiverTopicIDTemplate, projectID, bannerID, storeID, terminalID)
    61  		receiverSubID   = fillTemplate(receiverSubIDTemplate, projectID, bannerID, storeID, terminalID)
    62  
    63  		gcloudDir string
    64  
    65  		ms  *msgsvc.MessageService
    66  		buf bytes.Buffer
    67  		cli cliservice.CLIService
    68  
    69  		handlerErrCh, subErrCh = make(chan error), make(chan error)
    70  	)
    71  	feature := f2.NewFeature("PubSub").
    72  		Setup("Create sender Topic", pstest.WithNewTopic(senderTopicID)).
    73  		Setup("Create receiver Topic", pstest.WithNewTopic(receiverTopicID)).
    74  		Setup("Create sender Sub", pstest.WithNewSubscription(senderSubID, receiverTopicID)).
    75  		Setup("Create receiver Sub", pstest.WithNewSubscription(receiverSubID, senderTopicID)).
    76  		Setup("Create Message Service", func(ctx f2.Context, t *testing.T) f2.Context {
    77  			var err error
    78  			ms, err = createTestMessageService(ctx, &buf)
    79  			assert.NoError(t, err)
    80  			return ctx
    81  		}).
    82  		Setup("Create CLI Service", func(ctx f2.Context, _ *testing.T) f2.Context {
    83  			cli = cliservice.NewCLIService(ctx, ms)
    84  			// Disable the temporary solution of creating a new subscription per session
    85  			cli.DisablePerSessionSubscription()
    86  			return ctx
    87  		}).
    88  		Setup("Retrieve Identity Mock Gcloud", func(ctx f2.Context, t *testing.T) f2.Context {
    89  			var err error
    90  			gcloudDir, err = mockGcloud(t)
    91  			assert.NoError(t, err)
    92  
    93  			err = cli.RetrieveIdentity(ctx)
    94  			assert.NoError(t, err)
    95  			return ctx
    96  		}).
    97  		Test("Successfully receive a published message", func(ctx f2.Context, t *testing.T) f2.Context {
    98  			// Imports
    99  			ps := pstest.FromContextT(ctx, t)
   100  
   101  			// Start session
   102  			err := cli.Connect(ctx, projectID, bannerID, storeID, terminalID)
   103  			assert.NoError(t, err)
   104  
   105  			// Start subscription on target side
   106  			respData, respAttr := defaultRespData, createAttributes(bannerID, storeID, terminalID, "")
   107  			subCtx, cancelFunc := context.WithCancel(ctx)
   108  			handler := receiverHandler(subCtx, ps, senderTopicID, respData, respAttr, handlerErrCh)
   109  			go receiverSubscribe(subCtx, ps, senderSubID, handler, subErrCh)
   110  
   111  			// Send command
   112  			commandID, err := cli.Send(defaultCommand)
   113  			assert.NoError(t, err)
   114  
   115  			// Assertions
   116  			assert.Nil(t, <-handlerErrCh)
   117  
   118  			received := <-cli.GetDisplayChannel()
   119  			assert.NotNil(t, received)
   120  
   121  			expected, err := msgdata.NewCommandResponse([]byte(respData), respAttr)
   122  			assert.NoError(t, err)
   123  			assert.Equal(t, received, expected)
   124  
   125  			assert.NotEmpty(t, received.Attributes().ReqMsgID)
   126  			assert.Equal(t, commandID, received.Attributes().ReqMsgID)
   127  
   128  			// End the session
   129  			assert.NoError(t, endSession(t, cli, cancelFunc, subErrCh))
   130  
   131  			return ctx
   132  		}).
   133  		Test("Nack response from non-target session", func(ctx f2.Context, t *testing.T) f2.Context {
   134  			// Imports
   135  			ps := pstest.FromContextT(ctx, t)
   136  
   137  			// Start session
   138  			err := cli.Connect(ctx, projectID, bannerID, storeID, terminalID)
   139  			assert.NoError(t, err)
   140  
   141  			// Start subscription on target side
   142  			respData := defaultRespData
   143  			respAttr := createAttributes(bannerID, storeID, "wrong", "")
   144  			respAttr["sessionId"] = "othersessionID"
   145  
   146  			subCtx, cancelFunc := context.WithCancel(ctx)
   147  			handler := receiverHandler(subCtx, ps, senderTopicID, respData, respAttr, handlerErrCh)
   148  			go receiverSubscribe(subCtx, ps, senderSubID, handler, subErrCh)
   149  
   150  			// Send command
   151  			_, err = cli.Send(defaultCommand)
   152  			assert.NoError(t, err)
   153  
   154  			//Assertions
   155  			assert.Nil(t, <-handlerErrCh)
   156  
   157  			assert.Never(t, func() bool {
   158  				return len(cli.GetDisplayChannel()) > 0
   159  			}, 1*time.Second, 100*time.Millisecond)
   160  
   161  			// End the session
   162  			assert.NoError(t, endSession(t, cli, cancelFunc, subErrCh))
   163  
   164  			return ctx
   165  		}).
   166  		Teardown("Delete Mock Gcloud", func(ctx f2.Context, t *testing.T) f2.Context {
   167  			// gcloudDir could potentially be nil, but this will not cause any issues
   168  			err := os.RemoveAll(gcloudDir)
   169  			assert.NoError(t, err)
   170  			return ctx
   171  		}).
   172  		Feature()
   173  
   174  	f.Test(t, feature)
   175  }
   176  
   177  func mockGcloud(t *testing.T) (dir string, err error) {
   178  	dir, err = os.MkdirTemp("", "")
   179  	if err != nil {
   180  		return "", err
   181  	}
   182  	f, err := os.Create(fmt.Sprintf("%s/%s", dir, "gcloud"))
   183  	if err != nil {
   184  		return "", err
   185  	}
   186  	_, err = f.WriteString("#!/bin/sh\n for var in '$@'; do :; done; echo my-random-gcloud-id@ncr.com")
   187  	if err != nil {
   188  		return "", err
   189  	}
   190  	err = os.Chmod(f.Name(), 0770)
   191  	if err != nil {
   192  		return "", err
   193  	}
   194  	f.Close() // Need to close the file in order for it to be executable
   195  	t.Setenv("PATH", dir+":"+os.Getenv("PATH"))
   196  	return dir, nil
   197  }
   198  
   199  func fillTemplate(template, projectID, bannerID, storeID, terminalID string) (result string) {
   200  	result = template
   201  	result = strings.Replace(result, "<PROJECT_ID>", projectID, -1)
   202  	result = strings.Replace(result, "<BANNER_ID>", bannerID, -1)
   203  	result = strings.Replace(result, "<STORE_ID>", storeID, -1)
   204  	result = strings.Replace(result, "<TERMINAL_ID>", terminalID, -1)
   205  	return result
   206  }
   207  
   208  func createTestMessageService(ctx context.Context, b *bytes.Buffer) (*msgsvc.MessageService, error) {
   209  	log := fog.New(fog.To(b))
   210  	subCtx := fog.IntoContext(ctx, log)
   211  	ms, err := msgsvc.NewMessageService(subCtx)
   212  	if err != nil {
   213  		return nil, err
   214  	}
   215  	return ms, nil
   216  }
   217  
   218  func createAttributes(bannerID, storeID, terminalID, reqmsgID string) map[string]string {
   219  	return map[string]string{
   220  		"bannerId":             bannerID,
   221  		"storeId":              storeID,
   222  		"terminalId":           terminalID,
   223  		"identity":             "identity",
   224  		"version":              "1.0",
   225  		"signature":            "signature",
   226  		"request-message-uuid": reqmsgID,
   227  	}
   228  }
   229  
   230  func receiverHandler(
   231  	subCtx context.Context,
   232  	ps *pstest.PubSub,
   233  	topicID, data string,
   234  	attr map[string]string,
   235  	errCh chan error,
   236  ) func(context.Context, *pubsub.Message) {
   237  	return func(_ context.Context, m *pubsub.Message) {
   238  		attr["request-message-uuid"] = m.Attributes["commandId"]
   239  
   240  		if _, ok := attr["sessionId"]; !ok {
   241  			attr["sessionId"] = m.Attributes["sessionId"]
   242  		}
   243  
   244  		_, err := ps.Publish(subCtx, topicID, data, attr)
   245  		errCh <- err
   246  		m.Ack()
   247  	}
   248  }
   249  
   250  func receiverSubscribe(
   251  	subCtx context.Context,
   252  	ps *pstest.PubSub,
   253  	subID string,
   254  	handler func(context.Context, *pubsub.Message),
   255  	errCh chan error,
   256  ) {
   257  	err := ps.Subscribe(subCtx, subID, handler)
   258  	errCh <- err
   259  }
   260  
   261  func endSession(t *testing.T, cli cliservice.CLIService, cancelFunc context.CancelFunc, subErrCh chan error) error {
   262  	cancelFunc()
   263  	err := cli.End()
   264  	if err != nil {
   265  		// It's ok for the session to not exist as it may have been previously cleaned up
   266  		assert.ErrorContains(t, err, "unknown session ID")
   267  	}
   268  
   269  	// Ensure that all goroutines are closed before ending test
   270  	return <-subErrCh
   271  }
   272  

View as plain text