package msgsvc import ( "context" "fmt" "sync" "time" "github.com/go-logr/logr" "edge-infra.dev/pkg/lib/fog" "edge-infra.dev/pkg/sds/emergencyaccess/eaconst" "edge-infra.dev/pkg/sds/emergencyaccess/msgdata" ) type topicEntry struct { topicID string projectID string } // MessageService allows sending and receiving messages to GCP topics in various // projects // MessageService should be created using NewMessageService type MessageService struct { ps clientItfc topicCache map[topicEntry]topicItfc logger logr.Logger topicMu sync.Mutex } // Creates a new MessageService struct with the default credentials. // Must be initialised with a valid logr.Logger func NewMessageService(ctx context.Context) (*MessageService, error) { // client will be used cross-project, so no need to specify project here. // Must specify project each time client is used, e.g SubscriptionInProject projectID := "ts" client, err := newClient(ctx, projectID) if err != nil { return nil, err } logr := fog.FromContext(ctx) msgSvc := MessageService{ ps: client, topicCache: make(map[topicEntry]topicItfc), logger: logr.WithName("msgsvc"), topicMu: sync.Mutex{}, } return &msgSvc, nil } // Create a subscription with the minimum retention duration and expiration policy. // filters all messages by the given sessionID func (ms *MessageService) CreateSubscription(ctx context.Context, sessionID string, subscriptionID string, projectID string, topicID string) error { cfg := subscriptionCfg{ topicName: topicID, projectID: projectID, retentionDuration: 1 * time.Hour, expirationPolicy: 24 * time.Hour, filter: fmt.Sprintf(`attributes.sessionId="%s"`, sessionID), } _, err := ms.ps.CreateSubscription(ctx, subscriptionID, cfg) if err != nil { return fmt.Errorf("error creating subscription: %w", err) } return nil } func (ms *MessageService) DeleteSubscription(ctx context.Context, subscriptionID string, projectID string) error { sub := ms.ps.SubscriptionInProject(subscriptionID, projectID) return sub.Delete(ctx) } func createCommandResponse(msg messageItfc) (msgdata.CommandResponse, error) { return msgdata.NewCommandResponse(msg.Data(), msg.Attributes()) } // Starts receiving messages from a given subscription in GCP project projectID. // The optional filter argument allows discarding messages if the attributes of // a message does not contain all of the keys and values provided in filter // handler is called in a new goroutine with each new message received // Subscribe blocks until an error is encountered or ctx is done func (ms *MessageService) Subscribe(ctx context.Context, subscriptionID string, projectID string, handler func(ctx context.Context, msg msgdata.CommandResponse), filter map[string]string) error { return ms.ps.SubscriptionInProject(subscriptionID, projectID).Receive(ctx, func(ctx context.Context, msg messageItfc) { log := fog.FromContext(ctx) if !isFilterMatch(msg.Attributes(), filter) { msg.Nack() return } cResp, err := createCommandResponse(msg) if err != nil { log.Error(err, "Error parsing message", "messageID", msg.ID()) msg.Nack() return } log = log.WithValues("sessionID", cResp.Attributes().SessionID, "requestMessageID", cResp.Attributes().ReqMsgID, "messageID", msg.ID()) ctx = fog.IntoContext(ctx, log) handler(ctx, cResp) log.Info("Acknowledging message", "messageID", msg.ID()) msg.Ack() }) } func isFilterMatch(attr, filter map[string]string) bool { for k, v := range filter { attrVal, ok := attr[k] if attrVal != v || !ok { return false } } return true } func createPubsubMessage(msg msgdata.Request) (messageItfc, error) { data, err := msg.Data() if err != nil { return nil, err } resp := newMessage(data, msg.Attributes()) // TODO unnecessary. newMessage is defined within pubsub.go, so directly depends on pubsub. The message to be published should only require attributes and data fields, it doesn't need to be a full messageInt return resp, nil } // Publish sends a message to a given topic in projectID // Publish blocks until the message has sent successfully or ctx is done // Publish sets up background goroutines for each new topic published to. Once // a given topic is unlikely to be published to again StopPublish should be called func (ms *MessageService) Publish(ctx context.Context, topic string, projectID string, message msgdata.Request) error { log := fog.FromContext(ctx, "commandID", message.Attributes()[eaconst.CommandIDKey], "sessionID", message.Attributes()[eaconst.SessionIDKey]) ctx = fog.IntoContext(ctx, log) log.Info("Received message to publish") msg, err := createPubsubMessage(message) if err != nil { return err } // Removing session wide ordering of messages to allow for async nature of remote cli to take priority msg.SetOrderingKey(message.Attributes()[eaconst.CommandIDKey]) top := getTopic(log, &ms.topicMu, topic, projectID, ms.topicCache, ms.ps) log.Info("Publishing message", "topicID", top.ID()) res := top.Publish(ctx, msg) _, err = res.Get(ctx) return err } func getTopic(log logr.Logger, lock *sync.Mutex, topic string, projectID string, topicCache map[topicEntry]topicItfc, client topicInProjecter) topicItfc { cacheEntry := topicEntry{ topicID: topic, projectID: projectID, } lock.Lock() top, ok := topicCache[cacheEntry] if !ok { log.Info("Topic not in cache, creating new topic", "topic", topic, "projectID", projectID) top = client.TopicInProject(topic, projectID) top.SetOrdering(true) } else { log.Info("Topic discovered in cache, reusing.", "topic", topic, "projectID", projectID) } topicCache[cacheEntry] = top lock.Unlock() return top } // StopPublish cleans up background goroutines created when publishing to a new // topic. Should be called once there are no more messages expected on a given // topic. func (ms *MessageService) StopPublish(topic string, projectID string) { entry := topicEntry{ topicID: topic, projectID: projectID, } ms.topicMu.Lock() top, ok := ms.topicCache[entry] if !ok { ms.logger.Info("Topic not in cache, nothing to do.", "topic", topic, "projectID", projectID) ms.topicMu.Unlock() return } ms.logger.Info("Topic discovered. Removing from cache and stopping topic", "topic", topic, "projectID", projectID) delete(ms.topicCache, entry) ms.topicMu.Unlock() top.Stop() }