1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package mvcc
16
17 import (
18 "bytes"
19 "fmt"
20 "os"
21 "reflect"
22 "sync"
23 "testing"
24 "time"
25
26 "github.com/stretchr/testify/require"
27 "go.uber.org/zap"
28 "go.uber.org/zap/zaptest"
29
30 "go.etcd.io/etcd/api/v3/mvccpb"
31 "go.etcd.io/etcd/pkg/v3/traceutil"
32 "go.etcd.io/etcd/server/v3/lease"
33 betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
34 )
35
36 func TestWatch(t *testing.T) {
37 b, tmpPath := betesting.NewDefaultTmpBackend(t)
38 s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
39
40 defer func() {
41 s.store.Close()
42 os.Remove(tmpPath)
43 }()
44
45 testKey := []byte("foo")
46 testValue := []byte("bar")
47 s.Put(testKey, testValue, lease.NoLease)
48
49 w := s.NewWatchStream()
50 w.Watch(0, testKey, nil, 0)
51
52 if !s.synced.contains(string(testKey)) {
53
54 t.Errorf("existence = false, want true")
55 }
56 }
57
58 func TestNewWatcherCancel(t *testing.T) {
59 b, tmpPath := betesting.NewDefaultTmpBackend(t)
60 s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
61
62 defer func() {
63 s.store.Close()
64 os.Remove(tmpPath)
65 }()
66 testKey := []byte("foo")
67 testValue := []byte("bar")
68 s.Put(testKey, testValue, lease.NoLease)
69
70 w := s.NewWatchStream()
71 wt, _ := w.Watch(0, testKey, nil, 0)
72
73 if err := w.Cancel(wt); err != nil {
74 t.Error(err)
75 }
76
77 if s.synced.contains(string(testKey)) {
78
79 t.Errorf("existence = true, want false")
80 }
81 }
82
83
84 func TestCancelUnsynced(t *testing.T) {
85 b, tmpPath := betesting.NewDefaultTmpBackend(t)
86
87
88
89
90
91 s := &watchableStore{
92 store: NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}),
93 unsynced: newWatcherGroup(),
94
95
96
97 synced: newWatcherGroup(),
98 }
99
100 defer func() {
101 s.store.Close()
102 os.Remove(tmpPath)
103 }()
104
105
106
107
108
109 testKey := []byte("foo")
110 testValue := []byte("bar")
111 s.Put(testKey, testValue, lease.NoLease)
112
113 w := s.NewWatchStream()
114
115
116 watcherN := 100
117
118
119 watchIDs := make([]WatchID, watcherN)
120 for i := 0; i < watcherN; i++ {
121
122 watchIDs[i], _ = w.Watch(0, testKey, nil, 1)
123 }
124
125 for _, idx := range watchIDs {
126 if err := w.Cancel(idx); err != nil {
127 t.Error(err)
128 }
129 }
130
131
132
133
134
135 if size := s.unsynced.size(); size != 0 {
136 t.Errorf("unsynced size = %d, want 0", size)
137 }
138 }
139
140
141
142
143 func TestSyncWatchers(t *testing.T) {
144 b, tmpPath := betesting.NewDefaultTmpBackend(t)
145
146 s := &watchableStore{
147 store: NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}),
148 unsynced: newWatcherGroup(),
149 synced: newWatcherGroup(),
150 }
151
152 defer func() {
153 s.store.Close()
154 os.Remove(tmpPath)
155 }()
156
157 testKey := []byte("foo")
158 testValue := []byte("bar")
159 s.Put(testKey, testValue, lease.NoLease)
160
161 w := s.NewWatchStream()
162
163
164 watcherN := 100
165
166 for i := 0; i < watcherN; i++ {
167
168 w.Watch(0, testKey, nil, 1)
169 }
170
171
172
173 sws := s.synced.watcherSetByKey(string(testKey))
174 uws := s.unsynced.watcherSetByKey(string(testKey))
175
176 if len(sws) != 0 {
177 t.Fatalf("synced[string(testKey)] size = %d, want 0", len(sws))
178 }
179
180 if len(uws) != watcherN {
181 t.Errorf("unsynced size = %d, want %d", len(uws), watcherN)
182 }
183
184
185 s.syncWatchers()
186
187 sws = s.synced.watcherSetByKey(string(testKey))
188 uws = s.unsynced.watcherSetByKey(string(testKey))
189
190
191
192 if len(sws) != watcherN {
193 t.Errorf("synced[string(testKey)] size = %d, want %d", len(sws), watcherN)
194 }
195
196
197
198 if len(uws) != 0 {
199 t.Errorf("unsynced size = %d, want 0", len(uws))
200 }
201
202 for w := range sws {
203 if w.minRev != s.Rev()+1 {
204 t.Errorf("w.minRev = %d, want %d", w.minRev, s.Rev()+1)
205 }
206 }
207
208 if len(w.(*watchStream).ch) != watcherN {
209 t.Errorf("watched event size = %d, want %d", len(w.(*watchStream).ch), watcherN)
210 }
211
212 evs := (<-w.(*watchStream).ch).Events
213 if len(evs) != 1 {
214 t.Errorf("len(evs) got = %d, want = 1", len(evs))
215 }
216 if evs[0].Type != mvccpb.PUT {
217 t.Errorf("got = %v, want = %v", evs[0].Type, mvccpb.PUT)
218 }
219 if !bytes.Equal(evs[0].Kv.Key, testKey) {
220 t.Errorf("got = %s, want = %s", evs[0].Kv.Key, testKey)
221 }
222 if !bytes.Equal(evs[0].Kv.Value, testValue) {
223 t.Errorf("got = %s, want = %s", evs[0].Kv.Value, testValue)
224 }
225 }
226
227
228 func TestWatchCompacted(t *testing.T) {
229 b, tmpPath := betesting.NewDefaultTmpBackend(t)
230 s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
231
232 defer func() {
233 s.store.Close()
234 os.Remove(tmpPath)
235 }()
236 testKey := []byte("foo")
237 testValue := []byte("bar")
238
239 maxRev := 10
240 compactRev := int64(5)
241 for i := 0; i < maxRev; i++ {
242 s.Put(testKey, testValue, lease.NoLease)
243 }
244 _, err := s.Compact(traceutil.TODO(), compactRev)
245 if err != nil {
246 t.Fatalf("failed to compact kv (%v)", err)
247 }
248
249 w := s.NewWatchStream()
250 wt, _ := w.Watch(0, testKey, nil, compactRev-1)
251
252 select {
253 case resp := <-w.Chan():
254 if resp.WatchID != wt {
255 t.Errorf("resp.WatchID = %x, want %x", resp.WatchID, wt)
256 }
257 if resp.CompactRevision == 0 {
258 t.Errorf("resp.Compacted = %v, want %v", resp.CompactRevision, compactRev)
259 }
260 case <-time.After(1 * time.Second):
261 t.Fatalf("failed to receive response (timeout)")
262 }
263 }
264
265 func TestWatchNoEventLossOnCompact(t *testing.T) {
266 oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
267 b, tmpPath := betesting.NewDefaultTmpBackend(t)
268 lg := zaptest.NewLogger(t)
269 s := newWatchableStore(lg, b, &lease.FakeLessor{}, StoreConfig{})
270
271 defer func() {
272 cleanup(s, b, tmpPath)
273 chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
274 }()
275
276 chanBufLen, maxWatchersPerSync = 1, 4
277 testKey, testValue := []byte("foo"), []byte("bar")
278
279 maxRev := 10
280 compactRev := int64(5)
281 for i := 0; i < maxRev; i++ {
282 s.Put(testKey, testValue, lease.NoLease)
283 }
284 _, err := s.Compact(traceutil.TODO(), compactRev)
285 require.NoErrorf(t, err, "failed to compact kv (%v)", err)
286
287 w := s.NewWatchStream()
288 defer w.Close()
289
290 watchers := map[WatchID]int64{
291 0: 1,
292 1: 1,
293 2: 6,
294 }
295 for id, startRev := range watchers {
296 _, err := w.Watch(id, testKey, nil, startRev)
297 require.NoError(t, err)
298 }
299
300 s.syncWatchers()
301
302 for len(watchers) > 0 {
303 resp := <-w.Chan()
304 if resp.CompactRevision != 0 {
305 require.Equal(t, resp.CompactRevision, compactRev)
306 require.Contains(t, watchers, resp.WatchID)
307 delete(watchers, resp.WatchID)
308 continue
309 }
310 nextRev := watchers[resp.WatchID]
311 for _, ev := range resp.Events {
312 require.Equalf(t, nextRev, ev.Kv.ModRevision, "got event revision %d but want %d for watcher with watch ID %d", ev.Kv.ModRevision, nextRev, resp.WatchID)
313 nextRev++
314 }
315 if nextRev == s.rev()+1 {
316 delete(watchers, resp.WatchID)
317 }
318 }
319 }
320
321 func TestWatchFutureRev(t *testing.T) {
322 b, tmpPath := betesting.NewDefaultTmpBackend(t)
323 s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
324
325 defer func() {
326 s.store.Close()
327 os.Remove(tmpPath)
328 }()
329
330 testKey := []byte("foo")
331 testValue := []byte("bar")
332
333 w := s.NewWatchStream()
334 wrev := int64(10)
335 w.Watch(0, testKey, nil, wrev)
336
337 for i := 0; i < 10; i++ {
338 rev := s.Put(testKey, testValue, lease.NoLease)
339 if rev >= wrev {
340 break
341 }
342 }
343
344 select {
345 case resp := <-w.Chan():
346 if resp.Revision != wrev {
347 t.Fatalf("rev = %d, want %d", resp.Revision, wrev)
348 }
349 if len(resp.Events) != 1 {
350 t.Fatalf("failed to get events from the response")
351 }
352 if resp.Events[0].Kv.ModRevision != wrev {
353 t.Fatalf("kv.rev = %d, want %d", resp.Events[0].Kv.ModRevision, wrev)
354 }
355 case <-time.After(time.Second):
356 t.Fatal("failed to receive event in 1 second.")
357 }
358 }
359
360 func TestWatchRestore(t *testing.T) {
361 test := func(delay time.Duration) func(t *testing.T) {
362 return func(t *testing.T) {
363 b, tmpPath := betesting.NewDefaultTmpBackend(t)
364 s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
365 defer cleanup(s, b, tmpPath)
366
367 testKey := []byte("foo")
368 testValue := []byte("bar")
369 w := s.NewWatchStream()
370 defer w.Close()
371 w.Watch(0, testKey, nil, 1)
372
373 time.Sleep(delay)
374 wantRev := s.Put(testKey, testValue, lease.NoLease)
375
376 s.Restore(b)
377 events := readEventsForSecond(w.Chan())
378 if len(events) != 1 {
379 t.Errorf("Expected only one event, got %d", len(events))
380 }
381 if events[0].Kv.ModRevision != wantRev {
382 t.Errorf("Expected revision to match, got %d, want %d", events[0].Kv.ModRevision, wantRev)
383 }
384
385 }
386 }
387
388 t.Run("Normal", test(0))
389 t.Run("RunSyncWatchLoopBeforeRestore", test(time.Millisecond*120))
390 }
391
392 func readEventsForSecond(ws <-chan WatchResponse) (events []mvccpb.Event) {
393 for {
394 select {
395 case resp := <-ws:
396 events = append(events, resp.Events...)
397 case <-time.After(time.Second):
398 return events
399 }
400 }
401 }
402
403
404
405
406
407
408
409 func TestWatchRestoreSyncedWatcher(t *testing.T) {
410 b1, b1Path := betesting.NewDefaultTmpBackend(t)
411 s1 := newWatchableStore(zap.NewExample(), b1, &lease.FakeLessor{}, StoreConfig{})
412 defer cleanup(s1, b1, b1Path)
413
414 b2, b2Path := betesting.NewDefaultTmpBackend(t)
415 s2 := newWatchableStore(zap.NewExample(), b2, &lease.FakeLessor{}, StoreConfig{})
416 defer cleanup(s2, b2, b2Path)
417
418 testKey, testValue := []byte("foo"), []byte("bar")
419 rev := s1.Put(testKey, testValue, lease.NoLease)
420 startRev := rev + 2
421
422
423
424 w1 := s1.NewWatchStream()
425 w1.Watch(0, testKey, nil, startRev)
426
427
428 s2.Put(testKey, testValue, lease.NoLease)
429 s2.Put(testKey, testValue, lease.NoLease)
430
431
432 if err := s1.Restore(b2); err != nil {
433 t.Fatal(err)
434 }
435
436
437
438 time.Sleep(2 * time.Second)
439
440
441 s1.Put(testKey, testValue, lease.NoLease)
442
443 select {
444 case resp := <-w1.Chan():
445 if resp.Revision != startRev {
446 t.Fatalf("resp.Revision expect %d, got %d", startRev, resp.Revision)
447 }
448 if len(resp.Events) != 1 {
449 t.Fatalf("len(resp.Events) expect 1, got %d", len(resp.Events))
450 }
451 if resp.Events[0].Kv.ModRevision != startRev {
452 t.Fatalf("resp.Events[0].Kv.ModRevision expect %d, got %d", startRev, resp.Events[0].Kv.ModRevision)
453 }
454 case <-time.After(time.Second):
455 t.Fatal("failed to receive event in 1 second")
456 }
457 }
458
459
460 func TestWatchBatchUnsynced(t *testing.T) {
461 b, tmpPath := betesting.NewDefaultTmpBackend(t)
462 s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
463
464 oldMaxRevs := watchBatchMaxRevs
465 defer func() {
466 watchBatchMaxRevs = oldMaxRevs
467 s.store.Close()
468 os.Remove(tmpPath)
469 }()
470 batches := 3
471 watchBatchMaxRevs = 4
472
473 v := []byte("foo")
474 for i := 0; i < watchBatchMaxRevs*batches; i++ {
475 s.Put(v, v, lease.NoLease)
476 }
477
478 w := s.NewWatchStream()
479 w.Watch(0, v, nil, 1)
480 for i := 0; i < batches; i++ {
481 if resp := <-w.Chan(); len(resp.Events) != watchBatchMaxRevs {
482 t.Fatalf("len(events) = %d, want %d", len(resp.Events), watchBatchMaxRevs)
483 }
484 }
485
486 s.store.revMu.Lock()
487 defer s.store.revMu.Unlock()
488 if size := s.synced.size(); size != 1 {
489 t.Errorf("synced size = %d, want 1", size)
490 }
491 }
492
493 func TestNewMapwatcherToEventMap(t *testing.T) {
494 k0, k1, k2 := []byte("foo0"), []byte("foo1"), []byte("foo2")
495 v0, v1, v2 := []byte("bar0"), []byte("bar1"), []byte("bar2")
496
497 ws := []*watcher{{key: k0}, {key: k1}, {key: k2}}
498
499 evs := []mvccpb.Event{
500 {
501 Type: mvccpb.PUT,
502 Kv: &mvccpb.KeyValue{Key: k0, Value: v0},
503 },
504 {
505 Type: mvccpb.PUT,
506 Kv: &mvccpb.KeyValue{Key: k1, Value: v1},
507 },
508 {
509 Type: mvccpb.PUT,
510 Kv: &mvccpb.KeyValue{Key: k2, Value: v2},
511 },
512 }
513
514 tests := []struct {
515 sync []*watcher
516 evs []mvccpb.Event
517
518 wwe map[*watcher][]mvccpb.Event
519 }{
520
521 {
522 nil,
523 evs,
524 map[*watcher][]mvccpb.Event{},
525 },
526
527
528
529 {
530 []*watcher{ws[2]},
531 evs[:1],
532 map[*watcher][]mvccpb.Event{},
533 },
534
535
536
537 {
538 []*watcher{ws[1]},
539 evs[1:2],
540 map[*watcher][]mvccpb.Event{
541 ws[1]: evs[1:2],
542 },
543 },
544
545
546
547
548 {
549 []*watcher{ws[0], ws[2]},
550 evs[2:],
551 map[*watcher][]mvccpb.Event{
552 ws[2]: evs[2:],
553 },
554 },
555
556
557
558 {
559 []*watcher{ws[0], ws[1]},
560 evs[:2],
561 map[*watcher][]mvccpb.Event{
562 ws[0]: evs[:1],
563 ws[1]: evs[1:2],
564 },
565 },
566 }
567
568 for i, tt := range tests {
569 wg := newWatcherGroup()
570 for _, w := range tt.sync {
571 wg.add(w)
572 }
573
574 gwe := newWatcherBatch(&wg, tt.evs)
575 if len(gwe) != len(tt.wwe) {
576 t.Errorf("#%d: len(gwe) got = %d, want = %d", i, len(gwe), len(tt.wwe))
577 }
578
579 for w, eb := range gwe {
580 if len(eb.evs) != len(tt.wwe[w]) {
581 t.Errorf("#%d: len(eb.evs) got = %d, want = %d", i, len(eb.evs), len(tt.wwe[w]))
582 }
583 if !reflect.DeepEqual(eb.evs, tt.wwe[w]) {
584 t.Errorf("#%d: reflect.DeepEqual events got = %v, want = true", i, false)
585 }
586 }
587 }
588 }
589
590
591
592 func TestWatchVictims(t *testing.T) {
593 oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
594
595 b, tmpPath := betesting.NewDefaultTmpBackend(t)
596 s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
597
598 defer func() {
599 s.store.Close()
600 os.Remove(tmpPath)
601 chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
602 }()
603
604 chanBufLen, maxWatchersPerSync = 1, 2
605 numPuts := chanBufLen * 64
606 testKey, testValue := []byte("foo"), []byte("bar")
607
608 var wg sync.WaitGroup
609 numWatches := maxWatchersPerSync * 128
610 errc := make(chan error, numWatches)
611 wg.Add(numWatches)
612 for i := 0; i < numWatches; i++ {
613 go func() {
614 w := s.NewWatchStream()
615 w.Watch(0, testKey, nil, 1)
616 defer func() {
617 w.Close()
618 wg.Done()
619 }()
620 tc := time.After(10 * time.Second)
621 evs, nextRev := 0, int64(2)
622 for evs < numPuts {
623 select {
624 case <-tc:
625 errc <- fmt.Errorf("time out")
626 return
627 case wr := <-w.Chan():
628 evs += len(wr.Events)
629 for _, ev := range wr.Events {
630 if ev.Kv.ModRevision != nextRev {
631 errc <- fmt.Errorf("expected rev=%d, got %d", nextRev, ev.Kv.ModRevision)
632 return
633 }
634 nextRev++
635 }
636 time.Sleep(time.Millisecond)
637 }
638 }
639 if evs != numPuts {
640 errc <- fmt.Errorf("expected %d events, got %d", numPuts, evs)
641 return
642 }
643 select {
644 case <-w.Chan():
645 errc <- fmt.Errorf("unexpected response")
646 default:
647 }
648 }()
649 time.Sleep(time.Millisecond)
650 }
651
652 var wgPut sync.WaitGroup
653 wgPut.Add(numPuts)
654 for i := 0; i < numPuts; i++ {
655 go func() {
656 defer wgPut.Done()
657 s.Put(testKey, testValue, lease.NoLease)
658 }()
659 }
660 wgPut.Wait()
661
662 wg.Wait()
663 select {
664 case err := <-errc:
665 t.Fatal(err)
666 default:
667 }
668 }
669
670
671
672 func TestStressWatchCancelClose(t *testing.T) {
673 b, tmpPath := betesting.NewDefaultTmpBackend(t)
674 s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
675
676 defer func() {
677 s.store.Close()
678 os.Remove(tmpPath)
679 }()
680
681 testKey, testValue := []byte("foo"), []byte("bar")
682 var wg sync.WaitGroup
683 readyc := make(chan struct{})
684 wg.Add(100)
685 for i := 0; i < 100; i++ {
686 go func() {
687 defer wg.Done()
688 w := s.NewWatchStream()
689 ids := make([]WatchID, 10)
690 for i := range ids {
691 ids[i], _ = w.Watch(0, testKey, nil, 0)
692 }
693 <-readyc
694 wg.Add(1 + len(ids)/2)
695 for i := range ids[:len(ids)/2] {
696 go func(n int) {
697 defer wg.Done()
698 w.Cancel(ids[n])
699 }(i)
700 }
701 go func() {
702 defer wg.Done()
703 w.Close()
704 }()
705 }()
706 }
707
708 close(readyc)
709 for i := 0; i < 100; i++ {
710 s.Put(testKey, testValue, lease.NoLease)
711 }
712
713 wg.Wait()
714 }
715
View as plain text