1 package pubsub
2
3 import (
4 "context"
5 "fmt"
6 "sync"
7 "time"
8
9 "cloud.google.com/go/pubsub"
10 "google.golang.org/api/option"
11 )
12
// schema is the fmt layout of a fully qualified Pub/Sub schema resource name.
// Its two %s verbs take the project ID and the schema ID, in that order.
const (
	schema = "projects/%s/schemas/%s"
)
14
15
16 type Manager struct {
17 Client *pubsub.Client
18 ProjectID string
19 topicMap *sync.Map
20 }
21
22
23
24
25 func New(ctx context.Context, projectID string) (Manager, error) {
26 client, err := pubsub.NewClient(ctx, projectID)
27 if err != nil {
28 return Manager{}, err
29 }
30
31 return Manager{
32 Client: client,
33 ProjectID: projectID,
34 topicMap: &sync.Map{},
35 }, nil
36 }
37
38
39 func NewWithOptions(ctx context.Context, projectID string, opts ...option.ClientOption) (Manager, error) {
40 client, err := pubsub.NewClient(ctx, projectID, opts...)
41 if err != nil {
42 return Manager{}, err
43 }
44
45 return Manager{
46 Client: client,
47 ProjectID: projectID,
48 topicMap: &sync.Map{},
49 }, nil
50 }
51
52
53 func (p *Manager) CreateSchema(schemaID string) string {
54 return fmt.Sprintf(schema, p.ProjectID, schemaID)
55 }
56
57
58 func (p Manager) CreateTopic(ctx context.Context, topicID string) (*pubsub.Topic, error) {
59 return p.Client.CreateTopic(ctx, topicID)
60 }
61
62
63
64 func (p Manager) DeleteTopic(ctx context.Context, topicID string) error {
65 topic := p.Client.Topic(topicID)
66 return topic.Delete(ctx)
67 }
68
69
70
71 func (p Manager) Send(ctx context.Context, topicID string, message []byte, attributes map[string]string) error {
72 t, ok := p.topicMap.Load(topicID)
73
74 if !ok {
75 newTopic := p.Client.Topic(topicID)
76 t, _ = p.topicMap.LoadOrStore(topicID, newTopic)
77 }
78 topic := t.(*pubsub.Topic)
79 result := topic.Publish(ctx, &pubsub.Message{
80 Data: message,
81 Attributes: attributes,
82 })
83 _, err := result.Get(ctx)
84 return err
85 }
86
87
88
89 func (p Manager) CreateSubscription(ctx context.Context, topicID, subscriptionID string, ackDeadline time.Duration, filter string) (*pubsub.Subscription, error) {
90 topic := p.Client.Topic(topicID)
91 if ackDeadline == 0 {
92 ackDeadline = 20 * time.Second
93 }
94 return p.Client.CreateSubscription(ctx, subscriptionID, pubsub.SubscriptionConfig{
95 Topic: topic,
96 AckDeadline: ackDeadline,
97 Filter: filter,
98 })
99 }
100
View as plain text