1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package client
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "io"
22 "io/ioutil"
23 "math/rand"
24 "net/http"
25 "net/url"
26 "reflect"
27 "sort"
28 "strings"
29 "testing"
30 "time"
31
32 "go.etcd.io/etcd/api/v3/version"
33 "go.etcd.io/etcd/client/pkg/v3/testutil"
34 )
35
36 type actionAssertingHTTPClient struct {
37 t *testing.T
38 num int
39 act httpAction
40
41 resp http.Response
42 body []byte
43 err error
44 }
45
46 func (a *actionAssertingHTTPClient) Do(_ context.Context, act httpAction) (*http.Response, []byte, error) {
47 if !reflect.DeepEqual(a.act, act) {
48 a.t.Errorf("#%d: unexpected httpAction: want=%#v got=%#v", a.num, a.act, act)
49 }
50
51 return &a.resp, a.body, a.err
52 }
53
54 type staticHTTPClient struct {
55 resp http.Response
56 body []byte
57 err error
58 }
59
60 func (s *staticHTTPClient) Do(context.Context, httpAction) (*http.Response, []byte, error) {
61 return &s.resp, s.body, s.err
62 }
63
// staticHTTPAction is an action stub wrapping one fixed request.
type staticHTTPAction struct {
	request http.Request
}

// HTTPRequest returns a pointer to the stored request; the endpoint URL it is
// given is ignored.
func (s *staticHTTPAction) HTTPRequest(_ url.URL) *http.Request {
	req := &s.request
	return req
}
71
// staticHTTPResponse bundles one canned reply: the response, its body bytes,
// and the error to report alongside them.
type staticHTTPResponse struct {
	// resp is the response value handed back to the caller.
	resp http.Response
	// body is the payload returned next to resp.
	body []byte
	// err is the error returned next to resp and body.
	err error
}
77
78 type multiStaticHTTPClient struct {
79 responses []staticHTTPResponse
80 cur int
81 }
82
83 func (s *multiStaticHTTPClient) Do(context.Context, httpAction) (*http.Response, []byte, error) {
84 r := s.responses[s.cur]
85 s.cur++
86 return &r.resp, r.body, r.err
87 }
88
89 func newStaticHTTPClientFactory(responses []staticHTTPResponse) httpClientFactory {
90 var cur int
91 return func(url.URL) httpClient {
92 r := responses[cur]
93 cur++
94 return &staticHTTPClient{resp: r.resp, body: r.body, err: r.err}
95 }
96 }
97
// fakeTransport is a channel-driven transport double. Tests feed it through
// its channels; only CancelRequest is defined in this part of the file.
type fakeTransport struct {
	respchan     chan *http.Response
	errchan      chan error
	startCancel  chan struct{}
	finishCancel chan struct{}
}

// newFakeTransport builds a fakeTransport whose four channels each have a
// buffer of one, so a test can queue a single item without blocking.
func newFakeTransport() *fakeTransport {
	tr := new(fakeTransport)
	tr.respchan = make(chan *http.Response, 1)
	tr.errchan = make(chan error, 1)
	tr.startCancel = make(chan struct{}, 1)
	tr.finishCancel = make(chan struct{}, 1)
	return tr
}

// CancelRequest signals on startCancel; the request itself is ignored.
func (t *fakeTransport) CancelRequest(_ *http.Request) {
	t.startCancel <- struct{}{}
}
117
// fakeAction is an action stub that produces an empty request.
type fakeAction struct{}

// HTTPRequest returns a freshly allocated zero-value request on every call;
// the endpoint URL is ignored.
func (a *fakeAction) HTTPRequest(_ url.URL) *http.Request {
	return new(http.Request)
}
123
124 func TestSimpleHTTPClientDoSuccess(t *testing.T) {
125 tr := newFakeTransport()
126 c := &simpleHTTPClient{transport: tr}
127
128 tr.respchan <- &http.Response{
129 StatusCode: http.StatusTeapot,
130 Body: ioutil.NopCloser(strings.NewReader("foo")),
131 }
132
133 resp, body, err := c.Do(context.Background(), &fakeAction{})
134 if err != nil {
135 t.Fatalf("incorrect error value: want=nil got=%v", err)
136 }
137
138 wantCode := http.StatusTeapot
139 if wantCode != resp.StatusCode {
140 t.Fatalf("invalid response code: want=%d got=%d", wantCode, resp.StatusCode)
141 }
142
143 wantBody := []byte("foo")
144 if !reflect.DeepEqual(wantBody, body) {
145 t.Fatalf("invalid response body: want=%q got=%q", wantBody, body)
146 }
147 }
148
149 func TestSimpleHTTPClientDoError(t *testing.T) {
150 tr := newFakeTransport()
151 c := &simpleHTTPClient{transport: tr}
152
153 tr.errchan <- errors.New("fixture")
154
155 _, _, err := c.Do(context.Background(), &fakeAction{})
156 if err == nil {
157 t.Fatalf("expected non-nil error, got nil")
158 }
159 }
160
// nilAction is an action stub that never produces a request.
type nilAction struct{}

