...
1 package pubsub
2
3 import (
4 "context"
5 "sync"
6
7 "cloud.google.com/go/pubsub"
8 "github.com/go-logr/logr"
9
10 "edge-infra.dev/pkg/lib/logging"
11 )
12
// Defaults applied by New when the matching option is not supplied.
const (
	// DefaultProject is the project identifier handed to pubsub.NewClient
	// unless WithProject overrides it.
	DefaultProject = "1037378570607"
	// DefaultTopic is the topic ID Publish sends to unless WithTopic
	// overrides it.
	DefaultTopic = "changelog-topic"
	// DefaultSubscription is the subscription ID Subscribe resolves unless
	// WithSubscription overrides it.
	DefaultSubscription = "changelog-subscription"
)
18
19 type PubSub struct {
20 ctx context.Context
21 client *pubsub.Client
22 log logr.Logger
23 topic string
24 sub string
25 project string
26 }
27
28 type Opts func(*PubSub)
29
30
31 func New(opts ...Opts) (*PubSub, error) {
32 ps := &PubSub{
33 ctx: context.Background(),
34 sub: DefaultSubscription,
35 topic: DefaultTopic,
36 project: DefaultProject,
37 log: logging.NewLogger().WithName("pubsub"),
38 }
39
40 for _, opt := range opts {
41 opt(ps)
42 }
43
44 if ps.client == nil {
45 c, err := pubsub.NewClient(ps.ctx, ps.project)
46 if err != nil {
47 ps.log.Info("error creating client", "err", err)
48 }
49 ps.client = c
50 }
51
52 return ps, nil
53 }
54
55 func (ps *PubSub) Publish(PR []byte) {
56 result := ps.client.Topic(ps.topic).Publish(ps.ctx, &pubsub.Message{
57 Data: PR,
58 })
59
60 var wg sync.WaitGroup
61
62 wg.Add(1)
63 go func(res *pubsub.PublishResult) {
64 defer wg.Done()
65
66 id, err := res.Get(ps.ctx)
67 if err != nil {
68 ps.log.Info("Failed to publish", "err", err)
69 return
70 }
71 ps.log.Info("message was published", "ID", id)
72 }(result)
73
74 wg.Wait()
75 }
76
77 func (ps *PubSub) Subscribe() *pubsub.Subscription {
78 return ps.client.Subscription(ps.sub)
79 }
80
81 func WithTopic(topic string) Opts {
82 return func(p *PubSub) {
83 p.topic = topic
84 }
85 }
86
87 func WithSubscription(sub string) Opts {
88 return func(p *PubSub) {
89 p.sub = sub
90 }
91 }
92
93 func WithProject(project string) Opts {
94 return func(p *PubSub) {
95 p.project = project
96 }
97 }
98
99 func WithContext(ctx context.Context) Opts {
100 return func(p *PubSub) {
101 p.ctx = ctx
102 }
103 }
104
View as plain text