1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub
16
17 import (
18 "context"
19 "errors"
20 "io"
21 "log"
22 "strings"
23 "sync"
24 "time"
25
26 ipubsub "cloud.google.com/go/internal/pubsub"
27 vkit "cloud.google.com/go/pubsub/apiv1"
28 pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
29 "cloud.google.com/go/pubsub/internal/distribution"
30 gax "github.com/googleapis/gax-go/v2"
31 "github.com/googleapis/gax-go/v2/apierror"
32 "google.golang.org/grpc"
33 "google.golang.org/grpc/codes"
34 "google.golang.org/grpc/status"
35 "google.golang.org/protobuf/encoding/protowire"
36 )
37
38
39
40
41
// gracePeriod is how long before the current ack deadline the sender
// goroutine schedules its next keep-alive tick (see the kaTick case in sender).
const gracePeriod = 5 * time.Second

// ackIDBatchSize is the maximum number of ack IDs placed in a single
// Acknowledge or ModifyAckDeadline request; sendAck and sendModAck split
// larger sets with splitRequestIDs.
const ackIDBatchSize int = 2500

var (
	// maxDurationPerLeaseExtension is the upper clamp applied to each recorded
	// ack time in addToDistribution; it also sizes ackTimeDist.
	maxDurationPerLeaseExtension = 10 * time.Minute
	// minDurationPerLeaseExtension is the lower clamp applied to each recorded
	// ack time, the default floor for the ack deadline in boundedDuration, and
	// (halved) the delay before the first keep-alive tick.
	minDurationPerLeaseExtension = 10 * time.Second
	// minDurationPerLeaseExtensionExactlyOnce is the ack-deadline floor used
	// instead when exactly-once delivery is enabled and no explicit minimum
	// extension is configured.
	minDurationPerLeaseExtensionExactlyOnce = 1 * time.Minute

	// exactlyOnceDeliveryRetryDeadline bounds the total time retryAcks and
	// retryModAcks spend retrying before failing the remaining results.
	exactlyOnceDeliveryRetryDeadline = 600 * time.Second
)
63
// messageIterator pulls messages for one subscription, either with unary Pull
// calls (po.synchronous) or over a streaming pull, and manages their leases.
// A background sender goroutine batches acks, nacks and lease-extension
// modacks into RPCs.
type messageIterator struct {
	// ctx is cancelled by stop; cancel is its CancelFunc.
	ctx    context.Context
	cancel func()
	po     *pullOptions
	// ps is the streaming pull; nil when po.synchronous is set.
	ps      *pullStream
	subc    *vkit.SubscriberClient
	subName string
	// kaTick fires the next keep-alive; sender reschedules it on every tick.
	kaTick <-chan time.Time
	// ackTicker and nackTicker periodically flush pending acks / nacks.
	ackTicker  *time.Ticker
	nackTicker *time.Ticker
	// pingTicker periodically pings the streaming pull.
	pingTicker *time.Ticker
	// failed is closed by fail when the first error is recorded.
	failed chan struct{}
	// drained is closed by checkDrained once ctx is done and no message is
	// still under lease management.
	drained chan struct{}
	// wg tracks the sender goroutine.
	wg sync.WaitGroup

	// mu guards keepAliveDeadlines, the pending* maps and err.
	mu sync.Mutex
	// ackTimeDist records how long messages took to be acked/nacked. It is
	// accessed both with and without mu held (addToDistribution, ackDeadline);
	// presumably distribution.D is safe for concurrent use — TODO confirm.
	ackTimeDist *distribution.D

	// keepAliveDeadlines maps each leased ack ID to the time after which its
	// lease is no longer extended.
	keepAliveDeadlines map[string]time.Time
	// pendingAcks and pendingNacks hold ack IDs waiting to be sent by sender.
	pendingAcks  map[string]*AckResult
	pendingNacks map[string]*AckResult

	// pendingModAcks holds ack IDs whose lease extension is waiting to be sent.
	pendingModAcks map[string]*AckResult
	// err is the first error passed to fail.
	err error

	// eoMu guards enableExactlyOnceDelivery and sendNewAckDeadline.
	eoMu                      sync.RWMutex
	enableExactlyOnceDelivery bool
	// sendNewAckDeadline is set by recvMessages when exactly-once delivery is
	// toggled and consumed by pingStream.
	sendNewAckDeadline bool

	// orderingMu guards enableOrdering, which recvMessages updates from the
	// subscription properties reported by the stream.
	orderingMu sync.RWMutex

	enableOrdering bool
}
106
107
108
109
110
111 func newMessageIterator(subc *vkit.SubscriberClient, subName string, po *pullOptions) *messageIterator {
112 var ps *pullStream
113 if !po.synchronous {
114 maxMessages := po.maxOutstandingMessages
115 maxBytes := po.maxOutstandingBytes
116 if po.useLegacyFlowControl {
117 maxMessages = 0
118 maxBytes = 0
119 }
120 ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes, po.maxExtensionPeriod)
121 }
122
123
124 keepAlivePeriod := minDurationPerLeaseExtension / 2
125
126
127 ackTicker := time.NewTicker(100 * time.Millisecond)
128 nackTicker := time.NewTicker(100 * time.Millisecond)
129 pingTicker := time.NewTicker(30 * time.Second)
130 cctx, cancel := context.WithCancel(context.Background())
131 cctx = withSubscriptionKey(cctx, subName)
132 it := &messageIterator{
133 ctx: cctx,
134 cancel: cancel,
135 ps: ps,
136 po: po,
137 subc: subc,
138 subName: subName,
139 kaTick: time.After(keepAlivePeriod),
140 ackTicker: ackTicker,
141 nackTicker: nackTicker,
142 pingTicker: pingTicker,
143 failed: make(chan struct{}),
144 drained: make(chan struct{}),
145 ackTimeDist: distribution.New(int(maxDurationPerLeaseExtension/time.Second) + 1),
146 keepAliveDeadlines: map[string]time.Time{},
147 pendingAcks: map[string]*AckResult{},
148 pendingNacks: map[string]*AckResult{},
149 pendingModAcks: map[string]*AckResult{},
150 }
151 it.wg.Add(1)
152 go it.sender()
153 return it
154 }
155
156
157
158
159
160 func (it *messageIterator) stop() {
161 it.cancel()
162 it.mu.Lock()
163 it.checkDrained()
164 it.mu.Unlock()
165 it.wg.Wait()
166 }
167
168
169
170
171
172 func (it *messageIterator) checkDrained() {
173 select {
174 case <-it.drained:
175 return
176 default:
177 }
178 select {
179 case <-it.ctx.Done():
180 if len(it.keepAliveDeadlines) == 0 {
181 close(it.drained)
182 }
183 default:
184 }
185 }
186
187
188
189
190 func (it *messageIterator) addToDistribution(receiveTime time.Time) {
191 d := time.Since(receiveTime)
192 d = maxDuration(d, minDurationPerLeaseExtension)
193 d = minDuration(d, maxDurationPerLeaseExtension)
194 it.ackTimeDist.Record(int(d / time.Second))
195 }
196
197
198 func (it *messageIterator) done(ackID string, ack bool, r *AckResult, receiveTime time.Time) {
199 it.addToDistribution(receiveTime)
200 it.mu.Lock()
201 defer it.mu.Unlock()
202 delete(it.keepAliveDeadlines, ackID)
203 if ack {
204 it.pendingAcks[ackID] = r
205 } else {
206 it.pendingNacks[ackID] = r
207 }
208 it.checkDrained()
209 }
210
211
212
213
214 func (it *messageIterator) fail(err error) error {
215 it.mu.Lock()
216 defer it.mu.Unlock()
217 if it.err == nil {
218 it.err = err
219 close(it.failed)
220 }
221 return it.err
222 }
223
224
225
226
227 func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
228 it.mu.Lock()
229 ierr := it.err
230 it.mu.Unlock()
231 if ierr != nil {
232 return nil, ierr
233 }
234
235
236 select {
237 case <-it.ctx.Done():
238 it.wg.Wait()
239 return nil, io.EOF
240 default:
241 }
242
243 var rmsgs []*pb.ReceivedMessage
244 var err error
245 if it.po.synchronous {
246 rmsgs, err = it.pullMessages(maxToPull)
247 } else {
248 rmsgs, err = it.recvMessages()
249 }
250
251 if err != nil {
252 return nil, it.fail(err)
253 }
254 recordStat(it.ctx, PullCount, int64(len(rmsgs)))
255 now := time.Now()
256 msgs, err := convertMessages(rmsgs, now, it.done)
257 if err != nil {
258 return nil, it.fail(err)
259 }
260
261
262 maxExt := time.Now().Add(it.po.maxExtension)
263 ackIDs := map[string]*AckResult{}
264 it.eoMu.RLock()
265 exactlyOnceDelivery := it.enableExactlyOnceDelivery
266 it.eoMu.RUnlock()
267 it.mu.Lock()
268
269
270
271
272
273
274 pendingMessages := make(map[string]*ipubsub.Message)
275 for _, m := range msgs {
276 ackID := msgAckID(m)
277 addRecv(m.ID, ackID, now)
278 it.keepAliveDeadlines[ackID] = maxExt
279
280
281 if _, ok := it.pendingNacks[ackID]; !ok {
282
283
284
285
286
287 if !exactlyOnceDelivery {
288 ackIDs[ackID] = newSuccessAckResult()
289 } else {
290 ackIDs[ackID] = ipubsub.NewAckResult()
291 pendingMessages[ackID] = m
292 }
293 }
294 }
295 deadline := it.ackDeadline()
296 it.mu.Unlock()
297
298 if len(ackIDs) > 0 {
299
300 if !exactlyOnceDelivery {
301 go func() {
302 it.sendModAck(ackIDs, deadline, false)
303 }()
304 return msgs, nil
305 }
306
307
308
309 it.sendModAck(ackIDs, deadline, false)
310 for ackID, ar := range ackIDs {
311 ctx := context.Background()
312 _, err := ar.Get(ctx)
313 if err != nil {
314 delete(pendingMessages, ackID)
315 it.mu.Lock()
316
317 delete(it.keepAliveDeadlines, ackID)
318 it.mu.Unlock()
319 }
320 }
321
322
323 v := make([]*ipubsub.Message, 0, len(pendingMessages))
324 for _, m := range msgs {
325 ackID := msgAckID(m)
326 if _, ok := pendingMessages[ackID]; ok {
327 v = append(v, m)
328 }
329 }
330 return v, nil
331 }
332 return nil, nil
333 }
334
335
336
337 func (it *messageIterator) pullMessages(maxToPull int32) ([]*pb.ReceivedMessage, error) {
338
339
340 res, err := it.subc.Pull(it.ctx, &pb.PullRequest{
341 Subscription: it.subName,
342 MaxMessages: maxToPull,
343 }, gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(maxSendRecvBytes)))
344 switch {
345 case err == context.Canceled:
346 return nil, nil
347 case status.Code(err) == codes.Canceled:
348 return nil, nil
349 case err != nil:
350 return nil, err
351 default:
352 return res.ReceivedMessages, nil
353 }
354 }
355
356 func (it *messageIterator) recvMessages() ([]*pb.ReceivedMessage, error) {
357 res, err := it.ps.Recv()
358 if err != nil {
359 return nil, err
360 }
361
362
363 it.eoMu.RLock()
364 enableEOD := it.enableExactlyOnceDelivery
365 it.eoMu.RUnlock()
366
367 subProp := res.GetSubscriptionProperties()
368 if got := subProp.GetExactlyOnceDeliveryEnabled(); got != enableEOD {
369 it.eoMu.Lock()
370 it.sendNewAckDeadline = true
371 it.enableExactlyOnceDelivery = got
372 it.eoMu.Unlock()
373 }
374
375
376 it.orderingMu.RLock()
377 enableOrdering := it.enableOrdering
378 it.orderingMu.RUnlock()
379
380 if got := subProp.GetMessageOrderingEnabled(); got != enableOrdering {
381 it.orderingMu.Lock()
382 it.enableOrdering = got
383 it.orderingMu.Unlock()
384 }
385 return res.ReceivedMessages, nil
386 }
387
388
// sender is the iterator's background goroutine, started by newMessageIterator
// and tracked by it.wg. On each event it inspects the pending maps under
// it.mu, swaps out the ones to send, and issues the RPCs after releasing the
// lock:
//
//   - failed: return immediately; pending acks/nacks are not flushed.
//   - drained: flush pending acks and nacks, then leave the loop.
//   - kaTick: queue lease extensions (handleKeepAlives), send them with the
//     current ack deadline, and reschedule the next tick.
//   - nackTicker / ackTicker: flush pending nacks / acks.
//   - pingTicker: ping the streaming pull (not in synchronous mode).
//
// On exit it stops all tickers and, when a stream exists, calls CloseSend.
func (it *messageIterator) sender() {
	defer it.wg.Done()
	defer it.ackTicker.Stop()
	defer it.nackTicker.Stop()
	defer it.pingTicker.Stop()
	defer func() {
		if it.ps != nil {
			it.ps.CloseSend()
		}
	}()

	done := false
	for !done {
		sendAcks := false
		sendNacks := false
		sendModAcks := false
		sendPing := false

		// Ack deadline for this iteration: used as the modack deadline and
		// to schedule the next keep-alive.
		dl := it.ackDeadline()

		select {
		case <-it.failed:
			// The iterator failed; exit without sending anything else.
			return

		case <-it.drained:
			// Stopped and nothing is leased any more: send what is still
			// pending, then leave the loop.
			it.mu.Lock()
			sendAcks = (len(it.pendingAcks) > 0)
			sendNacks = (len(it.pendingNacks) > 0)

			done = true

		case <-it.kaTick:
			it.mu.Lock()
			it.handleKeepAlives()
			sendModAcks = (len(it.pendingModAcks) > 0)

			// Schedule the next keep-alive gracePeriod before the deadline.
			nextTick := dl - gracePeriod
			if nextTick <= 0 {
				// The deadline is not longer than the grace period; fall
				// back to half the deadline instead.
				nextTick = dl / 2
			}
			it.kaTick = time.After(nextTick)

		case <-it.nackTicker.C:
			it.mu.Lock()
			sendNacks = (len(it.pendingNacks) > 0)

		case <-it.ackTicker.C:
			it.mu.Lock()
			sendAcks = (len(it.pendingAcks) > 0)

		case <-it.pingTicker.C:
			it.mu.Lock()
			// Pings only apply to the streaming pull; ps is nil in
			// synchronous mode.
			sendPing = !it.po.synchronous
		}

		// Every case reaching this point holds it.mu. Swap out the maps to
		// send so the RPCs below run without the lock.
		var acks, nacks, modAcks map[string]*AckResult
		if sendAcks {
			acks = it.pendingAcks
			it.pendingAcks = map[string]*AckResult{}
		}
		if sendNacks {
			nacks = it.pendingNacks
			it.pendingNacks = map[string]*AckResult{}
		}
		if sendModAcks {
			modAcks = it.pendingModAcks
			it.pendingModAcks = map[string]*AckResult{}
		}
		it.mu.Unlock()

		if sendAcks {
			it.sendAck(acks)
		}
		if sendNacks {
			// Nacks are sent as modacks with a zero deadline.
			it.sendModAck(nacks, 0, false)
		}
		if sendModAcks {
			it.sendModAck(modAcks, dl, true)
		}
		if sendPing {
			it.pingStream()
		}
	}
}
480
481
482
483
484
485 func (it *messageIterator) handleKeepAlives() {
486 now := time.Now()
487 for id, expiry := range it.keepAliveDeadlines {
488 if expiry.Before(now) {
489
490
491
492
493 delete(it.keepAliveDeadlines, id)
494 } else {
495
496 it.pendingModAcks[id] = newSuccessAckResult()
497 }
498 }
499 it.checkDrained()
500 }
501
502
503
504 func (it *messageIterator) sendAck(m map[string]*AckResult) {
505 ackIDs := make([]string, 0, len(m))
506 for k := range m {
507 ackIDs = append(ackIDs, k)
508 }
509 it.eoMu.RLock()
510 exactlyOnceDelivery := it.enableExactlyOnceDelivery
511 it.eoMu.RUnlock()
512
513 var toSend []string
514 for len(ackIDs) > 0 {
515 toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize)
516
517 recordStat(it.ctx, AckCount, int64(len(toSend)))
518 addAcks(toSend)
519
520
521 cctx2, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
522 defer cancel2()
523 err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{
524 Subscription: it.subName,
525 AckIds: toSend,
526 })
527 if exactlyOnceDelivery {
528 resultsByAckID := make(map[string]*AckResult)
529 for _, ackID := range toSend {
530 resultsByAckID[ackID] = m[ackID]
531 }
532 st, md := extractMetadata(err)
533 _, toRetry := processResults(st, resultsByAckID, md)
534 if len(toRetry) > 0 {
535
536 go func() {
537 it.retryAcks(toRetry)
538 }()
539 }
540 }
541 }
542 }
543
544
545
546
547
548
549
550 func (it *messageIterator) sendModAck(m map[string]*AckResult, deadline time.Duration, logOnInvalid bool) {
551 deadlineSec := int32(deadline / time.Second)
552 ackIDs := make([]string, 0, len(m))
553 for k := range m {
554 ackIDs = append(ackIDs, k)
555 }
556 it.eoMu.RLock()
557 exactlyOnceDelivery := it.enableExactlyOnceDelivery
558 it.eoMu.RUnlock()
559 var toSend []string
560 for len(ackIDs) > 0 {
561 toSend, ackIDs = splitRequestIDs(ackIDs, ackIDBatchSize)
562 if deadline == 0 {
563 recordStat(it.ctx, NackCount, int64(len(toSend)))
564 } else {
565 recordStat(it.ctx, ModAckCount, int64(len(toSend)))
566 }
567 addModAcks(toSend, deadlineSec)
568
569
570 cctx, cancel2 := context.WithTimeout(context.Background(), 60*time.Second)
571 defer cancel2()
572 err := it.subc.ModifyAckDeadline(cctx, &pb.ModifyAckDeadlineRequest{
573 Subscription: it.subName,
574 AckDeadlineSeconds: deadlineSec,
575 AckIds: toSend,
576 })
577 if exactlyOnceDelivery {
578 resultsByAckID := make(map[string]*AckResult)
579 for _, ackID := range toSend {
580 resultsByAckID[ackID] = m[ackID]
581 }
582
583 st, md := extractMetadata(err)
584 _, toRetry := processResults(st, resultsByAckID, md)
585 if len(toRetry) > 0 {
586
587 go func() {
588 it.retryModAcks(toRetry, deadlineSec, logOnInvalid)
589 }()
590 }
591 }
592 }
593 }
594
595
596
597 func (it *messageIterator) retryAcks(m map[string]*AckResult) {
598 ctx, cancel := context.WithTimeout(context.Background(), exactlyOnceDeliveryRetryDeadline)
599 defer cancel()
600 bo := newExactlyOnceBackoff()
601 for {
602 if ctx.Err() != nil {
603 for _, r := range m {
604 ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err())
605 }
606 return
607 }
608
609
610 ackIDs := make([]string, 0, len(m))
611 for k := range m {
612 ackIDs = append(ackIDs, k)
613 }
614 cctx2, cancel2 := context.WithTimeout(ctx, 60*time.Second)
615 defer cancel2()
616 err := it.subc.Acknowledge(cctx2, &pb.AcknowledgeRequest{
617 Subscription: it.subName,
618 AckIds: ackIDs,
619 })
620 st, md := extractMetadata(err)
621 _, toRetry := processResults(st, m, md)
622 if len(toRetry) == 0 {
623 return
624 }
625 time.Sleep(bo.Pause())
626 m = toRetry
627 }
628 }
629
630
631
632
633
634 func (it *messageIterator) retryModAcks(m map[string]*AckResult, deadlineSec int32, logOnInvalid bool) {
635 bo := newExactlyOnceBackoff()
636 retryCount := 0
637 ctx, cancel := context.WithTimeout(context.Background(), exactlyOnceDeliveryRetryDeadline)
638 defer cancel()
639 for {
640
641 if ctx.Err() != nil {
642 for _, r := range m {
643 ipubsub.SetAckResult(r, AcknowledgeStatusOther, ctx.Err())
644 }
645 return
646 }
647
648 if deadlineSec != 0 && retryCount > 3 {
649 ackIDs := make([]string, 0, len(m))
650 for k, ar := range m {
651 ackIDs = append(ackIDs, k)
652 ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New("modack retry failed"))
653 }
654 if logOnInvalid {
655 log.Printf("automatic lease modack retry failed for following IDs: %v", ackIDs)
656 }
657 return
658 }
659
660
661 ackIDs := make([]string, 0, len(m))
662 for k := range m {
663 ackIDs = append(ackIDs, k)
664 }
665 cctx2, cancel2 := context.WithTimeout(ctx, 60*time.Second)
666 defer cancel2()
667 err := it.subc.ModifyAckDeadline(cctx2, &pb.ModifyAckDeadlineRequest{
668 Subscription: it.subName,
669 AckIds: ackIDs,
670 AckDeadlineSeconds: deadlineSec,
671 })
672 st, md := extractMetadata(err)
673 _, toRetry := processResults(st, m, md)
674 if len(toRetry) == 0 {
675 return
676 }
677 time.Sleep(bo.Pause())
678 m = toRetry
679 retryCount++
680 }
681 }
682
683
684
685
686
687
688
689 func (it *messageIterator) pingStream() {
690 spr := &pb.StreamingPullRequest{}
691 it.eoMu.RLock()
692 if it.sendNewAckDeadline {
693 spr.StreamAckDeadlineSeconds = int32(it.ackDeadline())
694 it.sendNewAckDeadline = false
695 }
696 it.eoMu.RUnlock()
697 it.ps.Send(spr)
698 }
699
700
701
702 func calcFieldSizeString(fields ...string) int {
703 overhead := 0
704 for _, field := range fields {
705 overhead += 1 + len(field) + protowire.SizeVarint(uint64(len(field)))
706 }
707 return overhead
708 }
709
710
711
712 func calcFieldSizeInt(fields ...int) int {
713 overhead := 0
714 for _, field := range fields {
715 overhead += 1 + protowire.SizeVarint(uint64(field))
716 }
717 return overhead
718 }
719
720
721
// splitRequestIDs splits ids into a leading batch of at most maxBatchSize IDs
// and the remainder. When all of ids fits in one batch, the remainder is an
// empty, non-nil slice.
//
// A non-positive maxBatchSize is treated as "no limit". Otherwise callers that
// loop until the remainder is empty would spin forever (0) or panic on a
// negative slice bound.
func splitRequestIDs(ids []string, maxBatchSize int) (prefix, remainder []string) {
	if maxBatchSize <= 0 || len(ids) <= maxBatchSize {
		return ids, []string{}
	}
	return ids[:maxBatchSize], ids[maxBatchSize:]
}
728
729
730
731
732
733
734
735 func (it *messageIterator) ackDeadline() time.Duration {
736 pt := time.Duration(it.ackTimeDist.Percentile(.99)) * time.Second
737 it.eoMu.RLock()
738 enableExactlyOnce := it.enableExactlyOnceDelivery
739 it.eoMu.RUnlock()
740 return boundedDuration(pt, it.po.minExtensionPeriod, it.po.maxExtensionPeriod, enableExactlyOnce)
741 }
742
743 func boundedDuration(ackDeadline, minExtension, maxExtension time.Duration, exactlyOnce bool) time.Duration {
744
745 if maxExtension > 0 {
746 ackDeadline = minDuration(ackDeadline, maxExtension)
747 }
748
749
750 if minExtension > 0 {
751 ackDeadline = maxDuration(ackDeadline, minExtension)
752 } else if exactlyOnce {
753
754
755 ackDeadline = maxDuration(ackDeadline, minDurationPerLeaseExtensionExactlyOnce)
756 } else if ackDeadline < minDurationPerLeaseExtension {
757
758
759
760 ackDeadline = minDurationPerLeaseExtension
761 }
762
763 return ackDeadline
764 }
765
// minDuration returns the smaller of x and y.
func minDuration(x, y time.Duration) time.Duration {
	if y <= x {
		return y
	}
	return x
}
772
// maxDuration returns the larger of x and y.
func maxDuration(x, y time.Duration) time.Duration {
	if y >= x {
		return y
	}
	return x
}
779
// Per-ack-ID error strings examined by processResults.
const (
	// transientErrStringPrefix marks a per-ID error that should be retried.
	transientErrStringPrefix = "TRANSIENT_"
	// permanentInvalidAckErrString is reported as AcknowledgeStatusInvalidAckID.
	permanentInvalidAckErrString = "PERMANENT_FAILURE_INVALID_ACK_ID"
)
784
785
786 func extractMetadata(err error) (*status.Status, map[string]string) {
787 apiErr, ok := apierror.FromError(err)
788 if ok {
789 return apiErr.GRPCStatus(), apiErr.Metadata()
790 }
791 return nil, nil
792 }
793
794
795
796
797
798
799
800 func processResults(errorStatus *status.Status, ackResMap map[string]*AckResult, errorsByAckID map[string]string) (map[string]*AckResult, map[string]*AckResult) {
801 completedResults := make(map[string]*AckResult)
802 retryResults := make(map[string]*AckResult)
803 for ackID, ar := range ackResMap {
804
805
806 if errAckID, ok := errorsByAckID[ackID]; ok {
807 if strings.HasPrefix(errAckID, transientErrStringPrefix) {
808 retryResults[ackID] = ar
809 } else {
810 if errAckID == permanentInvalidAckErrString {
811 ipubsub.SetAckResult(ar, AcknowledgeStatusInvalidAckID, errors.New(errAckID))
812 } else {
813 ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errors.New(errAckID))
814 }
815 completedResults[ackID] = ar
816 }
817 } else if errorStatus != nil && contains(errorStatus.Code(), exactlyOnceDeliveryTemporaryRetryErrors) {
818 retryResults[ackID] = ar
819 } else if errorStatus != nil {
820
821 switch errorStatus.Code() {
822 case codes.PermissionDenied:
823 ipubsub.SetAckResult(ar, AcknowledgeStatusPermissionDenied, errorStatus.Err())
824 case codes.FailedPrecondition:
825 ipubsub.SetAckResult(ar, AcknowledgeStatusFailedPrecondition, errorStatus.Err())
826 default:
827 ipubsub.SetAckResult(ar, AcknowledgeStatusOther, errorStatus.Err())
828 }
829 completedResults[ackID] = ar
830 } else if ar != nil {
831
832 ipubsub.SetAckResult(ar, AcknowledgeStatusSuccess, nil)
833 completedResults[ackID] = ar
834 } else {
835
836 completedResults[ackID] = ar
837 }
838 }
839 return completedResults, retryResults
840 }
841
View as plain text