1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package clientv3
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "strconv"
22 "strings"
23 "sync"
24 "time"
25
26 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
27 "go.etcd.io/etcd/client/pkg/v3/logutil"
28 "go.etcd.io/etcd/client/v3/credentials"
29 "go.etcd.io/etcd/client/v3/internal/endpoint"
30 "go.etcd.io/etcd/client/v3/internal/resolver"
31 "go.uber.org/zap"
32 "google.golang.org/grpc"
33 "google.golang.org/grpc/codes"
34 grpccredentials "google.golang.org/grpc/credentials"
35 "google.golang.org/grpc/keepalive"
36 "google.golang.org/grpc/status"
37 )
38
var (
	// ErrNoAvailableEndpoints is returned by New when the supplied Config
	// lists no endpoints.
	ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
	// ErrOldCluster is reported by the cluster version check when an endpoint
	// advertises a version older than 3.4.
	ErrOldCluster = errors.New("etcdclient: old cluster version")
)
43
44
// Client bundles the etcd v3 API surfaces (Cluster, KV, Lease, Watcher, Auth
// and Maintenance) together with the gRPC connection and configuration they
// share.
type Client struct {
	Cluster
	KV
	Lease
	Watcher
	Auth
	Maintenance

	// conn is the gRPC connection dialed by newClient; it is left nil by
	// NewCtxClient.
	conn *grpc.ClientConn

	// cfg is a copy of the Config the client was created from. cfg.Endpoints
	// is read and written under mu (see Endpoints and SetEndpoints).
	cfg      Config
	creds    grpccredentials.TransportCredentials
	resolver *resolver.EtcdManualResolver
	mu       *sync.RWMutex

	// ctx is cancelled by Close through cancel.
	ctx    context.Context
	cancel context.CancelFunc

	// Username is the user name passed to Auth.Authenticate by getToken.
	Username string
	// Password is the password passed to Auth.Authenticate by getToken.
	Password string
	// authTokenBundle holds the auth token; dial attaches its per-RPC
	// credentials to the connection when it is non-nil.
	authTokenBundle credentials.Bundle

	// callOpts starts as defaultCallOpts; newClient replaces it when custom
	// message size limits are configured.
	callOpts []grpc.CallOption

	// lgMu guards lg in WithLogger and GetLogger.
	lgMu *sync.RWMutex
	lg   *zap.Logger
}
74
75
76 func New(cfg Config) (*Client, error) {
77 if len(cfg.Endpoints) == 0 {
78 return nil, ErrNoAvailableEndpoints
79 }
80
81 return newClient(&cfg)
82 }
83
84
85
86
87 func NewCtxClient(ctx context.Context, opts ...Option) *Client {
88 cctx, cancel := context.WithCancel(ctx)
89 c := &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex)}
90 for _, opt := range opts {
91 opt(c)
92 }
93 if c.lg == nil {
94 c.lg = zap.NewNop()
95 }
96 return c
97 }
98
99
// Option is a function that configures a Client; NewCtxClient applies each
// supplied Option to the client it builds.
type Option func(*Client)
101
102
103 func NewFromURL(url string) (*Client, error) {
104 return New(Config{Endpoints: []string{url}})
105 }
106
107
108 func NewFromURLs(urls []string) (*Client, error) {
109 return New(Config{Endpoints: urls})
110 }
111
112
113 func WithZapLogger(lg *zap.Logger) Option {
114 return func(c *Client) {
115 c.lg = lg
116 }
117 }
118
119
120
121
122
123
124
125 func (c *Client) WithLogger(lg *zap.Logger) *Client {
126 c.lgMu.Lock()
127 c.lg = lg
128 c.lgMu.Unlock()
129 return c
130 }
131
132
133
134 func (c *Client) GetLogger() *zap.Logger {
135 c.lgMu.RLock()
136 l := c.lg
137 c.lgMu.RUnlock()
138 return l
139 }
140
141
142 func (c *Client) Close() error {
143 c.cancel()
144 if c.Watcher != nil {
145 c.Watcher.Close()
146 }
147 if c.Lease != nil {
148 c.Lease.Close()
149 }
150 if c.conn != nil {
151 return toErr(c.ctx, c.conn.Close())
152 }
153 return c.ctx.Err()
154 }
155
156
157
158
159 func (c *Client) Ctx() context.Context { return c.ctx }
160
161
162 func (c *Client) Endpoints() []string {
163
164 c.mu.RLock()
165 defer c.mu.RUnlock()
166 eps := make([]string, len(c.cfg.Endpoints))
167 copy(eps, c.cfg.Endpoints)
168 return eps
169 }
170
171
172 func (c *Client) SetEndpoints(eps ...string) {
173 c.mu.Lock()
174 defer c.mu.Unlock()
175 c.cfg.Endpoints = eps
176
177 c.resolver.SetEndpoints(eps)
178 }
179
180
181 func (c *Client) Sync(ctx context.Context) error {
182 mresp, err := c.MemberList(ctx)
183 if err != nil {
184 return err
185 }
186 var eps []string
187 for _, m := range mresp.Members {
188 if len(m.Name) != 0 && !m.IsLearner {
189 eps = append(eps, m.ClientURLs...)
190 }
191 }
192 c.SetEndpoints(eps...)
193 return nil
194 }
195
196 func (c *Client) autoSync() {
197 if c.cfg.AutoSyncInterval == time.Duration(0) {
198 return
199 }
200
201 for {
202 select {
203 case <-c.ctx.Done():
204 return
205 case <-time.After(c.cfg.AutoSyncInterval):
206 ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
207 err := c.Sync(ctx)
208 cancel()
209 if err != nil && err != c.ctx.Err() {
210 c.lg.Info("Auto sync endpoints failed.", zap.Error(err))
211 }
212 }
213 }
214 }
215
216
217 func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
218 if c.cfg.DialKeepAliveTime > 0 {
219 params := keepalive.ClientParameters{
220 Time: c.cfg.DialKeepAliveTime,
221 Timeout: c.cfg.DialKeepAliveTimeout,
222 PermitWithoutStream: c.cfg.PermitWithoutStream,
223 }
224 opts = append(opts, grpc.WithKeepaliveParams(params))
225 }
226 opts = append(opts, dopts...)
227
228 if creds != nil {
229 opts = append(opts, grpc.WithTransportCredentials(creds))
230 } else {
231 opts = append(opts, grpc.WithInsecure())
232 }
233
234 unaryMaxRetries := defaultUnaryMaxRetries
235 if c.cfg.MaxUnaryRetries > 0 {
236 unaryMaxRetries = c.cfg.MaxUnaryRetries
237 }
238
239 backoffWaitBetween := defaultBackoffWaitBetween
240 if c.cfg.BackoffWaitBetween > 0 {
241 backoffWaitBetween = c.cfg.BackoffWaitBetween
242 }
243
244 backoffJitterFraction := defaultBackoffJitterFraction
245 if c.cfg.BackoffJitterFraction > 0 {
246 backoffJitterFraction = c.cfg.BackoffJitterFraction
247 }
248
249
250
251
252 rrBackoff := withBackoff(c.roundRobinQuorumBackoff(backoffWaitBetween, backoffJitterFraction))
253 opts = append(opts,
254
255
256 grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),
257 grpc.WithUnaryInterceptor(c.unaryClientInterceptor(withMax(unaryMaxRetries), rrBackoff)),
258 )
259
260 return opts, nil
261 }
262
263
264 func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
265 creds := c.credentialsForEndpoint(ep)
266
267
268
269 return c.dial(creds, grpc.WithResolvers(resolver.New(ep)))
270 }
271
272 func (c *Client) getToken(ctx context.Context) error {
273 var err error
274
275 if c.Username == "" || c.Password == "" {
276 return nil
277 }
278
279 resp, err := c.Auth.Authenticate(ctx, c.Username, c.Password)
280 if err != nil {
281 if err == rpctypes.ErrAuthNotEnabled {
282 c.authTokenBundle.UpdateAuthToken("")
283 return nil
284 }
285 return err
286 }
287 c.authTokenBundle.UpdateAuthToken(resp.Token)
288 return nil
289 }
290
291
292
293 func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
294 creds := c.credentialsForEndpoint(c.Endpoints()[0])
295 opts := append(dopts, grpc.WithResolvers(c.resolver))
296 return c.dial(creds, opts...)
297 }
298
299
300 func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
301 opts, err := c.dialSetupOpts(creds, dopts...)
302 if err != nil {
303 return nil, fmt.Errorf("failed to configure dialer: %v", err)
304 }
305 if c.authTokenBundle != nil {
306 opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials()))
307 }
308
309 opts = append(opts, c.cfg.DialOptions...)
310
311 dctx := c.ctx
312 if c.cfg.DialTimeout > 0 {
313 var cancel context.CancelFunc
314 dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
315 defer cancel()
316 }
317 target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.Endpoints()[0]))
318 conn, err := grpc.DialContext(dctx, target, opts...)
319 if err != nil {
320 return nil, err
321 }
322 return conn, nil
323 }
324
// authority extracts the authority part of an endpoint string.
//
// Everything after the first "://" is returned when that separator is
// present. Otherwise a leading "unix:" or "unixs:" is stripped, and any other
// input is returned unchanged.
func authority(endpoint string) string {
	const schemeSep = "://"
	if i := strings.Index(endpoint, schemeSep); i >= 0 {
		return endpoint[i+len(schemeSep):]
	}
	for _, prefix := range []string{"unix:", "unixs:"} {
		if strings.HasPrefix(endpoint, prefix) {
			return strings.TrimPrefix(endpoint, prefix)
		}
	}
	return endpoint
}
338
339 func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
340 r := endpoint.RequiresCredentials(ep)
341 switch r {
342 case endpoint.CREDS_DROP:
343 return nil
344 case endpoint.CREDS_OPTIONAL:
345 return c.creds
346 case endpoint.CREDS_REQUIRE:
347 if c.creds != nil {
348 return c.creds
349 }
350 return credentials.NewBundle(credentials.Config{}).TransportCredentials()
351 default:
352 panic(fmt.Errorf("unsupported CredsRequirement: %v", r))
353 }
354 }
355
356 func newClient(cfg *Config) (*Client, error) {
357 if cfg == nil {
358 cfg = &Config{}
359 }
360 var creds grpccredentials.TransportCredentials
361 if cfg.TLS != nil {
362 creds = credentials.NewBundle(credentials.Config{TLSConfig: cfg.TLS}).TransportCredentials()
363 }
364
365
366 baseCtx := context.TODO()
367 if cfg.Context != nil {
368 baseCtx = cfg.Context
369 }
370
371 ctx, cancel := context.WithCancel(baseCtx)
372 client := &Client{
373 conn: nil,
374 cfg: *cfg,
375 creds: creds,
376 ctx: ctx,
377 cancel: cancel,
378 mu: new(sync.RWMutex),
379 callOpts: defaultCallOpts,
380 lgMu: new(sync.RWMutex),
381 }
382
383 var err error
384 if cfg.Logger != nil {
385 client.lg = cfg.Logger
386 } else if cfg.LogConfig != nil {
387 client.lg, err = cfg.LogConfig.Build()
388 } else {
389 client.lg, err = logutil.CreateDefaultZapLogger(etcdClientDebugLevel())
390 if client.lg != nil {
391 client.lg = client.lg.Named("etcd-client")
392 }
393 }
394 if err != nil {
395 return nil, err
396 }
397
398 if cfg.Username != "" && cfg.Password != "" {
399 client.Username = cfg.Username
400 client.Password = cfg.Password
401 client.authTokenBundle = credentials.NewBundle(credentials.Config{})
402 }
403 if cfg.MaxCallSendMsgSize > 0 || cfg.MaxCallRecvMsgSize > 0 {
404 if cfg.MaxCallRecvMsgSize > 0 && cfg.MaxCallSendMsgSize > cfg.MaxCallRecvMsgSize {
405 return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", cfg.MaxCallRecvMsgSize, cfg.MaxCallSendMsgSize)
406 }
407 callOpts := []grpc.CallOption{
408 defaultWaitForReady,
409 defaultMaxCallSendMsgSize,
410 defaultMaxCallRecvMsgSize,
411 }
412 if cfg.MaxCallSendMsgSize > 0 {
413 callOpts[1] = grpc.MaxCallSendMsgSize(cfg.MaxCallSendMsgSize)
414 }
415 if cfg.MaxCallRecvMsgSize > 0 {
416 callOpts[2] = grpc.MaxCallRecvMsgSize(cfg.MaxCallRecvMsgSize)
417 }
418 client.callOpts = callOpts
419 }
420
421 client.resolver = resolver.New(cfg.Endpoints...)
422
423 if len(cfg.Endpoints) < 1 {
424 client.cancel()
425 return nil, fmt.Errorf("at least one Endpoint is required in client config")
426 }
427
428
429 conn, err := client.dialWithBalancer()
430 if err != nil {
431 client.cancel()
432 client.resolver.Close()
433
434 return nil, err
435 }
436 client.conn = conn
437
438 client.Cluster = NewCluster(client)
439 client.KV = NewKV(client)
440 client.Lease = NewLease(client)
441 client.Watcher = NewWatcher(client)
442 client.Auth = NewAuth(client)
443 client.Maintenance = NewMaintenance(client)
444
445
446 ctx, cancel = client.ctx, func() {}
447 if client.cfg.DialTimeout > 0 {
448 ctx, cancel = context.WithTimeout(ctx, client.cfg.DialTimeout)
449 }
450 err = client.getToken(ctx)
451 if err != nil {
452 client.Close()
453 cancel()
454
455 return nil, err
456 }
457 cancel()
458
459 if cfg.RejectOldCluster {
460 if err := client.checkVersion(); err != nil {
461 client.Close()
462 return nil, err
463 }
464 }
465
466 go client.autoSync()
467 return client, nil
468 }
469
470
471
472 func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFraction float64) backoffFunc {
473 return func(attempt uint) time.Duration {
474
475 n := uint(len(c.Endpoints()))
476 quorum := (n/2 + 1)
477 if attempt%quorum == 0 {
478 c.lg.Debug("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
479 return jitterUp(waitBetween, jitterFraction)
480 }
481 c.lg.Debug("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
482 return 0
483 }
484 }
485
486 func (c *Client) checkVersion() (err error) {
487 var wg sync.WaitGroup
488
489 eps := c.Endpoints()
490 errc := make(chan error, len(eps))
491 ctx, cancel := context.WithCancel(c.ctx)
492 if c.cfg.DialTimeout > 0 {
493 cancel()
494 ctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
495 }
496
497 wg.Add(len(eps))
498 for _, ep := range eps {
499
500 go func(e string) {
501 defer wg.Done()
502 resp, rerr := c.Status(ctx, e)
503 if rerr != nil {
504 errc <- rerr
505 return
506 }
507 vs := strings.Split(resp.Version, ".")
508 maj, min := 0, 0
509 if len(vs) >= 2 {
510 var serr error
511 if maj, serr = strconv.Atoi(vs[0]); serr != nil {
512 errc <- serr
513 return
514 }
515 if min, serr = strconv.Atoi(vs[1]); serr != nil {
516 errc <- serr
517 return
518 }
519 }
520 if maj < 3 || (maj == 3 && min < 4) {
521 rerr = ErrOldCluster
522 }
523 errc <- rerr
524 }(ep)
525 }
526
527 for range eps {
528 if err = <-errc; err != nil {
529 break
530 }
531 }
532 cancel()
533 wg.Wait()
534 return err
535 }
536
537
538 func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn }
539
540
541
542 func isHaltErr(ctx context.Context, err error) bool {
543 if ctx != nil && ctx.Err() != nil {
544 return true
545 }
546 if err == nil {
547 return false
548 }
549 ev, _ := status.FromError(err)
550
551
552
553
554
555
556 return ev.Code() != codes.Unavailable && ev.Code() != codes.Internal
557 }
558
559
560 func isUnavailableErr(ctx context.Context, err error) bool {
561 if ctx != nil && ctx.Err() != nil {
562 return false
563 }
564 if err == nil {
565 return false
566 }
567 ev, ok := status.FromError(err)
568 if ok {
569
570
571 return ev.Code() == codes.Unavailable
572 }
573 return false
574 }
575
576 func toErr(ctx context.Context, err error) error {
577 if err == nil {
578 return nil
579 }
580 err = rpctypes.Error(err)
581 if _, ok := err.(rpctypes.EtcdError); ok {
582 return err
583 }
584 if ev, ok := status.FromError(err); ok {
585 code := ev.Code()
586 switch code {
587 case codes.DeadlineExceeded:
588 fallthrough
589 case codes.Canceled:
590 if ctx.Err() != nil {
591 err = ctx.Err()
592 }
593 }
594 }
595 return err
596 }
597
// canceledByCaller reports whether err is exactly context.Canceled or
// context.DeadlineExceeded while stopCtx is already done. It is false when
// stopCtx is still live or err is nil. The comparison is by identity, so
// wrapped context errors do not match.
func canceledByCaller(stopCtx context.Context, err error) bool {
	if stopCtx.Err() == nil || err == nil {
		return false
	}
	switch err {
	case context.Canceled, context.DeadlineExceeded:
		return true
	}
	return false
}
605
606
607
608 func IsConnCanceled(err error) bool {
609 if err == nil {
610 return false
611 }
612
613
614 s, ok := status.FromError(err)
615 if ok {
616
617 return s.Code() == codes.Canceled || s.Message() == "transport is closing"
618 }
619
620
621 if err == context.Canceled {
622 return true
623 }
624
625
626 return strings.Contains(err.Error(), "grpc: the client connection is closing")
627 }
628
View as plain text