...

Source file src/edge-infra.dev/pkg/edge/api/services/edge_agent_service.go

Documentation: edge-infra.dev/pkg/edge/api/services

     1  package services
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  
     7  	"google.golang.org/api/option"
     8  
     9  	"cloud.google.com/go/pubsub"
    10  
    11  	"edge-infra.dev/pkg/edge/api/apierror"
    12  	edgeAgentClientApi "edge-infra.dev/pkg/edge/edgeagent/model"
    13  	"edge-infra.dev/pkg/lib/mqtt"
    14  )
    15  
// EdgeAgentService is the interface for sending an edge agent
// NotificationMessage on behalf of a given project.
type EdgeAgentService interface {
	// InvokeEdgeAgentPubsub sends notificationMessage for the project
	// identified by projectID and reports any failure as an error.
	InvokeEdgeAgentPubsub(ctx context.Context, notificationMessage *edgeAgentClientApi.NotificationMessage, projectID string) error
}
    19  
// edgeAgentService is the EdgeAgentService implementation defined in this
// file.
type edgeAgentService struct {
	// PubsubTopic is the topic name passed to the topic wrapper when a
	// notification message is published.
	PubsubTopic string
	// Opts holds Google API client options; NewEdgeAgentService fills it
	// from its variadic arguments.
	Opts        []option.ClientOption
}
    24  
    25  func (e *edgeAgentService) InvokeEdgeAgentPubsub(ctx context.Context, notificationMessage *edgeAgentClientApi.NotificationMessage, projectID string) error {
    26  	if notificationMessage.Actor == "" {
    27  		notificationMessage.Actor = ComponentOwner
    28  	}
    29  	// create pubsub client for the banner project the notification message will be published to
    30  	pubSubClient, err := pubsub.NewClient(ctx, projectID)
    31  	if err != nil {
    32  		return apierror.New(fmt.Sprintf("error creating pubsub client for notifications: %s", err.Error())).SetOperationID(ctx)
    33  	}
    34  	// defer closing pubsub client once message has been published
    35  	defer pubSubClient.Close()
    36  
    37  	ipsnotifier, err := mqtt.NewGooglePubSubTopicWrapper(ctx, pubSubClient, projectID, e.PubsubTopic)
    38  	if err != nil {
    39  		return apierror.New(fmt.Sprintf("error fetching pubsub topic for notifications: %s", err.Error())).SetOperationID(ctx)
    40  	}
    41  	// defer cleanup of topic goroutines and resources once message has been published
    42  	defer ipsnotifier.Stop()
    43  
    44  	return ipsnotifier.Publish(ctx, notificationMessage)
    45  }
    46  
    47  func NewEdgeAgentService(pubsubTopic string, opts ...option.ClientOption) *edgeAgentService { //nolint
    48  	return &edgeAgentService{
    49  		PubsubTopic: pubsubTopic,
    50  		Opts:        opts,
    51  	}
    52  }
    53  

View as plain text