...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package v2store
16
17 import (
18 "fmt"
19 "path"
20 "strings"
21 "sync"
22
23 "go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
24 )
25
26 type EventHistory struct {
27 Queue eventQueue
28 StartIndex uint64
29 LastIndex uint64
30 rwl sync.RWMutex
31 }
32
33 func newEventHistory(capacity int) *EventHistory {
34 return &EventHistory{
35 Queue: eventQueue{
36 Capacity: capacity,
37 Events: make([]*Event, capacity),
38 },
39 }
40 }
41
42
43 func (eh *EventHistory) addEvent(e *Event) *Event {
44 eh.rwl.Lock()
45 defer eh.rwl.Unlock()
46
47 eh.Queue.insert(e)
48
49 eh.LastIndex = e.Index()
50
51 eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index()
52
53 return e
54 }
55
56
57
58 func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, *v2error.Error) {
59 eh.rwl.RLock()
60 defer eh.rwl.RUnlock()
61
62
63 if index < eh.StartIndex {
64 return nil,
65 v2error.NewError(v2error.EcodeEventIndexCleared,
66 fmt.Sprintf("the requested history has been cleared [%v/%v]",
67 eh.StartIndex, index), 0)
68 }
69
70
71 if index > eh.LastIndex {
72 return nil, nil
73 }
74
75 offset := index - eh.StartIndex
76 i := (eh.Queue.Front + int(offset)) % eh.Queue.Capacity
77
78 for {
79 e := eh.Queue.Events[i]
80
81 if !e.Refresh {
82 ok := e.Node.Key == key
83
84 if recursive {
85
86 nkey := path.Clean(key)
87 if nkey[len(nkey)-1] != '/' {
88 nkey = nkey + "/"
89 }
90
91 ok = ok || strings.HasPrefix(e.Node.Key, nkey)
92 }
93
94 if (e.Action == Delete || e.Action == Expire) && e.PrevNode != nil && e.PrevNode.Dir {
95 ok = ok || strings.HasPrefix(key, e.PrevNode.Key)
96 }
97
98 if ok {
99 return e, nil
100 }
101 }
102
103 i = (i + 1) % eh.Queue.Capacity
104
105 if i == eh.Queue.Back {
106 return nil, nil
107 }
108 }
109 }
110
111
112
113 func (eh *EventHistory) clone() *EventHistory {
114 clonedQueue := eventQueue{
115 Capacity: eh.Queue.Capacity,
116 Events: make([]*Event, eh.Queue.Capacity),
117 Size: eh.Queue.Size,
118 Front: eh.Queue.Front,
119 Back: eh.Queue.Back,
120 }
121
122 copy(clonedQueue.Events, eh.Queue.Events)
123 return &EventHistory{
124 StartIndex: eh.StartIndex,
125 Queue: clonedQueue,
126 LastIndex: eh.LastIndex,
127 }
128
129 }
130
View as plain text