1
16
17 package rest
18
19 import (
20 "bytes"
21 "context"
22 "encoding/hex"
23 "fmt"
24 "io"
25 "mime"
26 "net/http"
27 "net/http/httptrace"
28 "net/url"
29 "os"
30 "path"
31 "reflect"
32 "strconv"
33 "strings"
34 "sync"
35 "time"
36
37 "golang.org/x/net/http2"
38
39 "k8s.io/apimachinery/pkg/api/errors"
40 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
41 "k8s.io/apimachinery/pkg/runtime"
42 "k8s.io/apimachinery/pkg/runtime/schema"
43 "k8s.io/apimachinery/pkg/runtime/serializer/streaming"
44 "k8s.io/apimachinery/pkg/util/net"
45 "k8s.io/apimachinery/pkg/watch"
46 restclientwatch "k8s.io/client-go/rest/watch"
47 "k8s.io/client-go/tools/metrics"
48 "k8s.io/client-go/util/flowcontrol"
49 "k8s.io/klog/v2"
50 "k8s.io/utils/clock"
51 )
52
53 var (
54
55
56
57 longThrottleLatency = 50 * time.Millisecond
58
59
60 extraLongThrottleLatency = 1 * time.Second
61 )
62
63
64 type HTTPClient interface {
65 Do(req *http.Request) (*http.Response, error)
66 }
67
68
69
70 type ResponseWrapper interface {
71 DoRaw(context.Context) ([]byte, error)
72 Stream(context.Context) (io.ReadCloser, error)
73 }
74
75
76 type RequestConstructionError struct {
77 Err error
78 }
79
80
81 func (r *RequestConstructionError) Error() string {
82 return fmt.Sprintf("request construction error: '%v'", r.Err)
83 }
84
85 var noBackoff = &NoBackoff{}
86
87 type requestRetryFunc func(maxRetries int) WithRetry
88
89 func defaultRequestRetryFn(maxRetries int) WithRetry {
90 return &withRetry{maxRetries: maxRetries}
91 }
92
93
94
95
96 type Request struct {
97 c *RESTClient
98
99 warningHandler WarningHandler
100
101 rateLimiter flowcontrol.RateLimiter
102 backoff BackoffManager
103 timeout time.Duration
104 maxRetries int
105
106
107 verb string
108 pathPrefix string
109 subpath string
110 params url.Values
111 headers http.Header
112
113
114 namespace string
115 namespaceSet bool
116 resource string
117 resourceName string
118 subresource string
119
120
121 err error
122
123
124 body io.Reader
125 bodyBytes []byte
126
127 retryFn requestRetryFunc
128 }
129
130
131 func NewRequest(c *RESTClient) *Request {
132 var backoff BackoffManager
133 if c.createBackoffMgr != nil {
134 backoff = c.createBackoffMgr()
135 }
136 if backoff == nil {
137 backoff = noBackoff
138 }
139
140 var pathPrefix string
141 if c.base != nil {
142 pathPrefix = path.Join("/", c.base.Path, c.versionedAPIPath)
143 } else {
144 pathPrefix = path.Join("/", c.versionedAPIPath)
145 }
146
147 var timeout time.Duration
148 if c.Client != nil {
149 timeout = c.Client.Timeout
150 }
151
152 r := &Request{
153 c: c,
154 rateLimiter: c.rateLimiter,
155 backoff: backoff,
156 timeout: timeout,
157 pathPrefix: pathPrefix,
158 maxRetries: 10,
159 retryFn: defaultRequestRetryFn,
160 warningHandler: c.warningHandler,
161 }
162
163 switch {
164 case len(c.content.AcceptContentTypes) > 0:
165 r.SetHeader("Accept", c.content.AcceptContentTypes)
166 case len(c.content.ContentType) > 0:
167 r.SetHeader("Accept", c.content.ContentType+", */*")
168 }
169 return r
170 }
171
172
173 func NewRequestWithClient(base *url.URL, versionedAPIPath string, content ClientContentConfig, client *http.Client) *Request {
174 return NewRequest(&RESTClient{
175 base: base,
176 versionedAPIPath: versionedAPIPath,
177 content: content,
178 Client: client,
179 })
180 }
181
182
183 func (r *Request) Verb(verb string) *Request {
184 r.verb = verb
185 return r
186 }
187
188
189
190
191 func (r *Request) Prefix(segments ...string) *Request {
192 if r.err != nil {
193 return r
194 }
195 r.pathPrefix = path.Join(r.pathPrefix, path.Join(segments...))
196 return r
197 }
198
199
200
201 func (r *Request) Suffix(segments ...string) *Request {
202 if r.err != nil {
203 return r
204 }
205 r.subpath = path.Join(r.subpath, path.Join(segments...))
206 return r
207 }
208
209
210 func (r *Request) Resource(resource string) *Request {
211 if r.err != nil {
212 return r
213 }
214 if len(r.resource) != 0 {
215 r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
216 return r
217 }
218 if msgs := IsValidPathSegmentName(resource); len(msgs) != 0 {
219 r.err = fmt.Errorf("invalid resource %q: %v", resource, msgs)
220 return r
221 }
222 r.resource = resource
223 return r
224 }
225
226
227
228 func (r *Request) BackOff(manager BackoffManager) *Request {
229 if manager == nil {
230 r.backoff = &NoBackoff{}
231 return r
232 }
233
234 r.backoff = manager
235 return r
236 }
237
238
239
240 func (r *Request) WarningHandler(handler WarningHandler) *Request {
241 r.warningHandler = handler
242 return r
243 }
244
245
246 func (r *Request) Throttle(limiter flowcontrol.RateLimiter) *Request {
247 r.rateLimiter = limiter
248 return r
249 }
250
251
252
253 func (r *Request) SubResource(subresources ...string) *Request {
254 if r.err != nil {
255 return r
256 }
257 subresource := path.Join(subresources...)
258 if len(r.subresource) != 0 {
259 r.err = fmt.Errorf("subresource already set to %q, cannot change to %q", r.subresource, subresource)
260 return r
261 }
262 for _, s := range subresources {
263 if msgs := IsValidPathSegmentName(s); len(msgs) != 0 {
264 r.err = fmt.Errorf("invalid subresource %q: %v", s, msgs)
265 return r
266 }
267 }
268 r.subresource = subresource
269 return r
270 }
271
272
273 func (r *Request) Name(resourceName string) *Request {
274 if r.err != nil {
275 return r
276 }
277 if len(resourceName) == 0 {
278 r.err = fmt.Errorf("resource name may not be empty")
279 return r
280 }
281 if len(r.resourceName) != 0 {
282 r.err = fmt.Errorf("resource name already set to %q, cannot change to %q", r.resourceName, resourceName)
283 return r
284 }
285 if msgs := IsValidPathSegmentName(resourceName); len(msgs) != 0 {
286 r.err = fmt.Errorf("invalid resource name %q: %v", resourceName, msgs)
287 return r
288 }
289 r.resourceName = resourceName
290 return r
291 }
292
293
294 func (r *Request) Namespace(namespace string) *Request {
295 if r.err != nil {
296 return r
297 }
298 if r.namespaceSet {
299 r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
300 return r
301 }
302 if msgs := IsValidPathSegmentName(namespace); len(msgs) != 0 {
303 r.err = fmt.Errorf("invalid namespace %q: %v", namespace, msgs)
304 return r
305 }
306 r.namespaceSet = true
307 r.namespace = namespace
308 return r
309 }
310
311
312 func (r *Request) NamespaceIfScoped(namespace string, scoped bool) *Request {
313 if scoped {
314 return r.Namespace(namespace)
315 }
316 return r
317 }
318
319
320
321 func (r *Request) AbsPath(segments ...string) *Request {
322 if r.err != nil {
323 return r
324 }
325 r.pathPrefix = path.Join(r.c.base.Path, path.Join(segments...))
326 if len(segments) == 1 && (len(r.c.base.Path) > 1 || len(segments[0]) > 1) && strings.HasSuffix(segments[0], "/") {
327
328 r.pathPrefix += "/"
329 }
330 return r
331 }
332
333
334
335 func (r *Request) RequestURI(uri string) *Request {
336 if r.err != nil {
337 return r
338 }
339 locator, err := url.Parse(uri)
340 if err != nil {
341 r.err = err
342 return r
343 }
344 r.pathPrefix = locator.Path
345 if len(locator.Query()) > 0 {
346 if r.params == nil {
347 r.params = make(url.Values)
348 }
349 for k, v := range locator.Query() {
350 r.params[k] = v
351 }
352 }
353 return r
354 }
355
356
357 func (r *Request) Param(paramName, s string) *Request {
358 if r.err != nil {
359 return r
360 }
361 return r.setParam(paramName, s)
362 }
363
364
365
366
367
368
369 func (r *Request) VersionedParams(obj runtime.Object, codec runtime.ParameterCodec) *Request {
370 return r.SpecificallyVersionedParams(obj, codec, r.c.content.GroupVersion)
371 }
372
373 func (r *Request) SpecificallyVersionedParams(obj runtime.Object, codec runtime.ParameterCodec, version schema.GroupVersion) *Request {
374 if r.err != nil {
375 return r
376 }
377 params, err := codec.EncodeParameters(obj, version)
378 if err != nil {
379 r.err = err
380 return r
381 }
382 for k, v := range params {
383 if r.params == nil {
384 r.params = make(url.Values)
385 }
386 r.params[k] = append(r.params[k], v...)
387 }
388 return r
389 }
390
391 func (r *Request) setParam(paramName, value string) *Request {
392 if r.params == nil {
393 r.params = make(url.Values)
394 }
395 r.params[paramName] = append(r.params[paramName], value)
396 return r
397 }
398
399 func (r *Request) SetHeader(key string, values ...string) *Request {
400 if r.headers == nil {
401 r.headers = http.Header{}
402 }
403 r.headers.Del(key)
404 for _, value := range values {
405 r.headers.Add(key, value)
406 }
407 return r
408 }
409
410
411
412 func (r *Request) Timeout(d time.Duration) *Request {
413 if r.err != nil {
414 return r
415 }
416 r.timeout = d
417 return r
418 }
419
420
421
422
423
424 func (r *Request) MaxRetries(maxRetries int) *Request {
425 if maxRetries < 0 {
426 maxRetries = 0
427 }
428 r.maxRetries = maxRetries
429 return r
430 }
431
432
433
434
435
436
437
438
439 func (r *Request) Body(obj interface{}) *Request {
440 if r.err != nil {
441 return r
442 }
443 switch t := obj.(type) {
444 case string:
445 data, err := os.ReadFile(t)
446 if err != nil {
447 r.err = err
448 return r
449 }
450 glogBody("Request Body", data)
451 r.body = nil
452 r.bodyBytes = data
453 case []byte:
454 glogBody("Request Body", t)
455 r.body = nil
456 r.bodyBytes = t
457 case io.Reader:
458 r.body = t
459 r.bodyBytes = nil
460 case runtime.Object:
461
462 if reflect.ValueOf(t).IsNil() {
463 return r
464 }
465 encoder, err := r.c.content.Negotiator.Encoder(r.c.content.ContentType, nil)
466 if err != nil {
467 r.err = err
468 return r
469 }
470 data, err := runtime.Encode(encoder, t)
471 if err != nil {
472 r.err = err
473 return r
474 }
475 glogBody("Request Body", data)
476 r.body = nil
477 r.bodyBytes = data
478 r.SetHeader("Content-Type", r.c.content.ContentType)
479 default:
480 r.err = fmt.Errorf("unknown type used for body: %+v", obj)
481 }
482 return r
483 }
484
485
486 func (r *Request) Error() error {
487 return r.err
488 }
489
490
491
492 func (r *Request) URL() *url.URL {
493 p := r.pathPrefix
494 if r.namespaceSet && len(r.namespace) > 0 {
495 p = path.Join(p, "namespaces", r.namespace)
496 }
497 if len(r.resource) != 0 {
498 p = path.Join(p, strings.ToLower(r.resource))
499 }
500
501 if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
502 p = path.Join(p, r.resourceName, r.subresource, r.subpath)
503 }
504
505 finalURL := &url.URL{}
506 if r.c.base != nil {
507 *finalURL = *r.c.base
508 }
509 finalURL.Path = p
510
511 query := url.Values{}
512 for key, values := range r.params {
513 for _, value := range values {
514 query.Add(key, value)
515 }
516 }
517
518
519 if r.timeout != 0 {
520 query.Set("timeout", r.timeout.String())
521 }
522 finalURL.RawQuery = query.Encode()
523 return finalURL
524 }
525
526
527
528
529
530 func (r Request) finalURLTemplate() url.URL {
531 newParams := url.Values{}
532 v := []string{"{value}"}
533 for k := range r.params {
534 newParams[k] = v
535 }
536 r.params = newParams
537 u := r.URL()
538 if u == nil {
539 return url.URL{}
540 }
541
542 segments := strings.Split(u.Path, "/")
543 groupIndex := 0
544 index := 0
545 trimmedBasePath := ""
546 if r.c.base != nil && strings.Contains(u.Path, r.c.base.Path) {
547 p := strings.TrimPrefix(u.Path, r.c.base.Path)
548 if !strings.HasPrefix(p, "/") {
549 p = "/" + p
550 }
551
552
553 trimmedBasePath = r.c.base.Path
554 segments = strings.Split(p, "/")
555 groupIndex = 1
556 }
557 if len(segments) <= 2 {
558 return *u
559 }
560
561 const CoreGroupPrefix = "api"
562 const NamedGroupPrefix = "apis"
563 isCoreGroup := segments[groupIndex] == CoreGroupPrefix
564 isNamedGroup := segments[groupIndex] == NamedGroupPrefix
565 if isCoreGroup {
566
567 index = groupIndex + 2
568 } else if isNamedGroup {
569
570 index = groupIndex + 3
571 } else {
572
573
574
575
576 u.Path = "/{prefix}"
577 u.RawQuery = ""
578 return *u
579 }
580
581 switch {
582
583
584 case len(segments)-index == 2:
585
586 segments[index+1] = "{name}"
587 case len(segments)-index == 3:
588 if segments[index+2] == "finalize" || segments[index+2] == "status" {
589
590 segments[index+1] = "{name}"
591 } else {
592
593 segments[index+1] = "{namespace}"
594 }
595 case len(segments)-index >= 4:
596 segments[index+1] = "{namespace}"
597
598 if segments[index+3] != "finalize" && segments[index+3] != "status" {
599
600 segments[index+3] = "{name}"
601 }
602 }
603 u.Path = path.Join(trimmedBasePath, path.Join(segments...))
604 return *u
605 }
606
607 func (r *Request) tryThrottleWithInfo(ctx context.Context, retryInfo string) error {
608 if r.rateLimiter == nil {
609 return nil
610 }
611
612 now := time.Now()
613
614 err := r.rateLimiter.Wait(ctx)
615 if err != nil {
616 err = fmt.Errorf("client rate limiter Wait returned an error: %w", err)
617 }
618 latency := time.Since(now)
619
620 var message string
621 switch {
622 case len(retryInfo) > 0:
623 message = fmt.Sprintf("Waited for %v, %s - request: %s:%s", latency, retryInfo, r.verb, r.URL().String())
624 default:
625 message = fmt.Sprintf("Waited for %v due to client-side throttling, not priority and fairness, request: %s:%s", latency, r.verb, r.URL().String())
626 }
627
628 if latency > longThrottleLatency {
629 klog.V(3).Info(message)
630 }
631 if latency > extraLongThrottleLatency {
632
633
634 globalThrottledLogger.Infof("%s", message)
635 }
636 metrics.RateLimiterLatency.Observe(ctx, r.verb, r.finalURLTemplate(), latency)
637
638 return err
639 }
640
641 func (r *Request) tryThrottle(ctx context.Context) error {
642 return r.tryThrottleWithInfo(ctx, "")
643 }
644
645 type throttleSettings struct {
646 logLevel klog.Level
647 minLogInterval time.Duration
648
649 lastLogTime time.Time
650 lock sync.RWMutex
651 }
652
653 type throttledLogger struct {
654 clock clock.PassiveClock
655 settings []*throttleSettings
656 }
657
658 var globalThrottledLogger = &throttledLogger{
659 clock: clock.RealClock{},
660 settings: []*throttleSettings{
661 {
662 logLevel: 2,
663 minLogInterval: 1 * time.Second,
664 }, {
665 logLevel: 0,
666 minLogInterval: 10 * time.Second,
667 },
668 },
669 }
670
671 func (b *throttledLogger) attemptToLog() (klog.Level, bool) {
672 for _, setting := range b.settings {
673 if bool(klog.V(setting.logLevel).Enabled()) {
674
675 if func() bool {
676 setting.lock.RLock()
677 defer setting.lock.RUnlock()
678 return b.clock.Since(setting.lastLogTime) >= setting.minLogInterval
679 }() {
680 setting.lock.Lock()
681 defer setting.lock.Unlock()
682 if b.clock.Since(setting.lastLogTime) >= setting.minLogInterval {
683 setting.lastLogTime = b.clock.Now()
684 return setting.logLevel, true
685 }
686 }
687 return -1, false
688 }
689 }
690 return -1, false
691 }
692
693
694
695 func (b *throttledLogger) Infof(message string, args ...interface{}) {
696 if logLevel, ok := b.attemptToLog(); ok {
697 klog.V(logLevel).Infof(message, args...)
698 }
699 }
700
701
702
703 func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
704
705
706 if r.err != nil {
707 return nil, r.err
708 }
709
710 client := r.c.Client
711 if client == nil {
712 client = http.DefaultClient
713 }
714
715 isErrRetryableFunc := func(request *http.Request, err error) bool {
716
717
718 if net.IsProbableEOF(err) || net.IsTimeout(err) {
719 return true
720 }
721 return false
722 }
723 retry := r.retryFn(r.maxRetries)
724 url := r.URL().String()
725 for {
726 if err := retry.Before(ctx, r); err != nil {
727 return nil, retry.WrapPreviousError(err)
728 }
729
730 req, err := r.newHTTPRequest(ctx)
731 if err != nil {
732 return nil, err
733 }
734
735 resp, err := client.Do(req)
736 retry.After(ctx, r, resp, err)
737 if err == nil && resp.StatusCode == http.StatusOK {
738 return r.newStreamWatcher(resp)
739 }
740
741 done, transformErr := func() (bool, error) {
742 defer readAndCloseResponseBody(resp)
743
744 if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
745 return false, nil
746 }
747
748 if resp == nil {
749
750 return true, nil
751 }
752 if result := r.transformResponse(resp, req); result.err != nil {
753 return true, result.err
754 }
755 return true, fmt.Errorf("for request %s, got status: %v", url, resp.StatusCode)
756 }()
757 if done {
758 if isErrRetryableFunc(req, err) {
759 return watch.NewEmptyWatch(), nil
760 }
761 if err == nil {
762
763
764 err = transformErr
765 }
766 return nil, retry.WrapPreviousError(err)
767 }
768 }
769 }
770
771 func (r *Request) newStreamWatcher(resp *http.Response) (watch.Interface, error) {
772 contentType := resp.Header.Get("Content-Type")
773 mediaType, params, err := mime.ParseMediaType(contentType)
774 if err != nil {
775 klog.V(4).Infof("Unexpected content type from the server: %q: %v", contentType, err)
776 }
777 objectDecoder, streamingSerializer, framer, err := r.c.content.Negotiator.StreamDecoder(mediaType, params)
778 if err != nil {
779 return nil, err
780 }
781
782 handleWarnings(resp.Header, r.warningHandler)
783
784 frameReader := framer.NewFrameReader(resp.Body)
785 watchEventDecoder := streaming.NewDecoder(frameReader, streamingSerializer)
786
787 return watch.NewStreamWatcher(
788 restclientwatch.NewDecoder(watchEventDecoder, objectDecoder),
789
790
791 errors.NewClientErrorReporter(http.StatusInternalServerError, r.verb, "ClientWatchDecoding"),
792 ), nil
793 }
794
795
796
797
798 func updateRequestResultMetric(ctx context.Context, req *Request, resp *http.Response, err error) {
799 code, host := sanitize(req, resp, err)
800 metrics.RequestResult.Increment(ctx, code, req.verb, host)
801 }
802
803
804
805
806 func updateRequestRetryMetric(ctx context.Context, req *Request, resp *http.Response, err error) {
807 code, host := sanitize(req, resp, err)
808 metrics.RequestRetry.IncrementRetry(ctx, code, req.verb, host)
809 }
810
811 func sanitize(req *Request, resp *http.Response, err error) (string, string) {
812 host := "none"
813 if req.c.base != nil {
814 host = req.c.base.Host
815 }
816
817
818
819 code := "<error>"
820 if resp != nil {
821 code = strconv.Itoa(resp.StatusCode)
822 }
823
824 return code, host
825 }
826
827
828
829
830
831 func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
832 if r.err != nil {
833 return nil, r.err
834 }
835
836 if err := r.tryThrottle(ctx); err != nil {
837 return nil, err
838 }
839
840 client := r.c.Client
841 if client == nil {
842 client = http.DefaultClient
843 }
844
845 retry := r.retryFn(r.maxRetries)
846 url := r.URL().String()
847 for {
848 if err := retry.Before(ctx, r); err != nil {
849 return nil, err
850 }
851
852 req, err := r.newHTTPRequest(ctx)
853 if err != nil {
854 return nil, err
855 }
856 resp, err := client.Do(req)
857 retry.After(ctx, r, resp, err)
858 if err != nil {
859
860 return nil, err
861 }
862
863 switch {
864 case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
865 handleWarnings(resp.Header, r.warningHandler)
866 return resp.Body, nil
867
868 default:
869 done, transformErr := func() (bool, error) {
870 defer resp.Body.Close()
871
872 if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
873 return false, nil
874 }
875 result := r.transformResponse(resp, req)
876 if err := result.Error(); err != nil {
877 return true, err
878 }
879 return true, fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
880 }()
881 if done {
882 return nil, transformErr
883 }
884 }
885 }
886 }
887
888
889
890
891
892
893
894
895
896 func (r *Request) requestPreflightCheck() error {
897 if !r.namespaceSet {
898 return nil
899 }
900 if len(r.namespace) > 0 {
901 return nil
902 }
903
904 switch r.verb {
905 case "POST":
906 return fmt.Errorf("an empty namespace may not be set during creation")
907 case "GET", "PUT", "DELETE":
908 if len(r.resourceName) > 0 {
909 return fmt.Errorf("an empty namespace may not be set when a resource name is provided")
910 }
911 }
912 return nil
913 }
914
915 func (r *Request) newHTTPRequest(ctx context.Context) (*http.Request, error) {
916 var body io.Reader
917 switch {
918 case r.body != nil && r.bodyBytes != nil:
919 return nil, fmt.Errorf("cannot set both body and bodyBytes")
920 case r.body != nil:
921 body = r.body
922 case r.bodyBytes != nil:
923
924
925 body = bytes.NewReader(r.bodyBytes)
926 }
927
928 url := r.URL().String()
929 req, err := http.NewRequestWithContext(httptrace.WithClientTrace(ctx, newDNSMetricsTrace(ctx)), r.verb, url, body)
930 if err != nil {
931 return nil, err
932 }
933 req.Header = r.headers
934 return req, nil
935 }
936
937
938
939 func newDNSMetricsTrace(ctx context.Context) *httptrace.ClientTrace {
940 type dnsMetric struct {
941 start time.Time
942 host string
943 sync.Mutex
944 }
945 dns := &dnsMetric{}
946 return &httptrace.ClientTrace{
947 DNSStart: func(info httptrace.DNSStartInfo) {
948 dns.Lock()
949 defer dns.Unlock()
950 dns.start = time.Now()
951 dns.host = info.Host
952 },
953 DNSDone: func(info httptrace.DNSDoneInfo) {
954 dns.Lock()
955 defer dns.Unlock()
956 metrics.ResolverLatency.Observe(ctx, dns.host, time.Since(dns.start))
957 },
958 }
959 }
960
961
962
963
964
965 func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
966
967 start := time.Now()
968 defer func() {
969 metrics.RequestLatency.Observe(ctx, r.verb, r.finalURLTemplate(), time.Since(start))
970 }()
971
972 if r.err != nil {
973 klog.V(4).Infof("Error in request: %v", r.err)
974 return r.err
975 }
976
977 if err := r.requestPreflightCheck(); err != nil {
978 return err
979 }
980
981 client := r.c.Client
982 if client == nil {
983 client = http.DefaultClient
984 }
985
986
987
988
989 if err := r.tryThrottle(ctx); err != nil {
990 return err
991 }
992
993 if r.timeout > 0 {
994 var cancel context.CancelFunc
995 ctx, cancel = context.WithTimeout(ctx, r.timeout)
996 defer cancel()
997 }
998
999 isErrRetryableFunc := func(req *http.Request, err error) bool {
1000
1001
1002
1003 if req.Method != "GET" {
1004 return false
1005 }
1006
1007 if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
1008 return true
1009 }
1010 return false
1011 }
1012
1013
1014 retry := r.retryFn(r.maxRetries)
1015 for {
1016 if err := retry.Before(ctx, r); err != nil {
1017 return retry.WrapPreviousError(err)
1018 }
1019 req, err := r.newHTTPRequest(ctx)
1020 if err != nil {
1021 return err
1022 }
1023 resp, err := client.Do(req)
1024
1025
1026 if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
1027 metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
1028 }
1029 retry.After(ctx, r, resp, err)
1030
1031 done := func() bool {
1032 defer readAndCloseResponseBody(resp)
1033
1034
1035 f := func(req *http.Request, resp *http.Response) {
1036 if resp == nil {
1037 return
1038 }
1039 fn(req, resp)
1040 }
1041
1042 if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
1043 return false
1044 }
1045
1046 f(req, resp)
1047 return true
1048 }()
1049 if done {
1050 return retry.WrapPreviousError(err)
1051 }
1052 }
1053 }
1054
1055
1056
1057
1058
1059
1060
1061 func (r *Request) Do(ctx context.Context) Result {
1062 var result Result
1063 err := r.request(ctx, func(req *http.Request, resp *http.Response) {
1064 result = r.transformResponse(resp, req)
1065 })
1066 if err != nil {
1067 return Result{err: err}
1068 }
1069 if result.err == nil || len(result.body) > 0 {
1070 metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
1071 }
1072 return result
1073 }
1074
1075
1076 func (r *Request) DoRaw(ctx context.Context) ([]byte, error) {
1077 var result Result
1078 err := r.request(ctx, func(req *http.Request, resp *http.Response) {
1079 result.body, result.err = io.ReadAll(resp.Body)
1080 glogBody("Response Body", result.body)
1081 if resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent {
1082 result.err = r.transformUnstructuredResponseError(resp, req, result.body)
1083 }
1084 })
1085 if err != nil {
1086 return nil, err
1087 }
1088 if result.err == nil || len(result.body) > 0 {
1089 metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
1090 }
1091 return result.body, result.err
1092 }
1093
1094
1095 func (r *Request) transformResponse(resp *http.Response, req *http.Request) Result {
1096 var body []byte
1097 if resp.Body != nil {
1098 data, err := io.ReadAll(resp.Body)
1099 switch err.(type) {
1100 case nil:
1101 body = data
1102 case http2.StreamError:
1103
1104
1105
1106
1107
1108
1109
1110 klog.V(2).Infof("Stream error %#v when reading response body, may be caused by closed connection.", err)
1111 streamErr := fmt.Errorf("stream error when reading response body, may be caused by closed connection. Please retry. Original error: %w", err)
1112 return Result{
1113 err: streamErr,
1114 }
1115 default:
1116 klog.Errorf("Unexpected error when reading response body: %v", err)
1117 unexpectedErr := fmt.Errorf("unexpected error when reading response body. Please retry. Original error: %w", err)
1118 return Result{
1119 err: unexpectedErr,
1120 }
1121 }
1122 }
1123
1124 glogBody("Response Body", body)
1125
1126
1127 var decoder runtime.Decoder
1128 contentType := resp.Header.Get("Content-Type")
1129 if len(contentType) == 0 {
1130 contentType = r.c.content.ContentType
1131 }
1132 if len(contentType) > 0 {
1133 var err error
1134 mediaType, params, err := mime.ParseMediaType(contentType)
1135 if err != nil {
1136 return Result{err: errors.NewInternalError(err)}
1137 }
1138 decoder, err = r.c.content.Negotiator.Decoder(mediaType, params)
1139 if err != nil {
1140
1141 switch {
1142 case resp.StatusCode == http.StatusSwitchingProtocols:
1143
1144 case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
1145 return Result{err: r.transformUnstructuredResponseError(resp, req, body)}
1146 }
1147 return Result{
1148 body: body,
1149 contentType: contentType,
1150 statusCode: resp.StatusCode,
1151 warnings: handleWarnings(resp.Header, r.warningHandler),
1152 }
1153 }
1154 }
1155
1156 switch {
1157 case resp.StatusCode == http.StatusSwitchingProtocols:
1158
1159 case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
1160
1161
1162 retryAfter, _ := retryAfterSeconds(resp)
1163 err := r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
1164 return Result{
1165 body: body,
1166 contentType: contentType,
1167 statusCode: resp.StatusCode,
1168 decoder: decoder,
1169 err: err,
1170 warnings: handleWarnings(resp.Header, r.warningHandler),
1171 }
1172 }
1173
1174 return Result{
1175 body: body,
1176 contentType: contentType,
1177 statusCode: resp.StatusCode,
1178 decoder: decoder,
1179 warnings: handleWarnings(resp.Header, r.warningHandler),
1180 }
1181 }
1182
1183
1184 func truncateBody(body string) string {
1185 max := 0
1186 switch {
1187 case bool(klog.V(10).Enabled()):
1188 return body
1189 case bool(klog.V(9).Enabled()):
1190 max = 10240
1191 case bool(klog.V(8).Enabled()):
1192 max = 1024
1193 }
1194
1195 if len(body) <= max {
1196 return body
1197 }
1198
1199 return body[:max] + fmt.Sprintf(" [truncated %d chars]", len(body)-max)
1200 }
1201
1202
1203
1204
1205 func glogBody(prefix string, body []byte) {
1206 if klogV := klog.V(8); klogV.Enabled() {
1207 if bytes.IndexFunc(body, func(r rune) bool {
1208 return r < 0x0a
1209 }) != -1 {
1210 klogV.Infof("%s:\n%s", prefix, truncateBody(hex.Dump(body)))
1211 } else {
1212 klogV.Infof("%s: %s", prefix, truncateBody(string(body)))
1213 }
1214 }
1215 }
1216
1217
1218 const maxUnstructuredResponseTextBytes = 2048
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238 func (r *Request) transformUnstructuredResponseError(resp *http.Response, req *http.Request, body []byte) error {
1239 if body == nil && resp.Body != nil {
1240 if data, err := io.ReadAll(&io.LimitedReader{R: resp.Body, N: maxUnstructuredResponseTextBytes}); err == nil {
1241 body = data
1242 }
1243 }
1244 retryAfter, _ := retryAfterSeconds(resp)
1245 return r.newUnstructuredResponseError(body, isTextResponse(resp), resp.StatusCode, req.Method, retryAfter)
1246 }
1247
1248
1249 func (r *Request) newUnstructuredResponseError(body []byte, isTextResponse bool, statusCode int, method string, retryAfter int) error {
1250
1251 if len(body) > maxUnstructuredResponseTextBytes {
1252 body = body[:maxUnstructuredResponseTextBytes]
1253 }
1254
1255 message := "unknown"
1256 if isTextResponse {
1257 message = strings.TrimSpace(string(body))
1258 }
1259 var groupResource schema.GroupResource
1260 if len(r.resource) > 0 {
1261 groupResource.Group = r.c.content.GroupVersion.Group
1262 groupResource.Resource = r.resource
1263 }
1264 return errors.NewGenericServerResponse(
1265 statusCode,
1266 method,
1267 groupResource,
1268 r.resourceName,
1269 message,
1270 retryAfter,
1271 true,
1272 )
1273 }
1274
1275
1276 func isTextResponse(resp *http.Response) bool {
1277 contentType := resp.Header.Get("Content-Type")
1278 if len(contentType) == 0 {
1279 return true
1280 }
1281 media, _, err := mime.ParseMediaType(contentType)
1282 if err != nil {
1283 return false
1284 }
1285 return strings.HasPrefix(media, "text/")
1286 }
1287
1288
1289
1290 func retryAfterSeconds(resp *http.Response) (int, bool) {
1291 if h := resp.Header.Get("Retry-After"); len(h) > 0 {
1292 if i, err := strconv.Atoi(h); err == nil {
1293 return i, true
1294 }
1295 }
1296 return 0, false
1297 }
1298
1299
1300 type Result struct {
1301 body []byte
1302 warnings []net.WarningHeader
1303 contentType string
1304 err error
1305 statusCode int
1306
1307 decoder runtime.Decoder
1308 }
1309
1310
1311 func (r Result) Raw() ([]byte, error) {
1312 return r.body, r.err
1313 }
1314
1315
1316
1317
1318 func (r Result) Get() (runtime.Object, error) {
1319 if r.err != nil {
1320
1321 return nil, r.Error()
1322 }
1323 if r.decoder == nil {
1324 return nil, fmt.Errorf("serializer for %s doesn't exist", r.contentType)
1325 }
1326
1327
1328 out, _, err := r.decoder.Decode(r.body, nil, nil)
1329 if err != nil {
1330 return nil, err
1331 }
1332 switch t := out.(type) {
1333 case *metav1.Status:
1334
1335 if t.Status != metav1.StatusSuccess {
1336 return nil, errors.FromObject(t)
1337 }
1338 }
1339 return out, nil
1340 }
1341
1342
1343
1344 func (r Result) StatusCode(statusCode *int) Result {
1345 *statusCode = r.statusCode
1346 return r
1347 }
1348
1349
1350
1351
1352 func (r Result) ContentType(contentType *string) Result {
1353 *contentType = r.contentType
1354 return r
1355 }
1356
1357
1358
1359
1360 func (r Result) Into(obj runtime.Object) error {
1361 if r.err != nil {
1362
1363 return r.Error()
1364 }
1365 if r.decoder == nil {
1366 return fmt.Errorf("serializer for %s doesn't exist", r.contentType)
1367 }
1368 if len(r.body) == 0 {
1369 return fmt.Errorf("0-length response with status code: %d and content type: %s",
1370 r.statusCode, r.contentType)
1371 }
1372
1373 out, _, err := r.decoder.Decode(r.body, nil, obj)
1374 if err != nil || out == obj {
1375 return err
1376 }
1377
1378
1379 switch t := out.(type) {
1380 case *metav1.Status:
1381
1382 if t.Status != metav1.StatusSuccess {
1383 return errors.FromObject(t)
1384 }
1385 }
1386 return nil
1387 }
1388
1389
1390
1391 func (r Result) WasCreated(wasCreated *bool) Result {
1392 *wasCreated = r.statusCode == http.StatusCreated
1393 return r
1394 }
1395
1396
1397
1398
1399
1400 func (r Result) Error() error {
1401
1402
1403 if r.err == nil || !errors.IsUnexpectedServerError(r.err) || len(r.body) == 0 || r.decoder == nil {
1404 return r.err
1405 }
1406
1407
1408
1409 out, _, err := r.decoder.Decode(r.body, &schema.GroupVersionKind{Version: "v1"}, nil)
1410 if err != nil {
1411 klog.V(5).Infof("body was not decodable (unable to check for Status): %v", err)
1412 return r.err
1413 }
1414 switch t := out.(type) {
1415 case *metav1.Status:
1416
1417 if t.Status == metav1.StatusFailure {
1418 return errors.FromObject(t)
1419 }
1420 }
1421 return r.err
1422 }
1423
1424
1425 func (r Result) Warnings() []net.WarningHeader {
1426 return r.warnings
1427 }
1428
1429
1430 var NameMayNotBe = []string{".", ".."}
1431
1432
1433 var NameMayNotContain = []string{"/", "%"}
1434
1435
1436 func IsValidPathSegmentName(name string) []string {
1437 for _, illegalName := range NameMayNotBe {
1438 if name == illegalName {
1439 return []string{fmt.Sprintf(`may not be '%s'`, illegalName)}
1440 }
1441 }
1442
1443 var errors []string
1444 for _, illegalContent := range NameMayNotContain {
1445 if strings.Contains(name, illegalContent) {
1446 errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
1447 }
1448 }
1449
1450 return errors
1451 }
1452
1453
1454
1455 func IsValidPathSegmentPrefix(name string) []string {
1456 var errors []string
1457 for _, illegalContent := range NameMayNotContain {
1458 if strings.Contains(name, illegalContent) {
1459 errors = append(errors, fmt.Sprintf(`may not contain '%s'`, illegalContent))
1460 }
1461 }
1462
1463 return errors
1464 }
1465
1466
1467 func ValidatePathSegmentName(name string, prefix bool) []string {
1468 if prefix {
1469 return IsValidPathSegmentPrefix(name)
1470 }
1471 return IsValidPathSegmentName(name)
1472 }
1473
View as plain text