...

Source file src/github.com/go-kit/kit/sd/etcdv3/client.go

Documentation: github.com/go-kit/kit/sd/etcdv3

     1  package etcdv3
     2  
     3  import (
     4  	"context"
     5  	"crypto/tls"
     6  	"errors"
     7  	"time"
     8  
     9  	"go.etcd.io/etcd/client/pkg/v3/transport"
    10  	clientv3 "go.etcd.io/etcd/client/v3"
    11  	"google.golang.org/grpc"
    12  )
    13  
    14  var (
    15  	// ErrNoKey indicates a client method needs a key but receives none.
    16  	ErrNoKey = errors.New("no key provided")
    17  
    18  	// ErrNoValue indicates a client method needs a value but receives none.
    19  	ErrNoValue = errors.New("no value provided")
    20  )
    21  
// Client is a wrapper around the etcd client. It covers the operations this
// package needs: reading the values stored under a prefix, watching a prefix
// for changes, and registering or deregistering a service.
type Client interface {
	// GetEntries queries the given prefix in etcd and returns a slice
	// containing the values of all keys found, recursively, underneath that
	// prefix. Only the values are returned, not the keys.
	GetEntries(prefix string) ([]string, error)

	// WatchPrefix watches the given prefix in etcd for changes. When a change
	// is detected, it will signal on the passed channel. Clients are expected
	// to call GetEntries to update themselves with the latest set of complete
	// values. WatchPrefix will always send an initial sentinel value on the
	// channel after establishing the watch, to ensure that clients always
	// receive the latest set of values. WatchPrefix blocks while the watch is
	// active; the watch is bound to the context passed to the NewClient
	// constructor. Callers should keep receiving from ch for as long as the
	// watch is running.
	WatchPrefix(prefix string, ch chan struct{})

	// Register a service with etcd. It returns ErrNoKey or ErrNoValue when
	// the service key or value is empty.
	Register(s Service) error

	// Deregister a service with etcd. It returns ErrNoKey when the service
	// key is empty.
	Deregister(s Service) error

	// LeaseID returns the lease id created for this service instance, or 0
	// if no lease has been created.
	LeaseID() int64
}
    47  
// client is the Client implementation backed by a clientv3.Client.
type client struct {
	// cli is the underlying etcd v3 client created by NewClient.
	cli *clientv3.Client
	// ctx is the context passed to NewClient; it is used for every etcd
	// request and is the parent of the watcher context.
	ctx context.Context

	// kv is the key-value API built on top of cli.
	kv clientv3.KV

	// Watcher interface instance, used to leverage Watcher.Close()
	watcher clientv3.Watcher
	// watcher context
	wctx context.Context
	// watcher cancel func
	wcf context.CancelFunc

	// leaseID will be 0 (clientv3.NoLease) if a lease was not created
	leaseID clientv3.LeaseID

	// hbch receives the keep-alive responses for the lease created in
	// Register.
	hbch <-chan *clientv3.LeaseKeepAliveResponse
	// Lease interface instance, used to leverage Lease.Close()
	leaser clientv3.Lease
}
    68  
