...

Source file src/go.etcd.io/etcd/client/v3/client.go

Documentation: go.etcd.io/etcd/client/v3

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package clientv3
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"strconv"
    22  	"strings"
    23  	"sync"
    24  	"time"
    25  
    26  	"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
    27  	"go.etcd.io/etcd/client/pkg/v3/logutil"
    28  	"go.etcd.io/etcd/client/v3/credentials"
    29  	"go.etcd.io/etcd/client/v3/internal/endpoint"
    30  	"go.etcd.io/etcd/client/v3/internal/resolver"
    31  	"go.uber.org/zap"
    32  	"google.golang.org/grpc"
    33  	"google.golang.org/grpc/codes"
    34  	grpccredentials "google.golang.org/grpc/credentials"
    35  	"google.golang.org/grpc/keepalive"
    36  	"google.golang.org/grpc/status"
    37  )
    38  
var (
	// ErrNoAvailableEndpoints is returned by New when the supplied
	// configuration lists no endpoints.
	ErrNoAvailableEndpoints = errors.New("etcdclient: no available endpoints")
	// ErrOldCluster is reported by the version check when an endpoint
	// reports a version older than 3.4.
	ErrOldCluster           = errors.New("etcdclient: old cluster version")
)
    43  
// Client provides and manages an etcd v3 client session.
//
// The embedded service interfaces are populated by newClient once the
// connection has been dialed; NewCtxClient leaves them for the caller to set.
type Client struct {
	Cluster
	KV
	Lease
	Watcher
	Auth
	Maintenance

	// conn is the gRPC connection dialed by newClient; it stays nil for
	// clients built with NewCtxClient.
	conn *grpc.ClientConn

	// cfg is a copy of the configuration the client was created with.
	cfg      Config
	// creds holds the transport credentials derived from cfg.TLS, or nil
	// when no TLS configuration was given.
	creds    grpccredentials.TransportCredentials
	// resolver receives the endpoint list and its later updates.
	resolver *resolver.EtcdManualResolver
	// mu guards cfg.Endpoints in Endpoints and SetEndpoints.
	mu       *sync.RWMutex

	// ctx lives as long as the client; cancel is invoked by Close.
	ctx    context.Context
	cancel context.CancelFunc

	// Username is a user name for authentication.
	Username string
	// Password is a password for authentication.
	Password        string
	// authTokenBundle carries the auth token refreshed by getToken; it is
	// only created when both a username and a password are configured.
	authTokenBundle credentials.Bundle

	// callOpts are the call options stored for the client; newClient
	// replaces the defaults when message size limits are configured.
	callOpts []grpc.CallOption

	// lgMu guards lg in WithLogger and GetLogger.
	lgMu *sync.RWMutex
	lg   *zap.Logger
}
    74  
    75  // New creates a new etcdv3 client from a given configuration.
    76  func New(cfg Config) (*Client, error) {
    77  	if len(cfg.Endpoints) == 0 {
    78  		return nil, ErrNoAvailableEndpoints
    79  	}
    80  
    81  	return newClient(&cfg)
    82  }
    83  
    84  // NewCtxClient creates a client with a context but no underlying grpc
    85  // connection. This is useful for embedded cases that override the
    86  // service interface implementations and do not need connection management.
    87  func NewCtxClient(ctx context.Context, opts ...Option) *Client {
    88  	cctx, cancel := context.WithCancel(ctx)
    89  	c := &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex)}
    90  	for _, opt := range opts {
    91  		opt(c)
    92  	}
    93  	if c.lg == nil {
    94  		c.lg = zap.NewNop()
    95  	}
    96  	return c
    97  }
    98  
