1 package zk
2
3 import (
4 "errors"
5 "net"
6 "strings"
7 "time"
8
9 "github.com/go-zookeeper/zk"
10
11 "github.com/go-kit/log"
12 )
13
14
15 var (
16 DefaultACL = zk.WorldACL(zk.PermAll)
17 ErrInvalidCredentials = errors.New("invalid credentials provided")
18 ErrClientClosed = errors.New("client service closed")
19 ErrNotRegistered = errors.New("not registered")
20 ErrNodeNotFound = errors.New("node not found")
21 )
22
// DefaultConnectTimeout is the dial timeout NewClient uses when no
// ConnectTimeout option is supplied.
const DefaultConnectTimeout = 2 * time.Second

// DefaultSessionTimeout is the ZooKeeper session timeout NewClient uses when
// no SessionTimeout option is supplied.
const DefaultSessionTimeout = 5 * time.Second
31
32
33 type Client interface {
34
35
36 GetEntries(path string) ([]string, <-chan zk.Event, error)
37
38
39 CreateParentNodes(path string) error
40
41 Register(s *Service) error
42
43 Deregister(s *Service) error
44
45 Stop()
46 }
47
48 type clientConfig struct {
49 logger log.Logger
50 acl []zk.ACL
51 credentials []byte
52 connectTimeout time.Duration
53 sessionTimeout time.Duration
54 rootNodePayload [][]byte
55 eventHandler func(zk.Event)
56 }
57
58
59 type Option func(*clientConfig) error
60
61 type client struct {
62 *zk.Conn
63 clientConfig
64 active bool
65 quit chan struct{}
66 }
67
68
69 func ACL(acl []zk.ACL) Option {
70 return func(c *clientConfig) error {
71 c.acl = acl
72 return nil
73 }
74 }
75
76
77
78 func Credentials(user, pass string) Option {
79 return func(c *clientConfig) error {
80 if user == "" || pass == "" {
81 return ErrInvalidCredentials
82 }
83 c.credentials = []byte(user + ":" + pass)
84 return nil
85 }
86 }
87
88
89
90 func ConnectTimeout(t time.Duration) Option {
91 return func(c *clientConfig) error {
92 if t.Seconds() < 1 {
93 return errors.New("invalid connect timeout (minimum value is 1 second)")
94 }
95 c.connectTimeout = t
96 return nil
97 }
98 }
99
100
101 func SessionTimeout(t time.Duration) Option {
102 return func(c *clientConfig) error {
103 if t.Seconds() < 1 {
104 return errors.New("invalid session timeout (minimum value is 1 second)")
105 }
106 c.sessionTimeout = t
107 return nil
108 }
109 }
110
111
112
113 func Payload(payload [][]byte) Option {
114 return func(c *clientConfig) error {
115 c.rootNodePayload = payload
116 return nil
117 }
118 }
119
120
121
122 func EventHandler(handler func(zk.Event)) Option {
123 return func(c *clientConfig) error {
124 c.eventHandler = handler
125 return nil
126 }
127 }
128
129
130
131 func NewClient(servers []string, logger log.Logger, options ...Option) (Client, error) {
132 defaultEventHandler := func(event zk.Event) {
133 logger.Log("eventtype", event.Type.String(), "server", event.Server, "state", event.State.String(), "err", event.Err)
134 }
135 config := clientConfig{
136 acl: DefaultACL,
137 connectTimeout: DefaultConnectTimeout,
138 sessionTimeout: DefaultSessionTimeout,
139 eventHandler: defaultEventHandler,
140 logger: logger,
141 }
142 for _, option := range options {
143 if err := option(&config); err != nil {
144 return nil, err
145 }
146 }
147
148
149
150
151 dialer := func(network, address string, _ time.Duration) (net.Conn, error) {
152 return net.DialTimeout(network, address, config.connectTimeout)
153 }
154 conn, eventc, err := zk.Connect(servers, config.sessionTimeout, withLogger(logger), zk.WithDialer(dialer))
155
156 if err != nil {
157 return nil, err
158 }
159
160 if len(config.credentials) > 0 {
161 err = conn.AddAuth("digest", config.credentials)
162 if err != nil {
163 return nil, err
164 }
165 }
166
167 c := &client{conn, config, true, make(chan struct{})}
168
169
170
171 go func() {
172 for {
173 select {
174 case event := <-eventc:
175 config.eventHandler(event)
176 case <-c.quit:
177 return
178 }
179 }
180 }()
181 return c, nil
182 }
183
184
185 func (c *client) CreateParentNodes(path string) error {
186 if !c.active {
187 return ErrClientClosed
188 }
189 if path[0] != '/' {
190 return zk.ErrInvalidPath
191 }
192 payload := []byte("")
193 pathString := ""
194 pathNodes := strings.Split(path, "/")
195 for i := 1; i < len(pathNodes); i++ {
196 if i <= len(c.rootNodePayload) {
197 payload = c.rootNodePayload[i-1]
198 } else {
199 payload = []byte("")
200 }
201 pathString += "/" + pathNodes[i]
202 _, err := c.Create(pathString, payload, 0, c.acl)
203
204
205
206 if err != nil && err != zk.ErrNodeExists && err != zk.ErrNoAuth {
207 return err
208 }
209 }
210 return nil
211 }
212
213
214 func (c *client) GetEntries(path string) ([]string, <-chan zk.Event, error) {
215
216 znodes, _, eventc, err := c.ChildrenW(path)
217
218 if err != nil {
219 return nil, eventc, err
220 }
221
222 var resp []string
223 for _, znode := range znodes {
224
225 if data, _, err := c.Get(path + "/" + znode); err == nil {
226 resp = append(resp, string(data))
227 }
228 }
229 return resp, eventc, nil
230 }
231
232
233 func (c *client) Register(s *Service) error {
234 if s.Path[len(s.Path)-1] != '/' {
235 s.Path += "/"
236 }
237 path := s.Path + s.Name
238 if err := c.CreateParentNodes(path); err != nil {
239 return err
240 }
241 if path[len(path)-1] != '/' {
242 path += "/"
243 }
244 node, err := c.CreateProtectedEphemeralSequential(path, s.Data, c.acl)
245 if err != nil {
246 return err
247 }
248 s.node = node
249 return nil
250 }
251
252
253 func (c *client) Deregister(s *Service) error {
254 if s.node == "" {
255 return ErrNotRegistered
256 }
257 path := s.Path + s.Name
258 found, stat, err := c.Exists(path)
259 if err != nil {
260 return err
261 }
262 if !found {
263 return ErrNodeNotFound
264 }
265 if err := c.Delete(path, stat.Version); err != nil {
266 return err
267 }
268 return nil
269 }
270
271
272 func (c *client) Stop() {
273 c.active = false
274 close(c.quit)
275 c.Close()
276 }
277
View as plain text