package remotecli import ( "context" "errors" "fmt" "strings" "sync" "edge-infra.dev/pkg/lib/fog" "edge-infra.dev/pkg/sds/emergencyaccess/eaconst" "edge-infra.dev/pkg/sds/emergencyaccess/msgdata" "edge-infra.dev/pkg/sds/lib/set" ) var ( // Unknown session when calling EndSession errUnknownSessionID = errors.New("unknown session ID") ErrSessionActive = errors.New("cannot start existing session") ErrUnknownSession = errors.New("invalid session id") ) // Interface required when creating a RemoteCLI struct to allow remote cli to send and receive messages type MsgSvc interface { Subscribe(ctx context.Context, subscriptionID string, projectID string, handler func(context.Context, msgdata.CommandResponse), filter map[string]string) error Publish(ctx context.Context, topic string, projectID string, message msgdata.Request) error StopPublish(topic string, projectID string) } type sessionData struct { target Target // Used to keep track of all of the topics that the given session has posted // to in Send. These can then all be cleaned up in EndSession. Ideally would // be only one topic in the cache, but opts in Send allows changing the topic postedTopics set.Set[string] displayChan chan<- msgdata.CommandResponse subscriptionID string } type subscriptionData struct { sessions set.Set[string] cancelFunc context.CancelFunc // When cleaning up an unused subscription in EndSession, the cancelFunc is // called under a Write Lock of the RemoteCLI's sessionLock, to ensure a new // identical subscription cannot be created as one is removed. However when // a subscription ends there is a call to subscriptionCleanup which // necessarily runs after EndSession exits. If another identical // subscription starts up before subscriptionCleanup can claim the lock, the // call to subscriptionCleanup will clear the RemoteCLI's sessionData for // the subsequent session. // We can stop this by ensuring subscriptionCleanup only runs if EndSession // has not cleaned up the subscription beforehand. EndSession can close the // subscriptionEnded channel when cleaning up a subscription, and // subscriptionCleanup can use the closed channel as indication there is no // work for it to do. subscriptionEnded chan<- struct{} } type topicData struct { // Used to maintain a set of active subscriptions that have posted to the // topic. Can be used to close the topic when the set is empty sessions set.Set[string] } type RemoteCLI struct { context context.Context msgService MsgSvc sessionData map[string]sessionData subscriptionData map[string]subscriptionData topicData map[string]topicData sessionLock *sync.RWMutex } // Specifies the target location that requests are sent to. type Target interface { // This is the GCP Project ID that holds the topics and subscriptions required by remote-cli to // communicate to the intended target terminal ProjectID() string // This is the target banner ID with which remote-cli intends to communicate BannerID() string // This is the target store ID with which remote-cli intends to communicate StoreID() string // This is the target terminal ID that remote-cli intends to send commands to and from which // it expects to receive responses TerminalID() string } type TargetError []error func (myerr TargetError) Error() string { var msg []string for _, err := range myerr { msg = append(msg, err.Error()) } return strings.Join(msg, ", ") } func validateTarget(target Target) error { errs := TargetError{} if target.ProjectID() == "" { errs = append(errs, errors.New("target missing project id")) } if target.BannerID() == "" { errs = append(errs, errors.New("target missing banner id")) } if target.StoreID() == "" { errs = append(errs, errors.New("target missing store id")) } if target.TerminalID() == "" { errs = append(errs, errors.New("target missing terminal id")) } if len(errs) != 0 { return errs } return nil } type templateConfig struct { template *string } type RCLIOption = func(config *templateConfig) // Overrides default template string func WithOptionalTemplate(template string) RCLIOption { return func(config *templateConfig) { config.template = &template } } // API // Creates a new RemoteCLI struct that can be used to send terminal commands and // receive responses from message service func New(ctx context.Context, ms MsgSvc) *RemoteCLI { rcli := &RemoteCLI{ context: ctx, msgService: ms, sessionLock: &sync.RWMutex{}, sessionData: make(map[string]sessionData), subscriptionData: make(map[string]subscriptionData), topicData: make(map[string]topicData), } return rcli } // Makes a request to message service to start a remote cli session with the target node // Response messages for a given session on a subscription are sent on the // display channel. This channel is closed when the subscription ends, which may // happen when the context is done, [RemoteCLI.EndSession] is called, or an unexpected // error occurs on the subscription // // A session must be closed using [RemoteCLI.EndSession] or by cancelling the // context when no more messages or responses are expected, unless the displayChan // has already been closed by remotecli. // // If an error is returned from StartSession, the session and subscription is never // started and remotecli will not use the dispalyChan in any way. It is the callers // responsibility to clean up resources used by the displayChan. func (rcli *RemoteCLI) StartSession(ctx context.Context, sessionID string, displayChan chan<- msgdata.CommandResponse, target Target, opts ...RCLIOption) error { log := fog.FromContext(ctx) log.Info("Validating target") if err := validateTarget(target); err != nil { return err } // Generate subscription ID using template subscriptionID := fillTemplate(target, eaconst.DefaultSubTemplate, createOptionalConfig(opts)) // Call message service to subscribe to subscription using subscriptionID and a default handler rcli.sessionLock.Lock() defer rcli.sessionLock.Unlock() if seshData, ok := rcli.sessionData[sessionID]; ok { log.Info("Session already exists associated with subscription", "sessionID", sessionID, "subscriptionID", seshData.subscriptionID) return ErrSessionActive } log.Info("Creating and caching subscription", "subscriptionID", subscriptionID) rcli.createSubscription(ctx, subscriptionID, target) // Set up a goroutine which will be used to clean up the session if the // context is done go rcli.contextSessionCleanup(ctx, sessionID) // Now add the new session to the existing subscription session := sessionData{ target: target, displayChan: displayChan, subscriptionID: subscriptionID, postedTopics: set.Set[string]{}, } rcli.sessionData[sessionID] = session if ok := rcli.subscriptionData[subscriptionID].sessions.HasMember(sessionID); ok { // This should not happen, indicates a bug log.Error(nil, "Subscription already associated with the given session", "subscriptionID", subscriptionID) } rcli.subscriptionData[subscriptionID].sessions.Add(sessionID) log.Info("Session registered to subscription", "subscriptionID", subscriptionID, "sessionID", sessionID) return nil } // Ran in a goroutine, waits for the context Done and calls EndSession for the given // session ID to ensure all caches are cleaned up func (rcli *RemoteCLI) contextSessionCleanup(ctx context.Context, sessionID string) { // Wait for context done <-ctx.Done() log := fog.FromContext(ctx, "sessionID", sessionID) log.V(1).Info("Context finished, cleaning up session") err := rcli.EndSession(ctx, sessionID) if err == nil { return } if errors.Is(err, errUnknownSessionID) { log.V(1).Info("Session already finished") return } log.Error(err, "Error occurred while ending session.") } // Checks the existing subscription cache and creates a new subscription in a // goroutine if it is not present in the cache func (rcli *RemoteCLI) createSubscription(ctx context.Context, subscriptionID string, target Target) { log := fog.FromContext(ctx) if _, ok := rcli.subscriptionData[subscriptionID]; ok { log.V(1).Info("Existing subscription found, reusing", "subscriptionID", subscriptionID) return } log.Info("No existing subscription found, creating new subscription", "subscriptionID", subscriptionID) filter := map[string]string{ "bannerId": target.BannerID(), "storeId": target.StoreID(), } // subscription ctx must be a child of the remotecli ctx, not start session // ctx as a subscription may last longer than a particular session if multiple // sessions reuse a subscription subCtx, subCancelFunc := context.WithCancel(rcli.context) subscriptionEnded := make(chan struct{}) subData := subscriptionData{ sessions: set.Set[string]{}, cancelFunc: subCancelFunc, subscriptionEnded: subscriptionEnded, } go func() { log := fog.FromContext(rcli.context, "subscriptionID", subscriptionID) log.Info("Starting new subscription") err := rcli.msgService.Subscribe(subCtx, subscriptionID, target.ProjectID(), rcli.handler(), filter) if err != nil { log.Error(err, "subscription unexpectedly closed") } else { // Should only occur when subCancelFunc is called from EndSession log.V(1).Info("Subscription ended") } rcli.subscriptionCleanup(subscriptionID, subscriptionEnded) // Ensure context is not leaked subCancelFunc() }() rcli.subscriptionData[subscriptionID] = subData } // Called when a subscription has ended. Ensures all active sessions for subscription // are closed by closing display channel, removing sessions from cache and cleaning up // unused topics. // This is necessary to ensure correct cleanup when the subscription ends for a // reason other than an EndSession call, e.g. some error on the subscription. func (rcli *RemoteCLI) subscriptionCleanup(subscriptionID string, subscriptionCleaned <-chan struct{}) { log := fog.FromContext(rcli.context, "subscriptionID", subscriptionID) ctx := fog.IntoContext(rcli.context, log) rcli.sessionLock.Lock() defer rcli.sessionLock.Unlock() select { case <-subscriptionCleaned: // This indicates the subscription has already been cleaned by EndSession // and we can exit early return default: // Subscription exited for some other reason, clean up necessary } subData, ok := rcli.subscriptionData[subscriptionID] if !ok { // This should no longer be the case now that we have the // subscriptionCleaned channel, likely indicates a bug log.Error(nil, "no subscription to cleanup") return } for sessionID := range subData.sessions { seshData, ok := rcli.sessionData[sessionID] if !ok { // Bad, bug log.Error(nil, "session data not found but associated with subscription", "sessionID", sessionID) } close(seshData.displayChan) err := rcli.cleanupTopics(ctx, sessionID, seshData) if err != nil { log.Error(err, "error cleaning up topics") } delete(rcli.sessionData, sessionID) } delete(rcli.subscriptionData, subscriptionID) } // Send a [msgdata.Request] to the configured message service for the target // node of the given session to receive. Blocks until the message has // successfully sent. func (rcli *RemoteCLI) Send( ctx context.Context, userID string, sessionID string, commandID string, request msgdata.Request, opts ...RCLIOption, ) error { log := fog.FromContext(ctx, "userID", userID) ctx = fog.IntoContext(ctx, log) if commandID == "" { return fmt.Errorf("command id not supplied") } // TODO modifying session DATA and topic cache for topic cleanup, this shouldn't // be a read lock. Inefficient if using write lock for full method though rcli.sessionLock.Lock() defer rcli.sessionLock.Unlock() log.Info("Retrieving session from cache") sessionData, ok := rcli.sessionData[sessionID] if !ok { log.Info("Could not find target in map using input session id") return ErrUnknownSession } // Inject target and session info into request message format topicID := fillTemplate(sessionData.target, eaconst.DefaultTopTemplate, createOptionalConfig(opts)) addAttributes(request, sessionData.target, sessionID, userID, commandID) // Because of the way we build topicID in Send using opts, rather than once // during StartSession, it means a sessionID may be associated with multiple // topics. Keep track of all topics posted to so can clean up in EndSession. log.Info("Adding session to list of active subscriptions of topic", "topicID", topicID) if _, ok := rcli.topicData[topicID]; !ok { rcli.topicData[topicID] = topicData{sessions: set.Set[string]{}} } rcli.topicData[topicID].sessions.Add(sessionID) sessionData.postedTopics.Add(topicID) // Publish request to message service log.Info("Sending command") return rcli.msgService.Publish(ctx, topicID, sessionData.target.ProjectID(), request) } // Takes in a context and a session ID // Cleans up resources to do with a given session and stops receiving messages // for the session. Closes the display channel for the session. func (rcli *RemoteCLI) EndSession(ctx context.Context, sessionID string) error { log := fog.FromContext(ctx, "sessionID", sessionID) ctx = fog.IntoContext(ctx, log) rcli.sessionLock.Lock() defer rcli.sessionLock.Unlock() log.Info("Retrieving session data") seshData, ok := rcli.sessionData[sessionID] if !ok { // This may happen if called with an invalid session ID, or if a // subscription unexpectedly closed before EndSession is called return errUnknownSessionID } log.Info("Retrieving subscription data", "subscriptionID", seshData.subscriptionID) subData, ok := rcli.subscriptionData[seshData.subscriptionID] if !ok { // This should never happen log.Info("Subscription not found for given session", "subscriptionID", seshData.subscriptionID) return fmt.Errorf("unknown subscription for session") } if ok := subData.sessions.HasMember(sessionID); !ok { // This should never happen return fmt.Errorf("inactive subscription for session") } log.Info("Removing session") subData.sessions.Remove(sessionID) if len(subData.sessions) == 0 { // cleanup subscription log.V(1).Info("No active sessions for subscription, cleaning up subscription", "subscriptionID", seshData.subscriptionID) subData.cancelFunc() // indicate we have cleaned up the subscription and no further cleanup // is necessary close(subData.subscriptionEnded) // Must delete the key as the presence of the key is used to check if a // new subscription is needed in StartSession delete(rcli.subscriptionData, seshData.subscriptionID) } if err := rcli.cleanupTopics(ctx, sessionID, seshData); err != nil { return err } close(seshData.displayChan) delete(rcli.sessionData, sessionID) log.Info("Session stopped") return nil } // Used to cleanup all topics that a given session has posted to and close any // topics that no longer have any associated sessions // Callers are responsible for the thread safety of this function func (rcli *RemoteCLI) cleanupTopics(ctx context.Context, sessionID string, seshData sessionData) error { // Cannot lock in here as expected to be called from a function that already // has the lock log := fog.FromContext(ctx) // message Service StopPublish should be called whenever no further messages // are expected on the topic, check the cache and close topic if no other // active sessions have posted to the topic for topicID := range seshData.postedTopics { log := log.WithValues("topicID", topicID) topData, ok := rcli.topicData[topicID] if !ok { // This should not happen as topic and session are added to the // caches at the same time in Send log.Info("Topic not found for given session") return fmt.Errorf("unknown topic for session") } topData.sessions.Remove(sessionID) if len(topData.sessions) == 0 { // cleanup message service topic rcli.msgService.StopPublish(topicID, seshData.target.ProjectID()) } seshData.postedTopics.Remove(topicID) } return nil } // handler returns a messageservice handler function which can be used to direct // incoming messages on a given subscription to the appropriate display channel func (rcli *RemoteCLI) handler() func(context.Context, msgdata.CommandResponse) { return func(ctx context.Context, msg msgdata.CommandResponse) { rcli.sessionLock.RLock() defer rcli.sessionLock.RUnlock() sessionID := msg.Attributes().SessionID seshData, ok := rcli.sessionData[sessionID] if !ok { // TODO messages will be lost, fix // May happen if e.g. a long running command is sent and the // emulator disconnects. // Must return here or the select statement will block and cause a // deadlock due to the lock, stopping End and Start Session from // proceeding. // However returning ack's the message, which means any message will // be lost and reconnecting to an existing session will not pick up // messages sent in the meantime return } select { case <-ctx.Done(): return case seshData.displayChan <- msg: } } } func createOptionalConfig(opts []RCLIOption) *templateConfig { config := templateConfig{} for _, opt := range opts { opt(&config) } if config == (templateConfig{}) { return nil } return &config } func fillTemplate(target Target, defaultTemplate string, config *templateConfig) (result string) { result = defaultTemplate if config != nil { if config.template != nil { result = *config.template } // Potential other checks in future } result = strings.Replace(result, "", target.ProjectID(), -1) result = strings.Replace(result, "", target.BannerID(), -1) result = strings.Replace(result, "", target.StoreID(), -1) result = strings.Replace(result, "", target.TerminalID(), -1) return result } func addAttributes(request msgdata.Request, t Target, sessionID string, userID string, commandID string) { request.AddAttribute(eaconst.BannerIDKey, t.BannerID()) request.AddAttribute(eaconst.StoreIDKey, t.StoreID()) request.AddAttribute(eaconst.TerminalIDKey, t.TerminalID()) request.AddAttribute(eaconst.SessionIDKey, sessionID) request.AddAttribute(eaconst.IdentityKey, userID) request.AddAttribute(eaconst.CommandIDKey, commandID) }