1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package mvcc
16
17 import (
18 "sync"
19 "time"
20
21 "go.etcd.io/etcd/api/v3/mvccpb"
22 clientv3 "go.etcd.io/etcd/client/v3"
23 "go.etcd.io/etcd/pkg/v3/traceutil"
24 "go.etcd.io/etcd/server/v3/lease"
25 "go.etcd.io/etcd/server/v3/mvcc/backend"
26 "go.etcd.io/etcd/server/v3/mvcc/buckets"
27
28 "go.uber.org/zap"
29 )
30
31
var (
	// chanBufLen is the capacity of the buffered channel that every watch
	// stream created by NewWatchStream uses to deliver WatchResponses.
	// When it fills up, sends fail and watchers become victims.
	chanBufLen = 128

	// maxWatchersPerSync is the limit passed to unsynced.choose, bounding
	// how many unsynced watchers a single syncWatchers pass works on.
	maxWatchersPerSync = 512
)
41
42 type watchable interface {
43 watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
44 progress(w *watcher)
45 progressAll(watchers map[WatchID]*watcher) bool
46 rev() int64
47 }
48
49 type watchableStore struct {
50 *store
51
52
53
54 mu sync.RWMutex
55
56
57 victims []watcherBatch
58 victimc chan struct{}
59
60
61 unsynced watcherGroup
62
63
64
65 synced watcherGroup
66
67 stopc chan struct{}
68 wg sync.WaitGroup
69 }
70
71
72
// cancelFunc is returned alongside a watcher by watch; invoking it cancels
// that watcher.
type cancelFunc func()
74
75 func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
76 return newWatchableStore(lg, b, le, cfg)
77 }
78
79 func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
80 if lg == nil {
81 lg = zap.NewNop()
82 }
83 s := &watchableStore{
84 store: NewStore(lg, b, le, cfg),
85 victimc: make(chan struct{}, 1),
86 unsynced: newWatcherGroup(),
87 synced: newWatcherGroup(),
88 stopc: make(chan struct{}),
89 }
90 s.store.ReadView = &readView{s}
91 s.store.WriteView = &writeView{s}
92 if s.le != nil {
93
94 s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
95 }
96 s.wg.Add(2)
97 go s.syncWatchersLoop()
98 go s.syncVictimsLoop()
99 return s
100 }
101
102 func (s *watchableStore) Close() error {
103 close(s.stopc)
104 s.wg.Wait()
105 return s.store.Close()
106 }
107
108 func (s *watchableStore) NewWatchStream() WatchStream {
109 watchStreamGauge.Inc()
110 return &watchStream{
111 watchable: s,
112 ch: make(chan WatchResponse, chanBufLen),
113 cancels: make(map[WatchID]cancelFunc),
114 watchers: make(map[WatchID]*watcher),
115 }
116 }
117
118 func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
119 wa := &watcher{
120 key: key,
121 end: end,
122 minRev: startRev,
123 id: id,
124 ch: ch,
125 fcs: fcs,
126 }
127
128 s.mu.Lock()
129 s.revMu.RLock()
130 synced := startRev > s.store.currentRev || startRev == 0
131 if synced {
132 wa.minRev = s.store.currentRev + 1
133 if startRev > wa.minRev {
134 wa.minRev = startRev
135 }
136 s.synced.add(wa)
137 } else {
138 slowWatcherGauge.Inc()
139 s.unsynced.add(wa)
140 }
141 s.revMu.RUnlock()
142 s.mu.Unlock()
143
144 watcherGauge.Inc()
145
146 return wa, func() { s.cancelWatcher(wa) }
147 }
148
149
150 func (s *watchableStore) cancelWatcher(wa *watcher) {
151 for {
152 s.mu.Lock()
153 if s.unsynced.delete(wa) {
154 slowWatcherGauge.Dec()
155 watcherGauge.Dec()
156 break
157 } else if s.synced.delete(wa) {
158 watcherGauge.Dec()
159 break
160 } else if wa.compacted {
161 watcherGauge.Dec()
162 break
163 } else if wa.ch == nil {
164
165 break
166 }
167
168 if !wa.victim {
169 s.mu.Unlock()
170 panic("watcher not victim but not in watch groups")
171 }
172
173 var victimBatch watcherBatch
174 for _, wb := range s.victims {
175 if wb[wa] != nil {
176 victimBatch = wb
177 break
178 }
179 }
180 if victimBatch != nil {
181 slowWatcherGauge.Dec()
182 watcherGauge.Dec()
183 delete(victimBatch, wa)
184 break
185 }
186
187
188 s.mu.Unlock()
189 time.Sleep(time.Millisecond)
190 }
191
192 wa.ch = nil
193 s.mu.Unlock()
194 }
195
196 func (s *watchableStore) Restore(b backend.Backend) error {
197 s.mu.Lock()
198 defer s.mu.Unlock()
199 err := s.store.Restore(b)
200 if err != nil {
201 return err
202 }
203
204 for wa := range s.synced.watchers {
205 wa.restore = true
206 s.unsynced.add(wa)
207 }
208 s.synced = newWatcherGroup()
209 return nil
210 }
211
212
213 func (s *watchableStore) syncWatchersLoop() {
214 defer s.wg.Done()
215
216 for {
217 s.mu.RLock()
218 st := time.Now()
219 lastUnsyncedWatchers := s.unsynced.size()
220 s.mu.RUnlock()
221
222 unsyncedWatchers := 0
223 if lastUnsyncedWatchers > 0 {
224 unsyncedWatchers = s.syncWatchers()
225 }
226 syncDuration := time.Since(st)
227
228 waitDuration := 100 * time.Millisecond
229
230 if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
231
232 waitDuration = syncDuration
233 }
234
235 select {
236 case <-time.After(waitDuration):
237 case <-s.stopc:
238 return
239 }
240 }
241 }
242
243
244
245 func (s *watchableStore) syncVictimsLoop() {
246 defer s.wg.Done()
247
248 for {
249 for s.moveVictims() != 0 {
250
251 }
252 s.mu.RLock()
253 isEmpty := len(s.victims) == 0
254 s.mu.RUnlock()
255
256 var tickc <-chan time.Time
257 if !isEmpty {
258 tickc = time.After(10 * time.Millisecond)
259 }
260
261 select {
262 case <-tickc:
263 case <-s.victimc:
264 case <-s.stopc:
265 return
266 }
267 }
268 }
269
270
271 func (s *watchableStore) moveVictims() (moved int) {
272 s.mu.Lock()
273 victims := s.victims
274 s.victims = nil
275 s.mu.Unlock()
276
277 var newVictim watcherBatch
278 for _, wb := range victims {
279
280 for w, eb := range wb {
281
282 rev := w.minRev - 1
283 if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
284 pendingEventsGauge.Add(float64(len(eb.evs)))
285 } else {
286 if newVictim == nil {
287 newVictim = make(watcherBatch)
288 }
289 newVictim[w] = eb
290 continue
291 }
292 moved++
293 }
294
295
296 s.mu.Lock()
297 s.store.revMu.RLock()
298 curRev := s.store.currentRev
299 for w, eb := range wb {
300 if newVictim != nil && newVictim[w] != nil {
301
302 continue
303 }
304 w.victim = false
305 if eb.moreRev != 0 {
306 w.minRev = eb.moreRev
307 }
308 if w.minRev <= curRev {
309 s.unsynced.add(w)
310 } else {
311 slowWatcherGauge.Dec()
312 s.synced.add(w)
313 }
314 }
315 s.store.revMu.RUnlock()
316 s.mu.Unlock()
317 }
318
319 if len(newVictim) > 0 {
320 s.mu.Lock()
321 s.victims = append(s.victims, newVictim)
322 s.mu.Unlock()
323 }
324
325 return moved
326 }
327
328
329
330
331
332
333 func (s *watchableStore) syncWatchers() int {
334 s.mu.Lock()
335 defer s.mu.Unlock()
336
337 if s.unsynced.size() == 0 {
338 return 0
339 }
340
341 s.store.revMu.RLock()
342 defer s.store.revMu.RUnlock()
343
344
345
346
347 curRev := s.store.currentRev
348 compactionRev := s.store.compactMainRev
349
350 wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
351 minBytes, maxBytes := newRevBytes(), newRevBytes()
352 revToBytes(revision{main: minRev}, minBytes)
353 revToBytes(revision{main: curRev + 1}, maxBytes)
354
355
356
357 tx := s.store.b.ReadTx()
358 tx.RLock()
359 revs, vs := tx.UnsafeRange(buckets.Key, minBytes, maxBytes, 0)
360 evs := kvsToEvents(s.store.lg, wg, revs, vs)
361
362
363
364 tx.RUnlock()
365
366 var victims watcherBatch
367 wb := newWatcherBatch(wg, evs)
368 for w := range wg.watchers {
369 if w.minRev < compactionRev {
370
371
372 continue
373 }
374 w.minRev = curRev + 1
375
376 eb, ok := wb[w]
377 if !ok {
378
379 s.synced.add(w)
380 s.unsynced.delete(w)
381 continue
382 }
383
384 if eb.moreRev != 0 {
385 w.minRev = eb.moreRev
386 }
387
388 if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
389 pendingEventsGauge.Add(float64(len(eb.evs)))
390 } else {
391 if victims == nil {
392 victims = make(watcherBatch)
393 }
394 w.victim = true
395 }
396
397 if w.victim {
398 victims[w] = eb
399 } else {
400 if eb.moreRev != 0 {
401
402 continue
403 }
404 s.synced.add(w)
405 }
406 s.unsynced.delete(w)
407 }
408 s.addVictim(victims)
409
410 vsz := 0
411 for _, v := range s.victims {
412 vsz += len(v)
413 }
414 slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))
415
416 return s.unsynced.size()
417 }
418
419
420 func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []mvccpb.Event) {
421 for i, v := range vals {
422 var kv mvccpb.KeyValue
423 if err := kv.Unmarshal(v); err != nil {
424 lg.Panic("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
425 }
426
427 if !wg.contains(string(kv.Key)) {
428 continue
429 }
430
431 ty := mvccpb.PUT
432 if isTombstone(revs[i]) {
433 ty = mvccpb.DELETE
434
435 kv.ModRevision = bytesToRev(revs[i]).main
436 }
437 evs = append(evs, mvccpb.Event{Kv: &kv, Type: ty})
438 }
439 return evs
440 }
441
442
443
444 func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
445 var victim watcherBatch
446 for w, eb := range newWatcherBatch(&s.synced, evs) {
447 if eb.revs != 1 {
448 s.store.lg.Panic(
449 "unexpected multiple revisions in watch notification",
450 zap.Int("number-of-revisions", eb.revs),
451 )
452 }
453 if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
454 pendingEventsGauge.Add(float64(len(eb.evs)))
455 } else {
456
457 if victim == nil {
458 victim = make(watcherBatch)
459 }
460 w.victim = true
461 victim[w] = eb
462 s.synced.delete(w)
463 slowWatcherGauge.Inc()
464 }
465
466
467
468 w.minRev = rev + 1
469 }
470 s.addVictim(victim)
471 }
472
473 func (s *watchableStore) addVictim(victim watcherBatch) {
474 if victim == nil {
475 return
476 }
477 s.victims = append(s.victims, victim)
478 select {
479 case s.victimc <- struct{}{}:
480 default:
481 }
482 }
483
484 func (s *watchableStore) rev() int64 { return s.store.Rev() }
485
486 func (s *watchableStore) progress(w *watcher) {
487 s.progressIfSync(map[WatchID]*watcher{w.id: w}, w.id)
488 }
489
490 func (s *watchableStore) progressAll(watchers map[WatchID]*watcher) bool {
491 return s.progressIfSync(watchers, clientv3.InvalidWatchID)
492 }
493
494 func (s *watchableStore) progressIfSync(watchers map[WatchID]*watcher, responseWatchID WatchID) bool {
495 s.mu.RLock()
496 defer s.mu.RUnlock()
497
498
499 for _, w := range watchers {
500 if _, ok := s.synced.watchers[w]; !ok {
501 return false
502 }
503 }
504
505
506
507
508
509
510 for _, w := range watchers {
511 w.send(WatchResponse{WatchID: responseWatchID, Revision: s.rev()})
512 return true
513 }
514 return true
515 }
516
517 type watcher struct {
518
519 key []byte
520
521
522 end []byte
523
524
525 victim bool
526
527
528 compacted bool
529
530
531
532
533
534
535
536 restore bool
537
538
539 minRev int64
540 id WatchID
541
542 fcs []FilterFunc
543
544
545 ch chan<- WatchResponse
546 }
547
548 func (w *watcher) send(wr WatchResponse) bool {
549 progressEvent := len(wr.Events) == 0
550
551 if len(w.fcs) != 0 {
552 ne := make([]mvccpb.Event, 0, len(wr.Events))
553 for i := range wr.Events {
554 filtered := false
555 for _, filter := range w.fcs {
556 if filter(wr.Events[i]) {
557 filtered = true
558 break
559 }
560 }
561 if !filtered {
562 ne = append(ne, wr.Events[i])
563 }
564 }
565 wr.Events = ne
566 }
567
568
569 if !progressEvent && len(wr.Events) == 0 {
570 return true
571 }
572 select {
573 case w.ch <- wr:
574 return true
575 default:
576 return false
577 }
578 }
579
View as plain text