...

Source file src/edge-infra.dev/pkg/sds/emergencyaccess/remotecli/remotecli.go

Documentation: edge-infra.dev/pkg/sds/emergencyaccess/remotecli

     1  package remotecli
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"strings"
     8  	"sync"
     9  
    10  	"edge-infra.dev/pkg/lib/fog"
    11  	"edge-infra.dev/pkg/sds/emergencyaccess/eaconst"
    12  	"edge-infra.dev/pkg/sds/emergencyaccess/msgdata"
    13  	"edge-infra.dev/pkg/sds/lib/set"
    14  )
    15  
    16  var (
    17  	// Unknown session when calling EndSession
    18  	errUnknownSessionID = errors.New("unknown session ID")
    19  
    20  	ErrSessionActive  = errors.New("cannot start existing session")
    21  	ErrUnknownSession = errors.New("invalid session id")
    22  )
    23  
    24  // Interface required when creating a RemoteCLI struct to allow remote cli to send and receive messages
    25  type MsgSvc interface {
    26  	Subscribe(ctx context.Context, subscriptionID string, projectID string,
    27  		handler func(context.Context, msgdata.CommandResponse), filter map[string]string) error
    28  	Publish(ctx context.Context, topic string, projectID string, message msgdata.Request) error
    29  	StopPublish(topic string, projectID string)
    30  }
    31  
    32  type sessionData struct {
    33  	target Target
    34  
    35  	// Used to keep track of all of the topics that the given session has posted
    36  	// to in Send. These can then all be cleaned up in EndSession. Ideally would
    37  	// be only one topic in the cache, but opts in Send allows changing the topic
    38  	postedTopics set.Set[string]
    39  
    40  	displayChan    chan<- msgdata.CommandResponse
    41  	subscriptionID string
    42  }
    43  
    44  type subscriptionData struct {
    45  	sessions   set.Set[string]
    46  	cancelFunc context.CancelFunc
    47  
    48  	// When cleaning up an unused subscription in EndSession, the cancelFunc is
    49  	// called under a Write Lock of the RemoteCLI's sessionLock, to ensure a new
    50  	// identical subscription cannot be created as one is removed. However when
    51  	// a subscription ends there is a call to subscriptionCleanup which
    52  	// necessarily runs after EndSession exits. If another identical
    53  	// subscription starts up before subscriptionCleanup can claim the lock, the
    54  	// call to subscriptionCleanup will clear the RemoteCLI's sessionData for
    55  	// the subsequent session.
    56  	// We can stop this by ensuring subscriptionCleanup only runs if EndSession
    57  	// has not cleaned up the subscription beforehand. EndSession can close the
    58  	// subscriptionEnded channel when cleaning up a subscription, and
    59  	// subscriptionCleanup can use the closed channel as indication there is no
    60  	// work for it to do.
    61  	subscriptionEnded chan<- struct{}
    62  }
    63  
// topicData is the state RemoteCLI caches for one topic, keyed by topic ID.
type topicData struct {
	// IDs of the sessions that have posted to the topic via Send. When the
	// set becomes empty the topic is closed with MsgSvc.StopPublish.
	sessions set.Set[string]
}
    69  
    70  type RemoteCLI struct {
    71  	context          context.Context
    72  	msgService       MsgSvc
    73  	sessionData      map[string]sessionData
    74  	subscriptionData map[string]subscriptionData
    75  	topicData        map[string]topicData
    76  	sessionLock      *sync.RWMutex
    77  }
    78  
