...

Source file src/github.com/go-kit/kit/sd/etcd/client.go

Documentation: github.com/go-kit/kit/sd/etcd

     1  package etcd
     2  
     3  import (
     4  	"context"
     5  	"crypto/tls"
     6  	"crypto/x509"
     7  	"errors"
     8  	"io/ioutil"
     9  	"net"
    10  	"net/http"
    11  	"time"
    12  
    13  	etcd "go.etcd.io/etcd/client/v2"
    14  )
    15  
// Sentinel errors returned by the client's Register and Deregister methods
// when a required Service field is empty.
var (
	// ErrNoKey indicates a client method needs a key but receives none.
	ErrNoKey = errors.New("no key provided")

	// ErrNoValue indicates a client method needs a value but receives none.
	ErrNoValue = errors.New("no value provided")
)
    23  
// Client is a wrapper around the etcd client. It exposes only the operations
// needed for service discovery: listing entries, watching a prefix, and
// registering or deregistering a service.
type Client interface {
	// GetEntries queries the given prefix in etcd and returns a slice
	// containing the values of all keys found, recursively, underneath that
	// prefix.
	GetEntries(prefix string) ([]string, error)

	// WatchPrefix watches the given prefix in etcd for changes. When a change
	// is detected, it will signal on the passed channel. Clients are expected
	// to call GetEntries to update themselves with the latest set of complete
	// values. WatchPrefix will always send an initial sentinel value on the
	// channel after establishing the watch, to ensure that clients always
	// receive the latest set of values. WatchPrefix will block until the
	// context passed to the NewClient constructor is terminated. The
	// implementation in this package also returns as soon as the underlying
	// watcher reports any error.
	WatchPrefix(prefix string, ch chan struct{})

	// Register registers the service s with etcd. The implementation in this
	// package returns ErrNoKey if s.Key is empty and ErrNoValue if s.Value is
	// empty.
	Register(s Service) error

	// Deregister removes the key of the service s from etcd. The
	// implementation in this package returns ErrNoKey if s.Key is empty.
	Deregister(s Service) error
}
    46  
// client is the Client implementation returned by NewClient. It is backed by
// the etcd v2 keys API.
type client struct {
	// keysAPI performs the Get, Set, Create, Delete and Watcher calls.
	keysAPI etcd.KeysAPI
	// ctx is the context given to NewClient; it is passed to every keysAPI
	// request issued by the methods of this type.
	ctx context.Context
}
    51  
