1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package mvcc
16
17 import (
18 "bytes"
19 "context"
20 "crypto/rand"
21 "encoding/binary"
22 "fmt"
23 "math"
24 mrand "math/rand"
25 "os"
26 "reflect"
27 "sort"
28 "strconv"
29 "sync"
30 "testing"
31 "time"
32
33 "go.etcd.io/etcd/api/v3/mvccpb"
34 "go.etcd.io/etcd/client/pkg/v3/testutil"
35 "go.etcd.io/etcd/pkg/v3/schedule"
36 "go.etcd.io/etcd/pkg/v3/traceutil"
37 "go.etcd.io/etcd/server/v3/lease"
38 "go.etcd.io/etcd/server/v3/mvcc/backend"
39 betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
40 "go.etcd.io/etcd/server/v3/mvcc/buckets"
41
42 "go.uber.org/zap"
43 "go.uber.org/zap/zaptest"
44 )
45
46 func TestStoreRev(t *testing.T) {
47 b, _ := betesting.NewDefaultTmpBackend(t)
48 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
49 defer s.Close()
50
51 for i := 1; i <= 3; i++ {
52 s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
53 if r := s.Rev(); r != int64(i+1) {
54 t.Errorf("#%d: rev = %d, want %d", i, r, i+1)
55 }
56 }
57 }
58
59 func TestStorePut(t *testing.T) {
60 kv := mvccpb.KeyValue{
61 Key: []byte("foo"),
62 Value: []byte("bar"),
63 CreateRevision: 1,
64 ModRevision: 2,
65 Version: 1,
66 }
67 kvb, err := kv.Marshal()
68 if err != nil {
69 t.Fatal(err)
70 }
71
72 tests := []struct {
73 rev revision
74 r indexGetResp
75 rr *rangeResp
76
77 wrev revision
78 wkey []byte
79 wkv mvccpb.KeyValue
80 wputrev revision
81 }{
82 {
83 revision{1, 0},
84 indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound},
85 nil,
86
87 revision{2, 0},
88 newTestKeyBytes(revision{2, 0}, false),
89 mvccpb.KeyValue{
90 Key: []byte("foo"),
91 Value: []byte("bar"),
92 CreateRevision: 2,
93 ModRevision: 2,
94 Version: 1,
95 Lease: 1,
96 },
97 revision{2, 0},
98 },
99 {
100 revision{1, 1},
101 indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil},
102 &rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}},
103
104 revision{2, 0},
105 newTestKeyBytes(revision{2, 0}, false),
106 mvccpb.KeyValue{
107 Key: []byte("foo"),
108 Value: []byte("bar"),
109 CreateRevision: 2,
110 ModRevision: 2,
111 Version: 2,
112 Lease: 2,
113 },
114 revision{2, 0},
115 },
116 {
117 revision{2, 0},
118 indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil},
119 &rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}},
120
121 revision{3, 0},
122 newTestKeyBytes(revision{3, 0}, false),
123 mvccpb.KeyValue{
124 Key: []byte("foo"),
125 Value: []byte("bar"),
126 CreateRevision: 2,
127 ModRevision: 3,
128 Version: 3,
129 Lease: 3,
130 },
131 revision{3, 0},
132 },
133 }
134 for i, tt := range tests {
135 s := newFakeStore()
136 b := s.b.(*fakeBackend)
137 fi := s.kvindex.(*fakeIndex)
138
139 s.currentRev = tt.rev.main
140 fi.indexGetRespc <- tt.r
141 if tt.rr != nil {
142 b.tx.rangeRespc <- *tt.rr
143 }
144
145 s.Put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1))
146
147 data, err := tt.wkv.Marshal()
148 if err != nil {
149 t.Errorf("#%d: marshal err = %v, want nil", i, err)
150 }
151
152 wact := []testutil.Action{
153 {Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}},
154 }
155
156 if tt.rr != nil {
157 wact = []testutil.Action{
158 {Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}},
159 }
160 }
161
162 if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
163 t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
164 }
165 wact = []testutil.Action{
166 {Name: "get", Params: []interface{}{[]byte("foo"), tt.wputrev.main}},
167 {Name: "put", Params: []interface{}{[]byte("foo"), tt.wputrev}},
168 }
169 if g := fi.Action(); !reflect.DeepEqual(g, wact) {
170 t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
171 }
172 if s.currentRev != tt.wrev.main {
173 t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
174 }
175
176 s.Close()
177 }
178 }
179
180 func TestStoreRange(t *testing.T) {
181 key := newTestKeyBytes(revision{2, 0}, false)
182 kv := mvccpb.KeyValue{
183 Key: []byte("foo"),
184 Value: []byte("bar"),
185 CreateRevision: 1,
186 ModRevision: 2,
187 Version: 1,
188 }
189 kvb, err := kv.Marshal()
190 if err != nil {
191 t.Fatal(err)
192 }
193 wrev := int64(2)
194
195 tests := []struct {
196 idxr indexRangeResp
197 r rangeResp
198 }{
199 {
200 indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
201 rangeResp{[][]byte{key}, [][]byte{kvb}},
202 },
203 {
204 indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []revision{{2, 0}, {3, 0}}},
205 rangeResp{[][]byte{key}, [][]byte{kvb}},
206 },
207 }
208
209 ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
210 for i, tt := range tests {
211 s := newFakeStore()
212 b := s.b.(*fakeBackend)
213 fi := s.kvindex.(*fakeIndex)
214
215 s.currentRev = 2
216 b.tx.rangeRespc <- tt.r
217 fi.indexRangeRespc <- tt.idxr
218
219 ret, err := s.Range(context.TODO(), []byte("foo"), []byte("goo"), ro)
220 if err != nil {
221 t.Errorf("#%d: err = %v, want nil", i, err)
222 }
223 if w := []mvccpb.KeyValue{kv}; !reflect.DeepEqual(ret.KVs, w) {
224 t.Errorf("#%d: kvs = %+v, want %+v", i, ret.KVs, w)
225 }
226 if ret.Rev != wrev {
227 t.Errorf("#%d: rev = %d, want %d", i, ret.Rev, wrev)
228 }
229
230 wstart := newRevBytes()
231 revToBytes(tt.idxr.revs[0], wstart)
232 wact := []testutil.Action{
233 {Name: "range", Params: []interface{}{buckets.Key, wstart, []byte(nil), int64(0)}},
234 }
235 if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
236 t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
237 }
238 wact = []testutil.Action{
239 {Name: "range", Params: []interface{}{[]byte("foo"), []byte("goo"), wrev}},
240 }
241 if g := fi.Action(); !reflect.DeepEqual(g, wact) {
242 t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
243 }
244 if s.currentRev != 2 {
245 t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, 2)
246 }
247
248 s.Close()
249 }
250 }
251
252 func TestStoreDeleteRange(t *testing.T) {
253 key := newTestKeyBytes(revision{2, 0}, false)
254 kv := mvccpb.KeyValue{
255 Key: []byte("foo"),
256 Value: []byte("bar"),
257 CreateRevision: 1,
258 ModRevision: 2,
259 Version: 1,
260 }
261 kvb, err := kv.Marshal()
262 if err != nil {
263 t.Fatal(err)
264 }
265
266 tests := []struct {
267 rev revision
268 r indexRangeResp
269 rr rangeResp
270
271 wkey []byte
272 wrev revision
273 wrrev int64
274 wdelrev revision
275 }{
276 {
277 revision{2, 0},
278 indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
279 rangeResp{[][]byte{key}, [][]byte{kvb}},
280
281 newTestKeyBytes(revision{3, 0}, true),
282 revision{3, 0},
283 2,
284 revision{3, 0},
285 },
286 }
287 for i, tt := range tests {
288 s := newFakeStore()
289 b := s.b.(*fakeBackend)
290 fi := s.kvindex.(*fakeIndex)
291
292 s.currentRev = tt.rev.main
293 fi.indexRangeRespc <- tt.r
294 b.tx.rangeRespc <- tt.rr
295
296 n, _ := s.DeleteRange([]byte("foo"), []byte("goo"))
297 if n != 1 {
298 t.Errorf("#%d: n = %d, want 1", i, n)
299 }
300
301 data, err := (&mvccpb.KeyValue{
302 Key: []byte("foo"),
303 }).Marshal()
304 if err != nil {
305 t.Errorf("#%d: marshal err = %v, want nil", i, err)
306 }
307 wact := []testutil.Action{
308 {Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}},
309 }
310 if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
311 t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
312 }
313 wact = []testutil.Action{
314 {Name: "range", Params: []interface{}{[]byte("foo"), []byte("goo"), tt.wrrev}},
315 {Name: "tombstone", Params: []interface{}{[]byte("foo"), tt.wdelrev}},
316 }
317 if g := fi.Action(); !reflect.DeepEqual(g, wact) {
318 t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
319 }
320 if s.currentRev != tt.wrev.main {
321 t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
322 }
323 }
324 }
325
326 func TestStoreCompact(t *testing.T) {
327 s := newFakeStore()
328 defer s.Close()
329 b := s.b.(*fakeBackend)
330 fi := s.kvindex.(*fakeIndex)
331
332 s.currentRev = 3
333 fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
334 key1 := newTestKeyBytes(revision{1, 0}, false)
335 key2 := newTestKeyBytes(revision{2, 0}, false)
336 b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
337 b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
338 b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}}
339
340 s.Compact(traceutil.TODO(), 3)
341 s.fifoSched.WaitFinish(1)
342
343 if s.compactMainRev != 3 {
344 t.Errorf("compact main rev = %d, want 3", s.compactMainRev)
345 }
346 end := make([]byte, 8)
347 binary.BigEndian.PutUint64(end, uint64(4))
348 wact := []testutil.Action{
349 {Name: "range", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, []uint8(nil), int64(0)}},
350 {Name: "range", Params: []interface{}{buckets.Meta, finishedCompactKeyName, []uint8(nil), int64(0)}},
351 {Name: "put", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
352 {Name: "range", Params: []interface{}{buckets.Key, make([]byte, 17), end, int64(10000)}},
353 {Name: "delete", Params: []interface{}{buckets.Key, key2}},
354 {Name: "put", Params: []interface{}{buckets.Meta, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}},
355 }
356 if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
357 t.Errorf("tx actions = %+v, want %+v", g, wact)
358 }
359 wact = []testutil.Action{
360 {Name: "compact", Params: []interface{}{int64(3)}},
361 }
362 if g := fi.Action(); !reflect.DeepEqual(g, wact) {
363 t.Errorf("index action = %+v, want %+v", g, wact)
364 }
365 }
366
367 func TestStoreRestore(t *testing.T) {
368 s := newFakeStore()
369 b := s.b.(*fakeBackend)
370 fi := s.kvindex.(*fakeIndex)
371
372 putkey := newTestKeyBytes(revision{3, 0}, false)
373 putkv := mvccpb.KeyValue{
374 Key: []byte("foo"),
375 Value: []byte("bar"),
376 CreateRevision: 4,
377 ModRevision: 4,
378 Version: 1,
379 }
380 putkvb, err := putkv.Marshal()
381 if err != nil {
382 t.Fatal(err)
383 }
384 delkey := newTestKeyBytes(revision{5, 0}, true)
385 delkv := mvccpb.KeyValue{
386 Key: []byte("foo"),
387 }
388 delkvb, err := delkv.Marshal()
389 if err != nil {
390 t.Fatal(err)
391 }
392 b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
393 b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
394
395 b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
396 b.tx.rangeRespc <- rangeResp{nil, nil}
397
398 s.restore()
399
400 if s.compactMainRev != 3 {
401 t.Errorf("compact rev = %d, want 3", s.compactMainRev)
402 }
403 if s.currentRev != 5 {
404 t.Errorf("current rev = %v, want 5", s.currentRev)
405 }
406 wact := []testutil.Action{
407 {Name: "range", Params: []interface{}{buckets.Meta, finishedCompactKeyName, []byte(nil), int64(0)}},
408 {Name: "range", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, []byte(nil), int64(0)}},
409 {Name: "range", Params: []interface{}{buckets.Key, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
410 }
411 if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
412 t.Errorf("tx actions = %+v, want %+v", g, wact)
413 }
414
415 gens := []generation{
416 {created: revision{4, 0}, ver: 2, revs: []revision{{3, 0}, {5, 0}}},
417 {created: revision{0, 0}, ver: 0, revs: nil},
418 }
419 ki := &keyIndex{key: []byte("foo"), modified: revision{5, 0}, generations: gens}
420 wact = []testutil.Action{
421 {Name: "keyIndex", Params: []interface{}{ki}},
422 {Name: "insert", Params: []interface{}{ki}},
423 }
424 if g := fi.Action(); !reflect.DeepEqual(g, wact) {
425 t.Errorf("index action = %+v, want %+v", g, wact)
426 }
427 }
428
429 func TestRestoreDelete(t *testing.T) {
430 oldChunk := restoreChunkKeys
431 restoreChunkKeys = mrand.Intn(3) + 2
432 defer func() { restoreChunkKeys = oldChunk }()
433
434 b, _ := betesting.NewDefaultTmpBackend(t)
435 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
436
437 keys := make(map[string]struct{})
438 for i := 0; i < 20; i++ {
439 ks := fmt.Sprintf("foo-%d", i)
440 k := []byte(ks)
441 s.Put(k, []byte("bar"), lease.NoLease)
442 keys[ks] = struct{}{}
443 switch mrand.Intn(3) {
444 case 0:
445
446 ks = fmt.Sprintf("foo-%d", mrand.Intn(i+1))
447 s.Put([]byte(ks), []byte("baz"), lease.NoLease)
448 keys[ks] = struct{}{}
449 case 1:
450
451 for k := range keys {
452 s.DeleteRange([]byte(k), nil)
453 delete(keys, k)
454 break
455 }
456 }
457 }
458 s.Close()
459
460 s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
461 defer s.Close()
462 for i := 0; i < 20; i++ {
463 ks := fmt.Sprintf("foo-%d", i)
464 r, err := s.Range(context.TODO(), []byte(ks), nil, RangeOptions{})
465 if err != nil {
466 t.Fatal(err)
467 }
468 if _, ok := keys[ks]; ok {
469 if len(r.KVs) == 0 {
470 t.Errorf("#%d: expected %q, got deleted", i, ks)
471 }
472 } else if len(r.KVs) != 0 {
473 t.Errorf("#%d: expected deleted, got %q", i, ks)
474 }
475 }
476 }
477
478 func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
479 tests := []string{"recreate", "restore"}
480 for _, test := range tests {
481 b, _ := betesting.NewDefaultTmpBackend(t)
482 s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
483
484 s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
485 s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
486 s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease)
487
488
489 rbytes := newRevBytes()
490 revToBytes(revision{main: 2}, rbytes)
491 tx := s0.b.BatchTx()
492 tx.Lock()
493 tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes)
494 tx.Unlock()
495
496 s0.Close()
497
498 var s *store
499 switch test {
500 case "recreate":
501 s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
502 case "restore":
503 s0.Restore(b)
504 s = s0
505 }
506
507
508 time.Sleep(100 * time.Millisecond)
509
510 if _, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted {
511 t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
512 }
513
514 revbytes := newRevBytes()
515 revToBytes(revision{main: 1}, revbytes)
516
517
518
519 for i := 0; i < 5; i++ {
520 tx = s.b.BatchTx()
521 tx.Lock()
522 ks, _ := tx.UnsafeRange(buckets.Key, revbytes, nil, 0)
523 tx.Unlock()
524 if len(ks) != 0 {
525 time.Sleep(100 * time.Millisecond)
526 continue
527 }
528 return
529 }
530
531 t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
532 }
533 }
534
// hashKVResult pairs a KV hash with the compact revision reported alongside
// it; TestHashKVWhenCompacting sends these from its hashing goroutines to its
// consistency checker.
type hashKVResult struct {
	hash       uint32
	compactRev int64
}
539
540
541 func TestHashKVWhenCompacting(t *testing.T) {
542 b, tmpPath := betesting.NewDefaultTmpBackend(t)
543 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
544 defer cleanup(s, b, tmpPath)
545
546 rev := 10000
547 for i := 2; i <= rev; i++ {
548 s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
549 }
550
551 hashCompactc := make(chan hashKVResult, 1)
552 var wg sync.WaitGroup
553 donec := make(chan struct{})
554 stopc := make(chan struct{})
555
556
557 for i := 0; i < 10; i++ {
558 wg.Add(1)
559 go func() {
560 defer wg.Done()
561 for {
562 hash, _, err := s.HashStorage().HashByRev(int64(rev))
563 if err != nil {
564 t.Error(err)
565 }
566 select {
567 case <-stopc:
568 return
569 case <-donec:
570 return
571 case hashCompactc <- hashKVResult{hash.Hash, hash.CompactRevision}:
572 }
573 }
574 }()
575 }
576
577
578 wg.Add(1)
579 go func() {
580 defer wg.Done()
581 revHash := make(map[int64]uint32)
582 for {
583 r := <-hashCompactc
584 if revHash[r.compactRev] == 0 {
585 revHash[r.compactRev] = r.hash
586 }
587 if r.hash != revHash[r.compactRev] {
588 t.Errorf("Hashes differ (current %v) != (saved %v)", r.hash, revHash[r.compactRev])
589 }
590
591 select {
592 case <-stopc:
593 return
594 case <-donec:
595 return
596 default:
597 }
598 }
599 }()
600
601
602 wg.Add(1)
603 go func() {
604 defer func() {
605 close(donec)
606 wg.Done()
607 }()
608
609 for i := 100; i >= 0; i-- {
610 select {
611 case <-stopc:
612 return
613 default:
614 }
615
616 _, err := s.Compact(traceutil.TODO(), int64(rev-i))
617 if err != nil {
618 t.Error(err)
619 }
620
621 s.fifoSched.WaitFinish(1)
622
623 time.Sleep(10 * time.Millisecond)
624 }
625 }()
626
627 select {
628 case <-donec:
629 case <-time.After(10 * time.Second):
630 close(stopc)
631 wg.Wait()
632 testutil.FatalStack(t, "timeout")
633 }
634
635 close(stopc)
636 wg.Wait()
637 }
638
639
640
641 func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) {
642 b, tmpPath := betesting.NewDefaultTmpBackend(t)
643 s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
644 defer cleanup(s, b, tmpPath)
645
646 rev := 10000
647 compactRev := rev / 2
648
649 for i := 2; i <= rev; i++ {
650 s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
651 }
652 if _, err := s.Compact(traceutil.TODO(), int64(compactRev)); err != nil {
653 t.Fatal(err)
654 }
655
656 _, _, errFutureRev := s.HashStorage().HashByRev(int64(rev + 1))
657 if errFutureRev != ErrFutureRev {
658 t.Error(errFutureRev)
659 }
660
661 _, _, errPastRev := s.HashStorage().HashByRev(int64(compactRev - 1))
662 if errPastRev != ErrCompacted {
663 t.Error(errPastRev)
664 }
665
666 _, _, errCompactRev := s.HashStorage().HashByRev(int64(compactRev))
667 if errCompactRev != nil {
668 t.Error(errCompactRev)
669 }
670 }
671
672
673
674 func TestHashKVZeroRevision(t *testing.T) {
675 b, tmpPath := betesting.NewDefaultTmpBackend(t)
676 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
677 defer cleanup(s, b, tmpPath)
678
679 rev := 10000
680 for i := 2; i <= rev; i++ {
681 s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
682 }
683 if _, err := s.Compact(traceutil.TODO(), int64(rev/2)); err != nil {
684 t.Fatal(err)
685 }
686
687 hash1, _, err := s.HashStorage().HashByRev(int64(rev))
688 if err != nil {
689 t.Fatal(err)
690 }
691 var hash2 KeyValueHash
692 hash2, _, err = s.HashStorage().HashByRev(0)
693 if err != nil {
694 t.Fatal(err)
695 }
696 if hash1 != hash2 {
697 t.Errorf("hash %d (rev %d) != hash %d (rev 0)", hash1, rev, hash2)
698 }
699 }
700
701 func TestTxnPut(t *testing.T) {
702
703 bytesN := 30
704 sliceN := 100
705 keys := createBytesSlice(bytesN, sliceN)
706 vals := createBytesSlice(bytesN, sliceN)
707
708 b, tmpPath := betesting.NewDefaultTmpBackend(t)
709 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
710 defer cleanup(s, b, tmpPath)
711
712 for i := 0; i < sliceN; i++ {
713 txn := s.Write(traceutil.TODO())
714 base := int64(i + 2)
715 if rev := txn.Put(keys[i], vals[i], lease.NoLease); rev != base {
716 t.Errorf("#%d: rev = %d, want %d", i, rev, base)
717 }
718 txn.End()
719 }
720 }
721
722
723 func TestConcurrentReadNotBlockingWrite(t *testing.T) {
724 b, tmpPath := betesting.NewDefaultTmpBackend(t)
725 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
726 defer os.Remove(tmpPath)
727
728
729 s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
730
731
732 readTx1 := s.Read(ConcurrentReadTxMode, traceutil.TODO())
733
734
735 done := make(chan struct{}, 1)
736 go func() {
737 s.Put([]byte("foo"), []byte("newBar"), lease.NoLease)
738 done <- struct{}{}
739 }()
740 select {
741 case <-done:
742 case <-time.After(1 * time.Second):
743 t.Fatalf("write should not be blocked by read")
744 }
745
746
747 readTx2 := s.Read(ConcurrentReadTxMode, traceutil.TODO())
748 ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
749 ret, err := readTx2.Range(context.TODO(), []byte("foo"), nil, ro)
750 if err != nil {
751 t.Fatalf("failed to range: %v", err)
752 }
753
754 w := mvccpb.KeyValue{
755 Key: []byte("foo"),
756 Value: []byte("newBar"),
757 CreateRevision: 2,
758 ModRevision: 3,
759 Version: 2,
760 }
761 if !reflect.DeepEqual(ret.KVs[0], w) {
762 t.Fatalf("range result = %+v, want = %+v", ret.KVs[0], w)
763 }
764 readTx2.End()
765
766 ret, err = readTx1.Range(context.TODO(), []byte("foo"), nil, ro)
767 if err != nil {
768 t.Fatalf("failed to range: %v", err)
769 }
770
771 w = mvccpb.KeyValue{
772 Key: []byte("foo"),
773 Value: []byte("bar"),
774 CreateRevision: 2,
775 ModRevision: 2,
776 Version: 1,
777 }
778 if !reflect.DeepEqual(ret.KVs[0], w) {
779 t.Fatalf("range result = %+v, want = %+v", ret.KVs[0], w)
780 }
781 readTx1.End()
782 }
783
784
785 func TestConcurrentReadTxAndWrite(t *testing.T) {
786 var (
787 numOfReads = 100
788 numOfWrites = 100
789 maxNumOfPutsPerWrite = 10
790 committedKVs kvs
791 mu sync.Mutex
792 )
793 b, tmpPath := betesting.NewDefaultTmpBackend(t)
794 s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
795 defer os.Remove(tmpPath)
796
797 var wg sync.WaitGroup
798 wg.Add(numOfWrites)
799 for i := 0; i < numOfWrites; i++ {
800 go func() {
801 defer wg.Done()
802 time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond)
803
804 tx := s.Write(traceutil.TODO())
805 numOfPuts := mrand.Intn(maxNumOfPutsPerWrite) + 1
806 var pendingKvs kvs
807 for j := 0; j < numOfPuts; j++ {
808 k := []byte(strconv.Itoa(mrand.Int()))
809 v := []byte(strconv.Itoa(mrand.Int()))
810 tx.Put(k, v, lease.NoLease)
811 pendingKvs = append(pendingKvs, kv{k, v})
812 }
813
814 mu.Lock()
815 committedKVs = merge(committedKVs, pendingKvs)
816 tx.End()
817 mu.Unlock()
818 }()
819 }
820
821 wg.Add(numOfReads)
822 for i := 0; i < numOfReads; i++ {
823 go func() {
824 defer wg.Done()
825 time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond)
826
827 mu.Lock()
828 wKVs := make(kvs, len(committedKVs))
829 copy(wKVs, committedKVs)
830 tx := s.Read(ConcurrentReadTxMode, traceutil.TODO())
831 mu.Unlock()
832
833 ret, err := tx.Range(context.TODO(), []byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
834 tx.End()
835 if err != nil {
836 t.Errorf("failed to range keys: %v", err)
837 return
838 }
839 if len(wKVs) == 0 && len(ret.KVs) == 0 {
840 return
841 }
842 var result kvs
843 for _, keyValue := range ret.KVs {
844 result = append(result, kv{keyValue.Key, keyValue.Value})
845 }
846 if !reflect.DeepEqual(wKVs, result) {
847 t.Errorf("unexpected range result")
848 }
849 }()
850 }
851
852
853 doneC := make(chan struct{})
854 go func() {
855 wg.Wait()
856 close(doneC)
857 }()
858 select {
859 case <-doneC:
860 case <-time.After(5 * time.Minute):
861 testutil.FatalStack(t, "timeout")
862 }
863 }
864
// kv is a plain key/value pair used to compare range results in tests.
type kv struct {
	key []byte
	val []byte
}