// Target specifies the location that requests are sent to. All four values
// must be non-empty for a session to be started against the target.
type Target interface {
	// ProjectID is the GCP Project ID that holds the topics and subscriptions
	// required by remote-cli to communicate with the intended target terminal.
	ProjectID() string
	// BannerID is the target banner ID with which remote-cli intends to
	// communicate.
	BannerID() string
	// StoreID is the target store ID with which remote-cli intends to
	// communicate.
	StoreID() string
	// TerminalID is the target terminal ID that remote-cli intends to send
	// commands to and from which it expects to receive responses.
	TerminalID() string
}
    92  
    93  type TargetError []error
    94  
    95  func (myerr TargetError) Error() string {
    96  	var msg []string
    97  	for _, err := range myerr {
    98  		msg = append(msg, err.Error())
    99  	}
   100  	return strings.Join(msg, ", ")
   101  }
   102  
   103  func validateTarget(target Target) error {
   104  	errs := TargetError{}
   105  	if target.ProjectID() == "" {
   106  		errs = append(errs, errors.New("target missing project id"))
   107  	}
   108  	if target.BannerID() == "" {
   109  		errs = append(errs, errors.New("target missing banner id"))
   110  	}
   111  	if target.StoreID() == "" {
   112  		errs = append(errs, errors.New("target missing store id"))
   113  	}
   114  	if target.TerminalID() == "" {
   115  		errs = append(errs, errors.New("target missing terminal id"))
   116  	}
   117  
   118  	if len(errs) != 0 {
   119  		return errs
   120  	}
   121  	return nil
   122  }
   123  
   124  type templateConfig struct {
   125  	template *string
   126  }
   127  
   128  type RCLIOption = func(config *templateConfig)
   129  
   130  // Overrides default template string
   131  func WithOptionalTemplate(template string) RCLIOption {
   132  	return func(config *templateConfig) {
   133  		config.template = &template
   134  	}
   135  }
   136  
   137  // API
   138  
   139  // Creates a new RemoteCLI struct that can be used to send terminal commands and
   140  // receive responses from message service
   141  func New(ctx context.Context, ms MsgSvc) *RemoteCLI {
   142  	rcli := &RemoteCLI{
   143  		context:     ctx,
   144  		msgService:  ms,
   145  		sessionLock: &sync.RWMutex{},
   146  
   147  		sessionData:      make(map[string]sessionData),
   148  		subscriptionData: make(map[string]subscriptionData),
   149  		topicData:        make(map[string]topicData),
   150  	}
   151  	return rcli
   152  }
   153  
   154  // Makes a request to message service to start a remote cli session with the target node
   155  // Response messages for a given session on a subscription are sent on the
   156  // display channel. This channel is closed when the subscription ends, which may
   157  // happen when the context is done, [RemoteCLI.EndSession] is called, or an unexpected
   158  // error occurs on the subscription
   159  //
   160  // A session must be closed using [RemoteCLI.EndSession] or by cancelling the
   161  // context when no more messages or responses are expected, unless the displayChan
   162  // has already been closed by remotecli.
   163  //
   164  // If an error is returned from StartSession, the session and subscription is never
   165  // started and remotecli will not use the dispalyChan in any way. It is the callers
   166  // responsibility to clean up resources used by the displayChan.
   167  func (rcli *RemoteCLI) StartSession(ctx context.Context, sessionID string, displayChan chan<- msgdata.CommandResponse, target Target, opts ...RCLIOption) error {
   168  	log := fog.FromContext(ctx)
   169  
   170  	log.Info("Validating target")
   171  	if err := validateTarget(target); err != nil {
   172  		return err
   173  	}
   174  
   175  	// Generate subscription ID using template
   176  	subscriptionID := fillTemplate(target, eaconst.DefaultSubTemplate, createOptionalConfig(opts))
   177  
   178  	// Call message service to subscribe to subscription using subscriptionID and a default handler
   179  
   180  	rcli.sessionLock.Lock()
   181  	defer rcli.sessionLock.Unlock()
   182  
   183  	if seshData, ok := rcli.sessionData[sessionID]; ok {
   184  		log.Info("Session already exists associated with subscription", "sessionID", sessionID, "subscriptionID", seshData.subscriptionID)
   185  		return ErrSessionActive
   186  	}
   187  
   188  	log.Info("Creating and caching subscription", "subscriptionID", subscriptionID)
   189  	rcli.createSubscription(ctx, subscriptionID, target)
   190  
   191  	// Set up a goroutine which will be used to clean up the session if the
   192  	// context is done
   193  	go rcli.contextSessionCleanup(ctx, sessionID)
   194  
   195  	// Now add the new session to the existing subscription
   196  	session := sessionData{
   197  		target:         target,
   198  		displayChan:    displayChan,
   199  		subscriptionID: subscriptionID,
   200  		postedTopics:   set.Set[string]{},
   201  	}
   202  	rcli.sessionData[sessionID] = session
   203  
   204  	if ok := rcli.subscriptionData[subscriptionID].sessions.HasMember(sessionID); ok {
   205  		// This should not happen, indicates a bug
   206  		log.Error(nil, "Subscription already associated with the given session", "subscriptionID", subscriptionID)
   207  	}
   208  	rcli.subscriptionData[subscriptionID].sessions.Add(sessionID)
   209  
   210  	log.Info("Session registered to subscription", "subscriptionID", subscriptionID, "sessionID", sessionID)
   211  
   212  	return nil
   213  }
   214  
   215  // Ran in a goroutine, waits for the context Done and calls EndSession for the given
   216  // session ID to ensure all caches are cleaned up
   217  func (rcli *RemoteCLI) contextSessionCleanup(ctx context.Context, sessionID string) {
   218  	// Wait for context done
   219  	<-ctx.Done()
   220  	log := fog.FromContext(ctx, "sessionID", sessionID)
   221  
   222  	log.V(1).Info("Context finished, cleaning up session")
   223  
   224  	err := rcli.EndSession(ctx, sessionID)
   225  	if err == nil {
   226  		return
   227  	}
   228  
   229  	if errors.Is(err, errUnknownSessionID) {
   230  		log.V(1).Info("Session already finished")
   231  		return
   232  	}
   233  
   234  	log.Error(err, "Error occurred while ending session.")
   235  }
   236  
   237  // Checks the existing subscription cache and creates a new subscription in a
   238  // goroutine if it is not present in the cache
   239  func (rcli *RemoteCLI) createSubscription(ctx context.Context, subscriptionID string, target Target) {
   240  	log := fog.FromContext(ctx)
   241  
   242  	if _, ok := rcli.subscriptionData[subscriptionID]; ok {
   243  		log.V(1).Info("Existing subscription found, reusing", "subscriptionID", subscriptionID)
   244  		return
   245  	}
   246  	log.Info("No existing subscription found, creating new subscription", "subscriptionID", subscriptionID)
   247  
   248  	filter := map[string]string{
   249  		"bannerId": target.BannerID(),
   250  		"storeId":  target.StoreID(),
   251  	}
   252  
   253  	// subscription ctx must be a child of the remotecli ctx, not start session
   254  	// ctx as a subscription may last longer than a particular session if multiple
   255  	// sessions reuse a subscription
   256  	subCtx, subCancelFunc := context.WithCancel(rcli.context)
   257  
   258  	subscriptionEnded := make(chan struct{})
   259  
   260  	subData := subscriptionData{
   261  		sessions:          set.Set[string]{},
   262  		cancelFunc:        subCancelFunc,
   263  		subscriptionEnded: subscriptionEnded,
   264  	}
   265  
   266  	go func() {
   267  		log := fog.FromContext(rcli.context, "subscriptionID", subscriptionID)
   268  		log.Info("Starting new subscription")
   269  		err := rcli.msgService.Subscribe(subCtx, subscriptionID, target.ProjectID(), rcli.handler(), filter)
   270  		if err != nil {
   271  			log.Error(err, "subscription unexpectedly closed")
   272  		} else {
   273  			// Should only occur when subCancelFunc is called from EndSession
   274  			log.V(1).Info("Subscription ended")
   275  		}
   276  
   277  		rcli.subscriptionCleanup(subscriptionID, subscriptionEnded)
   278  
   279  		// Ensure context is not leaked
   280  		subCancelFunc()
   281  	}()
   282  
   283  	rcli.subscriptionData[subscriptionID] = subData
   284  }
   285  
   286  // Called when a subscription has ended. Ensures all active sessions for subscription
   287  // are closed by closing display channel, removing sessions from cache and cleaning up
   288  // unused topics.
   289  // This is necessary to ensure correct cleanup when the subscription ends for a
   290  // reason other than an EndSession call, e.g. some error on the subscription.
   291  func (rcli *RemoteCLI) subscriptionCleanup(subscriptionID string, subscriptionCleaned <-chan struct{}) {
   292  	log := fog.FromContext(rcli.context, "subscriptionID", subscriptionID)
   293  	ctx := fog.IntoContext(rcli.context, log)
   294  
   295  	rcli.sessionLock.Lock()
   296  	defer rcli.sessionLock.Unlock()
   297  
   298  	select {
   299  	case <-subscriptionCleaned:
   300  		// This indicates the subscription has already been cleaned by EndSession
   301  		// and we can exit early
   302  		return
   303  	default:
   304  		// Subscription exited for some other reason, clean up necessary
   305  	}
   306  
   307  	subData, ok := rcli.subscriptionData[subscriptionID]
   308  	if !ok {
   309  		// This should no longer be the case now that we have the
   310  		// subscriptionCleaned channel, likely indicates a bug
   311  		log.Error(nil, "no subscription to cleanup")
   312  		return
   313  	}
   314  
   315  	for sessionID := range subData.sessions {
   316  		seshData, ok := rcli.sessionData[sessionID]
   317  		if !ok {
   318  			// Bad, bug
   319  			log.Error(nil, "session data not found but associated with subscription", "sessionID", sessionID)
   320  		}
   321  
   322  		close(seshData.displayChan)
   323  		err := rcli.cleanupTopics(ctx, sessionID, seshData)
   324  		if err != nil {
   325  			log.Error(err, "error cleaning up topics")
   326  		}
   327  
   328  		delete(rcli.sessionData, sessionID)
   329  	}
   330  
   331  	delete(rcli.subscriptionData, subscriptionID)
   332  }
   333  
   334  // Send a [msgdata.Request] to the configured message service for the target
   335  // node of the given session to receive. Blocks until the message has
   336  // successfully sent.
   337  func (rcli *RemoteCLI) Send(
   338  	ctx context.Context,
   339  	userID string,
   340  	sessionID string,
   341  	commandID string,
   342  	request msgdata.Request,
   343  	opts ...RCLIOption,
   344  ) error {
   345  	log := fog.FromContext(ctx, "userID", userID)
   346  	ctx = fog.IntoContext(ctx, log)
   347  
   348  	if commandID == "" {
   349  		return fmt.Errorf("command id not supplied")
   350  	}
   351  
   352  	// TODO modifying session DATA and topic cache for topic cleanup, this shouldn't
   353  	// be a read lock. Inefficient if using write lock for full method though
   354  	rcli.sessionLock.Lock()
   355  	defer rcli.sessionLock.Unlock()
   356  	log.Info("Retrieving session from cache")
   357  	sessionData, ok := rcli.sessionData[sessionID]
   358  	if !ok {
   359  		log.Info("Could not find target in map using input session id")
   360  		return ErrUnknownSession
   361  	}
   362  	// Inject target and session info into request message format
   363  	topicID := fillTemplate(sessionData.target, eaconst.DefaultTopTemplate, createOptionalConfig(opts))
   364  
   365  	addAttributes(request, sessionData.target, sessionID, userID, commandID)
   366  
   367  	// Because of the way we build topicID in Send using opts, rather than once
   368  	// during StartSession, it means a sessionID may be associated with multiple
   369  	// topics. Keep track of all topics posted to so can clean up in EndSession.
   370  	log.Info("Adding session to list of active subscriptions of topic", "topicID", topicID)
   371  	if _, ok := rcli.topicData[topicID]; !ok {
   372  		rcli.topicData[topicID] = topicData{sessions: set.Set[string]{}}
   373  	}
   374  	rcli.topicData[topicID].sessions.Add(sessionID)
   375  	sessionData.postedTopics.Add(topicID)
   376  
   377  	// Publish request to message service
   378  	log.Info("Sending command")
   379  	return rcli.msgService.Publish(ctx, topicID, sessionData.target.ProjectID(), request)
   380  }
   381  
   382  // Takes in a context and a session ID
   383  // Cleans up resources to do with a given session and stops receiving messages
   384  // for the session. Closes the display channel for the session.
   385  func (rcli *RemoteCLI) EndSession(ctx context.Context, sessionID string) error {
   386  	log := fog.FromContext(ctx, "sessionID", sessionID)
   387  	ctx = fog.IntoContext(ctx, log)
   388  
   389  	rcli.sessionLock.Lock()
   390  	defer rcli.sessionLock.Unlock()
   391  
   392  	log.Info("Retrieving session data")
   393  	seshData, ok := rcli.sessionData[sessionID]
   394  	if !ok {
   395  		// This may happen if called with an invalid session ID, or if a
   396  		// subscription unexpectedly closed before EndSession is called
   397  		return errUnknownSessionID
   398  	}
   399  
   400  	log.Info("Retrieving subscription data", "subscriptionID", seshData.subscriptionID)
   401  	subData, ok := rcli.subscriptionData[seshData.subscriptionID]
   402  	if !ok {
   403  		// This should never happen
   404  		log.Info("Subscription not found for given session", "subscriptionID", seshData.subscriptionID)
   405  		return fmt.Errorf("unknown subscription for session")
   406  	}
   407  
   408  	if ok := subData.sessions.HasMember(sessionID); !ok {
   409  		// This should never happen
   410  		return fmt.Errorf("inactive subscription for session")
   411  	}
   412  
   413  	log.Info("Removing session")
   414  	subData.sessions.Remove(sessionID)
   415  	if len(subData.sessions) == 0 {
   416  		// cleanup subscription
   417  		log.V(1).Info("No active sessions for subscription, cleaning up subscription", "subscriptionID", seshData.subscriptionID)
   418  		subData.cancelFunc()
   419  
   420  		// indicate we have cleaned up the subscription and no further cleanup
   421  		// is necessary
   422  		close(subData.subscriptionEnded)
   423  
   424  		// Must delete the key as the presence of the key is used to check if a
   425  		// new subscription is needed in StartSession
   426  		delete(rcli.subscriptionData, seshData.subscriptionID)
   427  	}
   428  
   429  	if err := rcli.cleanupTopics(ctx, sessionID, seshData); err != nil {
   430  		return err
   431  	}
   432  
   433  	close(seshData.displayChan)
   434  	delete(rcli.sessionData, sessionID)
   435  	log.Info("Session stopped")
   436  
   437  	return nil
   438  }
   439  
   440  // Used to cleanup all topics that a given session has posted to and close any
   441  // topics that no longer have any associated sessions
   442  // Callers are responsible for the thread safety of this function
   443  func (rcli *RemoteCLI) cleanupTopics(ctx context.Context, sessionID string, seshData sessionData) error {
   444  	// Cannot lock in here as expected to be called from a function that already
   445  	// has the lock
   446  	log := fog.FromContext(ctx)
   447  
   448  	// message Service StopPublish should be called whenever no further messages
   449  	// are expected on the topic, check the cache and close topic if no other
   450  	// active sessions have posted to the topic
   451  	for topicID := range seshData.postedTopics {
   452  		log := log.WithValues("topicID", topicID)
   453  		topData, ok := rcli.topicData[topicID]
   454  		if !ok {
   455  			// This should not happen as topic and session are added to the
   456  			// caches at the same time in Send
   457  			log.Info("Topic not found for given session")
   458  			return fmt.Errorf("unknown topic for session")
   459  		}
   460  
   461  		topData.sessions.Remove(sessionID)
   462  		if len(topData.sessions) == 0 {
   463  			// cleanup message service topic
   464  			rcli.msgService.StopPublish(topicID, seshData.target.ProjectID())
   465  		}
   466  
   467  		seshData.postedTopics.Remove(topicID)
   468  	}
   469  
   470  	return nil
   471  }
   472  
   473  // handler returns a messageservice handler function which can be used to direct
   474  // incoming messages on a given subscription to the appropriate display channel
   475  func (rcli *RemoteCLI) handler() func(context.Context, msgdata.CommandResponse) {
   476  	return func(ctx context.Context, msg msgdata.CommandResponse) {
   477  		rcli.sessionLock.RLock()
   478  		defer rcli.sessionLock.RUnlock()
   479  
   480  		sessionID := msg.Attributes().SessionID
   481  		seshData, ok := rcli.sessionData[sessionID]
   482  		if !ok {
   483  			// TODO messages will be lost, fix
   484  			// May happen if e.g. a long running command is sent and the
   485  			// emulator disconnects.
   486  			// Must return here or the select statement will block and cause a
   487  			// deadlock due to the lock, stopping End and Start Session from
   488  			// proceeding.
   489  			// However returning ack's the message, which means any message will
   490  			// be lost and reconnecting to an existing session will not pick up
   491  			// messages sent in the meantime
   492  			return
   493  		}
   494  
   495  		select {
   496  		case <-ctx.Done():
   497  			return
   498  		case seshData.displayChan <- msg:
   499  		}
   500  	}
   501  }
   502  
   503  func createOptionalConfig(opts []RCLIOption) *templateConfig {
   504  	config := templateConfig{}
   505  	for _, opt := range opts {
   506  		opt(&config)
   507  	}
   508  	if config == (templateConfig{}) {
   509  		return nil
   510  	}
   511  	return &config
   512  }
   513  
   514  func fillTemplate(target Target, defaultTemplate string, config *templateConfig) (result string) {
   515  	result = defaultTemplate
   516  	if config != nil {
   517  		if config.template != nil {
   518  			result = *config.template
   519  		}
   520  		// Potential other checks in future
   521  	}
   522  
   523  	result = strings.Replace(result, "<PROJECT_ID>", target.ProjectID(), -1)
   524  	result = strings.Replace(result, "<BANNER_ID>", target.BannerID(), -1)
   525  	result = strings.Replace(result, "<STORE_ID>", target.StoreID(), -1)
   526  	result = strings.Replace(result, "<TERMINAL_ID>", target.TerminalID(), -1)
   527  	return result
   528  }
   529  
   530  func addAttributes(request msgdata.Request, t Target, sessionID string, userID string, commandID string) {
   531  	request.AddAttribute(eaconst.BannerIDKey, t.BannerID())
   532  	request.AddAttribute(eaconst.StoreIDKey, t.StoreID())
   533  	request.AddAttribute(eaconst.TerminalIDKey, t.TerminalID())
   534  	request.AddAttribute(eaconst.SessionIDKey, sessionID)
   535  	request.AddAttribute(eaconst.IdentityKey, userID)
   536  	request.AddAttribute(eaconst.CommandIDKey, commandID)
   537  }
   538  

View as plain text