...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package v2store
16
17 import (
18 "container/list"
19 "path"
20 "strings"
21 "sync"
22 "sync/atomic"
23
24 "go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
25 )
26
27
28
29
30
31
32
// watcherHub holds the watchers registered per key path together with the
// event history that new watchers are checked against.
type watcherHub struct {
	// count is the number of watchers currently registered in watchers. It is
	// only updated through sync/atomic. Keep it as the first field: on 32-bit
	// platforms sync/atomic only guarantees 64-bit alignment for the first
	// word of an allocated struct.
	count int64

	mutex        sync.Mutex            // guards watchers
	watchers     map[string]*list.List // key path -> list of *watcher
	EventHistory *EventHistory         // history scanned by watch and appended to by add/notify
}
43
44
45
46
47
48 func newWatchHub(capacity int) *watcherHub {
49 return &watcherHub{
50 watchers: make(map[string]*list.List),
51 EventHistory: newEventHistory(capacity),
52 }
53 }
54
55
56
57
58
59 func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeIndex uint64) (Watcher, *v2error.Error) {
60 reportWatchRequest()
61 event, err := wh.EventHistory.scan(key, recursive, index)
62
63 if err != nil {
64 err.Index = storeIndex
65 return nil, err
66 }
67
68 w := &watcher{
69 eventChan: make(chan *Event, 100),
70 recursive: recursive,
71 stream: stream,
72 sinceIndex: index,
73 startIndex: storeIndex,
74 hub: wh,
75 }
76
77 wh.mutex.Lock()
78 defer wh.mutex.Unlock()
79
80 if event != nil {
81 ne := event.Clone()
82 ne.EtcdIndex = storeIndex
83 w.eventChan <- ne
84 return w, nil
85 }
86
87 l, ok := wh.watchers[key]
88
89 var elem *list.Element
90
91 if ok {
92 elem = l.PushBack(w)
93 } else {
94 l = list.New()
95 elem = l.PushBack(w)
96 wh.watchers[key] = l
97 }
98
99 w.remove = func() {
100 if w.removed {
101 return
102 }
103 w.removed = true
104 l.Remove(elem)
105 atomic.AddInt64(&wh.count, -1)
106 reportWatcherRemoved()
107 if l.Len() == 0 {
108 delete(wh.watchers, key)
109 }
110 }
111
112 atomic.AddInt64(&wh.count, 1)
113 reportWatcherAdded()
114
115 return w, nil
116 }
117
118 func (wh *watcherHub) add(e *Event) {
119 wh.EventHistory.addEvent(e)
120 }
121
122
123 func (wh *watcherHub) notify(e *Event) {
124 e = wh.EventHistory.addEvent(e)
125
126 segments := strings.Split(e.Node.Key, "/")
127
128 currPath := "/"
129
130
131
132
133
134 for _, segment := range segments {
135 currPath = path.Join(currPath, segment)
136
137 wh.notifyWatchers(e, currPath, false)
138 }
139 }
140
141 func (wh *watcherHub) notifyWatchers(e *Event, nodePath string, deleted bool) {
142 wh.mutex.Lock()
143 defer wh.mutex.Unlock()
144
145 l, ok := wh.watchers[nodePath]
146 if ok {
147 curr := l.Front()
148
149 for curr != nil {
150 next := curr.Next()
151
152 w, _ := curr.Value.(*watcher)
153
154 originalPath := e.Node.Key == nodePath
155 if (originalPath || !isHidden(nodePath, e.Node.Key)) && w.notify(e, originalPath, deleted) {
156 if !w.stream {
157
158
159
160 w.removed = true
161 l.Remove(curr)
162 atomic.AddInt64(&wh.count, -1)
163 reportWatcherRemoved()
164 }
165 }
166
167 curr = next
168 }
169
170 if l.Len() == 0 {
171
172
173 delete(wh.watchers, nodePath)
174 }
175 }
176 }
177
178
179
180 func (wh *watcherHub) clone() *watcherHub {
181 clonedHistory := wh.EventHistory.clone()
182
183 return &watcherHub{
184 EventHistory: clonedHistory,
185 }
186 }
187
188
189
// isHidden reports whether keyPath is hidden from a watcher on watchPath, i.e.
// whether the part of keyPath that follows watchPath contains a path element
// starting with "_".
//
// watchPath is treated as a prefix of keyPath purely by length; the function
// does not verify that keyPath actually starts with watchPath.
func isHidden(watchPath, keyPath string) bool {
	// A watch path longer than the key has nothing below it to inspect.
	if len(keyPath) < len(watchPath) {
		return false
	}

	// When watchPath is "/", the remainder has no leading slash. Prepending
	// one (and cleaning the result) makes a leading "_" element show up as
	// "/_" in every case.
	remainder := keyPath[len(watchPath):]
	return strings.Contains(path.Clean("/"+remainder), "/_")
}
201
View as plain text