...
1 package msgsvc
2
3 import (
4 "context"
5 "fmt"
6
7 "cloud.google.com/go/pubsub"
8 )
9
10
11 type publishResult struct {
12 r *pubsub.PublishResult
13 }
14
15 func (r publishResult) Get(ctx context.Context) (serverID string, err error) {
16 return r.r.Get(ctx)
17 }
18
19
20 type topic struct {
21 *pubsub.Topic
22 }
23
24 func (t topic) Publish(ctx context.Context, msg messageItfc) publishResultItfc {
25 pubsubMsg := &pubsub.Message{
26 Data: msg.Data(),
27 Attributes: msg.Attributes(),
28 OrderingKey: msg.OrderingKey(),
29 }
30 return publishResult{t.Topic.Publish(ctx, pubsubMsg)}
31 }
32
33 func (t topic) SetOrdering(val bool) {
34 t.Topic.EnableMessageOrdering = val
35 }
36
37
38 type subscription struct {
39 *pubsub.Subscription
40 }
41
42 func (s subscription) Receive(ctx context.Context, f func(ctx context.Context, msg messageItfc)) error {
43 handler := func(pubsubCtx context.Context, pubsubMsg *pubsub.Message) {
44 myMessage := message{pubsubMsg}
45 f(pubsubCtx, myMessage)
46 }
47 return s.Subscription.Receive(ctx, handler)
48 }
49
50
51 func newClient(ctx context.Context, projectID string) (*client, error) {
52 clnt, err := pubsub.NewClient(ctx, projectID)
53 if err != nil {
54 return nil, err
55 }
56 return &client{clnt}, nil
57 }
58
59 type client struct {
60 m *pubsub.Client
61 }
62
63 func (c client) SubscriptionInProject(id string, projectID string) subscriptionItfc {
64 return subscription{c.m.SubscriptionInProject(id, projectID)}
65 }
66
67 func (c client) TopicInProject(id string, projectID string) topicItfc {
68 return topic{c.m.TopicInProject(id, projectID)}
69 }
70
71 func (c client) CreateSubscription(ctx context.Context, subscriptionID string, cfg subscriptionCfg) (subscriptionItfc, error) {
72
73
74
75
76 cl, err := pubsub.NewClient(ctx, cfg.projectID)
77 if err != nil {
78 return nil, fmt.Errorf("error creating new client: %w", err)
79 }
80 top := c.m.TopicInProject(cfg.topicName, cfg.projectID)
81 sub, err := cl.CreateSubscription(ctx, subscriptionID, pubsub.SubscriptionConfig{
82 Topic: top,
83 RetentionDuration: cfg.retentionDuration,
84 ExpirationPolicy: cfg.expirationPolicy,
85 EnableMessageOrdering: true,
86 Filter: cfg.filter,
87 })
88 return subscription{sub}, err
89 }
90
91 func newMessage(data []byte, attributes map[string]string) messageItfc {
92 return message{&pubsub.Message{
93 Data: data,
94 Attributes: attributes,
95 }}
96 }
97
98 type message struct {
99 *pubsub.Message
100 }
101
102 func (m message) ID() string {
103 return m.Message.ID
104 }
105
106 func (m message) Data() []byte {
107 return m.Message.Data
108 }
109
110 func (m message) Attributes() map[string]string {
111 return m.Message.Attributes
112 }
113
114 func (m message) OrderingKey() string {
115 return m.Message.OrderingKey
116 }
117
118 func (m message) SetOrderingKey(orderingKey string) {
119 m.Message.OrderingKey = orderingKey
120 }
121
View as plain text