// HTTPRequest always returns nil, whatever endpoint URL it is given.
func (a *nilAction) HTTPRequest(_ url.URL) *http.Request {
	var none *http.Request
	return none
}
166
167 func TestSimpleHTTPClientDoNilRequest(t *testing.T) {
168 tr := newFakeTransport()
169 c := &simpleHTTPClient{transport: tr}
170
171 tr.errchan <- errors.New("fixture")
172
173 _, _, err := c.Do(context.Background(), &nilAction{})
174 if err != ErrNoRequest {
175 t.Fatalf("expected non-nil error, got nil")
176 }
177 }
178
179 func TestSimpleHTTPClientDoCancelContext(t *testing.T) {
180 tr := newFakeTransport()
181 c := &simpleHTTPClient{transport: tr}
182
183 tr.startCancel <- struct{}{}
184 tr.finishCancel <- struct{}{}
185
186 _, _, err := c.Do(context.Background(), &fakeAction{})
187 if err == nil {
188 t.Fatalf("expected non-nil error, got nil")
189 }
190 }
191
// checkableReadCloser wraps an io.ReadCloser and records whether Close has
// been called on it.
type checkableReadCloser struct {
	io.ReadCloser
	closed bool // set by the first Close call
}

// Close closes the wrapped ReadCloser on the first call and returns its
// result; every later call is a no-op that returns nil.
func (c *checkableReadCloser) Close() error {
	if c.closed {
		return nil
	}
	c.closed = true
	return c.ReadCloser.Close()
}
204
205 func TestSimpleHTTPClientDoCancelContextResponseBodyClosed(t *testing.T) {
206 tr := newFakeTransport()
207 c := &simpleHTTPClient{transport: tr}
208
209
210 ctx, cancel := context.WithCancel(context.Background())
211 cancel()
212
213 body := &checkableReadCloser{ReadCloser: ioutil.NopCloser(strings.NewReader("foo"))}
214 go func() {
215
216
217 testutil.WaitSchedule()
218
219
220 tr.respchan <- &http.Response{Body: body}
221 }()
222
223 _, _, err := c.Do(ctx, &fakeAction{})
224 if err == nil {
225 t.Fatalf("expected non-nil error, got nil")
226 }
227
228 if !body.closed {
229 t.Fatalf("expected closed body")
230 }
231 }
232
// blockingBody is a response-body double whose Read blocks until the body is
// closed.
type blockingBody struct {
	c chan struct{} // closed by Close to release blocked readers
}

// Read waits on the channel and then returns zero bytes with the error
// "closed"; p is never written to.
func (bb *blockingBody) Read(p []byte) (int, error) {
	<-bb.c
	return 0, errors.New("closed")
}

