...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package storage
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "io"
22 "net"
23 "net/url"
24 "strings"
25
26 "cloud.google.com/go/internal"
27 "cloud.google.com/go/internal/version"
28 sinternal "cloud.google.com/go/storage/internal"
29 "github.com/google/uuid"
30 gax "github.com/googleapis/gax-go/v2"
31 "github.com/googleapis/gax-go/v2/callctx"
32 "google.golang.org/api/googleapi"
33 "google.golang.org/grpc/codes"
34 "google.golang.org/grpc/status"
35 )
36
37 var defaultRetry *retryConfig = &retryConfig{}
38 var xGoogDefaultHeader = fmt.Sprintf("gl-go/%s gccl/%s", version.Go(), sinternal.Version)
39
40 const (
41 xGoogHeaderKey = "x-goog-api-client"
42 idempotencyHeaderKey = "x-goog-gcs-idempotency-token"
43 )
44
45
46
47
48 func run(ctx context.Context, call func(ctx context.Context) error, retry *retryConfig, isIdempotent bool) error {
49 attempts := 1
50 invocationID := uuid.New().String()
51
52 if retry == nil {
53 retry = defaultRetry
54 }
55 if (retry.policy == RetryIdempotent && !isIdempotent) || retry.policy == RetryNever {
56 ctxWithHeaders := setInvocationHeaders(ctx, invocationID, attempts)
57 return call(ctxWithHeaders)
58 }
59 bo := gax.Backoff{}
60 if retry.backoff != nil {
61 bo.Multiplier = retry.backoff.Multiplier
62 bo.Initial = retry.backoff.Initial
63 bo.Max = retry.backoff.Max
64 }
65 var errorFunc func(err error) bool = ShouldRetry
66 if retry.shouldRetry != nil {
67 errorFunc = retry.shouldRetry
68 }
69
70 return internal.Retry(ctx, bo, func() (stop bool, err error) {
71 ctxWithHeaders := setInvocationHeaders(ctx, invocationID, attempts)
72 err = call(ctxWithHeaders)
73 if err != nil && retry.maxAttempts != nil && attempts >= *retry.maxAttempts {
74 return true, fmt.Errorf("storage: retry failed after %v attempts; last error: %w", *retry.maxAttempts, err)
75 }
76 attempts++
77 return !errorFunc(err), err
78 })
79 }
80
81
82
83 func setInvocationHeaders(ctx context.Context, invocationID string, attempts int) context.Context {
84 invocationHeader := fmt.Sprintf("gccl-invocation-id/%v gccl-attempt-count/%v", invocationID, attempts)
85 xGoogHeader := strings.Join([]string{invocationHeader, xGoogDefaultHeader}, " ")
86
87 ctx = callctx.SetHeaders(ctx, xGoogHeaderKey, xGoogHeader)
88 ctx = callctx.SetHeaders(ctx, idempotencyHeaderKey, invocationID)
89 return ctx
90 }
91
92
93
94
95
96
97
98
99
100
101 func ShouldRetry(err error) bool {
102 if err == nil {
103 return false
104 }
105 if errors.Is(err, io.ErrUnexpectedEOF) {
106 return true
107 }
108 if errors.Is(err, net.ErrClosed) {
109 return true
110 }
111
112 switch e := err.(type) {
113 case *googleapi.Error:
114
115
116 return e.Code == 408 || e.Code == 429 || (e.Code >= 500 && e.Code < 600)
117 case *net.OpError, *url.Error:
118
119
120
121 retriable := []string{"connection refused", "connection reset"}
122 for _, s := range retriable {
123 if strings.Contains(e.Error(), s) {
124 return true
125 }
126 }
127 case interface{ Temporary() bool }:
128 if e.Temporary() {
129 return true
130 }
131 }
132
133 if st, ok := status.FromError(err); ok {
134 if code := st.Code(); code == codes.Unavailable || code == codes.ResourceExhausted || code == codes.Internal {
135 return true
136 }
137 }
138
139 if e, ok := err.(interface{ Unwrap() error }); ok {
140 return ShouldRetry(e.Unwrap())
141 }
142 return false
143 }
144
View as plain text