...

Source file src/github.com/go-kit/kit/sd/zk/client.go

Documentation: github.com/go-kit/kit/sd/zk

     1  package zk
     2  
     3  import (
     4  	"errors"
     5  	"net"
     6  	"strings"
     7  	"time"
     8  
     9  	"github.com/go-zookeeper/zk"
    10  
    11  	"github.com/go-kit/log"
    12  )
    13  
    14  // DefaultACL is the default ACL to use for creating znodes.
    15  var (
    16  	DefaultACL            = zk.WorldACL(zk.PermAll)
    17  	ErrInvalidCredentials = errors.New("invalid credentials provided")
    18  	ErrClientClosed       = errors.New("client service closed")
    19  	ErrNotRegistered      = errors.New("not registered")
    20  	ErrNodeNotFound       = errors.New("node not found")
    21  )
    22  
    23  const (
    24  	// DefaultConnectTimeout is the default timeout to establish a connection to
    25  	// a ZooKeeper node.
    26  	DefaultConnectTimeout = 2 * time.Second
    27  	// DefaultSessionTimeout is the default timeout to keep the current
    28  	// ZooKeeper session alive during a temporary disconnect.
    29  	DefaultSessionTimeout = 5 * time.Second
    30  )
    31  
// Client is a wrapper around a lower level ZooKeeper client implementation.
// NewClient returns the implementation provided by this package.
type Client interface {
	// GetEntries should query the provided path in ZooKeeper, place a watch on
	// it and retrieve data from its current child nodes. It returns the data
	// of the child nodes, the channel on which the watch event is delivered,
	// and any error encountered while listing the children.
	GetEntries(path string) ([]string, <-chan zk.Event, error)
	// CreateParentNodes should try to create the path in case it does not exist
	// yet on ZooKeeper.
	CreateParentNodes(path string) error
	// Register a service with ZooKeeper.
	Register(s *Service) error
	// Deregister a service with ZooKeeper.
	Deregister(s *Service) error
	// Stop should properly shutdown the client implementation.
	Stop()
}
    47  
// clientConfig holds the settings NewClient assembles from its defaults and
// the supplied Option values.
type clientConfig struct {
	// logger is the logger handed to NewClient.
	logger          log.Logger
	// acl is applied to every znode the client creates.
	acl             []zk.ACL
	// credentials, when non-empty, is passed to the connection as "digest"
	// authentication data in the form "user:pass".
	credentials     []byte
	// connectTimeout bounds each dial attempt to a ZooKeeper server.
	connectTimeout  time.Duration
	// sessionTimeout is the session timeout passed to zk.Connect.
	sessionTimeout  time.Duration
	// rootNodePayload holds the data for the znodes created by
	// CreateParentNodes; element i is used for the (i+1)-th path segment.
	rootNodePayload [][]byte
	// eventHandler is invoked for every connection event received.
	eventHandler    func(zk.Event)
}
    57  
// Option is a functional option that adjusts the client configuration used by
// NewClient. Returning a non-nil error makes NewClient abort and return that
// error.
type Option func(*clientConfig) error
    60  
