...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub
16
17 import (
18 "math"
19 "strings"
20 "time"
21
22 gax "github.com/googleapis/gax-go/v2"
23 "google.golang.org/grpc/codes"
24 "google.golang.org/grpc/status"
25 )
26
27
28
29
30
31
32
33 const (
34 maxPayload = 512 * 1024
35 maxSendRecvBytes = 20 * 1024 * 1024
36 )
37
38 func trunc32(i int64) int32 {
39 if i > math.MaxInt32 {
40 i = math.MaxInt32
41 }
42 return int32(i)
43 }
44
45 type defaultRetryer struct {
46 bo gax.Backoff
47 }
48
49
50
51 func (r *defaultRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
52 s, ok := status.FromError(err)
53 if !ok {
54 return r.bo.Pause(), true
55 }
56 switch s.Code() {
57 case codes.DeadlineExceeded, codes.Internal, codes.ResourceExhausted, codes.Aborted:
58 return r.bo.Pause(), true
59 case codes.Unavailable:
60 c := strings.Contains(s.Message(), "Server shutdownNow invoked")
61 if !c {
62 return r.bo.Pause(), true
63 }
64 return 0, false
65 case codes.Unknown:
66
67 isGoaway := strings.Contains(s.Message(), "received prior goaway: code: NO_ERROR")
68 if isGoaway {
69 return r.bo.Pause(), true
70 }
71 return 0, false
72 default:
73 return 0, false
74 }
75 }
76
77 type streamingPullRetryer struct {
78 defaultRetryer gax.Retryer
79 }
80
81
82 func (r *streamingPullRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
83 s, ok := status.FromError(err)
84 if !ok {
85 return r.defaultRetryer.Retry(err)
86 }
87 switch s.Code() {
88 case codes.ResourceExhausted:
89 return 0, false
90 default:
91 return r.defaultRetryer.Retry(err)
92 }
93 }
94
95 type publishRetryer struct {
96 defaultRetryer gax.Retryer
97 }
98
99 func (r *publishRetryer) Retry(err error) (pause time.Duration, shouldRetry bool) {
100 s, ok := status.FromError(err)
101 if !ok {
102 return r.defaultRetryer.Retry(err)
103 }
104 if s.Code() == codes.Internal && strings.Contains(s.Message(), "string field contains invalid UTF-8") {
105 return 0, false
106 }
107 return r.defaultRetryer.Retry(err)
108 }
109
110 var (
111 exactlyOnceDeliveryTemporaryRetryErrors = map[codes.Code]struct{}{
112 codes.DeadlineExceeded: {},
113 codes.ResourceExhausted: {},
114 codes.Aborted: {},
115 codes.Internal: {},
116 codes.Unavailable: {},
117 }
118 )
119
120
121 func contains(v codes.Code, t map[codes.Code]struct{}) bool {
122 _, ok := t[v]
123 return ok
124 }
125
126 func newExactlyOnceBackoff() gax.Backoff {
127 return gax.Backoff{
128 Initial: 1 * time.Second,
129 Max: 64 * time.Second,
130 Multiplier: 2,
131 }
132 }
133
View as plain text