1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package client
16
17 import (
18 "context"
19 "encoding/json"
20 "errors"
21 "fmt"
22 "net/http"
23 "net/url"
24 "strconv"
25 "strings"
26 "time"
27
28 "go.etcd.io/etcd/client/pkg/v3/pathutil"
29 )
30
31 const (
32 ErrorCodeKeyNotFound = 100
33 ErrorCodeTestFailed = 101
34 ErrorCodeNotFile = 102
35 ErrorCodeNotDir = 104
36 ErrorCodeNodeExist = 105
37 ErrorCodeRootROnly = 107
38 ErrorCodeDirNotEmpty = 108
39 ErrorCodeUnauthorized = 110
40
41 ErrorCodePrevValueRequired = 201
42 ErrorCodeTTLNaN = 202
43 ErrorCodeIndexNaN = 203
44 ErrorCodeInvalidField = 209
45 ErrorCodeInvalidForm = 210
46
47 ErrorCodeRaftInternal = 300
48 ErrorCodeLeaderElect = 301
49
50 ErrorCodeWatcherCleared = 400
51 ErrorCodeEventIndexCleared = 401
52 )
53
54 type Error struct {
55 Code int `json:"errorCode"`
56 Message string `json:"message"`
57 Cause string `json:"cause"`
58 Index uint64 `json:"index"`
59 }
60
61 func (e Error) Error() string {
62 return fmt.Sprintf("%v: %v (%v) [%v]", e.Code, e.Message, e.Cause, e.Index)
63 }
64
65 var (
66 ErrInvalidJSON = errors.New("client: response is invalid json. The endpoint is probably not valid etcd cluster endpoint")
67 ErrEmptyBody = errors.New("client: response body is empty")
68 )
69
70
71
72 type PrevExistType string
73
74 const (
75 PrevIgnore = PrevExistType("")
76 PrevExist = PrevExistType("true")
77 PrevNoExist = PrevExistType("false")
78 )
79
80 var (
81 defaultV2KeysPrefix = "/v2/keys"
82 )
83
84
85
86 func NewKeysAPI(c Client) KeysAPI {
87 return NewKeysAPIWithPrefix(c, defaultV2KeysPrefix)
88 }
89
90
91
92
93 func NewKeysAPIWithPrefix(c Client, p string) KeysAPI {
94 return &httpKeysAPI{
95 client: c,
96 prefix: p,
97 }
98 }
99
100 type KeysAPI interface {
101
102 Get(ctx context.Context, key string, opts *GetOptions) (*Response, error)
103
104
105
106
107 Set(ctx context.Context, key, value string, opts *SetOptions) (*Response, error)
108
109
110
111
112 Delete(ctx context.Context, key string, opts *DeleteOptions) (*Response, error)
113
114
115 Create(ctx context.Context, key, value string) (*Response, error)
116
117
118 CreateInOrder(ctx context.Context, dir, value string, opts *CreateInOrderOptions) (*Response, error)
119
120
121 Update(ctx context.Context, key, value string) (*Response, error)
122
123
124
125
126
127 Watcher(key string, opts *WatcherOptions) Watcher
128 }
129
130 type WatcherOptions struct {
131
132
133
134
135
136
137
138 AfterIndex uint64
139
140
141
142
143
144 Recursive bool
145 }
146
147 type CreateInOrderOptions struct {
148
149
150
151
152 TTL time.Duration
153 }
154
155 type SetOptions struct {
156
157
158
159
160
161
162
163
164 PrevValue string
165
166
167
168
169
170 PrevIndex uint64
171
172
173
174
175
176 PrevExist PrevExistType
177
178
179
180
181
182 TTL time.Duration
183
184
185
186
187 Refresh bool
188
189
190 Dir bool
191
192
193
194 NoValueOnSuccess bool
195 }
196
197 type GetOptions struct {
198
199
200 Recursive bool
201
202
203
204
205
206
207 Sort bool
208
209
210
211
212 Quorum bool
213 }
214
215 type DeleteOptions struct {
216
217
218
219
220
221
222 PrevValue string
223
224
225
226
227
228 PrevIndex uint64
229
230
231
232
233
234
235 Recursive bool
236
237
238 Dir bool
239 }
240
241 type Watcher interface {
242
243
244
245
246
247
248
249
250
251
252 Next(context.Context) (*Response, error)
253 }
254
255 type Response struct {
256
257
258
259 Action string `json:"action"`
260
261
262 Node *Node `json:"node"`
263
264
265
266
267 PrevNode *Node `json:"prevNode"`
268
269
270
271 Index uint64 `json:"-"`
272
273
274
275 ClusterID string `json:"-"`
276 }
277
278 type Node struct {
279
280 Key string `json:"key"`
281
282
283 Dir bool `json:"dir,omitempty"`
284
285
286
287 Value string `json:"value"`
288
289
290
291
292 Nodes Nodes `json:"nodes"`
293
294
295 CreatedIndex uint64 `json:"createdIndex"`
296
297
298 ModifiedIndex uint64 `json:"modifiedIndex"`
299
300
301 Expiration *time.Time `json:"expiration,omitempty"`
302
303
304 TTL int64 `json:"ttl,omitempty"`
305 }
306
307 func (n *Node) String() string {
308 return fmt.Sprintf("{Key: %s, CreatedIndex: %d, ModifiedIndex: %d, TTL: %d}", n.Key, n.CreatedIndex, n.ModifiedIndex, n.TTL)
309 }
310
311
312 func (n *Node) TTLDuration() time.Duration {
313 return time.Duration(n.TTL) * time.Second
314 }
315
316 type Nodes []*Node
317
318
319
320 func (ns Nodes) Len() int { return len(ns) }
321 func (ns Nodes) Less(i, j int) bool { return ns[i].Key < ns[j].Key }
322 func (ns Nodes) Swap(i, j int) { ns[i], ns[j] = ns[j], ns[i] }
323
324 type httpKeysAPI struct {
325 client httpClient
326 prefix string
327 }
328
329 func (k *httpKeysAPI) Set(ctx context.Context, key, val string, opts *SetOptions) (*Response, error) {
330 act := &setAction{
331 Prefix: k.prefix,
332 Key: key,
333 Value: val,
334 }
335
336 if opts != nil {
337 act.PrevValue = opts.PrevValue
338 act.PrevIndex = opts.PrevIndex
339 act.PrevExist = opts.PrevExist
340 act.TTL = opts.TTL
341 act.Refresh = opts.Refresh
342 act.Dir = opts.Dir
343 act.NoValueOnSuccess = opts.NoValueOnSuccess
344 }
345
346 doCtx := ctx
347 if act.PrevExist == PrevNoExist {
348 doCtx = context.WithValue(doCtx, &oneShotCtxValue, &oneShotCtxValue)
349 }
350 resp, body, err := k.client.Do(doCtx, act)
351 if err != nil {
352 return nil, err
353 }
354
355 return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body)
356 }
357
358 func (k *httpKeysAPI) Create(ctx context.Context, key, val string) (*Response, error) {
359 return k.Set(ctx, key, val, &SetOptions{PrevExist: PrevNoExist})
360 }
361
362 func (k *httpKeysAPI) CreateInOrder(ctx context.Context, dir, val string, opts *CreateInOrderOptions) (*Response, error) {
363 act := &createInOrderAction{
364 Prefix: k.prefix,
365 Dir: dir,
366 Value: val,
367 }
368
369 if opts != nil {
370 act.TTL = opts.TTL
371 }
372
373 resp, body, err := k.client.Do(ctx, act)
374 if err != nil {
375 return nil, err
376 }
377
378 return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body)
379 }
380
381 func (k *httpKeysAPI) Update(ctx context.Context, key, val string) (*Response, error) {
382 return k.Set(ctx, key, val, &SetOptions{PrevExist: PrevExist})
383 }
384
385 func (k *httpKeysAPI) Delete(ctx context.Context, key string, opts *DeleteOptions) (*Response, error) {
386 act := &deleteAction{
387 Prefix: k.prefix,
388 Key: key,
389 }
390
391 if opts != nil {
392 act.PrevValue = opts.PrevValue
393 act.PrevIndex = opts.PrevIndex
394 act.Dir = opts.Dir
395 act.Recursive = opts.Recursive
396 }
397
398 doCtx := context.WithValue(ctx, &oneShotCtxValue, &oneShotCtxValue)
399 resp, body, err := k.client.Do(doCtx, act)
400 if err != nil {
401 return nil, err
402 }
403
404 return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body)
405 }
406
407 func (k *httpKeysAPI) Get(ctx context.Context, key string, opts *GetOptions) (*Response, error) {
408 act := &getAction{
409 Prefix: k.prefix,
410 Key: key,
411 }
412
413 if opts != nil {
414 act.Recursive = opts.Recursive
415 act.Sorted = opts.Sort
416 act.Quorum = opts.Quorum
417 }
418
419 resp, body, err := k.client.Do(ctx, act)
420 if err != nil {
421 return nil, err
422 }
423
424 return unmarshalHTTPResponse(resp.StatusCode, resp.Header, body)
425 }
426
427 func (k *httpKeysAPI) Watcher(key string, opts *WatcherOptions) Watcher {
428 act := waitAction{
429 Prefix: k.prefix,
430 Key: key,
431 }
432
433 if opts != nil {
434 act.Recursive = opts.Recursive
435 if opts.AfterIndex > 0 {
436 act.WaitIndex = opts.AfterIndex + 1
437 }
438 }
439
440 return &httpWatcher{
441 client: k.client,
442 nextWait: act,
443 }
444 }
445
446 type httpWatcher struct {
447 client httpClient
448 nextWait waitAction
449 }
450
451 func (hw *httpWatcher) Next(ctx context.Context) (*Response, error) {
452 for {
453 httpresp, body, err := hw.client.Do(ctx, &hw.nextWait)
454 if err != nil {
455 return nil, err
456 }
457
458 resp, err := unmarshalHTTPResponse(httpresp.StatusCode, httpresp.Header, body)
459 if err != nil {
460 if err == ErrEmptyBody {
461 continue
462 }
463 return nil, err
464 }
465
466 hw.nextWait.WaitIndex = resp.Node.ModifiedIndex + 1
467 return resp, nil
468 }
469 }
470
471
472
473
474
475
476 func v2KeysURL(ep url.URL, prefix, key string) *url.URL {
477
478
479
480 if prefix != "" && prefix[0] != '/' {
481 prefix = "/" + prefix
482 }
483 if key != "" && key[0] != '/' {
484 key = "/" + key
485 }
486 ep.Path = pathutil.CanonicalURLPath(ep.Path + prefix + key)
487 return &ep
488 }
489
490 type getAction struct {
491 Prefix string
492 Key string
493 Recursive bool
494 Sorted bool
495 Quorum bool
496 }
497
498 func (g *getAction) HTTPRequest(ep url.URL) *http.Request {
499 u := v2KeysURL(ep, g.Prefix, g.Key)
500
501 params := u.Query()
502 params.Set("recursive", strconv.FormatBool(g.Recursive))
503 params.Set("sorted", strconv.FormatBool(g.Sorted))
504 params.Set("quorum", strconv.FormatBool(g.Quorum))
505 u.RawQuery = params.Encode()
506
507 req, _ := http.NewRequest("GET", u.String(), nil)
508 return req
509 }
510
511 type waitAction struct {
512 Prefix string
513 Key string
514 WaitIndex uint64
515 Recursive bool
516 }
517
518 func (w *waitAction) HTTPRequest(ep url.URL) *http.Request {
519 u := v2KeysURL(ep, w.Prefix, w.Key)
520
521 params := u.Query()
522 params.Set("wait", "true")
523 params.Set("waitIndex", strconv.FormatUint(w.WaitIndex, 10))
524 params.Set("recursive", strconv.FormatBool(w.Recursive))
525 u.RawQuery = params.Encode()
526
527 req, _ := http.NewRequest("GET", u.String(), nil)
528 return req
529 }
530
531 type setAction struct {
532 Prefix string
533 Key string
534 Value string
535 PrevValue string
536 PrevIndex uint64
537 PrevExist PrevExistType
538 TTL time.Duration
539 Refresh bool
540 Dir bool
541 NoValueOnSuccess bool
542 }
543
544 func (a *setAction) HTTPRequest(ep url.URL) *http.Request {
545 u := v2KeysURL(ep, a.Prefix, a.Key)
546
547 params := u.Query()
548 form := url.Values{}
549
550
551 if a.Dir {
552 params.Set("dir", strconv.FormatBool(a.Dir))
553 } else {
554
555 if a.PrevValue != "" {
556 params.Set("prevValue", a.PrevValue)
557 }
558 form.Add("value", a.Value)
559 }
560
561
562 if a.PrevIndex != 0 {
563 params.Set("prevIndex", strconv.FormatUint(a.PrevIndex, 10))
564 }
565 if a.PrevExist != PrevIgnore {
566 params.Set("prevExist", string(a.PrevExist))
567 }
568 if a.TTL > 0 {
569 form.Add("ttl", strconv.FormatUint(uint64(a.TTL.Seconds()), 10))
570 }
571
572 if a.Refresh {
573 form.Add("refresh", "true")
574 }
575 if a.NoValueOnSuccess {
576 params.Set("noValueOnSuccess", strconv.FormatBool(a.NoValueOnSuccess))
577 }
578
579 u.RawQuery = params.Encode()
580 body := strings.NewReader(form.Encode())
581
582 req, _ := http.NewRequest("PUT", u.String(), body)
583 req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
584
585 return req
586 }
587
588 type deleteAction struct {
589 Prefix string
590 Key string
591 PrevValue string
592 PrevIndex uint64
593 Dir bool
594 Recursive bool
595 }
596
597 func (a *deleteAction) HTTPRequest(ep url.URL) *http.Request {
598 u := v2KeysURL(ep, a.Prefix, a.Key)
599
600 params := u.Query()
601 if a.PrevValue != "" {
602 params.Set("prevValue", a.PrevValue)
603 }
604 if a.PrevIndex != 0 {
605 params.Set("prevIndex", strconv.FormatUint(a.PrevIndex, 10))
606 }
607 if a.Dir {
608 params.Set("dir", "true")
609 }
610 if a.Recursive {
611 params.Set("recursive", "true")
612 }
613 u.RawQuery = params.Encode()
614
615 req, _ := http.NewRequest("DELETE", u.String(), nil)
616 req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
617
618 return req
619 }
620
621 type createInOrderAction struct {
622 Prefix string
623 Dir string
624 Value string
625 TTL time.Duration
626 }
627
628 func (a *createInOrderAction) HTTPRequest(ep url.URL) *http.Request {
629 u := v2KeysURL(ep, a.Prefix, a.Dir)
630
631 form := url.Values{}
632 form.Add("value", a.Value)
633 if a.TTL > 0 {
634 form.Add("ttl", strconv.FormatUint(uint64(a.TTL.Seconds()), 10))
635 }
636 body := strings.NewReader(form.Encode())
637
638 req, _ := http.NewRequest("POST", u.String(), body)
639 req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
640 return req
641 }
642
643 func unmarshalHTTPResponse(code int, header http.Header, body []byte) (res *Response, err error) {
644 switch code {
645 case http.StatusOK, http.StatusCreated:
646 if len(body) == 0 {
647 return nil, ErrEmptyBody
648 }
649 res, err = unmarshalSuccessfulKeysResponse(header, body)
650 default:
651 err = unmarshalFailedKeysResponse(body)
652 }
653 return res, err
654 }
655
656 var jsonIterator = caseSensitiveJsonIterator()
657
658 func unmarshalSuccessfulKeysResponse(header http.Header, body []byte) (*Response, error) {
659 var res Response
660 err := jsonIterator.Unmarshal(body, &res)
661 if err != nil {
662 return nil, ErrInvalidJSON
663 }
664 if header.Get("X-Etcd-Index") != "" {
665 res.Index, err = strconv.ParseUint(header.Get("X-Etcd-Index"), 10, 64)
666 if err != nil {
667 return nil, err
668 }
669 }
670 res.ClusterID = header.Get("X-Etcd-Cluster-ID")
671 return &res, nil
672 }
673
674 func unmarshalFailedKeysResponse(body []byte) error {
675 var etcdErr Error
676 if err := json.Unmarshal(body, &etcdErr); err != nil {
677 return ErrInvalidJSON
678 }
679 return etcdErr
680 }
681
View as plain text