package services import ( "context" "fmt" "google.golang.org/api/option" "cloud.google.com/go/pubsub" "edge-infra.dev/pkg/edge/api/apierror" edgeAgentClientApi "edge-infra.dev/pkg/edge/edgeagent/model" "edge-infra.dev/pkg/lib/mqtt" ) type EdgeAgentService interface { InvokeEdgeAgentPubsub(ctx context.Context, notificationMessage *edgeAgentClientApi.NotificationMessage, projectID string) error } type edgeAgentService struct { PubsubTopic string Opts []option.ClientOption } func (e *edgeAgentService) InvokeEdgeAgentPubsub(ctx context.Context, notificationMessage *edgeAgentClientApi.NotificationMessage, projectID string) error { if notificationMessage.Actor == "" { notificationMessage.Actor = ComponentOwner } // create pubsub client for the banner project the notification message will be published to pubSubClient, err := pubsub.NewClient(ctx, projectID) if err != nil { return apierror.New(fmt.Sprintf("error creating pubsub client for notifications: %s", err.Error())).SetOperationID(ctx) } // defer closing pubsub client once message has been published defer pubSubClient.Close() ipsnotifier, err := mqtt.NewGooglePubSubTopicWrapper(ctx, pubSubClient, projectID, e.PubsubTopic) if err != nil { return apierror.New(fmt.Sprintf("error fetching pubsub topic for notifications: %s", err.Error())).SetOperationID(ctx) } // defer cleanup of topic goroutines and resources once message has been published defer ipsnotifier.Stop() return ipsnotifier.Publish(ctx, notificationMessage) } func NewEdgeAgentService(pubsubTopic string, opts ...option.ClientOption) *edgeAgentService { //nolint return &edgeAgentService{ PubsubTopic: pubsubTopic, Opts: opts, } }