// Close closes the channel, which releases any pending Read. It must be
// called at most once, since closing a closed channel panics.
func (bb *blockingBody) Close() error {
	close(bb.c)
	return nil
}
246
247 func TestSimpleHTTPClientDoCancelContextResponseBodyClosedWithBlockingBody(t *testing.T) {
248 tr := newFakeTransport()
249 c := &simpleHTTPClient{transport: tr}
250
251 ctx, cancel := context.WithCancel(context.Background())
252 body := &checkableReadCloser{ReadCloser: &blockingBody{c: make(chan struct{})}}
253 go func() {
254 tr.respchan <- &http.Response{Body: body}
255 time.Sleep(2 * time.Millisecond)
256
257 cancel()
258 }()
259
260 _, _, err := c.Do(ctx, &fakeAction{})
261 if err != context.Canceled {
262 t.Fatalf("expected %+v, got %+v", context.Canceled, err)
263 }
264
265 if !body.closed {
266 t.Fatalf("expected closed body")
267 }
268 }
269
270 func TestSimpleHTTPClientDoCancelContextWaitForRoundTrip(t *testing.T) {
271 tr := newFakeTransport()
272 c := &simpleHTTPClient{transport: tr}
273
274 donechan := make(chan struct{})
275 ctx, cancel := context.WithCancel(context.Background())
276 go func() {
277 c.Do(ctx, &fakeAction{})
278 close(donechan)
279 }()
280
281
282 cancel()
283
284 select {
285 case <-donechan:
286 t.Fatalf("simpleHTTPClient.Do should not have exited yet")
287 default:
288 }
289
290 tr.finishCancel <- struct{}{}
291
292 select {
293 case <-donechan:
294
295 return
296 case <-time.After(time.Second):
297 t.Fatalf("simpleHTTPClient.Do did not exit within 1s")
298 }
299 }
300
301 func TestSimpleHTTPClientDoHeaderTimeout(t *testing.T) {
302 tr := newFakeTransport()
303 tr.finishCancel <- struct{}{}
304 c := &simpleHTTPClient{transport: tr, headerTimeout: time.Millisecond}
305
306 errc := make(chan error, 1)
307 go func() {
308 _, _, err := c.Do(context.Background(), &fakeAction{})
309 errc <- err
310 }()
311
312 select {
313 case err := <-errc:
314 if err == nil {
315 t.Fatalf("expected non-nil error, got nil")
316 }
317 case <-time.After(time.Second):
318 t.Fatalf("unexpected timeout when waiting for the test to finish")
319 }
320 }
321
322 func TestHTTPClusterClientDo(t *testing.T) {
323 fakeErr := errors.New("fake!")
324 fakeURL := url.URL{}
325 tests := []struct {
326 client *httpClusterClient
327 ctx context.Context
328
329 wantCode int
330 wantErr error
331 wantPinned int
332 }{
333
334 {
335 client: &httpClusterClient{
336 endpoints: []url.URL{fakeURL, fakeURL},
337 clientFactory: newStaticHTTPClientFactory(
338 []staticHTTPResponse{
339 {resp: http.Response{StatusCode: http.StatusTeapot}},
340 {err: fakeErr},
341 },
342 ),
343 rand: rand.New(rand.NewSource(0)),
344 },
345 wantCode: http.StatusTeapot,
346 },
347
348
349 {
350 client: &httpClusterClient{
351 endpoints: []url.URL{fakeURL, fakeURL},
352 clientFactory: newStaticHTTPClientFactory(
353 []staticHTTPResponse{
354 {err: fakeErr},
355 {resp: http.Response{StatusCode: http.StatusTeapot}},
356 },
357 ),
358 rand: rand.New(rand.NewSource(0)),
359 },
360 wantCode: http.StatusTeapot,
361 wantPinned: 1,
362 },
363
364
365 {
366 client: &httpClusterClient{
367 endpoints: []url.URL{fakeURL, fakeURL},
368 clientFactory: newStaticHTTPClientFactory(
369 []staticHTTPResponse{
370 {err: context.Canceled},
371 {resp: http.Response{StatusCode: http.StatusTeapot}},
372 },
373 ),
374 rand: rand.New(rand.NewSource(0)),
375 },
376 wantErr: context.Canceled,
377 },
378
379
380 {
381 client: &httpClusterClient{
382 endpoints: []url.URL{},
383 clientFactory: newHTTPClientFactory(nil, nil, 0),
384 rand: rand.New(rand.NewSource(0)),
385 },
386 wantErr: ErrNoEndpoints,
387 },
388
389
390 {
391 client: &httpClusterClient{
392 endpoints: []url.URL{fakeURL, fakeURL},
393 clientFactory: newStaticHTTPClientFactory(
394 []staticHTTPResponse{
395 {err: fakeErr},
396 {err: fakeErr},
397 },
398 ),
399 rand: rand.New(rand.NewSource(0)),
400 },
401 wantErr: &ClusterError{Errors: []error{fakeErr, fakeErr}},
402 },
403
404
405 {
406 client: &httpClusterClient{
407 endpoints: []url.URL{fakeURL, fakeURL},
408 clientFactory: newStaticHTTPClientFactory(
409 []staticHTTPResponse{
410 {resp: http.Response{StatusCode: http.StatusBadGateway}},
411 {resp: http.Response{StatusCode: http.StatusTeapot}},
412 },
413 ),
414 rand: rand.New(rand.NewSource(0)),
415 },
416 wantCode: http.StatusTeapot,
417 wantPinned: 1,
418 },
419
420
421 {
422 client: &httpClusterClient{
423 endpoints: []url.URL{fakeURL, fakeURL},
424 clientFactory: newStaticHTTPClientFactory(
425 []staticHTTPResponse{
426 {resp: http.Response{StatusCode: http.StatusBadGateway}},
427 {resp: http.Response{StatusCode: http.StatusTeapot}},
428 },
429 ),
430 rand: rand.New(rand.NewSource(0)),
431 },
432 ctx: context.WithValue(context.Background(), &oneShotCtxValue, &oneShotCtxValue),
433 wantErr: fmt.Errorf("client: etcd member returns server error [Bad Gateway]"),
434 wantPinned: 1,
435 },
436 }
437
438 for i, tt := range tests {
439 if tt.ctx == nil {
440 tt.ctx = context.Background()
441 }
442 resp, _, err := tt.client.Do(tt.ctx, nil)
443 if (tt.wantErr == nil && tt.wantErr != err) || (tt.wantErr != nil && tt.wantErr.Error() != err.Error()) {
444 t.Errorf("#%d: got err=%v, want=%v", i, err, tt.wantErr)
445 continue
446 }
447
448 if resp == nil {
449 if tt.wantCode != 0 {
450 t.Errorf("#%d: resp is nil, want=%d", i, tt.wantCode)
451 continue
452 }
453 } else if resp.StatusCode != tt.wantCode {
454 t.Errorf("#%d: resp code=%d, want=%d", i, resp.StatusCode, tt.wantCode)
455 continue
456 }
457
458 if tt.client.pinned != tt.wantPinned {
459 t.Errorf("#%d: pinned=%d, want=%d", i, tt.client.pinned, tt.wantPinned)
460 }
461 }
462 }
463
464 func TestHTTPClusterClientDoDeadlineExceedContext(t *testing.T) {
465 fakeURL := url.URL{}
466 tr := newFakeTransport()
467 tr.finishCancel <- struct{}{}
468 c := &httpClusterClient{
469 clientFactory: newHTTPClientFactory(tr, DefaultCheckRedirect, 0),
470 endpoints: []url.URL{fakeURL},
471 }
472
473 errc := make(chan error, 1)
474 go func() {
475 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
476 defer cancel()
477 _, _, err := c.Do(ctx, &fakeAction{})
478 errc <- err
479 }()
480
481 select {
482 case err := <-errc:
483 if err != context.DeadlineExceeded {
484 t.Errorf("err = %+v, want %+v", err, context.DeadlineExceeded)
485 }
486 case <-time.After(time.Second):
487 t.Fatalf("unexpected timeout when waiting for request to deadline exceed")
488 }
489 }
490
// errFakeCancelContext is the error fakeCancelContext reports from Err.
var errFakeCancelContext = errors.New("fake context canceled")

