1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package mvcc
16
17 import (
18 "bytes"
19 "fmt"
20 "os"
21 "reflect"
22 "testing"
23 "time"
24
25 "go.uber.org/zap"
26 "go.uber.org/zap/zaptest"
27
28 "go.etcd.io/etcd/api/v3/mvccpb"
29 clientv3 "go.etcd.io/etcd/client/v3"
30 "go.etcd.io/etcd/server/v3/lease"
31 betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
32 )
33
34
35
36 func TestWatcherWatchID(t *testing.T) {
37 b, tmpPath := betesting.NewDefaultTmpBackend(t)
38 s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}))
39 defer cleanup(s, b, tmpPath)
40
41 w := s.NewWatchStream()
42 defer w.Close()
43
44 idm := make(map[WatchID]struct{})
45
46 for i := 0; i < 10; i++ {
47 id, _ := w.Watch(0, []byte("foo"), nil, 0)
48 if _, ok := idm[id]; ok {
49 t.Errorf("#%d: id %d exists", i, id)
50 }
51 idm[id] = struct{}{}
52
53 s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
54
55 resp := <-w.Chan()
56 if resp.WatchID != id {
57 t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
58 }
59
60 if err := w.Cancel(id); err != nil {
61 t.Error(err)
62 }
63 }
64
65 s.Put([]byte("foo2"), []byte("bar"), lease.NoLease)
66
67
68 for i := 10; i < 20; i++ {
69 id, _ := w.Watch(0, []byte("foo2"), nil, 1)
70 if _, ok := idm[id]; ok {
71 t.Errorf("#%d: id %d exists", i, id)
72 }
73 idm[id] = struct{}{}
74
75 resp := <-w.Chan()
76 if resp.WatchID != id {
77 t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
78 }
79
80 if err := w.Cancel(id); err != nil {
81 t.Error(err)
82 }
83 }
84 }
85
86 func TestWatcherRequestsCustomID(t *testing.T) {
87 b, tmpPath := betesting.NewDefaultTmpBackend(t)
88 s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}))
89 defer cleanup(s, b, tmpPath)
90
91 w := s.NewWatchStream()
92 defer w.Close()
93
94
95
96
97
98 tt := []struct {
99 givenID WatchID
100 expectedID WatchID
101 expectedErr error
102 }{
103 {1, 1, nil},
104 {1, 0, ErrWatcherDuplicateID},
105 {0, 0, nil},
106 {0, 2, nil},
107 }
108
109 for i, tcase := range tt {
110 id, err := w.Watch(tcase.givenID, []byte("foo"), nil, 0)
111 if tcase.expectedErr != nil || err != nil {
112 if err != tcase.expectedErr {
113 t.Errorf("expected get error %q in test case %q, got %q", tcase.expectedErr, i, err)
114 }
115 } else if tcase.expectedID != id {
116 t.Errorf("expected to create ID %d, got %d in test case %d", tcase.expectedID, id, i)
117 }
118 }
119 }
120
121
122
123 func TestWatcherWatchPrefix(t *testing.T) {
124 b, tmpPath := betesting.NewDefaultTmpBackend(t)
125 s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}))
126 defer cleanup(s, b, tmpPath)
127
128 w := s.NewWatchStream()
129 defer w.Close()
130
131 idm := make(map[WatchID]struct{})
132
133 val := []byte("bar")
134 keyWatch, keyEnd, keyPut := []byte("foo"), []byte("fop"), []byte("foobar")
135
136 for i := 0; i < 10; i++ {
137 id, _ := w.Watch(0, keyWatch, keyEnd, 0)
138 if _, ok := idm[id]; ok {
139 t.Errorf("#%d: unexpected duplicated id %x", i, id)
140 }
141 idm[id] = struct{}{}
142
143 s.Put(keyPut, val, lease.NoLease)
144
145 resp := <-w.Chan()
146 if resp.WatchID != id {
147 t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
148 }
149
150 if err := w.Cancel(id); err != nil {
151 t.Errorf("#%d: unexpected cancel error %v", i, err)
152 }
153
154 if len(resp.Events) != 1 {
155 t.Errorf("#%d: len(resp.Events) got = %d, want = 1", i, len(resp.Events))
156 }
157 if len(resp.Events) == 1 {
158 if !bytes.Equal(resp.Events[0].Kv.Key, keyPut) {
159 t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyPut)
160 }
161 }
162 }
163
164 keyWatch1, keyEnd1, keyPut1 := []byte("foo1"), []byte("foo2"), []byte("foo1bar")
165 s.Put(keyPut1, val, lease.NoLease)
166
167
168 for i := 10; i < 15; i++ {
169 id, _ := w.Watch(0, keyWatch1, keyEnd1, 1)
170 if _, ok := idm[id]; ok {
171 t.Errorf("#%d: id %d exists", i, id)
172 }
173 idm[id] = struct{}{}
174
175 resp := <-w.Chan()
176 if resp.WatchID != id {
177 t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id)
178 }
179
180 if err := w.Cancel(id); err != nil {
181 t.Error(err)
182 }
183
184 if len(resp.Events) != 1 {
185 t.Errorf("#%d: len(resp.Events) got = %d, want = 1", i, len(resp.Events))
186 }
187 if len(resp.Events) == 1 {
188 if !bytes.Equal(resp.Events[0].Kv.Key, keyPut1) {
189 t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyPut1)
190 }
191 }
192 }
193 }
194
195
196
197 func TestWatcherWatchWrongRange(t *testing.T) {
198 b, tmpPath := betesting.NewDefaultTmpBackend(t)
199 s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}))
200 defer cleanup(s, b, tmpPath)
201
202 w := s.NewWatchStream()
203 defer w.Close()
204
205 if _, err := w.Watch(0, []byte("foa"), []byte("foa"), 1); err != ErrEmptyWatcherRange {
206 t.Fatalf("key == end range given; expected ErrEmptyWatcherRange, got %+v", err)
207 }
208 if _, err := w.Watch(0, []byte("fob"), []byte("foa"), 1); err != ErrEmptyWatcherRange {
209 t.Fatalf("key > end range given; expected ErrEmptyWatcherRange, got %+v", err)
210 }
211
212 if id, _ := w.Watch(0, []byte("foo"), []byte{}, 1); id != 0 {
213 t.Fatalf("\x00 is range given; id expected 0, got %d", id)
214 }
215 }
216
217 func TestWatchDeleteRange(t *testing.T) {
218 b, tmpPath := betesting.NewDefaultTmpBackend(t)
219 s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
220
221 defer func() {
222 s.store.Close()
223 os.Remove(tmpPath)
224 }()
225
226 testKeyPrefix := []byte("foo")
227
228 for i := 0; i < 3; i++ {
229 s.Put([]byte(fmt.Sprintf("%s_%d", testKeyPrefix, i)), []byte("bar"), lease.NoLease)
230 }
231
232 w := s.NewWatchStream()
233 from, to := testKeyPrefix, []byte(fmt.Sprintf("%s_%d", testKeyPrefix, 99))
234 w.Watch(0, from, to, 0)
235
236 s.DeleteRange(from, to)
237
238 we := []mvccpb.Event{
239 {Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_0"), ModRevision: 5}},
240 {Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_1"), ModRevision: 5}},
241 {Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_2"), ModRevision: 5}},
242 }
243
244 select {
245 case r := <-w.Chan():
246 if !reflect.DeepEqual(r.Events, we) {
247 t.Errorf("event = %v, want %v", r.Events, we)
248 }
249 case <-time.After(10 * time.Second):
250 t.Fatal("failed to receive event after 10 seconds!")
251 }
252 }
253
254
255
256 func TestWatchStreamCancelWatcherByID(t *testing.T) {
257 b, tmpPath := betesting.NewDefaultTmpBackend(t)
258 s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}))
259 defer cleanup(s, b, tmpPath)
260
261 w := s.NewWatchStream()
262 defer w.Close()
263
264 id, _ := w.Watch(0, []byte("foo"), nil, 0)
265
266 tests := []struct {
267 cancelID WatchID
268 werr error
269 }{
270
271 {id, nil},
272
273 {id, ErrWatcherNotExist},
274
275 {id + 1, ErrWatcherNotExist},
276 }
277
278 for i, tt := range tests {
279 gerr := w.Cancel(tt.cancelID)
280
281 if gerr != tt.werr {
282 t.Errorf("#%d: err = %v, want %v", i, gerr, tt.werr)
283 }
284 }
285
286 if l := len(w.(*watchStream).cancels); l != 0 {
287 t.Errorf("cancels = %d, want 0", l)
288 }
289 }
290
291
292
293 func TestWatcherRequestProgress(t *testing.T) {
294 b, tmpPath := betesting.NewDefaultTmpBackend(t)
295
296
297
298
299
300 s := &watchableStore{
301 store: NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}),
302 unsynced: newWatcherGroup(),
303 synced: newWatcherGroup(),
304 }
305
306 defer func() {
307 s.store.Close()
308 os.Remove(tmpPath)
309 }()
310
311 testKey := []byte("foo")
312 notTestKey := []byte("bad")
313 testValue := []byte("bar")
314 s.Put(testKey, testValue, lease.NoLease)
315
316 w := s.NewWatchStream()
317
318 badID := WatchID(1000)
319 w.RequestProgress(badID)
320 select {
321 case resp := <-w.Chan():
322 t.Fatalf("unexpected %+v", resp)
323 default:
324 }
325
326 id, _ := w.Watch(0, notTestKey, nil, 1)
327 w.RequestProgress(id)
328 select {
329 case resp := <-w.Chan():
330 t.Fatalf("unexpected %+v", resp)
331 default:
332 }
333
334 s.syncWatchers()
335
336 w.RequestProgress(id)
337 wrs := WatchResponse{WatchID: id, Revision: 2}
338 select {
339 case resp := <-w.Chan():
340 if !reflect.DeepEqual(resp, wrs) {
341 t.Fatalf("got %+v, expect %+v", resp, wrs)
342 }
343 case <-time.After(time.Second):
344 t.Fatal("failed to receive progress")
345 }
346 }
347
348 func TestWatcherRequestProgressAll(t *testing.T) {
349 b, tmpPath := betesting.NewDefaultTmpBackend(t)
350
351
352
353
354
355 s := &watchableStore{
356 store: NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}),
357 unsynced: newWatcherGroup(),
358 synced: newWatcherGroup(),
359 stopc: make(chan struct{}),
360 }
361
362 defer func() {
363 s.store.Close()
364 os.Remove(tmpPath)
365 }()
366
367 testKey := []byte("foo")
368 notTestKey := []byte("bad")
369 testValue := []byte("bar")
370 s.Put(testKey, testValue, lease.NoLease)
371
372
373
374
375
376 w := s.NewWatchStream()
377 w.Watch(0, notTestKey, nil, 1)
378
379 w.RequestProgressAll()
380 select {
381 case resp := <-w.Chan():
382 t.Fatalf("unexpected %+v", resp)
383 default:
384 }
385
386 s.syncWatchers()
387
388 w.RequestProgressAll()
389 wrs := WatchResponse{WatchID: clientv3.InvalidWatchID, Revision: 2}
390 select {
391 case resp := <-w.Chan():
392 if !reflect.DeepEqual(resp, wrs) {
393 t.Fatalf("got %+v, expect %+v", resp, wrs)
394 }
395 case <-time.After(time.Second):
396 t.Fatal("failed to receive progress")
397 }
398 }
399
400 func TestWatcherWatchWithFilter(t *testing.T) {
401 b, tmpPath := betesting.NewDefaultTmpBackend(t)
402 s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}))
403 defer cleanup(s, b, tmpPath)
404
405 w := s.NewWatchStream()
406 defer w.Close()
407
408 filterPut := func(e mvccpb.Event) bool {
409 return e.Type == mvccpb.PUT
410 }
411
412 w.Watch(0, []byte("foo"), nil, 0, filterPut)
413 done := make(chan struct{}, 1)
414
415 go func() {
416 <-w.Chan()
417 done <- struct{}{}
418 }()
419
420 s.Put([]byte("foo"), []byte("bar"), 0)
421
422 select {
423 case <-done:
424 t.Fatal("failed to filter put request")
425 case <-time.After(100 * time.Millisecond):
426 }
427
428 s.DeleteRange([]byte("foo"), nil)
429
430 select {
431 case <-done:
432 case <-time.After(100 * time.Millisecond):
433 t.Fatal("failed to receive delete request")
434 }
435 }
436
View as plain text