// ClientOptions defines options for the etcd client. All values are optional.
// If any duration is not specified, a default of 3 seconds will be used.
type ClientOptions struct {
	// Cert, Key and CACert are handed to transport.TLSInfo as the
	// certificate file, key file and trusted CA file. NewClient only
	// builds a TLS configuration when both Cert and Key are non-empty.
	// DialTimeout and DialKeepAlive are passed to the etcd client
	// configuration as DialTimeout and DialKeepAliveTime.
	Cert          string
	Key           string
	CACert        string
	DialTimeout   time.Duration
	DialKeepAlive time.Duration

	// DialOptions is a list of dial options for the gRPC client (e.g., for interceptors).
	// For example, pass grpc.WithBlock() to block until the underlying connection is up.
	// Without this, Dial returns immediately and connecting to the server happens in the background.
	DialOptions []grpc.DialOption

	// Username and Password are passed to the etcd client configuration
	// unchanged.
	Username string
	Password string
}
    86  
    87  // NewClient returns Client with a connection to the named machines. It will
    88  // return an error if a connection to the cluster cannot be made.
    89  func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) {
    90  	if options.DialTimeout == 0 {
    91  		options.DialTimeout = 3 * time.Second
    92  	}
    93  	if options.DialKeepAlive == 0 {
    94  		options.DialKeepAlive = 3 * time.Second
    95  	}
    96  
    97  	var err error
    98  	var tlscfg *tls.Config
    99  
   100  	if options.Cert != "" && options.Key != "" {
   101  		tlsInfo := transport.TLSInfo{
   102  			CertFile:      options.Cert,
   103  			KeyFile:       options.Key,
   104  			TrustedCAFile: options.CACert,
   105  		}
   106  		tlscfg, err = tlsInfo.ClientConfig()
   107  		if err != nil {
   108  			return nil, err
   109  		}
   110  	}
   111  
   112  	cli, err := clientv3.New(clientv3.Config{
   113  		Context:           ctx,
   114  		Endpoints:         machines,
   115  		DialTimeout:       options.DialTimeout,
   116  		DialKeepAliveTime: options.DialKeepAlive,
   117  		DialOptions:       options.DialOptions,
   118  		TLS:               tlscfg,
   119  		Username:          options.Username,
   120  		Password:          options.Password,
   121  	})
   122  	if err != nil {
   123  		return nil, err
   124  	}
   125  
   126  	return &client{
   127  		cli: cli,
   128  		ctx: ctx,
   129  		kv:  clientv3.NewKV(cli),
   130  	}, nil
   131  }
   132  
   133  func (c *client) LeaseID() int64 { return int64(c.leaseID) }
   134  
   135  // GetEntries implements the etcd Client interface.
   136  func (c *client) GetEntries(key string) ([]string, error) {
   137  	resp, err := c.kv.Get(c.ctx, key, clientv3.WithPrefix())
   138  	if err != nil {
   139  		return nil, err
   140  	}
   141  
   142  	entries := make([]string, len(resp.Kvs))
   143  	for i, kv := range resp.Kvs {
   144  		entries[i] = string(kv.Value)
   145  	}
   146  
   147  	return entries, nil
   148  }
   149  
   150  // WatchPrefix implements the etcd Client interface.
   151  func (c *client) WatchPrefix(prefix string, ch chan struct{}) {
   152  	c.wctx, c.wcf = context.WithCancel(c.ctx)
   153  	c.watcher = clientv3.NewWatcher(c.cli)
   154  
   155  	wch := c.watcher.Watch(c.wctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(0))
   156  	ch <- struct{}{}
   157  	for wr := range wch {
   158  		if wr.Canceled {
   159  			return
   160  		}
   161  		ch <- struct{}{}
   162  	}
   163  }
   164  
   165  func (c *client) Register(s Service) error {
   166  	var err error
   167  
   168  	if s.Key == "" {
   169  		return ErrNoKey
   170  	}
   171  	if s.Value == "" {
   172  		return ErrNoValue
   173  	}
   174  
   175  	if c.leaser != nil {
   176  		c.leaser.Close()
   177  	}
   178  	c.leaser = clientv3.NewLease(c.cli)
   179  
   180  	if c.watcher != nil {
   181  		c.watcher.Close()
   182  	}
   183  	c.watcher = clientv3.NewWatcher(c.cli)
   184  	if c.kv == nil {
   185  		c.kv = clientv3.NewKV(c.cli)
   186  	}
   187  
   188  	if s.TTL == nil {
   189  		s.TTL = NewTTLOption(time.Second*3, time.Second*10)
   190  	}
   191  
   192  	grantResp, err := c.leaser.Grant(c.ctx, int64(s.TTL.ttl.Seconds()))
   193  	if err != nil {
   194  		return err
   195  	}
   196  	c.leaseID = grantResp.ID
   197  
   198  	_, err = c.kv.Put(
   199  		c.ctx,
   200  		s.Key,
   201  		s.Value,
   202  		clientv3.WithLease(c.leaseID),
   203  	)
   204  	if err != nil {
   205  		return err
   206  	}
   207  
   208  	// this will keep the key alive 'forever' or until we revoke it or
   209  	// the context is canceled
   210  	c.hbch, err = c.leaser.KeepAlive(c.ctx, c.leaseID)
   211  	if err != nil {
   212  		return err
   213  	}
   214  
   215  	// discard the keepalive response, make etcd library not to complain
   216  	// fix bug #799
   217  	go func() {
   218  		for {
   219  			select {
   220  			case r := <-c.hbch:
   221  				// avoid dead loop when channel was closed
   222  				if r == nil {
   223  					return
   224  				}
   225  			case <-c.ctx.Done():
   226  				return
   227  			}
   228  		}
   229  	}()
   230  
   231  	return nil
   232  }
   233  
   234  func (c *client) Deregister(s Service) error {
   235  	defer c.close()
   236  
   237  	if s.Key == "" {
   238  		return ErrNoKey
   239  	}
   240  	if _, err := c.cli.Delete(c.ctx, s.Key, clientv3.WithIgnoreLease()); err != nil {
   241  		return err
   242  	}
   243  
   244  	return nil
   245  }
   246  
   247  // close will close any open clients and call
   248  // the watcher cancel func
   249  func (c *client) close() {
   250  	if c.leaser != nil {
   251  		c.leaser.Close()
   252  	}
   253  	if c.watcher != nil {
   254  		c.watcher.Close()
   255  	}
   256  	if c.wcf != nil {
   257  		c.wcf()
   258  	}
   259  }
   260  

View as plain text