...
1 package services
2
3 import (
4 "context"
5 "fmt"
6
7 "google.golang.org/api/option"
8
9 "cloud.google.com/go/pubsub"
10
11 "edge-infra.dev/pkg/edge/api/apierror"
12 edgeAgentClientApi "edge-infra.dev/pkg/edge/edgeagent/model"
13 "edge-infra.dev/pkg/lib/mqtt"
14 )
15
16 type EdgeAgentService interface {
17 InvokeEdgeAgentPubsub(ctx context.Context, notificationMessage *edgeAgentClientApi.NotificationMessage, projectID string) error
18 }
19
20 type edgeAgentService struct {
21 PubsubTopic string
22 Opts []option.ClientOption
23 }
24
25 func (e *edgeAgentService) InvokeEdgeAgentPubsub(ctx context.Context, notificationMessage *edgeAgentClientApi.NotificationMessage, projectID string) error {
26 if notificationMessage.Actor == "" {
27 notificationMessage.Actor = ComponentOwner
28 }
29
30 pubSubClient, err := pubsub.NewClient(ctx, projectID)
31 if err != nil {
32 return apierror.New(fmt.Sprintf("error creating pubsub client for notifications: %s", err.Error())).SetOperationID(ctx)
33 }
34
35 defer pubSubClient.Close()
36
37 ipsnotifier, err := mqtt.NewGooglePubSubTopicWrapper(ctx, pubSubClient, projectID, e.PubsubTopic)
38 if err != nil {
39 return apierror.New(fmt.Sprintf("error fetching pubsub topic for notifications: %s", err.Error())).SetOperationID(ctx)
40 }
41
42 defer ipsnotifier.Stop()
43
44 return ipsnotifier.Publish(ctx, notificationMessage)
45 }
46
47 func NewEdgeAgentService(pubsubTopic string, opts ...option.ClientOption) *edgeAgentService {
48 return &edgeAgentService{
49 PubsubTopic: pubsubTopic,
50 Opts: opts,
51 }
52 }
53
View as plain text