17 package rest
19 import (
20 "bytes"
21 "context"
22 "errors"
23 "flag"
24 "fmt"
25 "io"
26 "net"
27 "net/http"
28 "net/http/httptest"
29 "net/url"
30 "os"
31 "reflect"
32 "strings"
33 "sync"
34 "sync/atomic"
35 "syscall"
36 "testing"
37 "time"
39 "github.com/google/go-cmp/cmp"
40 v1 "k8s.io/api/core/v1"
41 apiequality "k8s.io/apimachinery/pkg/api/equality"
42 apierrors "k8s.io/apimachinery/pkg/api/errors"
43 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
44 "k8s.io/apimachinery/pkg/runtime"
45 "k8s.io/apimachinery/pkg/runtime/schema"
46 "k8s.io/apimachinery/pkg/runtime/serializer"
47 "k8s.io/apimachinery/pkg/runtime/serializer/streaming"
48 "k8s.io/apimachinery/pkg/util/intstr"
49 utilnet "k8s.io/apimachinery/pkg/util/net"
50 "k8s.io/apimachinery/pkg/watch"
51 "k8s.io/client-go/kubernetes/scheme"
52 restclientwatch "k8s.io/client-go/rest/watch"
53 "k8s.io/client-go/tools/metrics"
54 "k8s.io/client-go/util/flowcontrol"
55 utiltesting "k8s.io/client-go/util/testing"
56 "k8s.io/klog/v2"
57 testingclock "k8s.io/utils/clock/testing"
58 )
60 func TestNewRequestSetsAccept(t *testing.T) {
61 r := NewRequestWithClient(&url.URL{Path: "/path/"}, "", ClientContentConfig{}, nil).Verb("get")
62 if r.headers.Get("Accept") != "" {
63 t.Errorf("unexpected headers: %#v", r.headers)
64 }
65 r = NewRequestWithClient(&url.URL{Path: "/path/"}, "", ClientContentConfig{ContentType: "application/other"}, nil).Verb("get")
66 if r.headers.Get("Accept") != "application/other, */*" {
67 t.Errorf("unexpected headers: %#v", r.headers)
68 }
69 }
71 func clientForFunc(fn clientFunc) *http.Client {
72 return &http.Client{
73 Transport: fn,
74 }
75 }
77 type clientFunc func(req *http.Request) (*http.Response, error)
79 func (f clientFunc) RoundTrip(req *http.Request) (*http.Response, error) {
80 return f(req)
81 }
83 func TestRequestSetsHeaders(t *testing.T) {
84 server := clientForFunc(func(req *http.Request) (*http.Response, error) {
85 if req.Header.Get("Accept") != "application/other, */*" {
86 t.Errorf("unexpected headers: %#v", req.Header)
87 }
88 return &http.Response{
89 StatusCode: http.StatusForbidden,
90 Body: io.NopCloser(bytes.NewReader([]byte{})),
91 }, nil
92 })
93 config := defaultContentConfig()
94 config.ContentType = "application/other"
95 r := NewRequestWithClient(&url.URL{Path: "/path"}, "", config, nil).Verb("get")
96 r.c.Client = server
99 _ = r.Do(context.Background())
100 _, _ = r.Watch(context.Background())
101 _, _ = r.Stream(context.Background())
102 }
104 func TestRequestWithErrorWontChange(t *testing.T) {
105 gvCopy := v1.SchemeGroupVersion
106 original := Request{
107 err: errors.New("test"),
108 c: &RESTClient{
109 content: ClientContentConfig{GroupVersion: gvCopy},
110 },
111 }
112 r := original
113 changed := r.Param("foo", "bar").
114 AbsPath("/abs").
115 Prefix("test").
116 Suffix("testing").
117 Namespace("new").
118 Resource("foos").
119 Name("bars").
120 Body("foo").
121 Timeout(time.Millisecond)
122 if changed != &r {
123 t.Errorf("returned request should point to the same object")
124 }
125 if !reflect.DeepEqual(changed, &original) {
126 t.Errorf("expected %#v, got %#v", &original, changed)
127 }
128 }
130 func TestRequestPreservesBaseTrailingSlash(t *testing.T) {
131 r := &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "/path/"}
132 if s := r.URL().String(); s != "/path/" {
133 t.Errorf("trailing slash should be preserved: %s", s)
134 }
135 }
137 func TestRequestAbsPathPreservesTrailingSlash(t *testing.T) {
138 r := (&Request{c: &RESTClient{base: &url.URL{}}}).AbsPath("/foo/")
139 if s := r.URL().String(); s != "/foo/" {
140 t.Errorf("trailing slash should be preserved: %s", s)
141 }
142 }
144 func TestRequestAbsPathJoins(t *testing.T) {
145 r := (&Request{c: &RESTClient{base: &url.URL{}}}).AbsPath("foo/bar", "baz")
146 if s := r.URL().String(); s != "foo/bar/baz" {
147 t.Errorf("trailing slash should be preserved: %s", s)
148 }
149 }
151 func TestRequestSetsNamespace(t *testing.T) {
152 r := (&Request{
153 c: &RESTClient{base: &url.URL{Path: "/"}},
154 }).Namespace("foo")
155 if r.namespace == "" {
156 t.Errorf("namespace should be set: %#v", r)
157 }
159 if s := r.URL().String(); s != "namespaces/foo" {
160 t.Errorf("namespace should be in path: %s", s)
161 }
162 }
164 func TestRequestOrdersNamespaceInPath(t *testing.T) {
165 r := (&Request{
166 c: &RESTClient{base: &url.URL{}},
167 pathPrefix: "/test/",
168 }).Name("bar").Resource("baz").Namespace("foo")
169 if s := r.URL().String(); s != "/test/namespaces/foo/baz/bar" {
170 t.Errorf("namespace should be in order in path: %s", s)
171 }
172 }
174 func TestRequestOrdersSubResource(t *testing.T) {
175 r := (&Request{
176 c: &RESTClient{base: &url.URL{}},
177 pathPrefix: "/test/",
178 }).Name("bar").Resource("baz").Namespace("foo").Suffix("test").SubResource("a", "b")
179 if s := r.URL().String(); s != "/test/namespaces/foo/baz/bar/a/b/test" {
180 t.Errorf("namespace should be in order in path: %s", s)
181 }
182 }
184 func TestRequestSetTwiceError(t *testing.T) {
185 if (&Request{}).Name("bar").Name("baz").err == nil {
186 t.Errorf("setting name twice should result in error")
187 }
188 if (&Request{}).Namespace("bar").Namespace("baz").err == nil {
189 t.Errorf("setting namespace twice should result in error")
190 }
191 if (&Request{}).Resource("bar").Resource("baz").err == nil {
192 t.Errorf("setting resource twice should result in error")
193 }
194 if (&Request{}).SubResource("bar").SubResource("baz").err == nil {
195 t.Errorf("setting subresource twice should result in error")
196 }
197 }
199 func TestInvalidSegments(t *testing.T) {
200 invalidSegments := []string{".", "..", "test/segment", "test%2bsegment"}
201 setters := map[string]func(string, *Request){
202 "namespace": func(s string, r *Request) { r.Namespace(s) },
203 "resource": func(s string, r *Request) { r.Resource(s) },
204 "name": func(s string, r *Request) { r.Name(s) },
205 "subresource": func(s string, r *Request) { r.SubResource(s) },
206 }
207 for _, invalidSegment := range invalidSegments {
208 for setterName, setter := range setters {
209 r := &Request{}
210 setter(invalidSegment, r)
211 if r.err == nil {
212 t.Errorf("%s: %s: expected error, got none", setterName, invalidSegment)
213 }
214 }
215 }
216 }
218 func TestRequestParam(t *testing.T) {
219 r := (&Request{}).Param("foo", "a")
220 if !reflect.DeepEqual(r.params, url.Values{"foo": []string{"a"}}) {
221 t.Errorf("should have set a param: %#v", r)
222 }
224 r.Param("bar", "1")
225 r.Param("bar", "2")
226 if !reflect.DeepEqual(r.params, url.Values{"foo": []string{"a"}, "bar": []string{"1", "2"}}) {
227 t.Errorf("should have set a param: %#v", r)
228 }
229 }
231 func TestRequestVersionedParams(t *testing.T) {
232 r := (&Request{c: &RESTClient{content: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}}).Param("foo", "a")
233 if !reflect.DeepEqual(r.params, url.Values{"foo": []string{"a"}}) {
234 t.Errorf("should have set a param: %#v", r)
235 }
236 r.VersionedParams(&v1.PodLogOptions{Follow: true, Container: "bar"}, scheme.ParameterCodec)
238 if !reflect.DeepEqual(r.params, url.Values{
239 "foo": []string{"a"},
240 "container": []string{"bar"},
241 "follow": []string{"true"},
242 }) {
243 t.Errorf("should have set a param: %#v", r)
244 }
245 }
247 func TestRequestVersionedParamsFromListOptions(t *testing.T) {
248 r := &Request{c: &RESTClient{content: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}}
249 r.VersionedParams(&metav1.ListOptions{ResourceVersion: "1"}, scheme.ParameterCodec)
250 if !reflect.DeepEqual(r.params, url.Values{
251 "resourceVersion": []string{"1"},
252 }) {
253 t.Errorf("should have set a param: %#v", r)
254 }
256 var timeout int64 = 10
257 r.VersionedParams(&metav1.ListOptions{ResourceVersion: "2", TimeoutSeconds: &timeout}, scheme.ParameterCodec)
258 if !reflect.DeepEqual(r.params, url.Values{
259 "resourceVersion": []string{"1", "2"},
260 "timeoutSeconds": []string{"10"},
261 }) {
262 t.Errorf("should have set a param: %#v %v", r.params, r.err)
263 }
264 }
266 func TestRequestVersionedParamsWithInvalidScheme(t *testing.T) {
267 parameterCodec := runtime.NewParameterCodec(runtime.NewScheme())
268 r := (&Request{c: &RESTClient{content: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}})
269 r.VersionedParams(&v1.PodExecOptions{Stdin: false, Stdout: true},
270 parameterCodec)
272 if r.Error() == nil {
273 t.Errorf("should have recorded an error: %#v", r.params)
274 }
275 }
277 func TestRequestError(t *testing.T) {
279 r := (&Request{}).Body([]string{"test"})
281 if r.Error() != r.err {
282 t.Errorf("getter should be identical to reference: %#v %#v", r.Error(), r.err)
283 }
284 }
286 func TestRequestURI(t *testing.T) {
287 r := (&Request{}).Param("foo", "a")
288 r.Prefix("other")
289 r.RequestURI("/test?foo=b&a=b&c=1&c=2")
290 if r.pathPrefix != "/test" {
291 t.Errorf("path is wrong: %#v", r)
292 }
293 if !reflect.DeepEqual(r.params, url.Values{"a": []string{"b"}, "foo": []string{"b"}, "c": []string{"1", "2"}}) {
294 t.Errorf("should have set a param: %#v", r)
295 }
296 }
298 type NotAnAPIObject struct{}
300 func (obj NotAnAPIObject) GroupVersionKind() *schema.GroupVersionKind { return nil }
301 func (obj NotAnAPIObject) SetGroupVersionKind(gvk *schema.GroupVersionKind) {}
303 func defaultContentConfig() ClientContentConfig {
304 gvCopy := v1.SchemeGroupVersion
305 return ClientContentConfig{
306 ContentType: "application/json",
307 GroupVersion: gvCopy,
308 Negotiator: runtime.NewClientNegotiator(scheme.Codecs.WithoutConversion(), gvCopy),
309 }
310 }
312 func TestRequestBody(t *testing.T) {
314 r := (&Request{}).Body([]string{"test"})
315 if r.err == nil || r.body != nil {
316 t.Errorf("should have set err and left body nil: %#v", r)
317 }
320 f, err := os.CreateTemp("", "")
321 if err != nil {
322 t.Fatalf("unable to create temp file")
323 }
324 defer f.Close()
325 os.Remove(f.Name())
326 r = (&Request{}).Body(f.Name())
327 if r.err == nil || r.body != nil {
328 t.Errorf("should have set err and left body nil: %#v", r)
329 }
332 r = (&Request{c: &RESTClient{content: defaultContentConfig()}}).Body(&NotAnAPIObject{})
333 if r.err == nil || r.body != nil {
334 t.Errorf("should have set err and left body nil: %#v", r)
335 }
336 }
338 func TestResultIntoWithErrReturnsErr(t *testing.T) {
339 res := Result{err: errors.New("test")}
340 if err := res.Into(&v1.Pod{}); err != res.err {
341 t.Errorf("should have returned exact error from result")
342 }
343 }
345 func TestResultIntoWithNoBodyReturnsErr(t *testing.T) {
346 res := Result{
347 body: []byte{},
348 decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion),
349 }
350 if err := res.Into(&v1.Pod{}); err == nil || !strings.Contains(err.Error(), "0-length") {
351 t.Errorf("should have complained about 0 length body")
352 }
353 }
355 func TestURLTemplate(t *testing.T) {
356 uri, _ := url.Parse("http://localhost/some/base/url/path")
357 uriSingleSlash, _ := url.Parse("http://localhost/")
358 testCases := []struct {
359 Request *Request
360 ExpectedFullURL string
361 ExpectedFinalURL string
362 }{
363 {
365 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("POST").
366 Prefix("api", "v1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0"),
367 ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/namespaces/ns/r1/nm?p0=v0",
368 ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/namespaces/%7Bnamespace%7D/r1/%7Bname%7D?p0=%7Bvalue%7D",
369 },
370 {
372 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("POST").
373 Prefix("pre1", "v1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0"),
374 ExpectedFullURL: "http://localhost/some/base/url/path/pre1/v1/namespaces/ns/r1/nm?p0=v0",
375 ExpectedFinalURL: "http://localhost/%7Bprefix%7D",
376 },
377 {
380 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
381 Prefix("/api/v1/namespaces/ns/r1/name1"),
382 ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/namespaces/ns/r1/name1",
383 ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/namespaces/%7Bnamespace%7D/r1/%7Bname%7D",
384 },
385 {
388 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
389 Prefix("/apis/g1/v1/namespaces/ns/r1/name1"),
390 ExpectedFullURL: "http://localhost/some/base/url/path/apis/g1/v1/namespaces/ns/r1/name1",
391 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/g1/v1/namespaces/%7Bnamespace%7D/r1/%7Bname%7D",
392 },
393 {
396 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
397 Prefix("/api/v1/namespaces/ns/r1"),
398 ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/namespaces/ns/r1",
399 ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/namespaces/%7Bnamespace%7D/r1",
400 },
401 {
404 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
405 Prefix("/apis/g1/v1/namespaces/ns/r1"),
406 ExpectedFullURL: "http://localhost/some/base/url/path/apis/g1/v1/namespaces/ns/r1",
407 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/g1/v1/namespaces/%7Bnamespace%7D/r1",
408 },
409 {
412 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
413 Prefix("/api/v1/r1/name1"),
414 ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/r1/name1",
415 ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/r1/%7Bname%7D",
416 },
417 {
420 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
421 Prefix("/apis/g1/v1/r1/name1"),
422 ExpectedFullURL: "http://localhost/some/base/url/path/apis/g1/v1/r1/name1",
423 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/g1/v1/r1/%7Bname%7D",
424 },
425 {
428 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
429 Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces/finalize"),
430 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces/finalize",
431 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces/%7Bname%7D/finalize",
432 },
433 {
436 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
437 Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces"),
438 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces",
439 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces/%7Bname%7D",
440 },
441 {
444 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
445 Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces/finalize"),
446 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces/finalize",
447 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces/finalize",
448 },
449 {
452 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
453 Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces/status"),
454 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces/status",
455 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces/status",
456 },
457 {
460 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
461 Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces"),
462 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces",
463 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces",
464 },
465 {
468 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
469 Prefix("/apis/namespaces/namespaces/namespaces/namespaces/finalize"),
470 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/finalize",
471 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bname%7D/finalize",
472 },
473 {
476 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
477 Prefix("/apis/namespaces/namespaces/namespaces/namespaces/status"),
478 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/status",
479 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bname%7D/status",
480 },
481 {
484 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
485 Prefix("/apis/namespaces/namespaces/namespaces/namespaces"),
486 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces",
487 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bname%7D",
488 },
489 {
492 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
493 Prefix("/apis/namespaces/namespaces/namespaces"),
494 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces",
495 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces",
496 },
497 {
500 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
501 Prefix("/pre1/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces/finalize"),
502 ExpectedFullURL: "http://localhost/some/base/url/path/pre1/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces/finalize",
503 ExpectedFinalURL: "http://localhost/%7Bprefix%7D",
504 },
505 {
508 Request: NewRequestWithClient(uriSingleSlash, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
509 Prefix("/api/v1/namespaces/ns/r2/name1"),
510 ExpectedFullURL: "http://localhost/api/v1/namespaces/ns/r2/name1",
511 ExpectedFinalURL: "http://localhost/api/v1/namespaces/%7Bnamespace%7D/r2/%7Bname%7D",
512 },
513 {
516 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
517 Prefix("/api/v1/namespaces/ns/r3/name1"),
518 ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/namespaces/ns/r3/name1",
519 ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/namespaces/%7Bnamespace%7D/r3/%7Bname%7D",
520 },
521 {
524 Request: NewRequestWithClient(uriSingleSlash, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
525 Prefix("/"),
526 ExpectedFullURL: "http://localhost/",
527 ExpectedFinalURL: "http://localhost/",
528 },
529 {
532 Request: NewRequestWithClient(uriSingleSlash, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
533 Prefix("/version"),
534 ExpectedFullURL: "http://localhost/version",
535 ExpectedFinalURL: "http://localhost/version",
536 },
537 }
538 for i, testCase := range testCases {
539 r := testCase.Request
540 full := r.URL()
541 if full.String() != testCase.ExpectedFullURL {
542 t.Errorf("%d: unexpected initial URL: %s %s", i, full, testCase.ExpectedFullURL)
543 }
544 actualURL := r.finalURLTemplate()
545 actual := actualURL.String()
546 if actual != testCase.ExpectedFinalURL {
547 t.Errorf("%d: unexpected URL template: %s %s", i, actual, testCase.ExpectedFinalURL)
548 }
549 if r.URL().String() != full.String() {
550 t.Errorf("%d, creating URL template changed request: %s -> %s", i, full.String(), r.URL().String())
551 }
552 }
553 }
555 func TestTransformResponse(t *testing.T) {
556 invalid := []byte("aaaaa")
557 uri, _ := url.Parse("http://localhost")
558 testCases := []struct {
559 Response *http.Response
560 Data []byte
561 Created bool
562 Error bool
563 ErrFn func(err error) bool
564 }{
565 {Response: &http.Response{StatusCode: http.StatusOK}, Data: []byte{}},
566 {Response: &http.Response{StatusCode: http.StatusCreated}, Data: []byte{}, Created: true},
567 {Response: &http.Response{StatusCode: 199}, Error: true},
568 {Response: &http.Response{StatusCode: http.StatusInternalServerError}, Error: true},
569 {Response: &http.Response{StatusCode: http.StatusUnprocessableEntity}, Error: true},
570 {Response: &http.Response{StatusCode: http.StatusConflict}, Error: true},
571 {Response: &http.Response{StatusCode: http.StatusNotFound}, Error: true},
572 {Response: &http.Response{StatusCode: http.StatusUnauthorized}, Error: true},
573 {
574 Response: &http.Response{
575 StatusCode: http.StatusUnauthorized,
576 Header: http.Header{"Content-Type": []string{"application/json"}},
577 Body: io.NopCloser(bytes.NewReader(invalid)),
578 },
579 Error: true,
580 ErrFn: func(err error) bool {
581 return err.Error() != "aaaaa" && apierrors.IsUnauthorized(err)
582 },
583 },
584 {
585 Response: &http.Response{
586 StatusCode: http.StatusUnauthorized,
587 Header: http.Header{"Content-Type": []string{"text/any"}},
588 Body: io.NopCloser(bytes.NewReader(invalid)),
589 },
590 Error: true,
591 ErrFn: func(err error) bool {
592 return strings.Contains(err.Error(), "server has asked for the client to provide") && apierrors.IsUnauthorized(err)
593 },
594 },
595 {Response: &http.Response{StatusCode: http.StatusForbidden}, Error: true},
596 {Response: &http.Response{StatusCode: http.StatusOK, Body: io.NopCloser(bytes.NewReader(invalid))}, Data: invalid},
597 {Response: &http.Response{StatusCode: http.StatusOK, Body: io.NopCloser(bytes.NewReader(invalid))}, Data: invalid},
598 }
599 for i, test := range testCases {
600 r := NewRequestWithClient(uri, "", defaultContentConfig(), nil)
601 if test.Response.Body == nil {
602 test.Response.Body = io.NopCloser(bytes.NewReader([]byte{}))
603 }
604 result := r.transformResponse(test.Response, &http.Request{})
605 response, created, err := result.body, result.statusCode == http.StatusCreated, result.err
606 hasErr := err != nil
607 if hasErr != test.Error {
608 t.Errorf("%d: unexpected error: %t %v", i, test.Error, err)
609 } else if hasErr && test.Response.StatusCode > 399 {
610 status, ok := err.(apierrors.APIStatus)
611 if !ok {
612 t.Errorf("%d: response should have been transformable into APIStatus: %v", i, err)
613 continue
614 }
615 if int(status.Status().Code) != test.Response.StatusCode {
616 t.Errorf("%d: status code did not match response: %#v", i, status.Status())
617 }
618 }
619 if test.ErrFn != nil && !test.ErrFn(err) {
620 t.Errorf("%d: error function did not match: %v", i, err)
621 }
622 if !(test.Data == nil && response == nil) && !apiequality.Semantic.DeepDerivative(test.Data, response) {
623 t.Errorf("%d: unexpected response: %#v %#v", i, test.Data, response)
624 }
625 if test.Created != created {
626 t.Errorf("%d: expected created %t, got %t", i, test.Created, created)
627 }
628 }
629 }
631 type renegotiator struct {
632 called bool
633 contentType string
634 params map[string]string
635 decoder runtime.Decoder
636 err error
637 }
639 func (r *renegotiator) Decoder(contentType string, params map[string]string) (runtime.Decoder, error) {
640 r.called = true
641 r.contentType = contentType
642 r.params = params
643 return r.decoder, r.err
644 }
646 func (r *renegotiator) Encoder(contentType string, params map[string]string) (runtime.Encoder, error) {
647 return nil, fmt.Errorf("UNIMPLEMENTED")
648 }
650 func (r *renegotiator) StreamDecoder(contentType string, params map[string]string) (runtime.Decoder, runtime.Serializer, runtime.Framer, error) {
651 return nil, nil, nil, fmt.Errorf("UNIMPLEMENTED")
652 }
654 func TestTransformResponseNegotiate(t *testing.T) {
655 invalid := []byte("aaaaa")
656 uri, _ := url.Parse("http://localhost")
657 testCases := []struct {
658 Response *http.Response
659 Data []byte
660 Created bool
661 Error bool
662 ErrFn func(err error) bool
664 ContentType string
665 Called bool
666 ExpectContentType string
667 Decoder runtime.Decoder
668 NegotiateErr error
669 }{
670 {
671 ContentType: "application/json",
672 Response: &http.Response{
673 StatusCode: http.StatusUnauthorized,
674 Header: http.Header{"Content-Type": []string{"application/json"}},
675 Body: io.NopCloser(bytes.NewReader(invalid)),
676 },
677 Called: true,
678 ExpectContentType: "application/json",
679 Error: true,
680 ErrFn: func(err error) bool {
681 return err.Error() != "aaaaa" && apierrors.IsUnauthorized(err)
682 },
683 },
684 {
685 ContentType: "application/json",
686 Response: &http.Response{
687 StatusCode: http.StatusUnauthorized,
688 Header: http.Header{"Content-Type": []string{"application/protobuf"}},
689 Body: io.NopCloser(bytes.NewReader(invalid)),
690 },
691 Decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion),
693 Called: true,
694 ExpectContentType: "application/protobuf",
696 Error: true,
697 ErrFn: func(err error) bool {
698 return err.Error() != "aaaaa" && apierrors.IsUnauthorized(err)
699 },
700 },
701 {
702 ContentType: "application/json",
703 Response: &http.Response{
704 StatusCode: http.StatusInternalServerError,
705 Header: http.Header{"Content-Type": []string{"application/,others"}},
706 },
707 Decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion),
709 Error: true,
710 ErrFn: func(err error) bool {
711 return err.Error() == "Internal error occurred: mime: expected token after slash" && err.(apierrors.APIStatus).Status().Code == 500
712 },
713 },
714 {
716 Response: &http.Response{
717 StatusCode: http.StatusOK,
718 Header: http.Header{"Content-Type": []string{"text/any"}},
719 Body: io.NopCloser(bytes.NewReader(invalid)),
720 },
721 Decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion),
722 Called: true,
723 ExpectContentType: "text/any",
724 },
725 {
727 ContentType: "text/any",
728 Response: &http.Response{
729 StatusCode: http.StatusOK,
730 Body: io.NopCloser(bytes.NewReader(invalid)),
731 },
732 Decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion),
733 Called: true,
734 ExpectContentType: "text/any",
735 },
736 {
738 ContentType: "application/json",
739 Response: &http.Response{
740 StatusCode: http.StatusNotFound,
741 Header: http.Header{"Content-Type": []string{"application/unrecognized"}},
742 Body: io.NopCloser(bytes.NewReader(invalid)),
743 },
744 Decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion),
746 NegotiateErr: fmt.Errorf("aaaa"),
747 Called: true,
748 ExpectContentType: "application/unrecognized",
750 Error: true,
751 ErrFn: func(err error) bool {
752 return err.Error() != "aaaaa" && apierrors.IsNotFound(err)
753 },
754 },
755 }
756 for i, test := range testCases {
757 contentConfig := defaultContentConfig()
758 contentConfig.ContentType = test.ContentType
759 negotiator := &renegotiator{
760 decoder: test.Decoder,
761 err: test.NegotiateErr,
762 }
763 contentConfig.Negotiator = negotiator
764 r := NewRequestWithClient(uri, "", contentConfig, nil)
765 if test.Response.Body == nil {
766 test.Response.Body = io.NopCloser(bytes.NewReader([]byte{}))
767 }
768 result := r.transformResponse(test.Response, &http.Request{})
769 _, err := result.body, result.err
770 hasErr := err != nil
771 if hasErr != test.Error {
772 t.Errorf("%d: unexpected error: %t %v", i, test.Error, err)
773 continue
774 } else if hasErr && test.Response.StatusCode > 399 {
775 status, ok := err.(apierrors.APIStatus)
776 if !ok {
777 t.Errorf("%d: response should have been transformable into APIStatus: %v", i, err)
778 continue
779 }
780 if int(status.Status().Code) != test.Response.StatusCode {
781 t.Errorf("%d: status code did not match response: %#v", i, status.Status())
782 }
783 }
784 if test.ErrFn != nil && !test.ErrFn(err) {
785 t.Errorf("%d: error function did not match: %v", i, err)
786 }
787 if negotiator.called != test.Called {
788 t.Errorf("%d: negotiator called %t != %t", i, negotiator.called, test.Called)
789 }
790 if !test.Called {
791 continue
792 }
793 if negotiator.contentType != test.ExpectContentType {
794 t.Errorf("%d: unexpected content type: %s", i, negotiator.contentType)
795 }
796 }
797 }
799 func TestTransformUnstructuredError(t *testing.T) {
800 testCases := []struct {
801 Req *http.Request
802 Res *http.Response
804 Resource string
805 Name string
807 ErrFn func(error) bool
808 Transformed error
809 }{
810 {
811 Resource: "foo",
812 Name: "bar",
813 Req: &http.Request{
814 Method: "POST",
815 },
816 Res: &http.Response{
817 StatusCode: http.StatusConflict,
818 Body: io.NopCloser(bytes.NewReader(nil)),
819 },
820 ErrFn: apierrors.IsAlreadyExists,
821 },
822 {
823 Resource: "foo",
824 Name: "bar",
825 Req: &http.Request{
826 Method: "PUT",
827 },
828 Res: &http.Response{
829 StatusCode: http.StatusConflict,
830 Body: io.NopCloser(bytes.NewReader(nil)),
831 },
832 ErrFn: apierrors.IsConflict,
833 },
834 {
835 Resource: "foo",
836 Name: "bar",
837 Req: &http.Request{},
838 Res: &http.Response{
839 StatusCode: http.StatusNotFound,
840 Body: io.NopCloser(bytes.NewReader(nil)),
841 },
842 ErrFn: apierrors.IsNotFound,
843 },
844 {
845 Req: &http.Request{},
846 Res: &http.Response{
847 StatusCode: http.StatusBadRequest,
848 Body: io.NopCloser(bytes.NewReader(nil)),
849 },
850 ErrFn: apierrors.IsBadRequest,
851 },
852 {
854 Req: &http.Request{},
855 Res: &http.Response{StatusCode: http.StatusBadRequest, Body: io.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","apiVersion":"v1","status":"Failure","code":404}`)))},
856 ErrFn: apierrors.IsBadRequest,
857 Transformed: &apierrors.StatusError{
858 ErrStatus: metav1.Status{Status: metav1.StatusFailure, Code: http.StatusNotFound},
859 },
860 },
861 {
863 Req: &http.Request{},
864 Res: &http.Response{StatusCode: http.StatusBadRequest, Body: io.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","apiVersion":"v1","status":"Success","code":404}`)))},
865 ErrFn: apierrors.IsBadRequest,
866 },
867 {
869 Req: &http.Request{},
870 Res: &http.Response{StatusCode: http.StatusBadRequest, Body: io.NopCloser(bytes.NewReader([]byte(`{}`)))},
871 ErrFn: apierrors.IsBadRequest,
872 },
873 {
876 Req: &http.Request{},
877 Res: &http.Response{StatusCode: http.StatusBadRequest, Body: io.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","status":"Failure","code":404}`)))},
878 ErrFn: apierrors.IsBadRequest,
879 Transformed: &apierrors.StatusError{
880 ErrStatus: metav1.Status{Status: metav1.StatusFailure, Code: http.StatusNotFound},
881 },
882 },
883 {
885 Req: &http.Request{},
886 Res: &http.Response{StatusCode: http.StatusBadRequest, Body: io.NopCloser(bytes.NewReader([]byte(`{"status":"Failure","code":404}`)))},
887 ErrFn: apierrors.IsBadRequest,
888 },
889 }
891 for _, testCase := range testCases {
892 t.Run("", func(t *testing.T) {
893 r := &Request{
894 c: &RESTClient{
895 content: defaultContentConfig(),
896 },
897 resourceName: testCase.Name,
898 resource: testCase.Resource,
899 }
900 result := r.transformResponse(testCase.Res, testCase.Req)
901 err := result.err
902 if !testCase.ErrFn(err) {
903 t.Fatalf("unexpected error: %v", err)
904 }
905 if !apierrors.IsUnexpectedServerError(err) {
906 t.Errorf("unexpected error type: %v", err)
907 }
908 if len(testCase.Name) != 0 && !strings.Contains(err.Error(), testCase.Name) {
909 t.Errorf("unexpected error string: %s", err)
910 }
911 if len(testCase.Resource) != 0 && !strings.Contains(err.Error(), testCase.Resource) {
912 t.Errorf("unexpected error string: %s", err)
913 }
916 transformed := result.Error()
917 expect := testCase.Transformed
918 if expect == nil {
919 expect = err
920 }
921 if !reflect.DeepEqual(expect, transformed) {
922 t.Errorf("unexpected Error(): %s", cmp.Diff(expect, transformed))
923 }
926 if _, err := result.Get(); !reflect.DeepEqual(expect, err) {
927 t.Errorf("unexpected error on Get(): %s", cmp.Diff(expect, err))
928 }
931 if err := result.Into(&v1.Pod{}); !reflect.DeepEqual(expect, err) {
932 t.Errorf("unexpected error on Into(): %s", cmp.Diff(expect, err))
933 }
936 if _, err := result.Raw(); !reflect.DeepEqual(result.err, err) {
937 t.Errorf("unexpected error on Raw(): %s", cmp.Diff(expect, err))
938 }
939 })
940 }
941 }
943 func TestRequestWatch(t *testing.T) {
944 testCases := []struct {
945 name string
946 Request *Request
947 maxRetries int
948 serverReturns []responseErr
949 Expect []watch.Event
950 attemptsExpected int
951 Err bool
952 ErrFn func(error) bool
953 Empty bool
954 }{
955 {
956 name: "Request has error",
957 Request: &Request{err: errors.New("bail")},
958 attemptsExpected: 0,
959 Err: true,
960 },
961 {
962 name: "Client is nil, should use http.DefaultClient",
963 Request: &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "%"},
964 Err: true,
965 },
966 {
967 name: "error is not retryable",
968 Request: &Request{
969 c: &RESTClient{
970 base: &url.URL{},
971 },
972 },
973 serverReturns: []responseErr{
974 {response: nil, err: errors.New("err")},
975 },
976 attemptsExpected: 1,
977 Err: true,
978 },
979 {
980 name: "server returns forbidden",
981 Request: &Request{
982 c: &RESTClient{
983 content: defaultContentConfig(),
984 base: &url.URL{},
985 },
986 },
987 serverReturns: []responseErr{
988 {response: &http.Response{
989 StatusCode: http.StatusForbidden,
990 Body: io.NopCloser(bytes.NewReader([]byte{})),
991 }, err: nil},
992 },
993 attemptsExpected: 1,
994 Expect: []watch.Event{
995 {
996 Type: watch.Error,
997 Object: &metav1.Status{
998 Status: "Failure",
999 Code: 500,
1000 Reason: "InternalError",
1001 Message: `an error on the server ("unable to decode an event from the watch stream: test error") has prevented the request from succeeding`,
1002 Details: &metav1.StatusDetails{
1003 Causes: []metav1.StatusCause{
1004 {
1005 Type: "UnexpectedServerResponse",
1006 Message: "unable to decode an event from the watch stream: test error",
1007 },
1008 {
1009 Type: "ClientWatchDecoding",
1010 Message: "unable to decode an event from the watch stream: test error",
1011 },
1012 },
1013 },
1014 },
1015 },
1016 },
1017 Err: true,
1018 ErrFn: func(err error) bool {
1019 return apierrors.IsForbidden(err)
1020 },
1021 },
1022 {
1023 name: "server returns forbidden",
1024 Request: &Request{
1025 c: &RESTClient{
1026 content: defaultContentConfig(),
1027 base: &url.URL{},
1028 },
1029 },
1030 serverReturns: []responseErr{
1031 {response: &http.Response{
1032 StatusCode: http.StatusForbidden,
1033 Body: io.NopCloser(bytes.NewReader([]byte{})),
1034 }, err: nil},
1035 },
1036 attemptsExpected: 1,
1037 Err: true,
1038 ErrFn: func(err error) bool {
1039 return apierrors.IsForbidden(err)
1040 },
1041 },
1042 {
1043 name: "server returns unauthorized",
1044 Request: &Request{
1045 c: &RESTClient{
1046 content: defaultContentConfig(),
1047 base: &url.URL{},
1048 },
1049 },
1050 serverReturns: []responseErr{
1051 {response: &http.Response{
1052 StatusCode: http.StatusUnauthorized,
1053 Body: io.NopCloser(bytes.NewReader([]byte{})),
1054 }, err: nil},
1055 },
1056 attemptsExpected: 1,
1057 Err: true,
1058 ErrFn: func(err error) bool {
1059 return apierrors.IsUnauthorized(err)
1060 },
1061 },
1062 {
1063 name: "server returns unauthorized",
1064 Request: &Request{
1065 c: &RESTClient{
1066 content: defaultContentConfig(),
1067 base: &url.URL{},
1068 },
1069 },
1070 serverReturns: []responseErr{
1071 {response: &http.Response{
1072 StatusCode: http.StatusUnauthorized,
1073 Body: io.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &metav1.Status{
1074 Status: metav1.StatusFailure,
1075 Reason: metav1.StatusReasonUnauthorized,
1076 })))),
1077 }, err: nil},
1078 },
1079 attemptsExpected: 1,
1080 Err: true,
1081 ErrFn: func(err error) bool {
1082 return apierrors.IsUnauthorized(err)
1083 },
1084 },
1085 {
1086 name: "server returns EOF error",
1087 Request: &Request{
1088 c: &RESTClient{
1089 base: &url.URL{},
1090 },
1091 },
1092 serverReturns: []responseErr{
1093 {response: nil, err: io.EOF},
1094 },
1095 attemptsExpected: 1,
1096 Empty: true,
1097 },
1098 {
1099 name: "server returns can't write HTTP request on broken connection error",
1100 Request: &Request{
1101 c: &RESTClient{
1102 base: &url.URL{},
1103 },
1104 },
1105 serverReturns: []responseErr{
1106 {response: nil, err: errors.New("http: can't write HTTP request on broken connection")},
1107 },
1108 attemptsExpected: 1,
1109 Empty: true,
1110 },
1111 {
1112 name: "server returns connection reset by peer",
1113 Request: &Request{
1114 c: &RESTClient{
1115 base: &url.URL{},
1116 },
1117 },
1118 serverReturns: []responseErr{
1119 {response: nil, err: errors.New("foo: connection reset by peer")},
1120 },
1121 attemptsExpected: 1,
1122 Empty: true,
1123 },
1124 {
1125 name: "max retries 2, server always returns EOF error",
1126 Request: &Request{
1127 c: &RESTClient{
1128 base: &url.URL{},
1129 },
1130 },
1131 maxRetries: 2,
1132 attemptsExpected: 3,
1133 serverReturns: []responseErr{
1134 {response: nil, err: io.EOF},
1135 {response: nil, err: io.EOF},
1136 {response: nil, err: io.EOF},
1137 },
1138 Empty: true,
1139 },
1140 {
1141 name: "max retries 2, server always returns a response with Retry-After header",
1142 Request: &Request{
1143 c: &RESTClient{
1144 base: &url.URL{},
1145 },
1146 },
1147 maxRetries: 2,
1148 attemptsExpected: 3,
1149 serverReturns: []responseErr{
1150 {response: retryAfterResponse(), err: nil},
1151 {response: retryAfterResponse(), err: nil},
1152 {response: retryAfterResponse(), err: nil},
1153 },
1154 Err: true,
1155 ErrFn: func(err error) bool {
1156 return apierrors.IsInternalError(err)
1157 },
1158 },
1159 }
1161 for _, testCase := range testCases {
1162 t.Run(testCase.name, func(t *testing.T) {
1163 var attemptsGot int
1164 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
1165 defer func() {
1166 attemptsGot++
1167 }()
1169 if attemptsGot >= len(testCase.serverReturns) {
1170 t.Fatalf("Wrong test setup, the server does not know what to return")
1171 }
1172 re := testCase.serverReturns[attemptsGot]
1173 return re.response, re.err
1174 })
1175 if c := testCase.Request.c; c != nil && len(testCase.serverReturns) > 0 {
1176 c.Client = client
1177 }
1178 testCase.Request.backoff = &noSleepBackOff{}
1179 testCase.Request.maxRetries = testCase.maxRetries
1180 testCase.Request.retryFn = defaultRequestRetryFn
1182 watch, err := testCase.Request.Watch(context.Background())
1184 if watch == nil && err == nil {
1185 t.Fatal("Both watch.Interface and err returned by Watch are nil")
1186 }
1187 if testCase.attemptsExpected != attemptsGot {
1188 t.Errorf("Expected RoundTrip to be invoked %d times, but got: %d", testCase.attemptsExpected, attemptsGot)
1189 }
1190 hasErr := err != nil
1191 if hasErr != testCase.Err {
1192 t.Fatalf("expected %t, got %t: %v", testCase.Err, hasErr, err)
1193 }
1194 if testCase.ErrFn != nil && !testCase.ErrFn(err) {
1195 t.Errorf("error not valid: %v", err)
1196 }
1197 if hasErr && watch != nil {
1198 t.Fatalf("watch should be nil when error is returned")
1199 }
1200 if hasErr {
1201 return
1202 }
1203 defer watch.Stop()
1204 if testCase.Empty {
1205 evt, ok := <-watch.ResultChan()
1206 if ok {
1207 t.Errorf("expected the watch to be empty: %#v", evt)
1208 }
1209 }
1210 if testCase.Expect != nil {
1211 for i, evt := range testCase.Expect {
1212 out, ok := <-watch.ResultChan()
1213 if !ok {
1214 t.Fatalf("Watch closed early, %d/%d read", i, len(testCase.Expect))
1215 }
1216 if !reflect.DeepEqual(evt, out) {
1217 t.Fatalf("Event %d does not match: %s", i, cmp.Diff(evt, out))
1218 }
1219 }
1220 }
1221 })
1222 }
1223 }
1225 func TestRequestStream(t *testing.T) {
1226 testCases := []struct {
1227 name string
1228 Request *Request
1229 maxRetries int
1230 serverReturns []responseErr
1231 attemptsExpected int
1232 Err bool
1233 ErrFn func(error) bool
1234 }{
1235 {
1236 name: "request has error",
1237 Request: &Request{err: errors.New("bail")},
1238 attemptsExpected: 0,
1239 Err: true,
1240 },
1241 {
1242 name: "Client is nil, should use http.DefaultClient",
1243 Request: &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "%"},
1244 Err: true,
1245 },
1246 {
1247 name: "server returns an error",
1248 Request: &Request{
1249 c: &RESTClient{
1250 base: &url.URL{},
1251 },
1252 },
1253 serverReturns: []responseErr{
1254 {response: nil, err: errors.New("err")},
1255 },
1256 attemptsExpected: 1,
1257 Err: true,
1258 },
1259 {
1260 Request: &Request{
1261 c: &RESTClient{
1262 content: defaultContentConfig(),
1263 base: &url.URL{},
1264 },
1265 },
1266 serverReturns: []responseErr{
1267 {response: &http.Response{
1268 StatusCode: http.StatusUnauthorized,
1269 Body: io.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &metav1.Status{
1270 Status: metav1.StatusFailure,
1271 Reason: metav1.StatusReasonUnauthorized,
1272 })))),
1273 }, err: nil},
1274 },
1275 attemptsExpected: 1,
1276 Err: true,
1277 },
1278 {
1279 Request: &Request{
1280 c: &RESTClient{
1281 content: defaultContentConfig(),
1282 base: &url.URL{},
1283 },
1284 },
1285 serverReturns: []responseErr{
1286 {response: &http.Response{
1287 StatusCode: http.StatusBadRequest,
1288 Body: io.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"a container name must be specified for pod kube-dns-v20-mz5cv, choose one of: [kubedns dnsmasq healthz]","reason":"BadRequest","code":400}`))),
1289 }, err: nil},
1290 },
1291 attemptsExpected: 1,
1292 Err: true,
1293 ErrFn: func(err error) bool {
1294 if err.Error() == "a container name must be specified for pod kube-dns-v20-mz5cv, choose one of: [kubedns dnsmasq healthz]" {
1295 return true
1296 }
1297 return false
1298 },
1299 },
1300 {
1301 name: "max retries 1, server returns a retry-after response, non-bytes request, no retry",
1302 Request: &Request{
1303 body: &readSeeker{err: io.EOF},
1304 c: &RESTClient{
1305 base: &url.URL{},
1306 },
1307 },
1308 maxRetries: 1,
1309 attemptsExpected: 1,
1310 serverReturns: []responseErr{
1311 {response: retryAfterResponse(), err: nil},
1312 },
1313 Err: true,
1314 },
1315 {
1316 name: "max retries 2, server always returns a response with Retry-After header",
1317 Request: &Request{
1318 c: &RESTClient{
1319 base: &url.URL{},
1320 },
1321 },
1322 maxRetries: 2,
1323 attemptsExpected: 3,
1324 serverReturns: []responseErr{
1325 {response: retryAfterResponse(), err: nil},
1326 {response: retryAfterResponse(), err: nil},
1327 {response: retryAfterResponse(), err: nil},
1328 },
1329 Err: true,
1330 ErrFn: func(err error) bool {
1331 return apierrors.IsInternalError(err)
1332 },
1333 },
1334 {
1335 name: "server returns EOF after attempt 1, retry aborted",
1336 Request: &Request{
1337 c: &RESTClient{
1338 base: &url.URL{},
1339 },
1340 },
1341 maxRetries: 2,
1342 attemptsExpected: 2,
1343 serverReturns: []responseErr{
1344 {response: retryAfterResponse(), err: nil},
1345 {response: nil, err: io.EOF},
1346 },
1347 Err: true,
1348 ErrFn: func(err error) bool {
1349 return unWrap(err) == io.EOF
1350 },
1351 },
1352 {
1353 name: "max retries 2, server returns success on the final attempt",
1354 Request: &Request{
1355 c: &RESTClient{
1356 base: &url.URL{},
1357 },
1358 },
1359 maxRetries: 2,
1360 attemptsExpected: 3,
1361 serverReturns: []responseErr{
1362 {response: retryAfterResponse(), err: nil},
1363 {response: retryAfterResponse(), err: nil},
1364 {response: &http.Response{
1365 StatusCode: http.StatusOK,
1366 Body: io.NopCloser(bytes.NewReader([]byte{})),
1367 }, err: nil},
1368 },
1369 },
1370 }
1372 for _, testCase := range testCases {
1373 t.Run(testCase.name, func(t *testing.T) {
1374 var attemptsGot int
1375 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
1376 defer func() {
1377 attemptsGot++
1378 }()
1380 if attemptsGot >= len(testCase.serverReturns) {
1381 t.Fatalf("Wrong test setup, the server does not know what to return")
1382 }
1383 re := testCase.serverReturns[attemptsGot]
1384 return re.response, re.err
1385 })
1386 if c := testCase.Request.c; c != nil && len(testCase.serverReturns) > 0 {
1387 c.Client = client
1388 }
1389 testCase.Request.backoff = &noSleepBackOff{}
1390 testCase.Request.maxRetries = testCase.maxRetries
1391 testCase.Request.retryFn = defaultRequestRetryFn
1393 body, err := testCase.Request.Stream(context.Background())
1395 if body == nil && err == nil {
1396 t.Fatal("Both body and err returned by Stream are nil")
1397 }
1398 if testCase.attemptsExpected != attemptsGot {
1399 t.Errorf("Expected RoundTrip to be invoked %d times, but got: %d", testCase.attemptsExpected, attemptsGot)
1400 }
1402 hasErr := err != nil
1403 if hasErr != testCase.Err {
1404 t.Errorf("expected %t, got %t: %v", testCase.Err, hasErr, err)
1405 }
1406 if hasErr && body != nil {
1407 t.Error("body should be nil when error is returned")
1408 }
1410 if hasErr {
1411 if testCase.ErrFn != nil && !testCase.ErrFn(err) {
1412 t.Errorf("unexpected error: %#v", err)
1413 }
1414 }
1415 })
1416 }
1417 }
1419 func TestRequestDo(t *testing.T) {
1420 testCases := []struct {
1421 Request *Request
1422 Err bool
1423 }{
1424 {
1425 Request: &Request{c: &RESTClient{}, err: errors.New("bail")},
1426 Err: true,
1427 },
1428 {
1429 Request: &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "%"},
1430 Err: true,
1431 },
1432 {
1433 Request: &Request{
1434 c: &RESTClient{
1435 Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
1436 return nil, errors.New("err")
1437 }),
1438 base: &url.URL{},
1439 },
1440 },
1441 Err: true,
1442 },
1443 }
1444 for i, testCase := range testCases {
1445 testCase.Request.backoff = &NoBackoff{}
1446 testCase.Request.retryFn = defaultRequestRetryFn
1447 body, err := testCase.Request.Do(context.Background()).Raw()
1448 hasErr := err != nil
1449 if hasErr != testCase.Err {
1450 t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err)
1451 }
1452 if hasErr && body != nil {
1453 t.Errorf("%d: body should be nil when error is returned", i)
1454 }
1455 }
1456 }
1458 func TestDoRequestNewWay(t *testing.T) {
1459 reqBody := "request body"
1460 expectedObj := &v1.Service{Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{
1461 Protocol: "TCP",
1462 Port: 12345,
1463 TargetPort: intstr.FromInt32(12345),
1464 }}}}
1465 expectedBody, _ := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), expectedObj)
1466 fakeHandler := utiltesting.FakeHandler{
1467 StatusCode: 200,
1468 ResponseBody: string(expectedBody),
1469 T: t,
1470 }
1471 testServer := httptest.NewServer(&fakeHandler)
1472 defer testServer.Close()
1473 c := testRESTClient(t, testServer)
1474 obj, err := c.Verb("POST").
1475 Prefix("foo", "bar").
1476 Suffix("baz").
1477 Timeout(time.Second).
1478 Body([]byte(reqBody)).
1479 Do(context.Background()).Get()
1480 if err != nil {
1481 t.Errorf("Unexpected error: %v %#v", err, err)
1482 return
1483 }
1484 if obj == nil {
1485 t.Error("nil obj")
1486 } else if !apiequality.Semantic.DeepDerivative(expectedObj, obj) {
1487 t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
1488 }
1489 requestURL := defaultResourcePathWithPrefix("foo/bar", "", "", "baz")
1490 requestURL += "?timeout=1s"
1491 fakeHandler.ValidateRequest(t, requestURL, "POST", &reqBody)
1492 }
1495 func TestBackoffLifecycle(t *testing.T) {
1496 count := 0
1497 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
1498 count++
1499 t.Logf("Attempt %d", count)
1500 if count == 5 || count == 9 {
1501 w.WriteHeader(http.StatusOK)
1502 return
1503 }
1504 w.WriteHeader(http.StatusGatewayTimeout)
1505 return
1506 }))
1507 defer testServer.Close()
1508 c := testRESTClient(t, testServer)
1512 seconds := []int{0, 1, 2, 4, 8, 0, 1, 2, 4, 0}
1513 request := c.Verb("POST").Prefix("backofftest").Suffix("abc")
1514 clock := testingclock.FakeClock{}
1515 request.backoff = &URLBackoff{
1517 Backoff: flowcontrol.NewFakeBackOff(
1518 time.Duration(1)*time.Second,
1519 time.Duration(200)*time.Second,
1520 &clock,
1521 )}
1523 for _, sec := range seconds {
1524 thisBackoff := request.backoff.CalculateBackoff(request.URL())
1525 t.Logf("Current backoff %v", thisBackoff)
1526 if thisBackoff != time.Duration(sec)*time.Second {
1527 t.Errorf("Backoff is %v instead of %v", thisBackoff, sec)
1528 }
1529 now := clock.Now()
1530 request.DoRaw(context.Background())
1531 elapsed := clock.Since(now)
1532 if clock.Since(now) != thisBackoff {
1533 t.Errorf("CalculatedBackoff not honored by clock: Expected time of %v, but got %v ", thisBackoff, elapsed)
1534 }
1535 }
1536 }
1538 type testBackoffManager struct {
1539 sleeps []time.Duration
1540 }
1542 func (b *testBackoffManager) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) {
1543 }
1545 func (b *testBackoffManager) CalculateBackoff(actualUrl *url.URL) time.Duration {
1546 return time.Duration(0)
1547 }
1549 func (b *testBackoffManager) Sleep(d time.Duration) {
1550 b.sleeps = append(b.sleeps, d)
1551 }
1553 func TestCheckRetryClosesBody(t *testing.T) {
1554 count := 0
1555 ch := make(chan struct{})
1556 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
1557 count++
1558 t.Logf("attempt %d", count)
1559 if count >= 5 {
1560 w.WriteHeader(http.StatusOK)
1561 close(ch)
1562 return
1563 }
1564 w.Header().Set("Retry-After", "1")
1565 http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
1566 }))
1567 defer testServer.Close()
1569 backoff := &testBackoffManager{}
1572 expectedSleeps := []time.Duration{0, time.Second, time.Second, time.Second, time.Second}
1574 c := testRESTClient(t, testServer)
1575 c.createBackoffMgr = func() BackoffManager { return backoff }
1576 _, err := c.Verb("POST").
1577 Prefix("foo", "bar").
1578 Suffix("baz").
1579 Timeout(time.Second).
1580 Body([]byte(strings.Repeat("abcd", 1000))).
1581 DoRaw(context.Background())
1582 if err != nil {
1583 t.Fatalf("Unexpected error: %v %#v", err, err)
1584 }
1585 <-ch
1586 if count != 5 {
1587 t.Errorf("unexpected retries: %d", count)
1588 }
1589 if !reflect.DeepEqual(backoff.sleeps, expectedSleeps) {
1590 t.Errorf("unexpected sleeps, expected: %v, got: %v", expectedSleeps, backoff.sleeps)
1591 }
1592 }
1594 func TestConnectionResetByPeerIsRetried(t *testing.T) {
1595 count := 0
1596 backoff := &testBackoffManager{}
1597 req := &Request{
1598 verb: "GET",
1599 c: &RESTClient{
1600 Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
1601 count++
1602 if count >= 3 {
1603 return &http.Response{
1604 StatusCode: http.StatusOK,
1605 Body: io.NopCloser(bytes.NewReader([]byte{})),
1606 }, nil
1607 }
1608 return nil, &net.OpError{Err: syscall.ECONNRESET}
1609 }),
1610 },
1611 backoff: backoff,
1612 maxRetries: 10,
1613 retryFn: defaultRequestRetryFn,
1614 }
1616 _, err := req.Do(context.Background()).Raw()
1617 if err != nil {
1618 t.Errorf("Unexpected error: %v", err)
1619 }
1620 if count != 3 {
1621 t.Errorf("Expected 3 attempts, got: %d", count)
1622 }
1624 if len(backoff.sleeps) != 3 {
1625 t.Errorf("Expected 3 backoff.Sleep, got: %d", len(backoff.sleeps))
1626 }
1627 }
1629 func TestCheckRetryHandles429And5xx(t *testing.T) {
1630 count := 0
1631 ch := make(chan struct{})
1632 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
1633 data, err := io.ReadAll(req.Body)
1634 if err != nil {
1635 t.Fatalf("unable to read request body: %v", err)
1636 }
1637 if !bytes.Equal(data, []byte(strings.Repeat("abcd", 1000))) {
1638 t.Fatalf("retry did not send a complete body: %s", data)
1639 }
1640 t.Logf("attempt %d", count)
1641 if count >= 4 {
1642 w.WriteHeader(http.StatusOK)
1643 close(ch)
1644 return
1645 }
1646 w.Header().Set("Retry-After", "0")
1647 w.WriteHeader([]int{http.StatusTooManyRequests, 500, 501, 504}[count])
1648 count++
1649 }))
1650 defer testServer.Close()
1652 c := testRESTClient(t, testServer)
1653 _, err := c.Verb("POST").
1654 Prefix("foo", "bar").
1655 Suffix("baz").
1656 Timeout(time.Second).
1657 Body([]byte(strings.Repeat("abcd", 1000))).
1658 DoRaw(context.Background())
1659 if err != nil {
1660 t.Fatalf("Unexpected error: %v %#v", err, err)
1661 }
1662 <-ch
1663 if count != 4 {
1664 t.Errorf("unexpected retries: %d", count)
1665 }
1666 }
1668 func BenchmarkCheckRetryClosesBody(b *testing.B) {
1669 count := 0
1670 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
1671 count++
1672 if count%3 == 0 {
1673 w.WriteHeader(http.StatusOK)
1674 return
1675 }
1676 w.Header().Set("Retry-After", "0")
1677 w.WriteHeader(http.StatusTooManyRequests)
1678 }))
1679 defer testServer.Close()
1681 c := testRESTClient(b, testServer)
1683 requests := make([]*Request, 0, b.N)
1684 for i := 0; i < b.N; i++ {
1685 requests = append(requests, c.Verb("POST").
1686 Prefix("foo", "bar").
1687 Suffix("baz").
1688 Timeout(time.Second).
1689 Body([]byte(strings.Repeat("abcd", 1000))))
1690 }
1692 b.ResetTimer()
1693 for i := 0; i < b.N; i++ {
1694 if _, err := requests[i].DoRaw(context.Background()); err != nil {
1695 b.Fatalf("Unexpected error (%d/%d): %v", i, b.N, err)
1696 }
1697 }
1698 }
1700 func TestDoRequestNewWayReader(t *testing.T) {
1701 reqObj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
1702 reqBodyExpected, _ := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), reqObj)
1703 expectedObj := &v1.Service{Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{
1704 Protocol: "TCP",
1705 Port: 12345,
1706 TargetPort: intstr.FromInt32(12345),
1707 }}}}
1708 expectedBody, _ := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), expectedObj)
1709 fakeHandler := utiltesting.FakeHandler{
1710 StatusCode: 200,
1711 ResponseBody: string(expectedBody),
1712 T: t,
1713 }
1714 testServer := httptest.NewServer(&fakeHandler)
1715 defer testServer.Close()
1716 c := testRESTClient(t, testServer)
1717 obj, err := c.Verb("POST").
1718 Resource("bar").
1719 Name("baz").
1720 Prefix("foo").
1721 Timeout(time.Second).
1722 Body(bytes.NewBuffer(reqBodyExpected)).
1723 Do(context.Background()).Get()
1724 if err != nil {
1725 t.Errorf("Unexpected error: %v %#v", err, err)
1726 return
1727 }
1728 if obj == nil {
1729 t.Error("nil obj")
1730 } else if !apiequality.Semantic.DeepDerivative(expectedObj, obj) {
1731 t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
1732 }
1733 tmpStr := string(reqBodyExpected)
1734 requestURL := defaultResourcePathWithPrefix("foo", "bar", "", "baz")
1735 requestURL += "?timeout=1s"
1736 fakeHandler.ValidateRequest(t, requestURL, "POST", &tmpStr)
1737 }
1739 func TestDoRequestNewWayObj(t *testing.T) {
1740 reqObj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
1741 reqBodyExpected, _ := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), reqObj)
1742 expectedObj := &v1.Service{Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{
1743 Protocol: "TCP",
1744 Port: 12345,
1745 TargetPort: intstr.FromInt32(12345),
1746 }}}}
1747 expectedBody, _ := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), expectedObj)
1748 fakeHandler := utiltesting.FakeHandler{
1749 StatusCode: 200,
1750 ResponseBody: string(expectedBody),
1751 T: t,
1752 }
1753 testServer := httptest.NewServer(&fakeHandler)
1754 defer testServer.Close()
1755 c := testRESTClient(t, testServer)
1756 obj, err := c.Verb("POST").
1757 Suffix("baz").
1758 Name("bar").
1759 Resource("foo").
1760 Timeout(time.Second).
1761 Body(reqObj).
1762 Do(context.Background()).Get()
1763 if err != nil {
1764 t.Errorf("Unexpected error: %v %#v", err, err)
1765 return
1766 }
1767 if obj == nil {
1768 t.Error("nil obj")
1769 } else if !apiequality.Semantic.DeepDerivative(expectedObj, obj) {
1770 t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
1771 }
1772 tmpStr := string(reqBodyExpected)
1773 requestURL := defaultResourcePathWithPrefix("", "foo", "", "bar/baz")
1774 requestURL += "?timeout=1s"
1775 fakeHandler.ValidateRequest(t, requestURL, "POST", &tmpStr)
1776 }
1778 func TestDoRequestNewWayFile(t *testing.T) {
1779 reqObj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
1780 reqBodyExpected, err := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), reqObj)
1781 if err != nil {
1782 t.Errorf("unexpected error: %v", err)
1783 }
1785 file, err := os.CreateTemp("", "foo")
1786 if err != nil {
1787 t.Errorf("unexpected error: %v", err)
1788 }
1789 defer file.Close()
1790 defer os.Remove(file.Name())
1792 _, err = file.Write(reqBodyExpected)
1793 if err != nil {
1794 t.Errorf("unexpected error: %v", err)
1795 }
1797 expectedObj := &v1.Service{Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{
1798 Protocol: "TCP",
1799 Port: 12345,
1800 TargetPort: intstr.FromInt32(12345),
1801 }}}}
1802 expectedBody, _ := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), expectedObj)
1803 fakeHandler := utiltesting.FakeHandler{
1804 StatusCode: 200,
1805 ResponseBody: string(expectedBody),
1806 T: t,
1807 }
1808 testServer := httptest.NewServer(&fakeHandler)
1809 defer testServer.Close()
1810 c := testRESTClient(t, testServer)
1811 wasCreated := true
1812 obj, err := c.Verb("POST").
1813 Prefix("foo/bar", "baz").
1814 Timeout(time.Second).
1815 Body(file.Name()).
1816 Do(context.Background()).WasCreated(&wasCreated).Get()
1817 if err != nil {
1818 t.Errorf("Unexpected error: %v %#v", err, err)
1819 return
1820 }
1821 if obj == nil {
1822 t.Error("nil obj")
1823 } else if !apiequality.Semantic.DeepDerivative(expectedObj, obj) {
1824 t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
1825 }
1826 if wasCreated {
1827 t.Errorf("expected object was created")
1828 }
1829 tmpStr := string(reqBodyExpected)
1830 requestURL := defaultResourcePathWithPrefix("foo/bar/baz", "", "", "")
1831 requestURL += "?timeout=1s"
1832 fakeHandler.ValidateRequest(t, requestURL, "POST", &tmpStr)
1833 }
1835 func TestWasCreated(t *testing.T) {
1836 reqObj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
1837 reqBodyExpected, err := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), reqObj)
1838 if err != nil {
1839 t.Errorf("unexpected error: %v", err)
1840 }
1842 expectedObj := &v1.Service{Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{
1843 Protocol: "TCP",
1844 Port: 12345,
1845 TargetPort: intstr.FromInt32(12345),
1846 }}}}
1847 expectedBody, _ := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), expectedObj)
1848 fakeHandler := utiltesting.FakeHandler{
1849 StatusCode: 201,
1850 ResponseBody: string(expectedBody),
1851 T: t,
1852 }
1853 testServer := httptest.NewServer(&fakeHandler)
1854 defer testServer.Close()
1855 c := testRESTClient(t, testServer)
1856 wasCreated := false
1857 obj, err := c.Verb("PUT").
1858 Prefix("foo/bar", "baz").
1859 Timeout(time.Second).
1860 Body(reqBodyExpected).
1861 Do(context.Background()).WasCreated(&wasCreated).Get()
1862 if err != nil {
1863 t.Errorf("Unexpected error: %v %#v", err, err)
1864 return
1865 }
1866 if obj == nil {
1867 t.Error("nil obj")
1868 } else if !apiequality.Semantic.DeepDerivative(expectedObj, obj) {
1869 t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
1870 }
1871 if !wasCreated {
1872 t.Errorf("Expected object was created")
1873 }
1875 tmpStr := string(reqBodyExpected)
1876 requestURL := defaultResourcePathWithPrefix("foo/bar/baz", "", "", "")
1877 requestURL += "?timeout=1s"
1878 fakeHandler.ValidateRequest(t, requestURL, "PUT", &tmpStr)
1879 }
1881 func TestVerbs(t *testing.T) {
1882 c := testRESTClient(t, nil)
1883 if r := c.Post(); r.verb != "POST" {
1884 t.Errorf("Post verb is wrong")
1885 }
1886 if r := c.Put(); r.verb != "PUT" {
1887 t.Errorf("Put verb is wrong")
1888 }
1889 if r := c.Get(); r.verb != "GET" {
1890 t.Errorf("Get verb is wrong")
1891 }
1892 if r := c.Delete(); r.verb != "DELETE" {
1893 t.Errorf("Delete verb is wrong")
1894 }
1895 }
1897 func TestAbsPath(t *testing.T) {
1898 for i, tc := range []struct {
1899 configPrefix string
1900 resourcePrefix string
1901 absPath string
1902 wantsAbsPath string
1903 }{
1904 {"/", "", "", "/"},
1905 {"", "", "/", "/"},
1906 {"", "", "/api", "/api"},
1907 {"", "", "/api/", "/api/"},
1908 {"", "", "/apis", "/apis"},
1909 {"", "/foo", "/bar/foo", "/bar/foo"},
1910 {"", "/api/foo/123", "/bar/foo", "/bar/foo"},
1911 {"/p1", "", "", "/p1"},
1912 {"/p1", "", "/", "/p1/"},
1913 {"/p1", "", "/api", "/p1/api"},
1914 {"/p1", "", "/apis", "/p1/apis"},
1915 {"/p1", "/r1", "/apis", "/p1/apis"},
1916 {"/p1", "/api/r1", "/apis", "/p1/apis"},
1917 {"/p1/api/p2", "", "", "/p1/api/p2"},
1918 {"/p1/api/p2", "", "/", "/p1/api/p2/"},
1919 {"/p1/api/p2", "", "/api", "/p1/api/p2/api"},
1920 {"/p1/api/p2", "", "/api/", "/p1/api/p2/api/"},
1921 {"/p1/api/p2", "/r1", "/api/", "/p1/api/p2/api/"},
1922 {"/p1/api/p2", "/api/r1", "/api/", "/p1/api/p2/api/"},
1923 } {
1924 u, _ := url.Parse("http://localhost:123" + tc.configPrefix)
1925 r := NewRequestWithClient(u, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("POST").Prefix(tc.resourcePrefix).AbsPath(tc.absPath)
1926 if r.pathPrefix != tc.wantsAbsPath {
1927 t.Errorf("test case %d failed, unexpected path: %q, expected %q", i, r.pathPrefix, tc.wantsAbsPath)
1928 }
1929 }
1930 }
1932 func TestUnacceptableParamNames(t *testing.T) {
1933 table := []struct {
1934 name string
1935 testVal string
1936 expectSuccess bool
1937 }{
1939 {"timeout", "42", true},
1940 }
1942 for _, item := range table {
1943 c := testRESTClient(t, nil)
1944 r := c.Get().setParam(item.name, item.testVal)
1945 if e, a := item.expectSuccess, r.err == nil; e != a {
1946 t.Errorf("expected %v, got %v (%v)", e, a, r.err)
1947 }
1948 }
1949 }
1951 func TestBody(t *testing.T) {
1952 const data = "test payload"
1954 obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
1955 bodyExpected, _ := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), obj)
1957 f, err := os.CreateTemp("", "test_body")
1958 if err != nil {
1959 t.Fatalf("TempFile error: %v", err)
1960 }
1961 if _, err := f.WriteString(data); err != nil {
1962 t.Fatalf("TempFile.WriteString error: %v", err)
1963 }
1964 f.Close()
1965 defer os.Remove(f.Name())
1967 var nilObject *metav1.DeleteOptions
1968 typedObject := interface{}(nilObject)
1969 c := testRESTClient(t, nil)
1970 tests := []struct {
1971 input interface{}
1972 expected string
1973 headers map[string]string
1974 }{
1975 {[]byte(data), data, nil},
1976 {f.Name(), data, nil},
1977 {strings.NewReader(data), data, nil},
1978 {obj, string(bodyExpected), map[string]string{"Content-Type": "application/json"}},
1979 {typedObject, "", nil},
1980 }
1981 for i, tt := range tests {
1982 r := c.Post().Body(tt.input)
1983 if r.err != nil {
1984 t.Errorf("%d: r.Body(%#v) error: %v", i, tt, r.err)
1985 continue
1986 }
1987 if tt.headers != nil {
1988 for k, v := range tt.headers {
1989 if r.headers.Get(k) != v {
1990 t.Errorf("%d: r.headers[%q] = %q; want %q", i, k, v, v)
1991 }
1992 }
1993 }
1995 req, err := r.newHTTPRequest(context.Background())
1996 if err != nil {
1997 t.Fatal(err)
1998 }
1999 if req.Body == nil {
2000 if len(tt.expected) != 0 {
2001 t.Errorf("%d: req.Body = %q; want %q", i, req.Body, tt.expected)
2002 }
2003 continue
2004 }
2005 buf := make([]byte, len(tt.expected))
2006 if _, err := req.Body.Read(buf); err != nil {
2007 t.Errorf("%d: req.Body.Read error: %v", i, err)
2008 continue
2009 }
2010 body := string(buf)
2011 if body != tt.expected {
2012 t.Errorf("%d: req.Body = %q; want %q", i, body, tt.expected)
2013 }
2014 }
2015 }
2017 func TestWatch(t *testing.T) {
2018 tests := []struct {
2019 name string
2020 maxRetries int
2021 }{
2022 {
2023 name: "no retry",
2024 maxRetries: 0,
2025 },
2026 {
2027 name: "with retries",
2028 maxRetries: 3,
2029 },
2030 }
2032 for _, test := range tests {
2033 t.Run(test.name, func(t *testing.T) {
2034 var table = []struct {
2035 t watch.EventType
2036 obj runtime.Object
2037 }{
2038 {watch.Added, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "first"}}},
2039 {watch.Modified, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "second"}}},
2040 {watch.Deleted, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "last"}}},
2041 }
2043 var attempts int
2044 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2045 defer func() {
2046 attempts++
2047 }()
2049 flusher, ok := w.(http.Flusher)
2050 if !ok {
2051 panic("need flusher!")
2052 }
2054 if attempts < test.maxRetries {
2055 w.Header().Set("Retry-After", "1")
2056 w.WriteHeader(http.StatusTooManyRequests)
2057 return
2058 }
2060 w.Header().Set("Transfer-Encoding", "chunked")
2061 w.WriteHeader(http.StatusOK)
2062 flusher.Flush()
2064 encoder := restclientwatch.NewEncoder(streaming.NewEncoder(w, scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion)), scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion))
2065 for _, item := range table {
2066 if err := encoder.Encode(&watch.Event{Type: item.t, Object: item.obj}); err != nil {
2067 panic(err)
2068 }
2069 flusher.Flush()
2070 }
2071 }))
2072 defer testServer.Close()
2074 s := testRESTClient(t, testServer)
2075 watching, err := s.Get().Prefix("path/to/watch/thing").
2076 MaxRetries(test.maxRetries).Watch(context.Background())
2077 if err != nil {
2078 t.Fatalf("Unexpected error: %v", err)
2079 }
2081 for _, item := range table {
2082 got, ok := <-watching.ResultChan()
2083 if !ok {
2084 t.Fatalf("Unexpected early close")
2085 }
2086 if e, a := item.t, got.Type; e != a {
2087 t.Errorf("Expected %v, got %v", e, a)
2088 }
2089 if e, a := item.obj, got.Object; !apiequality.Semantic.DeepDerivative(e, a) {
2090 t.Errorf("Expected %v, got %v", e, a)
2091 }
2092 }
2094 _, ok := <-watching.ResultChan()
2095 if ok {
2096 t.Fatal("Unexpected non-close")
2097 }
2098 })
2099 }
2100 }
2102 func TestWatchNonDefaultContentType(t *testing.T) {
2103 var table = []struct {
2104 t watch.EventType
2105 obj runtime.Object
2106 }{
2107 {watch.Added, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "first"}}},
2108 {watch.Modified, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "second"}}},
2109 {watch.Deleted, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "last"}}},
2110 }
2112 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2113 flusher, ok := w.(http.Flusher)
2114 if !ok {
2115 panic("need flusher!")
2116 }
2118 w.Header().Set("Transfer-Encoding", "chunked")
2120 w.Header().Set("Content-Type", "application/json")
2121 w.WriteHeader(http.StatusOK)
2122 flusher.Flush()
2124 encoder := restclientwatch.NewEncoder(streaming.NewEncoder(w, scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion)), scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion))
2125 for _, item := range table {
2126 if err := encoder.Encode(&watch.Event{Type: item.t, Object: item.obj}); err != nil {
2127 panic(err)
2128 }
2129 flusher.Flush()
2130 }
2131 }))
2132 defer testServer.Close()
2135 contentConfig := defaultContentConfig()
2136 contentConfig.ContentType = "application/vnd.kubernetes.protobuf"
2137 s := testRESTClientWithConfig(t, testServer, contentConfig)
2138 watching, err := s.Get().Prefix("path/to/watch/thing").Watch(context.Background())
2139 if err != nil {
2140 t.Fatalf("Unexpected error")
2141 }
2143 for _, item := range table {
2144 got, ok := <-watching.ResultChan()
2145 if !ok {
2146 t.Fatalf("Unexpected early close")
2147 }
2148 if e, a := item.t, got.Type; e != a {
2149 t.Errorf("Expected %v, got %v", e, a)
2150 }
2151 if e, a := item.obj, got.Object; !apiequality.Semantic.DeepDerivative(e, a) {
2152 t.Errorf("Expected %v, got %v", e, a)
2153 }
2154 }
2156 _, ok := <-watching.ResultChan()
2157 if ok {
2158 t.Fatal("Unexpected non-close")
2159 }
2160 }
2162 func TestWatchUnknownContentType(t *testing.T) {
2163 var table = []struct {
2164 t watch.EventType
2165 obj runtime.Object
2166 }{
2167 {watch.Added, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "first"}}},
2168 {watch.Modified, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "second"}}},
2169 {watch.Deleted, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "last"}}},
2170 }
2172 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2173 flusher, ok := w.(http.Flusher)
2174 if !ok {
2175 panic("need flusher!")
2176 }
2178 w.Header().Set("Transfer-Encoding", "chunked")
2180 w.Header().Set("Content-Type", "foobar")
2181 w.WriteHeader(http.StatusOK)
2182 flusher.Flush()
2184 encoder := restclientwatch.NewEncoder(streaming.NewEncoder(w, scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion)), scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion))
2185 for _, item := range table {
2186 if err := encoder.Encode(&watch.Event{Type: item.t, Object: item.obj}); err != nil {
2187 panic(err)
2188 }
2189 flusher.Flush()
2190 }
2191 }))
2192 defer testServer.Close()
2194 s := testRESTClient(t, testServer)
2195 _, err := s.Get().Prefix("path/to/watch/thing").Watch(context.Background())
2196 if err == nil {
2197 t.Fatalf("Expected to fail due to lack of known stream serialization for content type")
2198 }
2199 }
2201 func TestStream(t *testing.T) {
2202 expectedBody := "expected body"
2204 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2205 flusher, ok := w.(http.Flusher)
2206 if !ok {
2207 panic("need flusher!")
2208 }
2209 w.Header().Set("Transfer-Encoding", "chunked")
2210 w.WriteHeader(http.StatusOK)
2211 w.Write([]byte(expectedBody))
2212 flusher.Flush()
2213 }))
2214 defer testServer.Close()
2216 s := testRESTClient(t, testServer)
2217 readCloser, err := s.Get().Prefix("path/to/stream/thing").Stream(context.Background())
2218 if err != nil {
2219 t.Fatalf("unexpected error: %v", err)
2220 }
2221 defer readCloser.Close()
2222 buf := new(bytes.Buffer)
2223 buf.ReadFrom(readCloser)
2224 resultBody := buf.String()
2226 if expectedBody != resultBody {
2227 t.Errorf("Expected %s, got %s", expectedBody, resultBody)
2228 }
2229 }
2231 func testRESTClientWithConfig(t testing.TB, srv *httptest.Server, contentConfig ClientContentConfig) *RESTClient {
2232 base, _ := url.Parse("http://localhost")
2233 var c *http.Client
2234 if srv != nil {
2235 var err error
2236 base, err = url.Parse(srv.URL)
2237 if err != nil {
2238 t.Fatalf("failed to parse test URL: %v", err)
2239 }
2240 c = srv.Client()
2241 }
2242 versionedAPIPath := defaultResourcePathWithPrefix("", "", "", "")
2243 client, err := NewRESTClient(base, versionedAPIPath, contentConfig, nil, c)
2244 if err != nil {
2245 t.Fatalf("failed to create a client: %v", err)
2246 }
2247 return client
2249 }
2251 func testRESTClient(t testing.TB, srv *httptest.Server) *RESTClient {
2252 contentConfig := defaultContentConfig()
2253 return testRESTClientWithConfig(t, srv, contentConfig)
2254 }
2256 func TestDoContext(t *testing.T) {
2257 receivedCh := make(chan struct{})
2258 block := make(chan struct{})
2259 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
2260 close(receivedCh)
2261 <-block
2262 w.WriteHeader(http.StatusOK)
2263 }))
2264 defer testServer.Close()
2265 defer close(block)
2267 ctx, cancel := context.WithCancel(context.Background())
2268 defer cancel()
2270 go func() {
2271 <-receivedCh
2272 cancel()
2273 }()
2275 c := testRESTClient(t, testServer)
2276 _, err := c.Verb("GET").
2277 Prefix("foo").
2278 DoRaw(ctx)
2279 if err == nil {
2280 t.Fatal("Expected context cancellation error")
2281 }
2282 }
2284 func buildString(length int) string {
2285 s := make([]byte, length)
2286 for i := range s {
2287 s[i] = 'a'
2288 }
2289 return string(s)
2290 }
2292 func init() {
2293 klog.InitFlags(nil)
2294 }
2296 func TestTruncateBody(t *testing.T) {
2297 tests := []struct {
2298 body string
2299 want string
2300 level string
2301 }{
2303 {
2304 body: "Completely truncated below 8",
2305 want: " [truncated 28 chars]",
2306 level: "0",
2307 },
2309 {
2310 body: "Small body never gets truncated",
2311 want: "Small body never gets truncated",
2312 level: "10",
2313 },
2314 {
2315 body: "Small body never gets truncated",
2316 want: "Small body never gets truncated",
2317 level: "8",
2318 },
2320 {
2321 body: buildString(2000),
2322 level: "8",
2323 want: fmt.Sprintf("%s [truncated 976 chars]", buildString(1024)),
2324 },
2326 {
2327 body: buildString(20000),
2328 level: "9",
2329 want: fmt.Sprintf("%s [truncated 9760 chars]", buildString(10240)),
2330 },
2332 {
2333 body: buildString(20000),
2334 level: "10",
2335 want: buildString(20000),
2336 },
2338 {
2339 body: buildString(20000),
2340 level: "11",
2341 want: buildString(20000),
2342 },
2343 }
2345 l := flag.Lookup("v").Value.(flag.Getter).Get().(klog.Level)
2346 for _, test := range tests {
2347 flag.Set("v", test.level)
2348 got := truncateBody(test.body)
2349 if got != test.want {
2350 t.Errorf("truncateBody(%v) = %v, want %v", test.body, got, test.want)
2351 }
2352 }
2353 flag.Set("v", l.String())
2354 }
2356 func defaultResourcePathWithPrefix(prefix, resource, namespace, name string) string {
2357 var path string
2358 path = "/api/" + v1.SchemeGroupVersion.Version
2360 if prefix != "" {
2361 path = path + "/" + prefix
2362 }
2363 if namespace != "" {
2364 path = path + "/namespaces/" + namespace
2365 }
2367 resource = strings.ToLower(resource)
2368 if resource != "" {
2369 path = path + "/" + resource
2370 }
2371 if name != "" {
2372 path = path + "/" + name
2373 }
2374 return path
2375 }
2377 func TestRequestPreflightCheck(t *testing.T) {
2378 for _, tt := range []struct {
2379 name string
2380 verbs []string
2381 namespace string
2382 resourceName string
2383 namespaceSet bool
2384 expectsErr bool
2385 }{
2386 {
2387 name: "no namespace set",
2388 verbs: []string{"GET", "PUT", "DELETE", "POST"},
2389 namespaceSet: false,
2390 expectsErr: false,
2391 },
2392 {
2393 name: "empty resource name and namespace",
2394 verbs: []string{"GET", "PUT", "DELETE"},
2395 namespaceSet: true,
2396 expectsErr: false,
2397 },
2398 {
2399 name: "resource name with empty namespace",
2400 verbs: []string{"GET", "PUT", "DELETE"},
2401 namespaceSet: true,
2402 resourceName: "ResourceName",
2403 expectsErr: true,
2404 },
2405 {
2406 name: "post empty resource name and namespace",
2407 verbs: []string{"POST"},
2408 namespaceSet: true,
2409 expectsErr: true,
2410 },
2411 {
2412 name: "working requests",
2413 verbs: []string{"GET", "PUT", "DELETE", "POST"},
2414 namespaceSet: true,
2415 resourceName: "ResourceName",
2416 namespace: "Namespace",
2417 expectsErr: false,
2418 },
2419 } {
2420 t.Run(tt.name, func(t *testing.T) {
2421 for _, verb := range tt.verbs {
2422 r := &Request{
2423 verb: verb,
2424 namespace: tt.namespace,
2425 resourceName: tt.resourceName,
2426 namespaceSet: tt.namespaceSet,
2427 }
2429 err := r.requestPreflightCheck()
2430 hasErr := err != nil
2431 if hasErr == tt.expectsErr {
2432 return
2433 }
2434 t.Errorf("%s: expects error: %v, has error: %v", verb, tt.expectsErr, hasErr)
2435 }
2436 })
2437 }
2438 }
2440 func TestThrottledLogger(t *testing.T) {
2441 now := time.Now()
2442 oldClock := globalThrottledLogger.clock
2443 defer func() {
2444 globalThrottledLogger.clock = oldClock
2445 }()
2446 clock := testingclock.NewFakeClock(now)
2447 globalThrottledLogger.clock = clock
2449 logMessages := 0
2450 for i := 0; i < 1000; i++ {
2451 var wg sync.WaitGroup
2452 wg.Add(10)
2453 for j := 0; j < 10; j++ {
2454 go func() {
2455 if _, ok := globalThrottledLogger.attemptToLog(); ok {
2456 logMessages++
2457 }
2458 wg.Done()
2459 }()
2460 }
2461 wg.Wait()
2462 now = now.Add(1 * time.Second)
2463 clock.SetTime(now)
2464 }
2466 if a, e := logMessages, 100; a != e {
2467 t.Fatalf("expected %v log messages, but got %v", e, a)
2468 }
2469 }
2471 func TestRequestMaxRetries(t *testing.T) {
2472 successAtNthCalls := 1
2473 actualCalls := 0
2474 retryOneTimeHandler := func(w http.ResponseWriter, req *http.Request) {
2475 defer func() { actualCalls++ }()
2476 if actualCalls >= successAtNthCalls {
2477 w.WriteHeader(http.StatusOK)
2478 return
2479 }
2480 w.Header().Set("Retry-After", "1")
2481 w.WriteHeader(http.StatusTooManyRequests)
2482 actualCalls++
2483 }
2484 testServer := httptest.NewServer(http.HandlerFunc(retryOneTimeHandler))
2485 defer testServer.Close()
2487 u, err := url.Parse(testServer.URL)
2488 if err != nil {
2489 t.Error(err)
2490 }
2492 testCases := []struct {
2493 name string
2494 maxRetries int
2495 expectError bool
2496 }{
2497 {
2498 name: "no retrying should fail",
2499 maxRetries: 0,
2500 expectError: true,
2501 },
2502 {
2503 name: "1 max-retry should exactly work",
2504 maxRetries: 1,
2505 expectError: false,
2506 },
2507 {
2508 name: "5 max-retry should work",
2509 maxRetries: 5,
2510 expectError: false,
2511 },
2512 }
2514 for _, testCase := range testCases {
2515 t.Run(testCase.name, func(t *testing.T) {
2516 defer func() { actualCalls = 0 }()
2517 _, err := NewRequestWithClient(u, "", defaultContentConfig(), testServer.Client()).
2518 Verb("get").
2519 MaxRetries(testCase.maxRetries).
2520 AbsPath("/foo").
2521 DoRaw(context.TODO())
2522 hasError := err != nil
2523 if testCase.expectError != hasError {
2524 t.Error(" failed checking error")
2525 }
2526 })
2527 }
2528 }
2530 type responseErr struct {
2531 response *http.Response
2532 err error
2533 }
2535 type seek struct {
2536 offset int64
2537 whence int
2538 }
2540 type count struct {
2542 seeks []seek
2545 lock sync.Mutex
2546 closes int
2547 }
2549 func (c *count) close() {
2550 c.lock.Lock()
2551 defer c.lock.Unlock()
2552 c.closes++
2553 }
2554 func (c *count) getCloseCount() int {
2555 c.lock.Lock()
2556 defer c.lock.Unlock()
2557 return c.closes
2558 }
2561 type readTracker struct {
2562 delegated io.Reader
2563 count *count
2564 }
2566 func (r *readTracker) Seek(offset int64, whence int) (int64, error) {
2567 if seeker, ok := r.delegated.(io.Seeker); ok {
2568 r.count.seeks = append(r.count.seeks, seek{offset: offset, whence: whence})
2569 return seeker.Seek(offset, whence)
2570 }
2571 return 0, io.EOF
2572 }
2574 func (r *readTracker) Read(p []byte) (n int, err error) {
2575 return r.delegated.Read(p)
2576 }
2578 func (r *readTracker) Close() error {
2579 if closer, ok := r.delegated.(io.Closer); ok {
2580 r.count.close()
2581 return closer.Close()
2582 }
2583 return nil
2584 }
2586 func newReadTracker(count *count) *readTracker {
2587 return &readTracker{
2588 count: count,
2589 }
2590 }
2592 func newCount() *count {
2593 return &count{
2594 closes: 0,
2595 seeks: make([]seek, 0),
2596 }
2597 }
2599 type readSeeker struct{ err error }
2601 func (rs readSeeker) Read([]byte) (int, error) { return 0, rs.err }
2602 func (rs readSeeker) Seek(int64, int) (int64, error) { return 0, rs.err }
2604 func unWrap(err error) error {
2605 if uerr, ok := err.(*url.Error); ok {
2606 return uerr.Err
2607 }
2608 return err
2609 }
2613 type noSleepBackOff struct {
2614 *NoBackoff
2615 }
2617 func (n *noSleepBackOff) Sleep(d time.Duration) {}
2619 func TestRequestWithRetry(t *testing.T) {
2620 tests := []struct {
2621 name string
2622 body io.Reader
2623 bodyBytes []byte
2624 serverReturns responseErr
2625 errExpected error
2626 errContains string
2627 transformFuncInvokedExpected int
2628 roundTripInvokedExpected int
2629 }{
2630 {
2631 name: "server returns retry-after response, no request body, retry goes ahead",
2632 bodyBytes: nil,
2633 serverReturns: responseErr{response: retryAfterResponse(), err: nil},
2634 errExpected: nil,
2635 transformFuncInvokedExpected: 1,
2636 roundTripInvokedExpected: 2,
2637 },
2638 {
2639 name: "server returns retry-after response, bytes request body, retry goes ahead",
2640 bodyBytes: []byte{},
2641 serverReturns: responseErr{response: retryAfterResponse(), err: nil},
2642 errExpected: nil,
2643 transformFuncInvokedExpected: 1,
2644 roundTripInvokedExpected: 2,
2645 },
2646 {
2647 name: "server returns retry-after response, opaque request body, retry aborted",
2648 body: &readSeeker{},
2649 serverReturns: responseErr{response: retryAfterResponse(), err: nil},
2650 errExpected: nil,
2651 transformFuncInvokedExpected: 1,
2652 roundTripInvokedExpected: 1,
2653 },
2654 {
2655 name: "server returns retryable err, no request body, retry goes ahead",
2656 bodyBytes: nil,
2657 serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
2658 errExpected: io.ErrUnexpectedEOF,
2659 transformFuncInvokedExpected: 0,
2660 roundTripInvokedExpected: 2,
2661 },
2662 {
2663 name: "server returns retryable err, bytes request body, retry goes ahead",
2664 bodyBytes: []byte{},
2665 serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
2666 errExpected: io.ErrUnexpectedEOF,
2667 transformFuncInvokedExpected: 0,
2668 roundTripInvokedExpected: 2,
2669 },
2670 {
2671 name: "server returns retryable err, opaque request body, retry aborted",
2672 body: &readSeeker{},
2673 serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
2674 errExpected: io.ErrUnexpectedEOF,
2675 transformFuncInvokedExpected: 0,
2676 roundTripInvokedExpected: 1,
2677 },
2678 }
2680 for _, test := range tests {
2681 t.Run(test.name, func(t *testing.T) {
2682 var roundTripInvoked int
2683 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
2684 roundTripInvoked++
2685 return test.serverReturns.response, test.serverReturns.err
2686 })
2688 req := &Request{
2689 verb: "GET",
2690 body: test.body,
2691 c: &RESTClient{
2692 Client: client,
2693 },
2694 backoff: &noSleepBackOff{},
2695 maxRetries: 1,
2696 retryFn: defaultRequestRetryFn,
2697 }
2699 var transformFuncInvoked int
2700 err := req.request(context.Background(), func(request *http.Request, response *http.Response) {
2701 transformFuncInvoked++
2702 })
2704 if test.roundTripInvokedExpected != roundTripInvoked {
2705 t.Errorf("Expected RoundTrip to be invoked %d times, but got: %d", test.roundTripInvokedExpected, roundTripInvoked)
2706 }
2707 if test.transformFuncInvokedExpected != transformFuncInvoked {
2708 t.Errorf("Expected transform func to be invoked %d times, but got: %d", test.transformFuncInvokedExpected, transformFuncInvoked)
2709 }
2710 switch {
2711 case test.errExpected != nil:
2712 if test.errExpected != unWrap(err) {
2713 t.Errorf("Expected error: %v, but got: %v", test.errExpected, unWrap(err))
2714 }
2715 case len(test.errContains) > 0:
2716 if !strings.Contains(err.Error(), test.errContains) {
2717 t.Errorf("Expected error message to caontain: %q, but got: %q", test.errContains, err.Error())
2718 }
2719 }
2720 })
2721 }
2722 }
2724 func TestRequestDoWithRetry(t *testing.T) {
2725 testRequestWithRetry(t, "Do", func(ctx context.Context, r *Request) {
2726 r.Do(ctx)
2727 })
2728 }
2730 func TestRequestDoRawWithRetry(t *testing.T) {
2732 testRequestWithRetry(t, "Do", func(ctx context.Context, r *Request) {
2733 r.DoRaw(ctx)
2734 })
2735 }
2737 func TestRequestStreamWithRetry(t *testing.T) {
2738 testRequestWithRetry(t, "Stream", func(ctx context.Context, r *Request) {
2739 r.Stream(ctx)
2740 })
2741 }
2743 func TestRequestWatchWithRetry(t *testing.T) {
2744 testRequestWithRetry(t, "Watch", func(ctx context.Context, r *Request) {
2745 w, err := r.Watch(ctx)
2746 if err == nil {
2753 <-w.ResultChan()
2754 }
2755 })
2756 }
2758 func TestRequestDoRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
2760 testRetryWithRateLimiterBackoffAndMetrics(t, "Do", func(ctx context.Context, r *Request) {
2761 r.DoRaw(ctx)
2762 })
2763 }
2765 func TestRequestStreamRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
2766 testRetryWithRateLimiterBackoffAndMetrics(t, "Stream", func(ctx context.Context, r *Request) {
2767 r.Stream(ctx)
2768 })
2769 }
2771 func TestRequestWatchRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
2772 testRetryWithRateLimiterBackoffAndMetrics(t, "Watch", func(ctx context.Context, r *Request) {
2773 w, err := r.Watch(ctx)
2774 if err == nil {
2781 <-w.ResultChan()
2782 }
2783 })
2784 }
2786 func TestRequestDoWithRetryInvokeOrder(t *testing.T) {
2788 testWithRetryInvokeOrder(t, "Do", func(ctx context.Context, r *Request) {
2789 r.DoRaw(ctx)
2790 })
2791 }
2793 func TestRequestStreamWithRetryInvokeOrder(t *testing.T) {
2794 testWithRetryInvokeOrder(t, "Stream", func(ctx context.Context, r *Request) {
2795 r.Stream(ctx)
2796 })
2797 }
2799 func TestRequestWatchWithRetryInvokeOrder(t *testing.T) {
2800 testWithRetryInvokeOrder(t, "Watch", func(ctx context.Context, r *Request) {
2801 w, err := r.Watch(ctx)
2802 if err == nil {
2809 <-w.ResultChan()
2810 }
2811 })
2812 }
2814 func TestRequestWatchWithWrapPreviousError(t *testing.T) {
2815 testWithWrapPreviousError(t, func(ctx context.Context, r *Request) error {
2816 w, err := r.Watch(ctx)
2817 if err == nil {
2824 <-w.ResultChan()
2825 }
2826 return err
2827 })
2828 }
2830 func TestRequestDoWithWrapPreviousError(t *testing.T) {
2832 testWithWrapPreviousError(t, func(ctx context.Context, r *Request) error {
2833 result := r.Do(ctx)
2834 return result.err
2835 })
2836 }
2838 func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
2839 type expected struct {
2840 attempts int
2841 reqCount *count
2842 respCount *count
2843 }
2845 tests := []struct {
2846 name string
2847 verb string
2848 body io.Reader
2849 bodyBytes []byte
2850 maxRetries int
2851 serverReturns []responseErr
2854 expectations map[string]expected
2855 }{
2856 {
2857 name: "server always returns retry-after response",
2858 verb: "GET",
2859 bodyBytes: []byte{},
2860 maxRetries: 2,
2861 serverReturns: []responseErr{
2862 {response: retryAfterResponse(), err: nil},
2863 {response: retryAfterResponse(), err: nil},
2864 {response: retryAfterResponse(), err: nil},
2865 },
2866 expectations: map[string]expected{
2867 "Do": {
2868 attempts: 3,
2869 reqCount: &count{closes: 0, seeks: make([]seek, 2)},
2870 respCount: &count{closes: 3, seeks: []seek{}},
2871 },
2872 "Watch": {
2873 attempts: 3,
2874 reqCount: &count{closes: 0, seeks: make([]seek, 2)},
2875 respCount: &count{closes: 3, seeks: []seek{}},
2876 },
2877 "Stream": {
2878 attempts: 3,
2879 reqCount: &count{closes: 0, seeks: make([]seek, 2)},
2880 respCount: &count{closes: 3, seeks: []seek{}},
2881 },
2882 },
2883 },
2884 {
2885 name: "server always returns retryable error",
2886 verb: "GET",
2887 bodyBytes: []byte{},
2888 maxRetries: 2,
2889 serverReturns: []responseErr{
2890 {response: nil, err: io.EOF},
2891 {response: nil, err: io.EOF},
2892 {response: nil, err: io.EOF},
2893 },
2894 expectations: map[string]expected{
2895 "Do": {
2896 attempts: 3,
2897 reqCount: &count{closes: 0, seeks: make([]seek, 2)},
2898 respCount: &count{closes: 0, seeks: []seek{}},
2899 },
2900 "Watch": {
2901 attempts: 3,
2902 reqCount: &count{closes: 0, seeks: make([]seek, 2)},
2903 respCount: &count{closes: 0, seeks: []seek{}},
2904 },
2906 "Stream": {
2907 attempts: 1,
2908 reqCount: &count{closes: 0, seeks: []seek{}},
2909 respCount: &count{closes: 0, seeks: []seek{}},
2910 },
2911 },
2912 },
2913 {
2914 name: "server returns success on the final retry",
2915 verb: "GET",
2916 bodyBytes: []byte{},
2917 maxRetries: 2,
2918 serverReturns: []responseErr{
2919 {response: retryAfterResponse(), err: nil},
2920 {response: nil, err: io.EOF},
2921 {response: &http.Response{StatusCode: http.StatusOK}, err: nil},
2922 },
2923 expectations: map[string]expected{
2924 "Do": {
2925 attempts: 3,
2926 reqCount: &count{closes: 0, seeks: make([]seek, 2)},
2927 respCount: &count{closes: 2, seeks: []seek{}},
2928 },
2929 "Watch": {
2930 attempts: 3,
2931 reqCount: &count{closes: 0, seeks: make([]seek, 2)},
2934 respCount: &count{closes: 2, seeks: []seek{}},
2935 },
2936 "Stream": {
2937 attempts: 2,
2938 reqCount: &count{closes: 0, seeks: make([]seek, 1)},
2939 respCount: &count{closes: 1, seeks: []seek{}},
2940 },
2941 },
2942 },
2943 }
2945 for _, test := range tests {
2946 t.Run(test.name, func(t *testing.T) {
2947 respCountGot := newCount()
2948 responseRecorder := newReadTracker(respCountGot)
2949 var attempts int
2950 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
2951 defer func() {
2952 attempts++
2953 }()
2955 resp := test.serverReturns[attempts].response
2956 if resp != nil {
2957 responseRecorder.delegated = io.NopCloser(bytes.NewReader([]byte{}))
2958 resp.Body = responseRecorder
2959 }
2960 return resp, test.serverReturns[attempts].err
2961 })
2963 req := &Request{
2964 verb: test.verb,
2965 body: test.body,
2966 bodyBytes: test.bodyBytes,
2967 c: &RESTClient{
2968 content: defaultContentConfig(),
2969 Client: client,
2970 },
2971 backoff: &noSleepBackOff{},
2972 maxRetries: test.maxRetries,
2973 retryFn: defaultRequestRetryFn,
2974 }
2976 doFunc(context.Background(), req)
2978 expected, ok := test.expectations[key]
2979 if !ok {
2980 t.Fatalf("Wrong test setup - did not find expected for: %s", key)
2981 }
2982 if expected.attempts != attempts {
2983 t.Errorf("Expected retries: %d, but got: %d", expected.attempts, attempts)
2984 }
2986 if expected.respCount.closes != respCountGot.getCloseCount() {
2987 t.Errorf("Expected response body Close to be invoked %d times, but got: %d", expected.respCount.closes, respCountGot.getCloseCount())
2988 }
2989 })
2990 }
2991 }
2993 type retryTestKeyType int
2995 const retryTestKey retryTestKeyType = iota
3002 type withRateLimiterBackoffManagerAndMetrics struct {
3003 flowcontrol.RateLimiter
3004 *NoBackoff
3005 metrics.ResultMetric
3006 calculateBackoffSeq int64
3007 calculateBackoffFn func(i int64) time.Duration
3008 metrics.RetryMetric
3010 invokeOrderGot []string
3011 sleepsGot []string
3012 statusCodesGot []string
3013 }
3015 func (lb *withRateLimiterBackoffManagerAndMetrics) Wait(ctx context.Context) error {
3016 lb.invokeOrderGot = append(lb.invokeOrderGot, "RateLimiter.Wait")
3017 return nil
3018 }
3020 func (lb *withRateLimiterBackoffManagerAndMetrics) CalculateBackoff(actualUrl *url.URL) time.Duration {
3021 lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.CalculateBackoff")
3023 waitFor := lb.calculateBackoffFn(lb.calculateBackoffSeq)
3024 lb.calculateBackoffSeq++
3025 return waitFor
3026 }
3028 func (lb *withRateLimiterBackoffManagerAndMetrics) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) {
3029 lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.UpdateBackoff")
3030 }
3032 func (lb *withRateLimiterBackoffManagerAndMetrics) Sleep(d time.Duration) {
3033 lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.Sleep")
3034 lb.sleepsGot = append(lb.sleepsGot, d.String())
3035 }
3037 func (lb *withRateLimiterBackoffManagerAndMetrics) Increment(ctx context.Context, code, _, _ string) {
3039 if marked, ok := ctx.Value(retryTestKey).(bool); ok && marked {
3040 lb.invokeOrderGot = append(lb.invokeOrderGot, "RequestResult.Increment")
3041 lb.statusCodesGot = append(lb.statusCodesGot, code)
3042 }
3043 }
3045 func (lb *withRateLimiterBackoffManagerAndMetrics) IncrementRetry(ctx context.Context, code, _, _ string) {
3047 if marked, ok := ctx.Value(retryTestKey).(bool); ok && marked {
3048 lb.invokeOrderGot = append(lb.invokeOrderGot, "RequestRetry.IncrementRetry")
3049 lb.statusCodesGot = append(lb.statusCodesGot, code)
3050 }
3051 }
3053 func (lb *withRateLimiterBackoffManagerAndMetrics) Do() {
3054 lb.invokeOrderGot = append(lb.invokeOrderGot, "Client.Do")
3055 }
3057 func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
3058 type expected struct {
3059 attempts int
3060 order []string
3061 sleeps []string
3062 statusCodes []string
3063 }
3071 invokeOrderWant := []string{
3075 "RateLimiter.Wait",
3076 "BackoffManager.CalculateBackoff",
3077 "BackoffManager.Sleep",
3081 "Client.Do",
3086 "RequestResult.Increment",
3087 "BackoffManager.UpdateBackoff",
3088 "BackoffManager.CalculateBackoff",
3090 "BackoffManager.Sleep",
3092 "RateLimiter.Wait",
3095 "Client.Do",
3100 "RequestResult.Increment",
3101 "RequestRetry.IncrementRetry",
3102 "BackoffManager.UpdateBackoff",
3103 }
3104 statusCodesWant := []string{
3106 "500",
3108 "200", "200",
3109 }
3111 tests := []struct {
3112 name string
3113 maxRetries int
3114 serverReturns []responseErr
3115 calculateBackoffFn func(i int64) time.Duration
3117 expectations map[string]expected
3118 }{
3119 {
3120 name: "success after one retry, Retry-After: N > BackoffManager.CalculateBackoff",
3121 maxRetries: 1,
3122 serverReturns: []responseErr{
3123 {response: retryAfterResponseWithDelay("5"), err: nil},
3124 {response: &http.Response{StatusCode: http.StatusOK}, err: nil},
3125 },
3127 calculateBackoffFn: func(i int64) time.Duration { return time.Duration(i * int64(time.Second)) },
3128 expectations: map[string]expected{
3129 "Do": {
3130 attempts: 2,
3131 order: invokeOrderWant,
3132 statusCodes: statusCodesWant,
3133 sleeps: []string{
3135 "0s",
3139 (5 * time.Second).String(),
3140 },
3141 },
3142 "Watch": {
3143 attempts: 2,
3145 order: invokeOrderWant[1:],
3146 statusCodes: statusCodesWant,
3147 sleeps: []string{
3148 "0s",
3149 (5 * time.Second).String(),
3150 },
3151 },
3152 "Stream": {
3153 attempts: 2,
3154 order: invokeOrderWant,
3155 statusCodes: statusCodesWant,
3156 sleeps: []string{
3157 "0s",
3158 (5 * time.Second).String(),
3159 },
3160 },
3161 },
3162 },
3163 {
3164 name: "success after one retry, Retry-After: N < BackoffManager.CalculateBackoff",
3165 maxRetries: 1,
3166 serverReturns: []responseErr{
3167 {response: retryAfterResponseWithDelay("2"), err: nil},
3168 {response: &http.Response{StatusCode: http.StatusOK}, err: nil},
3169 },
3171 calculateBackoffFn: func(i int64) time.Duration { return time.Duration(i * int64(4*time.Second)) },
3172 expectations: map[string]expected{
3173 "Do": {
3174 attempts: 2,
3175 order: invokeOrderWant,
3176 statusCodes: statusCodesWant,
3177 sleeps: []string{
3179 "0s",
3183 (4 * time.Second).String(),
3184 },
3185 },
3186 "Watch": {
3187 attempts: 2,
3189 order: invokeOrderWant[1:],
3190 statusCodes: statusCodesWant,
3191 sleeps: []string{
3192 "0s",
3193 (4 * time.Second).String(),
3194 },
3195 },
3196 "Stream": {
3197 attempts: 2,
3198 order: invokeOrderWant,
3199 statusCodes: statusCodesWant,
3200 sleeps: []string{
3201 "0s",
3202 (4 * time.Second).String(),
3203 },
3204 },
3205 },
3206 },
3207 }
3209 for _, test := range tests {
3210 t.Run(test.name, func(t *testing.T) {
3211 interceptor := &withRateLimiterBackoffManagerAndMetrics{
3212 RateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
3213 NoBackoff: &NoBackoff{},
3214 calculateBackoffFn: test.calculateBackoffFn,
3215 }
3222 oldRequestResult := metrics.RequestResult
3223 oldRequestRetry := metrics.RequestRetry
3224 metrics.RequestResult = interceptor
3225 metrics.RequestRetry = interceptor
3226 defer func() {
3227 metrics.RequestResult = oldRequestResult
3228 metrics.RequestRetry = oldRequestRetry
3229 }()
3231 ctx, cancel := context.WithCancel(context.Background())
3232 defer cancel()
3237 ctx = context.WithValue(ctx, retryTestKey, true)
3239 var attempts int
3240 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
3241 defer func() {
3242 attempts++
3243 }()
3245 interceptor.Do()
3246 resp := test.serverReturns[attempts].response
3247 if resp != nil {
3248 resp.Body = io.NopCloser(bytes.NewReader([]byte{}))
3249 }
3250 return resp, test.serverReturns[attempts].err
3251 })
3253 base, err := url.Parse("http://foo.bar")
3254 if err != nil {
3255 t.Fatalf("Wrong test setup - did not find expected for: %s", key)
3256 }
3257 req := &Request{
3258 verb: "GET",
3259 bodyBytes: []byte{},
3260 c: &RESTClient{
3261 base: base,
3262 content: defaultContentConfig(),
3263 Client: client,
3264 rateLimiter: interceptor,
3265 },
3266 pathPrefix: "/api/v1",
3267 rateLimiter: interceptor,
3268 backoff: interceptor,
3269 maxRetries: test.maxRetries,
3270 retryFn: defaultRequestRetryFn,
3271 }
3273 doFunc(ctx, req)
3275 want, ok := test.expectations[key]
3276 if !ok {
3277 t.Fatalf("Wrong test setup - did not find expected for: %s", key)
3278 }
3279 if want.attempts != attempts {
3280 t.Errorf("%s: Expected retries: %d, but got: %d", key, want.attempts, attempts)
3281 }
3282 if !cmp.Equal(want.order, interceptor.invokeOrderGot) {
3283 t.Errorf("%s: Expected invoke order to match, diff: %s", key, cmp.Diff(want.order, interceptor.invokeOrderGot))
3284 }
3285 if !cmp.Equal(want.sleeps, interceptor.sleepsGot) {
3286 t.Errorf("%s: Expected sleep sequence to match, diff: %s", key, cmp.Diff(want.sleeps, interceptor.sleepsGot))
3287 }
3288 if !cmp.Equal(want.statusCodes, interceptor.statusCodesGot) {
3289 t.Errorf("%s: Expected status codes to match, diff: %s", key, cmp.Diff(want.statusCodes, interceptor.statusCodesGot))
3290 }
3291 })
3292 }
3293 }
3295 type retryInterceptor struct {
3296 WithRetry
3297 invokeOrderGot []string
3298 }
3300 func (ri *retryInterceptor) IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool {
3301 ri.invokeOrderGot = append(ri.invokeOrderGot, "WithRetry.IsNextRetry")
3302 return ri.WithRetry.IsNextRetry(ctx, restReq, httpReq, resp, err, f)
3303 }
3305 func (ri *retryInterceptor) Before(ctx context.Context, request *Request) error {
3306 ri.invokeOrderGot = append(ri.invokeOrderGot, "WithRetry.Before")
3307 return ri.WithRetry.Before(ctx, request)
3308 }
3310 func (ri *retryInterceptor) After(ctx context.Context, request *Request, resp *http.Response, err error) {
3311 ri.invokeOrderGot = append(ri.invokeOrderGot, "WithRetry.After")
3312 ri.WithRetry.After(ctx, request, resp, err)
3313 }
3315 func (ri *retryInterceptor) Do() {
3316 ri.invokeOrderGot = append(ri.invokeOrderGot, "Client.Do")
3317 }
3319 func testWithRetryInvokeOrder(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
3326 defaultInvokeOrderWant := []string{
3328 "WithRetry.Before",
3329 "Client.Do",
3330 "WithRetry.After",
3333 "WithRetry.IsNextRetry",
3336 "WithRetry.Before",
3337 "Client.Do",
3338 "WithRetry.After",
3342 "WithRetry.IsNextRetry",
3343 }
3345 tests := []struct {
3346 name string
3347 maxRetries int
3348 serverReturns []responseErr
3350 expectations map[string][]string
3351 }{
3352 {
3353 name: "success after one retry",
3354 maxRetries: 1,
3355 serverReturns: []responseErr{
3356 {response: retryAfterResponse(), err: nil},
3357 {response: &http.Response{StatusCode: http.StatusOK}, err: nil},
3358 },
3359 expectations: map[string][]string{
3360 "Do": defaultInvokeOrderWant,
3363 "Watch": defaultInvokeOrderWant[0 : len(defaultInvokeOrderWant)-1],
3364 "Stream": defaultInvokeOrderWant[0 : len(defaultInvokeOrderWant)-1],
3365 },
3366 },
3367 }
3369 for _, test := range tests {
3370 t.Run(test.name, func(t *testing.T) {
3371 interceptor := &retryInterceptor{
3372 WithRetry: &withRetry{maxRetries: test.maxRetries},
3373 }
3375 var attempts int
3376 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
3377 defer func() {
3378 attempts++
3379 }()
3381 interceptor.Do()
3382 resp := test.serverReturns[attempts].response
3383 if resp != nil {
3384 resp.Body = io.NopCloser(bytes.NewReader([]byte{}))
3385 }
3386 return resp, test.serverReturns[attempts].err
3387 })
3389 base, err := url.Parse("http://foo.bar")
3390 if err != nil {
3391 t.Fatalf("Wrong test setup - did not find expected for: %s", key)
3392 }
3393 req := &Request{
3394 verb: "GET",
3395 bodyBytes: []byte{},
3396 c: &RESTClient{
3397 base: base,
3398 content: defaultContentConfig(),
3399 Client: client,
3400 },
3401 pathPrefix: "/api/v1",
3402 rateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
3403 backoff: &NoBackoff{},
3404 retryFn: func(_ int) WithRetry { return interceptor },
3405 }
3407 doFunc(context.Background(), req)
3409 if attempts != 2 {
3410 t.Errorf("%s: Expected attempts: %d, but got: %d", key, 2, attempts)
3411 }
3412 invokeOrderWant, ok := test.expectations[key]
3413 if !ok {
3414 t.Fatalf("Wrong test setup - did not find expected for: %s", key)
3415 }
3416 if !cmp.Equal(invokeOrderWant, interceptor.invokeOrderGot) {
3417 t.Errorf("%s: Expected invoke order to match, diff: %s", key, cmp.Diff(invokeOrderWant, interceptor.invokeOrderGot))
3418 }
3419 })
3420 }
3421 }
3423 func testWithWrapPreviousError(t *testing.T, doFunc func(ctx context.Context, r *Request) error) {
3424 var (
3425 containsFormatExpected = "- error from a previous attempt: %s"
3426 nonRetryableErr = errors.New("non retryable error")
3427 )
3429 tests := []struct {
3430 name string
3431 maxRetries int
3432 serverReturns []responseErr
3433 expectedErr error
3434 wrapped bool
3435 attemptsExpected int
3436 contains string
3437 }{
3438 {
3439 name: "success at first attempt",
3440 maxRetries: 2,
3441 serverReturns: []responseErr{
3442 {response: &http.Response{StatusCode: http.StatusOK}, err: nil},
3443 },
3444 attemptsExpected: 1,
3445 expectedErr: nil,
3446 },
3447 {
3448 name: "success after a series of retry-after from the server",
3449 maxRetries: 2,
3450 serverReturns: []responseErr{
3451 {response: retryAfterResponse(), err: nil},
3452 {response: retryAfterResponse(), err: nil},
3453 {response: &http.Response{StatusCode: http.StatusOK}, err: nil},
3454 },
3455 attemptsExpected: 3,
3456 expectedErr: nil,
3457 },
3458 {
3459 name: "success after a series of retryable errors",
3460 maxRetries: 2,
3461 serverReturns: []responseErr{
3462 {response: nil, err: io.EOF},
3463 {response: nil, err: io.EOF},
3464 {response: &http.Response{StatusCode: http.StatusOK}, err: nil},
3465 },
3466 attemptsExpected: 3,
3467 expectedErr: nil,
3468 },
3469 {
3470 name: "request errors out with a non retryable error",
3471 maxRetries: 2,
3472 serverReturns: []responseErr{
3473 {response: nil, err: nonRetryableErr},
3474 },
3475 attemptsExpected: 1,
3476 expectedErr: nonRetryableErr,
3477 },
3478 {
3479 name: "request times out after retries, but no previous error",
3480 maxRetries: 2,
3481 serverReturns: []responseErr{
3482 {response: retryAfterResponse(), err: nil},
3483 {response: retryAfterResponse(), err: nil},
3484 {response: nil, err: context.Canceled},
3485 },
3486 attemptsExpected: 3,
3487 expectedErr: context.Canceled,
3488 },
3489 {
3490 name: "request times out after retries, and we have a relevant previous error",
3491 maxRetries: 3,
3492 serverReturns: []responseErr{
3493 {response: nil, err: io.EOF},
3494 {response: retryAfterResponse(), err: nil},
3495 {response: retryAfterResponse(), err: nil},
3496 {response: nil, err: context.Canceled},
3497 },
3498 attemptsExpected: 4,
3499 wrapped: true,
3500 expectedErr: context.Canceled,
3501 contains: fmt.Sprintf(containsFormatExpected, io.EOF),
3502 },
3503 {
3504 name: "interleaved retry-after responses with retryable errors",
3505 maxRetries: 8,
3506 serverReturns: []responseErr{
3507 {response: retryAfterResponse(), err: nil},
3508 {response: retryAfterResponse(), err: nil},
3509 {response: nil, err: io.ErrUnexpectedEOF},
3510 {response: retryAfterResponse(), err: nil},
3511 {response: retryAfterResponse(), err: nil},
3512 {response: nil, err: io.EOF},
3513 {response: retryAfterResponse(), err: nil},
3514 {response: retryAfterResponse(), err: nil},
3515 {response: nil, err: context.Canceled},
3516 },
3517 attemptsExpected: 9,
3518 wrapped: true,
3519 expectedErr: context.Canceled,
3520 contains: fmt.Sprintf(containsFormatExpected, io.EOF),
3521 },
3522 {
3523 name: "request errors out with a retryable error, followed by a non-retryable one",
3524 maxRetries: 3,
3525 serverReturns: []responseErr{
3526 {response: nil, err: io.EOF},
3527 {response: nil, err: nonRetryableErr},
3528 },
3529 attemptsExpected: 2,
3530 wrapped: true,
3531 expectedErr: nonRetryableErr,
3532 contains: fmt.Sprintf(containsFormatExpected, io.EOF),
3533 },
3534 {
3535 name: "use the most recent error",
3536 maxRetries: 2,
3537 serverReturns: []responseErr{
3538 {response: nil, err: io.ErrUnexpectedEOF},
3539 {response: nil, err: io.EOF},
3540 {response: nil, err: context.Canceled},
3541 },
3542 attemptsExpected: 3,
3543 wrapped: true,
3544 expectedErr: context.Canceled,
3545 contains: fmt.Sprintf(containsFormatExpected, io.EOF),
3546 },
3547 }
3549 for _, test := range tests {
3550 t.Run(test.name, func(t *testing.T) {
3551 var attempts int
3552 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
3553 defer func() {
3554 attempts++
3555 }()
3557 resp := test.serverReturns[attempts].response
3558 if resp != nil {
3559 resp.Body = io.NopCloser(bytes.NewReader([]byte{}))
3560 }
3561 return resp, test.serverReturns[attempts].err
3562 })
3564 base, err := url.Parse("http://foo.bar")
3565 if err != nil {
3566 t.Fatalf("Failed to create new HTTP request - %v", err)
3567 }
3568 req := &Request{
3569 verb: "GET",
3570 bodyBytes: []byte{},
3571 c: &RESTClient{
3572 base: base,
3573 content: defaultContentConfig(),
3574 Client: client,
3575 },
3576 pathPrefix: "/api/v1",
3577 rateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
3578 backoff: &noSleepBackOff{},
3579 maxRetries: test.maxRetries,
3580 retryFn: defaultRequestRetryFn,
3581 }
3583 err = doFunc(context.Background(), req)
3584 if test.attemptsExpected != attempts {
3585 t.Errorf("Expected attempts: %d, but got: %d", test.attemptsExpected, attempts)
3586 }
3588 switch {
3589 case test.expectedErr == nil:
3590 if err != nil {
3591 t.Errorf("Expected a nil error, but got: %v", err)
3592 return
3593 }
3594 case test.expectedErr != nil:
3595 if !strings.Contains(err.Error(), test.contains) {
3596 t.Errorf("Expected error message to contain %q, but got: %v", test.contains, err)
3597 }
3599 urlErrGot, _ := err.(*url.Error)
3600 if test.wrapped {
3602 unwrapper, ok := err.(interface {
3603 Unwrap() error
3604 })
3605 if !ok {
3606 t.Errorf("Expected error to implement Unwrap method, but got: %v", err)
3607 return
3608 }
3609 urlErrGot, _ = unwrapper.Unwrap().(*url.Error)
3610 }
3612 if urlErrGot == nil {
3613 t.Errorf("Expected error to be url.Error, but got: %v", err)
3614 return
3615 }
3617 errGot := urlErrGot.Unwrap()
3618 if test.expectedErr != errGot {
3619 t.Errorf("Expected error %v, but got: %v", test.expectedErr, errGot)
3620 }
3621 }
3622 })
3623 }
3624 }
3626 func TestReuseRequest(t *testing.T) {
3627 var tests = []struct {
3628 name string
3629 enableHTTP2 bool
3630 }{
3631 {"HTTP1", false},
3632 {"HTTP2", true},
3633 }
3634 for _, tt := range tests {
3635 t.Run(tt.name, func(t *testing.T) {
3637 ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3638 w.Write([]byte(r.RemoteAddr))
3639 }))
3640 ts.EnableHTTP2 = tt.enableHTTP2
3641 ts.StartTLS()
3642 defer ts.Close()
3644 ctx, cancel := context.WithCancel(context.Background())
3645 defer cancel()
3647 c := testRESTClient(t, ts)
3649 req1, err := c.Verb("GET").
3650 Prefix("foo").
3651 DoRaw(ctx)
3652 if err != nil {
3653 t.Fatalf("Unexpected error: %v", err)
3654 }
3656 req2, err := c.Verb("GET").
3657 Prefix("foo").
3658 DoRaw(ctx)
3659 if err != nil {
3660 t.Fatalf("Unexpected error: %v", err)
3661 }
3663 if string(req1) != string(req2) {
3664 t.Fatalf("Expected %v to be equal to %v", string(req1), string(req2))
3665 }
3667 })
3668 }
3669 }
3671 func TestHTTP1DoNotReuseRequestAfterTimeout(t *testing.T) {
3672 var tests = []struct {
3673 name string
3674 enableHTTP2 bool
3675 }{
3676 {"HTTP1", false},
3677 {"HTTP2", true},
3678 }
3679 for _, tt := range tests {
3680 t.Run(tt.name, func(t *testing.T) {
3681 done := make(chan struct{})
3682 ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3683 t.Logf("TEST Connected from %v on %v\n", r.RemoteAddr, r.URL.Path)
3684 if r.URL.Path == "/hang" {
3685 t.Logf("TEST hanging %v\n", r.RemoteAddr)
3686 <-done
3687 }
3688 w.Write([]byte(r.RemoteAddr))
3689 }))
3690 ts.EnableHTTP2 = tt.enableHTTP2
3691 ts.StartTLS()
3692 defer ts.Close()
3694 defer close(done)
3696 ctx, cancel := context.WithCancel(context.Background())
3697 defer cancel()
3699 transport, ok := ts.Client().Transport.(*http.Transport)
3700 if !ok {
3701 t.Fatalf("failed to assert *http.Transport")
3702 }
3704 config := &Config{
3705 Host: ts.URL,
3706 Transport: utilnet.SetTransportDefaults(transport),
3707 Timeout: 1 * time.Second,
3709 ContentConfig: ContentConfig{
3710 GroupVersion: &schema.GroupVersion{},
3711 NegotiatedSerializer: &serializer.CodecFactory{},
3712 },
3713 }
3714 if !tt.enableHTTP2 {
3715 config.TLSClientConfig.NextProtos = []string{"http/1.1"}
3716 }
3717 c, err := RESTClientFor(config)
3718 if err != nil {
3719 t.Fatalf("failed to create REST client: %v", err)
3720 }
3721 req1, err := c.Verb("GET").
3722 Prefix("foo").
3723 DoRaw(ctx)
3724 if err != nil {
3725 t.Fatalf("Unexpected error: %v", err)
3726 }
3728 _, err = c.Verb("GET").
3729 Prefix("/hang").
3730 DoRaw(ctx)
3731 if err == nil {
3732 t.Fatalf("Expected error")
3733 }
3735 req2, err := c.Verb("GET").
3736 Prefix("foo").
3737 DoRaw(ctx)
3738 if err != nil {
3739 t.Fatalf("Unexpected error: %v", err)
3740 }
3743 if tt.enableHTTP2 != (string(req1) == string(req2)) {
3744 if tt.enableHTTP2 {
3745 t.Fatalf("Expected %v to be the same as %v", string(req1), string(req2))
3746 } else {
3747 t.Fatalf("Expected %v to be different to %v", string(req1), string(req2))
3748 }
3749 }
3750 })
3751 }
3752 }
3754 func TestTransportConcurrency(t *testing.T) {
3755 const numReqs = 10
3756 var tests = []struct {
3757 name string
3758 enableHTTP2 bool
3759 }{
3760 {"HTTP1", false},
3761 {"HTTP2", true},
3762 }
3763 for _, tt := range tests {
3764 t.Run(tt.name, func(t *testing.T) {
3766 ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3767 t.Logf("Connected from %v %v", r.RemoteAddr, r.URL)
3768 fmt.Fprintf(w, "%v", r.FormValue("echo"))
3769 }))
3770 ts.EnableHTTP2 = tt.enableHTTP2
3771 ts.StartTLS()
3772 defer ts.Close()
3773 var wg sync.WaitGroup
3775 wg.Add(numReqs)
3776 c := testRESTClient(t, ts)
3777 reqs := make(chan string)
3778 defer close(reqs)
3780 for i := 0; i < 4; i++ {
3781 go func() {
3782 for req := range reqs {
3783 res, err := c.Get().Param("echo", req).DoRaw(context.Background())
3784 if err != nil {
3785 t.Errorf("error on req %s: %v", req, err)
3786 wg.Done()
3787 continue
3788 }
3790 if string(res) != req {
3791 t.Errorf("body of req %s = %q; want %q", req, res, req)
3792 }
3794 wg.Done()
3795 }
3796 }()
3797 }
3798 for i := 0; i < numReqs; i++ {
3799 reqs <- fmt.Sprintf("request-%d", i)
3800 }
3801 wg.Wait()
3802 })
3803 }
3804 }
3806 func TestRetryableConditions(t *testing.T) {
3807 var (
3808 methods = map[string]func(ctx context.Context, r *Request){
3809 "Do": func(ctx context.Context, r *Request) {
3810 r.Do(ctx)
3811 },
3812 "DoRaw": func(ctx context.Context, r *Request) {
3813 r.DoRaw(ctx)
3814 },
3815 "Stream": func(ctx context.Context, r *Request) {
3816 r.Stream(ctx)
3817 },
3818 "Watch": func(ctx context.Context, r *Request) {
3819 w, err := r.Watch(ctx)
3820 if err == nil {
3822 <-w.ResultChan()
3823 }
3824 },
3825 }
3827 alwaysRetry = map[string]bool{
3828 "Do": true,
3829 "DoRaw": true,
3830 "Watch": true,
3831 "Stream": true,
3832 }
3834 neverRetry = map[string]bool{
3835 "Do": false,
3836 "DoRaw": false,
3837 "Watch": false,
3838 "Stream": false,
3839 }
3841 alwaysRetryExceptStream = map[string]bool{
3842 "Do": true,
3843 "DoRaw": true,
3844 "Watch": true,
3845 "Stream": false,
3846 }
3847 )
3849 tests := []struct {
3850 name string
3851 verbs []string
3852 serverReturns responseErr
3853 retryExpectation map[string]bool
3854 }{
3856 {
3857 name: "server returns {429, Retry-After}",
3858 verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
3859 serverReturns: responseErr{response: retryAfterResponseWithCodeAndDelay(http.StatusTooManyRequests, "0"), err: nil},
3860 retryExpectation: alwaysRetry,
3861 },
3863 {
3864 name: "server returns {503, Retry-After}",
3865 verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
3866 serverReturns: responseErr{response: retryAfterResponseWithCodeAndDelay(http.StatusServiceUnavailable, "0"), err: nil},
3867 retryExpectation: alwaysRetry,
3868 },
3870 {
3871 name: "server returns 5xx, but no Retry-After",
3872 verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
3873 serverReturns: responseErr{response: &http.Response{StatusCode: http.StatusInternalServerError}, err: nil},
3874 retryExpectation: neverRetry,
3875 },
3877 {
3878 name: "server returns 429 but no Retry-After",
3879 verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
3880 serverReturns: responseErr{response: &http.Response{StatusCode: http.StatusTooManyRequests}, err: nil},
3881 retryExpectation: neverRetry,
3882 },
3884 {
3885 name: "server returns connection reset error",
3886 verbs: []string{"GET"},
3887 serverReturns: responseErr{response: nil, err: syscall.ECONNRESET},
3888 retryExpectation: alwaysRetryExceptStream,
3889 },
3890 {
3891 name: "server returns EOF error",
3892 verbs: []string{"GET"},
3893 serverReturns: responseErr{response: nil, err: io.EOF},
3894 retryExpectation: alwaysRetryExceptStream,
3895 },
3896 {
3897 name: "server returns unexpected EOF error",
3898 verbs: []string{"GET"},
3899 serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
3900 retryExpectation: alwaysRetryExceptStream,
3901 },
3902 {
3903 name: "server returns broken connection error",
3904 verbs: []string{"GET"},
3905 serverReturns: responseErr{response: nil, err: errors.New("http: can't write HTTP request on broken connection")},
3906 retryExpectation: alwaysRetryExceptStream,
3907 },
3908 {
3909 name: "server returns GOAWAY error",
3910 verbs: []string{"GET"},
3911 serverReturns: responseErr{response: nil, err: errors.New("http2: server sent GOAWAY and closed the connection")},
3912 retryExpectation: alwaysRetryExceptStream,
3913 },
3914 {
3915 name: "server returns connection reset by peer error",
3916 verbs: []string{"GET"},
3917 serverReturns: responseErr{response: nil, err: errors.New("connection reset by peer")},
3918 retryExpectation: alwaysRetryExceptStream,
3919 },
3920 {
3921 name: "server returns use of closed network connection error",
3922 verbs: []string{"GET"},
3923 serverReturns: responseErr{response: nil, err: errors.New("use of closed network connection")},
3924 retryExpectation: alwaysRetryExceptStream,
3925 },
3927 {
3928 name: "server returns connection refused error",
3929 verbs: []string{"GET"},
3930 serverReturns: responseErr{response: nil, err: syscall.ECONNREFUSED},
3931 retryExpectation: neverRetry,
3932 },
3933 {
3934 name: "server returns connection refused error",
3935 verbs: []string{"POST"},
3936 serverReturns: responseErr{response: nil, err: syscall.ECONNREFUSED},
3937 retryExpectation: neverRetry,
3938 },
3939 {
3940 name: "server returns EOF error",
3941 verbs: []string{"POST"},
3942 serverReturns: responseErr{response: nil, err: io.EOF},
3943 retryExpectation: map[string]bool{
3944 "Do": false,
3945 "DoRaw": false,
3946 "Watch": true,
3947 "Stream": false,
3948 },
3949 },
3951 {
3952 name: "server returns net.Timeout() == true error",
3953 verbs: []string{"GET"},
3954 serverReturns: responseErr{response: nil, err: &net.DNSError{IsTimeout: true}},
3955 retryExpectation: map[string]bool{
3956 "Do": false,
3957 "DoRaw": false,
3958 "Watch": true,
3959 "Stream": false,
3960 },
3961 },
3962 {
3963 name: "server returns OK, never retry",
3964 verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
3965 serverReturns: responseErr{response: &http.Response{StatusCode: http.StatusOK}, err: nil},
3966 retryExpectation: neverRetry,
3967 },
3968 {
3969 name: "server returns {3xx, Retry-After}",
3970 verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
3971 serverReturns: responseErr{response: &http.Response{StatusCode: http.StatusMovedPermanently, Header: http.Header{"Retry-After": []string{"0"}}}, err: nil},
3972 retryExpectation: neverRetry,
3973 },
3974 }
3976 for _, test := range tests {
3977 for method, retryExpected := range test.retryExpectation {
3978 fn, ok := methods[method]
3979 if !ok {
3980 t.Fatalf("Wrong test setup, unknown method: %s", method)
3981 }
3983 for _, verb := range test.verbs {
3984 t.Run(fmt.Sprintf("%s/%s/%s", test.name, method, verb), func(t *testing.T) {
3985 var attemptsGot int
3986 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
3987 attemptsGot++
3988 return test.serverReturns.response, test.serverReturns.err
3989 })
3991 u, _ := url.Parse("http://localhost:123" + "/apis")
3992 req := &Request{
3993 verb: verb,
3994 c: &RESTClient{
3995 base: u,
3996 content: defaultContentConfig(),
3997 Client: client,
3998 },
3999 backoff: &noSleepBackOff{},
4000 maxRetries: 2,
4001 retryFn: defaultRequestRetryFn,
4002 }
4004 fn(context.Background(), req)
4006 if retryExpected {
4007 if attemptsGot != 3 {
4008 t.Errorf("Expected attempt count: %d, but got: %d", 3, attemptsGot)
4009 }
4010 return
4011 }
4013 if attemptsGot > 1 {
4014 t.Errorf("Expected no retry, but got %d attempts", attemptsGot)
4015 }
4016 })
4017 }
4018 }
4019 }
4020 }
4022 func TestRequestConcurrencyWithRetry(t *testing.T) {
4023 var attempts int32
4024 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
4025 defer func() {
4026 atomic.AddInt32(&attempts, 1)
4027 }()
4030 return &http.Response{
4031 StatusCode: http.StatusInternalServerError,
4032 Header: http.Header{"Retry-After": []string{"1"}},
4033 }, nil
4034 })
4036 req := &Request{
4037 verb: "POST",
4038 c: &RESTClient{
4039 content: defaultContentConfig(),
4040 Client: client,
4041 },
4042 backoff: &noSleepBackOff{},
4043 maxRetries: 9,
4044 retryFn: defaultRequestRetryFn,
4045 }
4047 concurrency := 20
4048 wg := sync.WaitGroup{}
4049 wg.Add(concurrency)
4050 startCh := make(chan struct{})
4051 for i := 0; i < concurrency; i++ {
4052 go func() {
4053 defer wg.Done()
4054 <-startCh
4055 req.Do(context.Background())
4056 }()
4057 }
4059 close(startCh)
4060 wg.Wait()
4063 expected := concurrency * (req.maxRetries + 1)
4064 if atomic.LoadInt32(&attempts) != int32(expected) {
4065 t.Errorf("Expected attempts: %d, but got: %d", expected, attempts)
4066 }
4067 }
View as plain text