...
1 package mqtt
2
3 import (
4 "context"
5 "fmt"
6
7 "cloud.google.com/go/pubsub"
8 )
9
10
11 type GCPPubSubTopicWrapper struct {
12 Topic *pubsub.Topic
13 }
14
15
16 func NewGooglePubSubTopicWrapper(ctx context.Context, client *pubsub.Client, projectID, topicID string) (*GCPPubSubTopicWrapper, error) {
17 topic := client.TopicInProject(topicID, projectID)
18 if exists, err := topic.Exists(ctx); err != nil {
19 return nil, fmt.Errorf("error checking if topic %q exists: %w", topicID, err)
20 } else if !exists {
21 return nil, fmt.Errorf("topic %q does not exist", topicID)
22 }
23 return &GCPPubSubTopicWrapper{Topic: topic}, nil
24 }
25
26
27 func (w *GCPPubSubTopicWrapper) Publish(ctx context.Context, m Payload) error {
28 _, err := w.Topic.Publish(ctx, &pubsub.Message{
29 Data: m.Data(),
30 Attributes: m.Attributes(),
31 }).Get(ctx)
32 return err
33 }
34
35
36 func (w *GCPPubSubTopicWrapper) Stop() {
37 if w.Topic != nil {
38 w.Topic.Stop()
39 }
40 }
41
View as plain text