...

Source file src/k8s.io/client-go/rest/with_retry.go

Documentation: k8s.io/client-go/rest

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package rest
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"io"
    23  	"net/http"
    24  	"net/url"
    25  	"time"
    26  
    27  	"k8s.io/klog/v2"
    28  )
    29  
    30  // IsRetryableErrorFunc allows the client to provide its own function
    31  // that determines whether the specified err from the server is retryable.
    32  //
    33  // request: the original request sent to the server
    34  // err: the server sent this error to us
    35  //
    36  // The function returns true if the error is retryable and the request
    37  // can be retried, otherwise it returns false.
    38  // We have four mode of communications - 'Stream', 'Watch', 'Do' and 'DoRaw', this
    39  // function allows us to customize the retryability aspect of each.
    40  type IsRetryableErrorFunc func(request *http.Request, err error) bool
    41  
    42  func (r IsRetryableErrorFunc) IsErrorRetryable(request *http.Request, err error) bool {
    43  	return r(request, err)
    44  }
    45  
    46  var neverRetryError = IsRetryableErrorFunc(func(_ *http.Request, _ error) bool {
    47  	return false
    48  })
    49  
// WithRetry allows the client to retry a request up to a certain number of times.
// Note that WithRetry is not safe for concurrent use by multiple
// goroutines without additional locking or coordination.
type WithRetry interface {
	// IsNextRetry advances the retry counter appropriately
	// and returns true if the request should be retried,
	// otherwise it returns false, if:
	//  - we have already reached the maximum retry threshold.
	//  - the error does not fall into the retryable category.
	//  - the server has not sent us a 429, or 5xx status code and the
	//    'Retry-After' response header is not set with a value.
	//  - we need to seek to the beginning of the request body before we
	//    initiate the next retry, the function should log an error and
	//    return false if it fails to do so.
	//
	// restReq: the associated rest.Request
	// httpReq: the HTTP Request sent to the server
	// resp: the response sent from the server, it is set if err is nil
	// err: the server sent this error to us, if err is set then resp is nil.
	// f: an IsRetryableErrorFunc function provided by the client that determines
	//    if the err sent by the server is retryable.
	IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool

	// Before should be invoked prior to each attempt, including
	// the first one. If an error is returned, the request should
	// be aborted immediately.
	//
	// Before may also be additionally responsible for preparing
	// the request for the next retry, namely in terms of resetting
	// the request body in case it has been read.
	Before(ctx context.Context, r *Request) error

	// After should be invoked immediately after an attempt is made.
	After(ctx context.Context, r *Request, resp *http.Response, err error)

	// WrapPreviousError wraps the error from any previous attempt into
	// the final error specified in 'finalErr', so the user has more
	// context on why the request failed.
	// For example, if a request times out after multiple retries then
	// we see a generic context.Canceled or context.DeadlineExceeded
	// error which is not very useful in debugging. This function can
	// wrap any error from previous attempt(s) to provide more context to
	// the user. The error returned in 'err' must satisfy the
	// following conditions:
	//  a: errors.Unwrap(err) = errors.Unwrap(finalErr) if finalErr
	//     implements Unwrap
	//  b: errors.Unwrap(err) = finalErr if finalErr does not
	//     implement Unwrap
	//  c: errors.Is(err, otherErr) = errors.Is(finalErr, otherErr)
	WrapPreviousError(finalErr error) (err error)
}
   101  
// RetryAfter holds information associated with the next retry.
type RetryAfter struct {
	// Wait is the duration the server has asked us to wait before
	// the next retry is initiated.
	// It is derived from the value, in seconds, of the 'Retry-After'
	// response header.
	Wait time.Duration

	// Attempt is the Nth attempt after which we have received a retryable
	// error or a 'Retry-After' response header from the server.
	Attempt int

	// Reason describes why we are retrying the request.
	Reason string
}
   116  
