...

Source file src/edge-infra.dev/pkg/lib/mqtt/pubsub.go

Documentation: edge-infra.dev/pkg/lib/mqtt

     1  package mqtt
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  
     7  	"cloud.google.com/go/pubsub"
     8  )
     9  
    10  // ResponseTopicWrapper is a Publisher that is used to send chariot pubsub responses back to the appropriate clients.
    11  type GCPPubSubTopicWrapper struct {
    12  	Topic *pubsub.Topic
    13  }
    14  
    15  // NewGooglePubSubResponseTopicPublisher creates and wraps a pubsub.Topic for the given topicID.
    16  func NewGooglePubSubTopicWrapper(ctx context.Context, client *pubsub.Client, projectID, topicID string) (*GCPPubSubTopicWrapper, error) {
    17  	topic := client.TopicInProject(topicID, projectID)
    18  	if exists, err := topic.Exists(ctx); err != nil {
    19  		return nil, fmt.Errorf("error checking if topic %q exists: %w", topicID, err)
    20  	} else if !exists {
    21  		return nil, fmt.Errorf("topic %q does not exist", topicID)
    22  	}
    23  	return &GCPPubSubTopicWrapper{Topic: topic}, nil
    24  }
    25  
    26  // Publish satisfies the Publisher interface.
    27  func (w *GCPPubSubTopicWrapper) Publish(ctx context.Context, m Payload) error {
    28  	_, err := w.Topic.Publish(ctx, &pubsub.Message{
    29  		Data:       m.Data(),
    30  		Attributes: m.Attributes(),
    31  	}).Get(ctx)
    32  	return err
    33  }
    34  
    35  // Stop ensures that the topic's resources are cleaned up
    36  func (w *GCPPubSubTopicWrapper) Stop() {
    37  	if w.Topic != nil {
    38  		w.Topic.Stop()
    39  	}
    40  }
    41  

View as plain text