// ClientOptions defines options for the etcd client. All values are optional.
// If any duration is not specified, a default of 3 seconds will be used.
type ClientOptions struct {
	// Cert is the path of the client certificate file. NewClient only builds
	// its own TLS transport when both Cert and Key are non-empty.
	Cert string

	// Key is the path of the private key file paired with Cert.
	Key string

	// CACert is the path of a PEM file whose certificates are used as the
	// root CAs of the TLS transport.
	CACert string

	// DialTimeout is the net.Dialer timeout used by the TLS transport.
	DialTimeout time.Duration

	// DialKeepAlive is the net.Dialer keep-alive period used by the TLS
	// transport.
	DialKeepAlive time.Duration

	// HeaderTimeoutPerRequest is passed through to the etcd client
	// configuration field of the same name.
	HeaderTimeoutPerRequest time.Duration
}
    62  
    63  // NewClient returns Client with a connection to the named machines. It will
    64  // return an error if a connection to the cluster cannot be made. The parameter
    65  // machines needs to be a full URL with schemas. e.g. "http://localhost:2379"
    66  // will work, but "localhost:2379" will not.
    67  func NewClient(ctx context.Context, machines []string, options ClientOptions) (Client, error) {
    68  	if options.DialTimeout == 0 {
    69  		options.DialTimeout = 3 * time.Second
    70  	}
    71  	if options.DialKeepAlive == 0 {
    72  		options.DialKeepAlive = 3 * time.Second
    73  	}
    74  	if options.HeaderTimeoutPerRequest == 0 {
    75  		options.HeaderTimeoutPerRequest = 3 * time.Second
    76  	}
    77  
    78  	transport := etcd.DefaultTransport
    79  	if options.Cert != "" && options.Key != "" {
    80  		tlsCert, err := tls.LoadX509KeyPair(options.Cert, options.Key)
    81  		if err != nil {
    82  			return nil, err
    83  		}
    84  		caCertCt, err := ioutil.ReadFile(options.CACert)
    85  		if err != nil {
    86  			return nil, err
    87  		}
    88  		caCertPool := x509.NewCertPool()
    89  		caCertPool.AppendCertsFromPEM(caCertCt)
    90  		transport = &http.Transport{
    91  			TLSClientConfig: &tls.Config{
    92  				Certificates: []tls.Certificate{tlsCert},
    93  				RootCAs:      caCertPool,
    94  			},
    95  			Dial: func(network, address string) (net.Conn, error) {
    96  				return (&net.Dialer{
    97  					Timeout:   options.DialTimeout,
    98  					KeepAlive: options.DialKeepAlive,
    99  				}).Dial(network, address)
   100  			},
   101  		}
   102  	}
   103  
   104  	ce, err := etcd.New(etcd.Config{
   105  		Endpoints:               machines,
   106  		Transport:               transport,
   107  		HeaderTimeoutPerRequest: options.HeaderTimeoutPerRequest,
   108  	})
   109  	if err != nil {
   110  		return nil, err
   111  	}
   112  
   113  	return &client{
   114  		keysAPI: etcd.NewKeysAPI(ce),
   115  		ctx:     ctx,
   116  	}, nil
   117  }
   118  
   119  // GetEntries implements the etcd Client interface.
   120  func (c *client) GetEntries(key string) ([]string, error) {
   121  	resp, err := c.keysAPI.Get(c.ctx, key, &etcd.GetOptions{Recursive: true})
   122  	if err != nil {
   123  		return nil, err
   124  	}
   125  
   126  	// Special case. Note that it's possible that len(resp.Node.Nodes) == 0 and
   127  	// resp.Node.Value is also empty, in which case the key is empty and we
   128  	// should not return any entries.
   129  	if len(resp.Node.Nodes) == 0 && resp.Node.Value != "" {
   130  		return []string{resp.Node.Value}, nil
   131  	}
   132  
   133  	entries := make([]string, len(resp.Node.Nodes))
   134  	for i, node := range resp.Node.Nodes {
   135  		entries[i] = node.Value
   136  	}
   137  	return entries, nil
   138  }
   139  
   140  // WatchPrefix implements the etcd Client interface.
   141  func (c *client) WatchPrefix(prefix string, ch chan struct{}) {
   142  	watch := c.keysAPI.Watcher(prefix, &etcd.WatcherOptions{AfterIndex: 0, Recursive: true})
   143  	ch <- struct{}{} // make sure caller invokes GetEntries
   144  	for {
   145  		if _, err := watch.Next(c.ctx); err != nil {
   146  			return
   147  		}
   148  		ch <- struct{}{}
   149  	}
   150  }
   151  
   152  func (c *client) Register(s Service) error {
   153  	if s.Key == "" {
   154  		return ErrNoKey
   155  	}
   156  	if s.Value == "" {
   157  		return ErrNoValue
   158  	}
   159  	var err error
   160  	if s.TTL != nil {
   161  		_, err = c.keysAPI.Set(c.ctx, s.Key, s.Value, &etcd.SetOptions{
   162  			PrevExist: etcd.PrevIgnore,
   163  			TTL:       s.TTL.ttl,
   164  		})
   165  	} else {
   166  		_, err = c.keysAPI.Create(c.ctx, s.Key, s.Value)
   167  	}
   168  	return err
   169  }
   170  
   171  func (c *client) Deregister(s Service) error {
   172  	if s.Key == "" {
   173  		return ErrNoKey
   174  	}
   175  	_, err := c.keysAPI.Delete(c.ctx, s.Key, s.DeleteOptions)
   176  	return err
   177  }
   178  

View as plain text