1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package clientv3
19
20 import (
21 "context"
22 "io"
23 "sync"
24 "time"
25
26 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
27 "go.uber.org/zap"
28 "google.golang.org/grpc"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/metadata"
31 "google.golang.org/grpc/status"
32 )
33
34
35
36
37
38 func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClientInterceptor {
39 intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
40 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
41 ctx = withVersion(ctx)
42 grpcOpts, retryOpts := filterCallOptions(opts)
43 callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
44
45 if callOpts.max == 0 {
46 return invoker(ctx, method, req, reply, cc, grpcOpts...)
47 }
48 var lastErr error
49 for attempt := uint(0); attempt < callOpts.max; attempt++ {
50 if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
51 return err
52 }
53 c.GetLogger().Debug(
54 "retrying of unary invoker",
55 zap.String("target", cc.Target()),
56 zap.Uint("attempt", attempt),
57 )
58 lastErr = invoker(ctx, method, req, reply, cc, grpcOpts...)
59 if lastErr == nil {
60 return nil
61 }
62 c.GetLogger().Warn(
63 "retrying of unary invoker failed",
64 zap.String("target", cc.Target()),
65 zap.Uint("attempt", attempt),
66 zap.Error(lastErr),
67 )
68 if isContextError(lastErr) {
69 if ctx.Err() != nil {
70
71 return lastErr
72 }
73
74 continue
75 }
76 if c.shouldRefreshToken(lastErr, callOpts) {
77 gterr := c.refreshToken(ctx)
78 if gterr != nil {
79 c.GetLogger().Warn(
80 "retrying of unary invoker failed to fetch new auth token",
81 zap.String("target", cc.Target()),
82 zap.Error(gterr),
83 )
84 return gterr
85 }
86 continue
87 }
88 if !isSafeRetry(c.lg, lastErr, callOpts) {
89 return lastErr
90 }
91 }
92 return lastErr
93 }
94 }
95
96
97
98
99
100
101
102
103
104 func (c *Client) streamClientInterceptor(optFuncs ...retryOption) grpc.StreamClientInterceptor {
105 intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs)
106 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
107 ctx = withVersion(ctx)
108
109
110 if c.authTokenBundle != nil {
111
112 err := c.getToken(ctx)
113 if err != nil && rpctypes.Error(err) != rpctypes.ErrAuthNotEnabled {
114 c.GetLogger().Error("clientv3/retry_interceptor: getToken failed", zap.Error(err))
115 return nil, err
116 }
117 }
118 grpcOpts, retryOpts := filterCallOptions(opts)
119 callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts)
120
121 if callOpts.max == 0 {
122 return streamer(ctx, desc, cc, method, grpcOpts...)
123 }
124 if desc.ClientStreams {
125 return nil, status.Errorf(codes.Unimplemented, "clientv3/retry_interceptor: cannot retry on ClientStreams, set Disable()")
126 }
127 newStreamer, err := streamer(ctx, desc, cc, method, grpcOpts...)
128 if err != nil {
129 c.GetLogger().Error("streamer failed to create ClientStream", zap.Error(err))
130 return nil, err
131 }
132 retryingStreamer := &serverStreamingRetryingStream{
133 client: c,
134 ClientStream: newStreamer,
135 callOpts: callOpts,
136 ctx: ctx,
137 streamerCall: func(ctx context.Context) (grpc.ClientStream, error) {
138 return streamer(ctx, desc, cc, method, grpcOpts...)
139 },
140 }
141 return retryingStreamer, nil
142 }
143 }
144
145
146
147 func (c *Client) shouldRefreshToken(err error, callOpts *options) bool {
148 if rpctypes.Error(err) == rpctypes.ErrUserEmpty {
149
150
151 return c.authTokenBundle != nil
152 }
153
154 return callOpts.retryAuth &&
155 (rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken || rpctypes.Error(err) == rpctypes.ErrAuthOldRevision)
156 }
157
158 func (c *Client) refreshToken(ctx context.Context) error {
159 if c.authTokenBundle == nil {
160
161
162
163
164
165
166
167
168
169 return nil
170 }
171
172 c.authTokenBundle.UpdateAuthToken("")
173 return c.getToken(ctx)
174 }
175
176
177
178
179 type serverStreamingRetryingStream struct {
180 grpc.ClientStream
181 client *Client
182 bufferedSends []interface{}
183 receivedGood bool
184 wasClosedSend bool
185 ctx context.Context
186 callOpts *options
187 streamerCall func(ctx context.Context) (grpc.ClientStream, error)
188 mu sync.RWMutex
189 }
190
191 func (s *serverStreamingRetryingStream) setStream(clientStream grpc.ClientStream) {
192 s.mu.Lock()
193 s.ClientStream = clientStream
194 s.mu.Unlock()
195 }
196
197 func (s *serverStreamingRetryingStream) getStream() grpc.ClientStream {
198 s.mu.RLock()
199 defer s.mu.RUnlock()
200 return s.ClientStream
201 }
202
203 func (s *serverStreamingRetryingStream) SendMsg(m interface{}) error {
204 s.mu.Lock()
205 s.bufferedSends = append(s.bufferedSends, m)
206 s.mu.Unlock()
207 return s.getStream().SendMsg(m)
208 }
209
210 func (s *serverStreamingRetryingStream) CloseSend() error {
211 s.mu.Lock()
212 s.wasClosedSend = true
213 s.mu.Unlock()
214 return s.getStream().CloseSend()
215 }
216
217 func (s *serverStreamingRetryingStream) Header() (metadata.MD, error) {
218 return s.getStream().Header()
219 }
220
221 func (s *serverStreamingRetryingStream) Trailer() metadata.MD {
222 return s.getStream().Trailer()
223 }
224
225 func (s *serverStreamingRetryingStream) RecvMsg(m interface{}) error {
226 attemptRetry, lastErr := s.receiveMsgAndIndicateRetry(m)
227 if !attemptRetry {
228 return lastErr
229 }
230
231
232 for attempt := uint(1); attempt < s.callOpts.max; attempt++ {
233 if err := waitRetryBackoff(s.ctx, attempt, s.callOpts); err != nil {
234 return err
235 }
236 newStream, err := s.reestablishStreamAndResendBuffer(s.ctx)
237 if err != nil {
238 s.client.lg.Error("failed reestablishStreamAndResendBuffer", zap.Error(err))
239 return err
240 }
241 s.setStream(newStream)
242
243 s.client.lg.Warn("retrying RecvMsg", zap.Error(lastErr))
244 attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m)
245 if !attemptRetry {
246 return lastErr
247 }
248 }
249 return lastErr
250 }
251
252 func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}) (bool, error) {
253 s.mu.RLock()
254 wasGood := s.receivedGood
255 s.mu.RUnlock()
256 err := s.getStream().RecvMsg(m)
257 if err == nil || err == io.EOF {
258 s.mu.Lock()
259 s.receivedGood = true
260 s.mu.Unlock()
261 return false, err
262 } else if wasGood {
263
264 return false, err
265 }
266 if isContextError(err) {
267 if s.ctx.Err() != nil {
268 return false, err
269 }
270
271 return true, err
272 }
273 if s.client.shouldRefreshToken(err, s.callOpts) {
274 gterr := s.client.refreshToken(s.ctx)
275 if gterr != nil {
276 s.client.lg.Warn("retry failed to fetch new auth token", zap.Error(gterr))
277 return false, err
278 }
279 return true, err
280
281 }
282 return isSafeRetry(s.client.lg, err, s.callOpts), err
283 }
284
285 func (s *serverStreamingRetryingStream) reestablishStreamAndResendBuffer(callCtx context.Context) (grpc.ClientStream, error) {
286 s.mu.RLock()
287 bufferedSends := s.bufferedSends
288 s.mu.RUnlock()
289 newStream, err := s.streamerCall(callCtx)
290 if err != nil {
291 return nil, err
292 }
293 for _, msg := range bufferedSends {
294 if err := newStream.SendMsg(msg); err != nil {
295 return nil, err
296 }
297 }
298 if err := newStream.CloseSend(); err != nil {
299 return nil, err
300 }
301 return newStream, nil
302 }
303
304 func waitRetryBackoff(ctx context.Context, attempt uint, callOpts *options) error {
305 waitTime := time.Duration(0)
306 if attempt > 0 {
307 waitTime = callOpts.backoffFunc(attempt)
308 }
309 if waitTime > 0 {
310 timer := time.NewTimer(waitTime)
311 select {
312 case <-ctx.Done():
313 timer.Stop()
314 return contextErrToGrpcErr(ctx.Err())
315 case <-timer.C:
316 }
317 }
318 return nil
319 }
320
321
322 func isSafeRetry(lg *zap.Logger, err error, callOpts *options) bool {
323 if isContextError(err) {
324 return false
325 }
326 switch callOpts.retryPolicy {
327 case repeatable:
328 return isSafeRetryImmutableRPC(err)
329 case nonRepeatable:
330 return isSafeRetryMutableRPC(err)
331 default:
332 lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String()))
333 return false
334 }
335 }
336
337 func isContextError(err error) bool {
338 return status.Code(err) == codes.DeadlineExceeded || status.Code(err) == codes.Canceled
339 }
340
341 func contextErrToGrpcErr(err error) error {
342 switch err {
343 case context.DeadlineExceeded:
344 return status.Errorf(codes.DeadlineExceeded, err.Error())
345 case context.Canceled:
346 return status.Errorf(codes.Canceled, err.Error())
347 default:
348 return status.Errorf(codes.Unknown, err.Error())
349 }
350 }
351
352 var (
353 defaultOptions = &options{
354 retryPolicy: nonRepeatable,
355 max: 0,
356 backoffFunc: backoffLinearWithJitter(50*time.Millisecond , 0.10),
357 retryAuth: true,
358 }
359 )
360
361
362
363
364
365
366
// backoffFunc computes how long to wait before a retry attempt.
//
// waitRetryBackoff calls it with the attempt number (only for attempts greater
// than zero) and sleeps for the returned duration; a non-positive result means
// no wait, and the wait is cut short if the call's context finishes first.
type backoffFunc func(attempt uint) time.Duration
368
369
370 func withRetryPolicy(rp retryPolicy) retryOption {
371 return retryOption{applyFunc: func(o *options) {
372 o.retryPolicy = rp
373 }}
374 }
375
376
377 func withMax(maxRetries uint) retryOption {
378 return retryOption{applyFunc: func(o *options) {
379 o.max = maxRetries
380 }}
381 }
382
383
384 func withBackoff(bf backoffFunc) retryOption {
385 return retryOption{applyFunc: func(o *options) {
386 o.backoffFunc = bf
387 }}
388 }
389
390 type options struct {
391 retryPolicy retryPolicy
392 max uint
393 backoffFunc backoffFunc
394 retryAuth bool
395 }
396
397
398 type retryOption struct {
399 grpc.EmptyCallOption
400 applyFunc func(opt *options)
401 }
402
403 func reuseOrNewWithCallOptions(opt *options, retryOptions []retryOption) *options {
404 if len(retryOptions) == 0 {
405 return opt
406 }
407 optCopy := &options{}
408 *optCopy = *opt
409 for _, f := range retryOptions {
410 f.applyFunc(optCopy)
411 }
412 return optCopy
413 }
414
415 func filterCallOptions(callOptions []grpc.CallOption) (grpcOptions []grpc.CallOption, retryOptions []retryOption) {
416 for _, opt := range callOptions {
417 if co, ok := opt.(retryOption); ok {
418 retryOptions = append(retryOptions, co)
419 } else {
420 grpcOptions = append(grpcOptions, opt)
421 }
422 }
423 return grpcOptions, retryOptions
424 }
425
426
427
428
429 func backoffLinearWithJitter(waitBetween time.Duration, jitterFraction float64) backoffFunc {
430 return func(attempt uint) time.Duration {
431 return jitterUp(waitBetween, jitterFraction)
432 }
433 }
434
View as plain text