// withRetry carries the retry state for a single request: the retry
// budget, the attempts consumed so far, the parameters for the upcoming
// retry and the two most recent errors.
type withRetry struct {
	// maxRetries is the retry threshold; IsNextRetry refuses a further
	// retry once attempts exceeds it.
	maxRetries int
	// attempts is incremented by IsNextRetry each time it evaluates a
	// completed attempt that passed its input checks.
	attempts int

	// retry after parameters that pertain to the attempt that is to
	// be made soon, so as to enable 'Before' and 'After' to refer
	// to the retry parameters.
	//  - for the first attempt, it will always be nil
	//  - for consecutive attempts, it is non nil and holds the
	//    retry after parameters for the next attempt to be made.
	retryAfter *RetryAfter

	// we keep track of the two most recent errors, if the most
	// recent attempt is labeled as 'N' then:
	//  - currentErr represents the error returned by attempt N, it
	//    can be nil if attempt N did not return an error.
	//  - previousErr represents an error from an attempt 'M' which
	//    precedes attempt 'N' (N - M >= 1), it is non nil only when:
	//      - for a sequence of attempt(s) 1..n (n>1), there
	//        is an attempt k (k<n) that returned an error.
	previousErr, currentErr error
}
   139  
   140  func (r *withRetry) trackPreviousError(err error) {
   141  	// keep track of two most recent errors
   142  	if r.currentErr != nil {
   143  		r.previousErr = r.currentErr
   144  	}
   145  	r.currentErr = err
   146  }
   147  
   148  func (r *withRetry) IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool {
   149  	defer r.trackPreviousError(err)
   150  
   151  	if httpReq == nil || (resp == nil && err == nil) {
   152  		// bad input, we do nothing.
   153  		return false
   154  	}
   155  
   156  	if restReq.body != nil {
   157  		// we have an opaque reader, we can't safely reset it
   158  		return false
   159  	}
   160  
   161  	r.attempts++
   162  	r.retryAfter = &RetryAfter{Attempt: r.attempts}
   163  	if r.attempts > r.maxRetries {
   164  		return false
   165  	}
   166  
   167  	// if the server returned an error, it takes precedence over the http response.
   168  	var errIsRetryable bool
   169  	if f != nil && err != nil && f.IsErrorRetryable(httpReq, err) {
   170  		errIsRetryable = true
   171  		// we have a retryable error, for which we will create an
   172  		// artificial "Retry-After" response.
   173  		resp = retryAfterResponse()
   174  	}
   175  	if err != nil && !errIsRetryable {
   176  		return false
   177  	}
   178  
   179  	// if we are here, we have either a or b:
   180  	//  a: we have a retryable error, for which we already
   181  	//     have an artificial "Retry-After" response.
   182  	//  b: we have a response from the server for which we
   183  	//     need to check if it is retryable
   184  	seconds, wait := checkWait(resp)
   185  	if !wait {
   186  		return false
   187  	}
   188  
   189  	r.retryAfter.Wait = time.Duration(seconds) * time.Second
   190  	r.retryAfter.Reason = getRetryReason(r.attempts, seconds, resp, err)
   191  
   192  	return true
   193  }
   194  
   195  func (r *withRetry) Before(ctx context.Context, request *Request) error {
   196  	// If the request context is already canceled there
   197  	// is no need to retry.
   198  	if ctx.Err() != nil {
   199  		r.trackPreviousError(ctx.Err())
   200  		return ctx.Err()
   201  	}
   202  
   203  	url := request.URL()
   204  	// r.retryAfter represents the retry after parameters calculated
   205  	// from the (response, err) tuple from the last attempt, so 'Before'
   206  	// can apply these retry after parameters prior to the next attempt.
   207  	// 'r.retryAfter == nil' indicates that this is the very first attempt.
   208  	if r.retryAfter == nil {
   209  		// we do a backoff sleep before the first attempt is made,
   210  		// (preserving current behavior).
   211  		if request.backoff != nil {
   212  			request.backoff.Sleep(request.backoff.CalculateBackoff(url))
   213  		}
   214  		return nil
   215  	}
   216  
   217  	// if we are here, we have made attempt(s) at least once before.
   218  	if request.backoff != nil {
   219  		delay := request.backoff.CalculateBackoff(url)
   220  		if r.retryAfter.Wait > delay {
   221  			delay = r.retryAfter.Wait
   222  		}
   223  		request.backoff.Sleep(delay)
   224  	}
   225  
   226  	// We are retrying the request that we already send to
   227  	// apiserver at least once before. This request should
   228  	// also be throttled with the client-internal rate limiter.
   229  	if err := request.tryThrottleWithInfo(ctx, r.retryAfter.Reason); err != nil {
   230  		r.trackPreviousError(ctx.Err())
   231  		return err
   232  	}
   233  
   234  	klog.V(4).Infof("Got a Retry-After %s response for attempt %d to %v", r.retryAfter.Wait, r.retryAfter.Attempt, request.URL().String())
   235  	return nil
   236  }
   237  
   238  func (r *withRetry) After(ctx context.Context, request *Request, resp *http.Response, err error) {
   239  	// 'After' is invoked immediately after an attempt is made, let's label
   240  	// the attempt we have just made as attempt 'N'.
   241  	// the current value of r.retryAfter represents the retry after
   242  	// parameters calculated from the (response, err) tuple from
   243  	// attempt N-1, so r.retryAfter is outdated and should not be
   244  	// referred to here.
   245  	isRetry := r.retryAfter != nil
   246  	r.retryAfter = nil
   247  
   248  	// the client finishes a single request after N attempts (1..N)
   249  	//  - all attempts (1..N) are counted to the rest_client_requests_total
   250  	//    metric (current behavior).
   251  	//  - every attempt after the first (2..N) are counted to the
   252  	//    rest_client_request_retries_total metric.
   253  	updateRequestResultMetric(ctx, request, resp, err)
   254  	if isRetry {
   255  		// this is attempt 2 or later
   256  		updateRequestRetryMetric(ctx, request, resp, err)
   257  	}
   258  
   259  	if request.c.base != nil {
   260  		if err != nil {
   261  			request.backoff.UpdateBackoff(request.URL(), err, 0)
   262  		} else {
   263  			request.backoff.UpdateBackoff(request.URL(), err, resp.StatusCode)
   264  		}
   265  	}
   266  }
   267  
   268  func (r *withRetry) WrapPreviousError(currentErr error) error {
   269  	if currentErr == nil || r.previousErr == nil {
   270  		return currentErr
   271  	}
   272  
   273  	// if both previous and current error objects represent the error,
   274  	// then there is no need to wrap the previous error.
   275  	if currentErr.Error() == r.previousErr.Error() {
   276  		return currentErr
   277  	}
   278  
   279  	previousErr := r.previousErr
   280  	// net/http wraps the underlying error with an url.Error, if the
   281  	// previous err object is an instance of url.Error, then we can
   282  	// unwrap it to get to the inner error object, this is so we can
   283  	// avoid error message like:
   284  	//  Error: Get "http://foo.bar/api/v1": context deadline exceeded - error \
   285  	//  from a previous attempt: Error: Get "http://foo.bar/api/v1": EOF
   286  	if urlErr, ok := r.previousErr.(*url.Error); ok && urlErr != nil {
   287  		if urlErr.Unwrap() != nil {
   288  			previousErr = urlErr.Unwrap()
   289  		}
   290  	}
   291  
   292  	return &wrapPreviousError{
   293  		currentErr:    currentErr,
   294  		previousError: previousErr,
   295  	}
   296  }
   297  
   298  type wrapPreviousError struct {
   299  	currentErr, previousError error
   300  }
   301  
   302  func (w *wrapPreviousError) Unwrap() error { return w.currentErr }
   303  func (w *wrapPreviousError) Error() string {
   304  	return fmt.Sprintf("%s - error from a previous attempt: %s", w.currentErr.Error(), w.previousError.Error())
   305  }
   306  
   307  // checkWait returns true along with a number of seconds if
   308  // the server instructed us to wait before retrying.
   309  func checkWait(resp *http.Response) (int, bool) {
   310  	switch r := resp.StatusCode; {
   311  	// any 500 error code and 429 can trigger a wait
   312  	case r == http.StatusTooManyRequests, r >= 500:
   313  	default:
   314  		return 0, false
   315  	}
   316  	i, ok := retryAfterSeconds(resp)
   317  	return i, ok
   318  }
   319  
   320  func getRetryReason(retries, seconds int, resp *http.Response, err error) string {
   321  	// priority and fairness sets the UID of the FlowSchema
   322  	// associated with a request in the following response Header.
   323  	const responseHeaderMatchedFlowSchemaUID = "X-Kubernetes-PF-FlowSchema-UID"
   324  
   325  	message := fmt.Sprintf("retries: %d, retry-after: %ds", retries, seconds)
   326  
   327  	switch {
   328  	case resp.StatusCode == http.StatusTooManyRequests:
   329  		// it is server-side throttling from priority and fairness
   330  		flowSchemaUID := resp.Header.Get(responseHeaderMatchedFlowSchemaUID)
   331  		return fmt.Sprintf("%s - retry-reason: due to server-side throttling, FlowSchema UID: %q", message, flowSchemaUID)
   332  	case err != nil:
   333  		// it's a retryable error
   334  		return fmt.Sprintf("%s - retry-reason: due to retryable error, error: %v", message, err)
   335  	default:
   336  		return fmt.Sprintf("%s - retry-reason: %d", message, resp.StatusCode)
   337  	}
   338  }
   339  
   340  func readAndCloseResponseBody(resp *http.Response) {
   341  	if resp == nil {
   342  		return
   343  	}
   344  
   345  	// Ensure the response body is fully read and closed
   346  	// before we reconnect, so that we reuse the same TCP
   347  	// connection.
   348  	const maxBodySlurpSize = 2 << 10
   349  	defer resp.Body.Close()
   350  
   351  	if resp.ContentLength <= maxBodySlurpSize {
   352  		io.Copy(io.Discard, &io.LimitedReader{R: resp.Body, N: maxBodySlurpSize})
   353  	}
   354  }
   355  
   356  func retryAfterResponse() *http.Response {
   357  	return retryAfterResponseWithDelay("1")
   358  }
   359  
   360  func retryAfterResponseWithDelay(delay string) *http.Response {
   361  	return retryAfterResponseWithCodeAndDelay(http.StatusInternalServerError, delay)
   362  }
   363  
   364  func retryAfterResponseWithCodeAndDelay(code int, delay string) *http.Response {
   365  	return &http.Response{
   366  		StatusCode: code,
   367  		Header:     http.Header{"Retry-After": []string{delay}},
   368  	}
   369  }
   370  

View as plain text