1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package mvcc
16
17 import (
18 "context"
19 "fmt"
20 "os"
21 "reflect"
22 "testing"
23 "time"
24
25 "go.etcd.io/etcd/api/v3/mvccpb"
26 "go.etcd.io/etcd/client/pkg/v3/testutil"
27 "go.etcd.io/etcd/pkg/v3/traceutil"
28 "go.etcd.io/etcd/server/v3/lease"
29 "go.etcd.io/etcd/server/v3/mvcc/backend"
30 betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
31
32 "github.com/prometheus/client_golang/prometheus"
33 dto "github.com/prometheus/client_model/go"
34 "go.uber.org/zap"
35 )
36
37
38
39
40
41
42 type (
43 rangeFunc func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error)
44 putFunc func(kv KV, key, value []byte, lease lease.LeaseID) int64
45 deleteRangeFunc func(kv KV, key, end []byte) (n, rev int64)
46 )
47
48 var (
49 normalRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
50 return kv.Range(context.TODO(), key, end, ro)
51 }
52 txnRangeFunc = func(kv KV, key, end []byte, ro RangeOptions) (*RangeResult, error) {
53 txn := kv.Read(ConcurrentReadTxMode, traceutil.TODO())
54 defer txn.End()
55 return txn.Range(context.TODO(), key, end, ro)
56 }
57
58 normalPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
59 return kv.Put(key, value, lease)
60 }
61 txnPutFunc = func(kv KV, key, value []byte, lease lease.LeaseID) int64 {
62 txn := kv.Write(traceutil.TODO())
63 defer txn.End()
64 return txn.Put(key, value, lease)
65 }
66
67 normalDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) {
68 return kv.DeleteRange(key, end)
69 }
70 txnDeleteRangeFunc = func(kv KV, key, end []byte) (n, rev int64) {
71 txn := kv.Write(traceutil.TODO())
72 defer txn.End()
73 return txn.DeleteRange(key, end)
74 }
75 )
76
77 func TestKVRange(t *testing.T) { testKVRange(t, normalRangeFunc) }
78 func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) }
79
80 func testKVRange(t *testing.T, f rangeFunc) {
81 b, tmpPath := betesting.NewDefaultTmpBackend(t)
82 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
83 defer cleanup(s, b, tmpPath)
84
85 kvs := put3TestKVs(s)
86
87 wrev := int64(4)
88 tests := []struct {
89 key, end []byte
90 wkvs []mvccpb.KeyValue
91 }{
92
93 {
94 []byte("doo"), []byte("foo"),
95 nil,
96 },
97
98 {
99 []byte("foo"), []byte("foo"),
100 nil,
101 },
102
103 {
104 []byte("doo"), nil,
105 nil,
106 },
107
108 {
109 []byte("foo"), []byte("foo3"),
110 kvs,
111 },
112
113 {
114 []byte("foo"), []byte("foo1"),
115 kvs[:1],
116 },
117
118 {
119 []byte("foo"), nil,
120 kvs[:1],
121 },
122
123 {
124 []byte(""), []byte(""),
125 kvs,
126 },
127 }
128
129 for i, tt := range tests {
130 r, err := f(s, tt.key, tt.end, RangeOptions{})
131 if err != nil {
132 t.Fatal(err)
133 }
134 if r.Rev != wrev {
135 t.Errorf("#%d: rev = %d, want %d", i, r.Rev, wrev)
136 }
137 if !reflect.DeepEqual(r.KVs, tt.wkvs) {
138 t.Errorf("#%d: kvs = %+v, want %+v", i, r.KVs, tt.wkvs)
139 }
140 }
141 }
142
143 func TestKVRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) }
144 func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, txnRangeFunc) }
145
146 func testKVRangeRev(t *testing.T, f rangeFunc) {
147 b, tmpPath := betesting.NewDefaultTmpBackend(t)
148 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
149 defer cleanup(s, b, tmpPath)
150
151 kvs := put3TestKVs(s)
152
153 tests := []struct {
154 rev int64
155 wrev int64
156 wkvs []mvccpb.KeyValue
157 }{
158 {-1, 4, kvs},
159 {0, 4, kvs},
160 {2, 4, kvs[:1]},
161 {3, 4, kvs[:2]},
162 {4, 4, kvs},
163 }
164
165 for i, tt := range tests {
166 r, err := f(s, []byte("foo"), []byte("foo3"), RangeOptions{Rev: tt.rev})
167 if err != nil {
168 t.Fatal(err)
169 }
170 if r.Rev != tt.wrev {
171 t.Errorf("#%d: rev = %d, want %d", i, r.Rev, tt.wrev)
172 }
173 if !reflect.DeepEqual(r.KVs, tt.wkvs) {
174 t.Errorf("#%d: kvs = %+v, want %+v", i, r.KVs, tt.wkvs)
175 }
176 }
177 }
178
179 func TestKVRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) }
180 func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, txnRangeFunc) }
181
182 func testKVRangeBadRev(t *testing.T, f rangeFunc) {
183 b, tmpPath := betesting.NewDefaultTmpBackend(t)
184 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
185 defer cleanup(s, b, tmpPath)
186
187 put3TestKVs(s)
188 if _, err := s.Compact(traceutil.TODO(), 4); err != nil {
189 t.Fatalf("compact error (%v)", err)
190 }
191
192 tests := []struct {
193 rev int64
194 werr error
195 }{
196 {-1, nil},
197 {0, nil},
198 {1, ErrCompacted},
199 {2, ErrCompacted},
200 {4, nil},
201 {5, ErrFutureRev},
202 {100, ErrFutureRev},
203 }
204 for i, tt := range tests {
205 _, err := f(s, []byte("foo"), []byte("foo3"), RangeOptions{Rev: tt.rev})
206 if err != tt.werr {
207 t.Errorf("#%d: error = %v, want %v", i, err, tt.werr)
208 }
209 }
210 }
211
212 func TestKVRangeLimit(t *testing.T) { testKVRangeLimit(t, normalRangeFunc) }
213 func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) }
214
215 func testKVRangeLimit(t *testing.T, f rangeFunc) {
216 b, tmpPath := betesting.NewDefaultTmpBackend(t)
217 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
218 defer cleanup(s, b, tmpPath)
219
220 kvs := put3TestKVs(s)
221
222 wrev := int64(4)
223 tests := []struct {
224 limit int64
225 wcounts int64
226 wkvs []mvccpb.KeyValue
227 }{
228
229 {-1, 3, kvs},
230
231 {0, 3, kvs},
232 {1, 3, kvs[:1]},
233 {2, 3, kvs[:2]},
234 {3, 3, kvs},
235 {100, 3, kvs},
236 }
237 for i, tt := range tests {
238 r, err := f(s, []byte("foo"), []byte("foo3"), RangeOptions{Limit: tt.limit})
239 if err != nil {
240 t.Fatalf("#%d: range error (%v)", i, err)
241 }
242 if !reflect.DeepEqual(r.KVs, tt.wkvs) {
243 t.Errorf("#%d: kvs = %+v, want %+v", i, r.KVs, tt.wkvs)
244 }
245 if r.Rev != wrev {
246 t.Errorf("#%d: rev = %d, want %d", i, r.Rev, wrev)
247 }
248 if tt.limit <= 0 || int(tt.limit) > len(kvs) {
249 if r.Count != len(kvs) {
250 t.Errorf("#%d: count = %d, want %d", i, r.Count, len(kvs))
251 }
252 } else if r.Count != int(tt.wcounts) {
253 t.Errorf("#%d: count = %d, want %d", i, r.Count, tt.limit)
254 }
255 }
256 }
257
258 func TestKVPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, normalPutFunc) }
259 func TestKVTxnPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, txnPutFunc) }
260
261 func testKVPutMultipleTimes(t *testing.T, f putFunc) {
262 b, tmpPath := betesting.NewDefaultTmpBackend(t)
263 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
264 defer cleanup(s, b, tmpPath)
265
266 for i := 0; i < 10; i++ {
267 base := int64(i + 1)
268
269 rev := f(s, []byte("foo"), []byte("bar"), lease.LeaseID(base))
270 if rev != base+1 {
271 t.Errorf("#%d: rev = %d, want %d", i, rev, base+1)
272 }
273
274 r, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{})
275 if err != nil {
276 t.Fatal(err)
277 }
278 wkvs := []mvccpb.KeyValue{
279 {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: base + 1, Version: base, Lease: base},
280 }
281 if !reflect.DeepEqual(r.KVs, wkvs) {
282 t.Errorf("#%d: kvs = %+v, want %+v", i, r.KVs, wkvs)
283 }
284 }
285 }
286
287 func TestKVDeleteRange(t *testing.T) { testKVDeleteRange(t, normalDeleteRangeFunc) }
288 func TestKVTxnDeleteRange(t *testing.T) { testKVDeleteRange(t, txnDeleteRangeFunc) }
289
290 func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
291 tests := []struct {
292 key, end []byte
293
294 wrev int64
295 wN int64
296 }{
297 {
298 []byte("foo"), nil,
299 5, 1,
300 },
301 {
302 []byte("foo"), []byte("foo1"),
303 5, 1,
304 },
305 {
306 []byte("foo"), []byte("foo2"),
307 5, 2,
308 },
309 {
310 []byte("foo"), []byte("foo3"),
311 5, 3,
312 },
313 {
314 []byte("foo3"), []byte("foo8"),
315 4, 0,
316 },
317 {
318 []byte("foo3"), nil,
319 4, 0,
320 },
321 }
322
323 for i, tt := range tests {
324 b, tmpPath := betesting.NewDefaultTmpBackend(t)
325 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
326
327 s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
328 s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease)
329 s.Put([]byte("foo2"), []byte("bar2"), lease.NoLease)
330
331 n, rev := f(s, tt.key, tt.end)
332 if n != tt.wN || rev != tt.wrev {
333 t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, tt.wN, tt.wrev)
334 }
335
336 cleanup(s, b, tmpPath)
337 }
338 }
339
340 func TestKVDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, normalDeleteRangeFunc) }
341 func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, txnDeleteRangeFunc) }
342
343 func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
344 b, tmpPath := betesting.NewDefaultTmpBackend(t)
345 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
346 defer cleanup(s, b, tmpPath)
347
348 s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
349
350 n, rev := f(s, []byte("foo"), nil)
351 if n != 1 || rev != 3 {
352 t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 3)
353 }
354
355 for i := 0; i < 10; i++ {
356 n, rev := f(s, []byte("foo"), nil)
357 if n != 0 || rev != 3 {
358 t.Fatalf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 0, 3)
359 }
360 }
361 }
362
363
364 func TestKVOperationInSequence(t *testing.T) {
365 b, tmpPath := betesting.NewDefaultTmpBackend(t)
366 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
367 defer cleanup(s, b, tmpPath)
368
369 for i := 0; i < 10; i++ {
370 base := int64(i*2 + 1)
371
372
373 rev := s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
374 if rev != base+1 {
375 t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1)
376 }
377
378 r, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: base + 1})
379 if err != nil {
380 t.Fatal(err)
381 }
382 wkvs := []mvccpb.KeyValue{
383 {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: base + 1, ModRevision: base + 1, Version: 1, Lease: int64(lease.NoLease)},
384 }
385 if !reflect.DeepEqual(r.KVs, wkvs) {
386 t.Errorf("#%d: kvs = %+v, want %+v", i, r.KVs, wkvs)
387 }
388 if r.Rev != base+1 {
389 t.Errorf("#%d: range rev = %d, want %d", i, rev, base+1)
390 }
391
392
393 n, rev := s.DeleteRange([]byte("foo"), nil)
394 if n != 1 || rev != base+2 {
395 t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 1, base+2)
396 }
397
398 r, err = s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: base + 2})
399 if err != nil {
400 t.Fatal(err)
401 }
402 if r.KVs != nil {
403 t.Errorf("#%d: kvs = %+v, want %+v", i, r.KVs, nil)
404 }
405 if r.Rev != base+2 {
406 t.Errorf("#%d: range rev = %d, want %d", i, r.Rev, base+2)
407 }
408 }
409 }
410
411 func TestKVTxnBlockWriteOperations(t *testing.T) {
412 b, tmpPath := betesting.NewDefaultTmpBackend(t)
413 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
414
415 tests := []func(){
416 func() { s.Put([]byte("foo"), nil, lease.NoLease) },
417 func() { s.DeleteRange([]byte("foo"), nil) },
418 }
419 for i, tt := range tests {
420 tf := tt
421 txn := s.Write(traceutil.TODO())
422 done := make(chan struct{}, 1)
423 go func() {
424 tf()
425 done <- struct{}{}
426 }()
427 select {
428 case <-done:
429 t.Fatalf("#%d: operation failed to be blocked", i)
430 case <-time.After(10 * time.Millisecond):
431 }
432
433 txn.End()
434 select {
435 case <-done:
436 case <-time.After(10 * time.Second):
437 testutil.FatalStack(t, fmt.Sprintf("#%d: operation failed to be unblocked", i))
438 }
439 }
440
441
442 cleanup(s, b, tmpPath)
443 }
444
445 func TestKVTxnNonBlockRange(t *testing.T) {
446 b, tmpPath := betesting.NewDefaultTmpBackend(t)
447 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
448 defer cleanup(s, b, tmpPath)
449
450 txn := s.Write(traceutil.TODO())
451 defer txn.End()
452
453 donec := make(chan struct{})
454 go func() {
455 defer close(donec)
456 s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{})
457 }()
458 select {
459 case <-donec:
460 case <-time.After(100 * time.Millisecond):
461 t.Fatalf("range operation blocked on write txn")
462 }
463 }
464
465
466 func TestKVTxnOperationInSequence(t *testing.T) {
467 b, tmpPath := betesting.NewDefaultTmpBackend(t)
468 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
469 defer cleanup(s, b, tmpPath)
470
471 for i := 0; i < 10; i++ {
472 txn := s.Write(traceutil.TODO())
473 base := int64(i + 1)
474
475
476 rev := txn.Put([]byte("foo"), []byte("bar"), lease.NoLease)
477 if rev != base+1 {
478 t.Errorf("#%d: put rev = %d, want %d", i, rev, base+1)
479 }
480
481 r, err := txn.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: base + 1})
482 if err != nil {
483 t.Fatal(err)
484 }
485 wkvs := []mvccpb.KeyValue{
486 {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: base + 1, ModRevision: base + 1, Version: 1, Lease: int64(lease.NoLease)},
487 }
488 if !reflect.DeepEqual(r.KVs, wkvs) {
489 t.Errorf("#%d: kvs = %+v, want %+v", i, r.KVs, wkvs)
490 }
491 if r.Rev != base+1 {
492 t.Errorf("#%d: range rev = %d, want %d", i, r.Rev, base+1)
493 }
494
495
496 n, rev := txn.DeleteRange([]byte("foo"), nil)
497 if n != 1 || rev != base+1 {
498 t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 1, base+1)
499 }
500
501 r, err = txn.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: base + 1})
502 if err != nil {
503 t.Errorf("#%d: range error (%v)", i, err)
504 }
505 if r.KVs != nil {
506 t.Errorf("#%d: kvs = %+v, want %+v", i, r.KVs, nil)
507 }
508 if r.Rev != base+1 {
509 t.Errorf("#%d: range rev = %d, want %d", i, r.Rev, base+1)
510 }
511
512 txn.End()
513 }
514 }
515
516 func TestKVCompactReserveLastValue(t *testing.T) {
517 b, tmpPath := betesting.NewDefaultTmpBackend(t)
518 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
519 defer cleanup(s, b, tmpPath)
520
521 s.Put([]byte("foo"), []byte("bar0"), 1)
522 s.Put([]byte("foo"), []byte("bar1"), 2)
523 s.DeleteRange([]byte("foo"), nil)
524 s.Put([]byte("foo"), []byte("bar2"), 3)
525
526
527 tests := []struct {
528 rev int64
529
530 wkvs []mvccpb.KeyValue
531 }{
532 {
533 1,
534 []mvccpb.KeyValue{
535 {Key: []byte("foo"), Value: []byte("bar0"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 1},
536 },
537 },
538 {
539 2,
540 []mvccpb.KeyValue{
541 {Key: []byte("foo"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 3, Version: 2, Lease: 2},
542 },
543 },
544 {
545 3,
546 nil,
547 },
548 {
549 4,
550 []mvccpb.KeyValue{
551 {Key: []byte("foo"), Value: []byte("bar2"), CreateRevision: 5, ModRevision: 5, Version: 1, Lease: 3},
552 },
553 },
554 }
555 for i, tt := range tests {
556 _, err := s.Compact(traceutil.TODO(), tt.rev)
557 if err != nil {
558 t.Errorf("#%d: unexpect compact error %v", i, err)
559 }
560 r, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: tt.rev + 1})
561 if err != nil {
562 t.Errorf("#%d: unexpect range error %v", i, err)
563 }
564 if !reflect.DeepEqual(r.KVs, tt.wkvs) {
565 t.Errorf("#%d: kvs = %+v, want %+v", i, r.KVs, tt.wkvs)
566 }
567 }
568 }
569
570 func TestKVCompactBad(t *testing.T) {
571 b, tmpPath := betesting.NewDefaultTmpBackend(t)
572 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
573 defer cleanup(s, b, tmpPath)
574
575 s.Put([]byte("foo"), []byte("bar0"), lease.NoLease)
576 s.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
577 s.Put([]byte("foo"), []byte("bar2"), lease.NoLease)
578
579
580 tests := []struct {
581 rev int64
582 werr error
583 }{
584 {0, nil},
585 {1, nil},
586 {1, ErrCompacted},
587 {4, nil},
588 {5, ErrFutureRev},
589 {100, ErrFutureRev},
590 }
591 for i, tt := range tests {
592 _, err := s.Compact(traceutil.TODO(), tt.rev)
593 if err != tt.werr {
594 t.Errorf("#%d: compact error = %v, want %v", i, err, tt.werr)
595 }
596 }
597 }
598
599 func TestKVHash(t *testing.T) {
600 hashes := make([]uint32, 3)
601
602 for i := 0; i < len(hashes); i++ {
603 var err error
604 b, tmpPath := betesting.NewDefaultTmpBackend(t)
605 kv := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
606 kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease)
607 kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease)
608 hashes[i], _, err = kv.hash()
609 if err != nil {
610 t.Fatalf("failed to get hash: %v", err)
611 }
612 cleanup(kv, b, tmpPath)
613 }
614
615 for i := 1; i < len(hashes); i++ {
616 if hashes[i-1] != hashes[i] {
617 t.Errorf("hash[%d](%d) != hash[%d](%d)", i-1, hashes[i-1], i, hashes[i])
618 }
619 }
620 }
621
622 func TestKVRestore(t *testing.T) {
623 tests := []func(kv KV){
624 func(kv KV) {
625 kv.Put([]byte("foo"), []byte("bar0"), 1)
626 kv.Put([]byte("foo"), []byte("bar1"), 2)
627 kv.Put([]byte("foo"), []byte("bar2"), 3)
628 kv.Put([]byte("foo2"), []byte("bar0"), 1)
629 },
630 func(kv KV) {
631 kv.Put([]byte("foo"), []byte("bar0"), 1)
632 kv.DeleteRange([]byte("foo"), nil)
633 kv.Put([]byte("foo"), []byte("bar1"), 2)
634 },
635 func(kv KV) {
636 kv.Put([]byte("foo"), []byte("bar0"), 1)
637 kv.Put([]byte("foo"), []byte("bar1"), 2)
638 kv.Compact(traceutil.TODO(), 1)
639 },
640 }
641 for i, tt := range tests {
642 b, tmpPath := betesting.NewDefaultTmpBackend(t)
643 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
644 tt(s)
645 var kvss [][]mvccpb.KeyValue
646 for k := int64(0); k < 10; k++ {
647 r, _ := s.Range(context.TODO(), []byte("a"), []byte("z"), RangeOptions{Rev: k})
648 kvss = append(kvss, r.KVs)
649 }
650
651 keysBefore := readGaugeInt(keysGauge)
652 s.Close()
653
654
655 ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
656
657 if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore {
658 t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore)
659 }
660
661
662 testutil.WaitSchedule()
663 var nkvss [][]mvccpb.KeyValue
664 for k := int64(0); k < 10; k++ {
665 r, _ := ns.Range(context.TODO(), []byte("a"), []byte("z"), RangeOptions{Rev: k})
666 nkvss = append(nkvss, r.KVs)
667 }
668 cleanup(ns, b, tmpPath)
669
670 if !reflect.DeepEqual(nkvss, kvss) {
671 t.Errorf("#%d: kvs history = %+v, want %+v", i, nkvss, kvss)
672 }
673 }
674 }
675
676 func readGaugeInt(g prometheus.Gauge) int {
677 ch := make(chan prometheus.Metric, 1)
678 g.Collect(ch)
679 m := <-ch
680 mm := &dto.Metric{}
681 m.Write(mm)
682 return int(mm.GetGauge().GetValue())
683 }
684
685 func TestKVSnapshot(t *testing.T) {
686 b, tmpPath := betesting.NewDefaultTmpBackend(t)
687 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
688 defer cleanup(s, b, tmpPath)
689
690 wkvs := put3TestKVs(s)
691
692 newPath := "new_test"
693 f, err := os.Create(newPath)
694 if err != nil {
695 t.Fatal(err)
696 }
697 defer os.Remove(newPath)
698
699 snap := s.b.Snapshot()
700 defer snap.Close()
701 _, err = snap.WriteTo(f)
702 if err != nil {
703 t.Fatal(err)
704 }
705 f.Close()
706
707 ns := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
708 defer ns.Close()
709 r, err := ns.Range(context.TODO(), []byte("a"), []byte("z"), RangeOptions{})
710 if err != nil {
711 t.Errorf("unexpect range error (%v)", err)
712 }
713 if !reflect.DeepEqual(r.KVs, wkvs) {
714 t.Errorf("kvs = %+v, want %+v", r.KVs, wkvs)
715 }
716 if r.Rev != 4 {
717 t.Errorf("rev = %d, want %d", r.Rev, 4)
718 }
719 }
720
721 func TestWatchableKVWatch(t *testing.T) {
722 b, tmpPath := betesting.NewDefaultTmpBackend(t)
723 s := WatchableKV(newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{}))
724 defer cleanup(s, b, tmpPath)
725
726 w := s.NewWatchStream()
727 defer w.Close()
728
729 wid, _ := w.Watch(0, []byte("foo"), []byte("fop"), 0)
730
731 wev := []mvccpb.Event{
732 {Type: mvccpb.PUT,
733 Kv: &mvccpb.KeyValue{
734 Key: []byte("foo"),
735 Value: []byte("bar"),
736 CreateRevision: 2,
737 ModRevision: 2,
738 Version: 1,
739 Lease: 1,
740 },
741 },
742 {
743 Type: mvccpb.PUT,
744 Kv: &mvccpb.KeyValue{
745 Key: []byte("foo1"),
746 Value: []byte("bar1"),
747 CreateRevision: 3,
748 ModRevision: 3,
749 Version: 1,
750 Lease: 2,
751 },
752 },
753 {
754 Type: mvccpb.PUT,
755 Kv: &mvccpb.KeyValue{
756 Key: []byte("foo1"),
757 Value: []byte("bar11"),
758 CreateRevision: 3,
759 ModRevision: 4,
760 Version: 2,
761 Lease: 3,
762 },
763 },
764 }
765
766 s.Put([]byte("foo"), []byte("bar"), 1)
767 select {
768 case resp := <-w.Chan():
769 if resp.WatchID != wid {
770 t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid)
771 }
772 ev := resp.Events[0]
773 if !reflect.DeepEqual(ev, wev[0]) {
774 t.Errorf("watched event = %+v, want %+v", ev, wev[0])
775 }
776 case <-time.After(5 * time.Second):
777
778 testutil.FatalStack(t, "failed to watch the event")
779 }
780
781 s.Put([]byte("foo1"), []byte("bar1"), 2)
782 select {
783 case resp := <-w.Chan():
784 if resp.WatchID != wid {
785 t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid)
786 }
787 ev := resp.Events[0]
788 if !reflect.DeepEqual(ev, wev[1]) {
789 t.Errorf("watched event = %+v, want %+v", ev, wev[1])
790 }
791 case <-time.After(5 * time.Second):
792 testutil.FatalStack(t, "failed to watch the event")
793 }
794
795 w = s.NewWatchStream()
796 wid, _ = w.Watch(0, []byte("foo1"), []byte("foo2"), 3)
797
798 select {
799 case resp := <-w.Chan():
800 if resp.WatchID != wid {
801 t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid)
802 }
803 ev := resp.Events[0]
804 if !reflect.DeepEqual(ev, wev[1]) {
805 t.Errorf("watched event = %+v, want %+v", ev, wev[1])
806 }
807 case <-time.After(5 * time.Second):
808 testutil.FatalStack(t, "failed to watch the event")
809 }
810
811 s.Put([]byte("foo1"), []byte("bar11"), 3)
812 select {
813 case resp := <-w.Chan():
814 if resp.WatchID != wid {
815 t.Errorf("resp.WatchID got = %d, want = %d", resp.WatchID, wid)
816 }
817 ev := resp.Events[0]
818 if !reflect.DeepEqual(ev, wev[2]) {
819 t.Errorf("watched event = %+v, want %+v", ev, wev[2])
820 }
821 case <-time.After(5 * time.Second):
822 testutil.FatalStack(t, "failed to watch the event")
823 }
824 }
825
826 func cleanup(s KV, b backend.Backend, path string) {
827 s.Close()
828 b.Close()
829 os.Remove(path)
830 }
831
832 func put3TestKVs(s KV) []mvccpb.KeyValue {
833 s.Put([]byte("foo"), []byte("bar"), 1)
834 s.Put([]byte("foo1"), []byte("bar1"), 2)
835 s.Put([]byte("foo2"), []byte("bar2"), 3)
836 return []mvccpb.KeyValue{
837 {Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 1},
838 {Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 2},
839 {Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 4, ModRevision: 4, Version: 1, Lease: 3},
840 }
841 }
842
View as plain text