...

Source file src/k8s.io/client-go/rest/request.go

Documentation: k8s.io/client-go/rest

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package rest
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"encoding/hex"
    23  	"fmt"
    24  	"io"
    25  	"mime"
    26  	"net/http"
    27  	"net/http/httptrace"
    28  	"net/url"
    29  	"os"
    30  	"path"
    31  	"reflect"
    32  	"strconv"
    33  	"strings"
    34  	"sync"
    35  	"time"
    36  
    37  	"golang.org/x/net/http2"
    38  
    39  	"k8s.io/apimachinery/pkg/api/errors"
    40  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    41  	"k8s.io/apimachinery/pkg/runtime"
    42  	"k8s.io/apimachinery/pkg/runtime/schema"
    43  	"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
    44  	"k8s.io/apimachinery/pkg/util/net"
    45  	"k8s.io/apimachinery/pkg/watch"
    46  	restclientwatch "k8s.io/client-go/rest/watch"
    47  	"k8s.io/client-go/tools/metrics"
    48  	"k8s.io/client-go/util/flowcontrol"
    49  	"k8s.io/klog/v2"
    50  	"k8s.io/utils/clock"
    51  )
    52  
    53  var (
    54  	// longThrottleLatency defines threshold for logging requests. All requests being
    55  	// throttled (via the provided rateLimiter) for more than longThrottleLatency will
    56  	// be logged.
    57  	longThrottleLatency = 50 * time.Millisecond
    58  
    59  	// extraLongThrottleLatency defines the threshold for logging requests at log level 2.
    60  	extraLongThrottleLatency = 1 * time.Second
    61  )
    62  
    63  // HTTPClient is an interface for testing a request object.
    64  type HTTPClient interface {
    65  	Do(req *http.Request) (*http.Response, error)
    66  }
    67  
    68  // ResponseWrapper is an interface for getting a response.
    69  // The response may be either accessed as a raw data (the whole output is put into memory) or as a stream.
    70  type ResponseWrapper interface {
    71  	DoRaw(context.Context) ([]byte, error)
    72  	Stream(context.Context) (io.ReadCloser, error)
    73  }
    74  
    75  // RequestConstructionError is returned when there's an error assembling a request.
    76  type RequestConstructionError struct {
    77  	Err error
    78  }
    79  
    80  // Error returns a textual description of 'r'.
    81  func (r *RequestConstructionError) Error() string {
    82  	return fmt.Sprintf("request construction error: '%v'", r.Err)
    83  }
    84  
    85  var noBackoff = &NoBackoff{}
    86  
    87  type requestRetryFunc func(maxRetries int) WithRetry
    88  
    89  func defaultRequestRetryFn(maxRetries int) WithRetry {
    90  	return &withRetry{maxRetries: maxRetries}
    91  }
    92  
    93  // Request allows for building up a request to a server in a chained fashion.
    94  // Any errors are stored until the end of your call, so you only have to
    95  // check once.
    96  type Request struct {
    97  	c *RESTClient
    98  
    99  	warningHandler WarningHandler
   100  
   101  	rateLimiter flowcontrol.RateLimiter
   102  	backoff     BackoffManager
   103  	timeout     time.Duration
   104  	maxRetries  int
   105  
   106  	// generic components accessible via method setters
   107  	verb       string
   108  	pathPrefix string
   109  	subpath    string
   110  	params     url.Values
   111  	headers    http.Header
   112  
   113  	// structural elements of the request that are part of the Kubernetes API conventions
   114  	namespace    string
   115  	namespaceSet bool
   116  	resource     string
   117  	resourceName string
   118  	subresource  string
   119  
   120  	// output
   121  	err error
   122  
   123  	// only one of body / bodyBytes may be set. requests using body are not retriable.
   124  	body      io.Reader
   125  	bodyBytes []byte
   126  
   127  	retryFn requestRetryFunc
   128  }
   129  
   130  // NewRequest creates a new request helper object for accessing runtime.Objects on a server.
   131  func NewRequest(c *RESTClient) *Request {
   132  	var backoff BackoffManager
   133  	if c.createBackoffMgr != nil {
   134  		backoff = c.createBackoffMgr()
   135  	}
   136  	if backoff == nil {
   137  		backoff = noBackoff
   138  	}
   139  
   140  	var pathPrefix string
   141  	if c.base != nil {
   142  		pathPrefix = path.Join("/", c.base.Path, c.versionedAPIPath)
   143  	} else {
   144  		pathPrefix = path.Join("/", c.versionedAPIPath)
   145  	}
   146  
   147  	var timeout time.Duration
   148  	if c.Client != nil {
   149  		timeout = c.Client.Timeout
   150  	}
   151  
   152  	r := &Request{
   153  		c:              c,
   154  		rateLimiter:    c.rateLimiter,
   155  		backoff:        backoff,
   156  		timeout:        timeout,
   157  		pathPrefix:     pathPrefix,
   158  		maxRetries:     10,
   159  		retryFn:        defaultRequestRetryFn,
   160  		warningHandler: c.warningHandler,
   161  	}
   162  
   163  	switch {
   164  	case len(c.content.AcceptContentTypes) > 0:
   165  		r.SetHeader("Accept", c.content.AcceptContentTypes)
   166  	case len(c.content.ContentType) > 0:
   167  		r.SetHeader("Accept", c.content.ContentType+", */*")
   168  	}
   169  	return r
   170  }
   171  
   172  // NewRequestWithClient creates a Request with an embedded RESTClient for use in test scenarios.
   173  func NewRequestWithClient(base *url.URL, versionedAPIPath string, content ClientContentConfig, client *http.Client) *Request {
   174  	return NewRequest(&RESTClient{
   175  		base:             base,
   176  		versionedAPIPath: versionedAPIPath,
   177  		content:          content,
   178  		Client:           client,
   179  	})
   180  }
   181  
   182  // Verb sets the verb this request will use.
   183  func (r *Request) Verb(verb string) *Request {
   184  	r.verb = verb
   185  	return r
   186  }
   187  
   188  // Prefix adds segments to the relative beginning to the request path. These
   189  // items will be placed before the optional Namespace, Resource, or Name sections.
   190  // Setting AbsPath will clear any previously set Prefix segments
   191  func (r *Request) Prefix(segments ...string) *Request {
   192  	if r.err != nil {
   193  		return r
   194  	}
   195  	r.pathPrefix = path.Join(r.pathPrefix, path.Join(segments...))
   196  	return r
   197  }
   198  
   199  // Suffix appends segments to the end of the path. These items will be placed after the prefix and optional
   200  // Namespace, Resource, or Name sections.
   201  func (r *Request) Suffix(segments ...string) *Request {
   202  	if r.err != nil {
   203  		return r
   204  	}
   205  	r.subpath = path.Join(r.subpath, path.Join(segments...))
   206  	return r
   207  }
   208  
   209  // Resource sets the resource to access (<resource>/[ns/<namespace>/]<name>)
   210  func (r *Request) Resource(resource string) *Request {
   211  	if r.err != nil {
   212  		return r
   213  	}
   214  	if len(r.resource) != 0 {
   215  		r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
   216  		return r
   217  	}
   218  	if msgs := IsValidPathSegmentName(resource); len(msgs) != 0 {
   219  		r.err = fmt.Errorf("invalid resource %q: %v", resource, msgs)
   220  		return r
   221  	}
   222  	r.resource = resource
   223  	return r
   224  }
   225  
   226  // BackOff sets the request's backoff manager to the one specified,
   227  // or defaults to the stub implementation if nil is provided
   228  func (r *Request) BackOff(manager BackoffManager) *Request {
   229  	if manager == nil {
   230  		r.backoff = &NoBackoff{}
   231  		return r
   232  	}
   233  
   234  	r.backoff = manager
   235  	return r
   236  }
   237  
   238  // WarningHandler sets the handler this client uses when warning headers are encountered.
   239  // If set to nil, this client will use the default warning handler (see SetDefaultWarningHandler).
   240  func (r *Request) WarningHandler(handler WarningHandler) *Request {
   241  	r.warningHandler = handler
   242  	return r
   243  }
   244  
   245  // Throttle receives a rate-limiter and sets or replaces an existing request limiter
   246  func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
   247  	r.rateLimiter = limiter
   248  	return r
   249  }
   250  
   251  // SubResource sets a sub-resource path which can be multiple segments after the resource
   252  // name but before the suffix.
   253  func (r *Request) SubResource(subresources ...string) *Request {
   254  	if r.err != nil {
   255  		return r
   256  	}
   257  	subresource := path.Join(subresources...)
   258  	if len(r.subresource) != 0 {
   259  		r.err = fmt.Errorf("subresource already set to %q, cannot change to %q", r.subresource, subresource)
   260  		return r
   261  	}
   262  	for _, s := range subresources {
   263  		if msgs := IsValidPathSegmentName(s); len(msgs) != 0 {
   264  			r.err = fmt.Errorf("invalid subresource %q: %v", s, msgs)
   265  			return r
   266  		}
   267  	}
   268  	r.subresource = subresource
   269  	return r
   270  }
   271  
   272  // Name sets the name of a resource to access (<resource>/[ns/<namespace>/]<name>)
   273  func (r *Request) Name(resourceName string) *Request {
   274  	if r.err != nil {
   275  		return r
   276  	}
   277  	if len(resourceName) == 0 {
   278  		r.err = fmt.Errorf("resource name may not be empty")
   279  		return r
   280  	}
   281  	if len(r.resourceName) != 0 {
   282  		r.err = fmt.Errorf("resource name already set to %q, cannot change to %q", r.resourceName, resourceName)
   283  		return r
   284  	}
   285  	if msgs := IsValidPathSegmentName(resourceName); len(msgs) != 0 {
   286  		r.err = fmt.Errorf("invalid resource name %q: %v", resourceName, msgs)
   287  		return r
   288  	}
   289  	r.resourceName = resourceName
   290  	return r
   291  }
   292  
   293  // Namespace applies the namespace scope to a request (<resource>/[ns/<namespace>/]<name>)
   294  func (r *Request) Namespace(namespace string) *Request {
   295  	if r.err != nil {
   296  		return r
   297  	}
   298  	if r.namespaceSet {
   299  		r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
   300  		return r
   301  	}
   302  	if msgs := IsValidPathSegmentName(namespace); len(msgs) != 0 {
   303  		r.err = fmt.Errorf("invalid namespace %q: %v", namespace, msgs)
   304  		return r
   305  	}
   306  	r.namespaceSet = true
   307  	r.namespace = namespace
   308  	return r
   309  }
   310  
   311  // NamespaceIfScoped is a convenience function to set a namespace if scoped is true
   312  func (r *Request) NamespaceIfScoped(namespace string, scoped bool) *Request {
   313  	if scoped {
   314  		return r.Namespace(namespace)
   315  	}
   316  	return r
   317  }
   318  
   319  // AbsPath overwrites an existing path with the segments provided. Trailing slashes are preserved
   320  // when a single segment is passed.
   321  func (r *Request) AbsPath(segments ...string) *Request {
   322  	if r.err != nil {
   323  		return r
   324  	}
   325  	r.pathPrefix = path.Join(r.c.base.Path, path.Join(segments...))
   326  	if len(segments) == 1 && (len(r.c.base.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
   327  		// preserve any trailing slashes for legacy behavior
   328  		r.pathPrefix += "/"
   329  	}
   330  	return r
   331  }
   332  
   333  // RequestURI overwrites existing path and parameters with the value of the provided server relative
   334  // URI.
   335  func (r *Request) RequestURI(uri string) *Request {
   336  	if r.err != nil {
   337  		return r
   338  	}
   339  	locator, err := url.Parse(uri)
   340  	if err != nil {
   341  		r.err = err
   342  		return r
   343  	}
   344  	r.pathPrefix = locator.Path
   345  	if len(locator.Query()) > 0 {
   346  		if r.params == nil {
   347  			r.params = make(url.Values)
   348  		}
   349  		for k, v := range locator.Query() {
   350  			r.params[k] = v
   351  		}
   352  	}
   353  	return r
   354  }
   355  
   356  // Param creates a query parameter with the given string value.
   357  func (r *Request) Param(paramName, s string) *Request {
   358  	if r.err != nil {
   359  		return r
   360  	}
   361  	return r.setParam(paramName, s)
   362  }
   363  
   364  // VersionedParams will take the provided object, serialize it to a map[string][]string using the
   365  // implicit RESTClient API version and the default parameter codec, and then add those as parameters
   366  // to the request. Use this to provide versioned query parameters from client libraries.
   367  // VersionedParams will not write query parameters that have omitempty set and are empty. If a
   368  // parameter has already been set it is appended to (Params and VersionedParams are additive).
   369  func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
   370  	return r.SpecificallyVersionedParams(obj, codec, r.c.content.GroupVersion)
   371  }
   372  
   373  func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
   374  	if r.err != nil {
   375  		return r
   376  	}
   377  	params, err := codec.EncodeParameters(obj, version)
   378  	if err != nil {
   379  		r.err = err
   380  		return r
   381  	}
   382  	for k, v := range params {
   383  		if r.params == nil {
   384  			r.params = make(url.Values)
   385  		}
   386  		r.params[k] = append(r.params[k], v...)
   387  	}
   388  	return r
   389  }
   390  
   391  func (r *Request) setParam(paramName, value string) *Request {
   392  	if r.params == nil {
   393  		r.params = make(url.Values)
   394  	}
   395  	r.params[paramName] = append(r.params[paramName], value)
   396  	return r
   397  }
   398  
   399  func (r *Request) SetHeader(key string, values ...string) *Request {
   400  	if r.headers == nil {
   401  		r.headers = http.Header{}
   402  	}
   403  	r.headers.Del(key)
   404  	for _, value := range values {
   405  		r.headers.Add(key, value)
   406  	}
   407  	return r
   408  }
   409  
   410  // Timeout makes the request use the given duration as an overall timeout for the
   411  // request. Additionally, if set passes the value as "timeout" parameter in URL.
   412  func (r *Request) Timeout(d time.Duration) *Request {
   413  	if r.err != nil {
   414  		return r
   415  	}
   416  	r.timeout = d
   417  	return r
   418  }
   419  
   420  // MaxRetries makes the request use the given integer as a ceiling of retrying upon receiving
   421  // "Retry-After" headers and 429 status-code in the response. The default is 10 unless this
   422  // function is specifically called with a different value.
   423  // A zero maxRetries prevent it from doing retires and return an error immediately.
   424  func (r *Request) MaxRetries(maxRetries int) *Request {
   425  	if maxRetries < 0 {
   426  		maxRetries = 0
   427  	}
   428  	r.maxRetries = maxRetries
   429  	return r
   430  }
   431  
   432  // Body makes the request use obj as the body. Optional.
   433  // If obj is a string, try to read a file of that name.
   434  // If obj is a []byte, send it directly.
   435  // If obj is an io.Reader, use it directly.
   436  // If obj is a runtime.Object, marshal it correctly, and set Content-Type header.
   437  // If obj is a runtime.Object and nil, do nothing.
   438  // Otherwise, set an error.
   439  func (r *Request) Body(obj interface{}) *Request {
   440  	if r.err != nil {
   441  		return r
   442  	}
   443  	switch t := obj.(type) {
   444  	case string:
   445  		data, err := os.ReadFile(t)
   446  		if err != nil {
   447  			r.err = err
   448  			return r
   449  		}
   450  		glogBody("Request Body", data)
   451  		r.body = nil
   452  		r.bodyBytes = data
   453  	case []byte:
   454  		glogBody("Request Body", t)
   455  		r.body = nil
   456  		r.bodyBytes = t
   457  	case io.Reader:
   458  		r.body = t
   459  		r.bodyBytes = nil
   460  	case runtime.Object:
   461  		// callers may pass typed interface pointers, therefore we must check nil with reflection
   462  		if reflect.ValueOf(t).IsNil() {
   463  			return r
   464  		}
   465  		encoder, err := r.c.content.Negotiator.Encoder(r.c.content.ContentType, nil)
   466  		if err != nil {
   467  			r.err = err
   468  			return r
   469  		}
   470  		data, err := runtime.Encode(encoder, t)
   471  		if err != nil {
   472  			r.err = err
   473  			return r
   474  		}
   475  		glogBody("Request Body", data)
   476  		r.body = nil
   477  		r.bodyBytes = data
   478  		r.SetHeader("Content-Type", r.c.content.ContentType)
   479  	default:
   480  		r.err = fmt.Errorf("unknown type used for body: %+v", obj)
   481  	}
   482  	return r
   483  }
   484  
   485  // Error returns any error encountered constructing the request, if any.
   486  func (r *Request) Error() error {
   487  	return r.err
   488  }
   489  
   490  // URL returns the current working URL. Check the result of Error() to ensure
   491  // that the returned URL is valid.
   492  func (r *Request) URL() *url.URL {
   493  	p := r.pathPrefix
   494  	if r.namespaceSet && len(r.namespace) > 0 {
   495  		p = path.Join(p, "namespaces", r.namespace)
   496  	}
   497  	if len(r.resource) != 0 {
   498  		p = path.Join(p, strings.ToLower(r.resource))
   499  	}
   500  	// Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
   501  	if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
   502  		p = path.Join(p, r.resourceName, r.subresource, r.subpath)
   503  	}
   504  
   505  	finalURL := &url.URL{}
   506  	if r.c.base != nil {
   507  		*finalURL = *r.c.base
   508  	}
   509  	finalURL.Path = p
   510  
   511  	query := url.Values{}
   512  	for key, values := range r.params {
   513  		for _, value := range values {
   514  			query.Add(key, value)
   515  		}
   516  	}
   517  
   518  	// timeout is handled specially here.
   519  	if r.timeout != 0 {
   520  		query.Set("timeout", r.timeout.String())
   521  	}
   522  	finalURL.RawQuery = query.Encode()
   523  	return finalURL
   524  }
   525  
   526  // finalURLTemplate is similar to URL(), but will make all specific parameter values equal
   527  // - instead of name or namespace, "{name}" and "{namespace}" will be used, and all query
   528  // parameters will be reset. This creates a copy of the url so as not to change the
   529  // underlying object.
   530  func (r Request) finalURLTemplate() url.URL {
   531  	newParams := url.Values{}
   532  	v := []string{"{value}"}
   533  	for k := range r.params {
   534  		newParams[k] = v
   535  	}
   536  	r.params = newParams
   537  	u := r.URL()
   538  	if u == nil {
   539  		return url.URL{}
   540  	}
   541  
   542  	segments := strings.Split(u.Path, "/")
   543  	groupIndex := 0
   544  	index := 0
   545  	trimmedBasePath := ""
   546  	if r.c.base != nil && strings.Contains(u.Path, r.c.base.Path) {
   547  		p := strings.TrimPrefix(u.Path, r.c.base.Path)
   548  		if !strings.HasPrefix(p, "/") {
   549  			p = "/" + p
   550  		}
   551  		// store the base path that we have trimmed so we can append it
   552  		// before returning the URL
   553  		trimmedBasePath = r.c.base.Path
   554  		segments = strings.Split(p, "/")
   555  		groupIndex = 1
   556  	}
   557  	if len(segments) <= 2 {
   558  		return *u
   559  	}
   560  
   561  	const CoreGroupPrefix = "api"
   562  	const NamedGroupPrefix = "apis"
   563  	isCoreGroup := segments[groupIndex] == CoreGroupPrefix
   564  	isNamedGroup := segments[groupIndex] == NamedGroupPrefix
   565  	if isCoreGroup {
   566  		// checking the case of core group with /api/v1/... format
   567  		index = groupIndex + 2
   568  	} else if isNamedGroup {
   569  		// checking the case of named group with /apis/apps/v1/... format
   570  		index = groupIndex + 3
   571  	} else {
   572  		// this should not happen that the only two possibilities are /api... and /apis..., just want to put an
   573  		// outlet here in case more API groups are added in future if ever possible:
   574  		// https://kubernetes.io/docs/concepts/overview/kubernetes-api/#api-groups
   575  		// if a wrong API groups name is encountered, return the {prefix} for url.Path
   576  		u.Path = "/{prefix}"
   577  		u.RawQuery = ""
   578  		return *u
   579  	}
   580  	// switch segLength := len(segments) - index; segLength {
   581  	switch {
   582  	// case len(segments) - index == 1:
   583  	// resource (with no name) do nothing
   584  	case len(segments)-index == 2:
   585  		// /$RESOURCE/$NAME: replace $NAME with {name}
   586  		segments[index+1] = "{name}"
   587  	case len(segments)-index == 3:
   588  		if segments[index+2] == "finalize" || segments[index+2] == "status" {
   589  			// /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
   590  			segments[index+1] = "{name}"
   591  		} else {
   592  			// /namespace/$NAMESPACE/$RESOURCE: replace $NAMESPACE with {namespace}
   593  			segments[index+1] = "{namespace}"
   594  		}
   595  	case len(segments)-index >= 4:
   596  		segments[index+1] = "{namespace}"
   597  		// /namespace/$NAMESPACE/$RESOURCE/$NAME: replace $NAMESPACE with {namespace},  $NAME with {name}
   598  		if segments[index+3] != "finalize" && segments[index+3] != "status" {
   599  			// /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
   600  			segments[index+3] = "{name}"
   601  		}
   602  	}
   603  	u.Path = path.Join(trimmedBasePath, path.Join(segments...))
   604  	return *u
   605  }
   606  
   607  func (r *Request) tryThrottleWithInfo(ctx context.Context, retryInfo string) error {
   608  	if r.rateLimiter == nil {
   609  		return nil
   610  	}
   611  
   612  	now := time.Now()
   613  
   614  	err := r.rateLimiter.Wait(ctx)
   615  	if err != nil {
   616  		err = fmt.Errorf("client rate limiter Wait returned an error: %w", err)
   617  	}
   618  	latency := time.Since(now)
   619  
   620  	var message string
   621  	switch {
   622  	case len(retryInfo) > 0:
   623  		message = fmt.Sprintf("Waited for %v, %s - request: %s:%s", latency, retryInfo, r.verb, r.URL().String())
   624  	default:
   625  		message = fmt.Sprintf("Waited for %v due to client-side throttling, not priority and fairness, request: %s:%s", latency, r.verb, r.URL().String())
   626  	}
   627  
   628  	if latency > longThrottleLatency {
   629  		klog.V(3).Info(message)
   630  	}
   631  	if latency > extraLongThrottleLatency {
   632  		// If the rate limiter latency is very high, the log message should be printed at a higher log level,
   633  		// but we use a throttled logger to prevent spamming.
   634  		globalThrottledLogger.Infof("%s", message)
   635  	}
   636  	metrics.RateLimiterLatency.Observe(ctx, r.verb, r.finalURLTemplate(), latency)
   637  
   638  	return err
   639  }
   640  
   641  func (r *Request) tryThrottle(ctx context.Context) error {
   642  	return r.tryThrottleWithInfo(ctx, "")
   643  }
   644  
   645  type throttleSettings struct {
   646  	logLevel       klog.Level
   647  	minLogInterval time.Duration
   648  
   649  	lastLogTime time.Time
   650  	lock        sync.RWMutex
   651  }
   652  
   653  type throttledLogger struct {
   654  	clock    clock.PassiveClock
   655  	settings []*throttleSettings
   656  }
   657  
   658  var globalThrottledLogger = &throttledLogger{
   659  	clock: clock.RealClock{},
   660  	settings: []*throttleSettings{
   661  		{
   662  			logLevel:       2,
   663  			minLogInterval: 1 * time.Second,
   664  		}, {
   665  			logLevel:       0,
   666  			minLogInterval: 10 * time.Second,
   667  		},
   668  	},
   669  }
   670  
   671  func (b *throttledLogger) attemptToLog() (klog.Level, bool) {
   672  	for _, setting := range b.settings {
   673  		if bool(klog.V(setting.logLevel).Enabled()) {
   674  			// Return early without write locking if possible.
   675  			if func() bool {
   676  				setting.lock.RLock()
   677  				defer setting.lock.RUnlock()
   678  				return b.clock.Since(setting.lastLogTime) >= setting.minLogInterval
   679  			}() {
   680  				setting.lock.Lock()
   681  				defer setting.lock.Unlock()
   682  				if b.clock.Since(setting.lastLogTime) >= setting.minLogInterval {
   683  					setting.lastLogTime = b.clock.Now()
   684  					return setting.logLevel, true
   685  				}
   686  			}
   687  			return -1, false
   688  		}
   689  	}
   690  	return -1, false
   691  }
   692  
   693  // Infof will write a log message at each logLevel specified by the receiver's throttleSettings
   694  // as long as it hasn't written a log message more recently than minLogInterval.
   695  func (b *throttledLogger) Infof(message string, args ...interface{}) {
   696  	if logLevel, ok := b.attemptToLog(); ok {
   697  		klog.V(logLevel).Infof(message, args...)
   698  	}
   699  }
   700  
   701  // Watch attempts to begin watching the requested location.
   702  // Returns a watch.Interface, or an error.
   703  func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
   704  	// We specifically don't want to rate limit watches, so we
   705  	// don't use r.rateLimiter here.
   706  	if r.err != nil {
   707  		return nil, r.err
   708  	}
   709  
   710  	client := r.c.Client
   711  	if client == nil {
   712  		client = http.DefaultClient
   713  	}
   714  
   715  	isErrRetryableFunc := func(request *http.Request, err error) bool {
   716  		// The watch stream mechanism handles many common partial data errors, so closed
   717  		// connections can be retried in many cases.
   718  		if net.IsProbableEOF(err) || net.IsTimeout(err) {
   719  			return true
   720  		}
   721  		return false
   722  	}
   723  	retry := r.retryFn(r.maxRetries)
   724  	url := r.URL().String()
   725  	for {
   726  		if err := retry.Before(ctx, r); err != nil {
   727  			return nil, retry.WrapPreviousError(err)
   728  		}
   729  
   730  		req, err := r.newHTTPRequest(ctx)
   731  		if err != nil {
   732  			return nil, err
   733  		}
   734  
   735  		resp, err := client.Do(req)
   736  		retry.After(ctx, r, resp, err)
   737  		if err == nil && resp.StatusCode == http.StatusOK {
   738  			return r.newStreamWatcher(resp)
   739  		}
   740  
   741  		done, transformErr := func() (bool, error) {
   742  			defer readAndCloseResponseBody(resp)
   743  
   744  			if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
   745  				return false, nil
   746  			}
   747  
   748  			if resp == nil {
   749  				// the server must have sent us an error in 'err'
   750  				return true, nil
   751  			}
   752  			if result := r.transformResponse(resp, req); result.err != nil {
   753  				return true, result.err
   754  			}
   755  			return true, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
   756  		}()
   757  		if done {
   758  			if isErrRetryableFunc(req, err) {
   759  				return watch.NewEmptyWatch(), nil
   760  			}
   761  			if err == nil {
   762  				// if the server sent us an HTTP Response object,
   763  				// we need to return the error object from that.
   764  				err = transformErr
   765  			}
   766  			return nil, retry.WrapPreviousError(err)
   767  		}
   768  	}
   769  }
   770  
   771  func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
   772  	contentType := resp.Header.Get("Content-Type")
   773  	mediaType, params, err := mime.ParseMediaType(contentType)
   774  	if err != nil {
   775  		klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err)
   776  	}
   777  	objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
   778  	if err != nil {
   779  		return nil, err
   780  	}
   781  
   782  	handleWarnings(resp.Header, r.warningHandler)
   783  
   784  	frameReader := framer.NewFrameReader(resp.Body)
   785  	watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)
   786  
   787  	return watch.NewStreamWatcher(
   788  		restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
   789  		// use 500 to indicate that the cause of the error is unknown - other error codes
   790  		// are more specific to HTTP interactions, and set a reason
   791  		errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
   792  	), nil
   793  }
   794  
   795  // updateRequestResultMetric increments the RequestResult metric counter,
   796  // it should be called with the (response, err) tuple from the final
   797  // reply from the server.
   798  func updateRequestResultMetric(ctx context.Context, req *Request, resp *http.Response, err error) {
   799  	code, host := sanitize(req, resp, err)
   800  	metrics.RequestResult.Increment(ctx, code, req.verb, host)
   801  }
   802  
   803  // updateRequestRetryMetric increments the RequestRetry metric counter,
   804  // it should be called with the (response, err) tuple for each retry
   805  // except for the final attempt.
   806  func updateRequestRetryMetric(ctx context.Context, req *Request, resp *http.Response, err error) {
   807  	code, host := sanitize(req, resp, err)
   808  	metrics.RequestRetry.IncrementRetry(ctx, code, req.verb, host)
   809  }
   810  
   811  func sanitize(req *Request, resp *http.Response, err error) (string, string) {
   812  	host := "none"
   813  	if req.c.base != nil {
   814  		host = req.c.base.Host
   815  	}
   816  
   817  	// Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
   818  	// system so we just report them as `<error>`.
   819  	code := "<error>"
   820  	if resp != nil {
   821  		code = strconv.Itoa(resp.StatusCode)
   822  	}
   823  
   824  	return code, host
   825  }
   826  
   827  // Stream formats and executes the request, and offers streaming of the response.
   828  // Returns io.ReadCloser which could be used for streaming of the response, or an error
   829  // Any non-2xx http status code causes an error.  If we get a non-2xx code, we try to convert the body into an APIStatus object.
   830  // If we can, we return that as an error.  Otherwise, we create an error that lists the http status and the content of the response.
   831  func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
   832  	if r.err != nil {
   833  		return nil, r.err
   834  	}
   835  
   836  	if err := r.tryThrottle(ctx); err != nil {
   837  		return nil, err
   838  	}
   839  
   840  	client := r.c.Client
   841  	if client == nil {
   842  		client = http.DefaultClient
   843  	}
   844  
   845  	retry := r.retryFn(r.maxRetries)
   846  	url := r.URL().String()
   847  	for {
   848  		if err := retry.Before(ctx, r); err != nil {
   849  			return nil, err
   850  		}
   851  
   852  		req, err := r.newHTTPRequest(ctx)
   853  		if err != nil {
   854  			return nil, err
   855  		}
   856  		resp, err := client.Do(req)
   857  		retry.After(ctx, r, resp, err)
   858  		if err != nil {
   859  			// we only retry on an HTTP response with 'Retry-After' header
   860  			return nil, err
   861  		}
   862  
   863  		switch {
   864  		case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
   865  			handleWarnings(resp.Header, r.warningHandler)
   866  			return resp.Body, nil
   867  
   868  		default:
   869  			done, transformErr := func() (bool, error) {
   870  				defer resp.Body.Close()
   871  
   872  				if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
   873  					return false, nil
   874  				}
   875  				result := r.transformResponse(resp, req)
   876  				if err := result.Error(); err != nil {
   877  					return true, err
   878  				}
   879  				return true, fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
   880  			}()
   881  			if done {
   882  				return nil, transformErr
   883  			}
   884  		}
   885  	}
   886  }
   887  
   888  // requestPreflightCheck looks for common programmer errors on Request.
   889  //
   890  // We tackle here two programmer mistakes. The first one is to try to create
   891  // something(POST) using an empty string as namespace with namespaceSet as
   892  // true. If namespaceSet is true then namespace should also be defined. The
   893  // second mistake is, when under the same circumstances, the programmer tries
   894  // to GET, PUT or DELETE a named resource(resourceName != ""), again, if
   895  // namespaceSet is true then namespace must not be empty.
   896  func (r *Request) requestPreflightCheck() error {
   897  	if !r.namespaceSet {
   898  		return nil
   899  	}
   900  	if len(r.namespace) > 0 {
   901  		return nil
   902  	}
   903  
   904  	switch r.verb {
   905  	case "POST":
   906  		return fmt.Errorf("an empty namespace may not be set during creation")
   907  	case "GET", "PUT", "DELETE":
   908  		if len(r.resourceName) > 0 {
   909  			return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
   910  		}
   911  	}
   912  	return nil
   913  }
   914  
   915  func (r *Request) newHTTPRequest(ctx context.Context) (*http.Request, error) {
   916  	var body io.Reader
   917  	switch {
   918  	case r.body != nil && r.bodyBytes != nil:
   919  		return nil, fmt.Errorf("cannot set both body and bodyBytes")
   920  	case r.body != nil:
   921  		body = r.body
   922  	case r.bodyBytes != nil:
   923  		// Create a new reader specifically for this request.
   924  		// Giving each request a dedicated reader allows retries to avoid races resetting the request body.
   925  		body = bytes.NewReader(r.bodyBytes)
   926  	}
   927  
   928  	url := r.URL().String()
   929  	req, err := http.NewRequestWithContext(httptrace.WithClientTrace(ctx, newDNSMetricsTrace(ctx)), r.verb, url, body)
   930  	if err != nil {
   931  		return nil, err
   932  	}
   933  	req.Header = r.headers
   934  	return req, nil
   935  }
   936  
   937  // newDNSMetricsTrace returns an HTTP trace that tracks time spent on DNS lookups per host.
   938  // This metric is available in client as "rest_client_dns_resolution_duration_seconds".
   939  func newDNSMetricsTrace(ctx context.Context) *httptrace.ClientTrace {
   940  	type dnsMetric struct {
   941  		start time.Time
   942  		host  string
   943  		sync.Mutex
   944  	}
   945  	dns := &dnsMetric{}
   946  	return &httptrace.ClientTrace{
   947  		DNSStart: func(info httptrace.DNSStartInfo) {
   948  			dns.Lock()
   949  			defer dns.Unlock()
   950  			dns.start = time.Now()
   951  			dns.host = info.Host
   952  		},
   953  		DNSDone: func(info httptrace.DNSDoneInfo) {
   954  			dns.Lock()
   955  			defer dns.Unlock()
   956  			metrics.ResolverLatency.Observe(ctx, dns.host, time.Since(dns.start))
   957  		},
   958  	}
   959  }
   960  
   961  // request connects to the server and invokes the provided function when a server response is
   962  // received. It handles retry behavior and up front validation of requests. It will invoke
   963  // fn at most once. It will return an error if a problem occurred prior to connecting to the
   964  // server - the provided function is responsible for handling server errors.
   965  func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
   966  	// Metrics for total request latency
   967  	start := time.Now()
   968  	defer func() {
   969  		metrics.RequestLatency.Observe(ctx, r.verb, r.finalURLTemplate(), time.Since(start))
   970  	}()
   971  
   972  	if r.err != nil {
   973  		klog.V(4).Infof("Error in request: %v", r.err)
   974  		return r.err
   975  	}
   976  
   977  	if err := r.requestPreflightCheck(); err != nil {
   978  		return err
   979  	}
   980  
   981  	client := r.c.Client
   982  	if client == nil {
   983  		client = http.DefaultClient
   984  	}
   985  
   986  	// Throttle the first try before setting up the timeout configured on the
   987  	// client. We don't want a throttled client to return timeouts to callers
   988  	// before it makes a single request.
   989  	if err := r.tryThrottle(ctx); err != nil {
   990  		return err
   991  	}
   992  
   993  	if r.timeout > 0 {
   994  		var cancel context.CancelFunc
   995  		ctx, cancel = context.WithTimeout(ctx, r.timeout)
   996  		defer cancel()
   997  	}
   998  
   999  	isErrRetryableFunc := func(req *http.Request, err error) bool {
  1000  		// "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
  1001  		// Thus in case of "GET" operations, we simply retry it.
  1002  		// We are not automatically retrying "write" operations, as they are not idempotent.
  1003  		if req.Method != "GET" {
  1004  			return false
  1005  		}
  1006  		// For connection errors and apiserver shutdown errors retry.
  1007  		if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
  1008  			return true
  1009  		}
  1010  		return false
  1011  	}
  1012  
  1013  	// Right now we make about ten retry attempts if we get a Retry-After response.
  1014  	retry := r.retryFn(r.maxRetries)
  1015  	for {
  1016  		if err := retry.Before(ctx, r); err != nil {
  1017  			return retry.WrapPreviousError(err)
  1018  		}
  1019  		req, err := r.newHTTPRequest(ctx)
  1020  		if err != nil {
  1021  			return err
  1022  		}
  1023  		resp, err := client.Do(req)
  1024  		// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
  1025  		// https://pkg.go.dev/net/http#Request
  1026  		if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
  1027  			metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
  1028  		}
  1029  		retry.After(ctx, r, resp, err)
  1030  
  1031  		done := func() bool {
  1032  			defer readAndCloseResponseBody(resp)
  1033  
  1034  			// if the server returns an error in err, the response will be nil.
  1035  			f := func(req *http.Request, resp *http.Response) {
  1036  				if resp == nil {
  1037  					return
  1038  				}
  1039  				fn(req, resp)
  1040  			}
  1041  
  1042  			if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
  1043  				return false
  1044  			}
  1045  
  1046  			f(req, resp)
  1047  			return true
  1048  		}()
  1049  		if done {
  1050  			return retry.WrapPreviousError(err)
  1051  		}
  1052  	}
  1053  }
  1054  
  1055  // Do formats and executes the request. Returns a Result object for easy response
  1056  // processing.
  1057  //
  1058  // Error type:
  1059  //   - If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
  1060  //   - http.Client.Do errors are returned directly.
  1061  func (r *Request) Do(ctx context.Context) Result {
  1062  	var result Result
  1063  	err := r.request(ctx, func(req *http.Request, resp *http.Response) {
  1064  		result = r.transformResponse(resp, req)
  1065  	})
  1066  	if err != nil {
  1067  		return Result{err: err}
  1068  	}
  1069  	if result.err == nil || len(result.body) > 0 {
  1070  		metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
  1071  	}
  1072  	return result
  1073  }
  1074  
  1075  // DoRaw executes the request but does not process the response body.
  1076  func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
  1077  	var result Result
  1078  	err := r.request(ctx, func(req *http.Request, resp *http.Response) {
  1079  		result.body, result.err = io.ReadAll(resp.Body)
  1080  		glogBody("Response Body", result.body)
  1081  		if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
  1082  			result.err = r.transformUnstructuredResponseError(resp, req, result.body)
  1083  		}
  1084  	})
  1085  	if err != nil {
  1086  		return nil, err
  1087  	}
  1088  	if result.err == nil || len(result.body) > 0 {
  1089  		metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
  1090  	}
  1091  	return result.body, result.err
  1092  }
  1093  
  1094  // transformResponse converts an API response into a structured API object
  1095  func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
  1096  	var body []byte
  1097  	if resp.Body != nil {
  1098  		data, err := io.ReadAll(resp.Body)
  1099  		switch err.(type) {
  1100  		case nil:
  1101  			body = data
  1102  		case http2.StreamError:
  1103  			// This is trying to catch the scenario that the server may close the connection when sending the
  1104  			// response body. This can be caused by server timeout due to a slow network connection.
  1105  			// TODO: Add test for this. Steps may be:
  1106  			// 1. client-go (or kubectl) sends a GET request.
  1107  			// 2. Apiserver sends back the headers and then part of the body
  1108  			// 3. Apiserver closes connection.
  1109  			// 4. client-go should catch this and return an error.
  1110  			klog.V(2).Infof("Stream error %#v when reading response body, may be caused by closed connection.", err)
  1111  			streamErr := fmt.Errorf("stream error when reading response body, may be caused by closed connection. Please retry. Original error: %w", err)
  1112  			return Result{
  1113  				err: streamErr,
  1114  			}
  1115  		default:
  1116  			klog.Errorf("Unexpected error when reading response body: %v", err)
  1117  			unexpectedErr := fmt.Errorf("unexpected error when reading response body. Please retry. Original error: %w", err)
  1118  			return Result{
  1119  				err: unexpectedErr,
  1120  			}
  1121  		}
  1122  	}
  1123  
  1124  	glogBody("Response Body", body)
  1125  
  1126  	// verify the content type is accurate
  1127  	var decoder runtime.Decoder
  1128  	contentType := resp.Header.Get("Content-Type")
  1129  	if len(contentType) == 0 {
  1130  		contentType = r.c.content.ContentType
  1131  	}
  1132  	if len(contentType) > 0 {
  1133  		var err error
  1134  		mediaType, params, err := mime.ParseMediaType(contentType)
  1135  		if err != nil {
  1136  			return Result{err: errors.NewInternalError(err)}
  1137  		}
  1138  		decoder, err = r.c.content.Negotiator.Decoder(mediaType, params)
  1139  		if err != nil {
  1140  			// if we fail to negotiate a decoder, treat this as an unstructured error
  1141  			switch {
  1142  			case resp.StatusCode == http.StatusSwitchingProtocols:
  1143  				// no-op, we've been upgraded
  1144  			case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
  1145  				return Result{err: r.transformUnstructuredResponseError(resp, req, body)}
  1146  			}
  1147  			return Result{
  1148  				body:        body,
  1149  				contentType: contentType,
  1150  				statusCode:  resp.StatusCode,
  1151  				warnings:    handleWarnings(resp.Header, r.warningHandler),
  1152  			}
  1153  		}
  1154  	}
  1155  
  1156  	switch {
  1157  	case resp.StatusCode == http.StatusSwitchingProtocols:
  1158  		// no-op, we've been upgraded
  1159  	case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
  1160  		// calculate an unstructured error from the response which the Result object may use if the caller
  1161  		// did not return a structured error.
  1162  		retryAfter, _ := retryAfterSeconds(resp)
  1163  		err := r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
  1164  		return Result{
  1165  			body:        body,
  1166  			contentType: contentType,
  1167  			statusCode:  resp.StatusCode,
  1168  			decoder:     decoder,
  1169  			err:         err,
  1170  			warnings:    handleWarnings(resp.Header, r.warningHandler),
  1171  		}
  1172  	}
  1173  
  1174  	return Result{
  1175  		body:        body,
  1176  		contentType: contentType,
  1177  		statusCode:  resp.StatusCode,
  1178  		decoder:     decoder,
  1179  		warnings:    handleWarnings(resp.Header, r.warningHandler),
  1180  	}
  1181  }
  1182  
  1183  // truncateBody decides if the body should be truncated, based on the glog Verbosity.
  1184  func truncateBody(body string) string {
  1185  	max := 0
  1186  	switch {
  1187  	case bool(klog.V(10).Enabled()):
  1188  		return body
  1189  	case bool(klog.V(9).Enabled()):
  1190  		max = 10240
  1191  	case bool(klog.V(8).Enabled()):
  1192  		max = 1024
  1193  	}
  1194  
  1195  	if len(body) <= max {
  1196  		return body
  1197  	}
  1198  
  1199  	return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max)
  1200  }
  1201  
  1202  // glogBody logs a body output that could be either JSON or protobuf. It explicitly guards against
  1203  // allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
  1204  // whether the body is printable.
  1205  func glogBody(prefix string, body []byte) {
  1206  	if klogV := klog.V(8); klogV.Enabled() {
  1207  		if bytes.IndexFunc(body, func(r rune) bool {
  1208  			return r < 0x0a
  1209  		}) != -1 {
  1210  			klogV.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body)))
  1211  		} else {
  1212  			klogV.Infof("%s: %s", prefix, truncateBody(string(body)))
  1213  		}
  1214  	}
  1215  }
  1216  
  1217  // maxUnstructuredResponseTextBytes is an upper bound on how much output to include in the unstructured error.
  1218  const maxUnstructuredResponseTextBytes = 2048
  1219  
  1220  // transformUnstructuredResponseError handles an error from the server that is not in a structured form.
  1221  // It is expected to transform any response that is not recognizable as a clear server sent error from the
  1222  // K8S API using the information provided with the request. In practice, HTTP proxies and client libraries
  1223  // introduce a level of uncertainty to the responses returned by servers that in common use result in
  1224  // unexpected responses. The rough structure is:
  1225  //
  1226  // 1. Assume the server sends you something sane - JSON + well defined error objects + proper codes
  1227  //   - this is the happy path
  1228  //   - when you get this output, trust what the server sends
  1229  //     2. Guard against empty fields / bodies in received JSON and attempt to cull sufficient info from them to
  1230  //     generate a reasonable facsimile of the original failure.
  1231  //   - Be sure to use a distinct error type or flag that allows a client to distinguish between this and error 1 above
  1232  //     3. Handle true disconnect failures / completely malformed data by moving up to a more generic client error
  1233  //     4. Distinguish between various connection failures like SSL certificates, timeouts, proxy errors, unexpected
  1234  //     initial contact, the presence of mismatched body contents from posted content types
  1235  //   - Give these a separate distinct error type and capture as much as possible of the original message
  1236  //
  1237  // TODO: introduce transformation of generic http.Client.Do() errors that separates 4.
  1238  func (r *Request) transformUnstructuredResponseError(resp *http.Response, req *http.Request, body []byte) error {
  1239  	if body == nil && resp.Body != nil {
  1240  		if data, err := io.ReadAll(&io.LimitedReader{R: resp.Body, N: maxUnstructuredResponseTextBytes}); err == nil {
  1241  			body = data
  1242  		}
  1243  	}
  1244  	retryAfter, _ := retryAfterSeconds(resp)
  1245  	return r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
  1246  }
  1247  
  1248  // newUnstructuredResponseError instantiates the appropriate generic error for the provided input. It also logs the body.
  1249  func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool, statusCode int, method string, retryAfter int) error {
  1250  	// cap the amount of output we create
  1251  	if len(body) > maxUnstructuredResponseTextBytes {
  1252  		body = body[:maxUnstructuredResponseTextBytes]
  1253  	}
  1254  
  1255  	message := "unknown"
  1256  	if isTextResponse {
  1257  		message = strings.TrimSpace(string(body))
  1258  	}
  1259  	var groupResource schema.GroupResource
  1260  	if len(r.resource) > 0 {
  1261  		groupResource.Group = r.c.content.GroupVersion.Group
  1262  		groupResource.Resource = r.resource
  1263  	}
  1264  	return errors.NewGenericServerResponse(
  1265  		statusCode,
  1266  		method,
  1267  		groupResource,
  1268  		r.resourceName,
  1269  		message,
  1270  		retryAfter,
  1271  		true,
  1272  	)
  1273  }
  1274  
  1275  // isTextResponse returns true if the response appears to be a textual media type.
  1276  func isTextResponse(resp *http.Response) bool {
  1277  	contentType := resp.Header.Get("Content-Type")
  1278  	if len(contentType) == 0 {
  1279  		return true
  1280  	}
  1281  	media, _, err := mime.ParseMediaType(contentType)
  1282  	if err != nil {
  1283  		return false
  1284  	}
  1285  	return strings.HasPrefix(media, "text/")
  1286  }
  1287  
  1288  // retryAfterSeconds returns the value of the Retry-After header and true, or 0 and false if
  1289  // the header was missing or not a valid number.
  1290  func retryAfterSeconds(resp *http.Response) (int, bool) {
  1291  	if h := resp.Header.Get("Retry-After"); len(h) > 0 {
  1292  		if i, err := strconv.Atoi(h); err == nil {
  1293  			return i, true
  1294  		}
  1295  	}
  1296  	return 0, false
  1297  }
  1298  
  1299  // Result contains the result of calling Request.Do().
  1300  type Result struct {
  1301  	body        []byte
  1302  	warnings    []net.WarningHeader
  1303  	contentType string
  1304  	err         error
  1305  	statusCode  int
  1306  
  1307  	decoder runtime.Decoder
  1308  }
  1309  
  1310  // Raw returns the raw result.
  1311  func (r Result) Raw() ([]byte, error) {
  1312  	return r.body, r.err
  1313  }
  1314  
  1315  // Get returns the result as an object, which means it passes through the decoder.
  1316  // If the returned object is of type Status and has .Status != StatusSuccess, the
  1317  // additional information in Status will be used to enrich the error.
  1318  func (r Result) Get() (runtime.Object, error) {
  1319  	if r.err != nil {
  1320  		// Check whether the result has a Status object in the body and prefer that.
  1321  		return nil, r.Error()
  1322  	}
  1323  	if r.decoder == nil {
  1324  		return nil, fmt.Errorf("serializer for %s doesn't exist", r.contentType)
  1325  	}
  1326  
  1327  	// decode, but if the result is Status return that as an error instead.
  1328  	out, _, err := r.decoder.Decode(r.body, nil, nil)
  1329  	if err != nil {
  1330  		return nil, err
  1331  	}
  1332  	switch t := out.(type) {
  1333  	case *metav1.Status:
  1334  		// any status besides StatusSuccess is considered an error.
  1335  		if t.Status != metav1.StatusSuccess {
  1336  			return nil, errors.FromObject(t)
  1337  		}
  1338  	}
  1339  	return out, nil
  1340  }
  1341  
  1342  // StatusCode returns the HTTP status code of the request. (Only valid if no
  1343  // error was returned.)
  1344  func (r Result) StatusCode(statusCode *int) Result {
  1345  	*statusCode = r.statusCode
  1346  	return r
  1347  }
  1348  
  1349  // ContentType returns the "Content-Type" response header into the passed
  1350  // string, returning the Result for possible chaining. (Only valid if no
  1351  // error code was returned.)
  1352  func (r Result) ContentType(contentType *string) Result {
  1353  	*contentType = r.contentType
  1354  	return r
  1355  }
  1356  
  1357  // Into stores the result into obj, if possible. If obj is nil it is ignored.
  1358  // If the returned object is of type Status and has .Status != StatusSuccess, the
  1359  // additional information in Status will be used to enrich the error.
  1360  func (r Result) Into(obj runtime.Object) error {
  1361  	if r.err != nil {
  1362  		// Check whether the result has a Status object in the body and prefer that.
  1363  		return r.Error()
  1364  	}
  1365  	if r.decoder == nil {
  1366  		return fmt.Errorf("serializer for %s doesn't exist", r.contentType)
  1367  	}
  1368  	if len(r.body) == 0 {
  1369  		return fmt.Errorf("0-length response with status code: %d and content type: %s",
  1370  			r.statusCode, r.contentType)
  1371  	}
  1372  
  1373  	out, _, err := r.decoder.Decode(r.body, nil, obj)
  1374  	if err != nil || out == obj {
  1375  		return err
  1376  	}
  1377  	// if a different object is returned, see if it is Status and avoid double decoding
  1378  	// the object.
  1379  	switch t := out.(type) {
  1380  	case *metav1.Status:
  1381  		// any status besides StatusSuccess is considered an error.
  1382  		if t.Status != metav1.StatusSuccess {
  1383  			return errors.FromObject(t)
  1384  		}
  1385  	}
  1386  	return nil
  1387  }
  1388  
  1389  // WasCreated updates the provided bool pointer to whether the server returned
  1390  // 201 created or a different response.
  1391  func (r Result) WasCreated(wasCreated *bool) Result {
  1392  	*wasCreated = r.statusCode == http.StatusCreated
  1393  	return r
  1394  }
  1395  
  1396  // Error returns the error executing the request, nil if no error occurred.
  1397  // If the returned object is of type Status and has Status != StatusSuccess, the
  1398  // additional information in Status will be used to enrich the error.
  1399  // See the Request.Do() comment for what errors you might get.
  1400  func (r Result) Error() error {
  1401  	// if we have received an unexpected server error, and we have a body and decoder, we can try to extract
  1402  	// a Status object.
  1403  	if r.err == nil || !errors.IsUnexpectedServerError(r.err) || len(r.body) == 0 || r.decoder == nil {
  1404  		return r.err
  1405  	}
  1406  
  1407  	// attempt to convert the body into a Status object
  1408  	// to be backwards compatible with old servers that do not return a version, default to "v1"
  1409  	out, _, err := r.decoder.Decode(r.body, &schema.GroupVersionKind{Version: "v1"}, nil)
  1410  	if err != nil {
  1411  		klog.V(5).Infof("body was not decodable (unable to check for Status): %v", err)
  1412  		return r.err
  1413  	}
  1414  	switch t := out.(type) {
  1415  	case *metav1.Status:
  1416  		// because we default the kind, we *must* check for StatusFailure
  1417  		if t.Status == metav1.StatusFailure {
  1418  			return errors.FromObject(t)
  1419  		}
  1420  	}
  1421  	return r.err
  1422  }
  1423  
  1424  // Warnings returns any warning headers received in the response
  1425  func (r Result) Warnings() []net.WarningHeader {
  1426  	return r.warnings
  1427  }
  1428  
  1429  // NameMayNotBe specifies strings that cannot be used as names specified as path segments (like the REST API or etcd store)
  1430  var NameMayNotBe = []string{".", ".."}
  1431  
  1432  // NameMayNotContain specifies substrings that cannot be used in names specified as path segments (like the REST API or etcd store)
  1433  var NameMayNotContain = []string{"/", "%"}
  1434  
  1435  // IsValidPathSegmentName validates the name can be safely encoded as a path segment
  1436  func IsValidPathSegmentName(name string) []string {
  1437  	for _, illegalName := range NameMayNotBe {
  1438  		if name == illegalName {
  1439  			return []string{fmt.Sprintf(`may not be '%s'`, illegalName)}
  1440  		}
  1441  	}
  1442  
  1443  	var errors []string
  1444  	for _, illegalContent := range NameMayNotContain {
  1445  		if strings.Contains(name, illegalContent) {
  1446  			errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
  1447  		}
  1448  	}
  1449  
  1450  	return errors
  1451  }
  1452  
  1453  // IsValidPathSegmentPrefix validates the name can be used as a prefix for a name which will be encoded as a path segment
  1454  // It does not check for exact matches with disallowed names, since an arbitrary suffix might make the name valid
  1455  func IsValidPathSegmentPrefix(name string) []string {
  1456  	var errors []string
  1457  	for _, illegalContent := range NameMayNotContain {
  1458  		if strings.Contains(name, illegalContent) {
  1459  			errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
  1460  		}
  1461  	}
  1462  
  1463  	return errors
  1464  }
  1465  
  1466  // ValidatePathSegmentName validates the name can be safely encoded as a path segment
  1467  func ValidatePathSegmentName(name string, prefix bool) []string {
  1468  	if prefix {
  1469  		return IsValidPathSegmentPrefix(name)
  1470  	}
  1471  	return IsValidPathSegmentName(name)
  1472  }
  1473  

View as plain text