package mqtt

import (
	"context"
	"errors"
	"fmt"

	"cloud.google.com/go/pubsub"
)

// GCPPubSubTopicWrapper is a Publisher backed by a Google Cloud Pub/Sub topic.
// It is used to send chariot pubsub responses back to the appropriate clients.
//
// The zero value has no Topic: Publish returns an error and Stop is a no-op.
// Use NewGooglePubSubTopicWrapper to obtain a usable wrapper.
type GCPPubSubTopicWrapper struct {
	// Topic is the underlying Pub/Sub topic that messages are published to.
	Topic *pubsub.Topic
}

// NewGooglePubSubTopicWrapper wraps the pubsub.Topic identified by topicID in
// the project identified by projectID.
//
// It returns an error if client is nil, if the existence check fails, or if
// the topic does not exist. The topic is never created here.
func NewGooglePubSubTopicWrapper(ctx context.Context, client *pubsub.Client, projectID, topicID string) (*GCPPubSubTopicWrapper, error) {
	if client == nil {
		return nil, errors.New("pubsub client is nil")
	}

	// TopicInProject takes the topic ID first and the project ID second.
	topic := client.TopicInProject(topicID, projectID)

	exists, err := topic.Exists(ctx)
	if err != nil {
		return nil, fmt.Errorf("error checking if topic %q exists: %w", topicID, err)
	}
	if !exists {
		return nil, fmt.Errorf("topic %q does not exist", topicID)
	}

	return &GCPPubSubTopicWrapper{Topic: topic}, nil
}

// Publish satisfies the Publisher interface. It publishes m's data and
// attributes to the wrapped topic and waits on the publish result via Get
// before returning, so the returned error reflects the outcome of this
// message.
//
// It returns an error, rather than panicking, when the wrapper or its Topic
// is nil, mirroring the nil handling in Stop.
func (w *GCPPubSubTopicWrapper) Publish(ctx context.Context, m Payload) error {
	if w == nil || w.Topic == nil {
		return errors.New("pubsub topic is not initialized")
	}

	result := w.Topic.Publish(ctx, &pubsub.Message{
		Data:       m.Data(),
		Attributes: m.Attributes(),
	})

	// The server-assigned message ID is not needed by callers; only the
	// error is propagated.
	_, err := result.Get(ctx)
	return err
}

// Stop ensures that the topic's resources are cleaned up by calling Stop on
// the wrapped topic. It is a no-op when the wrapper or its Topic is nil.
func (w *GCPPubSubTopicWrapper) Stop() {
	if w == nil || w.Topic == nil {
		return
	}
	w.Topic.Stop()
}