// client is the Client implementation returned by NewClient. It embeds the
// underlying ZooKeeper connection and the configuration it was created with.
type client struct {
	*zk.Conn
	clientConfig
	// active is true from construction until Stop is called.
	active bool
	// quit is closed by Stop to terminate the event handling goroutine.
	quit   chan struct{}
}
    67  
    68  // ACL returns an Option specifying a non-default ACL for creating parent nodes.
    69  func ACL(acl []zk.ACL) Option {
    70  	return func(c *clientConfig) error {
    71  		c.acl = acl
    72  		return nil
    73  	}
    74  }
    75  
    76  // Credentials returns an Option specifying a user/password combination which
    77  // the client will use to authenticate itself with.
    78  func Credentials(user, pass string) Option {
    79  	return func(c *clientConfig) error {
    80  		if user == "" || pass == "" {
    81  			return ErrInvalidCredentials
    82  		}
    83  		c.credentials = []byte(user + ":" + pass)
    84  		return nil
    85  	}
    86  }
    87  
    88  // ConnectTimeout returns an Option specifying a non-default connection timeout
    89  // when we try to establish a connection to a ZooKeeper server.
    90  func ConnectTimeout(t time.Duration) Option {
    91  	return func(c *clientConfig) error {
    92  		if t.Seconds() < 1 {
    93  			return errors.New("invalid connect timeout (minimum value is 1 second)")
    94  		}
    95  		c.connectTimeout = t
    96  		return nil
    97  	}
    98  }
    99  
   100  // SessionTimeout returns an Option specifying a non-default session timeout.
   101  func SessionTimeout(t time.Duration) Option {
   102  	return func(c *clientConfig) error {
   103  		if t.Seconds() < 1 {
   104  			return errors.New("invalid session timeout (minimum value is 1 second)")
   105  		}
   106  		c.sessionTimeout = t
   107  		return nil
   108  	}
   109  }
   110  
   111  // Payload returns an Option specifying non-default data values for each znode
   112  // created by CreateParentNodes.
   113  func Payload(payload [][]byte) Option {
   114  	return func(c *clientConfig) error {
   115  		c.rootNodePayload = payload
   116  		return nil
   117  	}
   118  }
   119  
   120  // EventHandler returns an Option specifying a callback function to handle
   121  // incoming zk.Event payloads (ZooKeeper connection events).
   122  func EventHandler(handler func(zk.Event)) Option {
   123  	return func(c *clientConfig) error {
   124  		c.eventHandler = handler
   125  		return nil
   126  	}
   127  }
   128  
   129  // NewClient returns a ZooKeeper client with a connection to the server cluster.
   130  // It will return an error if the server cluster cannot be resolved.
   131  func NewClient(servers []string, logger log.Logger, options ...Option) (Client, error) {
   132  	defaultEventHandler := func(event zk.Event) {
   133  		logger.Log("eventtype", event.Type.String(), "server", event.Server, "state", event.State.String(), "err", event.Err)
   134  	}
   135  	config := clientConfig{
   136  		acl:            DefaultACL,
   137  		connectTimeout: DefaultConnectTimeout,
   138  		sessionTimeout: DefaultSessionTimeout,
   139  		eventHandler:   defaultEventHandler,
   140  		logger:         logger,
   141  	}
   142  	for _, option := range options {
   143  		if err := option(&config); err != nil {
   144  			return nil, err
   145  		}
   146  	}
   147  	// dialer overrides the default ZooKeeper library Dialer so we can configure
   148  	// the connectTimeout. The current library has a hardcoded value of 1 second
   149  	// and there are reports of race conditions, due to slow DNS resolvers and
   150  	// other network latency issues.
   151  	dialer := func(network, address string, _ time.Duration) (net.Conn, error) {
   152  		return net.DialTimeout(network, address, config.connectTimeout)
   153  	}
   154  	conn, eventc, err := zk.Connect(servers, config.sessionTimeout, withLogger(logger), zk.WithDialer(dialer))
   155  
   156  	if err != nil {
   157  		return nil, err
   158  	}
   159  
   160  	if len(config.credentials) > 0 {
   161  		err = conn.AddAuth("digest", config.credentials)
   162  		if err != nil {
   163  			return nil, err
   164  		}
   165  	}
   166  
   167  	c := &client{conn, config, true, make(chan struct{})}
   168  
   169  	// Start listening for incoming Event payloads and callback the set
   170  	// eventHandler.
   171  	go func() {
   172  		for {
   173  			select {
   174  			case event := <-eventc:
   175  				config.eventHandler(event)
   176  			case <-c.quit:
   177  				return
   178  			}
   179  		}
   180  	}()
   181  	return c, nil
   182  }
   183  
   184  // CreateParentNodes implements the ZooKeeper Client interface.
   185  func (c *client) CreateParentNodes(path string) error {
   186  	if !c.active {
   187  		return ErrClientClosed
   188  	}
   189  	if path[0] != '/' {
   190  		return zk.ErrInvalidPath
   191  	}
   192  	payload := []byte("")
   193  	pathString := ""
   194  	pathNodes := strings.Split(path, "/")
   195  	for i := 1; i < len(pathNodes); i++ {
   196  		if i <= len(c.rootNodePayload) {
   197  			payload = c.rootNodePayload[i-1]
   198  		} else {
   199  			payload = []byte("")
   200  		}
   201  		pathString += "/" + pathNodes[i]
   202  		_, err := c.Create(pathString, payload, 0, c.acl)
   203  		// not being able to create the node because it exists or not having
   204  		// sufficient rights is not an issue. It is ok for the node to already
   205  		// exist and/or us to only have read rights
   206  		if err != nil && err != zk.ErrNodeExists && err != zk.ErrNoAuth {
   207  			return err
   208  		}
   209  	}
   210  	return nil
   211  }
   212  
   213  // GetEntries implements the ZooKeeper Client interface.
   214  func (c *client) GetEntries(path string) ([]string, <-chan zk.Event, error) {
   215  	// retrieve list of child nodes for given path and add watch to path
   216  	znodes, _, eventc, err := c.ChildrenW(path)
   217  
   218  	if err != nil {
   219  		return nil, eventc, err
   220  	}
   221  
   222  	var resp []string
   223  	for _, znode := range znodes {
   224  		// retrieve payload for child znode and add to response array
   225  		if data, _, err := c.Get(path + "/" + znode); err == nil {
   226  			resp = append(resp, string(data))
   227  		}
   228  	}
   229  	return resp, eventc, nil
   230  }
   231  
   232  // Register implements the ZooKeeper Client interface.
   233  func (c *client) Register(s *Service) error {
   234  	if s.Path[len(s.Path)-1] != '/' {
   235  		s.Path += "/"
   236  	}
   237  	path := s.Path + s.Name
   238  	if err := c.CreateParentNodes(path); err != nil {
   239  		return err
   240  	}
   241  	if path[len(path)-1] != '/' {
   242  		path += "/"
   243  	}
   244  	node, err := c.CreateProtectedEphemeralSequential(path, s.Data, c.acl)
   245  	if err != nil {
   246  		return err
   247  	}
   248  	s.node = node
   249  	return nil
   250  }
   251  
   252  // Deregister implements the ZooKeeper Client interface.
   253  func (c *client) Deregister(s *Service) error {
   254  	if s.node == "" {
   255  		return ErrNotRegistered
   256  	}
   257  	path := s.Path + s.Name
   258  	found, stat, err := c.Exists(path)
   259  	if err != nil {
   260  		return err
   261  	}
   262  	if !found {
   263  		return ErrNodeNotFound
   264  	}
   265  	if err := c.Delete(path, stat.Version); err != nil {
   266  		return err
   267  	}
   268  	return nil
   269  }
   270  
   271  // Stop implements the ZooKeeper Client interface.
   272  func (c *client) Stop() {
   273  	c.active = false
   274  	close(c.quit)
   275  	c.Close()
   276  }
   277  

View as plain text