// kvs is a slice of kv pairs, sortable by key via sort.Interface.
type kvs []kv

func (s kvs) Len() int           { return len(s) }
func (s kvs) Less(i, j int) bool { return bytes.Compare(s[i].key, s[j].key) < 0 }
func (s kvs) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }

// merge appends src to dst, sorts the result by key and collapses duplicate
// keys, keeping the pair that came last (src wins over dst, and later
// entries win within each input). It may reuse and reorder dst's backing
// array. Empty input yields an empty result.
func merge(dst, src kvs) kvs {
	dst = append(dst, src...)
	if len(dst) == 0 {
		// Nothing to deduplicate; the slice below would be out of range.
		return dst
	}
	// Stable sort keeps equal keys in insertion order, so the last one
	// copied over in the loop below is the newest update.
	sort.Stable(dst)

	widx := 0
	for ridx := 1; ridx < len(dst); ridx++ {
		if !bytes.Equal(dst[widx].key, dst[ridx].key) {
			widx++
		}
		dst[widx] = dst[ridx]
	}
	return dst[:widx+1]
}
890
891
892
893 func newTestRevBytes(rev revision) []byte {
894 bytes := newRevBytes()
895 revToBytes(rev, bytes)
896 return bytes
897 }
898
899 func newTestKeyBytes(rev revision, tombstone bool) []byte {
900 bytes := newRevBytes()
901 revToBytes(rev, bytes)
902 if tombstone {
903 bytes = appendMarkTombstone(zap.NewExample(), bytes)
904 }
905 return bytes
906 }
907
908 func newFakeStore() *store {
909 b := &fakeBackend{&fakeBatchTx{
910 Recorder: &testutil.RecorderBuffered{},
911 rangeRespc: make(chan rangeResp, 5)}}
912 s := &store{
913 cfg: StoreConfig{CompactionBatchLimit: 10000},
914 b: b,
915 le: &lease.FakeLessor{},
916 kvindex: newFakeIndex(),
917 currentRev: 0,
918 compactMainRev: -1,
919 fifoSched: schedule.NewFIFOScheduler(),
920 stopc: make(chan struct{}),
921 lg: zap.NewExample(),
922 }
923 s.ReadView, s.WriteView = &readView{s}, &writeView{s}
924 s.hashes = newHashStorage(zap.NewExample(), s)
925 return s
926 }
927
928 func newFakeIndex() *fakeIndex {
929 return &fakeIndex{
930 Recorder: &testutil.RecorderBuffered{},
931 indexGetRespc: make(chan indexGetResp, 1),
932 indexRangeRespc: make(chan indexRangeResp, 1),
933 indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
934 indexCompactRespc: make(chan map[revision]struct{}, 1),
935 }
936 }
937
// rangeResp is a queued reply for fakeBatchTx.UnsafeRange, which returns
// keys and vals unchanged.
type rangeResp struct {
	keys [][]byte
	vals [][]byte
}
942
943 type fakeBatchTx struct {
944 testutil.Recorder
945 rangeRespc chan rangeResp
946 }
947
948 func (b *fakeBatchTx) LockInsideApply() {}
949 func (b *fakeBatchTx) LockOutsideApply() {}
950 func (b *fakeBatchTx) Lock() {}
951 func (b *fakeBatchTx) Unlock() {}
952 func (b *fakeBatchTx) RLock() {}
953 func (b *fakeBatchTx) RUnlock() {}
954 func (b *fakeBatchTx) UnsafeCreateBucket(bucket backend.Bucket) {}
955 func (b *fakeBatchTx) UnsafeDeleteBucket(bucket backend.Bucket) {}
956 func (b *fakeBatchTx) UnsafePut(bucket backend.Bucket, key []byte, value []byte) {
957 b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucket, key, value}})
958 }
959 func (b *fakeBatchTx) UnsafeSeqPut(bucket backend.Bucket, key []byte, value []byte) {
960 b.Recorder.Record(testutil.Action{Name: "seqput", Params: []interface{}{bucket, key, value}})
961 }
962 func (b *fakeBatchTx) UnsafeRange(bucket backend.Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
963 b.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{bucket, key, endKey, limit}})
964 r := <-b.rangeRespc
965 return r.keys, r.vals
966 }
967 func (b *fakeBatchTx) UnsafeDelete(bucket backend.Bucket, key []byte) {
968 b.Recorder.Record(testutil.Action{Name: "delete", Params: []interface{}{bucket, key}})
969 }
970 func (b *fakeBatchTx) UnsafeForEach(bucket backend.Bucket, visitor func(k, v []byte) error) error {
971 return nil
972 }
973 func (b *fakeBatchTx) Commit() {}
974 func (b *fakeBatchTx) CommitAndStop() {}
975
976 type fakeBackend struct {
977 tx *fakeBatchTx
978 }
979
980 func (b *fakeBackend) BatchTx() backend.BatchTx { return b.tx }
981 func (b *fakeBackend) ReadTx() backend.ReadTx { return b.tx }
982 func (b *fakeBackend) ConcurrentReadTx() backend.ReadTx { return b.tx }
983 func (b *fakeBackend) Hash(func(bucketName, keyName []byte) bool) (uint32, error) { return 0, nil }
984 func (b *fakeBackend) Size() int64 { return 0 }
985 func (b *fakeBackend) SizeInUse() int64 { return 0 }
986 func (b *fakeBackend) OpenReadTxN() int64 { return 0 }
987 func (b *fakeBackend) Snapshot() backend.Snapshot { return nil }
988 func (b *fakeBackend) ForceCommit() {}
989 func (b *fakeBackend) Defrag() error { return nil }
990 func (b *fakeBackend) Close() error { return nil }
991 func (b *fakeBackend) SetTxPostLockInsideApplyHook(func()) {}
992
// indexGetResp is a queued reply for fakeIndex.Get; its fields are returned
// as Get's four results.
type indexGetResp struct {
	rev     revision
	created revision
	ver     int64
	err     error
}

