1 /* 2 Copyright 2021 The Kubernetes Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 package rest 18 19 import ( 20 "context" 21 "fmt" 22 "io" 23 "net/http" 24 "net/url" 25 "time" 26 27 "k8s.io/klog/v2" 28 ) 29 30 // IsRetryableErrorFunc allows the client to provide its own function 31 // that determines whether the specified err from the server is retryable. 32 // 33 // request: the original request sent to the server 34 // err: the server sent this error to us 35 // 36 // The function returns true if the error is retryable and the request 37 // can be retried, otherwise it returns false. 38 // We have four mode of communications - 'Stream', 'Watch', 'Do' and 'DoRaw', this 39 // function allows us to customize the retryability aspect of each. 40 type IsRetryableErrorFunc func(request *http.Request, err error) bool 41 42 func (r IsRetryableErrorFunc) IsErrorRetryable(request *http.Request, err error) bool { 43 return r(request, err) 44 } 45 46 var neverRetryError = IsRetryableErrorFunc(func(_ *http.Request, _ error) bool { 47 return false 48 }) 49 50 // WithRetry allows the client to retry a request up to a certain number of times 51 // Note that WithRetry is not safe for concurrent use by multiple 52 // goroutines without additional locking or coordination. 53 type WithRetry interface { 54 // IsNextRetry advances the retry counter appropriately 55 // and returns true if the request should be retried, 56 // otherwise it returns false, if: 57 // - we have already reached the maximum retry threshold. 58 // - the error does not fall into the retryable category. 59 // - the server has not sent us a 429, or 5xx status code and the 60 // 'Retry-After' response header is not set with a value. 61 // - we need to seek to the beginning of the request body before we 62 // initiate the next retry, the function should log an error and 63 // return false if it fails to do so. 64 // 65 // restReq: the associated rest.Request 66 // httpReq: the HTTP Request sent to the server 67 // resp: the response sent from the server, it is set if err is nil 68 // err: the server sent this error to us, if err is set then resp is nil. 69 // f: a IsRetryableErrorFunc function provided by the client that determines 70 // if the err sent by the server is retryable. 71 IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool 72 73 // Before should be invoked prior to each attempt, including 74 // the first one. If an error is returned, the request should 75 // be aborted immediately. 76 // 77 // Before may also be additionally responsible for preparing 78 // the request for the next retry, namely in terms of resetting 79 // the request body in case it has been read. 80 Before(ctx context.Context, r *Request) error 81 82 // After should be invoked immediately after an attempt is made. 83 After(ctx context.Context, r *Request, resp *http.Response, err error) 84 85 // WrapPreviousError wraps the error from any previous attempt into 86 // the final error specified in 'finalErr', so the user has more 87 // context why the request failed. 88 // For example, if a request times out after multiple retries then 89 // we see a generic context.Canceled or context.DeadlineExceeded 90 // error which is not very useful in debugging. This function can 91 // wrap any error from previous attempt(s) to provide more context to 92 // the user. The error returned in 'err' must satisfy the 93 // following conditions: 94 // a: errors.Unwrap(err) = errors.Unwrap(finalErr) if finalErr 95 // implements Unwrap 96 // b: errors.Unwrap(err) = finalErr if finalErr does not 97 // implements Unwrap 98 // c: errors.Is(err, otherErr) = errors.Is(finalErr, otherErr) 99 WrapPreviousError(finalErr error) (err error) 100 } 101 102 // RetryAfter holds information associated with the next retry. 103 type RetryAfter struct { 104 // Wait is the duration the server has asked us to wait before 105 // the next retry is initiated. 106 // This is the value of the 'Retry-After' response header in seconds. 107 Wait time.Duration 108 109 // Attempt is the Nth attempt after which we have received a retryable 110 // error or a 'Retry-After' response header from the server. 111 Attempt int 112 113 // Reason describes why we are retrying the request 114 Reason string 115 } 116 117 type withRetry struct { 118 maxRetries int 119 attempts int 120 121 // retry after parameters that pertain to the attempt that is to 122 // be made soon, so as to enable 'Before' and 'After' to refer 123 // to the retry parameters. 124 // - for the first attempt, it will always be nil 125 // - for consecutive attempts, it is non nil and holds the 126 // retry after parameters for the next attempt to be made. 127 retryAfter *RetryAfter 128 129 // we keep track of two most recent errors, if the most 130 // recent attempt is labeled as 'N' then: 131 // - currentErr represents the error returned by attempt N, it 132 // can be nil if attempt N did not return an error. 133 // - previousErr represents an error from an attempt 'M' which 134 // precedes attempt 'N' (N - M >= 1), it is non nil only when: 135 // - for a sequence of attempt(s) 1..n (n>1), there 136 // is an attempt k (k<n) that returned an error. 137 previousErr, currentErr error 138 } 139 140 func (r *withRetry) trackPreviousError(err error) { 141 // keep track of two most recent errors 142 if r.currentErr != nil { 143 r.previousErr = r.currentErr 144 } 145 r.currentErr = err 146 } 147 148 func (r *withRetry) IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool { 149 defer r.trackPreviousError(err) 150 151 if httpReq == nil || (resp == nil && err == nil) { 152 // bad input, we do nothing. 153 return false 154 } 155 156 if restReq.body != nil { 157 // we have an opaque reader, we can't safely reset it 158 return false 159 } 160 161 r.attempts++ 162 r.retryAfter = &RetryAfter{Attempt: r.attempts} 163 if r.attempts > r.maxRetries { 164 return false 165 } 166 167 // if the server returned an error, it takes precedence over the http response. 168 var errIsRetryable bool 169 if f != nil && err != nil && f.IsErrorRetryable(httpReq, err) { 170 errIsRetryable = true 171 // we have a retryable error, for which we will create an 172 // artificial "Retry-After" response. 173 resp = retryAfterResponse() 174 } 175 if err != nil && !errIsRetryable { 176 return false 177 } 178 179 // if we are here, we have either a or b: 180 // a: we have a retryable error, for which we already 181 // have an artificial "Retry-After" response. 182 // b: we have a response from the server for which we 183 // need to check if it is retryable 184 seconds, wait := checkWait(resp) 185 if !wait { 186 return false 187 } 188 189 r.retryAfter.Wait = time.Duration(seconds) * time.Second 190 r.retryAfter.Reason = getRetryReason(r.attempts, seconds, resp, err) 191 192 return true 193 } 194 195 func (r *withRetry) Before(ctx context.Context, request *Request) error { 196 // If the request context is already canceled there 197 // is no need to retry. 198 if ctx.Err() != nil { 199 r.trackPreviousError(ctx.Err()) 200 return ctx.Err() 201 } 202 203 url := request.URL() 204 // r.retryAfter represents the retry after parameters calculated 205 // from the (response, err) tuple from the last attempt, so 'Before' 206 // can apply these retry after parameters prior to the next attempt. 207 // 'r.retryAfter == nil' indicates that this is the very first attempt. 208 if r.retryAfter == nil { 209 // we do a backoff sleep before the first attempt is made, 210 // (preserving current behavior). 211 if request.backoff != nil { 212 request.backoff.Sleep(request.backoff.CalculateBackoff(url)) 213 } 214 return nil 215 } 216 217 // if we are here, we have made attempt(s) at least once before. 218 if request.backoff != nil { 219 delay := request.backoff.CalculateBackoff(url) 220 if r.retryAfter.Wait > delay { 221 delay = r.retryAfter.Wait 222 } 223 request.backoff.Sleep(delay) 224 } 225 226 // We are retrying the request that we already send to 227 // apiserver at least once before. This request should 228 // also be throttled with the client-internal rate limiter. 229 if err := request.tryThrottleWithInfo(ctx, r.retryAfter.Reason); err != nil { 230 r.trackPreviousError(ctx.Err()) 231 return err 232 } 233 234 klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", r.retryAfter.Wait, r.retryAfter.Attempt, request.URL().String()) 235 return nil 236 } 237 238 func (r *withRetry) After(ctx context.Context, request *Request, resp *http.Response, err error) { 239 // 'After' is invoked immediately after an attempt is made, let's label 240 // the attempt we have just made as attempt 'N'. 241 // the current value of r.retryAfter represents the retry after 242 // parameters calculated from the (response, err) tuple from 243 // attempt N-1, so r.retryAfter is outdated and should not be 244 // referred to here. 245 isRetry := r.retryAfter != nil 246 r.retryAfter = nil 247 248 // the client finishes a single request after N attempts (1..N) 249 // - all attempts (1..N) are counted to the rest_client_requests_total 250 // metric (current behavior). 251 // - every attempt after the first (2..N) are counted to the 252 // rest_client_request_retries_total metric. 253 updateRequestResultMetric(ctx, request, resp, err) 254 if isRetry { 255 // this is attempt 2 or later 256 updateRequestRetryMetric(ctx, request, resp, err) 257 } 258 259 if request.c.base != nil { 260 if err != nil { 261 request.backoff.UpdateBackoff(request.URL(), err, 0) 262 } else { 263 request.backoff.UpdateBackoff(request.URL(), err, resp.StatusCode) 264 } 265 } 266 } 267 268 func (r *withRetry) WrapPreviousError(currentErr error) error { 269 if currentErr == nil || r.previousErr == nil { 270 return currentErr 271 } 272 273 // if both previous and current error objects represent the error, 274 // then there is no need to wrap the previous error. 275 if currentErr.Error() == r.previousErr.Error() { 276 return currentErr 277 } 278 279 previousErr := r.previousErr 280 // net/http wraps the underlying error with an url.Error, if the 281 // previous err object is an instance of url.Error, then we can 282 // unwrap it to get to the inner error object, this is so we can 283 // avoid error message like: 284 // Error: Get "http://foo.bar/api/v1": context deadline exceeded - error \ 285 // from a previous attempt: Error: Get "http://foo.bar/api/v1": EOF 286 if urlErr, ok := r.previousErr.(*url.Error); ok && urlErr != nil { 287 if urlErr.Unwrap() != nil { 288 previousErr = urlErr.Unwrap() 289 } 290 } 291 292 return &wrapPreviousError{ 293 currentErr: currentErr, 294 previousError: previousErr, 295 } 296 } 297 298 type wrapPreviousError struct { 299 currentErr, previousError error 300 } 301 302 func (w *wrapPreviousError) Unwrap() error { return w.currentErr } 303 func (w *wrapPreviousError) Error() string { 304 return fmt.Sprintf("%s - error from a previous attempt: %s", w.currentErr.Error(), w.previousError.Error()) 305 } 306 307 // checkWait returns true along with a number of seconds if 308 // the server instructed us to wait before retrying. 309 func checkWait(resp *http.Response) (int, bool) { 310 switch r := resp.StatusCode; { 311 // any 500 error code and 429 can trigger a wait 312 case r == http.StatusTooManyRequests, r >= 500: 313 default: 314 return 0, false 315 } 316 i, ok := retryAfterSeconds(resp) 317 return i, ok 318 } 319 320 func getRetryReason(retries, seconds int, resp *http.Response, err error) string { 321 // priority and fairness sets the UID of the FlowSchema 322 // associated with a request in the following response Header. 323 const responseHeaderMatchedFlowSchemaUID = "X-Kubernetes-PF-FlowSchema-UID" 324 325 message := fmt.Sprintf("retries: %d, retry-after: %ds", retries, seconds) 326 327 switch { 328 case resp.StatusCode == http.StatusTooManyRequests: 329 // it is server-side throttling from priority and fairness 330 flowSchemaUID := resp.Header.Get(responseHeaderMatchedFlowSchemaUID) 331 return fmt.Sprintf("%s - retry-reason: due to server-side throttling, FlowSchema UID: %q", message, flowSchemaUID) 332 case err != nil: 333 // it's a retryable error 334 return fmt.Sprintf("%s - retry-reason: due to retryable error, error: %v", message, err) 335 default: 336 return fmt.Sprintf("%s - retry-reason: %d", message, resp.StatusCode) 337 } 338 } 339 340 func readAndCloseResponseBody(resp *http.Response) { 341 if resp == nil { 342 return 343 } 344 345 // Ensure the response body is fully read and closed 346 // before we reconnect, so that we reuse the same TCP 347 // connection. 348 const maxBodySlurpSize = 2 << 10 349 defer resp.Body.Close() 350 351 if resp.ContentLength <= maxBodySlurpSize { 352 io.Copy(io.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize}) 353 } 354 } 355 356 func retryAfterResponse() *http.Response { 357 return retryAfterResponseWithDelay("1") 358 } 359 360 func retryAfterResponseWithDelay(delay string) *http.Response { 361 return retryAfterResponseWithCodeAndDelay(http.StatusInternalServerError, delay) 362 } 363 364 func retryAfterResponseWithCodeAndDelay(code int, delay string) *http.Response { 365 return &http.Response{ 366 StatusCode: code, 367 Header: http.Header{"Retry-After": []string{delay}}, 368 } 369 } 370