...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package managedwriter
16
17 import (
18 "errors"
19 "io"
20 "math/rand"
21 "strings"
22 "sync"
23 "time"
24
25 "github.com/googleapis/gax-go/v2"
26 "google.golang.org/grpc/codes"
27 "google.golang.org/grpc/status"
28 )
29
30 var (
31 defaultRetryAttempts = 4
32 )
33
34
35
36 func retryPredicate(err error) (shouldRetry, aggressiveBackoff bool) {
37 if err == nil {
38 return
39 }
40
41 s, ok := status.FromError(err)
42
43 if !ok {
44
45 if errors.Is(err, io.EOF) {
46 shouldRetry = true
47 return
48 }
49
50 return
51 }
52 switch s.Code() {
53 case codes.Aborted,
54 codes.Canceled,
55 codes.DeadlineExceeded,
56 codes.FailedPrecondition,
57 codes.Internal,
58 codes.Unavailable:
59 shouldRetry = true
60 return
61 case codes.ResourceExhausted:
62 if strings.HasPrefix(s.Message(), "Exceeds 'AppendRows throughput' quota") {
63
64
65 shouldRetry = true
66 return
67 }
68 }
69 return
70 }
71
72
73 type unaryRetryer struct {
74 bo gax.Backoff
75 }
76
77 func (ur *unaryRetryer) Retry(err error) (time.Duration, bool) {
78 shouldRetry, _ := retryPredicate(err)
79 return ur.bo.Pause(), shouldRetry
80 }
81
82
83
84
85 type statelessRetryer struct {
86 mu sync.Mutex
87 r *rand.Rand
88
89 minBackoff time.Duration
90 jitter time.Duration
91 aggressiveFactor int
92 maxAttempts int
93 }
94
95 func newStatelessRetryer() *statelessRetryer {
96 return &statelessRetryer{
97 r: rand.New(rand.NewSource(time.Now().UnixNano())),
98 minBackoff: 50 * time.Millisecond,
99 jitter: time.Second,
100 maxAttempts: defaultRetryAttempts,
101 }
102 }
103
104 func (sr *statelessRetryer) pause(aggressiveBackoff bool) time.Duration {
105 jitter := sr.jitter.Nanoseconds()
106 if jitter > 0 {
107 sr.mu.Lock()
108 jitter = sr.r.Int63n(jitter)
109 sr.mu.Unlock()
110 }
111 pause := sr.minBackoff.Nanoseconds() + jitter
112 if aggressiveBackoff {
113 pause = pause * int64(sr.aggressiveFactor)
114 }
115 return time.Duration(pause)
116 }
117
118 func (sr *statelessRetryer) Retry(err error, attemptCount int) (time.Duration, bool) {
119 if attemptCount >= sr.maxAttempts {
120 return 0, false
121 }
122 shouldRetry, aggressive := retryPredicate(err)
123 if shouldRetry {
124 return sr.pause(aggressive), true
125 }
126 return 0, false
127 }
128
129
130
131
132 func shouldReconnect(err error) bool {
133
134
135 if errors.Is(err, io.EOF) {
136 return true
137 }
138
139 reconnectCodes := []codes.Code{
140 codes.Aborted,
141 codes.Canceled,
142 codes.Unavailable,
143 codes.DeadlineExceeded,
144 }
145 if s, ok := status.FromError(err); ok {
146 for _, c := range reconnectCodes {
147 if s.Code() == c {
148 return true
149 }
150 }
151 }
152 return false
153 }
154
View as plain text