// Option is a function that configures a Client under construction.
// NewCtxClient applies every Option it is given, in order, to the new Client.
type Option func(*Client)
   101  
   102  // NewFromURL creates a new etcdv3 client from a URL.
   103  func NewFromURL(url string) (*Client, error) {
   104  	return New(Config{Endpoints: []string{url}})
   105  }
   106  
   107  // NewFromURLs creates a new etcdv3 client from URLs.
   108  func NewFromURLs(urls []string) (*Client, error) {
   109  	return New(Config{Endpoints: urls})
   110  }
   111  
   112  // WithZapLogger is a NewCtxClient option that overrides the logger
   113  func WithZapLogger(lg *zap.Logger) Option {
   114  	return func(c *Client) {
   115  		c.lg = lg
   116  	}
   117  }
   118  
   119  // WithLogger overrides the logger.
   120  //
   121  // Deprecated: Please use WithZapLogger or Logger field in clientv3.Config
   122  //
   123  // Does not changes grpcLogger, that can be explicitly configured
   124  // using grpc_zap.ReplaceGrpcLoggerV2(..) method.
   125  func (c *Client) WithLogger(lg *zap.Logger) *Client {
   126  	c.lgMu.Lock()
   127  	c.lg = lg
   128  	c.lgMu.Unlock()
   129  	return c
   130  }
   131  
   132  // GetLogger gets the logger.
   133  // NOTE: This method is for internal use of etcd-client library and should not be used as general-purpose logger.
   134  func (c *Client) GetLogger() *zap.Logger {
   135  	c.lgMu.RLock()
   136  	l := c.lg
   137  	c.lgMu.RUnlock()
   138  	return l
   139  }
   140  
   141  // Close shuts down the client's etcd connections.
   142  func (c *Client) Close() error {
   143  	c.cancel()
   144  	if c.Watcher != nil {
   145  		c.Watcher.Close()
   146  	}
   147  	if c.Lease != nil {
   148  		c.Lease.Close()
   149  	}
   150  	if c.conn != nil {
   151  		return toErr(c.ctx, c.conn.Close())
   152  	}
   153  	return c.ctx.Err()
   154  }
   155  
   156  // Ctx is a context for "out of band" messages (e.g., for sending
   157  // "clean up" message when another context is canceled). It is
   158  // canceled on client Close().
   159  func (c *Client) Ctx() context.Context { return c.ctx }
   160  
   161  // Endpoints lists the registered endpoints for the client.
   162  func (c *Client) Endpoints() []string {
   163  	// copy the slice; protect original endpoints from being changed
   164  	c.mu.RLock()
   165  	defer c.mu.RUnlock()
   166  	eps := make([]string, len(c.cfg.Endpoints))
   167  	copy(eps, c.cfg.Endpoints)
   168  	return eps
   169  }
   170  
   171  // SetEndpoints updates client's endpoints.
   172  func (c *Client) SetEndpoints(eps ...string) {
   173  	c.mu.Lock()
   174  	defer c.mu.Unlock()
   175  	c.cfg.Endpoints = eps
   176  
   177  	c.resolver.SetEndpoints(eps)
   178  }
   179  
   180  // Sync synchronizes client's endpoints with the known endpoints from the etcd membership.
   181  func (c *Client) Sync(ctx context.Context) error {
   182  	mresp, err := c.MemberList(ctx)
   183  	if err != nil {
   184  		return err
   185  	}
   186  	var eps []string
   187  	for _, m := range mresp.Members {
   188  		if len(m.Name) != 0 && !m.IsLearner {
   189  			eps = append(eps, m.ClientURLs...)
   190  		}
   191  	}
   192  	c.SetEndpoints(eps...)
   193  	return nil
   194  }
   195  
   196  func (c *Client) autoSync() {
   197  	if c.cfg.AutoSyncInterval == time.Duration(0) {
   198  		return
   199  	}
   200  
   201  	for {
   202  		select {
   203  		case <-c.ctx.Done():
   204  			return
   205  		case <-time.After(c.cfg.AutoSyncInterval):
   206  			ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
   207  			err := c.Sync(ctx)
   208  			cancel()
   209  			if err != nil && err != c.ctx.Err() {
   210  				c.lg.Info("Auto sync endpoints failed.", zap.Error(err))
   211  			}
   212  		}
   213  	}
   214  }
   215  
   216  // dialSetupOpts gives the dial opts prior to any authentication.
   217  func (c *Client) dialSetupOpts(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (opts []grpc.DialOption, err error) {
   218  	if c.cfg.DialKeepAliveTime > 0 {
   219  		params := keepalive.ClientParameters{
   220  			Time:                c.cfg.DialKeepAliveTime,
   221  			Timeout:             c.cfg.DialKeepAliveTimeout,
   222  			PermitWithoutStream: c.cfg.PermitWithoutStream,
   223  		}
   224  		opts = append(opts, grpc.WithKeepaliveParams(params))
   225  	}
   226  	opts = append(opts, dopts...)
   227  
   228  	if creds != nil {
   229  		opts = append(opts, grpc.WithTransportCredentials(creds))
   230  	} else {
   231  		opts = append(opts, grpc.WithInsecure())
   232  	}
   233  
   234  	unaryMaxRetries := defaultUnaryMaxRetries
   235  	if c.cfg.MaxUnaryRetries > 0 {
   236  		unaryMaxRetries = c.cfg.MaxUnaryRetries
   237  	}
   238  
   239  	backoffWaitBetween := defaultBackoffWaitBetween
   240  	if c.cfg.BackoffWaitBetween > 0 {
   241  		backoffWaitBetween = c.cfg.BackoffWaitBetween
   242  	}
   243  
   244  	backoffJitterFraction := defaultBackoffJitterFraction
   245  	if c.cfg.BackoffJitterFraction > 0 {
   246  		backoffJitterFraction = c.cfg.BackoffJitterFraction
   247  	}
   248  
   249  	// Interceptor retry and backoff.
   250  	// TODO: Replace all of clientv3/retry.go with RetryPolicy:
   251  	// https://github.com/grpc/grpc-proto/blob/cdd9ed5c3d3f87aef62f373b93361cf7bddc620d/grpc/service_config/service_config.proto#L130
   252  	rrBackoff := withBackoff(c.roundRobinQuorumBackoff(backoffWaitBetween, backoffJitterFraction))
   253  	opts = append(opts,
   254  		// Disable stream retry by default since go-grpc-middleware/retry does not support client streams.
   255  		// Streams that are safe to retry are enabled individually.
   256  		grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),
   257  		grpc.WithUnaryInterceptor(c.unaryClientInterceptor(withMax(unaryMaxRetries), rrBackoff)),
   258  	)
   259  
   260  	return opts, nil
   261  }
   262  
   263  // Dial connects to a single endpoint using the client's config.
   264  func (c *Client) Dial(ep string) (*grpc.ClientConn, error) {
   265  	creds := c.credentialsForEndpoint(ep)
   266  
   267  	// Using ad-hoc created resolver, to guarantee only explicitly given
   268  	// endpoint is used.
   269  	return c.dial(creds, grpc.WithResolvers(resolver.New(ep)))
   270  }
   271  
   272  func (c *Client) getToken(ctx context.Context) error {
   273  	var err error // return last error in a case of fail
   274  
   275  	if c.Username == "" || c.Password == "" {
   276  		return nil
   277  	}
   278  
   279  	resp, err := c.Auth.Authenticate(ctx, c.Username, c.Password)
   280  	if err != nil {
   281  		if err == rpctypes.ErrAuthNotEnabled {
   282  			c.authTokenBundle.UpdateAuthToken("")
   283  			return nil
   284  		}
   285  		return err
   286  	}
   287  	c.authTokenBundle.UpdateAuthToken(resp.Token)
   288  	return nil
   289  }
   290  
   291  // dialWithBalancer dials the client's current load balanced resolver group.  The scheme of the host
   292  // of the provided endpoint determines the scheme used for all endpoints of the client connection.
   293  func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
   294  	creds := c.credentialsForEndpoint(c.Endpoints()[0])
   295  	opts := append(dopts, grpc.WithResolvers(c.resolver))
   296  	return c.dial(creds, opts...)
   297  }
   298  
   299  // dial configures and dials any grpc balancer target.
   300  func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
   301  	opts, err := c.dialSetupOpts(creds, dopts...)
   302  	if err != nil {
   303  		return nil, fmt.Errorf("failed to configure dialer: %v", err)
   304  	}
   305  	if c.authTokenBundle != nil {
   306  		opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials()))
   307  	}
   308  
   309  	opts = append(opts, c.cfg.DialOptions...)
   310  
   311  	dctx := c.ctx
   312  	if c.cfg.DialTimeout > 0 {
   313  		var cancel context.CancelFunc
   314  		dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
   315  		defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
   316  	}
   317  	target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.Endpoints()[0]))
   318  	conn, err := grpc.DialContext(dctx, target, opts...)
   319  	if err != nil {
   320  		return nil, err
   321  	}
   322  	return conn, nil
   323  }
   324  
   325  func authority(endpoint string) string {
   326  	spl := strings.SplitN(endpoint, "://", 2)
   327  	if len(spl) < 2 {
   328  		if strings.HasPrefix(endpoint, "unix:") {
   329  			return endpoint[len("unix:"):]
   330  		}
   331  		if strings.HasPrefix(endpoint, "unixs:") {
   332  			return endpoint[len("unixs:"):]
   333  		}
   334  		return endpoint
   335  	}
   336  	return spl[1]
   337  }
   338  
   339  func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
   340  	r := endpoint.RequiresCredentials(ep)
   341  	switch r {
   342  	case endpoint.CREDS_DROP:
   343  		return nil
   344  	case endpoint.CREDS_OPTIONAL:
   345  		return c.creds
   346  	case endpoint.CREDS_REQUIRE:
   347  		if c.creds != nil {
   348  			return c.creds
   349  		}
   350  		return credentials.NewBundle(credentials.Config{}).TransportCredentials()
   351  	default:
   352  		panic(fmt.Errorf("unsupported CredsRequirement: %v", r))
   353  	}
   354  }
   355  
   356  func newClient(cfg *Config) (*Client, error) {
   357  	if cfg == nil {
   358  		cfg = &Config{}
   359  	}
   360  	var creds grpccredentials.TransportCredentials
   361  	if cfg.TLS != nil {
   362  		creds = credentials.NewBundle(credentials.Config{TLSConfig: cfg.TLS}).TransportCredentials()
   363  	}
   364  
   365  	// use a temporary skeleton client to bootstrap first connection
   366  	baseCtx := context.TODO()
   367  	if cfg.Context != nil {
   368  		baseCtx = cfg.Context
   369  	}
   370  
   371  	ctx, cancel := context.WithCancel(baseCtx)
   372  	client := &Client{
   373  		conn:     nil,
   374  		cfg:      *cfg,
   375  		creds:    creds,
   376  		ctx:      ctx,
   377  		cancel:   cancel,
   378  		mu:       new(sync.RWMutex),
   379  		callOpts: defaultCallOpts,
   380  		lgMu:     new(sync.RWMutex),
   381  	}
   382  
   383  	var err error
   384  	if cfg.Logger != nil {
   385  		client.lg = cfg.Logger
   386  	} else if cfg.LogConfig != nil {
   387  		client.lg, err = cfg.LogConfig.Build()
   388  	} else {
   389  		client.lg, err = logutil.CreateDefaultZapLogger(etcdClientDebugLevel())
   390  		if client.lg != nil {
   391  			client.lg = client.lg.Named("etcd-client")
   392  		}
   393  	}
   394  	if err != nil {
   395  		return nil, err
   396  	}
   397  
   398  	if cfg.Username != "" && cfg.Password != "" {
   399  		client.Username = cfg.Username
   400  		client.Password = cfg.Password
   401  		client.authTokenBundle = credentials.NewBundle(credentials.Config{})
   402  	}
   403  	if cfg.MaxCallSendMsgSize > 0 || cfg.MaxCallRecvMsgSize > 0 {
   404  		if cfg.MaxCallRecvMsgSize > 0 && cfg.MaxCallSendMsgSize > cfg.MaxCallRecvMsgSize {
   405  			return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", cfg.MaxCallRecvMsgSize, cfg.MaxCallSendMsgSize)
   406  		}
   407  		callOpts := []grpc.CallOption{
   408  			defaultWaitForReady,
   409  			defaultMaxCallSendMsgSize,
   410  			defaultMaxCallRecvMsgSize,
   411  		}
   412  		if cfg.MaxCallSendMsgSize > 0 {
   413  			callOpts[1] = grpc.MaxCallSendMsgSize(cfg.MaxCallSendMsgSize)
   414  		}
   415  		if cfg.MaxCallRecvMsgSize > 0 {
   416  			callOpts[2] = grpc.MaxCallRecvMsgSize(cfg.MaxCallRecvMsgSize)
   417  		}
   418  		client.callOpts = callOpts
   419  	}
   420  
   421  	client.resolver = resolver.New(cfg.Endpoints...)
   422  
   423  	if len(cfg.Endpoints) < 1 {
   424  		client.cancel()
   425  		return nil, fmt.Errorf("at least one Endpoint is required in client config")
   426  	}
   427  	// Use a provided endpoint target so that for https:// without any tls config given, then
   428  	// grpc will assume the certificate server name is the endpoint host.
   429  	conn, err := client.dialWithBalancer()
   430  	if err != nil {
   431  		client.cancel()
   432  		client.resolver.Close()
   433  		// TODO: Error like `fmt.Errorf(dialing [%s] failed: %v, strings.Join(cfg.Endpoints, ";"), err)` would help with debugging a lot.
   434  		return nil, err
   435  	}
   436  	client.conn = conn
   437  
   438  	client.Cluster = NewCluster(client)
   439  	client.KV = NewKV(client)
   440  	client.Lease = NewLease(client)
   441  	client.Watcher = NewWatcher(client)
   442  	client.Auth = NewAuth(client)
   443  	client.Maintenance = NewMaintenance(client)
   444  
   445  	//get token with established connection
   446  	ctx, cancel = client.ctx, func() {}
   447  	if client.cfg.DialTimeout > 0 {
   448  		ctx, cancel = context.WithTimeout(ctx, client.cfg.DialTimeout)
   449  	}
   450  	err = client.getToken(ctx)
   451  	if err != nil {
   452  		client.Close()
   453  		cancel()
   454  		//TODO: Consider fmt.Errorf("communicating with [%s] failed: %v", strings.Join(cfg.Endpoints, ";"), err)
   455  		return nil, err
   456  	}
   457  	cancel()
   458  
   459  	if cfg.RejectOldCluster {
   460  		if err := client.checkVersion(); err != nil {
   461  			client.Close()
   462  			return nil, err
   463  		}
   464  	}
   465  
   466  	go client.autoSync()
   467  	return client, nil
   468  }
   469  
   470  // roundRobinQuorumBackoff retries against quorum between each backoff.
   471  // This is intended for use with a round robin load balancer.
   472  func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFraction float64) backoffFunc {
   473  	return func(attempt uint) time.Duration {
   474  		// after each round robin across quorum, backoff for our wait between duration
   475  		n := uint(len(c.Endpoints()))
   476  		quorum := (n/2 + 1)
   477  		if attempt%quorum == 0 {
   478  			c.lg.Debug("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
   479  			return jitterUp(waitBetween, jitterFraction)
   480  		}
   481  		c.lg.Debug("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
   482  		return 0
   483  	}
   484  }
   485  
   486  func (c *Client) checkVersion() (err error) {
   487  	var wg sync.WaitGroup
   488  
   489  	eps := c.Endpoints()
   490  	errc := make(chan error, len(eps))
   491  	ctx, cancel := context.WithCancel(c.ctx)
   492  	if c.cfg.DialTimeout > 0 {
   493  		cancel()
   494  		ctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
   495  	}
   496  
   497  	wg.Add(len(eps))
   498  	for _, ep := range eps {
   499  		// if cluster is current, any endpoint gives a recent version
   500  		go func(e string) {
   501  			defer wg.Done()
   502  			resp, rerr := c.Status(ctx, e)
   503  			if rerr != nil {
   504  				errc <- rerr
   505  				return
   506  			}
   507  			vs := strings.Split(resp.Version, ".")
   508  			maj, min := 0, 0
   509  			if len(vs) >= 2 {
   510  				var serr error
   511  				if maj, serr = strconv.Atoi(vs[0]); serr != nil {
   512  					errc <- serr
   513  					return
   514  				}
   515  				if min, serr = strconv.Atoi(vs[1]); serr != nil {
   516  					errc <- serr
   517  					return
   518  				}
   519  			}
   520  			if maj < 3 || (maj == 3 && min < 4) {
   521  				rerr = ErrOldCluster
   522  			}
   523  			errc <- rerr
   524  		}(ep)
   525  	}
   526  	// wait for success
   527  	for range eps {
   528  		if err = <-errc; err != nil {
   529  			break
   530  		}
   531  	}
   532  	cancel()
   533  	wg.Wait()
   534  	return err
   535  }
   536  
   537  // ActiveConnection returns the current in-use connection
   538  func (c *Client) ActiveConnection() *grpc.ClientConn { return c.conn }
   539  
   540  // isHaltErr returns true if the given error and context indicate no forward
   541  // progress can be made, even after reconnecting.
   542  func isHaltErr(ctx context.Context, err error) bool {
   543  	if ctx != nil && ctx.Err() != nil {
   544  		return true
   545  	}
   546  	if err == nil {
   547  		return false
   548  	}
   549  	ev, _ := status.FromError(err)
   550  	// Unavailable codes mean the system will be right back.
   551  	// (e.g., can't connect, lost leader)
   552  	// Treat Internal codes as if something failed, leaving the
   553  	// system in an inconsistent state, but retrying could make progress.
   554  	// (e.g., failed in middle of send, corrupted frame)
   555  	// TODO: are permanent Internal errors possible from grpc?
   556  	return ev.Code() != codes.Unavailable && ev.Code() != codes.Internal
   557  }
   558  
   559  // isUnavailableErr returns true if the given error is an unavailable error
   560  func isUnavailableErr(ctx context.Context, err error) bool {
   561  	if ctx != nil && ctx.Err() != nil {
   562  		return false
   563  	}
   564  	if err == nil {
   565  		return false
   566  	}
   567  	ev, ok := status.FromError(err)
   568  	if ok {
   569  		// Unavailable codes mean the system will be right back.
   570  		// (e.g., can't connect, lost leader)
   571  		return ev.Code() == codes.Unavailable
   572  	}
   573  	return false
   574  }
   575  
   576  func toErr(ctx context.Context, err error) error {
   577  	if err == nil {
   578  		return nil
   579  	}
   580  	err = rpctypes.Error(err)
   581  	if _, ok := err.(rpctypes.EtcdError); ok {
   582  		return err
   583  	}
   584  	if ev, ok := status.FromError(err); ok {
   585  		code := ev.Code()
   586  		switch code {
   587  		case codes.DeadlineExceeded:
   588  			fallthrough
   589  		case codes.Canceled:
   590  			if ctx.Err() != nil {
   591  				err = ctx.Err()
   592  			}
   593  		}
   594  	}
   595  	return err
   596  }
   597  
   598  func canceledByCaller(stopCtx context.Context, err error) bool {
   599  	if stopCtx.Err() == nil || err == nil {
   600  		return false
   601  	}
   602  
   603  	return err == context.Canceled || err == context.DeadlineExceeded
   604  }
   605  
   606  // IsConnCanceled returns true, if error is from a closed gRPC connection.
   607  // ref. https://github.com/grpc/grpc-go/pull/1854
   608  func IsConnCanceled(err error) bool {
   609  	if err == nil {
   610  		return false
   611  	}
   612  
   613  	// >= gRPC v1.23.x
   614  	s, ok := status.FromError(err)
   615  	if ok {
   616  		// connection is canceled or server has already closed the connection
   617  		return s.Code() == codes.Canceled || s.Message() == "transport is closing"
   618  	}
   619  
   620  	// >= gRPC v1.10.x
   621  	if err == context.Canceled {
   622  		return true
   623  	}
   624  
   625  	// <= gRPC v1.7.x returns 'errors.New("grpc: the client connection is closing")'
   626  	return strings.Contains(err.Error(), "grpc: the client connection is closing")
   627  }
   628  

View as plain text