1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package v2store
16
17 import (
18 "encoding/json"
19 "fmt"
20 "path"
21 "strconv"
22 "strings"
23 "sync"
24 "time"
25
26 "go.etcd.io/etcd/client/pkg/v3/types"
27 "go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
28
29 "github.com/jonboulle/clockwork"
30 )
31
32
// defaultVersion is the store format version that newStore assigns to
// CurrentVersion and that Version reports.
const defaultVersion = 2

// minExpireTime is 2000-01-01T00:00:00Z. internalCreate treats any expiration
// time earlier than this (including the zero time.Time) as Permanent.
var minExpireTime = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
40
// Store is the hierarchical key/value store of the v2 API. Keys are
// slash-separated paths that the implementation cleans and roots at "/"
// before use. Every successful mutation advances the store index and yields
// an *Event that is delivered to watchers.
type Store interface {
	// Version returns the store format version.
	Version() int
	// Index returns the store's current index, i.e. the index of the most
	// recent mutation.
	Index() uint64

	// Get returns an event describing the node at nodePath. recursive and
	// sorted are forwarded to the node loader (presumably to include and
	// order descendants of a directory — TODO confirm).
	Get(nodePath string, recursive, sorted bool) (*Event, error)
	// Set creates or replaces the node at nodePath. With expireOpts.Refresh
	// the existing value is kept and only the expiration changes.
	Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error)
	// Update changes the value and/or expiration of an existing node.
	Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error)
	// Create creates a new node and fails if one already exists. With
	// unique, a generated index-based child name is appended to nodePath.
	Create(nodePath string, dir bool, value string, unique bool,
		expireOpts TTLOptionSet) (*Event, error)
	// CompareAndSwap replaces the value of an existing file only if its
	// current value and/or modified index match prevValue/prevIndex.
	CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
		value string, expireOpts TTLOptionSet) (*Event, error)
	// Delete removes the node at nodePath; recursive implies dir.
	Delete(nodePath string, dir, recursive bool) (*Event, error)
	// CompareAndDelete removes a file only if its current value and/or
	// modified index match prevValue/prevIndex.
	CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)

	// Watch registers a watcher on prefix starting at sinceIndex; a
	// sinceIndex of 0 means "the next index". The exact meaning of stream is
	// defined by the watcher hub — TODO confirm.
	Watch(prefix string, recursive, stream bool, sinceIndex uint64) (Watcher, error)

	// Save returns a JSON snapshot of a clone of the store.
	Save() ([]byte, error)
	// Recovery replaces the store's state with a JSON snapshot such as the
	// one produced by Save.
	Recovery(state []byte) error

	// Clone returns a copy of the store's tree, watcher hub, stats, index
	// and version.
	Clone() Store
	// SaveNoCopy returns a JSON snapshot of the store itself, without
	// cloning it first and without taking the store lock.
	SaveNoCopy() ([]byte, error)

	// JsonStats returns the store statistics, including the watcher count,
	// in serialized form.
	JsonStats() []byte
	// DeleteExpiredKeys removes every tracked TTL node whose expiration time
	// is not after cutoff, emitting an Expire event for each.
	DeleteExpiredKeys(cutoff time.Time)

	// HasTTLKeys reports whether any node with a TTL is currently tracked.
	HasTTLKeys() bool
}
68
// TTLOptionSet carries the expiration options of a write request.
type TTLOptionSet struct {
	// ExpireTime is the absolute time at which the node expires. On the
	// create path (Create/Set) values before minExpireTime are treated as
	// Permanent; Update and CompareAndSwap pass it to node.UpdateTTL as is.
	ExpireTime time.Time
	// Refresh, when true, keeps the node's current value and only updates
	// its expiration (Set, Update, CompareAndSwap). The resulting event is
	// marked with SetRefresh and recorded via WatcherHub.add instead of
	// WatcherHub.notify. Create ignores this flag.
	Refresh bool
}
73
// store is the concrete Store implementation.
//
// Save and SaveNoCopy serialize it with encoding/json, and Recovery restores
// it the same way. Only the exported fields take part, and they are emitted in
// declaration order, so keep the field order stable. The unexported fields are
// runtime-only state.
type store struct {
	// Root is the "/" directory node.
	Root *node
	// WatcherHub delivers mutation events to watchers.
	WatcherHub *watcherHub
	// CurrentIndex is the index of the most recent mutation.
	CurrentIndex uint64
	// Stats holds the per-operation success/failure counters.
	Stats *Stats
	// CurrentVersion is the store format version (defaultVersion).
	CurrentVersion int
	// ttlKeyHeap tracks nodes that have a TTL; used by DeleteExpiredKeys
	// and HasTTLKeys.
	ttlKeyHeap *ttlKeyHeap
	// worldLock guards the whole store: mutations take Lock, reads RLock.
	worldLock sync.RWMutex
	// clock is the time source for TTL/expiration computations.
	clock clockwork.Clock
	// readonlySet holds the cleaned paths ("/" plus namespaces) that the
	// write paths refuse to modify.
	readonlySet types.Set
}
85
86
87 func New(namespaces ...string) Store {
88 s := newStore(namespaces...)
89 s.clock = clockwork.NewRealClock()
90 return s
91 }
92
93 func newStore(namespaces ...string) *store {
94 s := new(store)
95 s.CurrentVersion = defaultVersion
96 s.Root = newDir(s, "/", s.CurrentIndex, nil, Permanent)
97 for _, namespace := range namespaces {
98 s.Root.Add(newDir(s, namespace, s.CurrentIndex, s.Root, Permanent))
99 }
100 s.Stats = newStats()
101 s.WatcherHub = newWatchHub(1000)
102 s.ttlKeyHeap = newTtlKeyHeap()
103 s.readonlySet = types.NewUnsafeSet(append(namespaces, "/")...)
104 return s
105 }
106
107
108 func (s *store) Version() int {
109 return s.CurrentVersion
110 }
111
112
113 func (s *store) Index() uint64 {
114 s.worldLock.RLock()
115 defer s.worldLock.RUnlock()
116 return s.CurrentIndex
117 }
118
119
120
121
122 func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
123 var err *v2error.Error
124
125 s.worldLock.RLock()
126 defer s.worldLock.RUnlock()
127
128 defer func() {
129 if err == nil {
130 s.Stats.Inc(GetSuccess)
131 if recursive {
132 reportReadSuccess(GetRecursive)
133 } else {
134 reportReadSuccess(Get)
135 }
136 return
137 }
138
139 s.Stats.Inc(GetFail)
140 if recursive {
141 reportReadFailure(GetRecursive)
142 } else {
143 reportReadFailure(Get)
144 }
145 }()
146
147 n, err := s.internalGet(nodePath)
148 if err != nil {
149 return nil, err
150 }
151
152 e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
153 e.EtcdIndex = s.CurrentIndex
154 e.Node.loadInternalNode(n, recursive, sorted, s.clock)
155
156 return e, nil
157 }
158
159
160
161
162 func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireOpts TTLOptionSet) (*Event, error) {
163 var err *v2error.Error
164
165 s.worldLock.Lock()
166 defer s.worldLock.Unlock()
167
168 defer func() {
169 if err == nil {
170 s.Stats.Inc(CreateSuccess)
171 reportWriteSuccess(Create)
172 return
173 }
174
175 s.Stats.Inc(CreateFail)
176 reportWriteFailure(Create)
177 }()
178
179 e, err := s.internalCreate(nodePath, dir, value, unique, false, expireOpts.ExpireTime, Create)
180 if err != nil {
181 return nil, err
182 }
183
184 e.EtcdIndex = s.CurrentIndex
185 s.WatcherHub.notify(e)
186
187 return e, nil
188 }
189
190
191 func (s *store) Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error) {
192 var err *v2error.Error
193
194 s.worldLock.Lock()
195 defer s.worldLock.Unlock()
196
197 defer func() {
198 if err == nil {
199 s.Stats.Inc(SetSuccess)
200 reportWriteSuccess(Set)
201 return
202 }
203
204 s.Stats.Inc(SetFail)
205 reportWriteFailure(Set)
206 }()
207
208
209 n, getErr := s.internalGet(nodePath)
210 if getErr != nil && getErr.ErrorCode != v2error.EcodeKeyNotFound {
211 err = getErr
212 return nil, err
213 }
214
215 if expireOpts.Refresh {
216 if getErr != nil {
217 err = getErr
218 return nil, err
219 }
220 value = n.Value
221 }
222
223
224 e, err := s.internalCreate(nodePath, dir, value, false, true, expireOpts.ExpireTime, Set)
225 if err != nil {
226 return nil, err
227 }
228 e.EtcdIndex = s.CurrentIndex
229
230
231 if getErr == nil {
232 prev := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
233 prev.Node.loadInternalNode(n, false, false, s.clock)
234 e.PrevNode = prev.Node
235 }
236
237 if !expireOpts.Refresh {
238 s.WatcherHub.notify(e)
239 } else {
240 e.SetRefresh()
241 s.WatcherHub.add(e)
242 }
243
244 return e, nil
245 }
246
247
248 func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64) string {
249 switch which {
250 case CompareIndexNotMatch:
251 return fmt.Sprintf("[%v != %v]", prevIndex, n.ModifiedIndex)
252 case CompareValueNotMatch:
253 return fmt.Sprintf("[%v != %v]", prevValue, n.Value)
254 default:
255 return fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
256 }
257 }
258
259 func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
260 value string, expireOpts TTLOptionSet) (*Event, error) {
261
262 var err *v2error.Error
263
264 s.worldLock.Lock()
265 defer s.worldLock.Unlock()
266
267 defer func() {
268 if err == nil {
269 s.Stats.Inc(CompareAndSwapSuccess)
270 reportWriteSuccess(CompareAndSwap)
271 return
272 }
273
274 s.Stats.Inc(CompareAndSwapFail)
275 reportWriteFailure(CompareAndSwap)
276 }()
277
278 nodePath = path.Clean(path.Join("/", nodePath))
279
280 if s.readonlySet.Contains(nodePath) {
281 return nil, v2error.NewError(v2error.EcodeRootROnly, "/", s.CurrentIndex)
282 }
283
284 n, err := s.internalGet(nodePath)
285 if err != nil {
286 return nil, err
287 }
288 if n.IsDir() {
289 err = v2error.NewError(v2error.EcodeNotFile, nodePath, s.CurrentIndex)
290 return nil, err
291 }
292
293
294
295 if ok, which := n.Compare(prevValue, prevIndex); !ok {
296 cause := getCompareFailCause(n, which, prevValue, prevIndex)
297 err = v2error.NewError(v2error.EcodeTestFailed, cause, s.CurrentIndex)
298 return nil, err
299 }
300
301 if expireOpts.Refresh {
302 value = n.Value
303 }
304
305
306 s.CurrentIndex++
307
308 e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
309 e.EtcdIndex = s.CurrentIndex
310 e.PrevNode = n.Repr(false, false, s.clock)
311 eNode := e.Node
312
313
314 if err := n.Write(value, s.CurrentIndex); err != nil {
315 return nil, err
316 }
317 n.UpdateTTL(expireOpts.ExpireTime)
318
319
320 valueCopy := value
321 eNode.Value = &valueCopy
322 eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
323
324 if !expireOpts.Refresh {
325 s.WatcherHub.notify(e)
326 } else {
327 e.SetRefresh()
328 s.WatcherHub.add(e)
329 }
330
331 return e, nil
332 }
333
334
335
336 func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
337 var err *v2error.Error
338
339 s.worldLock.Lock()
340 defer s.worldLock.Unlock()
341
342 defer func() {
343 if err == nil {
344 s.Stats.Inc(DeleteSuccess)
345 reportWriteSuccess(Delete)
346 return
347 }
348
349 s.Stats.Inc(DeleteFail)
350 reportWriteFailure(Delete)
351 }()
352
353 nodePath = path.Clean(path.Join("/", nodePath))
354
355 if s.readonlySet.Contains(nodePath) {
356 return nil, v2error.NewError(v2error.EcodeRootROnly, "/", s.CurrentIndex)
357 }
358
359
360 if recursive {
361 dir = true
362 }
363
364 n, err := s.internalGet(nodePath)
365 if err != nil {
366 return nil, err
367 }
368
369 nextIndex := s.CurrentIndex + 1
370 e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
371 e.EtcdIndex = nextIndex
372 e.PrevNode = n.Repr(false, false, s.clock)
373 eNode := e.Node
374
375 if n.IsDir() {
376 eNode.Dir = true
377 }
378
379 callback := func(path string) {
380
381 s.WatcherHub.notifyWatchers(e, path, true)
382 }
383
384 err = n.Remove(dir, recursive, callback)
385 if err != nil {
386 return nil, err
387 }
388
389
390 s.CurrentIndex++
391
392 s.WatcherHub.notify(e)
393
394 return e, nil
395 }
396
397 func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
398 var err *v2error.Error
399
400 s.worldLock.Lock()
401 defer s.worldLock.Unlock()
402
403 defer func() {
404 if err == nil {
405 s.Stats.Inc(CompareAndDeleteSuccess)
406 reportWriteSuccess(CompareAndDelete)
407 return
408 }
409
410 s.Stats.Inc(CompareAndDeleteFail)
411 reportWriteFailure(CompareAndDelete)
412 }()
413
414 nodePath = path.Clean(path.Join("/", nodePath))
415
416 n, err := s.internalGet(nodePath)
417 if err != nil {
418 return nil, err
419 }
420 if n.IsDir() {
421 return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, s.CurrentIndex)
422 }
423
424
425
426 if ok, which := n.Compare(prevValue, prevIndex); !ok {
427 cause := getCompareFailCause(n, which, prevValue, prevIndex)
428 return nil, v2error.NewError(v2error.EcodeTestFailed, cause, s.CurrentIndex)
429 }
430
431
432 s.CurrentIndex++
433
434 e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
435 e.EtcdIndex = s.CurrentIndex
436 e.PrevNode = n.Repr(false, false, s.clock)
437
438 callback := func(path string) {
439
440 s.WatcherHub.notifyWatchers(e, path, true)
441 }
442
443 err = n.Remove(false, false, callback)
444 if err != nil {
445 return nil, err
446 }
447
448 s.WatcherHub.notify(e)
449
450 return e, nil
451 }
452
453 func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Watcher, error) {
454 s.worldLock.RLock()
455 defer s.worldLock.RUnlock()
456
457 key = path.Clean(path.Join("/", key))
458 if sinceIndex == 0 {
459 sinceIndex = s.CurrentIndex + 1
460 }
461
462 w, err := s.WatcherHub.watch(key, recursive, stream, sinceIndex, s.CurrentIndex)
463 if err != nil {
464 return nil, err
465 }
466
467 return w, nil
468 }
469
470
471 func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *v2error.Error)) (*node, *v2error.Error) {
472 components := strings.Split(nodePath, "/")
473
474 curr := s.Root
475 var err *v2error.Error
476
477 for i := 1; i < len(components); i++ {
478 if len(components[i]) == 0 {
479 return curr, nil
480 }
481
482 curr, err = walkFunc(curr, components[i])
483 if err != nil {
484 return nil, err
485 }
486 }
487
488 return curr, nil
489 }
490
491
492
493
494 func (s *store) Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error) {
495 var err *v2error.Error
496
497 s.worldLock.Lock()
498 defer s.worldLock.Unlock()
499
500 defer func() {
501 if err == nil {
502 s.Stats.Inc(UpdateSuccess)
503 reportWriteSuccess(Update)
504 return
505 }
506
507 s.Stats.Inc(UpdateFail)
508 reportWriteFailure(Update)
509 }()
510
511 nodePath = path.Clean(path.Join("/", nodePath))
512
513 if s.readonlySet.Contains(nodePath) {
514 return nil, v2error.NewError(v2error.EcodeRootROnly, "/", s.CurrentIndex)
515 }
516
517 currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
518
519 n, err := s.internalGet(nodePath)
520 if err != nil {
521 return nil, err
522 }
523 if n.IsDir() && len(newValue) != 0 {
524
525 return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, currIndex)
526 }
527
528 if expireOpts.Refresh {
529 newValue = n.Value
530 }
531
532 e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
533 e.EtcdIndex = nextIndex
534 e.PrevNode = n.Repr(false, false, s.clock)
535 eNode := e.Node
536
537 if err := n.Write(newValue, nextIndex); err != nil {
538 return nil, fmt.Errorf("nodePath %v : %v", nodePath, err)
539 }
540
541 if n.IsDir() {
542 eNode.Dir = true
543 } else {
544
545 newValueCopy := newValue
546 eNode.Value = &newValueCopy
547 }
548
549
550 n.UpdateTTL(expireOpts.ExpireTime)
551
552 eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
553
554 if !expireOpts.Refresh {
555 s.WatcherHub.notify(e)
556 } else {
557 e.SetRefresh()
558 s.WatcherHub.add(e)
559 }
560
561 s.CurrentIndex = nextIndex
562
563 return e, nil
564 }
565
566 func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
567 expireTime time.Time, action string) (*Event, *v2error.Error) {
568
569 currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
570
571 if unique {
572 nodePath += "/" + fmt.Sprintf("%020s", strconv.FormatUint(nextIndex, 10))
573 }
574
575 nodePath = path.Clean(path.Join("/", nodePath))
576
577
578 if s.readonlySet.Contains(nodePath) {
579 return nil, v2error.NewError(v2error.EcodeRootROnly, "/", currIndex)
580 }
581
582
583
584 if expireTime.Before(minExpireTime) {
585 expireTime = Permanent
586 }
587
588 dirName, nodeName := path.Split(nodePath)
589
590
591 d, err := s.walk(dirName, s.checkDir)
592
593 if err != nil {
594 s.Stats.Inc(SetFail)
595 reportWriteFailure(action)
596 err.Index = currIndex
597 return nil, err
598 }
599
600 e := newEvent(action, nodePath, nextIndex, nextIndex)
601 eNode := e.Node
602
603 n, _ := d.GetChild(nodeName)
604
605
606 if n != nil {
607 if replace {
608 if n.IsDir() {
609 return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, currIndex)
610 }
611 e.PrevNode = n.Repr(false, false, s.clock)
612
613 if err := n.Remove(false, false, nil); err != nil {
614 return nil, err
615 }
616 } else {
617 return nil, v2error.NewError(v2error.EcodeNodeExist, nodePath, currIndex)
618 }
619 }
620
621 if !dir {
622
623 valueCopy := value
624 eNode.Value = &valueCopy
625
626 n = newKV(s, nodePath, value, nextIndex, d, expireTime)
627
628 } else {
629 eNode.Dir = true
630
631 n = newDir(s, nodePath, nextIndex, d, expireTime)
632 }
633
634
635 if err := d.Add(n); err != nil {
636 return nil, err
637 }
638
639
640 if !n.IsPermanent() {
641 s.ttlKeyHeap.push(n)
642
643 eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
644 }
645
646 s.CurrentIndex = nextIndex
647
648 return e, nil
649 }
650
651
652 func (s *store) internalGet(nodePath string) (*node, *v2error.Error) {
653 nodePath = path.Clean(path.Join("/", nodePath))
654
655 walkFunc := func(parent *node, name string) (*node, *v2error.Error) {
656
657 if !parent.IsDir() {
658 err := v2error.NewError(v2error.EcodeNotDir, parent.Path, s.CurrentIndex)
659 return nil, err
660 }
661
662 child, ok := parent.Children[name]
663 if ok {
664 return child, nil
665 }
666
667 return nil, v2error.NewError(v2error.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
668 }
669
670 f, err := s.walk(nodePath, walkFunc)
671
672 if err != nil {
673 return nil, err
674 }
675 return f, nil
676 }
677
678
679 func (s *store) DeleteExpiredKeys(cutoff time.Time) {
680 s.worldLock.Lock()
681 defer s.worldLock.Unlock()
682
683 for {
684 node := s.ttlKeyHeap.top()
685 if node == nil || node.ExpireTime.After(cutoff) {
686 break
687 }
688
689 s.CurrentIndex++
690 e := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex)
691 e.EtcdIndex = s.CurrentIndex
692 e.PrevNode = node.Repr(false, false, s.clock)
693 if node.IsDir() {
694 e.Node.Dir = true
695 }
696
697 callback := func(path string) {
698
699 s.WatcherHub.notifyWatchers(e, path, true)
700 }
701
702 s.ttlKeyHeap.pop()
703 node.Remove(true, true, callback)
704
705 reportExpiredKey()
706 s.Stats.Inc(ExpireCount)
707
708 s.WatcherHub.notify(e)
709 }
710
711 }
712
713
714
715
716
717 func (s *store) checkDir(parent *node, dirName string) (*node, *v2error.Error) {
718 node, ok := parent.Children[dirName]
719
720 if ok {
721 if node.IsDir() {
722 return node, nil
723 }
724
725 return nil, v2error.NewError(v2error.EcodeNotDir, node.Path, s.CurrentIndex)
726 }
727
728 n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, Permanent)
729
730 parent.Children[dirName] = n
731
732 return n, nil
733 }
734
735
736
737
738
739 func (s *store) Save() ([]byte, error) {
740 b, err := json.Marshal(s.Clone())
741 if err != nil {
742 return nil, err
743 }
744
745 return b, nil
746 }
747
748 func (s *store) SaveNoCopy() ([]byte, error) {
749 b, err := json.Marshal(s)
750 if err != nil {
751 return nil, err
752 }
753
754 return b, nil
755 }
756
757 func (s *store) Clone() Store {
758 s.worldLock.RLock()
759
760 clonedStore := newStore()
761 clonedStore.CurrentIndex = s.CurrentIndex
762 clonedStore.Root = s.Root.Clone()
763 clonedStore.WatcherHub = s.WatcherHub.clone()
764 clonedStore.Stats = s.Stats.clone()
765 clonedStore.CurrentVersion = s.CurrentVersion
766
767 s.worldLock.RUnlock()
768 return clonedStore
769 }
770
771
772
773
774
775 func (s *store) Recovery(state []byte) error {
776 s.worldLock.Lock()
777 defer s.worldLock.Unlock()
778 err := json.Unmarshal(state, s)
779
780 if err != nil {
781 return err
782 }
783
784 s.ttlKeyHeap = newTtlKeyHeap()
785
786 s.Root.recoverAndclean()
787 return nil
788 }
789
790 func (s *store) JsonStats() []byte {
791 s.Stats.Watchers = uint64(s.WatcherHub.count)
792 return s.Stats.toJson()
793 }
794
795 func (s *store) HasTTLKeys() bool {
796 s.worldLock.RLock()
797 defer s.worldLock.RUnlock()
798 return s.ttlKeyHeap.Len() != 0
799 }
800
View as plain text