1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package client
16
17 import (
18 "context"
19 "encoding/json"
20 "errors"
21 "fmt"
22 "io/ioutil"
23 "math/rand"
24 "net"
25 "net/http"
26 "net/url"
27 "sort"
28 "strconv"
29 "sync"
30 "time"
31
32 "go.etcd.io/etcd/api/v3/version"
33 )
34
var (
	// ErrNoEndpoints is returned when an empty endpoint list is supplied or
	// when a request is attempted while no endpoints are configured.
	ErrNoEndpoints = errors.New("client: no endpoints available")
	// ErrTooManyRedirects is returned by DefaultCheckRedirect once more than
	// 10 redirects have been followed.
	ErrTooManyRedirects = errors.New("client: too many redirects")
	// ErrClusterUnavailable is not referenced in this part of the file.
	// NOTE(review): presumably returned by callers when no member can
	// serve a request; confirm against its users.
	ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured")
	// ErrNoLeaderEndpoint is returned when the leader reports no client
	// URLs, and by Sync when the leader lookup fails for any reason.
	ErrNoLeaderEndpoint = errors.New("client: no leader endpoint available")
	// errTooManyRedirectChecks is returned by redirectFollowingHTTPClient.Do
	// when its hard cap of 100 attempts is exhausted.
	errTooManyRedirectChecks = errors.New("client: too many redirect checks")

	// oneShotCtxValue is never assigned here; only its address is used, as
	// a context key. A context carrying a non-nil value under
	// &oneShotCtxValue makes httpClusterClient.Do return after the first
	// failed endpoint instead of trying the remaining ones.
	oneShotCtxValue interface{}
)
46
// DefaultRequestTimeout is a five-second duration. It is not referenced in
// this part of the file.
// NOTE(review): presumably meant as a default per-request timeout for
// callers building contexts; confirm against its users.
var DefaultRequestTimeout = 5 * time.Second
48
// DefaultTransport is the transport used when Config.Transport is nil. It
// honors proxy settings from the environment, dials with a 30s timeout and
// 30s keep-alive, and allows 10s for the TLS handshake.
var DefaultTransport CancelableTransport = &http.Transport{
	Proxy: http.ProxyFromEnvironment,
	DialContext: (&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}).DialContext,
	TLSHandshakeTimeout: 10 * time.Second,
}
57
// EndpointSelectionMode controls how Sync orders the endpoint list and which
// endpoint it pins as the first one to try.
type EndpointSelectionMode int

