...
1 package server
2
3 import (
4 "context"
5 "fmt"
6 "sync"
7 "sync/atomic"
8 "time"
9
10 "edge-infra.dev/pkg/lib/fog"
11
12 "cloud.google.com/go/pubsub"
13 "github.com/go-logr/logr"
14 )
15
16 type ReceiverHandler func(context.Context, *pubsub.Message) error
17
18 type Receiver struct {
19 projectID string
20 handler ReceiverHandler
21 subscription *pubsub.Subscription
22 started chan struct{}
23 log logr.Logger
24
25
26 pollSubscriptionExistsPeriod time.Duration
27
28
33 shutdown context.CancelFunc
34
35
42 healthMu sync.Mutex
43 health ReceiverHealthCheck
44 receiveStart time.Time
45 messagesAcked atomic.Int64
46 messagesNacked atomic.Int64
47 }
48
49 func NewReceiver(projectID string, subscription *pubsub.Subscription, handler ReceiverHandler, pollPeriod time.Duration) *Receiver {
50 return &Receiver{
51 projectID: projectID,
52 handler: handler,
53 subscription: subscription,
54 started: make(chan struct{}),
55 pollSubscriptionExistsPeriod: pollPeriod,
56 }
57 }
58
59
60 func (r *Receiver) Close() error {
61 select {
62 case <-r.started:
63 r.shutdown()
64 return nil
65 default:
66 return fmt.Errorf("close requires the receiver to be started")
67 }
68 }
69
70
71 func (r *Receiver) Start(ctx context.Context) {
72 r.log = fog.New().WithName("receiver").WithValues("project_id", r.projectID)
73 ctx, r.shutdown = context.WithCancel(ctx)
74 close(r.started)
75
76 for {
77 if !r.waitUntilExists(ctx) {
78 return
79 }
80
81 if err := r.receive(ctx); err != nil {
82
83 r.log.Error(err, "receiver quit unexpectedly")
84 }
85
86 select {
87 case <-ctx.Done():
88 return
89 default:
90 }
91 }
92 }
93 func (r *Receiver) receive(ctx context.Context) error {
94 r.updateHealthy()
95 defer r.updateHealthTotals()
96
97 ctx, cancel := r.contextWhileSubscriptionExists(ctx)
98 defer cancel()
99
100 return r.subscription.Receive(ctx, r.handle)
101 }
102
103 func (r *Receiver) handle(ctx context.Context, msg *pubsub.Message) {
104 if err := r.handler(ctx, msg); err != nil {
105 r.messagesNacked.Add(1)
106 msg.Nack()
107 return
108 }
109 r.messagesAcked.Add(1)
110 msg.Ack()
111 }
112
113
114
115 func (r *Receiver) contextWhileSubscriptionExists(ctx context.Context) (context.Context, context.CancelFunc) {
116 ctx, cancel := context.WithCancel(ctx)
117
118 var quietErrs bool
119
120
121 go func() {
122 defer cancel()
123 for {
124 select {
125 case <-ctx.Done():
126 return
127 case <-time.After(r.pollSubscriptionExistsPeriod):
128 }
129
130 exists, err := r.subscription.Exists(ctx)
131 if err != nil {
132 if !quietErrs {
133 r.log.Error(err, "failed to probe if subscription exists")
134
135
136 quietErrs = true
137 }
138 continue
139 }
140
141 quietErrs = false
142
143 if !exists {
144 r.log.Info("receiver subscription no longer exists")
145 return
146 }
147 }
148 }()
149
150 return ctx, cancel
151 }
152
153 func (r *Receiver) waitUntilExists(ctx context.Context) bool {
154
155 if exists, err := r.subscription.Exists(ctx); err != nil && exists {
156 return true
157 }
158
159 var quietInfo, quietErrs bool
160
161 for {
162 select {
163 case <-time.After(r.pollSubscriptionExistsPeriod):
164 case <-ctx.Done():
165 return false
166 }
167
168 exists, err := r.subscription.Exists(ctx)
169
170 if err != nil {
171 if !quietErrs {
172 r.log.Error(err, "failed to check if subscription exists at startup")
173
174
175 quietErrs = true
176 }
177 continue
178 }
179
180 quietErrs = false
181
182 if !exists {
183 if !quietInfo {
184 r.log.Info("subscription does not exist")
185
186
187 quietInfo = true
188 }
189 continue
190 }
191
192
193 return true
194 }
195 }
196
197
198
199
200
201
// ReceiverHealthCheck is a snapshot of a Receiver's counters as returned by
// Receiver.HealthCheck. The Total* fields span every receive session so far,
// including the one in progress; the Current* fields describe only the session
// in progress and are zero when none is running.
type ReceiverHealthCheck struct {
	// RestartCount is incremented every time a receive session ends.
	RestartCount int
	// TotalMessagesProcessed is meant to be the number of messages handled,
	// whether acked or nacked.
	TotalMessagesProcessed int
	// TotalReceiveDuration is the time spent inside receive sessions.
	TotalReceiveDuration time.Duration
	// TotalMessagesAcked and TotalMessagesNacked split the handled messages by
	// outcome.
	TotalMessagesAcked, TotalMessagesNacked int

	// Healthy is true while a receive session is running.
	Healthy bool
	// CurrentReceiveDuration is how long the running session has lasted.
	CurrentReceiveDuration time.Duration
	// CurrentMessagesProcessed, CurrentMessagesAcked and CurrentMessagesNacked
	// are the running session's message counts.
	CurrentMessagesProcessed, CurrentMessagesAcked, CurrentMessagesNacked int
}
225
226
229
230
231 func (r *Receiver) HealthCheck() ReceiverHealthCheck {
232 r.healthMu.Lock()
233 defer r.healthMu.Unlock()
234
235 var dur time.Duration
236 if !r.receiveStart.IsZero() {
237 dur = time.Since(r.receiveStart)
238 }
239
240 var current = struct {
241 Acked int
242 Nacked int
243 Duration time.Duration
244 }{
245 Acked: int(r.messagesAcked.Load()),
246 Nacked: int(r.messagesNacked.Load()),
247 Duration: dur,
248 }
249
250
251 var hc = r.health
252
253 hc.TotalMessagesAcked += current.Acked
254 hc.TotalMessagesNacked += current.Nacked
255 hc.TotalMessagesProcessed += current.Acked + current.Nacked
256 hc.TotalReceiveDuration += current.Duration
257
258 return ReceiverHealthCheck{
259 Healthy: hc.Healthy,
260 RestartCount: hc.RestartCount,
261
262 TotalMessagesAcked: hc.TotalMessagesAcked,
263 TotalMessagesNacked: hc.TotalMessagesNacked,
264 TotalMessagesProcessed: hc.TotalMessagesProcessed,
265 TotalReceiveDuration: hc.TotalReceiveDuration,
266
267 CurrentReceiveDuration: current.Duration,
268 CurrentMessagesAcked: current.Acked,
269 CurrentMessagesNacked: current.Nacked,
270 CurrentMessagesProcessed: current.Acked + current.Nacked,
271 }
272 }
273
274 func (r *Receiver) updateHealthy() {
275 r.healthMu.Lock()
276 defer r.healthMu.Unlock()
277
278 r.receiveStart = time.Now()
279 r.health.Healthy = true
280 }
281
282 func (r *Receiver) updateHealthTotals() {
283 r.healthMu.Lock()
284 defer r.healthMu.Unlock()
285
286 r.health.TotalMessagesAcked += int(r.messagesAcked.Swap(0))
287 r.health.TotalMessagesNacked += int(r.messagesNacked.Swap(0))
288 r.health.TotalMessagesProcessed += r.health.TotalMessagesAcked + r.health.TotalMessagesNacked
289
290 r.health.RestartCount++
291 r.health.Healthy = false
292
293 r.health.TotalReceiveDuration += time.Since(r.receiveStart)
294 r.receiveStart = time.Time{}
295 }
296
View as plain text