// fakeCancelContext is a context double that always looks done and reports
// errFakeCancelContext as the reason.
type fakeCancelContext struct{}

// Deadline reports that no deadline is set.
func (f fakeCancelContext) Deadline() (time.Time, bool) {
	var none time.Time
	return none, false
}

// Done returns a new buffered channel on every call with one value already
// queued, so a single receive from it succeeds immediately.
func (f fakeCancelContext) Done() <-chan struct{} {
	ready := make(chan struct{}, 1)
	ready <- struct{}{}
	return ready
}

// Err always returns errFakeCancelContext.
func (f fakeCancelContext) Err() error {
	return errFakeCancelContext
}

// Value returns 1 for every key.
func (f fakeCancelContext) Value(key interface{}) interface{} {
	return 1
}
503
// withTimeout is a stand-in shaped like context.WithTimeout. It returns
// parent unchanged and ignores timeout. The returned cancel func only clears
// this function's own result variable, so calling it has no effect on the
// context the caller received.
func withTimeout(parent context.Context, timeout time.Duration) (ctx context.Context, cancel context.CancelFunc) {
	ctx = parent
	cancel = func() { ctx = nil }
	return ctx, cancel
}
513
514 func TestHTTPClusterClientDoCanceledContext(t *testing.T) {
515 fakeURL := url.URL{}
516 tr := newFakeTransport()
517 tr.finishCancel <- struct{}{}
518 c := &httpClusterClient{
519 clientFactory: newHTTPClientFactory(tr, DefaultCheckRedirect, 0),
520 endpoints: []url.URL{fakeURL},
521 }
522
523 errc := make(chan error, 1)
524 go func() {
525 ctx, cancel := withTimeout(fakeCancelContext{}, time.Millisecond)
526 cancel()
527 _, _, err := c.Do(ctx, &fakeAction{})
528 errc <- err
529 }()
530
531 select {
532 case err := <-errc:
533 if err != errFakeCancelContext {
534 t.Errorf("err = %+v, want %+v", err, errFakeCancelContext)
535 }
536 case <-time.After(time.Second):
537 t.Fatalf("unexpected timeout when waiting for request to fake context canceled")
538 }
539 }
540
541 func TestRedirectedHTTPAction(t *testing.T) {
542 act := &redirectedHTTPAction{
543 action: &staticHTTPAction{
544 request: http.Request{
545 Method: "DELETE",
546 URL: &url.URL{
547 Scheme: "https",
548 Host: "foo.example.com",
549 Path: "/ping",
550 },
551 },
552 },
553 location: url.URL{
554 Scheme: "https",
555 Host: "bar.example.com",
556 Path: "/pong",
557 },
558 }
559
560 want := &http.Request{
561 Method: "DELETE",
562 URL: &url.URL{
563 Scheme: "https",
564 Host: "bar.example.com",
565 Path: "/pong",
566 },
567 }
568 got := act.HTTPRequest(url.URL{Scheme: "http", Host: "baz.example.com", Path: "/pang"})
569
570 if !reflect.DeepEqual(want, got) {
571 t.Fatalf("HTTPRequest is %#v, want %#v", want, got)
572 }
573 }
574
575 func TestRedirectFollowingHTTPClient(t *testing.T) {
576 tests := []struct {
577 checkRedirect CheckRedirectFunc
578 client httpClient
579 wantCode int
580 wantErr error
581 }{
582
583 {
584 checkRedirect: func(int) error { return ErrTooManyRedirects },
585 client: &multiStaticHTTPClient{
586 responses: []staticHTTPResponse{
587 {
588 err: errors.New("fail!"),
589 },
590 },
591 },
592 wantErr: errors.New("fail!"),
593 },
594
595
596 {
597 checkRedirect: func(int) error { return ErrTooManyRedirects },
598 client: &multiStaticHTTPClient{
599 responses: []staticHTTPResponse{
600 {
601 resp: http.Response{
602 StatusCode: http.StatusTeapot,
603 },
604 },
605 },
606 },
607 wantCode: http.StatusTeapot,
608 },
609
610
611 {
612 checkRedirect: func(via int) error {
613 if via >= 2 {
614 return ErrTooManyRedirects
615 }
616 return nil
617 },
618 client: &multiStaticHTTPClient{
619 responses: []staticHTTPResponse{
620 {
621 resp: http.Response{
622 StatusCode: http.StatusTemporaryRedirect,
623 Header: http.Header{"Location": []string{"http://example.com"}},
624 },
625 },
626 {
627 resp: http.Response{
628 StatusCode: http.StatusTeapot,
629 },
630 },
631 },
632 },
633 wantCode: http.StatusTeapot,
634 },
635
636
637 {
638 checkRedirect: func(via int) error {
639 if via >= 3 {
640 return ErrTooManyRedirects
641 }
642 return nil
643 },
644 client: &multiStaticHTTPClient{
645 responses: []staticHTTPResponse{
646 {
647 resp: http.Response{
648 StatusCode: http.StatusTemporaryRedirect,
649 Header: http.Header{"Location": []string{"http://example.com"}},
650 },
651 },
652 {
653 resp: http.Response{
654 StatusCode: http.StatusTemporaryRedirect,
655 Header: http.Header{"Location": []string{"http://example.com"}},
656 },
657 },
658 {
659 resp: http.Response{
660 StatusCode: http.StatusTeapot,
661 },
662 },
663 },
664 },
665 wantCode: http.StatusTeapot,
666 },
667
668
669 {
670 checkRedirect: func(via int) error {
671 if via >= 2 {
672 return ErrTooManyRedirects
673 }
674 return nil
675 },
676 client: &multiStaticHTTPClient{
677 responses: []staticHTTPResponse{
678 {
679 resp: http.Response{
680 StatusCode: http.StatusTemporaryRedirect,
681 Header: http.Header{"Location": []string{"http://example.com"}},
682 },
683 },
684 {
685 resp: http.Response{
686 StatusCode: http.StatusTemporaryRedirect,
687 Header: http.Header{"Location": []string{"http://example.com"}},
688 },
689 },
690 {
691 resp: http.Response{
692 StatusCode: http.StatusTeapot,
693 },
694 },
695 },
696 },
697 wantErr: ErrTooManyRedirects,
698 },
699
700
701 {
702 checkRedirect: func(int) error { return ErrTooManyRedirects },
703 client: &multiStaticHTTPClient{
704 responses: []staticHTTPResponse{
705 {
706 resp: http.Response{
707 StatusCode: http.StatusTemporaryRedirect,
708 },
709 },
710 },
711 },
712 wantErr: errors.New("location header not set"),
713 },
714
715
716 {
717 checkRedirect: func(int) error { return ErrTooManyRedirects },
718 client: &multiStaticHTTPClient{
719 responses: []staticHTTPResponse{
720 {
721 resp: http.Response{
722 StatusCode: http.StatusTemporaryRedirect,
723 Header: http.Header{"Location": []string{":"}},
724 },
725 },
726 },
727 },
728 wantErr: errors.New("location header not valid URL: :"),
729 },
730
731
732 {
733 checkRedirect: func(int) error { return nil },
734 client: &staticHTTPClient{
735 resp: http.Response{
736 StatusCode: http.StatusTemporaryRedirect,
737 Header: http.Header{"Location": []string{"http://example.com"}},
738 },
739 },
740 wantErr: errTooManyRedirectChecks,
741 },
742 }
743
744 for i, tt := range tests {
745 client := &redirectFollowingHTTPClient{client: tt.client, checkRedirect: tt.checkRedirect}
746 resp, _, err := client.Do(context.Background(), nil)
747 if (tt.wantErr == nil && tt.wantErr != err) || (tt.wantErr != nil && tt.wantErr.Error() != err.Error()) {
748 t.Errorf("#%d: got err=%v, want=%v", i, err, tt.wantErr)
749 continue
750 }
751
752 if resp == nil {
753 if tt.wantCode != 0 {
754 t.Errorf("#%d: resp is nil, want=%d", i, tt.wantCode)
755 }
756 continue
757 }
758
759 if resp.StatusCode != tt.wantCode {
760 t.Errorf("#%d: resp code=%d, want=%d", i, resp.StatusCode, tt.wantCode)
761 continue
762 }
763 }
764 }
765
766 func TestDefaultCheckRedirect(t *testing.T) {
767 tests := []struct {
768 num int
769 err error
770 }{
771 {0, nil},
772 {5, nil},
773 {10, nil},
774 {11, ErrTooManyRedirects},
775 {29, ErrTooManyRedirects},
776 }
777
778 for i, tt := range tests {
779 err := DefaultCheckRedirect(tt.num)
780 if !reflect.DeepEqual(tt.err, err) {
781 t.Errorf("#%d: want=%#v got=%#v", i, tt.err, err)
782 }
783 }
784 }
785
786 func TestHTTPClusterClientSync(t *testing.T) {
787 cf := newStaticHTTPClientFactory([]staticHTTPResponse{
788 {
789 resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
790 body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
791 },
792 })
793
794 hc := &httpClusterClient{
795 clientFactory: cf,
796 rand: rand.New(rand.NewSource(0)),
797 }
798 err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
799 if err != nil {
800 t.Fatalf("unexpected error during setup: %#v", err)
801 }
802
803 want := []string{"http://127.0.0.1:2379"}
804 got := hc.Endpoints()
805 if !reflect.DeepEqual(want, got) {
806 t.Fatalf("incorrect endpoints: want=%#v got=%#v", want, got)
807 }
808
809 err = hc.Sync(context.Background())
810 if err != nil {
811 t.Fatalf("unexpected error during Sync: %#v", err)
812 }
813
814 want = []string{"http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002", "http://127.0.0.1:4003"}
815 got = hc.Endpoints()
816 sort.Strings(got)
817 if !reflect.DeepEqual(want, got) {
818 t.Fatalf("incorrect endpoints post-Sync: want=%#v got=%#v", want, got)
819 }
820
821 err = hc.SetEndpoints([]string{"http://127.0.0.1:4009"})
822 if err != nil {
823 t.Fatalf("unexpected error during reset: %#v", err)
824 }
825
826 want = []string{"http://127.0.0.1:4009"}
827 got = hc.Endpoints()
828 if !reflect.DeepEqual(want, got) {
829 t.Fatalf("incorrect endpoints post-reset: want=%#v got=%#v", want, got)
830 }
831 }
832
833 func TestHTTPClusterClientSyncFail(t *testing.T) {
834 cf := newStaticHTTPClientFactory([]staticHTTPResponse{
835 {err: errors.New("fail!")},
836 })
837
838 hc := &httpClusterClient{
839 clientFactory: cf,
840 rand: rand.New(rand.NewSource(0)),
841 }
842 err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
843 if err != nil {
844 t.Fatalf("unexpected error during setup: %#v", err)
845 }
846
847 want := []string{"http://127.0.0.1:2379"}
848 got := hc.Endpoints()
849 if !reflect.DeepEqual(want, got) {
850 t.Fatalf("incorrect endpoints: want=%#v got=%#v", want, got)
851 }
852
853 err = hc.Sync(context.Background())
854 if err == nil {
855 t.Fatalf("got nil error during Sync")
856 }
857
858 got = hc.Endpoints()
859 if !reflect.DeepEqual(want, got) {
860 t.Fatalf("incorrect endpoints after failed Sync: want=%#v got=%#v", want, got)
861 }
862 }
863
864 func TestHTTPClusterClientAutoSyncCancelContext(t *testing.T) {
865 cf := newStaticHTTPClientFactory([]staticHTTPResponse{
866 {
867 resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
868 body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
869 },
870 })
871
872 hc := &httpClusterClient{
873 clientFactory: cf,
874 rand: rand.New(rand.NewSource(0)),
875 }
876 err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
877 if err != nil {
878 t.Fatalf("unexpected error during setup: %#v", err)
879 }
880 ctx, cancel := context.WithCancel(context.Background())
881 cancel()
882
883 err = hc.AutoSync(ctx, time.Hour)
884 if err != context.Canceled {
885 t.Fatalf("incorrect error value: want=%v got=%v", context.Canceled, err)
886 }
887 }
888
889 func TestHTTPClusterClientAutoSyncFail(t *testing.T) {
890 cf := newStaticHTTPClientFactory([]staticHTTPResponse{
891 {err: errors.New("fail!")},
892 })
893
894 hc := &httpClusterClient{
895 clientFactory: cf,
896 rand: rand.New(rand.NewSource(0)),
897 }
898 err := hc.SetEndpoints([]string{"http://127.0.0.1:2379"})
899 if err != nil {
900 t.Fatalf("unexpected error during setup: %#v", err)
901 }
902
903 err = hc.AutoSync(context.Background(), time.Hour)
904 if !strings.HasPrefix(err.Error(), ErrClusterUnavailable.Error()) {
905 t.Fatalf("incorrect error value: want=%v got=%v", ErrClusterUnavailable, err)
906 }
907 }
908
909 func TestHTTPClusterClientGetVersion(t *testing.T) {
910 body := []byte(`{"etcdserver":"2.3.2","etcdcluster":"2.3.0"}`)
911 cf := newStaticHTTPClientFactory([]staticHTTPResponse{
912 {
913 resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Length": []string{"44"}}},
914 body: body,
915 },
916 })
917
918 hc := &httpClusterClient{
919 clientFactory: cf,
920 rand: rand.New(rand.NewSource(0)),
921 }
922 err := hc.SetEndpoints([]string{"http://127.0.0.1:4003", "http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"})
923 if err != nil {
924 t.Fatalf("unexpected error during setup: %#v", err)
925 }
926
927 actual, err := hc.GetVersion(context.Background())
928 if err != nil {
929 t.Errorf("non-nil error: %#v", err)
930 }
931 expected := version.Versions{Server: "2.3.2", Cluster: "2.3.0"}
932 if !reflect.DeepEqual(&expected, actual) {
933 t.Errorf("incorrect Response: want=%#v got=%#v", expected, actual)
934 }
935 }
936
937
938
939 func TestHTTPClusterClientSyncPinEndpoint(t *testing.T) {
940 cf := newStaticHTTPClientFactory([]staticHTTPResponse{
941 {
942 resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
943 body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
944 },
945 {
946 resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
947 body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
948 },
949 {
950 resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
951 body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
952 },
953 })
954
955 hc := &httpClusterClient{
956 clientFactory: cf,
957 rand: rand.New(rand.NewSource(0)),
958 }
959 err := hc.SetEndpoints([]string{"http://127.0.0.1:4003", "http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"})
960 if err != nil {
961 t.Fatalf("unexpected error during setup: %#v", err)
962 }
963 pinnedEndpoint := hc.endpoints[hc.pinned]
964
965 for i := 0; i < 3; i++ {
966 err = hc.Sync(context.Background())
967 if err != nil {
968 t.Fatalf("#%d: unexpected error during Sync: %#v", i, err)
969 }
970
971 if g := hc.endpoints[hc.pinned]; g != pinnedEndpoint {
972 t.Errorf("#%d: pinned endpoint = %v, want %v", i, g, pinnedEndpoint)
973 }
974 }
975 }
976
977
978
979 func TestHTTPClusterClientSyncUnpinEndpoint(t *testing.T) {
980 cf := newStaticHTTPClientFactory([]staticHTTPResponse{
981 {
982 resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
983 body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
984 },
985 {
986 resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
987 body: []byte(`{"members":[{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
988 },
989 {
990 resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
991 body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
992 },
993 })
994
995 hc := &httpClusterClient{
996 clientFactory: cf,
997 rand: rand.New(rand.NewSource(0)),
998 }
999 err := hc.SetEndpoints([]string{"http://127.0.0.1:4003", "http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"})
1000 if err != nil {
1001 t.Fatalf("unexpected error during setup: %#v", err)
1002 }
1003 wants := []string{"http://127.0.0.1:2379", "http://127.0.0.1:4001", "http://127.0.0.1:4002"}
1004
1005 for i := 0; i < 3; i++ {
1006 err = hc.Sync(context.Background())
1007 if err != nil {
1008 t.Fatalf("#%d: unexpected error during Sync: %#v", i, err)
1009 }
1010
1011 if g := hc.endpoints[hc.pinned]; g.String() != wants[i] {
1012 t.Errorf("#%d: pinned endpoint = %v, want %v", i, g, wants[i])
1013 }
1014 }
1015 }
1016
1017
1018
1019 func TestHTTPClusterClientSyncPinLeaderEndpoint(t *testing.T) {
1020 cf := newStaticHTTPClientFactory([]staticHTTPResponse{
1021 {
1022 resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
1023 body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
1024 },
1025 {
1026 resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
1027 body: []byte(`{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]}`),
1028 },
1029 {
1030 resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
1031 body: []byte(`{"members":[{"id":"2745e2525fce8fe","peerURLs":["http://127.0.0.1:7003"],"name":"node3","clientURLs":["http://127.0.0.1:4003"]},{"id":"42134f434382925","peerURLs":["http://127.0.0.1:2380","http://127.0.0.1:7001"],"name":"node1","clientURLs":["http://127.0.0.1:2379","http://127.0.0.1:4001"]},{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}]}`),
1032 },
1033 {
1034 resp: http.Response{StatusCode: http.StatusOK, Header: http.Header{"Content-Type": []string{"application/json"}}},
1035 body: []byte(`{"id":"94088180e21eb87b","peerURLs":["http://127.0.0.1:7002"],"name":"node2","clientURLs":["http://127.0.0.1:4002"]}`),
1036 },
1037 })
1038
1039 hc := &httpClusterClient{
1040 clientFactory: cf,
1041 rand: rand.New(rand.NewSource(0)),
1042 selectionMode: EndpointSelectionPrioritizeLeader,
1043 endpoints: []url.URL{{}},
1044 }
1045
1046 wants := []string{"http://127.0.0.1:4003", "http://127.0.0.1:4002"}
1047
1048 for i, want := range wants {
1049 err := hc.Sync(context.Background())
1050 if err != nil {
1051 t.Fatalf("#%d: unexpected error during Sync: %#v", i, err)
1052 }
1053
1054 pinned := hc.endpoints[hc.pinned].String()
1055 if pinned != want {
1056 t.Errorf("#%d: pinned endpoint = %v, want %v", i, pinned, want)
1057 }
1058 }
1059 }
1060
1061 func TestHTTPClusterClientResetFail(t *testing.T) {
1062 tests := [][]string{
1063
1064 {},
1065
1066
1067 {":"},
1068 }
1069
1070 for i, tt := range tests {
1071 hc := &httpClusterClient{rand: rand.New(rand.NewSource(0))}
1072 err := hc.SetEndpoints(tt)
1073 if err == nil {
1074 t.Errorf("#%d: expected non-nil error", i)
1075 }
1076 }
1077 }
1078
1079 func TestHTTPClusterClientResetPinRandom(t *testing.T) {
1080 round := 2000
1081 pinNum := 0
1082 for i := 0; i < round; i++ {
1083 hc := &httpClusterClient{rand: rand.New(rand.NewSource(int64(i)))}
1084 err := hc.SetEndpoints([]string{"http://127.0.0.1:4001", "http://127.0.0.1:4002", "http://127.0.0.1:4003"})
1085 if err != nil {
1086 t.Fatalf("#%d: reset error (%v)", i, err)
1087 }
1088 if hc.endpoints[hc.pinned].String() == "http://127.0.0.1:4001" {
1089 pinNum++
1090 }
1091 }
1092
1093 min := 1.0/3.0 - 0.05
1094 max := 1.0/3.0 + 0.05
1095 if ratio := float64(pinNum) / float64(round); ratio > max || ratio < min {
1096 t.Errorf("pinned ratio = %v, want [%v, %v]", ratio, min, max)
1097 }
1098 }
1099
View as plain text