...

Source file src/edge-infra.dev/pkg/sds/emergencyaccess/msgsvc/msgsvc.go

Documentation: edge-infra.dev/pkg/sds/emergencyaccess/msgsvc

     1  package msgsvc
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"sync"
     7  	"time"
     8  
     9  	"github.com/go-logr/logr"
    10  
    11  	"edge-infra.dev/pkg/lib/fog"
    12  	"edge-infra.dev/pkg/sds/emergencyaccess/eaconst"
    13  	"edge-infra.dev/pkg/sds/emergencyaccess/msgdata"
    14  )
    15  
// topicEntry identifies a topic by its topic ID together with the ID of the
// project it belongs to. It is the key type of MessageService.topicCache.
type topicEntry struct {
	topicID   string
	projectID string
}
    20  
// MessageService allows sending and receiving messages to GCP topics in various
// projects.
// MessageService should be created using NewMessageService.
type MessageService struct {
	// ps is the client used to create, look up and delete subscriptions and
	// to look up topics.
	ps         clientItfc
	// topicCache holds the topics already obtained for publishing, keyed by
	// topic ID and project ID.
	topicCache map[topicEntry]topicItfc
	// logger is used by methods that do not receive a context (StopPublish).
	logger     logr.Logger

	// topicMu guards topicCache.
	topicMu sync.Mutex
}
    31  
    32  // Creates a new MessageService struct with the default credentials.
    33  // Must be initialised with a valid logr.Logger
    34  func NewMessageService(ctx context.Context) (*MessageService, error) {
    35  	// client will be used cross-project, so no need to specify project here.
    36  	// Must specify project each time client is used, e.g SubscriptionInProject
    37  	projectID := "ts"
    38  	client, err := newClient(ctx, projectID)
    39  	if err != nil {
    40  		return nil, err
    41  	}
    42  	logr := fog.FromContext(ctx)
    43  	msgSvc := MessageService{
    44  		ps:         client,
    45  		topicCache: make(map[topicEntry]topicItfc),
    46  		logger:     logr.WithName("msgsvc"),
    47  		topicMu:    sync.Mutex{},
    48  	}
    49  
    50  	return &msgSvc, nil
    51  }
    52  
    53  // Create a subscription with the minimum retention duration and expiration policy.
    54  // filters all messages by the given sessionID
    55  func (ms *MessageService) CreateSubscription(ctx context.Context, sessionID string, subscriptionID string, projectID string, topicID string) error {
    56  	cfg := subscriptionCfg{
    57  		topicName: topicID,
    58  		projectID: projectID,
    59  
    60  		retentionDuration: 1 * time.Hour,
    61  		expirationPolicy:  24 * time.Hour,
    62  
    63  		filter: fmt.Sprintf(`attributes.sessionId="%s"`, sessionID),
    64  	}
    65  
    66  	_, err := ms.ps.CreateSubscription(ctx, subscriptionID, cfg)
    67  	if err != nil {
    68  		return fmt.Errorf("error creating subscription: %w", err)
    69  	}
    70  	return nil
    71  }
    72  
    73  func (ms *MessageService) DeleteSubscription(ctx context.Context, subscriptionID string, projectID string) error {
    74  	sub := ms.ps.SubscriptionInProject(subscriptionID, projectID)
    75  	return sub.Delete(ctx)
    76  }
    77  
    78  func createCommandResponse(msg messageItfc) (msgdata.CommandResponse, error) {
    79  	return msgdata.NewCommandResponse(msg.Data(), msg.Attributes())
    80  }
    81  
    82  // Starts receiving messages from a given subscription in GCP project projectID.
    83  // The optional filter argument allows discarding messages if the attributes of
    84  // a message does not contain all of the keys and values provided in filter
    85  // handler is called in a new goroutine with each new message received
    86  // Subscribe blocks until an error is encountered or ctx is done
    87  func (ms *MessageService) Subscribe(ctx context.Context, subscriptionID string, projectID string, handler func(ctx context.Context, msg msgdata.CommandResponse), filter map[string]string) error {
    88  	return ms.ps.SubscriptionInProject(subscriptionID, projectID).Receive(ctx, func(ctx context.Context, msg messageItfc) {
    89  		log := fog.FromContext(ctx)
    90  		if !isFilterMatch(msg.Attributes(), filter) {
    91  			msg.Nack()
    92  			return
    93  		}
    94  
    95  		cResp, err := createCommandResponse(msg)
    96  		if err != nil {
    97  			log.Error(err, "Error parsing message", "messageID", msg.ID())
    98  			msg.Nack()
    99  			return
   100  		}
   101  
   102  		log = log.WithValues("sessionID", cResp.Attributes().SessionID, "requestMessageID", cResp.Attributes().ReqMsgID, "messageID", msg.ID())
   103  		ctx = fog.IntoContext(ctx, log)
   104  		handler(ctx, cResp)
   105  		log.Info("Acknowledging message", "messageID", msg.ID())
   106  		msg.Ack()
   107  	})
   108  }
   109  
   110  func isFilterMatch(attr, filter map[string]string) bool {
   111  	for k, v := range filter {
   112  		attrVal, ok := attr[k]
   113  		if attrVal != v || !ok {
   114  			return false
   115  		}
   116  	}
   117  	return true
   118  }
   119  
   120  func createPubsubMessage(msg msgdata.Request) (messageItfc, error) {
   121  	data, err := msg.Data()
   122  	if err != nil {
   123  		return nil, err
   124  	}
   125  	resp := newMessage(data, msg.Attributes()) // TODO unnecessary. newMessage is defined within pubsub.go, so directly depends on pubsub. The message to be published should only require attributes and data fields, it doesn't need to be a full messageInt
   126  	return resp, nil
   127  }
   128  
   129  // Publish sends a message to a given topic in projectID
   130  // Publish blocks until the message has sent successfully or ctx is done
   131  // Publish sets up background goroutines for each new topic published to. Once
   132  // a given topic is unlikely to be published to again StopPublish should be called
   133  func (ms *MessageService) Publish(ctx context.Context, topic string, projectID string, message msgdata.Request) error {
   134  	log := fog.FromContext(ctx, "commandID", message.Attributes()[eaconst.CommandIDKey], "sessionID", message.Attributes()[eaconst.SessionIDKey])
   135  	ctx = fog.IntoContext(ctx, log)
   136  	log.Info("Received message to publish")
   137  	msg, err := createPubsubMessage(message)
   138  	if err != nil {
   139  		return err
   140  	}
   141  	// Removing session wide ordering of messages to allow for async nature of remote cli to take priority
   142  	msg.SetOrderingKey(message.Attributes()[eaconst.CommandIDKey])
   143  
   144  	top := getTopic(log, &ms.topicMu, topic, projectID, ms.topicCache, ms.ps)
   145  
   146  	log.Info("Publishing message", "topicID", top.ID())
   147  	res := top.Publish(ctx, msg)
   148  	_, err = res.Get(ctx)
   149  
   150  	return err
   151  }
   152  
   153  func getTopic(log logr.Logger, lock *sync.Mutex, topic string, projectID string, topicCache map[topicEntry]topicItfc, client topicInProjecter) topicItfc {
   154  	cacheEntry := topicEntry{
   155  		topicID:   topic,
   156  		projectID: projectID,
   157  	}
   158  
   159  	lock.Lock()
   160  
   161  	top, ok := topicCache[cacheEntry]
   162  
   163  	if !ok {
   164  		log.Info("Topic not in cache, creating new topic", "topic", topic, "projectID", projectID)
   165  		top = client.TopicInProject(topic, projectID)
   166  		top.SetOrdering(true)
   167  	} else {
   168  		log.Info("Topic discovered in cache, reusing.", "topic", topic, "projectID", projectID)
   169  	}
   170  
   171  	topicCache[cacheEntry] = top
   172  
   173  	lock.Unlock()
   174  
   175  	return top
   176  }
   177  
   178  // StopPublish cleans up background goroutines created when publishing to a new
   179  // topic. Should be called once there are no more messages expected on a given
   180  // topic.
   181  func (ms *MessageService) StopPublish(topic string, projectID string) {
   182  	entry := topicEntry{
   183  		topicID:   topic,
   184  		projectID: projectID,
   185  	}
   186  
   187  	ms.topicMu.Lock()
   188  	top, ok := ms.topicCache[entry]
   189  
   190  	if !ok {
   191  		ms.logger.Info("Topic not in cache, nothing to do.", "topic", topic, "projectID", projectID)
   192  		ms.topicMu.Unlock()
   193  		return
   194  	}
   195  
   196  	ms.logger.Info("Topic discovered. Removing from cache and stopping topic", "topic", topic, "projectID", projectID)
   197  	delete(ms.topicCache, entry)
   198  
   199  	ms.topicMu.Unlock()
   200  
   201  	top.Stop()
   202  }
   203  

View as plain text