1
16
17 package rest
18
19 import (
20 "bytes"
21 "context"
22 "errors"
23 "flag"
24 "fmt"
25 "io"
26 "net"
27 "net/http"
28 "net/http/httptest"
29 "net/url"
30 "os"
31 "reflect"
32 "strings"
33 "sync"
34 "sync/atomic"
35 "syscall"
36 "testing"
37 "time"
38
39 "github.com/google/go-cmp/cmp"
40 v1 "k8s.io/api/core/v1"
41 apiequality "k8s.io/apimachinery/pkg/api/equality"
42 apierrors "k8s.io/apimachinery/pkg/api/errors"
43 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
44 "k8s.io/apimachinery/pkg/runtime"
45 "k8s.io/apimachinery/pkg/runtime/schema"
46 "k8s.io/apimachinery/pkg/runtime/serializer"
47 "k8s.io/apimachinery/pkg/runtime/serializer/streaming"
48 "k8s.io/apimachinery/pkg/util/intstr"
49 utilnet "k8s.io/apimachinery/pkg/util/net"
50 "k8s.io/apimachinery/pkg/watch"
51 "k8s.io/client-go/kubernetes/scheme"
52 restclientwatch "k8s.io/client-go/rest/watch"
53 "k8s.io/client-go/tools/metrics"
54 "k8s.io/client-go/util/flowcontrol"
55 utiltesting "k8s.io/client-go/util/testing"
56 "k8s.io/klog/v2"
57 testingclock "k8s.io/utils/clock/testing"
58 )
59
60 func TestNewRequestSetsAccept(t *testing.T) {
61 r := NewRequestWithClient(&url.URL{Path: "/path/"}, "", ClientContentConfig{}, nil).Verb("get")
62 if r.headers.Get("Accept") != "" {
63 t.Errorf("unexpected headers: %#v", r.headers)
64 }
65 r = NewRequestWithClient(&url.URL{Path: "/path/"}, "", ClientContentConfig{ContentType: "application/other"}, nil).Verb("get")
66 if r.headers.Get("Accept") != "application/other, */*" {
67 t.Errorf("unexpected headers: %#v", r.headers)
68 }
69 }
70
71 func clientForFunc(fn clientFunc) *http.Client {
72 return &http.Client{
73 Transport: fn,
74 }
75 }
76
77 type clientFunc func(req *http.Request) (*http.Response, error)
78
79 func (f clientFunc) RoundTrip(req *http.Request) (*http.Response, error) {
80 return f(req)
81 }
82
83 func TestRequestSetsHeaders(t *testing.T) {
84 server := clientForFunc(func(req *http.Request) (*http.Response, error) {
85 if req.Header.Get("Accept") != "application/other, */*" {
86 t.Errorf("unexpected headers: %#v", req.Header)
87 }
88 return &http.Response{
89 StatusCode: http.StatusForbidden,
90 Body: io.NopCloser(bytes.NewReader([]byte{})),
91 }, nil
92 })
93 config := defaultContentConfig()
94 config.ContentType = "application/other"
95 r := NewRequestWithClient(&url.URL{Path: "/path"}, "", config, nil).Verb("get")
96 r.c.Client = server
97
98
99 _ = r.Do(context.Background())
100 _, _ = r.Watch(context.Background())
101 _, _ = r.Stream(context.Background())
102 }
103
104 func TestRequestWithErrorWontChange(t *testing.T) {
105 gvCopy := v1.SchemeGroupVersion
106 original := Request{
107 err: errors.New("test"),
108 c: &RESTClient{
109 content: ClientContentConfig{GroupVersion: gvCopy},
110 },
111 }
112 r := original
113 changed := r.Param("foo", "bar").
114 AbsPath("/abs").
115 Prefix("test").
116 Suffix("testing").
117 Namespace("new").
118 Resource("foos").
119 Name("bars").
120 Body("foo").
121 Timeout(time.Millisecond)
122 if changed != &r {
123 t.Errorf("returned request should point to the same object")
124 }
125 if !reflect.DeepEqual(changed, &original) {
126 t.Errorf("expected %#v, got %#v", &original, changed)
127 }
128 }
129
130 func TestRequestPreservesBaseTrailingSlash(t *testing.T) {
131 r := &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "/path/"}
132 if s := r.URL().String(); s != "/path/" {
133 t.Errorf("trailing slash should be preserved: %s", s)
134 }
135 }
136
137 func TestRequestAbsPathPreservesTrailingSlash(t *testing.T) {
138 r := (&Request{c: &RESTClient{base: &url.URL{}}}).AbsPath("/foo/")
139 if s := r.URL().String(); s != "/foo/" {
140 t.Errorf("trailing slash should be preserved: %s", s)
141 }
142 }
143
144 func TestRequestAbsPathJoins(t *testing.T) {
145 r := (&Request{c: &RESTClient{base: &url.URL{}}}).AbsPath("foo/bar", "baz")
146 if s := r.URL().String(); s != "foo/bar/baz" {
147 t.Errorf("trailing slash should be preserved: %s", s)
148 }
149 }
150
151 func TestRequestSetsNamespace(t *testing.T) {
152 r := (&Request{
153 c: &RESTClient{base: &url.URL{Path: "/"}},
154 }).Namespace("foo")
155 if r.namespace == "" {
156 t.Errorf("namespace should be set: %#v", r)
157 }
158
159 if s := r.URL().String(); s != "namespaces/foo" {
160 t.Errorf("namespace should be in path: %s", s)
161 }
162 }
163
164 func TestRequestOrdersNamespaceInPath(t *testing.T) {
165 r := (&Request{
166 c: &RESTClient{base: &url.URL{}},
167 pathPrefix: "/test/",
168 }).Name("bar").Resource("baz").Namespace("foo")
169 if s := r.URL().String(); s != "/test/namespaces/foo/baz/bar" {
170 t.Errorf("namespace should be in order in path: %s", s)
171 }
172 }
173
174 func TestRequestOrdersSubResource(t *testing.T) {
175 r := (&Request{
176 c: &RESTClient{base: &url.URL{}},
177 pathPrefix: "/test/",
178 }).Name("bar").Resource("baz").Namespace("foo").Suffix("test").SubResource("a", "b")
179 if s := r.URL().String(); s != "/test/namespaces/foo/baz/bar/a/b/test" {
180 t.Errorf("namespace should be in order in path: %s", s)
181 }
182 }
183
184 func TestRequestSetTwiceError(t *testing.T) {
185 if (&Request{}).Name("bar").Name("baz").err == nil {
186 t.Errorf("setting name twice should result in error")
187 }
188 if (&Request{}).Namespace("bar").Namespace("baz").err == nil {
189 t.Errorf("setting namespace twice should result in error")
190 }
191 if (&Request{}).Resource("bar").Resource("baz").err == nil {
192 t.Errorf("setting resource twice should result in error")
193 }
194 if (&Request{}).SubResource("bar").SubResource("baz").err == nil {
195 t.Errorf("setting subresource twice should result in error")
196 }
197 }
198
199 func TestInvalidSegments(t *testing.T) {
200 invalidSegments := []string{".", "..", "test/segment", "test%2bsegment"}
201 setters := map[string]func(string, *Request){
202 "namespace": func(s string, r *Request) { r.Namespace(s) },
203 "resource": func(s string, r *Request) { r.Resource(s) },
204 "name": func(s string, r *Request) { r.Name(s) },
205 "subresource": func(s string, r *Request) { r.SubResource(s) },
206 }
207 for _, invalidSegment := range invalidSegments {
208 for setterName, setter := range setters {
209 r := &Request{}
210 setter(invalidSegment, r)
211 if r.err == nil {
212 t.Errorf("%s: %s: expected error, got none", setterName, invalidSegment)
213 }
214 }
215 }
216 }
217
218 func TestRequestParam(t *testing.T) {
219 r := (&Request{}).Param("foo", "a")
220 if !reflect.DeepEqual(r.params, url.Values{"foo": []string{"a"}}) {
221 t.Errorf("should have set a param: %#v", r)
222 }
223
224 r.Param("bar", "1")
225 r.Param("bar", "2")
226 if !reflect.DeepEqual(r.params, url.Values{"foo": []string{"a"}, "bar": []string{"1", "2"}}) {
227 t.Errorf("should have set a param: %#v", r)
228 }
229 }
230
231 func TestRequestVersionedParams(t *testing.T) {
232 r := (&Request{c: &RESTClient{content: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}}).Param("foo", "a")
233 if !reflect.DeepEqual(r.params, url.Values{"foo": []string{"a"}}) {
234 t.Errorf("should have set a param: %#v", r)
235 }
236 r.VersionedParams(&v1.PodLogOptions{Follow: true, Container: "bar"}, scheme.ParameterCodec)
237
238 if !reflect.DeepEqual(r.params, url.Values{
239 "foo": []string{"a"},
240 "container": []string{"bar"},
241 "follow": []string{"true"},
242 }) {
243 t.Errorf("should have set a param: %#v", r)
244 }
245 }
246
247 func TestRequestVersionedParamsFromListOptions(t *testing.T) {
248 r := &Request{c: &RESTClient{content: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}}
249 r.VersionedParams(&metav1.ListOptions{ResourceVersion: "1"}, scheme.ParameterCodec)
250 if !reflect.DeepEqual(r.params, url.Values{
251 "resourceVersion": []string{"1"},
252 }) {
253 t.Errorf("should have set a param: %#v", r)
254 }
255
256 var timeout int64 = 10
257 r.VersionedParams(&metav1.ListOptions{ResourceVersion: "2", TimeoutSeconds: &timeout}, scheme.ParameterCodec)
258 if !reflect.DeepEqual(r.params, url.Values{
259 "resourceVersion": []string{"1", "2"},
260 "timeoutSeconds": []string{"10"},
261 }) {
262 t.Errorf("should have set a param: %#v %v", r.params, r.err)
263 }
264 }
265
266 func TestRequestVersionedParamsWithInvalidScheme(t *testing.T) {
267 parameterCodec := runtime.NewParameterCodec(runtime.NewScheme())
268 r := (&Request{c: &RESTClient{content: ClientContentConfig{GroupVersion: v1.SchemeGroupVersion}}})
269 r.VersionedParams(&v1.PodExecOptions{Stdin: false, Stdout: true},
270 parameterCodec)
271
272 if r.Error() == nil {
273 t.Errorf("should have recorded an error: %#v", r.params)
274 }
275 }
276
277 func TestRequestError(t *testing.T) {
278
279 r := (&Request{}).Body([]string{"test"})
280
281 if r.Error() != r.err {
282 t.Errorf("getter should be identical to reference: %#v %#v", r.Error(), r.err)
283 }
284 }
285
286 func TestRequestURI(t *testing.T) {
287 r := (&Request{}).Param("foo", "a")
288 r.Prefix("other")
289 r.RequestURI("/test?foo=b&a=b&c=1&c=2")
290 if r.pathPrefix != "/test" {
291 t.Errorf("path is wrong: %#v", r)
292 }
293 if !reflect.DeepEqual(r.params, url.Values{"a": []string{"b"}, "foo": []string{"b"}, "c": []string{"1", "2"}}) {
294 t.Errorf("should have set a param: %#v", r)
295 }
296 }
297
298 type NotAnAPIObject struct{}
299
300 func (obj NotAnAPIObject) GroupVersionKind() *schema.GroupVersionKind { return nil }
301 func (obj NotAnAPIObject) SetGroupVersionKind(gvk *schema.GroupVersionKind) {}
302
303 func defaultContentConfig() ClientContentConfig {
304 gvCopy := v1.SchemeGroupVersion
305 return ClientContentConfig{
306 ContentType: "application/json",
307 GroupVersion: gvCopy,
308 Negotiator: runtime.NewClientNegotiator(scheme.Codecs.WithoutConversion(), gvCopy),
309 }
310 }
311
312 func TestRequestBody(t *testing.T) {
313
314 r := (&Request{}).Body([]string{"test"})
315 if r.err == nil || r.body != nil {
316 t.Errorf("should have set err and left body nil: %#v", r)
317 }
318
319
320 f, err := os.CreateTemp("", "")
321 if err != nil {
322 t.Fatalf("unable to create temp file")
323 }
324 defer f.Close()
325 os.Remove(f.Name())
326 r = (&Request{}).Body(f.Name())
327 if r.err == nil || r.body != nil {
328 t.Errorf("should have set err and left body nil: %#v", r)
329 }
330
331
332 r = (&Request{c: &RESTClient{content: defaultContentConfig()}}).Body(&NotAnAPIObject{})
333 if r.err == nil || r.body != nil {
334 t.Errorf("should have set err and left body nil: %#v", r)
335 }
336 }
337
338 func TestResultIntoWithErrReturnsErr(t *testing.T) {
339 res := Result{err: errors.New("test")}
340 if err := res.Into(&v1.Pod{}); err != res.err {
341 t.Errorf("should have returned exact error from result")
342 }
343 }
344
345 func TestResultIntoWithNoBodyReturnsErr(t *testing.T) {
346 res := Result{
347 body: []byte{},
348 decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion),
349 }
350 if err := res.Into(&v1.Pod{}); err == nil || !strings.Contains(err.Error(), "0-length") {
351 t.Errorf("should have complained about 0 length body")
352 }
353 }
354
355 func TestURLTemplate(t *testing.T) {
356 uri, _ := url.Parse("http://localhost/some/base/url/path")
357 uriSingleSlash, _ := url.Parse("http://localhost/")
358 testCases := []struct {
359 Request *Request
360 ExpectedFullURL string
361 ExpectedFinalURL string
362 }{
363 {
364
365 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("POST").
366 Prefix("api", "v1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0"),
367 ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/namespaces/ns/r1/nm?p0=v0",
368 ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/namespaces/%7Bnamespace%7D/r1/%7Bname%7D?p0=%7Bvalue%7D",
369 },
370 {
371
372 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("POST").
373 Prefix("pre1", "v1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0"),
374 ExpectedFullURL: "http://localhost/some/base/url/path/pre1/v1/namespaces/ns/r1/nm?p0=v0",
375 ExpectedFinalURL: "http://localhost/%7Bprefix%7D",
376 },
377 {
378
379
380 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
381 Prefix("/api/v1/namespaces/ns/r1/name1"),
382 ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/namespaces/ns/r1/name1",
383 ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/namespaces/%7Bnamespace%7D/r1/%7Bname%7D",
384 },
385 {
386
387
388 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
389 Prefix("/apis/g1/v1/namespaces/ns/r1/name1"),
390 ExpectedFullURL: "http://localhost/some/base/url/path/apis/g1/v1/namespaces/ns/r1/name1",
391 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/g1/v1/namespaces/%7Bnamespace%7D/r1/%7Bname%7D",
392 },
393 {
394
395
396 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
397 Prefix("/api/v1/namespaces/ns/r1"),
398 ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/namespaces/ns/r1",
399 ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/namespaces/%7Bnamespace%7D/r1",
400 },
401 {
402
403
404 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
405 Prefix("/apis/g1/v1/namespaces/ns/r1"),
406 ExpectedFullURL: "http://localhost/some/base/url/path/apis/g1/v1/namespaces/ns/r1",
407 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/g1/v1/namespaces/%7Bnamespace%7D/r1",
408 },
409 {
410
411
412 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
413 Prefix("/api/v1/r1/name1"),
414 ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/r1/name1",
415 ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/r1/%7Bname%7D",
416 },
417 {
418
419
420 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
421 Prefix("/apis/g1/v1/r1/name1"),
422 ExpectedFullURL: "http://localhost/some/base/url/path/apis/g1/v1/r1/name1",
423 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/g1/v1/r1/%7Bname%7D",
424 },
425 {
426
427
428 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
429 Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces/finalize"),
430 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces/finalize",
431 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces/%7Bname%7D/finalize",
432 },
433 {
434
435
436 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
437 Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces"),
438 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces",
439 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces/%7Bname%7D",
440 },
441 {
442
443
444 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
445 Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces/finalize"),
446 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces/finalize",
447 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces/finalize",
448 },
449 {
450
451
452 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
453 Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces/status"),
454 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces/status",
455 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces/status",
456 },
457 {
458
459
460 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
461 Prefix("/apis/namespaces/namespaces/namespaces/namespaces/namespaces"),
462 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/namespaces",
463 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bnamespace%7D/namespaces",
464 },
465 {
466
467
468 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
469 Prefix("/apis/namespaces/namespaces/namespaces/namespaces/finalize"),
470 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/finalize",
471 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bname%7D/finalize",
472 },
473 {
474
475
476 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
477 Prefix("/apis/namespaces/namespaces/namespaces/namespaces/status"),
478 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces/status",
479 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bname%7D/status",
480 },
481 {
482
483
484 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
485 Prefix("/apis/namespaces/namespaces/namespaces/namespaces"),
486 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/namespaces",
487 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces/%7Bname%7D",
488 },
489 {
490
491
492 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
493 Prefix("/apis/namespaces/namespaces/namespaces"),
494 ExpectedFullURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces",
495 ExpectedFinalURL: "http://localhost/some/base/url/path/apis/namespaces/namespaces/namespaces",
496 },
497 {
498
499
500 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
501 Prefix("/pre1/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces/finalize"),
502 ExpectedFullURL: "http://localhost/some/base/url/path/pre1/namespaces/namespaces/namespaces/namespaces/namespaces/namespaces/finalize",
503 ExpectedFinalURL: "http://localhost/%7Bprefix%7D",
504 },
505 {
506
507
508 Request: NewRequestWithClient(uriSingleSlash, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
509 Prefix("/api/v1/namespaces/ns/r2/name1"),
510 ExpectedFullURL: "http://localhost/api/v1/namespaces/ns/r2/name1",
511 ExpectedFinalURL: "http://localhost/api/v1/namespaces/%7Bnamespace%7D/r2/%7Bname%7D",
512 },
513 {
514
515
516 Request: NewRequestWithClient(uri, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
517 Prefix("/api/v1/namespaces/ns/r3/name1"),
518 ExpectedFullURL: "http://localhost/some/base/url/path/api/v1/namespaces/ns/r3/name1",
519 ExpectedFinalURL: "http://localhost/some/base/url/path/api/v1/namespaces/%7Bnamespace%7D/r3/%7Bname%7D",
520 },
521 {
522
523
524 Request: NewRequestWithClient(uriSingleSlash, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
525 Prefix("/"),
526 ExpectedFullURL: "http://localhost/",
527 ExpectedFinalURL: "http://localhost/",
528 },
529 {
530
531
532 Request: NewRequestWithClient(uriSingleSlash, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("DELETE").
533 Prefix("/version"),
534 ExpectedFullURL: "http://localhost/version",
535 ExpectedFinalURL: "http://localhost/version",
536 },
537 }
538 for i, testCase := range testCases {
539 r := testCase.Request
540 full := r.URL()
541 if full.String() != testCase.ExpectedFullURL {
542 t.Errorf("%d: unexpected initial URL: %s %s", i, full, testCase.ExpectedFullURL)
543 }
544 actualURL := r.finalURLTemplate()
545 actual := actualURL.String()
546 if actual != testCase.ExpectedFinalURL {
547 t.Errorf("%d: unexpected URL template: %s %s", i, actual, testCase.ExpectedFinalURL)
548 }
549 if r.URL().String() != full.String() {
550 t.Errorf("%d, creating URL template changed request: %s -> %s", i, full.String(), r.URL().String())
551 }
552 }
553 }
554
555 func TestTransformResponse(t *testing.T) {
556 invalid := []byte("aaaaa")
557 uri, _ := url.Parse("http://localhost")
558 testCases := []struct {
559 Response *http.Response
560 Data []byte
561 Created bool
562 Error bool
563 ErrFn func(err error) bool
564 }{
565 {Response: &http.Response{StatusCode: http.StatusOK}, Data: []byte{}},
566 {Response: &http.Response{StatusCode: http.StatusCreated}, Data: []byte{}, Created: true},
567 {Response: &http.Response{StatusCode: 199}, Error: true},
568 {Response: &http.Response{StatusCode: http.StatusInternalServerError}, Error: true},
569 {Response: &http.Response{StatusCode: http.StatusUnprocessableEntity}, Error: true},
570 {Response: &http.Response{StatusCode: http.StatusConflict}, Error: true},
571 {Response: &http.Response{StatusCode: http.StatusNotFound}, Error: true},
572 {Response: &http.Response{StatusCode: http.StatusUnauthorized}, Error: true},
573 {
574 Response: &http.Response{
575 StatusCode: http.StatusUnauthorized,
576 Header: http.Header{"Content-Type": []string{"application/json"}},
577 Body: io.NopCloser(bytes.NewReader(invalid)),
578 },
579 Error: true,
580 ErrFn: func(err error) bool {
581 return err.Error() != "aaaaa" && apierrors.IsUnauthorized(err)
582 },
583 },
584 {
585 Response: &http.Response{
586 StatusCode: http.StatusUnauthorized,
587 Header: http.Header{"Content-Type": []string{"text/any"}},
588 Body: io.NopCloser(bytes.NewReader(invalid)),
589 },
590 Error: true,
591 ErrFn: func(err error) bool {
592 return strings.Contains(err.Error(), "server has asked for the client to provide") && apierrors.IsUnauthorized(err)
593 },
594 },
595 {Response: &http.Response{StatusCode: http.StatusForbidden}, Error: true},
596 {Response: &http.Response{StatusCode: http.StatusOK, Body: io.NopCloser(bytes.NewReader(invalid))}, Data: invalid},
597 {Response: &http.Response{StatusCode: http.StatusOK, Body: io.NopCloser(bytes.NewReader(invalid))}, Data: invalid},
598 }
599 for i, test := range testCases {
600 r := NewRequestWithClient(uri, "", defaultContentConfig(), nil)
601 if test.Response.Body == nil {
602 test.Response.Body = io.NopCloser(bytes.NewReader([]byte{}))
603 }
604 result := r.transformResponse(test.Response, &http.Request{})
605 response, created, err := result.body, result.statusCode == http.StatusCreated, result.err
606 hasErr := err != nil
607 if hasErr != test.Error {
608 t.Errorf("%d: unexpected error: %t %v", i, test.Error, err)
609 } else if hasErr && test.Response.StatusCode > 399 {
610 status, ok := err.(apierrors.APIStatus)
611 if !ok {
612 t.Errorf("%d: response should have been transformable into APIStatus: %v", i, err)
613 continue
614 }
615 if int(status.Status().Code) != test.Response.StatusCode {
616 t.Errorf("%d: status code did not match response: %#v", i, status.Status())
617 }
618 }
619 if test.ErrFn != nil && !test.ErrFn(err) {
620 t.Errorf("%d: error function did not match: %v", i, err)
621 }
622 if !(test.Data == nil && response == nil) && !apiequality.Semantic.DeepDerivative(test.Data, response) {
623 t.Errorf("%d: unexpected response: %#v %#v", i, test.Data, response)
624 }
625 if test.Created != created {
626 t.Errorf("%d: expected created %t, got %t", i, test.Created, created)
627 }
628 }
629 }
630
631 type renegotiator struct {
632 called bool
633 contentType string
634 params map[string]string
635 decoder runtime.Decoder
636 err error
637 }
638
639 func (r *renegotiator) Decoder(contentType string, params map[string]string) (runtime.Decoder, error) {
640 r.called = true
641 r.contentType = contentType
642 r.params = params
643 return r.decoder, r.err
644 }
645
646 func (r *renegotiator) Encoder(contentType string, params map[string]string) (runtime.Encoder, error) {
647 return nil, fmt.Errorf("UNIMPLEMENTED")
648 }
649
650 func (r *renegotiator) StreamDecoder(contentType string, params map[string]string) (runtime.Decoder, runtime.Serializer, runtime.Framer, error) {
651 return nil, nil, nil, fmt.Errorf("UNIMPLEMENTED")
652 }
653
654 func TestTransformResponseNegotiate(t *testing.T) {
655 invalid := []byte("aaaaa")
656 uri, _ := url.Parse("http://localhost")
657 testCases := []struct {
658 Response *http.Response
659 Data []byte
660 Created bool
661 Error bool
662 ErrFn func(err error) bool
663
664 ContentType string
665 Called bool
666 ExpectContentType string
667 Decoder runtime.Decoder
668 NegotiateErr error
669 }{
670 {
671 ContentType: "application/json",
672 Response: &http.Response{
673 StatusCode: http.StatusUnauthorized,
674 Header: http.Header{"Content-Type": []string{"application/json"}},
675 Body: io.NopCloser(bytes.NewReader(invalid)),
676 },
677 Called: true,
678 ExpectContentType: "application/json",
679 Error: true,
680 ErrFn: func(err error) bool {
681 return err.Error() != "aaaaa" && apierrors.IsUnauthorized(err)
682 },
683 },
684 {
685 ContentType: "application/json",
686 Response: &http.Response{
687 StatusCode: http.StatusUnauthorized,
688 Header: http.Header{"Content-Type": []string{"application/protobuf"}},
689 Body: io.NopCloser(bytes.NewReader(invalid)),
690 },
691 Decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion),
692
693 Called: true,
694 ExpectContentType: "application/protobuf",
695
696 Error: true,
697 ErrFn: func(err error) bool {
698 return err.Error() != "aaaaa" && apierrors.IsUnauthorized(err)
699 },
700 },
701 {
702 ContentType: "application/json",
703 Response: &http.Response{
704 StatusCode: http.StatusInternalServerError,
705 Header: http.Header{"Content-Type": []string{"application/,others"}},
706 },
707 Decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion),
708
709 Error: true,
710 ErrFn: func(err error) bool {
711 return err.Error() == "Internal error occurred: mime: expected token after slash" && err.(apierrors.APIStatus).Status().Code == 500
712 },
713 },
714 {
715
716 Response: &http.Response{
717 StatusCode: http.StatusOK,
718 Header: http.Header{"Content-Type": []string{"text/any"}},
719 Body: io.NopCloser(bytes.NewReader(invalid)),
720 },
721 Decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion),
722 Called: true,
723 ExpectContentType: "text/any",
724 },
725 {
726
727 ContentType: "text/any",
728 Response: &http.Response{
729 StatusCode: http.StatusOK,
730 Body: io.NopCloser(bytes.NewReader(invalid)),
731 },
732 Decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion),
733 Called: true,
734 ExpectContentType: "text/any",
735 },
736 {
737
738 ContentType: "application/json",
739 Response: &http.Response{
740 StatusCode: http.StatusNotFound,
741 Header: http.Header{"Content-Type": []string{"application/unrecognized"}},
742 Body: io.NopCloser(bytes.NewReader(invalid)),
743 },
744 Decoder: scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion),
745
746 NegotiateErr: fmt.Errorf("aaaa"),
747 Called: true,
748 ExpectContentType: "application/unrecognized",
749
750 Error: true,
751 ErrFn: func(err error) bool {
752 return err.Error() != "aaaaa" && apierrors.IsNotFound(err)
753 },
754 },
755 }
756 for i, test := range testCases {
757 contentConfig := defaultContentConfig()
758 contentConfig.ContentType = test.ContentType
759 negotiator := &renegotiator{
760 decoder: test.Decoder,
761 err: test.NegotiateErr,
762 }
763 contentConfig.Negotiator = negotiator
764 r := NewRequestWithClient(uri, "", contentConfig, nil)
765 if test.Response.Body == nil {
766 test.Response.Body = io.NopCloser(bytes.NewReader([]byte{}))
767 }
768 result := r.transformResponse(test.Response, &http.Request{})
769 _, err := result.body, result.err
770 hasErr := err != nil
771 if hasErr != test.Error {
772 t.Errorf("%d: unexpected error: %t %v", i, test.Error, err)
773 continue
774 } else if hasErr && test.Response.StatusCode > 399 {
775 status, ok := err.(apierrors.APIStatus)
776 if !ok {
777 t.Errorf("%d: response should have been transformable into APIStatus: %v", i, err)
778 continue
779 }
780 if int(status.Status().Code) != test.Response.StatusCode {
781 t.Errorf("%d: status code did not match response: %#v", i, status.Status())
782 }
783 }
784 if test.ErrFn != nil && !test.ErrFn(err) {
785 t.Errorf("%d: error function did not match: %v", i, err)
786 }
787 if negotiator.called != test.Called {
788 t.Errorf("%d: negotiator called %t != %t", i, negotiator.called, test.Called)
789 }
790 if !test.Called {
791 continue
792 }
793 if negotiator.contentType != test.ExpectContentType {
794 t.Errorf("%d: unexpected content type: %s", i, negotiator.contentType)
795 }
796 }
797 }
798
799 func TestTransformUnstructuredError(t *testing.T) {
800 testCases := []struct {
801 Req *http.Request
802 Res *http.Response
803
804 Resource string
805 Name string
806
807 ErrFn func(error) bool
808 Transformed error
809 }{
810 {
811 Resource: "foo",
812 Name: "bar",
813 Req: &http.Request{
814 Method: "POST",
815 },
816 Res: &http.Response{
817 StatusCode: http.StatusConflict,
818 Body: io.NopCloser(bytes.NewReader(nil)),
819 },
820 ErrFn: apierrors.IsAlreadyExists,
821 },
822 {
823 Resource: "foo",
824 Name: "bar",
825 Req: &http.Request{
826 Method: "PUT",
827 },
828 Res: &http.Response{
829 StatusCode: http.StatusConflict,
830 Body: io.NopCloser(bytes.NewReader(nil)),
831 },
832 ErrFn: apierrors.IsConflict,
833 },
834 {
835 Resource: "foo",
836 Name: "bar",
837 Req: &http.Request{},
838 Res: &http.Response{
839 StatusCode: http.StatusNotFound,
840 Body: io.NopCloser(bytes.NewReader(nil)),
841 },
842 ErrFn: apierrors.IsNotFound,
843 },
844 {
845 Req: &http.Request{},
846 Res: &http.Response{
847 StatusCode: http.StatusBadRequest,
848 Body: io.NopCloser(bytes.NewReader(nil)),
849 },
850 ErrFn: apierrors.IsBadRequest,
851 },
852 {
853
854 Req: &http.Request{},
855 Res: &http.Response{StatusCode: http.StatusBadRequest, Body: io.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","apiVersion":"v1","status":"Failure","code":404}`)))},
856 ErrFn: apierrors.IsBadRequest,
857 Transformed: &apierrors.StatusError{
858 ErrStatus: metav1.Status{Status: metav1.StatusFailure, Code: http.StatusNotFound},
859 },
860 },
861 {
862
863 Req: &http.Request{},
864 Res: &http.Response{StatusCode: http.StatusBadRequest, Body: io.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","apiVersion":"v1","status":"Success","code":404}`)))},
865 ErrFn: apierrors.IsBadRequest,
866 },
867 {
868
869 Req: &http.Request{},
870 Res: &http.Response{StatusCode: http.StatusBadRequest, Body: io.NopCloser(bytes.NewReader([]byte(`{}`)))},
871 ErrFn: apierrors.IsBadRequest,
872 },
873 {
874
875
876 Req: &http.Request{},
877 Res: &http.Response{StatusCode: http.StatusBadRequest, Body: io.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","status":"Failure","code":404}`)))},
878 ErrFn: apierrors.IsBadRequest,
879 Transformed: &apierrors.StatusError{
880 ErrStatus: metav1.Status{Status: metav1.StatusFailure, Code: http.StatusNotFound},
881 },
882 },
883 {
884
885 Req: &http.Request{},
886 Res: &http.Response{StatusCode: http.StatusBadRequest, Body: io.NopCloser(bytes.NewReader([]byte(`{"status":"Failure","code":404}`)))},
887 ErrFn: apierrors.IsBadRequest,
888 },
889 }
890
891 for _, testCase := range testCases {
892 t.Run("", func(t *testing.T) {
893 r := &Request{
894 c: &RESTClient{
895 content: defaultContentConfig(),
896 },
897 resourceName: testCase.Name,
898 resource: testCase.Resource,
899 }
900 result := r.transformResponse(testCase.Res, testCase.Req)
901 err := result.err
902 if !testCase.ErrFn(err) {
903 t.Fatalf("unexpected error: %v", err)
904 }
905 if !apierrors.IsUnexpectedServerError(err) {
906 t.Errorf("unexpected error type: %v", err)
907 }
908 if len(testCase.Name) != 0 && !strings.Contains(err.Error(), testCase.Name) {
909 t.Errorf("unexpected error string: %s", err)
910 }
911 if len(testCase.Resource) != 0 && !strings.Contains(err.Error(), testCase.Resource) {
912 t.Errorf("unexpected error string: %s", err)
913 }
914
915
916 transformed := result.Error()
917 expect := testCase.Transformed
918 if expect == nil {
919 expect = err
920 }
921 if !reflect.DeepEqual(expect, transformed) {
922 t.Errorf("unexpected Error(): %s", cmp.Diff(expect, transformed))
923 }
924
925
926 if _, err := result.Get(); !reflect.DeepEqual(expect, err) {
927 t.Errorf("unexpected error on Get(): %s", cmp.Diff(expect, err))
928 }
929
930
931 if err := result.Into(&v1.Pod{}); !reflect.DeepEqual(expect, err) {
932 t.Errorf("unexpected error on Into(): %s", cmp.Diff(expect, err))
933 }
934
935
936 if _, err := result.Raw(); !reflect.DeepEqual(result.err, err) {
937 t.Errorf("unexpected error on Raw(): %s", cmp.Diff(expect, err))
938 }
939 })
940 }
941 }
942
943 func TestRequestWatch(t *testing.T) {
944 testCases := []struct {
945 name string
946 Request *Request
947 maxRetries int
948 serverReturns []responseErr
949 Expect []watch.Event
950 attemptsExpected int
951 Err bool
952 ErrFn func(error) bool
953 Empty bool
954 }{
955 {
956 name: "Request has error",
957 Request: &Request{err: errors.New("bail")},
958 attemptsExpected: 0,
959 Err: true,
960 },
961 {
962 name: "Client is nil, should use http.DefaultClient",
963 Request: &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "%"},
964 Err: true,
965 },
966 {
967 name: "error is not retryable",
968 Request: &Request{
969 c: &RESTClient{
970 base: &url.URL{},
971 },
972 },
973 serverReturns: []responseErr{
974 {response: nil, err: errors.New("err")},
975 },
976 attemptsExpected: 1,
977 Err: true,
978 },
979 {
980 name: "server returns forbidden",
981 Request: &Request{
982 c: &RESTClient{
983 content: defaultContentConfig(),
984 base: &url.URL{},
985 },
986 },
987 serverReturns: []responseErr{
988 {response: &http.Response{
989 StatusCode: http.StatusForbidden,
990 Body: io.NopCloser(bytes.NewReader([]byte{})),
991 }, err: nil},
992 },
993 attemptsExpected: 1,
994 Expect: []watch.Event{
995 {
996 Type: watch.Error,
997 Object: &metav1.Status{
998 Status: "Failure",
999 Code: 500,
1000 Reason: "InternalError",
1001 Message: `an error on the server ("unable to decode an event from the watch stream: test error") has prevented the request from succeeding`,
1002 Details: &metav1.StatusDetails{
1003 Causes: []metav1.StatusCause{
1004 {
1005 Type: "UnexpectedServerResponse",
1006 Message: "unable to decode an event from the watch stream: test error",
1007 },
1008 {
1009 Type: "ClientWatchDecoding",
1010 Message: "unable to decode an event from the watch stream: test error",
1011 },
1012 },
1013 },
1014 },
1015 },
1016 },
1017 Err: true,
1018 ErrFn: func(err error) bool {
1019 return apierrors.IsForbidden(err)
1020 },
1021 },
1022 {
1023 name: "server returns forbidden",
1024 Request: &Request{
1025 c: &RESTClient{
1026 content: defaultContentConfig(),
1027 base: &url.URL{},
1028 },
1029 },
1030 serverReturns: []responseErr{
1031 {response: &http.Response{
1032 StatusCode: http.StatusForbidden,
1033 Body: io.NopCloser(bytes.NewReader([]byte{})),
1034 }, err: nil},
1035 },
1036 attemptsExpected: 1,
1037 Err: true,
1038 ErrFn: func(err error) bool {
1039 return apierrors.IsForbidden(err)
1040 },
1041 },
1042 {
1043 name: "server returns unauthorized",
1044 Request: &Request{
1045 c: &RESTClient{
1046 content: defaultContentConfig(),
1047 base: &url.URL{},
1048 },
1049 },
1050 serverReturns: []responseErr{
1051 {response: &http.Response{
1052 StatusCode: http.StatusUnauthorized,
1053 Body: io.NopCloser(bytes.NewReader([]byte{})),
1054 }, err: nil},
1055 },
1056 attemptsExpected: 1,
1057 Err: true,
1058 ErrFn: func(err error) bool {
1059 return apierrors.IsUnauthorized(err)
1060 },
1061 },
1062 {
1063 name: "server returns unauthorized",
1064 Request: &Request{
1065 c: &RESTClient{
1066 content: defaultContentConfig(),
1067 base: &url.URL{},
1068 },
1069 },
1070 serverReturns: []responseErr{
1071 {response: &http.Response{
1072 StatusCode: http.StatusUnauthorized,
1073 Body: io.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &metav1.Status{
1074 Status: metav1.StatusFailure,
1075 Reason: metav1.StatusReasonUnauthorized,
1076 })))),
1077 }, err: nil},
1078 },
1079 attemptsExpected: 1,
1080 Err: true,
1081 ErrFn: func(err error) bool {
1082 return apierrors.IsUnauthorized(err)
1083 },
1084 },
1085 {
1086 name: "server returns EOF error",
1087 Request: &Request{
1088 c: &RESTClient{
1089 base: &url.URL{},
1090 },
1091 },
1092 serverReturns: []responseErr{
1093 {response: nil, err: io.EOF},
1094 },
1095 attemptsExpected: 1,
1096 Empty: true,
1097 },
1098 {
1099 name: "server returns can't write HTTP request on broken connection error",
1100 Request: &Request{
1101 c: &RESTClient{
1102 base: &url.URL{},
1103 },
1104 },
1105 serverReturns: []responseErr{
1106 {response: nil, err: errors.New("http: can't write HTTP request on broken connection")},
1107 },
1108 attemptsExpected: 1,
1109 Empty: true,
1110 },
1111 {
1112 name: "server returns connection reset by peer",
1113 Request: &Request{
1114 c: &RESTClient{
1115 base: &url.URL{},
1116 },
1117 },
1118 serverReturns: []responseErr{
1119 {response: nil, err: errors.New("foo: connection reset by peer")},
1120 },
1121 attemptsExpected: 1,
1122 Empty: true,
1123 },
1124 {
1125 name: "max retries 2, server always returns EOF error",
1126 Request: &Request{
1127 c: &RESTClient{
1128 base: &url.URL{},
1129 },
1130 },
1131 maxRetries: 2,
1132 attemptsExpected: 3,
1133 serverReturns: []responseErr{
1134 {response: nil, err: io.EOF},
1135 {response: nil, err: io.EOF},
1136 {response: nil, err: io.EOF},
1137 },
1138 Empty: true,
1139 },
1140 {
1141 name: "max retries 2, server always returns a response with Retry-After header",
1142 Request: &Request{
1143 c: &RESTClient{
1144 base: &url.URL{},
1145 },
1146 },
1147 maxRetries: 2,
1148 attemptsExpected: 3,
1149 serverReturns: []responseErr{
1150 {response: retryAfterResponse(), err: nil},
1151 {response: retryAfterResponse(), err: nil},
1152 {response: retryAfterResponse(), err: nil},
1153 },
1154 Err: true,
1155 ErrFn: func(err error) bool {
1156 return apierrors.IsInternalError(err)
1157 },
1158 },
1159 }
1160
1161 for _, testCase := range testCases {
1162 t.Run(testCase.name, func(t *testing.T) {
1163 var attemptsGot int
1164 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
1165 defer func() {
1166 attemptsGot++
1167 }()
1168
1169 if attemptsGot >= len(testCase.serverReturns) {
1170 t.Fatalf("Wrong test setup, the server does not know what to return")
1171 }
1172 re := testCase.serverReturns[attemptsGot]
1173 return re.response, re.err
1174 })
1175 if c := testCase.Request.c; c != nil && len(testCase.serverReturns) > 0 {
1176 c.Client = client
1177 }
1178 testCase.Request.backoff = &noSleepBackOff{}
1179 testCase.Request.maxRetries = testCase.maxRetries
1180 testCase.Request.retryFn = defaultRequestRetryFn
1181
1182 watch, err := testCase.Request.Watch(context.Background())
1183
1184 if watch == nil && err == nil {
1185 t.Fatal("Both watch.Interface and err returned by Watch are nil")
1186 }
1187 if testCase.attemptsExpected != attemptsGot {
1188 t.Errorf("Expected RoundTrip to be invoked %d times, but got: %d", testCase.attemptsExpected, attemptsGot)
1189 }
1190 hasErr := err != nil
1191 if hasErr != testCase.Err {
1192 t.Fatalf("expected %t, got %t: %v", testCase.Err, hasErr, err)
1193 }
1194 if testCase.ErrFn != nil && !testCase.ErrFn(err) {
1195 t.Errorf("error not valid: %v", err)
1196 }
1197 if hasErr && watch != nil {
1198 t.Fatalf("watch should be nil when error is returned")
1199 }
1200 if hasErr {
1201 return
1202 }
1203 defer watch.Stop()
1204 if testCase.Empty {
1205 evt, ok := <-watch.ResultChan()
1206 if ok {
1207 t.Errorf("expected the watch to be empty: %#v", evt)
1208 }
1209 }
1210 if testCase.Expect != nil {
1211 for i, evt := range testCase.Expect {
1212 out, ok := <-watch.ResultChan()
1213 if !ok {
1214 t.Fatalf("Watch closed early, %d/%d read", i, len(testCase.Expect))
1215 }
1216 if !reflect.DeepEqual(evt, out) {
1217 t.Fatalf("Event %d does not match: %s", i, cmp.Diff(evt, out))
1218 }
1219 }
1220 }
1221 })
1222 }
1223 }
1224
1225 func TestRequestStream(t *testing.T) {
1226 testCases := []struct {
1227 name string
1228 Request *Request
1229 maxRetries int
1230 serverReturns []responseErr
1231 attemptsExpected int
1232 Err bool
1233 ErrFn func(error) bool
1234 }{
1235 {
1236 name: "request has error",
1237 Request: &Request{err: errors.New("bail")},
1238 attemptsExpected: 0,
1239 Err: true,
1240 },
1241 {
1242 name: "Client is nil, should use http.DefaultClient",
1243 Request: &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "%"},
1244 Err: true,
1245 },
1246 {
1247 name: "server returns an error",
1248 Request: &Request{
1249 c: &RESTClient{
1250 base: &url.URL{},
1251 },
1252 },
1253 serverReturns: []responseErr{
1254 {response: nil, err: errors.New("err")},
1255 },
1256 attemptsExpected: 1,
1257 Err: true,
1258 },
1259 {
1260 Request: &Request{
1261 c: &RESTClient{
1262 content: defaultContentConfig(),
1263 base: &url.URL{},
1264 },
1265 },
1266 serverReturns: []responseErr{
1267 {response: &http.Response{
1268 StatusCode: http.StatusUnauthorized,
1269 Body: io.NopCloser(bytes.NewReader([]byte(runtime.EncodeOrDie(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &metav1.Status{
1270 Status: metav1.StatusFailure,
1271 Reason: metav1.StatusReasonUnauthorized,
1272 })))),
1273 }, err: nil},
1274 },
1275 attemptsExpected: 1,
1276 Err: true,
1277 },
1278 {
1279 Request: &Request{
1280 c: &RESTClient{
1281 content: defaultContentConfig(),
1282 base: &url.URL{},
1283 },
1284 },
1285 serverReturns: []responseErr{
1286 {response: &http.Response{
1287 StatusCode: http.StatusBadRequest,
1288 Body: io.NopCloser(bytes.NewReader([]byte(`{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"a container name must be specified for pod kube-dns-v20-mz5cv, choose one of: [kubedns dnsmasq healthz]","reason":"BadRequest","code":400}`))),
1289 }, err: nil},
1290 },
1291 attemptsExpected: 1,
1292 Err: true,
1293 ErrFn: func(err error) bool {
1294 if err.Error() == "a container name must be specified for pod kube-dns-v20-mz5cv, choose one of: [kubedns dnsmasq healthz]" {
1295 return true
1296 }
1297 return false
1298 },
1299 },
1300 {
1301 name: "max retries 1, server returns a retry-after response, non-bytes request, no retry",
1302 Request: &Request{
1303 body: &readSeeker{err: io.EOF},
1304 c: &RESTClient{
1305 base: &url.URL{},
1306 },
1307 },
1308 maxRetries: 1,
1309 attemptsExpected: 1,
1310 serverReturns: []responseErr{
1311 {response: retryAfterResponse(), err: nil},
1312 },
1313 Err: true,
1314 },
1315 {
1316 name: "max retries 2, server always returns a response with Retry-After header",
1317 Request: &Request{
1318 c: &RESTClient{
1319 base: &url.URL{},
1320 },
1321 },
1322 maxRetries: 2,
1323 attemptsExpected: 3,
1324 serverReturns: []responseErr{
1325 {response: retryAfterResponse(), err: nil},
1326 {response: retryAfterResponse(), err: nil},
1327 {response: retryAfterResponse(), err: nil},
1328 },
1329 Err: true,
1330 ErrFn: func(err error) bool {
1331 return apierrors.IsInternalError(err)
1332 },
1333 },
1334 {
1335 name: "server returns EOF after attempt 1, retry aborted",
1336 Request: &Request{
1337 c: &RESTClient{
1338 base: &url.URL{},
1339 },
1340 },
1341 maxRetries: 2,
1342 attemptsExpected: 2,
1343 serverReturns: []responseErr{
1344 {response: retryAfterResponse(), err: nil},
1345 {response: nil, err: io.EOF},
1346 },
1347 Err: true,
1348 ErrFn: func(err error) bool {
1349 return unWrap(err) == io.EOF
1350 },
1351 },
1352 {
1353 name: "max retries 2, server returns success on the final attempt",
1354 Request: &Request{
1355 c: &RESTClient{
1356 base: &url.URL{},
1357 },
1358 },
1359 maxRetries: 2,
1360 attemptsExpected: 3,
1361 serverReturns: []responseErr{
1362 {response: retryAfterResponse(), err: nil},
1363 {response: retryAfterResponse(), err: nil},
1364 {response: &http.Response{
1365 StatusCode: http.StatusOK,
1366 Body: io.NopCloser(bytes.NewReader([]byte{})),
1367 }, err: nil},
1368 },
1369 },
1370 }
1371
1372 for _, testCase := range testCases {
1373 t.Run(testCase.name, func(t *testing.T) {
1374 var attemptsGot int
1375 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
1376 defer func() {
1377 attemptsGot++
1378 }()
1379
1380 if attemptsGot >= len(testCase.serverReturns) {
1381 t.Fatalf("Wrong test setup, the server does not know what to return")
1382 }
1383 re := testCase.serverReturns[attemptsGot]
1384 return re.response, re.err
1385 })
1386 if c := testCase.Request.c; c != nil && len(testCase.serverReturns) > 0 {
1387 c.Client = client
1388 }
1389 testCase.Request.backoff = &noSleepBackOff{}
1390 testCase.Request.maxRetries = testCase.maxRetries
1391 testCase.Request.retryFn = defaultRequestRetryFn
1392
1393 body, err := testCase.Request.Stream(context.Background())
1394
1395 if body == nil && err == nil {
1396 t.Fatal("Both body and err returned by Stream are nil")
1397 }
1398 if testCase.attemptsExpected != attemptsGot {
1399 t.Errorf("Expected RoundTrip to be invoked %d times, but got: %d", testCase.attemptsExpected, attemptsGot)
1400 }
1401
1402 hasErr := err != nil
1403 if hasErr != testCase.Err {
1404 t.Errorf("expected %t, got %t: %v", testCase.Err, hasErr, err)
1405 }
1406 if hasErr && body != nil {
1407 t.Error("body should be nil when error is returned")
1408 }
1409
1410 if hasErr {
1411 if testCase.ErrFn != nil && !testCase.ErrFn(err) {
1412 t.Errorf("unexpected error: %#v", err)
1413 }
1414 }
1415 })
1416 }
1417 }
1418
1419 func TestRequestDo(t *testing.T) {
1420 testCases := []struct {
1421 Request *Request
1422 Err bool
1423 }{
1424 {
1425 Request: &Request{c: &RESTClient{}, err: errors.New("bail")},
1426 Err: true,
1427 },
1428 {
1429 Request: &Request{c: &RESTClient{base: &url.URL{}}, pathPrefix: "%"},
1430 Err: true,
1431 },
1432 {
1433 Request: &Request{
1434 c: &RESTClient{
1435 Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
1436 return nil, errors.New("err")
1437 }),
1438 base: &url.URL{},
1439 },
1440 },
1441 Err: true,
1442 },
1443 }
1444 for i, testCase := range testCases {
1445 testCase.Request.backoff = &NoBackoff{}
1446 testCase.Request.retryFn = defaultRequestRetryFn
1447 body, err := testCase.Request.Do(context.Background()).Raw()
1448 hasErr := err != nil
1449 if hasErr != testCase.Err {
1450 t.Errorf("%d: expected %t, got %t: %v", i, testCase.Err, hasErr, err)
1451 }
1452 if hasErr && body != nil {
1453 t.Errorf("%d: body should be nil when error is returned", i)
1454 }
1455 }
1456 }
1457
1458 func TestDoRequestNewWay(t *testing.T) {
1459 reqBody := "request body"
1460 expectedObj := &v1.Service{Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{
1461 Protocol: "TCP",
1462 Port: 12345,
1463 TargetPort: intstr.FromInt32(12345),
1464 }}}}
1465 expectedBody, _ := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), expectedObj)
1466 fakeHandler := utiltesting.FakeHandler{
1467 StatusCode: 200,
1468 ResponseBody: string(expectedBody),
1469 T: t,
1470 }
1471 testServer := httptest.NewServer(&fakeHandler)
1472 defer testServer.Close()
1473 c := testRESTClient(t, testServer)
1474 obj, err := c.Verb("POST").
1475 Prefix("foo", "bar").
1476 Suffix("baz").
1477 Timeout(time.Second).
1478 Body([]byte(reqBody)).
1479 Do(context.Background()).Get()
1480 if err != nil {
1481 t.Errorf("Unexpected error: %v %#v", err, err)
1482 return
1483 }
1484 if obj == nil {
1485 t.Error("nil obj")
1486 } else if !apiequality.Semantic.DeepDerivative(expectedObj, obj) {
1487 t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
1488 }
1489 requestURL := defaultResourcePathWithPrefix("foo/bar", "", "", "baz")
1490 requestURL += "?timeout=1s"
1491 fakeHandler.ValidateRequest(t, requestURL, "POST", &reqBody)
1492 }
1493
1494
1495 func TestBackoffLifecycle(t *testing.T) {
1496 count := 0
1497 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
1498 count++
1499 t.Logf("Attempt %d", count)
1500 if count == 5 || count == 9 {
1501 w.WriteHeader(http.StatusOK)
1502 return
1503 }
1504 w.WriteHeader(http.StatusGatewayTimeout)
1505 return
1506 }))
1507 defer testServer.Close()
1508 c := testRESTClient(t, testServer)
1509
1510
1511
1512 seconds := []int{0, 1, 2, 4, 8, 0, 1, 2, 4, 0}
1513 request := c.Verb("POST").Prefix("backofftest").Suffix("abc")
1514 clock := testingclock.FakeClock{}
1515 request.backoff = &URLBackoff{
1516
1517 Backoff: flowcontrol.NewFakeBackOff(
1518 time.Duration(1)*time.Second,
1519 time.Duration(200)*time.Second,
1520 &clock,
1521 )}
1522
1523 for _, sec := range seconds {
1524 thisBackoff := request.backoff.CalculateBackoff(request.URL())
1525 t.Logf("Current backoff %v", thisBackoff)
1526 if thisBackoff != time.Duration(sec)*time.Second {
1527 t.Errorf("Backoff is %v instead of %v", thisBackoff, sec)
1528 }
1529 now := clock.Now()
1530 request.DoRaw(context.Background())
1531 elapsed := clock.Since(now)
1532 if clock.Since(now) != thisBackoff {
1533 t.Errorf("CalculatedBackoff not honored by clock: Expected time of %v, but got %v ", thisBackoff, elapsed)
1534 }
1535 }
1536 }
1537
1538 type testBackoffManager struct {
1539 sleeps []time.Duration
1540 }
1541
1542 func (b *testBackoffManager) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) {
1543 }
1544
1545 func (b *testBackoffManager) CalculateBackoff(actualUrl *url.URL) time.Duration {
1546 return time.Duration(0)
1547 }
1548
1549 func (b *testBackoffManager) Sleep(d time.Duration) {
1550 b.sleeps = append(b.sleeps, d)
1551 }
1552
1553 func TestCheckRetryClosesBody(t *testing.T) {
1554 count := 0
1555 ch := make(chan struct{})
1556 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
1557 count++
1558 t.Logf("attempt %d", count)
1559 if count >= 5 {
1560 w.WriteHeader(http.StatusOK)
1561 close(ch)
1562 return
1563 }
1564 w.Header().Set("Retry-After", "1")
1565 http.Error(w, "Too many requests, please try again later.", http.StatusTooManyRequests)
1566 }))
1567 defer testServer.Close()
1568
1569 backoff := &testBackoffManager{}
1570
1571
1572 expectedSleeps := []time.Duration{0, time.Second, time.Second, time.Second, time.Second}
1573
1574 c := testRESTClient(t, testServer)
1575 c.createBackoffMgr = func() BackoffManager { return backoff }
1576 _, err := c.Verb("POST").
1577 Prefix("foo", "bar").
1578 Suffix("baz").
1579 Timeout(time.Second).
1580 Body([]byte(strings.Repeat("abcd", 1000))).
1581 DoRaw(context.Background())
1582 if err != nil {
1583 t.Fatalf("Unexpected error: %v %#v", err, err)
1584 }
1585 <-ch
1586 if count != 5 {
1587 t.Errorf("unexpected retries: %d", count)
1588 }
1589 if !reflect.DeepEqual(backoff.sleeps, expectedSleeps) {
1590 t.Errorf("unexpected sleeps, expected: %v, got: %v", expectedSleeps, backoff.sleeps)
1591 }
1592 }
1593
1594 func TestConnectionResetByPeerIsRetried(t *testing.T) {
1595 count := 0
1596 backoff := &testBackoffManager{}
1597 req := &Request{
1598 verb: "GET",
1599 c: &RESTClient{
1600 Client: clientForFunc(func(req *http.Request) (*http.Response, error) {
1601 count++
1602 if count >= 3 {
1603 return &http.Response{
1604 StatusCode: http.StatusOK,
1605 Body: io.NopCloser(bytes.NewReader([]byte{})),
1606 }, nil
1607 }
1608 return nil, &net.OpError{Err: syscall.ECONNRESET}
1609 }),
1610 },
1611 backoff: backoff,
1612 maxRetries: 10,
1613 retryFn: defaultRequestRetryFn,
1614 }
1615
1616 _, err := req.Do(context.Background()).Raw()
1617 if err != nil {
1618 t.Errorf("Unexpected error: %v", err)
1619 }
1620 if count != 3 {
1621 t.Errorf("Expected 3 attempts, got: %d", count)
1622 }
1623
1624 if len(backoff.sleeps) != 3 {
1625 t.Errorf("Expected 3 backoff.Sleep, got: %d", len(backoff.sleeps))
1626 }
1627 }
1628
1629 func TestCheckRetryHandles429And5xx(t *testing.T) {
1630 count := 0
1631 ch := make(chan struct{})
1632 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
1633 data, err := io.ReadAll(req.Body)
1634 if err != nil {
1635 t.Fatalf("unable to read request body: %v", err)
1636 }
1637 if !bytes.Equal(data, []byte(strings.Repeat("abcd", 1000))) {
1638 t.Fatalf("retry did not send a complete body: %s", data)
1639 }
1640 t.Logf("attempt %d", count)
1641 if count >= 4 {
1642 w.WriteHeader(http.StatusOK)
1643 close(ch)
1644 return
1645 }
1646 w.Header().Set("Retry-After", "0")
1647 w.WriteHeader([]int{http.StatusTooManyRequests, 500, 501, 504}[count])
1648 count++
1649 }))
1650 defer testServer.Close()
1651
1652 c := testRESTClient(t, testServer)
1653 _, err := c.Verb("POST").
1654 Prefix("foo", "bar").
1655 Suffix("baz").
1656 Timeout(time.Second).
1657 Body([]byte(strings.Repeat("abcd", 1000))).
1658 DoRaw(context.Background())
1659 if err != nil {
1660 t.Fatalf("Unexpected error: %v %#v", err, err)
1661 }
1662 <-ch
1663 if count != 4 {
1664 t.Errorf("unexpected retries: %d", count)
1665 }
1666 }
1667
1668 func BenchmarkCheckRetryClosesBody(b *testing.B) {
1669 count := 0
1670 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
1671 count++
1672 if count%3 == 0 {
1673 w.WriteHeader(http.StatusOK)
1674 return
1675 }
1676 w.Header().Set("Retry-After", "0")
1677 w.WriteHeader(http.StatusTooManyRequests)
1678 }))
1679 defer testServer.Close()
1680
1681 c := testRESTClient(b, testServer)
1682
1683 requests := make([]*Request, 0, b.N)
1684 for i := 0; i < b.N; i++ {
1685 requests = append(requests, c.Verb("POST").
1686 Prefix("foo", "bar").
1687 Suffix("baz").
1688 Timeout(time.Second).
1689 Body([]byte(strings.Repeat("abcd", 1000))))
1690 }
1691
1692 b.ResetTimer()
1693 for i := 0; i < b.N; i++ {
1694 if _, err := requests[i].DoRaw(context.Background()); err != nil {
1695 b.Fatalf("Unexpected error (%d/%d): %v", i, b.N, err)
1696 }
1697 }
1698 }
1699
1700 func TestDoRequestNewWayReader(t *testing.T) {
1701 reqObj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
1702 reqBodyExpected, _ := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), reqObj)
1703 expectedObj := &v1.Service{Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{
1704 Protocol: "TCP",
1705 Port: 12345,
1706 TargetPort: intstr.FromInt32(12345),
1707 }}}}
1708 expectedBody, _ := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), expectedObj)
1709 fakeHandler := utiltesting.FakeHandler{
1710 StatusCode: 200,
1711 ResponseBody: string(expectedBody),
1712 T: t,
1713 }
1714 testServer := httptest.NewServer(&fakeHandler)
1715 defer testServer.Close()
1716 c := testRESTClient(t, testServer)
1717 obj, err := c.Verb("POST").
1718 Resource("bar").
1719 Name("baz").
1720 Prefix("foo").
1721 Timeout(time.Second).
1722 Body(bytes.NewBuffer(reqBodyExpected)).
1723 Do(context.Background()).Get()
1724 if err != nil {
1725 t.Errorf("Unexpected error: %v %#v", err, err)
1726 return
1727 }
1728 if obj == nil {
1729 t.Error("nil obj")
1730 } else if !apiequality.Semantic.DeepDerivative(expectedObj, obj) {
1731 t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
1732 }
1733 tmpStr := string(reqBodyExpected)
1734 requestURL := defaultResourcePathWithPrefix("foo", "bar", "", "baz")
1735 requestURL += "?timeout=1s"
1736 fakeHandler.ValidateRequest(t, requestURL, "POST", &tmpStr)
1737 }
1738
1739 func TestDoRequestNewWayObj(t *testing.T) {
1740 reqObj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
1741 reqBodyExpected, _ := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), reqObj)
1742 expectedObj := &v1.Service{Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{
1743 Protocol: "TCP",
1744 Port: 12345,
1745 TargetPort: intstr.FromInt32(12345),
1746 }}}}
1747 expectedBody, _ := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), expectedObj)
1748 fakeHandler := utiltesting.FakeHandler{
1749 StatusCode: 200,
1750 ResponseBody: string(expectedBody),
1751 T: t,
1752 }
1753 testServer := httptest.NewServer(&fakeHandler)
1754 defer testServer.Close()
1755 c := testRESTClient(t, testServer)
1756 obj, err := c.Verb("POST").
1757 Suffix("baz").
1758 Name("bar").
1759 Resource("foo").
1760 Timeout(time.Second).
1761 Body(reqObj).
1762 Do(context.Background()).Get()
1763 if err != nil {
1764 t.Errorf("Unexpected error: %v %#v", err, err)
1765 return
1766 }
1767 if obj == nil {
1768 t.Error("nil obj")
1769 } else if !apiequality.Semantic.DeepDerivative(expectedObj, obj) {
1770 t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
1771 }
1772 tmpStr := string(reqBodyExpected)
1773 requestURL := defaultResourcePathWithPrefix("", "foo", "", "bar/baz")
1774 requestURL += "?timeout=1s"
1775 fakeHandler.ValidateRequest(t, requestURL, "POST", &tmpStr)
1776 }
1777
1778 func TestDoRequestNewWayFile(t *testing.T) {
1779 reqObj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
1780 reqBodyExpected, err := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), reqObj)
1781 if err != nil {
1782 t.Errorf("unexpected error: %v", err)
1783 }
1784
1785 file, err := os.CreateTemp("", "foo")
1786 if err != nil {
1787 t.Errorf("unexpected error: %v", err)
1788 }
1789 defer file.Close()
1790 defer os.Remove(file.Name())
1791
1792 _, err = file.Write(reqBodyExpected)
1793 if err != nil {
1794 t.Errorf("unexpected error: %v", err)
1795 }
1796
1797 expectedObj := &v1.Service{Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{
1798 Protocol: "TCP",
1799 Port: 12345,
1800 TargetPort: intstr.FromInt32(12345),
1801 }}}}
1802 expectedBody, _ := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), expectedObj)
1803 fakeHandler := utiltesting.FakeHandler{
1804 StatusCode: 200,
1805 ResponseBody: string(expectedBody),
1806 T: t,
1807 }
1808 testServer := httptest.NewServer(&fakeHandler)
1809 defer testServer.Close()
1810 c := testRESTClient(t, testServer)
1811 wasCreated := true
1812 obj, err := c.Verb("POST").
1813 Prefix("foo/bar", "baz").
1814 Timeout(time.Second).
1815 Body(file.Name()).
1816 Do(context.Background()).WasCreated(&wasCreated).Get()
1817 if err != nil {
1818 t.Errorf("Unexpected error: %v %#v", err, err)
1819 return
1820 }
1821 if obj == nil {
1822 t.Error("nil obj")
1823 } else if !apiequality.Semantic.DeepDerivative(expectedObj, obj) {
1824 t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
1825 }
1826 if wasCreated {
1827 t.Errorf("expected object was created")
1828 }
1829 tmpStr := string(reqBodyExpected)
1830 requestURL := defaultResourcePathWithPrefix("foo/bar/baz", "", "", "")
1831 requestURL += "?timeout=1s"
1832 fakeHandler.ValidateRequest(t, requestURL, "POST", &tmpStr)
1833 }
1834
1835 func TestWasCreated(t *testing.T) {
1836 reqObj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
1837 reqBodyExpected, err := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), reqObj)
1838 if err != nil {
1839 t.Errorf("unexpected error: %v", err)
1840 }
1841
1842 expectedObj := &v1.Service{Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{
1843 Protocol: "TCP",
1844 Port: 12345,
1845 TargetPort: intstr.FromInt32(12345),
1846 }}}}
1847 expectedBody, _ := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), expectedObj)
1848 fakeHandler := utiltesting.FakeHandler{
1849 StatusCode: 201,
1850 ResponseBody: string(expectedBody),
1851 T: t,
1852 }
1853 testServer := httptest.NewServer(&fakeHandler)
1854 defer testServer.Close()
1855 c := testRESTClient(t, testServer)
1856 wasCreated := false
1857 obj, err := c.Verb("PUT").
1858 Prefix("foo/bar", "baz").
1859 Timeout(time.Second).
1860 Body(reqBodyExpected).
1861 Do(context.Background()).WasCreated(&wasCreated).Get()
1862 if err != nil {
1863 t.Errorf("Unexpected error: %v %#v", err, err)
1864 return
1865 }
1866 if obj == nil {
1867 t.Error("nil obj")
1868 } else if !apiequality.Semantic.DeepDerivative(expectedObj, obj) {
1869 t.Errorf("Expected: %#v, got %#v", expectedObj, obj)
1870 }
1871 if !wasCreated {
1872 t.Errorf("Expected object was created")
1873 }
1874
1875 tmpStr := string(reqBodyExpected)
1876 requestURL := defaultResourcePathWithPrefix("foo/bar/baz", "", "", "")
1877 requestURL += "?timeout=1s"
1878 fakeHandler.ValidateRequest(t, requestURL, "PUT", &tmpStr)
1879 }
1880
1881 func TestVerbs(t *testing.T) {
1882 c := testRESTClient(t, nil)
1883 if r := c.Post(); r.verb != "POST" {
1884 t.Errorf("Post verb is wrong")
1885 }
1886 if r := c.Put(); r.verb != "PUT" {
1887 t.Errorf("Put verb is wrong")
1888 }
1889 if r := c.Get(); r.verb != "GET" {
1890 t.Errorf("Get verb is wrong")
1891 }
1892 if r := c.Delete(); r.verb != "DELETE" {
1893 t.Errorf("Delete verb is wrong")
1894 }
1895 }
1896
1897 func TestAbsPath(t *testing.T) {
1898 for i, tc := range []struct {
1899 configPrefix string
1900 resourcePrefix string
1901 absPath string
1902 wantsAbsPath string
1903 }{
1904 {"/", "", "", "/"},
1905 {"", "", "/", "/"},
1906 {"", "", "/api", "/api"},
1907 {"", "", "/api/", "/api/"},
1908 {"", "", "/apis", "/apis"},
1909 {"", "/foo", "/bar/foo", "/bar/foo"},
1910 {"", "/api/foo/123", "/bar/foo", "/bar/foo"},
1911 {"/p1", "", "", "/p1"},
1912 {"/p1", "", "/", "/p1/"},
1913 {"/p1", "", "/api", "/p1/api"},
1914 {"/p1", "", "/apis", "/p1/apis"},
1915 {"/p1", "/r1", "/apis", "/p1/apis"},
1916 {"/p1", "/api/r1", "/apis", "/p1/apis"},
1917 {"/p1/api/p2", "", "", "/p1/api/p2"},
1918 {"/p1/api/p2", "", "/", "/p1/api/p2/"},
1919 {"/p1/api/p2", "", "/api", "/p1/api/p2/api"},
1920 {"/p1/api/p2", "", "/api/", "/p1/api/p2/api/"},
1921 {"/p1/api/p2", "/r1", "/api/", "/p1/api/p2/api/"},
1922 {"/p1/api/p2", "/api/r1", "/api/", "/p1/api/p2/api/"},
1923 } {
1924 u, _ := url.Parse("http://localhost:123" + tc.configPrefix)
1925 r := NewRequestWithClient(u, "", ClientContentConfig{GroupVersion: schema.GroupVersion{Group: "test"}}, nil).Verb("POST").Prefix(tc.resourcePrefix).AbsPath(tc.absPath)
1926 if r.pathPrefix != tc.wantsAbsPath {
1927 t.Errorf("test case %d failed, unexpected path: %q, expected %q", i, r.pathPrefix, tc.wantsAbsPath)
1928 }
1929 }
1930 }
1931
1932 func TestUnacceptableParamNames(t *testing.T) {
1933 table := []struct {
1934 name string
1935 testVal string
1936 expectSuccess bool
1937 }{
1938
1939 {"timeout", "42", true},
1940 }
1941
1942 for _, item := range table {
1943 c := testRESTClient(t, nil)
1944 r := c.Get().setParam(item.name, item.testVal)
1945 if e, a := item.expectSuccess, r.err == nil; e != a {
1946 t.Errorf("expected %v, got %v (%v)", e, a, r.err)
1947 }
1948 }
1949 }
1950
1951 func TestBody(t *testing.T) {
1952 const data = "test payload"
1953
1954 obj := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
1955 bodyExpected, _ := runtime.Encode(scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), obj)
1956
1957 f, err := os.CreateTemp("", "test_body")
1958 if err != nil {
1959 t.Fatalf("TempFile error: %v", err)
1960 }
1961 if _, err := f.WriteString(data); err != nil {
1962 t.Fatalf("TempFile.WriteString error: %v", err)
1963 }
1964 f.Close()
1965 defer os.Remove(f.Name())
1966
1967 var nilObject *metav1.DeleteOptions
1968 typedObject := interface{}(nilObject)
1969 c := testRESTClient(t, nil)
1970 tests := []struct {
1971 input interface{}
1972 expected string
1973 headers map[string]string
1974 }{
1975 {[]byte(data), data, nil},
1976 {f.Name(), data, nil},
1977 {strings.NewReader(data), data, nil},
1978 {obj, string(bodyExpected), map[string]string{"Content-Type": "application/json"}},
1979 {typedObject, "", nil},
1980 }
1981 for i, tt := range tests {
1982 r := c.Post().Body(tt.input)
1983 if r.err != nil {
1984 t.Errorf("%d: r.Body(%#v) error: %v", i, tt, r.err)
1985 continue
1986 }
1987 if tt.headers != nil {
1988 for k, v := range tt.headers {
1989 if r.headers.Get(k) != v {
1990 t.Errorf("%d: r.headers[%q] = %q; want %q", i, k, v, v)
1991 }
1992 }
1993 }
1994
1995 req, err := r.newHTTPRequest(context.Background())
1996 if err != nil {
1997 t.Fatal(err)
1998 }
1999 if req.Body == nil {
2000 if len(tt.expected) != 0 {
2001 t.Errorf("%d: req.Body = %q; want %q", i, req.Body, tt.expected)
2002 }
2003 continue
2004 }
2005 buf := make([]byte, len(tt.expected))
2006 if _, err := req.Body.Read(buf); err != nil {
2007 t.Errorf("%d: req.Body.Read error: %v", i, err)
2008 continue
2009 }
2010 body := string(buf)
2011 if body != tt.expected {
2012 t.Errorf("%d: req.Body = %q; want %q", i, body, tt.expected)
2013 }
2014 }
2015 }
2016
2017 func TestWatch(t *testing.T) {
2018 tests := []struct {
2019 name string
2020 maxRetries int
2021 }{
2022 {
2023 name: "no retry",
2024 maxRetries: 0,
2025 },
2026 {
2027 name: "with retries",
2028 maxRetries: 3,
2029 },
2030 }
2031
2032 for _, test := range tests {
2033 t.Run(test.name, func(t *testing.T) {
2034 var table = []struct {
2035 t watch.EventType
2036 obj runtime.Object
2037 }{
2038 {watch.Added, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "first"}}},
2039 {watch.Modified, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "second"}}},
2040 {watch.Deleted, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "last"}}},
2041 }
2042
2043 var attempts int
2044 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2045 defer func() {
2046 attempts++
2047 }()
2048
2049 flusher, ok := w.(http.Flusher)
2050 if !ok {
2051 panic("need flusher!")
2052 }
2053
2054 if attempts < test.maxRetries {
2055 w.Header().Set("Retry-After", "1")
2056 w.WriteHeader(http.StatusTooManyRequests)
2057 return
2058 }
2059
2060 w.Header().Set("Transfer-Encoding", "chunked")
2061 w.WriteHeader(http.StatusOK)
2062 flusher.Flush()
2063
2064 encoder := restclientwatch.NewEncoder(streaming.NewEncoder(w, scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion)), scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion))
2065 for _, item := range table {
2066 if err := encoder.Encode(&watch.Event{Type: item.t, Object: item.obj}); err != nil {
2067 panic(err)
2068 }
2069 flusher.Flush()
2070 }
2071 }))
2072 defer testServer.Close()
2073
2074 s := testRESTClient(t, testServer)
2075 watching, err := s.Get().Prefix("path/to/watch/thing").
2076 MaxRetries(test.maxRetries).Watch(context.Background())
2077 if err != nil {
2078 t.Fatalf("Unexpected error: %v", err)
2079 }
2080
2081 for _, item := range table {
2082 got, ok := <-watching.ResultChan()
2083 if !ok {
2084 t.Fatalf("Unexpected early close")
2085 }
2086 if e, a := item.t, got.Type; e != a {
2087 t.Errorf("Expected %v, got %v", e, a)
2088 }
2089 if e, a := item.obj, got.Object; !apiequality.Semantic.DeepDerivative(e, a) {
2090 t.Errorf("Expected %v, got %v", e, a)
2091 }
2092 }
2093
2094 _, ok := <-watching.ResultChan()
2095 if ok {
2096 t.Fatal("Unexpected non-close")
2097 }
2098 })
2099 }
2100 }
2101
2102 func TestWatchNonDefaultContentType(t *testing.T) {
2103 var table = []struct {
2104 t watch.EventType
2105 obj runtime.Object
2106 }{
2107 {watch.Added, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "first"}}},
2108 {watch.Modified, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "second"}}},
2109 {watch.Deleted, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "last"}}},
2110 }
2111
2112 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2113 flusher, ok := w.(http.Flusher)
2114 if !ok {
2115 panic("need flusher!")
2116 }
2117
2118 w.Header().Set("Transfer-Encoding", "chunked")
2119
2120 w.Header().Set("Content-Type", "application/json")
2121 w.WriteHeader(http.StatusOK)
2122 flusher.Flush()
2123
2124 encoder := restclientwatch.NewEncoder(streaming.NewEncoder(w, scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion)), scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion))
2125 for _, item := range table {
2126 if err := encoder.Encode(&watch.Event{Type: item.t, Object: item.obj}); err != nil {
2127 panic(err)
2128 }
2129 flusher.Flush()
2130 }
2131 }))
2132 defer testServer.Close()
2133
2134
2135 contentConfig := defaultContentConfig()
2136 contentConfig.ContentType = "application/vnd.kubernetes.protobuf"
2137 s := testRESTClientWithConfig(t, testServer, contentConfig)
2138 watching, err := s.Get().Prefix("path/to/watch/thing").Watch(context.Background())
2139 if err != nil {
2140 t.Fatalf("Unexpected error")
2141 }
2142
2143 for _, item := range table {
2144 got, ok := <-watching.ResultChan()
2145 if !ok {
2146 t.Fatalf("Unexpected early close")
2147 }
2148 if e, a := item.t, got.Type; e != a {
2149 t.Errorf("Expected %v, got %v", e, a)
2150 }
2151 if e, a := item.obj, got.Object; !apiequality.Semantic.DeepDerivative(e, a) {
2152 t.Errorf("Expected %v, got %v", e, a)
2153 }
2154 }
2155
2156 _, ok := <-watching.ResultChan()
2157 if ok {
2158 t.Fatal("Unexpected non-close")
2159 }
2160 }
2161
2162 func TestWatchUnknownContentType(t *testing.T) {
2163 var table = []struct {
2164 t watch.EventType
2165 obj runtime.Object
2166 }{
2167 {watch.Added, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "first"}}},
2168 {watch.Modified, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "second"}}},
2169 {watch.Deleted, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "last"}}},
2170 }
2171
2172 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2173 flusher, ok := w.(http.Flusher)
2174 if !ok {
2175 panic("need flusher!")
2176 }
2177
2178 w.Header().Set("Transfer-Encoding", "chunked")
2179
2180 w.Header().Set("Content-Type", "foobar")
2181 w.WriteHeader(http.StatusOK)
2182 flusher.Flush()
2183
2184 encoder := restclientwatch.NewEncoder(streaming.NewEncoder(w, scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion)), scheme.Codecs.LegacyCodec(v1.SchemeGroupVersion))
2185 for _, item := range table {
2186 if err := encoder.Encode(&watch.Event{Type: item.t, Object: item.obj}); err != nil {
2187 panic(err)
2188 }
2189 flusher.Flush()
2190 }
2191 }))
2192 defer testServer.Close()
2193
2194 s := testRESTClient(t, testServer)
2195 _, err := s.Get().Prefix("path/to/watch/thing").Watch(context.Background())
2196 if err == nil {
2197 t.Fatalf("Expected to fail due to lack of known stream serialization for content type")
2198 }
2199 }
2200
2201 func TestStream(t *testing.T) {
2202 expectedBody := "expected body"
2203
2204 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2205 flusher, ok := w.(http.Flusher)
2206 if !ok {
2207 panic("need flusher!")
2208 }
2209 w.Header().Set("Transfer-Encoding", "chunked")
2210 w.WriteHeader(http.StatusOK)
2211 w.Write([]byte(expectedBody))
2212 flusher.Flush()
2213 }))
2214 defer testServer.Close()
2215
2216 s := testRESTClient(t, testServer)
2217 readCloser, err := s.Get().Prefix("path/to/stream/thing").Stream(context.Background())
2218 if err != nil {
2219 t.Fatalf("unexpected error: %v", err)
2220 }
2221 defer readCloser.Close()
2222 buf := new(bytes.Buffer)
2223 buf.ReadFrom(readCloser)
2224 resultBody := buf.String()
2225
2226 if expectedBody != resultBody {
2227 t.Errorf("Expected %s, got %s", expectedBody, resultBody)
2228 }
2229 }
2230
2231 func testRESTClientWithConfig(t testing.TB, srv *httptest.Server, contentConfig ClientContentConfig) *RESTClient {
2232 base, _ := url.Parse("http://localhost")
2233 var c *http.Client
2234 if srv != nil {
2235 var err error
2236 base, err = url.Parse(srv.URL)
2237 if err != nil {
2238 t.Fatalf("failed to parse test URL: %v", err)
2239 }
2240 c = srv.Client()
2241 }
2242 versionedAPIPath := defaultResourcePathWithPrefix("", "", "", "")
2243 client, err := NewRESTClient(base, versionedAPIPath, contentConfig, nil, c)
2244 if err != nil {
2245 t.Fatalf("failed to create a client: %v", err)
2246 }
2247 return client
2248
2249 }
2250
2251 func testRESTClient(t testing.TB, srv *httptest.Server) *RESTClient {
2252 contentConfig := defaultContentConfig()
2253 return testRESTClientWithConfig(t, srv, contentConfig)
2254 }
2255
2256 func TestDoContext(t *testing.T) {
2257 receivedCh := make(chan struct{})
2258 block := make(chan struct{})
2259 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
2260 close(receivedCh)
2261 <-block
2262 w.WriteHeader(http.StatusOK)
2263 }))
2264 defer testServer.Close()
2265 defer close(block)
2266
2267 ctx, cancel := context.WithCancel(context.Background())
2268 defer cancel()
2269
2270 go func() {
2271 <-receivedCh
2272 cancel()
2273 }()
2274
2275 c := testRESTClient(t, testServer)
2276 _, err := c.Verb("GET").
2277 Prefix("foo").
2278 DoRaw(ctx)
2279 if err == nil {
2280 t.Fatal("Expected context cancellation error")
2281 }
2282 }
2283
2284 func buildString(length int) string {
2285 s := make([]byte, length)
2286 for i := range s {
2287 s[i] = 'a'
2288 }
2289 return string(s)
2290 }
2291
2292 func init() {
2293 klog.InitFlags(nil)
2294 }
2295
2296 func TestTruncateBody(t *testing.T) {
2297 tests := []struct {
2298 body string
2299 want string
2300 level string
2301 }{
2302
2303 {
2304 body: "Completely truncated below 8",
2305 want: " [truncated 28 chars]",
2306 level: "0",
2307 },
2308
2309 {
2310 body: "Small body never gets truncated",
2311 want: "Small body never gets truncated",
2312 level: "10",
2313 },
2314 {
2315 body: "Small body never gets truncated",
2316 want: "Small body never gets truncated",
2317 level: "8",
2318 },
2319
2320 {
2321 body: buildString(2000),
2322 level: "8",
2323 want: fmt.Sprintf("%s [truncated 976 chars]", buildString(1024)),
2324 },
2325
2326 {
2327 body: buildString(20000),
2328 level: "9",
2329 want: fmt.Sprintf("%s [truncated 9760 chars]", buildString(10240)),
2330 },
2331
2332 {
2333 body: buildString(20000),
2334 level: "10",
2335 want: buildString(20000),
2336 },
2337
2338 {
2339 body: buildString(20000),
2340 level: "11",
2341 want: buildString(20000),
2342 },
2343 }
2344
2345 l := flag.Lookup("v").Value.(flag.Getter).Get().(klog.Level)
2346 for _, test := range tests {
2347 flag.Set("v", test.level)
2348 got := truncateBody(test.body)
2349 if got != test.want {
2350 t.Errorf("truncateBody(%v) = %v, want %v", test.body, got, test.want)
2351 }
2352 }
2353 flag.Set("v", l.String())
2354 }
2355
2356 func defaultResourcePathWithPrefix(prefix, resource, namespace, name string) string {
2357 var path string
2358 path = "/api/" + v1.SchemeGroupVersion.Version
2359
2360 if prefix != "" {
2361 path = path + "/" + prefix
2362 }
2363 if namespace != "" {
2364 path = path + "/namespaces/" + namespace
2365 }
2366
2367 resource = strings.ToLower(resource)
2368 if resource != "" {
2369 path = path + "/" + resource
2370 }
2371 if name != "" {
2372 path = path + "/" + name
2373 }
2374 return path
2375 }
2376
2377 func TestRequestPreflightCheck(t *testing.T) {
2378 for _, tt := range []struct {
2379 name string
2380 verbs []string
2381 namespace string
2382 resourceName string
2383 namespaceSet bool
2384 expectsErr bool
2385 }{
2386 {
2387 name: "no namespace set",
2388 verbs: []string{"GET", "PUT", "DELETE", "POST"},
2389 namespaceSet: false,
2390 expectsErr: false,
2391 },
2392 {
2393 name: "empty resource name and namespace",
2394 verbs: []string{"GET", "PUT", "DELETE"},
2395 namespaceSet: true,
2396 expectsErr: false,
2397 },
2398 {
2399 name: "resource name with empty namespace",
2400 verbs: []string{"GET", "PUT", "DELETE"},
2401 namespaceSet: true,
2402 resourceName: "ResourceName",
2403 expectsErr: true,
2404 },
2405 {
2406 name: "post empty resource name and namespace",
2407 verbs: []string{"POST"},
2408 namespaceSet: true,
2409 expectsErr: true,
2410 },
2411 {
2412 name: "working requests",
2413 verbs: []string{"GET", "PUT", "DELETE", "POST"},
2414 namespaceSet: true,
2415 resourceName: "ResourceName",
2416 namespace: "Namespace",
2417 expectsErr: false,
2418 },
2419 } {
2420 t.Run(tt.name, func(t *testing.T) {
2421 for _, verb := range tt.verbs {
2422 r := &Request{
2423 verb: verb,
2424 namespace: tt.namespace,
2425 resourceName: tt.resourceName,
2426 namespaceSet: tt.namespaceSet,
2427 }
2428
2429 err := r.requestPreflightCheck()
2430 hasErr := err != nil
2431 if hasErr == tt.expectsErr {
2432 return
2433 }
2434 t.Errorf("%s: expects error: %v, has error: %v", verb, tt.expectsErr, hasErr)
2435 }
2436 })
2437 }
2438 }
2439
2440 func TestThrottledLogger(t *testing.T) {
2441 now := time.Now()
2442 oldClock := globalThrottledLogger.clock
2443 defer func() {
2444 globalThrottledLogger.clock = oldClock
2445 }()
2446 clock := testingclock.NewFakeClock(now)
2447 globalThrottledLogger.clock = clock
2448
2449 logMessages := 0
2450 for i := 0; i < 1000; i++ {
2451 var wg sync.WaitGroup
2452 wg.Add(10)
2453 for j := 0; j < 10; j++ {
2454 go func() {
2455 if _, ok := globalThrottledLogger.attemptToLog(); ok {
2456 logMessages++
2457 }
2458 wg.Done()
2459 }()
2460 }
2461 wg.Wait()
2462 now = now.Add(1 * time.Second)
2463 clock.SetTime(now)
2464 }
2465
2466 if a, e := logMessages, 100; a != e {
2467 t.Fatalf("expected %v log messages, but got %v", e, a)
2468 }
2469 }
2470
2471 func TestRequestMaxRetries(t *testing.T) {
2472 successAtNthCalls := 1
2473 actualCalls := 0
2474 retryOneTimeHandler := func(w http.ResponseWriter, req *http.Request) {
2475 defer func() { actualCalls++ }()
2476 if actualCalls >= successAtNthCalls {
2477 w.WriteHeader(http.StatusOK)
2478 return
2479 }
2480 w.Header().Set("Retry-After", "1")
2481 w.WriteHeader(http.StatusTooManyRequests)
2482 actualCalls++
2483 }
2484 testServer := httptest.NewServer(http.HandlerFunc(retryOneTimeHandler))
2485 defer testServer.Close()
2486
2487 u, err := url.Parse(testServer.URL)
2488 if err != nil {
2489 t.Error(err)
2490 }
2491
2492 testCases := []struct {
2493 name string
2494 maxRetries int
2495 expectError bool
2496 }{
2497 {
2498 name: "no retrying should fail",
2499 maxRetries: 0,
2500 expectError: true,
2501 },
2502 {
2503 name: "1 max-retry should exactly work",
2504 maxRetries: 1,
2505 expectError: false,
2506 },
2507 {
2508 name: "5 max-retry should work",
2509 maxRetries: 5,
2510 expectError: false,
2511 },
2512 }
2513
2514 for _, testCase := range testCases {
2515 t.Run(testCase.name, func(t *testing.T) {
2516 defer func() { actualCalls = 0 }()
2517 _, err := NewRequestWithClient(u, "", defaultContentConfig(), testServer.Client()).
2518 Verb("get").
2519 MaxRetries(testCase.maxRetries).
2520 AbsPath("/foo").
2521 DoRaw(context.TODO())
2522 hasError := err != nil
2523 if testCase.expectError != hasError {
2524 t.Error(" failed checking error")
2525 }
2526 })
2527 }
2528 }
2529
2530 type responseErr struct {
2531 response *http.Response
2532 err error
2533 }
2534
2535 type seek struct {
2536 offset int64
2537 whence int
2538 }
2539
2540 type count struct {
2541
2542 seeks []seek
2543
2544
2545 lock sync.Mutex
2546 closes int
2547 }
2548
2549 func (c *count) close() {
2550 c.lock.Lock()
2551 defer c.lock.Unlock()
2552 c.closes++
2553 }
2554 func (c *count) getCloseCount() int {
2555 c.lock.Lock()
2556 defer c.lock.Unlock()
2557 return c.closes
2558 }
2559
2560
2561 type readTracker struct {
2562 delegated io.Reader
2563 count *count
2564 }
2565
2566 func (r *readTracker) Seek(offset int64, whence int) (int64, error) {
2567 if seeker, ok := r.delegated.(io.Seeker); ok {
2568 r.count.seeks = append(r.count.seeks, seek{offset: offset, whence: whence})
2569 return seeker.Seek(offset, whence)
2570 }
2571 return 0, io.EOF
2572 }
2573
2574 func (r *readTracker) Read(p []byte) (n int, err error) {
2575 return r.delegated.Read(p)
2576 }
2577
2578 func (r *readTracker) Close() error {
2579 if closer, ok := r.delegated.(io.Closer); ok {
2580 r.count.close()
2581 return closer.Close()
2582 }
2583 return nil
2584 }
2585
2586 func newReadTracker(count *count) *readTracker {
2587 return &readTracker{
2588 count: count,
2589 }
2590 }
2591
2592 func newCount() *count {
2593 return &count{
2594 closes: 0,
2595 seeks: make([]seek, 0),
2596 }
2597 }
2598
2599 type readSeeker struct{ err error }
2600
2601 func (rs readSeeker) Read([]byte) (int, error) { return 0, rs.err }
2602 func (rs readSeeker) Seek(int64, int) (int64, error) { return 0, rs.err }
2603
2604 func unWrap(err error) error {
2605 if uerr, ok := err.(*url.Error); ok {
2606 return uerr.Err
2607 }
2608 return err
2609 }
2610
2611
2612
2613 type noSleepBackOff struct {
2614 *NoBackoff
2615 }
2616
2617 func (n *noSleepBackOff) Sleep(d time.Duration) {}
2618
2619 func TestRequestWithRetry(t *testing.T) {
2620 tests := []struct {
2621 name string
2622 body io.Reader
2623 bodyBytes []byte
2624 serverReturns responseErr
2625 errExpected error
2626 errContains string
2627 transformFuncInvokedExpected int
2628 roundTripInvokedExpected int
2629 }{
2630 {
2631 name: "server returns retry-after response, no request body, retry goes ahead",
2632 bodyBytes: nil,
2633 serverReturns: responseErr{response: retryAfterResponse(), err: nil},
2634 errExpected: nil,
2635 transformFuncInvokedExpected: 1,
2636 roundTripInvokedExpected: 2,
2637 },
2638 {
2639 name: "server returns retry-after response, bytes request body, retry goes ahead",
2640 bodyBytes: []byte{},
2641 serverReturns: responseErr{response: retryAfterResponse(), err: nil},
2642 errExpected: nil,
2643 transformFuncInvokedExpected: 1,
2644 roundTripInvokedExpected: 2,
2645 },
2646 {
2647 name: "server returns retry-after response, opaque request body, retry aborted",
2648 body: &readSeeker{},
2649 serverReturns: responseErr{response: retryAfterResponse(), err: nil},
2650 errExpected: nil,
2651 transformFuncInvokedExpected: 1,
2652 roundTripInvokedExpected: 1,
2653 },
2654 {
2655 name: "server returns retryable err, no request body, retry goes ahead",
2656 bodyBytes: nil,
2657 serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
2658 errExpected: io.ErrUnexpectedEOF,
2659 transformFuncInvokedExpected: 0,
2660 roundTripInvokedExpected: 2,
2661 },
2662 {
2663 name: "server returns retryable err, bytes request body, retry goes ahead",
2664 bodyBytes: []byte{},
2665 serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
2666 errExpected: io.ErrUnexpectedEOF,
2667 transformFuncInvokedExpected: 0,
2668 roundTripInvokedExpected: 2,
2669 },
2670 {
2671 name: "server returns retryable err, opaque request body, retry aborted",
2672 body: &readSeeker{},
2673 serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
2674 errExpected: io.ErrUnexpectedEOF,
2675 transformFuncInvokedExpected: 0,
2676 roundTripInvokedExpected: 1,
2677 },
2678 }
2679
2680 for _, test := range tests {
2681 t.Run(test.name, func(t *testing.T) {
2682 var roundTripInvoked int
2683 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
2684 roundTripInvoked++
2685 return test.serverReturns.response, test.serverReturns.err
2686 })
2687
2688 req := &Request{
2689 verb: "GET",
2690 body: test.body,
2691 c: &RESTClient{
2692 Client: client,
2693 },
2694 backoff: &noSleepBackOff{},
2695 maxRetries: 1,
2696 retryFn: defaultRequestRetryFn,
2697 }
2698
2699 var transformFuncInvoked int
2700 err := req.request(context.Background(), func(request *http.Request, response *http.Response) {
2701 transformFuncInvoked++
2702 })
2703
2704 if test.roundTripInvokedExpected != roundTripInvoked {
2705 t.Errorf("Expected RoundTrip to be invoked %d times, but got: %d", test.roundTripInvokedExpected, roundTripInvoked)
2706 }
2707 if test.transformFuncInvokedExpected != transformFuncInvoked {
2708 t.Errorf("Expected transform func to be invoked %d times, but got: %d", test.transformFuncInvokedExpected, transformFuncInvoked)
2709 }
2710 switch {
2711 case test.errExpected != nil:
2712 if test.errExpected != unWrap(err) {
2713 t.Errorf("Expected error: %v, but got: %v", test.errExpected, unWrap(err))
2714 }
2715 case len(test.errContains) > 0:
2716 if !strings.Contains(err.Error(), test.errContains) {
2717 t.Errorf("Expected error message to caontain: %q, but got: %q", test.errContains, err.Error())
2718 }
2719 }
2720 })
2721 }
2722 }
2723
2724 func TestRequestDoWithRetry(t *testing.T) {
2725 testRequestWithRetry(t, "Do", func(ctx context.Context, r *Request) {
2726 r.Do(ctx)
2727 })
2728 }
2729
2730 func TestRequestDoRawWithRetry(t *testing.T) {
2731
2732 testRequestWithRetry(t, "Do", func(ctx context.Context, r *Request) {
2733 r.DoRaw(ctx)
2734 })
2735 }
2736
2737 func TestRequestStreamWithRetry(t *testing.T) {
2738 testRequestWithRetry(t, "Stream", func(ctx context.Context, r *Request) {
2739 r.Stream(ctx)
2740 })
2741 }
2742
2743 func TestRequestWatchWithRetry(t *testing.T) {
2744 testRequestWithRetry(t, "Watch", func(ctx context.Context, r *Request) {
2745 w, err := r.Watch(ctx)
2746 if err == nil {
2747
2748
2749
2750
2751
2752
2753 <-w.ResultChan()
2754 }
2755 })
2756 }
2757
2758 func TestRequestDoRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
2759
2760 testRetryWithRateLimiterBackoffAndMetrics(t, "Do", func(ctx context.Context, r *Request) {
2761 r.DoRaw(ctx)
2762 })
2763 }
2764
2765 func TestRequestStreamRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
2766 testRetryWithRateLimiterBackoffAndMetrics(t, "Stream", func(ctx context.Context, r *Request) {
2767 r.Stream(ctx)
2768 })
2769 }
2770
2771 func TestRequestWatchRetryWithRateLimiterBackoffAndMetrics(t *testing.T) {
2772 testRetryWithRateLimiterBackoffAndMetrics(t, "Watch", func(ctx context.Context, r *Request) {
2773 w, err := r.Watch(ctx)
2774 if err == nil {
2775
2776
2777
2778
2779
2780
2781 <-w.ResultChan()
2782 }
2783 })
2784 }
2785
2786 func TestRequestDoWithRetryInvokeOrder(t *testing.T) {
2787
2788 testWithRetryInvokeOrder(t, "Do", func(ctx context.Context, r *Request) {
2789 r.DoRaw(ctx)
2790 })
2791 }
2792
2793 func TestRequestStreamWithRetryInvokeOrder(t *testing.T) {
2794 testWithRetryInvokeOrder(t, "Stream", func(ctx context.Context, r *Request) {
2795 r.Stream(ctx)
2796 })
2797 }
2798
2799 func TestRequestWatchWithRetryInvokeOrder(t *testing.T) {
2800 testWithRetryInvokeOrder(t, "Watch", func(ctx context.Context, r *Request) {
2801 w, err := r.Watch(ctx)
2802 if err == nil {
2803
2804
2805
2806
2807
2808
2809 <-w.ResultChan()
2810 }
2811 })
2812 }
2813
2814 func TestRequestWatchWithWrapPreviousError(t *testing.T) {
2815 testWithWrapPreviousError(t, func(ctx context.Context, r *Request) error {
2816 w, err := r.Watch(ctx)
2817 if err == nil {
2818
2819
2820
2821
2822
2823
2824 <-w.ResultChan()
2825 }
2826 return err
2827 })
2828 }
2829
2830 func TestRequestDoWithWrapPreviousError(t *testing.T) {
2831
2832 testWithWrapPreviousError(t, func(ctx context.Context, r *Request) error {
2833 result := r.Do(ctx)
2834 return result.err
2835 })
2836 }
2837
2838 func testRequestWithRetry(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
2839 type expected struct {
2840 attempts int
2841 reqCount *count
2842 respCount *count
2843 }
2844
2845 tests := []struct {
2846 name string
2847 verb string
2848 body io.Reader
2849 bodyBytes []byte
2850 maxRetries int
2851 serverReturns []responseErr
2852
2853
2854 expectations map[string]expected
2855 }{
2856 {
2857 name: "server always returns retry-after response",
2858 verb: "GET",
2859 bodyBytes: []byte{},
2860 maxRetries: 2,
2861 serverReturns: []responseErr{
2862 {response: retryAfterResponse(), err: nil},
2863 {response: retryAfterResponse(), err: nil},
2864 {response: retryAfterResponse(), err: nil},
2865 },
2866 expectations: map[string]expected{
2867 "Do": {
2868 attempts: 3,
2869 reqCount: &count{closes: 0, seeks: make([]seek, 2)},
2870 respCount: &count{closes: 3, seeks: []seek{}},
2871 },
2872 "Watch": {
2873 attempts: 3,
2874 reqCount: &count{closes: 0, seeks: make([]seek, 2)},
2875 respCount: &count{closes: 3, seeks: []seek{}},
2876 },
2877 "Stream": {
2878 attempts: 3,
2879 reqCount: &count{closes: 0, seeks: make([]seek, 2)},
2880 respCount: &count{closes: 3, seeks: []seek{}},
2881 },
2882 },
2883 },
2884 {
2885 name: "server always returns retryable error",
2886 verb: "GET",
2887 bodyBytes: []byte{},
2888 maxRetries: 2,
2889 serverReturns: []responseErr{
2890 {response: nil, err: io.EOF},
2891 {response: nil, err: io.EOF},
2892 {response: nil, err: io.EOF},
2893 },
2894 expectations: map[string]expected{
2895 "Do": {
2896 attempts: 3,
2897 reqCount: &count{closes: 0, seeks: make([]seek, 2)},
2898 respCount: &count{closes: 0, seeks: []seek{}},
2899 },
2900 "Watch": {
2901 attempts: 3,
2902 reqCount: &count{closes: 0, seeks: make([]seek, 2)},
2903 respCount: &count{closes: 0, seeks: []seek{}},
2904 },
2905
2906 "Stream": {
2907 attempts: 1,
2908 reqCount: &count{closes: 0, seeks: []seek{}},
2909 respCount: &count{closes: 0, seeks: []seek{}},
2910 },
2911 },
2912 },
2913 {
2914 name: "server returns success on the final retry",
2915 verb: "GET",
2916 bodyBytes: []byte{},
2917 maxRetries: 2,
2918 serverReturns: []responseErr{
2919 {response: retryAfterResponse(), err: nil},
2920 {response: nil, err: io.EOF},
2921 {response: &http.Response{StatusCode: http.StatusOK}, err: nil},
2922 },
2923 expectations: map[string]expected{
2924 "Do": {
2925 attempts: 3,
2926 reqCount: &count{closes: 0, seeks: make([]seek, 2)},
2927 respCount: &count{closes: 2, seeks: []seek{}},
2928 },
2929 "Watch": {
2930 attempts: 3,
2931 reqCount: &count{closes: 0, seeks: make([]seek, 2)},
2932
2933
2934 respCount: &count{closes: 2, seeks: []seek{}},
2935 },
2936 "Stream": {
2937 attempts: 2,
2938 reqCount: &count{closes: 0, seeks: make([]seek, 1)},
2939 respCount: &count{closes: 1, seeks: []seek{}},
2940 },
2941 },
2942 },
2943 }
2944
2945 for _, test := range tests {
2946 t.Run(test.name, func(t *testing.T) {
2947 respCountGot := newCount()
2948 responseRecorder := newReadTracker(respCountGot)
2949 var attempts int
2950 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
2951 defer func() {
2952 attempts++
2953 }()
2954
2955 resp := test.serverReturns[attempts].response
2956 if resp != nil {
2957 responseRecorder.delegated = io.NopCloser(bytes.NewReader([]byte{}))
2958 resp.Body = responseRecorder
2959 }
2960 return resp, test.serverReturns[attempts].err
2961 })
2962
2963 req := &Request{
2964 verb: test.verb,
2965 body: test.body,
2966 bodyBytes: test.bodyBytes,
2967 c: &RESTClient{
2968 content: defaultContentConfig(),
2969 Client: client,
2970 },
2971 backoff: &noSleepBackOff{},
2972 maxRetries: test.maxRetries,
2973 retryFn: defaultRequestRetryFn,
2974 }
2975
2976 doFunc(context.Background(), req)
2977
2978 expected, ok := test.expectations[key]
2979 if !ok {
2980 t.Fatalf("Wrong test setup - did not find expected for: %s", key)
2981 }
2982 if expected.attempts != attempts {
2983 t.Errorf("Expected retries: %d, but got: %d", expected.attempts, attempts)
2984 }
2985
2986 if expected.respCount.closes != respCountGot.getCloseCount() {
2987 t.Errorf("Expected response body Close to be invoked %d times, but got: %d", expected.respCount.closes, respCountGot.getCloseCount())
2988 }
2989 })
2990 }
2991 }
2992
2993 type retryTestKeyType int
2994
2995 const retryTestKey retryTestKeyType = iota
2996
2997
2998
2999
3000
3001
3002 type withRateLimiterBackoffManagerAndMetrics struct {
3003 flowcontrol.RateLimiter
3004 *NoBackoff
3005 metrics.ResultMetric
3006 calculateBackoffSeq int64
3007 calculateBackoffFn func(i int64) time.Duration
3008 metrics.RetryMetric
3009
3010 invokeOrderGot []string
3011 sleepsGot []string
3012 statusCodesGot []string
3013 }
3014
3015 func (lb *withRateLimiterBackoffManagerAndMetrics) Wait(ctx context.Context) error {
3016 lb.invokeOrderGot = append(lb.invokeOrderGot, "RateLimiter.Wait")
3017 return nil
3018 }
3019
3020 func (lb *withRateLimiterBackoffManagerAndMetrics) CalculateBackoff(actualUrl *url.URL) time.Duration {
3021 lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.CalculateBackoff")
3022
3023 waitFor := lb.calculateBackoffFn(lb.calculateBackoffSeq)
3024 lb.calculateBackoffSeq++
3025 return waitFor
3026 }
3027
3028 func (lb *withRateLimiterBackoffManagerAndMetrics) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) {
3029 lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.UpdateBackoff")
3030 }
3031
3032 func (lb *withRateLimiterBackoffManagerAndMetrics) Sleep(d time.Duration) {
3033 lb.invokeOrderGot = append(lb.invokeOrderGot, "BackoffManager.Sleep")
3034 lb.sleepsGot = append(lb.sleepsGot, d.String())
3035 }
3036
3037 func (lb *withRateLimiterBackoffManagerAndMetrics) Increment(ctx context.Context, code, _, _ string) {
3038
3039 if marked, ok := ctx.Value(retryTestKey).(bool); ok && marked {
3040 lb.invokeOrderGot = append(lb.invokeOrderGot, "RequestResult.Increment")
3041 lb.statusCodesGot = append(lb.statusCodesGot, code)
3042 }
3043 }
3044
3045 func (lb *withRateLimiterBackoffManagerAndMetrics) IncrementRetry(ctx context.Context, code, _, _ string) {
3046
3047 if marked, ok := ctx.Value(retryTestKey).(bool); ok && marked {
3048 lb.invokeOrderGot = append(lb.invokeOrderGot, "RequestRetry.IncrementRetry")
3049 lb.statusCodesGot = append(lb.statusCodesGot, code)
3050 }
3051 }
3052
3053 func (lb *withRateLimiterBackoffManagerAndMetrics) Do() {
3054 lb.invokeOrderGot = append(lb.invokeOrderGot, "Client.Do")
3055 }
3056
3057 func testRetryWithRateLimiterBackoffAndMetrics(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
3058 type expected struct {
3059 attempts int
3060 order []string
3061 sleeps []string
3062 statusCodes []string
3063 }
3064
3065
3066
3067
3068
3069
3070
3071 invokeOrderWant := []string{
3072
3073
3074
3075 "RateLimiter.Wait",
3076 "BackoffManager.CalculateBackoff",
3077 "BackoffManager.Sleep",
3078
3079
3080
3081 "Client.Do",
3082
3083
3084
3085
3086 "RequestResult.Increment",
3087 "BackoffManager.UpdateBackoff",
3088 "BackoffManager.CalculateBackoff",
3089
3090 "BackoffManager.Sleep",
3091
3092 "RateLimiter.Wait",
3093
3094
3095 "Client.Do",
3096
3097
3098
3099
3100 "RequestResult.Increment",
3101 "RequestRetry.IncrementRetry",
3102 "BackoffManager.UpdateBackoff",
3103 }
3104 statusCodesWant := []string{
3105
3106 "500",
3107
3108 "200", "200",
3109 }
3110
3111 tests := []struct {
3112 name string
3113 maxRetries int
3114 serverReturns []responseErr
3115 calculateBackoffFn func(i int64) time.Duration
3116
3117 expectations map[string]expected
3118 }{
3119 {
3120 name: "success after one retry, Retry-After: N > BackoffManager.CalculateBackoff",
3121 maxRetries: 1,
3122 serverReturns: []responseErr{
3123 {response: retryAfterResponseWithDelay("5"), err: nil},
3124 {response: &http.Response{StatusCode: http.StatusOK}, err: nil},
3125 },
3126
3127 calculateBackoffFn: func(i int64) time.Duration { return time.Duration(i * int64(time.Second)) },
3128 expectations: map[string]expected{
3129 "Do": {
3130 attempts: 2,
3131 order: invokeOrderWant,
3132 statusCodes: statusCodesWant,
3133 sleeps: []string{
3134
3135 "0s",
3136
3137
3138
3139 (5 * time.Second).String(),
3140 },
3141 },
3142 "Watch": {
3143 attempts: 2,
3144
3145 order: invokeOrderWant[1:],
3146 statusCodes: statusCodesWant,
3147 sleeps: []string{
3148 "0s",
3149 (5 * time.Second).String(),
3150 },
3151 },
3152 "Stream": {
3153 attempts: 2,
3154 order: invokeOrderWant,
3155 statusCodes: statusCodesWant,
3156 sleeps: []string{
3157 "0s",
3158 (5 * time.Second).String(),
3159 },
3160 },
3161 },
3162 },
3163 {
3164 name: "success after one retry, Retry-After: N < BackoffManager.CalculateBackoff",
3165 maxRetries: 1,
3166 serverReturns: []responseErr{
3167 {response: retryAfterResponseWithDelay("2"), err: nil},
3168 {response: &http.Response{StatusCode: http.StatusOK}, err: nil},
3169 },
3170
3171 calculateBackoffFn: func(i int64) time.Duration { return time.Duration(i * int64(4*time.Second)) },
3172 expectations: map[string]expected{
3173 "Do": {
3174 attempts: 2,
3175 order: invokeOrderWant,
3176 statusCodes: statusCodesWant,
3177 sleeps: []string{
3178
3179 "0s",
3180
3181
3182
3183 (4 * time.Second).String(),
3184 },
3185 },
3186 "Watch": {
3187 attempts: 2,
3188
3189 order: invokeOrderWant[1:],
3190 statusCodes: statusCodesWant,
3191 sleeps: []string{
3192 "0s",
3193 (4 * time.Second).String(),
3194 },
3195 },
3196 "Stream": {
3197 attempts: 2,
3198 order: invokeOrderWant,
3199 statusCodes: statusCodesWant,
3200 sleeps: []string{
3201 "0s",
3202 (4 * time.Second).String(),
3203 },
3204 },
3205 },
3206 },
3207 }
3208
3209 for _, test := range tests {
3210 t.Run(test.name, func(t *testing.T) {
3211 interceptor := &withRateLimiterBackoffManagerAndMetrics{
3212 RateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
3213 NoBackoff: &NoBackoff{},
3214 calculateBackoffFn: test.calculateBackoffFn,
3215 }
3216
3217
3218
3219
3220
3221
3222 oldRequestResult := metrics.RequestResult
3223 oldRequestRetry := metrics.RequestRetry
3224 metrics.RequestResult = interceptor
3225 metrics.RequestRetry = interceptor
3226 defer func() {
3227 metrics.RequestResult = oldRequestResult
3228 metrics.RequestRetry = oldRequestRetry
3229 }()
3230
3231 ctx, cancel := context.WithCancel(context.Background())
3232 defer cancel()
3233
3234
3235
3236
3237 ctx = context.WithValue(ctx, retryTestKey, true)
3238
3239 var attempts int
3240 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
3241 defer func() {
3242 attempts++
3243 }()
3244
3245 interceptor.Do()
3246 resp := test.serverReturns[attempts].response
3247 if resp != nil {
3248 resp.Body = io.NopCloser(bytes.NewReader([]byte{}))
3249 }
3250 return resp, test.serverReturns[attempts].err
3251 })
3252
3253 base, err := url.Parse("http://foo.bar")
3254 if err != nil {
3255 t.Fatalf("Wrong test setup - did not find expected for: %s", key)
3256 }
3257 req := &Request{
3258 verb: "GET",
3259 bodyBytes: []byte{},
3260 c: &RESTClient{
3261 base: base,
3262 content: defaultContentConfig(),
3263 Client: client,
3264 rateLimiter: interceptor,
3265 },
3266 pathPrefix: "/api/v1",
3267 rateLimiter: interceptor,
3268 backoff: interceptor,
3269 maxRetries: test.maxRetries,
3270 retryFn: defaultRequestRetryFn,
3271 }
3272
3273 doFunc(ctx, req)
3274
3275 want, ok := test.expectations[key]
3276 if !ok {
3277 t.Fatalf("Wrong test setup - did not find expected for: %s", key)
3278 }
3279 if want.attempts != attempts {
3280 t.Errorf("%s: Expected retries: %d, but got: %d", key, want.attempts, attempts)
3281 }
3282 if !cmp.Equal(want.order, interceptor.invokeOrderGot) {
3283 t.Errorf("%s: Expected invoke order to match, diff: %s", key, cmp.Diff(want.order, interceptor.invokeOrderGot))
3284 }
3285 if !cmp.Equal(want.sleeps, interceptor.sleepsGot) {
3286 t.Errorf("%s: Expected sleep sequence to match, diff: %s", key, cmp.Diff(want.sleeps, interceptor.sleepsGot))
3287 }
3288 if !cmp.Equal(want.statusCodes, interceptor.statusCodesGot) {
3289 t.Errorf("%s: Expected status codes to match, diff: %s", key, cmp.Diff(want.statusCodes, interceptor.statusCodesGot))
3290 }
3291 })
3292 }
3293 }
3294
3295 type retryInterceptor struct {
3296 WithRetry
3297 invokeOrderGot []string
3298 }
3299
3300 func (ri *retryInterceptor) IsNextRetry(ctx context.Context, restReq *Request, httpReq *http.Request, resp *http.Response, err error, f IsRetryableErrorFunc) bool {
3301 ri.invokeOrderGot = append(ri.invokeOrderGot, "WithRetry.IsNextRetry")
3302 return ri.WithRetry.IsNextRetry(ctx, restReq, httpReq, resp, err, f)
3303 }
3304
3305 func (ri *retryInterceptor) Before(ctx context.Context, request *Request) error {
3306 ri.invokeOrderGot = append(ri.invokeOrderGot, "WithRetry.Before")
3307 return ri.WithRetry.Before(ctx, request)
3308 }
3309
3310 func (ri *retryInterceptor) After(ctx context.Context, request *Request, resp *http.Response, err error) {
3311 ri.invokeOrderGot = append(ri.invokeOrderGot, "WithRetry.After")
3312 ri.WithRetry.After(ctx, request, resp, err)
3313 }
3314
3315 func (ri *retryInterceptor) Do() {
3316 ri.invokeOrderGot = append(ri.invokeOrderGot, "Client.Do")
3317 }
3318
3319 func testWithRetryInvokeOrder(t *testing.T, key string, doFunc func(ctx context.Context, r *Request)) {
3320
3321
3322
3323
3324
3325
3326 defaultInvokeOrderWant := []string{
3327
3328 "WithRetry.Before",
3329 "Client.Do",
3330 "WithRetry.After",
3331
3332
3333 "WithRetry.IsNextRetry",
3334
3335
3336 "WithRetry.Before",
3337 "Client.Do",
3338 "WithRetry.After",
3339
3340
3341
3342 "WithRetry.IsNextRetry",
3343 }
3344
3345 tests := []struct {
3346 name string
3347 maxRetries int
3348 serverReturns []responseErr
3349
3350 expectations map[string][]string
3351 }{
3352 {
3353 name: "success after one retry",
3354 maxRetries: 1,
3355 serverReturns: []responseErr{
3356 {response: retryAfterResponse(), err: nil},
3357 {response: &http.Response{StatusCode: http.StatusOK}, err: nil},
3358 },
3359 expectations: map[string][]string{
3360 "Do": defaultInvokeOrderWant,
3361
3362
3363 "Watch": defaultInvokeOrderWant[0 : len(defaultInvokeOrderWant)-1],
3364 "Stream": defaultInvokeOrderWant[0 : len(defaultInvokeOrderWant)-1],
3365 },
3366 },
3367 }
3368
3369 for _, test := range tests {
3370 t.Run(test.name, func(t *testing.T) {
3371 interceptor := &retryInterceptor{
3372 WithRetry: &withRetry{maxRetries: test.maxRetries},
3373 }
3374
3375 var attempts int
3376 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
3377 defer func() {
3378 attempts++
3379 }()
3380
3381 interceptor.Do()
3382 resp := test.serverReturns[attempts].response
3383 if resp != nil {
3384 resp.Body = io.NopCloser(bytes.NewReader([]byte{}))
3385 }
3386 return resp, test.serverReturns[attempts].err
3387 })
3388
3389 base, err := url.Parse("http://foo.bar")
3390 if err != nil {
3391 t.Fatalf("Wrong test setup - did not find expected for: %s", key)
3392 }
3393 req := &Request{
3394 verb: "GET",
3395 bodyBytes: []byte{},
3396 c: &RESTClient{
3397 base: base,
3398 content: defaultContentConfig(),
3399 Client: client,
3400 },
3401 pathPrefix: "/api/v1",
3402 rateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
3403 backoff: &NoBackoff{},
3404 retryFn: func(_ int) WithRetry { return interceptor },
3405 }
3406
3407 doFunc(context.Background(), req)
3408
3409 if attempts != 2 {
3410 t.Errorf("%s: Expected attempts: %d, but got: %d", key, 2, attempts)
3411 }
3412 invokeOrderWant, ok := test.expectations[key]
3413 if !ok {
3414 t.Fatalf("Wrong test setup - did not find expected for: %s", key)
3415 }
3416 if !cmp.Equal(invokeOrderWant, interceptor.invokeOrderGot) {
3417 t.Errorf("%s: Expected invoke order to match, diff: %s", key, cmp.Diff(invokeOrderWant, interceptor.invokeOrderGot))
3418 }
3419 })
3420 }
3421 }
3422
3423 func testWithWrapPreviousError(t *testing.T, doFunc func(ctx context.Context, r *Request) error) {
3424 var (
3425 containsFormatExpected = "- error from a previous attempt: %s"
3426 nonRetryableErr = errors.New("non retryable error")
3427 )
3428
3429 tests := []struct {
3430 name string
3431 maxRetries int
3432 serverReturns []responseErr
3433 expectedErr error
3434 wrapped bool
3435 attemptsExpected int
3436 contains string
3437 }{
3438 {
3439 name: "success at first attempt",
3440 maxRetries: 2,
3441 serverReturns: []responseErr{
3442 {response: &http.Response{StatusCode: http.StatusOK}, err: nil},
3443 },
3444 attemptsExpected: 1,
3445 expectedErr: nil,
3446 },
3447 {
3448 name: "success after a series of retry-after from the server",
3449 maxRetries: 2,
3450 serverReturns: []responseErr{
3451 {response: retryAfterResponse(), err: nil},
3452 {response: retryAfterResponse(), err: nil},
3453 {response: &http.Response{StatusCode: http.StatusOK}, err: nil},
3454 },
3455 attemptsExpected: 3,
3456 expectedErr: nil,
3457 },
3458 {
3459 name: "success after a series of retryable errors",
3460 maxRetries: 2,
3461 serverReturns: []responseErr{
3462 {response: nil, err: io.EOF},
3463 {response: nil, err: io.EOF},
3464 {response: &http.Response{StatusCode: http.StatusOK}, err: nil},
3465 },
3466 attemptsExpected: 3,
3467 expectedErr: nil,
3468 },
3469 {
3470 name: "request errors out with a non retryable error",
3471 maxRetries: 2,
3472 serverReturns: []responseErr{
3473 {response: nil, err: nonRetryableErr},
3474 },
3475 attemptsExpected: 1,
3476 expectedErr: nonRetryableErr,
3477 },
3478 {
3479 name: "request times out after retries, but no previous error",
3480 maxRetries: 2,
3481 serverReturns: []responseErr{
3482 {response: retryAfterResponse(), err: nil},
3483 {response: retryAfterResponse(), err: nil},
3484 {response: nil, err: context.Canceled},
3485 },
3486 attemptsExpected: 3,
3487 expectedErr: context.Canceled,
3488 },
3489 {
3490 name: "request times out after retries, and we have a relevant previous error",
3491 maxRetries: 3,
3492 serverReturns: []responseErr{
3493 {response: nil, err: io.EOF},
3494 {response: retryAfterResponse(), err: nil},
3495 {response: retryAfterResponse(), err: nil},
3496 {response: nil, err: context.Canceled},
3497 },
3498 attemptsExpected: 4,
3499 wrapped: true,
3500 expectedErr: context.Canceled,
3501 contains: fmt.Sprintf(containsFormatExpected, io.EOF),
3502 },
3503 {
3504 name: "interleaved retry-after responses with retryable errors",
3505 maxRetries: 8,
3506 serverReturns: []responseErr{
3507 {response: retryAfterResponse(), err: nil},
3508 {response: retryAfterResponse(), err: nil},
3509 {response: nil, err: io.ErrUnexpectedEOF},
3510 {response: retryAfterResponse(), err: nil},
3511 {response: retryAfterResponse(), err: nil},
3512 {response: nil, err: io.EOF},
3513 {response: retryAfterResponse(), err: nil},
3514 {response: retryAfterResponse(), err: nil},
3515 {response: nil, err: context.Canceled},
3516 },
3517 attemptsExpected: 9,
3518 wrapped: true,
3519 expectedErr: context.Canceled,
3520 contains: fmt.Sprintf(containsFormatExpected, io.EOF),
3521 },
3522 {
3523 name: "request errors out with a retryable error, followed by a non-retryable one",
3524 maxRetries: 3,
3525 serverReturns: []responseErr{
3526 {response: nil, err: io.EOF},
3527 {response: nil, err: nonRetryableErr},
3528 },
3529 attemptsExpected: 2,
3530 wrapped: true,
3531 expectedErr: nonRetryableErr,
3532 contains: fmt.Sprintf(containsFormatExpected, io.EOF),
3533 },
3534 {
3535 name: "use the most recent error",
3536 maxRetries: 2,
3537 serverReturns: []responseErr{
3538 {response: nil, err: io.ErrUnexpectedEOF},
3539 {response: nil, err: io.EOF},
3540 {response: nil, err: context.Canceled},
3541 },
3542 attemptsExpected: 3,
3543 wrapped: true,
3544 expectedErr: context.Canceled,
3545 contains: fmt.Sprintf(containsFormatExpected, io.EOF),
3546 },
3547 }
3548
3549 for _, test := range tests {
3550 t.Run(test.name, func(t *testing.T) {
3551 var attempts int
3552 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
3553 defer func() {
3554 attempts++
3555 }()
3556
3557 resp := test.serverReturns[attempts].response
3558 if resp != nil {
3559 resp.Body = io.NopCloser(bytes.NewReader([]byte{}))
3560 }
3561 return resp, test.serverReturns[attempts].err
3562 })
3563
3564 base, err := url.Parse("http://foo.bar")
3565 if err != nil {
3566 t.Fatalf("Failed to create new HTTP request - %v", err)
3567 }
3568 req := &Request{
3569 verb: "GET",
3570 bodyBytes: []byte{},
3571 c: &RESTClient{
3572 base: base,
3573 content: defaultContentConfig(),
3574 Client: client,
3575 },
3576 pathPrefix: "/api/v1",
3577 rateLimiter: flowcontrol.NewFakeAlwaysRateLimiter(),
3578 backoff: &noSleepBackOff{},
3579 maxRetries: test.maxRetries,
3580 retryFn: defaultRequestRetryFn,
3581 }
3582
3583 err = doFunc(context.Background(), req)
3584 if test.attemptsExpected != attempts {
3585 t.Errorf("Expected attempts: %d, but got: %d", test.attemptsExpected, attempts)
3586 }
3587
3588 switch {
3589 case test.expectedErr == nil:
3590 if err != nil {
3591 t.Errorf("Expected a nil error, but got: %v", err)
3592 return
3593 }
3594 case test.expectedErr != nil:
3595 if !strings.Contains(err.Error(), test.contains) {
3596 t.Errorf("Expected error message to contain %q, but got: %v", test.contains, err)
3597 }
3598
3599 urlErrGot, _ := err.(*url.Error)
3600 if test.wrapped {
3601
3602 unwrapper, ok := err.(interface {
3603 Unwrap() error
3604 })
3605 if !ok {
3606 t.Errorf("Expected error to implement Unwrap method, but got: %v", err)
3607 return
3608 }
3609 urlErrGot, _ = unwrapper.Unwrap().(*url.Error)
3610 }
3611
3612 if urlErrGot == nil {
3613 t.Errorf("Expected error to be url.Error, but got: %v", err)
3614 return
3615 }
3616
3617 errGot := urlErrGot.Unwrap()
3618 if test.expectedErr != errGot {
3619 t.Errorf("Expected error %v, but got: %v", test.expectedErr, errGot)
3620 }
3621 }
3622 })
3623 }
3624 }
3625
3626 func TestReuseRequest(t *testing.T) {
3627 var tests = []struct {
3628 name string
3629 enableHTTP2 bool
3630 }{
3631 {"HTTP1", false},
3632 {"HTTP2", true},
3633 }
3634 for _, tt := range tests {
3635 t.Run(tt.name, func(t *testing.T) {
3636
3637 ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3638 w.Write([]byte(r.RemoteAddr))
3639 }))
3640 ts.EnableHTTP2 = tt.enableHTTP2
3641 ts.StartTLS()
3642 defer ts.Close()
3643
3644 ctx, cancel := context.WithCancel(context.Background())
3645 defer cancel()
3646
3647 c := testRESTClient(t, ts)
3648
3649 req1, err := c.Verb("GET").
3650 Prefix("foo").
3651 DoRaw(ctx)
3652 if err != nil {
3653 t.Fatalf("Unexpected error: %v", err)
3654 }
3655
3656 req2, err := c.Verb("GET").
3657 Prefix("foo").
3658 DoRaw(ctx)
3659 if err != nil {
3660 t.Fatalf("Unexpected error: %v", err)
3661 }
3662
3663 if string(req1) != string(req2) {
3664 t.Fatalf("Expected %v to be equal to %v", string(req1), string(req2))
3665 }
3666
3667 })
3668 }
3669 }
3670
3671 func TestHTTP1DoNotReuseRequestAfterTimeout(t *testing.T) {
3672 var tests = []struct {
3673 name string
3674 enableHTTP2 bool
3675 }{
3676 {"HTTP1", false},
3677 {"HTTP2", true},
3678 }
3679 for _, tt := range tests {
3680 t.Run(tt.name, func(t *testing.T) {
3681 done := make(chan struct{})
3682 ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3683 t.Logf("TEST Connected from %v on %v\n", r.RemoteAddr, r.URL.Path)
3684 if r.URL.Path == "/hang" {
3685 t.Logf("TEST hanging %v\n", r.RemoteAddr)
3686 <-done
3687 }
3688 w.Write([]byte(r.RemoteAddr))
3689 }))
3690 ts.EnableHTTP2 = tt.enableHTTP2
3691 ts.StartTLS()
3692 defer ts.Close()
3693
3694 defer close(done)
3695
3696 ctx, cancel := context.WithCancel(context.Background())
3697 defer cancel()
3698
3699 transport, ok := ts.Client().Transport.(*http.Transport)
3700 if !ok {
3701 t.Fatalf("failed to assert *http.Transport")
3702 }
3703
3704 config := &Config{
3705 Host: ts.URL,
3706 Transport: utilnet.SetTransportDefaults(transport),
3707 Timeout: 1 * time.Second,
3708
3709 ContentConfig: ContentConfig{
3710 GroupVersion: &schema.GroupVersion{},
3711 NegotiatedSerializer: &serializer.CodecFactory{},
3712 },
3713 }
3714 if !tt.enableHTTP2 {
3715 config.TLSClientConfig.NextProtos = []string{"http/1.1"}
3716 }
3717 c, err := RESTClientFor(config)
3718 if err != nil {
3719 t.Fatalf("failed to create REST client: %v", err)
3720 }
3721 req1, err := c.Verb("GET").
3722 Prefix("foo").
3723 DoRaw(ctx)
3724 if err != nil {
3725 t.Fatalf("Unexpected error: %v", err)
3726 }
3727
3728 _, err = c.Verb("GET").
3729 Prefix("/hang").
3730 DoRaw(ctx)
3731 if err == nil {
3732 t.Fatalf("Expected error")
3733 }
3734
3735 req2, err := c.Verb("GET").
3736 Prefix("foo").
3737 DoRaw(ctx)
3738 if err != nil {
3739 t.Fatalf("Unexpected error: %v", err)
3740 }
3741
3742
3743 if tt.enableHTTP2 != (string(req1) == string(req2)) {
3744 if tt.enableHTTP2 {
3745 t.Fatalf("Expected %v to be the same as %v", string(req1), string(req2))
3746 } else {
3747 t.Fatalf("Expected %v to be different to %v", string(req1), string(req2))
3748 }
3749 }
3750 })
3751 }
3752 }
3753
3754 func TestTransportConcurrency(t *testing.T) {
3755 const numReqs = 10
3756 var tests = []struct {
3757 name string
3758 enableHTTP2 bool
3759 }{
3760 {"HTTP1", false},
3761 {"HTTP2", true},
3762 }
3763 for _, tt := range tests {
3764 t.Run(tt.name, func(t *testing.T) {
3765
3766 ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
3767 t.Logf("Connected from %v %v", r.RemoteAddr, r.URL)
3768 fmt.Fprintf(w, "%v", r.FormValue("echo"))
3769 }))
3770 ts.EnableHTTP2 = tt.enableHTTP2
3771 ts.StartTLS()
3772 defer ts.Close()
3773 var wg sync.WaitGroup
3774
3775 wg.Add(numReqs)
3776 c := testRESTClient(t, ts)
3777 reqs := make(chan string)
3778 defer close(reqs)
3779
3780 for i := 0; i < 4; i++ {
3781 go func() {
3782 for req := range reqs {
3783 res, err := c.Get().Param("echo", req).DoRaw(context.Background())
3784 if err != nil {
3785 t.Errorf("error on req %s: %v", req, err)
3786 wg.Done()
3787 continue
3788 }
3789
3790 if string(res) != req {
3791 t.Errorf("body of req %s = %q; want %q", req, res, req)
3792 }
3793
3794 wg.Done()
3795 }
3796 }()
3797 }
3798 for i := 0; i < numReqs; i++ {
3799 reqs <- fmt.Sprintf("request-%d", i)
3800 }
3801 wg.Wait()
3802 })
3803 }
3804 }
3805
3806 func TestRetryableConditions(t *testing.T) {
3807 var (
3808 methods = map[string]func(ctx context.Context, r *Request){
3809 "Do": func(ctx context.Context, r *Request) {
3810 r.Do(ctx)
3811 },
3812 "DoRaw": func(ctx context.Context, r *Request) {
3813 r.DoRaw(ctx)
3814 },
3815 "Stream": func(ctx context.Context, r *Request) {
3816 r.Stream(ctx)
3817 },
3818 "Watch": func(ctx context.Context, r *Request) {
3819 w, err := r.Watch(ctx)
3820 if err == nil {
3821
3822 <-w.ResultChan()
3823 }
3824 },
3825 }
3826
3827 alwaysRetry = map[string]bool{
3828 "Do": true,
3829 "DoRaw": true,
3830 "Watch": true,
3831 "Stream": true,
3832 }
3833
3834 neverRetry = map[string]bool{
3835 "Do": false,
3836 "DoRaw": false,
3837 "Watch": false,
3838 "Stream": false,
3839 }
3840
3841 alwaysRetryExceptStream = map[string]bool{
3842 "Do": true,
3843 "DoRaw": true,
3844 "Watch": true,
3845 "Stream": false,
3846 }
3847 )
3848
3849 tests := []struct {
3850 name string
3851 verbs []string
3852 serverReturns responseErr
3853 retryExpectation map[string]bool
3854 }{
3855
3856 {
3857 name: "server returns {429, Retry-After}",
3858 verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
3859 serverReturns: responseErr{response: retryAfterResponseWithCodeAndDelay(http.StatusTooManyRequests, "0"), err: nil},
3860 retryExpectation: alwaysRetry,
3861 },
3862
3863 {
3864 name: "server returns {503, Retry-After}",
3865 verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
3866 serverReturns: responseErr{response: retryAfterResponseWithCodeAndDelay(http.StatusServiceUnavailable, "0"), err: nil},
3867 retryExpectation: alwaysRetry,
3868 },
3869
3870 {
3871 name: "server returns 5xx, but no Retry-After",
3872 verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
3873 serverReturns: responseErr{response: &http.Response{StatusCode: http.StatusInternalServerError}, err: nil},
3874 retryExpectation: neverRetry,
3875 },
3876
3877 {
3878 name: "server returns 429 but no Retry-After",
3879 verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
3880 serverReturns: responseErr{response: &http.Response{StatusCode: http.StatusTooManyRequests}, err: nil},
3881 retryExpectation: neverRetry,
3882 },
3883
3884 {
3885 name: "server returns connection reset error",
3886 verbs: []string{"GET"},
3887 serverReturns: responseErr{response: nil, err: syscall.ECONNRESET},
3888 retryExpectation: alwaysRetryExceptStream,
3889 },
3890 {
3891 name: "server returns EOF error",
3892 verbs: []string{"GET"},
3893 serverReturns: responseErr{response: nil, err: io.EOF},
3894 retryExpectation: alwaysRetryExceptStream,
3895 },
3896 {
3897 name: "server returns unexpected EOF error",
3898 verbs: []string{"GET"},
3899 serverReturns: responseErr{response: nil, err: io.ErrUnexpectedEOF},
3900 retryExpectation: alwaysRetryExceptStream,
3901 },
3902 {
3903 name: "server returns broken connection error",
3904 verbs: []string{"GET"},
3905 serverReturns: responseErr{response: nil, err: errors.New("http: can't write HTTP request on broken connection")},
3906 retryExpectation: alwaysRetryExceptStream,
3907 },
3908 {
3909 name: "server returns GOAWAY error",
3910 verbs: []string{"GET"},
3911 serverReturns: responseErr{response: nil, err: errors.New("http2: server sent GOAWAY and closed the connection")},
3912 retryExpectation: alwaysRetryExceptStream,
3913 },
3914 {
3915 name: "server returns connection reset by peer error",
3916 verbs: []string{"GET"},
3917 serverReturns: responseErr{response: nil, err: errors.New("connection reset by peer")},
3918 retryExpectation: alwaysRetryExceptStream,
3919 },
3920 {
3921 name: "server returns use of closed network connection error",
3922 verbs: []string{"GET"},
3923 serverReturns: responseErr{response: nil, err: errors.New("use of closed network connection")},
3924 retryExpectation: alwaysRetryExceptStream,
3925 },
3926
3927 {
3928 name: "server returns connection refused error",
3929 verbs: []string{"GET"},
3930 serverReturns: responseErr{response: nil, err: syscall.ECONNREFUSED},
3931 retryExpectation: neverRetry,
3932 },
3933 {
3934 name: "server returns connection refused error",
3935 verbs: []string{"POST"},
3936 serverReturns: responseErr{response: nil, err: syscall.ECONNREFUSED},
3937 retryExpectation: neverRetry,
3938 },
3939 {
3940 name: "server returns EOF error",
3941 verbs: []string{"POST"},
3942 serverReturns: responseErr{response: nil, err: io.EOF},
3943 retryExpectation: map[string]bool{
3944 "Do": false,
3945 "DoRaw": false,
3946 "Watch": true,
3947 "Stream": false,
3948 },
3949 },
3950
3951 {
3952 name: "server returns net.Timeout() == true error",
3953 verbs: []string{"GET"},
3954 serverReturns: responseErr{response: nil, err: &net.DNSError{IsTimeout: true}},
3955 retryExpectation: map[string]bool{
3956 "Do": false,
3957 "DoRaw": false,
3958 "Watch": true,
3959 "Stream": false,
3960 },
3961 },
3962 {
3963 name: "server returns OK, never retry",
3964 verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
3965 serverReturns: responseErr{response: &http.Response{StatusCode: http.StatusOK}, err: nil},
3966 retryExpectation: neverRetry,
3967 },
3968 {
3969 name: "server returns {3xx, Retry-After}",
3970 verbs: []string{"GET", "POST", "PUT", "DELETE", "PATCH"},
3971 serverReturns: responseErr{response: &http.Response{StatusCode: http.StatusMovedPermanently, Header: http.Header{"Retry-After": []string{"0"}}}, err: nil},
3972 retryExpectation: neverRetry,
3973 },
3974 }
3975
3976 for _, test := range tests {
3977 for method, retryExpected := range test.retryExpectation {
3978 fn, ok := methods[method]
3979 if !ok {
3980 t.Fatalf("Wrong test setup, unknown method: %s", method)
3981 }
3982
3983 for _, verb := range test.verbs {
3984 t.Run(fmt.Sprintf("%s/%s/%s", test.name, method, verb), func(t *testing.T) {
3985 var attemptsGot int
3986 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
3987 attemptsGot++
3988 return test.serverReturns.response, test.serverReturns.err
3989 })
3990
3991 u, _ := url.Parse("http://localhost:123" + "/apis")
3992 req := &Request{
3993 verb: verb,
3994 c: &RESTClient{
3995 base: u,
3996 content: defaultContentConfig(),
3997 Client: client,
3998 },
3999 backoff: &noSleepBackOff{},
4000 maxRetries: 2,
4001 retryFn: defaultRequestRetryFn,
4002 }
4003
4004 fn(context.Background(), req)
4005
4006 if retryExpected {
4007 if attemptsGot != 3 {
4008 t.Errorf("Expected attempt count: %d, but got: %d", 3, attemptsGot)
4009 }
4010 return
4011 }
4012
4013 if attemptsGot > 1 {
4014 t.Errorf("Expected no retry, but got %d attempts", attemptsGot)
4015 }
4016 })
4017 }
4018 }
4019 }
4020 }
4021
4022 func TestRequestConcurrencyWithRetry(t *testing.T) {
4023 var attempts int32
4024 client := clientForFunc(func(req *http.Request) (*http.Response, error) {
4025 defer func() {
4026 atomic.AddInt32(&attempts, 1)
4027 }()
4028
4029
4030 return &http.Response{
4031 StatusCode: http.StatusInternalServerError,
4032 Header: http.Header{"Retry-After": []string{"1"}},
4033 }, nil
4034 })
4035
4036 req := &Request{
4037 verb: "POST",
4038 c: &RESTClient{
4039 content: defaultContentConfig(),
4040 Client: client,
4041 },
4042 backoff: &noSleepBackOff{},
4043 maxRetries: 9,
4044 retryFn: defaultRequestRetryFn,
4045 }
4046
4047 concurrency := 20
4048 wg := sync.WaitGroup{}
4049 wg.Add(concurrency)
4050 startCh := make(chan struct{})
4051 for i := 0; i < concurrency; i++ {
4052 go func() {
4053 defer wg.Done()
4054 <-startCh
4055 req.Do(context.Background())
4056 }()
4057 }
4058
4059 close(startCh)
4060 wg.Wait()
4061
4062
4063 expected := concurrency * (req.maxRetries + 1)
4064 if atomic.LoadInt32(&attempts) != int32(expected) {
4065 t.Errorf("Expected attempts: %d, but got: %d", expected, attempts)
4066 }
4067 }
4068
View as plain text