1 package remotecli
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "strings"
8 "sync"
9
10 "edge-infra.dev/pkg/lib/fog"
11 "edge-infra.dev/pkg/sds/emergencyaccess/eaconst"
12 "edge-infra.dev/pkg/sds/emergencyaccess/msgdata"
13 "edge-infra.dev/pkg/sds/lib/set"
14 )
15
16 var (
17
18 errUnknownSessionID = errors.New("unknown session ID")
19
20 ErrSessionActive = errors.New("cannot start existing session")
21 ErrUnknownSession = errors.New("invalid session id")
22 )
23
24
25 type MsgSvc interface {
26 Subscribe(ctx context.Context, subscriptionID string, projectID string,
27 handler func(context.Context, msgdata.CommandResponse), filter map[string]string) error
28 Publish(ctx context.Context, topic string, projectID string, message msgdata.Request) error
29 StopPublish(topic string, projectID string)
30 }
31
32 type sessionData struct {
33 target Target
34
35
36
37
38 postedTopics set.Set[string]
39
40 displayChan chan<- msgdata.CommandResponse
41 subscriptionID string
42 }
43
44 type subscriptionData struct {
45 sessions set.Set[string]
46 cancelFunc context.CancelFunc
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 subscriptionEnded chan<- struct{}
62 }
63
64 type topicData struct {
65
66
67 sessions set.Set[string]
68 }
69
70 type RemoteCLI struct {
71 context context.Context
72 msgService MsgSvc
73 sessionData map[string]sessionData
74 subscriptionData map[string]subscriptionData
75 topicData map[string]topicData
76 sessionLock *sync.RWMutex
77 }
78
79
80 type Target interface {
81
82
83 ProjectID() string
84
85 BannerID() string
86
87 StoreID() string
88
89
90 TerminalID() string
91 }
92
93 type TargetError []error
94
95 func (myerr TargetError) Error() string {
96 var msg []string
97 for _, err := range myerr {
98 msg = append(msg, err.Error())
99 }
100 return strings.Join(msg, ", ")
101 }
102
103 func validateTarget(target Target) error {
104 errs := TargetError{}
105 if target.ProjectID() == "" {
106 errs = append(errs, errors.New("target missing project id"))
107 }
108 if target.BannerID() == "" {
109 errs = append(errs, errors.New("target missing banner id"))
110 }
111 if target.StoreID() == "" {
112 errs = append(errs, errors.New("target missing store id"))
113 }
114 if target.TerminalID() == "" {
115 errs = append(errs, errors.New("target missing terminal id"))
116 }
117
118 if len(errs) != 0 {
119 return errs
120 }
121 return nil
122 }
123
124 type templateConfig struct {
125 template *string
126 }
127
128 type RCLIOption = func(config *templateConfig)
129
130
131 func WithOptionalTemplate(template string) RCLIOption {
132 return func(config *templateConfig) {
133 config.template = &template
134 }
135 }
136
137
138
139
140
141 func New(ctx context.Context, ms MsgSvc) *RemoteCLI {
142 rcli := &RemoteCLI{
143 context: ctx,
144 msgService: ms,
145 sessionLock: &sync.RWMutex{},
146
147 sessionData: make(map[string]sessionData),
148 subscriptionData: make(map[string]subscriptionData),
149 topicData: make(map[string]topicData),
150 }
151 return rcli
152 }
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167 func (rcli *RemoteCLI) StartSession(ctx context.Context, sessionID string, displayChan chan<- msgdata.CommandResponse, target Target, opts ...RCLIOption) error {
168 log := fog.FromContext(ctx)
169
170 log.Info("Validating target")
171 if err := validateTarget(target); err != nil {
172 return err
173 }
174
175
176 subscriptionID := fillTemplate(target, eaconst.DefaultSubTemplate, createOptionalConfig(opts))
177
178
179
180 rcli.sessionLock.Lock()
181 defer rcli.sessionLock.Unlock()
182
183 if seshData, ok := rcli.sessionData[sessionID]; ok {
184 log.Info("Session already exists associated with subscription", "sessionID", sessionID, "subscriptionID", seshData.subscriptionID)
185 return ErrSessionActive
186 }
187
188 log.Info("Creating and caching subscription", "subscriptionID", subscriptionID)
189 rcli.createSubscription(ctx, subscriptionID, target)
190
191
192
193 go rcli.contextSessionCleanup(ctx, sessionID)
194
195
196 session := sessionData{
197 target: target,
198 displayChan: displayChan,
199 subscriptionID: subscriptionID,
200 postedTopics: set.Set[string]{},
201 }
202 rcli.sessionData[sessionID] = session
203
204 if ok := rcli.subscriptionData[subscriptionID].sessions.HasMember(sessionID); ok {
205
206 log.Error(nil, "Subscription already associated with the given session", "subscriptionID", subscriptionID)
207 }
208 rcli.subscriptionData[subscriptionID].sessions.Add(sessionID)
209
210 log.Info("Session registered to subscription", "subscriptionID", subscriptionID, "sessionID", sessionID)
211
212 return nil
213 }
214
215
216
217 func (rcli *RemoteCLI) contextSessionCleanup(ctx context.Context, sessionID string) {
218
219 <-ctx.Done()
220 log := fog.FromContext(ctx, "sessionID", sessionID)
221
222 log.V(1).Info("Context finished, cleaning up session")
223
224 err := rcli.EndSession(ctx, sessionID)
225 if err == nil {
226 return
227 }
228
229 if errors.Is(err, errUnknownSessionID) {
230 log.V(1).Info("Session already finished")
231 return
232 }
233
234 log.Error(err, "Error occurred while ending session.")
235 }
236
237
238
239 func (rcli *RemoteCLI) createSubscription(ctx context.Context, subscriptionID string, target Target) {
240 log := fog.FromContext(ctx)
241
242 if _, ok := rcli.subscriptionData[subscriptionID]; ok {
243 log.V(1).Info("Existing subscription found, reusing", "subscriptionID", subscriptionID)
244 return
245 }
246 log.Info("No existing subscription found, creating new subscription", "subscriptionID", subscriptionID)
247
248 filter := map[string]string{
249 "bannerId": target.BannerID(),
250 "storeId": target.StoreID(),
251 }
252
253
254
255
256 subCtx, subCancelFunc := context.WithCancel(rcli.context)
257
258 subscriptionEnded := make(chan struct{})
259
260 subData := subscriptionData{
261 sessions: set.Set[string]{},
262 cancelFunc: subCancelFunc,
263 subscriptionEnded: subscriptionEnded,
264 }
265
266 go func() {
267 log := fog.FromContext(rcli.context, "subscriptionID", subscriptionID)
268 log.Info("Starting new subscription")
269 err := rcli.msgService.Subscribe(subCtx, subscriptionID, target.ProjectID(), rcli.handler(), filter)
270 if err != nil {
271 log.Error(err, "subscription unexpectedly closed")
272 } else {
273
274 log.V(1).Info("Subscription ended")
275 }
276
277 rcli.subscriptionCleanup(subscriptionID, subscriptionEnded)
278
279
280 subCancelFunc()
281 }()
282
283 rcli.subscriptionData[subscriptionID] = subData
284 }
285
286
287
288
289
290
291 func (rcli *RemoteCLI) subscriptionCleanup(subscriptionID string, subscriptionCleaned <-chan struct{}) {
292 log := fog.FromContext(rcli.context, "subscriptionID", subscriptionID)
293 ctx := fog.IntoContext(rcli.context, log)
294
295 rcli.sessionLock.Lock()
296 defer rcli.sessionLock.Unlock()
297
298 select {
299 case <-subscriptionCleaned:
300
301
302 return
303 default:
304
305 }
306
307 subData, ok := rcli.subscriptionData[subscriptionID]
308 if !ok {
309
310
311 log.Error(nil, "no subscription to cleanup")
312 return
313 }
314
315 for sessionID := range subData.sessions {
316 seshData, ok := rcli.sessionData[sessionID]
317 if !ok {
318
319 log.Error(nil, "session data not found but associated with subscription", "sessionID", sessionID)
320 }
321
322 close(seshData.displayChan)
323 err := rcli.cleanupTopics(ctx, sessionID, seshData)
324 if err != nil {
325 log.Error(err, "error cleaning up topics")
326 }
327
328 delete(rcli.sessionData, sessionID)
329 }
330
331 delete(rcli.subscriptionData, subscriptionID)
332 }
333
334
335
336
337 func (rcli *RemoteCLI) Send(
338 ctx context.Context,
339 userID string,
340 sessionID string,
341 commandID string,
342 request msgdata.Request,
343 opts ...RCLIOption,
344 ) error {
345 log := fog.FromContext(ctx, "userID", userID)
346 ctx = fog.IntoContext(ctx, log)
347
348 if commandID == "" {
349 return fmt.Errorf("command id not supplied")
350 }
351
352
353
354 rcli.sessionLock.Lock()
355 defer rcli.sessionLock.Unlock()
356 log.Info("Retrieving session from cache")
357 sessionData, ok := rcli.sessionData[sessionID]
358 if !ok {
359 log.Info("Could not find target in map using input session id")
360 return ErrUnknownSession
361 }
362
363 topicID := fillTemplate(sessionData.target, eaconst.DefaultTopTemplate, createOptionalConfig(opts))
364
365 addAttributes(request, sessionData.target, sessionID, userID, commandID)
366
367
368
369
370 log.Info("Adding session to list of active subscriptions of topic", "topicID", topicID)
371 if _, ok := rcli.topicData[topicID]; !ok {
372 rcli.topicData[topicID] = topicData{sessions: set.Set[string]{}}
373 }
374 rcli.topicData[topicID].sessions.Add(sessionID)
375 sessionData.postedTopics.Add(topicID)
376
377
378 log.Info("Sending command")
379 return rcli.msgService.Publish(ctx, topicID, sessionData.target.ProjectID(), request)
380 }
381
382
383
384
385 func (rcli *RemoteCLI) EndSession(ctx context.Context, sessionID string) error {
386 log := fog.FromContext(ctx, "sessionID", sessionID)
387 ctx = fog.IntoContext(ctx, log)
388
389 rcli.sessionLock.Lock()
390 defer rcli.sessionLock.Unlock()
391
392 log.Info("Retrieving session data")
393 seshData, ok := rcli.sessionData[sessionID]
394 if !ok {
395
396
397 return errUnknownSessionID
398 }
399
400 log.Info("Retrieving subscription data", "subscriptionID", seshData.subscriptionID)
401 subData, ok := rcli.subscriptionData[seshData.subscriptionID]
402 if !ok {
403
404 log.Info("Subscription not found for given session", "subscriptionID", seshData.subscriptionID)
405 return fmt.Errorf("unknown subscription for session")
406 }
407
408 if ok := subData.sessions.HasMember(sessionID); !ok {
409
410 return fmt.Errorf("inactive subscription for session")
411 }
412
413 log.Info("Removing session")
414 subData.sessions.Remove(sessionID)
415 if len(subData.sessions) == 0 {
416
417 log.V(1).Info("No active sessions for subscription, cleaning up subscription", "subscriptionID", seshData.subscriptionID)
418 subData.cancelFunc()
419
420
421
422 close(subData.subscriptionEnded)
423
424
425
426 delete(rcli.subscriptionData, seshData.subscriptionID)
427 }
428
429 if err := rcli.cleanupTopics(ctx, sessionID, seshData); err != nil {
430 return err
431 }
432
433 close(seshData.displayChan)
434 delete(rcli.sessionData, sessionID)
435 log.Info("Session stopped")
436
437 return nil
438 }
439
440
441
442
443 func (rcli *RemoteCLI) cleanupTopics(ctx context.Context, sessionID string, seshData sessionData) error {
444
445
446 log := fog.FromContext(ctx)
447
448
449
450
451 for topicID := range seshData.postedTopics {
452 log := log.WithValues("topicID", topicID)
453 topData, ok := rcli.topicData[topicID]
454 if !ok {
455
456
457 log.Info("Topic not found for given session")
458 return fmt.Errorf("unknown topic for session")
459 }
460
461 topData.sessions.Remove(sessionID)
462 if len(topData.sessions) == 0 {
463
464 rcli.msgService.StopPublish(topicID, seshData.target.ProjectID())
465 }
466
467 seshData.postedTopics.Remove(topicID)
468 }
469
470 return nil
471 }
472
473
474
475 func (rcli *RemoteCLI) handler() func(context.Context, msgdata.CommandResponse) {
476 return func(ctx context.Context, msg msgdata.CommandResponse) {
477 rcli.sessionLock.RLock()
478 defer rcli.sessionLock.RUnlock()
479
480 sessionID := msg.Attributes().SessionID
481 seshData, ok := rcli.sessionData[sessionID]
482 if !ok {
483
484
485
486
487
488
489
490
491
492 return
493 }
494
495 select {
496 case <-ctx.Done():
497 return
498 case seshData.displayChan <- msg:
499 }
500 }
501 }
502
503 func createOptionalConfig(opts []RCLIOption) *templateConfig {
504 config := templateConfig{}
505 for _, opt := range opts {
506 opt(&config)
507 }
508 if config == (templateConfig{}) {
509 return nil
510 }
511 return &config
512 }
513
514 func fillTemplate(target Target, defaultTemplate string, config *templateConfig) (result string) {
515 result = defaultTemplate
516 if config != nil {
517 if config.template != nil {
518 result = *config.template
519 }
520
521 }
522
523 result = strings.Replace(result, "<PROJECT_ID>", target.ProjectID(), -1)
524 result = strings.Replace(result, "<BANNER_ID>", target.BannerID(), -1)
525 result = strings.Replace(result, "<STORE_ID>", target.StoreID(), -1)
526 result = strings.Replace(result, "<TERMINAL_ID>", target.TerminalID(), -1)
527 return result
528 }
529
530 func addAttributes(request msgdata.Request, t Target, sessionID string, userID string, commandID string) {
531 request.AddAttribute(eaconst.BannerIDKey, t.BannerID())
532 request.AddAttribute(eaconst.StoreIDKey, t.StoreID())
533 request.AddAttribute(eaconst.TerminalIDKey, t.TerminalID())
534 request.AddAttribute(eaconst.SessionIDKey, sessionID)
535 request.AddAttribute(eaconst.IdentityKey, userID)
536 request.AddAttribute(eaconst.CommandIDKey, commandID)
537 }
538
View as plain text