1 package integration
2
3 import (
4 "bytes"
5 "context"
6 "fmt"
7 "os"
8 "strings"
9 "testing"
10 "time"
11
12 "cloud.google.com/go/pubsub"
13 "github.com/stretchr/testify/assert"
14
15 "edge-infra.dev/pkg/lib/fog"
16 "edge-infra.dev/pkg/sds/emergencyaccess/cliservice"
17 "edge-infra.dev/pkg/sds/emergencyaccess/eaconst"
18 "edge-infra.dev/pkg/sds/emergencyaccess/msgdata"
19 "edge-infra.dev/pkg/sds/emergencyaccess/msgsvc"
20 "edge-infra.dev/test/f2"
21 "edge-infra.dev/test/f2/x/pstest"
22 )
23
// Shared fixtures for the Pub/Sub integration test in this file.
var (
	// f is the test framework instance; it is assigned in TestMain and used
	// by TestPubSub to run the feature.
	f f2.Framework

	// Default identifiers passed to cli.Connect and used to expand the
	// receiver topic/subscription templates.
	defaultProjectID  = "project"
	defaultBannerID   = "banner"
	defaultStoreID    = "store"
	defaultTerminalID = "terminal"

	// Topic and subscription names. The receiver names are templates that
	// TestPubSub expands with fillTemplate; the sender names are used as-is.
	receiverTopicIDTemplate = eaconst.DefaultTopTemplate
	receiverSubIDTemplate   = eaconst.DefaultSubTemplate
	defaultSenderTopicID    = "sender-topic"
	defaultSenderSubID      = "sender-sub"

	// defaultCommand is the command sent through the CLI service, and
	// defaultRespData is the payload the test's receiver handler publishes
	// back as the response.
	defaultCommand  = "echo hello world"
	defaultRespData = `
{
"type": "Output",
"exitCode": 0,
"output": "hello world",
"timestamp": "01-01-23 00:00:00",
"duration": 1
}`
)
47
48 func TestMain(m *testing.M) {
49 f = f2.New(context.Background(), f2.WithExtensions(pstest.New(pstest.WithProjectID(defaultProjectID)))).
50 Component("remotecli")
51 os.Exit(f.Run(m))
52 }
53
54 func TestPubSub(t *testing.T) {
55 var (
56 projectID, bannerID, storeID, terminalID = defaultProjectID, defaultBannerID, defaultStoreID, defaultTerminalID
57
58 senderTopicID, senderSubID = defaultSenderTopicID, defaultSenderSubID
59
60 receiverTopicID = fillTemplate(receiverTopicIDTemplate, projectID, bannerID, storeID, terminalID)
61 receiverSubID = fillTemplate(receiverSubIDTemplate, projectID, bannerID, storeID, terminalID)
62
63 gcloudDir string
64
65 ms *msgsvc.MessageService
66 buf bytes.Buffer
67 cli cliservice.CLIService
68
69 handlerErrCh, subErrCh = make(chan error), make(chan error)
70 )
71 feature := f2.NewFeature("PubSub").
72 Setup("Create sender Topic", pstest.WithNewTopic(senderTopicID)).
73 Setup("Create receiver Topic", pstest.WithNewTopic(receiverTopicID)).
74 Setup("Create sender Sub", pstest.WithNewSubscription(senderSubID, receiverTopicID)).
75 Setup("Create receiver Sub", pstest.WithNewSubscription(receiverSubID, senderTopicID)).
76 Setup("Create Message Service", func(ctx f2.Context, t *testing.T) f2.Context {
77 var err error
78 ms, err = createTestMessageService(ctx, &buf)
79 assert.NoError(t, err)
80 return ctx
81 }).
82 Setup("Create CLI Service", func(ctx f2.Context, _ *testing.T) f2.Context {
83 cli = cliservice.NewCLIService(ctx, ms)
84
85 cli.DisablePerSessionSubscription()
86 return ctx
87 }).
88 Setup("Retrieve Identity Mock Gcloud", func(ctx f2.Context, t *testing.T) f2.Context {
89 var err error
90 gcloudDir, err = mockGcloud(t)
91 assert.NoError(t, err)
92
93 err = cli.RetrieveIdentity(ctx)
94 assert.NoError(t, err)
95 return ctx
96 }).
97 Test("Successfully receive a published message", func(ctx f2.Context, t *testing.T) f2.Context {
98
99 ps := pstest.FromContextT(ctx, t)
100
101
102 err := cli.Connect(ctx, projectID, bannerID, storeID, terminalID)
103 assert.NoError(t, err)
104
105
106 respData, respAttr := defaultRespData, createAttributes(bannerID, storeID, terminalID, "")
107 subCtx, cancelFunc := context.WithCancel(ctx)
108 handler := receiverHandler(subCtx, ps, senderTopicID, respData, respAttr, handlerErrCh)
109 go receiverSubscribe(subCtx, ps, senderSubID, handler, subErrCh)
110
111
112 commandID, err := cli.Send(defaultCommand)
113 assert.NoError(t, err)
114
115
116 assert.Nil(t, <-handlerErrCh)
117
118 received := <-cli.GetDisplayChannel()
119 assert.NotNil(t, received)
120
121 expected, err := msgdata.NewCommandResponse([]byte(respData), respAttr)
122 assert.NoError(t, err)
123 assert.Equal(t, received, expected)
124
125 assert.NotEmpty(t, received.Attributes().ReqMsgID)
126 assert.Equal(t, commandID, received.Attributes().ReqMsgID)
127
128
129 assert.NoError(t, endSession(t, cli, cancelFunc, subErrCh))
130
131 return ctx
132 }).
133 Test("Nack response from non-target session", func(ctx f2.Context, t *testing.T) f2.Context {
134
135 ps := pstest.FromContextT(ctx, t)
136
137
138 err := cli.Connect(ctx, projectID, bannerID, storeID, terminalID)
139 assert.NoError(t, err)
140
141
142 respData := defaultRespData
143 respAttr := createAttributes(bannerID, storeID, "wrong", "")
144 respAttr["sessionId"] = "othersessionID"
145
146 subCtx, cancelFunc := context.WithCancel(ctx)
147 handler := receiverHandler(subCtx, ps, senderTopicID, respData, respAttr, handlerErrCh)
148 go receiverSubscribe(subCtx, ps, senderSubID, handler, subErrCh)
149
150
151 _, err = cli.Send(defaultCommand)
152 assert.NoError(t, err)
153
154
155 assert.Nil(t, <-handlerErrCh)
156
157 assert.Never(t, func() bool {
158 return len(cli.GetDisplayChannel()) > 0
159 }, 1*time.Second, 100*time.Millisecond)
160
161
162 assert.NoError(t, endSession(t, cli, cancelFunc, subErrCh))
163
164 return ctx
165 }).
166 Teardown("Delete Mock Gcloud", func(ctx f2.Context, t *testing.T) f2.Context {
167
168 err := os.RemoveAll(gcloudDir)
169 assert.NoError(t, err)
170 return ctx
171 }).
172 Feature()
173
174 f.Test(t, feature)
175 }
176
// mockGcloud writes a fake `gcloud` shell script into a fresh temporary
// directory and prepends that directory to PATH using t.Setenv, so the
// original PATH is restored when t finishes. The script prints
// my-random-gcloud-id@ncr.com regardless of its arguments.
//
// It returns the temporary directory so the caller can remove it once done.
// On failure the directory is removed here and an empty dir is returned with
// an error describing the step that failed.
func mockGcloud(t *testing.T) (dir string, err error) {
	const script = "#!/bin/sh\n for var in '$@'; do :; done; echo my-random-gcloud-id@ncr.com"

	tmpDir, err := os.MkdirTemp("", "")
	if err != nil {
		return "", fmt.Errorf("creating mock gcloud directory: %w", err)
	}
	// The caller only learns the directory on success, so clean it up here
	// when any later step fails.
	defer func() {
		if err != nil {
			_ = os.RemoveAll(tmpDir) // best effort; the original error is what matters
		}
	}()

	scriptPath := tmpDir + string(os.PathSeparator) + "gcloud"
	// WriteFile opens, writes and closes in one call, so no file handle can
	// be left open on an error path.
	if err = os.WriteFile(scriptPath, []byte(script), 0770); err != nil {
		return "", fmt.Errorf("writing mock gcloud script: %w", err)
	}
	// The mode given at creation is filtered by the umask; chmod explicitly
	// so the script is always executable.
	if err = os.Chmod(scriptPath, 0770); err != nil {
		return "", fmt.Errorf("making mock gcloud script executable: %w", err)
	}

	t.Setenv("PATH", tmpDir+string(os.PathListSeparator)+os.Getenv("PATH"))
	return tmpDir, nil
}
198
// fillTemplate returns template with every occurrence of the placeholders
// <PROJECT_ID>, <BANNER_ID>, <STORE_ID> and <TERMINAL_ID> replaced by the
// corresponding argument. Placeholders are substituted one after another in
// that order, so a placeholder introduced by an earlier value is itself
// replaced by a later step.
func fillTemplate(template, projectID, bannerID, storeID, terminalID string) (result string) {
	substitutions := [...][2]string{
		{"<PROJECT_ID>", projectID},
		{"<BANNER_ID>", bannerID},
		{"<STORE_ID>", storeID},
		{"<TERMINAL_ID>", terminalID},
	}

	result = template
	for _, pair := range substitutions {
		result = strings.ReplaceAll(result, pair[0], pair[1])
	}
	return result
}
207
208 func createTestMessageService(ctx context.Context, b *bytes.Buffer) (*msgsvc.MessageService, error) {
209 log := fog.New(fog.To(b))
210 subCtx := fog.IntoContext(ctx, log)
211 ms, err := msgsvc.NewMessageService(subCtx)
212 if err != nil {
213 return nil, err
214 }
215 return ms, nil
216 }
217
// createAttributes returns a new attribute map for a response message. The
// banner, store, terminal and request-message IDs come from the arguments;
// identity, version and signature are fixed placeholder values.
func createAttributes(bannerID, storeID, terminalID, reqmsgID string) map[string]string {
	attrs := make(map[string]string, 7)

	// Target addressing.
	attrs["bannerId"] = bannerID
	attrs["storeId"] = storeID
	attrs["terminalId"] = terminalID

	// Fixed placeholders.
	attrs["identity"] = "identity"
	attrs["version"] = "1.0"
	attrs["signature"] = "signature"

	// ID of the request this response answers.
	attrs["request-message-uuid"] = reqmsgID

	return attrs
}
229
230 func receiverHandler(
231 subCtx context.Context,
232 ps *pstest.PubSub,
233 topicID, data string,
234 attr map[string]string,
235 errCh chan error,
236 ) func(context.Context, *pubsub.Message) {
237 return func(_ context.Context, m *pubsub.Message) {
238 attr["request-message-uuid"] = m.Attributes["commandId"]
239
240 if _, ok := attr["sessionId"]; !ok {
241 attr["sessionId"] = m.Attributes["sessionId"]
242 }
243
244 _, err := ps.Publish(subCtx, topicID, data, attr)
245 errCh <- err
246 m.Ack()
247 }
248 }
249
250 func receiverSubscribe(
251 subCtx context.Context,
252 ps *pstest.PubSub,
253 subID string,
254 handler func(context.Context, *pubsub.Message),
255 errCh chan error,
256 ) {
257 err := ps.Subscribe(subCtx, subID, handler)
258 errCh <- err
259 }
260
261 func endSession(t *testing.T, cli cliservice.CLIService, cancelFunc context.CancelFunc, subErrCh chan error) error {
262 cancelFunc()
263 err := cli.End()
264 if err != nil {
265
266 assert.ErrorContains(t, err, "unknown session ID")
267 }
268
269
270 return <-subErrCh
271 }
272
View as plain text