// indexRangeResp is a queued reply for fakeIndex.Range (which Revisions and
// CountRevisions also go through).
type indexRangeResp struct {
	keys [][]byte
	revs []revision
}

// indexRangeEventsResp is a queued reply for fakeIndex.RangeSince.
type indexRangeEventsResp struct {
	revs []revision
}
1008
1009 type fakeIndex struct {
1010 testutil.Recorder
1011 indexGetRespc chan indexGetResp
1012 indexRangeRespc chan indexRangeResp
1013 indexRangeEventsRespc chan indexRangeEventsResp
1014 indexCompactRespc chan map[revision]struct{}
1015 }
1016
1017 func (i *fakeIndex) Revisions(key, end []byte, atRev int64, limit int) ([]revision, int) {
1018 _, rev := i.Range(key, end, atRev)
1019 if len(rev) >= limit {
1020 rev = rev[:limit]
1021 }
1022 return rev, len(rev)
1023 }
1024
1025 func (i *fakeIndex) CountRevisions(key, end []byte, atRev int64) int {
1026 _, rev := i.Range(key, end, atRev)
1027 return len(rev)
1028 }
1029
1030 func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) {
1031 i.Recorder.Record(testutil.Action{Name: "get", Params: []interface{}{key, atRev}})
1032 r := <-i.indexGetRespc
1033 return r.rev, r.created, r.ver, r.err
1034 }
1035 func (i *fakeIndex) Range(key, end []byte, atRev int64) ([][]byte, []revision) {
1036 i.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{key, end, atRev}})
1037 r := <-i.indexRangeRespc
1038 return r.keys, r.revs
1039 }
1040 func (i *fakeIndex) Put(key []byte, rev revision) {
1041 i.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{key, rev}})
1042 }
1043 func (i *fakeIndex) Tombstone(key []byte, rev revision) error {
1044 i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []interface{}{key, rev}})
1045 return nil
1046 }
1047 func (i *fakeIndex) RangeSince(key, end []byte, rev int64) []revision {
1048 i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []interface{}{key, end, rev}})
1049 r := <-i.indexRangeEventsRespc
1050 return r.revs
1051 }
1052 func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
1053 i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}})
1054 return <-i.indexCompactRespc
1055 }
1056 func (i *fakeIndex) Keep(rev int64) map[revision]struct{} {
1057 i.Recorder.Record(testutil.Action{Name: "keep", Params: []interface{}{rev}})
1058 return <-i.indexCompactRespc
1059 }
1060 func (i *fakeIndex) Equal(b index) bool { return false }
1061
1062 func (i *fakeIndex) Insert(ki *keyIndex) {
1063 i.Recorder.Record(testutil.Action{Name: "insert", Params: []interface{}{ki}})
1064 }
1065
1066 func (i *fakeIndex) KeyIndex(ki *keyIndex) *keyIndex {
1067 i.Recorder.Record(testutil.Action{Name: "keyIndex", Params: []interface{}{ki}})
1068 return nil
1069 }
1070
// createBytesSlice returns sliceN independently allocated slices, each holding
// bytesN bytes read from crypto/rand. A non-positive sliceN yields an empty,
// non-nil slice. It panics if the random source returns an error.
func createBytesSlice(bytesN, sliceN int) [][]byte {
	rs := [][]byte{}
	// Count up explicitly: the previous `for len(rs) != sliceN` condition
	// never became false for a negative sliceN and looped forever.
	for i := 0; i < sliceN; i++ {
		v := make([]byte, bytesN)
		if _, err := rand.Read(v); err != nil {
			panic(err)
		}
		rs = append(rs, v)
	}
	return rs
}
1082
View as plain text