const (
	// EndpointSelectionRandom is the zero value and therefore the default.
	// In this mode Sync leaves the client untouched when the cluster's
	// advertised client URLs match the current endpoints; otherwise it
	// installs the new list in shuffled order and pins index 0.
	EndpointSelectionRandom EndpointSelectionMode = iota

	// EndpointSelectionPrioritizeLeader makes Sync look up the cluster
	// leader and pin the endpoint whose URL equals the leader's first
	// client URL. If no endpoint matches, index 0 stays pinned. Sync
	// returns ErrNoLeaderEndpoint when the leader cannot be determined.
	EndpointSelectionPrioritizeLeader
)
80
// Config holds the settings New uses to build a Client.
type Config struct {
	// Endpoints is the initial list of member URLs. Each entry is parsed
	// with url.Parse. New fails with ErrNoEndpoints when the list is
	// empty, or with the parse error when an entry is malformed. The list
	// is shuffled when it is installed.
	Endpoints []string

	// Transport performs the HTTP round trips. When nil, DefaultTransport
	// is used.
	Transport CancelableTransport

	// CheckRedirect is consulted before each redirect is followed. It
	// receives the number of requests already made, and a non-nil result
	// aborts the request with that error. When nil, DefaultCheckRedirect
	// is used.
	CheckRedirect CheckRedirectFunc

	// Username enables HTTP basic authentication when non-empty.
	Username string

	// Password is sent with Username. It is ignored when Username is
	// empty.
	Password string

	// HeaderTimeoutPerRequest bounds how long a single endpoint may take
	// to return a response from the transport. It applies only when
	// greater than zero and only to requests whose "wait" query parameter
	// is absent or false. When it expires, the attempt fails with a
	// header-timeout error and the cluster client moves on to the next
	// endpoint.
	HeaderTimeoutPerRequest time.Duration

	// SelectionMode selects the endpoint ordering and pinning strategy
	// applied by Sync.
	SelectionMode EndpointSelectionMode
}
144
145 func (cfg *Config) transport() CancelableTransport {
146 if cfg.Transport == nil {
147 return DefaultTransport
148 }
149 return cfg.Transport
150 }
151
152 func (cfg *Config) checkRedirect() CheckRedirectFunc {
153 if cfg.CheckRedirect == nil {
154 return DefaultCheckRedirect
155 }
156 return cfg.CheckRedirect
157 }
158
159
160
// CancelableTransport is an http.RoundTripper that can also abort an
// in-flight request through CancelRequest. *http.Transport satisfies it, as
// DefaultTransport shows.
type CancelableTransport interface {
	http.RoundTripper
	CancelRequest(req *http.Request)
}
165
// CheckRedirectFunc decides whether another redirect may be followed. via is
// the number of requests already made for the current action. Returning a
// non-nil error stops the redirect chain, and that error is returned to the
// caller.
type CheckRedirectFunc func(via int) error
167
168
169 var DefaultCheckRedirect CheckRedirectFunc = func(via int) error {
170 if via > 10 {
171 return ErrTooManyRedirects
172 }
173 return nil
174 }
175
// Client is the cluster-aware HTTP client returned by New. The notes on each
// method describe the implementation provided in this file.
type Client interface {
	// Sync replaces the client's endpoints with the client URLs advertised
	// by the cluster members, applying the configured selection mode.
	Sync(context.Context) error

	// AutoSync calls Sync immediately and then once per interval. It
	// returns the first Sync error, or the context's error once the
	// context is done. It does not return nil.
	AutoSync(context.Context, time.Duration) error

	// Endpoints returns the current endpoint URLs as strings, in the
	// client's internal order.
	Endpoints() []string

	// SetEndpoints replaces the endpoint list with eps. It fails without
	// changing the client when eps is empty or contains an unparsable URL.
	SetEndpoints(eps []string) error

	// GetVersion fetches and decodes the "/version" resource.
	GetVersion(ctx context.Context) (*version.Versions, error)

	httpClient
}
211
212 func New(cfg Config) (Client, error) {
213 c := &httpClusterClient{
214 clientFactory: newHTTPClientFactory(cfg.transport(), cfg.checkRedirect(), cfg.HeaderTimeoutPerRequest),
215 rand: rand.New(rand.NewSource(int64(time.Now().Nanosecond()))),
216 selectionMode: cfg.SelectionMode,
217 }
218 if cfg.Username != "" {
219 c.credentials = &credentials{
220 username: cfg.Username,
221 password: cfg.Password,
222 }
223 }
224 if err := c.SetEndpoints(cfg.Endpoints); err != nil {
225 return nil, err
226 }
227 return c, nil
228 }
229
// httpClient executes an httpAction and returns the response together with
// its fully read body.
type httpClient interface {
	Do(context.Context, httpAction) (*http.Response, []byte, error)
}
233
234 func newHTTPClientFactory(tr CancelableTransport, cr CheckRedirectFunc, headerTimeout time.Duration) httpClientFactory {
235 return func(ep url.URL) httpClient {
236 return &redirectFollowingHTTPClient{
237 checkRedirect: cr,
238 client: &simpleHTTPClient{
239 transport: tr,
240 endpoint: ep,
241 headerTimeout: headerTimeout,
242 },
243 }
244 }
245 }
246
// credentials is the username/password pair applied as HTTP basic auth by
// authedAction.
type credentials struct {
	username string
	password string
}
251
// httpClientFactory builds an httpClient bound to a single endpoint URL.
type httpClientFactory func(url.URL) httpClient
253
// httpAction turns an endpoint URL into the HTTP request to send to it. A
// nil result is allowed: simpleHTTPClient.Do reports it as ErrNoRequest.
type httpAction interface {
	HTTPRequest(url.URL) *http.Request
}
257
// httpClusterClient is the Client implementation returned by New. It sends
// each action to the pinned endpoint first and fails over to the others.
type httpClusterClient struct {
	// clientFactory builds the per-endpoint client used for each attempt.
	clientFactory httpClientFactory
	// endpoints is the current member list. It is guarded by the embedded
	// mutex.
	endpoints []url.URL
	// pinned is the index in endpoints that Do tries first. It is guarded
	// by the embedded mutex.
	pinned int
	// credentials, when non-nil, are attached to every request as basic
	// auth.
	credentials *credentials
	sync.RWMutex
	// rand drives endpoint shuffling.
	rand *rand.Rand
	// selectionMode is fixed at construction and consulted by Sync.
	selectionMode EndpointSelectionMode
}
267
268 func (c *httpClusterClient) getLeaderEndpoint(ctx context.Context, eps []url.URL) (string, error) {
269 ceps := make([]url.URL, len(eps))
270 copy(ceps, eps)
271
272
273
274 clientCopy := &httpClusterClient{
275 clientFactory: c.clientFactory,
276 credentials: c.credentials,
277 rand: c.rand,
278
279 pinned: 0,
280 endpoints: ceps,
281 }
282
283 mAPI := NewMembersAPI(clientCopy)
284 leader, err := mAPI.Leader(ctx)
285 if err != nil {
286 return "", err
287 }
288 if len(leader.ClientURLs) == 0 {
289 return "", ErrNoLeaderEndpoint
290 }
291
292 return leader.ClientURLs[0], nil
293 }
294
295 func (c *httpClusterClient) parseEndpoints(eps []string) ([]url.URL, error) {
296 if len(eps) == 0 {
297 return []url.URL{}, ErrNoEndpoints
298 }
299
300 neps := make([]url.URL, len(eps))
301 for i, ep := range eps {
302 u, err := url.Parse(ep)
303 if err != nil {
304 return []url.URL{}, err
305 }
306 neps[i] = *u
307 }
308 return neps, nil
309 }
310
311 func (c *httpClusterClient) SetEndpoints(eps []string) error {
312 neps, err := c.parseEndpoints(eps)
313 if err != nil {
314 return err
315 }
316
317 c.Lock()
318 defer c.Unlock()
319
320 c.endpoints = shuffleEndpoints(c.rand, neps)
321
322
323
324
325
326
327 c.pinned = 0
328
329 return nil
330 }
331
332 func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
333 action := act
334 c.RLock()
335 leps := len(c.endpoints)
336 eps := make([]url.URL, leps)
337 n := copy(eps, c.endpoints)
338 pinned := c.pinned
339
340 if c.credentials != nil {
341 action = &authedAction{
342 act: act,
343 credentials: *c.credentials,
344 }
345 }
346 c.RUnlock()
347
348 if leps == 0 {
349 return nil, nil, ErrNoEndpoints
350 }
351
352 if leps != n {
353 return nil, nil, errors.New("unable to pick endpoint: copy failed")
354 }
355
356 var resp *http.Response
357 var body []byte
358 var err error
359 cerr := &ClusterError{}
360 isOneShot := ctx.Value(&oneShotCtxValue) != nil
361
362 for i := pinned; i < leps+pinned; i++ {
363 k := i % leps
364 hc := c.clientFactory(eps[k])
365 resp, body, err = hc.Do(ctx, action)
366 if err != nil {
367 cerr.Errors = append(cerr.Errors, err)
368 if err == ctx.Err() {
369 return nil, nil, ctx.Err()
370 }
371 if err == context.Canceled || err == context.DeadlineExceeded {
372 return nil, nil, err
373 }
374 } else if resp.StatusCode/100 == 5 {
375 switch resp.StatusCode {
376 case http.StatusInternalServerError, http.StatusServiceUnavailable:
377
378 cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s has no leader", eps[k].String()))
379 default:
380 cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", eps[k].String(), http.StatusText(resp.StatusCode)))
381 }
382 err = cerr.Errors[0]
383 }
384 if err != nil {
385 if !isOneShot {
386 continue
387 }
388 c.Lock()
389 c.pinned = (k + 1) % leps
390 c.Unlock()
391 return nil, nil, err
392 }
393 if k != pinned {
394 c.Lock()
395 c.pinned = k
396 c.Unlock()
397 }
398 return resp, body, nil
399 }
400
401 return nil, nil, cerr
402 }
403
404 func (c *httpClusterClient) Endpoints() []string {
405 c.RLock()
406 defer c.RUnlock()
407
408 eps := make([]string, len(c.endpoints))
409 for i, ep := range c.endpoints {
410 eps[i] = ep.String()
411 }
412
413 return eps
414 }
415
416 func (c *httpClusterClient) Sync(ctx context.Context) error {
417 mAPI := NewMembersAPI(c)
418 ms, err := mAPI.List(ctx)
419 if err != nil {
420 return err
421 }
422
423 var eps []string
424 for _, m := range ms {
425 eps = append(eps, m.ClientURLs...)
426 }
427
428 neps, err := c.parseEndpoints(eps)
429 if err != nil {
430 return err
431 }
432
433 npin := 0
434
435 switch c.selectionMode {
436 case EndpointSelectionRandom:
437 c.RLock()
438 eq := endpointsEqual(c.endpoints, neps)
439 c.RUnlock()
440
441 if eq {
442 return nil
443 }
444
445 neps = shuffleEndpoints(c.rand, neps)
446 case EndpointSelectionPrioritizeLeader:
447 nle, err := c.getLeaderEndpoint(ctx, neps)
448 if err != nil {
449 return ErrNoLeaderEndpoint
450 }
451
452 for i, n := range neps {
453 if n.String() == nle {
454 npin = i
455 break
456 }
457 }
458 default:
459 return fmt.Errorf("invalid endpoint selection mode: %d", c.selectionMode)
460 }
461
462 c.Lock()
463 defer c.Unlock()
464 c.endpoints = neps
465 c.pinned = npin
466
467 return nil
468 }
469
470 func (c *httpClusterClient) AutoSync(ctx context.Context, interval time.Duration) error {
471 ticker := time.NewTicker(interval)
472 defer ticker.Stop()
473 for {
474 err := c.Sync(ctx)
475 if err != nil {
476 return err
477 }
478 select {
479 case <-ctx.Done():
480 return ctx.Err()
481 case <-ticker.C:
482 }
483 }
484 }
485
486 func (c *httpClusterClient) GetVersion(ctx context.Context) (*version.Versions, error) {
487 act := &getAction{Prefix: "/version"}
488
489 resp, body, err := c.Do(ctx, act)
490 if err != nil {
491 return nil, err
492 }
493
494 switch resp.StatusCode {
495 case http.StatusOK:
496 if len(body) == 0 {
497 return nil, ErrEmptyBody
498 }
499 var vresp version.Versions
500 if err := json.Unmarshal(body, &vresp); err != nil {
501 return nil, ErrInvalidJSON
502 }
503 return &vresp, nil
504 default:
505 var etcdErr Error
506 if err := json.Unmarshal(body, &etcdErr); err != nil {
507 return nil, ErrInvalidJSON
508 }
509 return nil, etcdErr
510 }
511 }
512
// roundTripResponse carries the result of a transport round trip from the
// goroutine that performs it back to simpleHTTPClient.Do.
type roundTripResponse struct {
	resp *http.Response
	err  error
}
517
// simpleHTTPClient sends a single request to one fixed endpoint. It does not
// follow redirects or fail over.
type simpleHTTPClient struct {
	// transport performs the round trip.
	transport CancelableTransport
	// endpoint is handed to the action to build the request.
	endpoint url.URL
	// headerTimeout, when positive, bounds the round trip of non-wait
	// requests.
	headerTimeout time.Duration
}
523
524
525
// ErrNoRequest is returned by simpleHTTPClient.Do when the action produces a
// nil request.
var ErrNoRequest = errors.New("no HTTPRequest was available")
527
528 func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
529 req := act.HTTPRequest(c.endpoint)
530 if req == nil {
531 return nil, nil, ErrNoRequest
532 }
533
534 if err := printcURL(req); err != nil {
535 return nil, nil, err
536 }
537
538 isWait := false
539 if req.URL != nil {
540 ws := req.URL.Query().Get("wait")
541 if len(ws) != 0 {
542 var err error
543 isWait, err = strconv.ParseBool(ws)
544 if err != nil {
545 return nil, nil, fmt.Errorf("wrong wait value %s (%v for %+v)", ws, err, req)
546 }
547 }
548 }
549
550 var hctx context.Context
551 var hcancel context.CancelFunc
552 if !isWait && c.headerTimeout > 0 {
553 hctx, hcancel = context.WithTimeout(ctx, c.headerTimeout)
554 } else {
555 hctx, hcancel = context.WithCancel(ctx)
556 }
557 defer hcancel()
558
559 reqcancel := requestCanceler(c.transport, req)
560
561 rtchan := make(chan roundTripResponse, 1)
562 go func() {
563 resp, err := c.transport.RoundTrip(req)
564 rtchan <- roundTripResponse{resp: resp, err: err}
565 close(rtchan)
566 }()
567
568 var resp *http.Response
569 var err error
570
571 select {
572 case rtresp := <-rtchan:
573 resp, err = rtresp.resp, rtresp.err
574 case <-hctx.Done():
575
576 reqcancel()
577 rtresp := <-rtchan
578 resp = rtresp.resp
579 switch {
580 case ctx.Err() != nil:
581 err = ctx.Err()
582 case hctx.Err() != nil:
583 err = fmt.Errorf("client: endpoint %s exceeded header timeout", c.endpoint.String())
584 default:
585 panic("failed to get error from context")
586 }
587 }
588
589
590
591 defer func() {
592 if resp != nil {
593 resp.Body.Close()
594 }
595 }()
596
597 if err != nil {
598 return nil, nil, err
599 }
600
601 var body []byte
602 done := make(chan struct{})
603 go func() {
604 body, err = ioutil.ReadAll(resp.Body)
605 done <- struct{}{}
606 }()
607
608 select {
609 case <-ctx.Done():
610 resp.Body.Close()
611 <-done
612 return nil, nil, ctx.Err()
613 case <-done:
614 }
615
616 return resp, body, err
617 }
618
// authedAction wraps another action and adds HTTP basic-auth credentials to
// the requests it produces.
type authedAction struct {
	act         httpAction
	credentials credentials
}
623
624 func (a *authedAction) HTTPRequest(url url.URL) *http.Request {
625 r := a.act.HTTPRequest(url)
626 r.SetBasicAuth(a.credentials.username, a.credentials.password)
627 return r
628 }
629
// redirectFollowingHTTPClient wraps an httpClient and follows 3xx responses,
// consulting checkRedirect before each follow-up request.
type redirectFollowingHTTPClient struct {
	client        httpClient
	checkRedirect CheckRedirectFunc
}
634
635 func (r *redirectFollowingHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
636 next := act
637 for i := 0; i < 100; i++ {
638 if i > 0 {
639 if err := r.checkRedirect(i); err != nil {
640 return nil, nil, err
641 }
642 }
643 resp, body, err := r.client.Do(ctx, next)
644 if err != nil {
645 return nil, nil, err
646 }
647 if resp.StatusCode/100 == 3 {
648 hdr := resp.Header.Get("Location")
649 if hdr == "" {
650 return nil, nil, fmt.Errorf("location header not set")
651 }
652 loc, err := url.Parse(hdr)
653 if err != nil {
654 return nil, nil, fmt.Errorf("location header not valid URL: %s", hdr)
655 }
656 next = &redirectedHTTPAction{
657 action: act,
658 location: *loc,
659 }
660 continue
661 }
662 return resp, body, nil
663 }
664
665 return nil, nil, errTooManyRedirectChecks
666 }
667
// redirectedHTTPAction replays an action against a redirect target: the
// wrapped action builds the request and location replaces its URL.
type redirectedHTTPAction struct {
	action   httpAction
	location url.URL
}
672
673 func (r *redirectedHTTPAction) HTTPRequest(ep url.URL) *http.Request {
674 orig := r.action.HTTPRequest(ep)
675 orig.URL = &r.location
676 return orig
677 }
678
// shuffleEndpoints returns a new slice holding eps in a random order drawn
// from r. eps itself is not modified. r is consulted once per element via
// Intn(i+1), for i from 0 up to len(eps)-1.
func shuffleEndpoints(r *rand.Rand, eps []url.URL) []url.URL {
	shuffled := make([]url.URL, len(eps))
	for i := range eps {
		// Inside-out shuffle: move the occupant of a random earlier
		// slot to position i, then drop the next input into that slot.
		j := r.Intn(i + 1)
		shuffled[i] = shuffled[j]
		shuffled[j] = eps[i]
	}
	return shuffled
}
694
// endpointsEqual reports whether left and right contain the same URLs,
// compared by their string form, ignoring order but respecting duplicates.
// Neither input is modified.
func endpointsEqual(left, right []url.URL) bool {
	if len(left) != len(right) {
		return false
	}

	// sortedStrings renders every URL and returns the strings sorted.
	sortedStrings := func(eps []url.URL) []string {
		out := make([]string, 0, len(eps))
		for i := range eps {
			out = append(out, eps[i].String())
		}
		sort.Strings(out)
		return out
	}

	l, r := sortedStrings(left), sortedStrings(right)
	for i, s := range l {
		if r[i] != s {
			return false
		}
	}
	return true
}
718
View as plain text