1
2
3
4
5
6
7 package leveldb
8
9 import (
10 "bytes"
11 "container/list"
12 crand "crypto/rand"
13 "encoding/binary"
14 "fmt"
15 "math/rand"
16 "os"
17 "path/filepath"
18 "runtime"
19 "strings"
20 "sync"
21 "sync/atomic"
22 "testing"
23 "time"
24 "unsafe"
25
26 "github.com/onsi/gomega"
27
28 "github.com/syndtr/goleveldb/leveldb/comparer"
29 "github.com/syndtr/goleveldb/leveldb/errors"
30 "github.com/syndtr/goleveldb/leveldb/filter"
31 "github.com/syndtr/goleveldb/leveldb/iterator"
32 "github.com/syndtr/goleveldb/leveldb/opt"
33 "github.com/syndtr/goleveldb/leveldb/storage"
34 "github.com/syndtr/goleveldb/leveldb/testutil"
35 "github.com/syndtr/goleveldb/leveldb/util"
36 )
37
38 func tkey(i int) []byte {
39 return []byte(fmt.Sprintf("%016d", i))
40 }
41
42 func tval(seed, n int) []byte {
43 r := rand.New(rand.NewSource(int64(seed)))
44 return randomString(r, n)
45 }
46
47 func testingLogger(t *testing.T) func(log string) {
48 return func(log string) {
49 t.Log(log)
50 }
51 }
52
53 func testingPreserveOnFailed(t *testing.T) func() (preserve bool, err error) {
54 return func() (preserve bool, err error) {
55 preserve = t.Failed()
56 return
57 }
58 }
59
60 type dbHarness struct {
61 t *testing.T
62
63 stor *testutil.Storage
64 db *DB
65 o *opt.Options
66 ro *opt.ReadOptions
67 wo *opt.WriteOptions
68 }
69
70 func newDbHarnessWopt(t *testing.T, o *opt.Options) *dbHarness {
71 h := new(dbHarness)
72 h.init(t, o)
73 return h
74 }
75
76 func newDbHarness(t *testing.T) *dbHarness {
77 return newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true})
78 }
79
80 func (h *dbHarness) init(t *testing.T, o *opt.Options) {
81 gomega.RegisterTestingT(t)
82 h.t = t
83 h.stor = testutil.NewStorage()
84 h.stor.OnLog(testingLogger(t))
85 h.stor.OnClose(testingPreserveOnFailed(t))
86 h.o = o
87 h.ro = nil
88 h.wo = nil
89
90 if err := h.openDB0(); err != nil {
91
92 defer h.stor.Close()
93 h.t.Fatal("Open (init): got error: ", err)
94 }
95 }
96
97 func (h *dbHarness) openDB0() (err error) {
98 h.t.Log("opening DB")
99 h.db, err = Open(h.stor, h.o)
100 return
101 }
102
103 func (h *dbHarness) openDB() {
104 if err := h.openDB0(); err != nil {
105 h.t.Fatal("Open: got error: ", err)
106 }
107 }
108
109 func (h *dbHarness) closeDB0() error {
110 h.t.Log("closing DB")
111 return h.db.Close()
112 }
113
114 func (h *dbHarness) closeDB() {
115 if h.db != nil {
116 if err := h.closeDB0(); err != nil && err != ErrClosed {
117 h.t.Error("Close: got error: ", err)
118 }
119 }
120 h.stor.CloseCheck()
121 runtime.GC()
122 }
123
124 func (h *dbHarness) reopenDB() {
125 if h.db != nil {
126 h.closeDB()
127 }
128 h.openDB()
129 }
130
131 func (h *dbHarness) close() {
132 if h.db != nil {
133 if err := h.closeDB0(); err != nil && err != ErrClosed {
134 h.t.Error("Close: got error: ", err)
135 }
136 h.db = nil
137 }
138 h.stor.Close()
139 h.stor = nil
140 runtime.GC()
141 }
142
143 func (h *dbHarness) openAssert(want bool) {
144 db, err := Open(h.stor, h.o)
145 if err != nil {
146 if want {
147 h.t.Error("Open: assert: got error: ", err)
148 } else {
149 h.t.Log("Open: assert: got error (expected): ", err)
150 }
151 } else {
152 if !want {
153 h.t.Error("Open: assert: expect error")
154 }
155 db.Close()
156 }
157 }
158
159 func (h *dbHarness) write(batch *Batch) {
160 if err := h.db.Write(batch, h.wo); err != nil {
161 h.t.Error("Write: got error: ", err)
162 }
163 }
164
165 func (h *dbHarness) put(key, value string) {
166 if err := h.db.Put([]byte(key), []byte(value), h.wo); err != nil {
167 h.t.Error("Put: got error: ", err)
168 }
169 }
170
171 func (h *dbHarness) putMulti(n int, low, hi string) {
172 for i := 0; i < n; i++ {
173 h.put(low, "begin")
174 h.put(hi, "end")
175 h.compactMem()
176 }
177 }
178
179 func (h *dbHarness) maxNextLevelOverlappingBytes(want int64) {
180 t := h.t
181 db := h.db
182
183 var (
184 maxOverlaps int64
185 maxLevel int
186 )
187 v := db.s.version()
188 if len(v.levels) > 2 {
189 for i, tt := range v.levels[1 : len(v.levels)-1] {
190 level := i + 1
191 next := v.levels[level+1]
192 for _, t := range tt {
193 r := next.getOverlaps(nil, db.s.icmp, t.imin.ukey(), t.imax.ukey(), false)
194 sum := r.size()
195 if sum > maxOverlaps {
196 maxOverlaps = sum
197 maxLevel = level
198 }
199 }
200 }
201 }
202 v.release()
203
204 if maxOverlaps > want {
205 t.Errorf("next level most overlapping bytes is more than %d, got=%d level=%d", want, maxOverlaps, maxLevel)
206 } else {
207 t.Logf("next level most overlapping bytes is %d, level=%d want=%d", maxOverlaps, maxLevel, want)
208 }
209 }
210
211 func (h *dbHarness) delete(key string) {
212 t := h.t
213 db := h.db
214
215 err := db.Delete([]byte(key), h.wo)
216 if err != nil {
217 t.Error("Delete: got error: ", err)
218 }
219 }
220
221 func (h *dbHarness) assertNumKeys(want int) {
222 iter := h.db.NewIterator(nil, h.ro)
223 defer iter.Release()
224 got := 0
225 for iter.Next() {
226 got++
227 }
228 if err := iter.Error(); err != nil {
229 h.t.Error("assertNumKeys: ", err)
230 }
231 if want != got {
232 h.t.Errorf("assertNumKeys: want=%d got=%d", want, got)
233 }
234 }
235
236 func (h *dbHarness) getr(db Reader, key string, expectFound bool) (found bool, v []byte) {
237 t := h.t
238 v, err := db.Get([]byte(key), h.ro)
239 switch err {
240 case ErrNotFound:
241 if expectFound {
242 t.Errorf("Get: key '%s' not found, want found", key)
243 }
244 case nil:
245 found = true
246 if !expectFound {
247 t.Errorf("Get: key '%s' found, want not found", key)
248 }
249 default:
250 t.Error("Get: got error: ", err)
251 }
252 return
253 }
254
255 func (h *dbHarness) get(key string, expectFound bool) (found bool, v []byte) {
256 return h.getr(h.db, key, expectFound)
257 }
258
259 func (h *dbHarness) getValr(db Reader, key, value string) {
260 t := h.t
261 found, r := h.getr(db, key, true)
262 if !found {
263 return
264 }
265 rval := string(r)
266 if rval != value {
267 t.Errorf("Get: invalid value, got '%s', want '%s'", rval, value)
268 }
269 }
270
271 func (h *dbHarness) getVal(key, value string) {
272 h.getValr(h.db, key, value)
273 }
274
275 func (h *dbHarness) allEntriesFor(key, want string) {
276 t := h.t
277 db := h.db
278 s := db.s
279
280 ikey := makeInternalKey(nil, []byte(key), keyMaxSeq, keyTypeVal)
281 iter := db.newRawIterator(nil, nil, nil, nil)
282 if !iter.Seek(ikey) && iter.Error() != nil {
283 t.Error("AllEntries: error during seek, err: ", iter.Error())
284 return
285 }
286 res := "[ "
287 first := true
288 for iter.Valid() {
289 if ukey, _, kt, kerr := parseInternalKey(iter.Key()); kerr == nil {
290 if s.icmp.uCompare(ikey.ukey(), ukey) != 0 {
291 break
292 }
293 if !first {
294 res += ", "
295 }
296 first = false
297 switch kt {
298 case keyTypeVal:
299 res += string(iter.Value())
300 case keyTypeDel:
301 res += "DEL"
302 }
303 } else {
304 if !first {
305 res += ", "
306 }
307 first = false
308 res += "CORRUPTED"
309 }
310 iter.Next()
311 }
312 if !first {
313 res += " "
314 }
315 res += "]"
316 if res != want {
317 t.Errorf("AllEntries: assert failed for key %q, got=%q want=%q", key, res, want)
318 }
319 }
320
321
322
323 func (h *dbHarness) getKeyVal(want string) {
324 t := h.t
325 db := h.db
326
327 s, err := db.GetSnapshot()
328 if err != nil {
329 t.Fatal("GetSnapshot: got error: ", err)
330 }
331 res := ""
332 iter := s.NewIterator(nil, nil)
333 for iter.Next() {
334 res += fmt.Sprintf("(%s->%s)", string(iter.Key()), string(iter.Value()))
335 }
336 iter.Release()
337
338 if res != want {
339 t.Errorf("GetKeyVal: invalid key/value pair, got=%q want=%q", res, want)
340 }
341 s.Release()
342 }
343
344 func (h *dbHarness) waitCompaction() {
345 t := h.t
346 db := h.db
347 if err := db.compTriggerWait(db.tcompCmdC); err != nil {
348 t.Error("compaction error: ", err)
349 }
350 }
351
352 func (h *dbHarness) waitMemCompaction() {
353 t := h.t
354 db := h.db
355
356 if err := db.compTriggerWait(db.mcompCmdC); err != nil {
357 t.Error("compaction error: ", err)
358 }
359 }
360
361 func (h *dbHarness) compactMem() {
362 t := h.t
363 db := h.db
364
365 t.Log("starting memdb compaction")
366
367 db.writeLockC <- struct{}{}
368 defer func() {
369 <-db.writeLockC
370 }()
371
372 if _, err := db.rotateMem(0, true); err != nil {
373 t.Error("compaction error: ", err)
374 }
375
376 if h.totalTables() == 0 {
377 t.Error("zero tables after mem compaction")
378 }
379
380 t.Log("memdb compaction done")
381 }
382
383 func (h *dbHarness) compactRangeAtErr(level int, min, max string, wanterr bool) {
384 t := h.t
385 db := h.db
386
387 var _min, _max []byte
388 if min != "" {
389 _min = []byte(min)
390 }
391 if max != "" {
392 _max = []byte(max)
393 }
394
395 t.Logf("starting table range compaction: level=%d, min=%q, max=%q", level, min, max)
396
397 if err := db.compTriggerRange(db.tcompCmdC, level, _min, _max); err != nil {
398 if wanterr {
399 t.Log("CompactRangeAt: got error (expected): ", err)
400 } else {
401 t.Error("CompactRangeAt: got error: ", err)
402 }
403 } else if wanterr {
404 t.Error("CompactRangeAt: expect error")
405 }
406
407 t.Log("table range compaction done")
408 }
409
410 func (h *dbHarness) compactRangeAt(level int, min, max string) {
411 h.compactRangeAtErr(level, min, max, false)
412 }
413
414 func (h *dbHarness) compactRange(min, max string) {
415 t := h.t
416 db := h.db
417
418 t.Logf("starting DB range compaction: min=%q, max=%q", min, max)
419
420 var r util.Range
421 if min != "" {
422 r.Start = []byte(min)
423 }
424 if max != "" {
425 r.Limit = []byte(max)
426 }
427 if err := db.CompactRange(r); err != nil {
428 t.Error("CompactRange: got error: ", err)
429 }
430
431 t.Log("DB range compaction done")
432 }
433
434 func (h *dbHarness) sizeOf(start, limit string) int64 {
435 sz, err := h.db.SizeOf([]util.Range{
436 {Start: []byte(start), Limit: []byte(limit)},
437 })
438 if err != nil {
439 h.t.Error("SizeOf: got error: ", err)
440 }
441 return sz.Sum()
442 }
443
444 func (h *dbHarness) sizeAssert(start, limit string, low, hi int64) {
445 sz := h.sizeOf(start, limit)
446 if sz < low || sz > hi {
447 h.t.Errorf("sizeOf %q to %q not in range, want %d - %d, got %d",
448 shorten(start), shorten(limit), low, hi, sz)
449 }
450 }
451
452 func (h *dbHarness) getSnapshot() (s *Snapshot) {
453 s, err := h.db.GetSnapshot()
454 if err != nil {
455 h.t.Fatal("GetSnapshot: got error: ", err)
456 }
457 return
458 }
459
460 func (h *dbHarness) getTablesPerLevel() string {
461 res := ""
462 nz := 0
463 v := h.db.s.version()
464 for level, tables := range v.levels {
465 if level > 0 {
466 res += ","
467 }
468 res += fmt.Sprint(len(tables))
469 if len(tables) > 0 {
470 nz = len(res)
471 }
472 }
473 v.release()
474 return res[:nz]
475 }
476
477 func (h *dbHarness) tablesPerLevel(want string) {
478 res := h.getTablesPerLevel()
479 if res != want {
480 h.t.Errorf("invalid tables len, want=%s, got=%s", want, res)
481 }
482 }
483
484 func (h *dbHarness) totalTables() (n int) {
485 v := h.db.s.version()
486 for _, tables := range v.levels {
487 n += len(tables)
488 }
489 v.release()
490 return
491 }
492
493 type keyValue interface {
494 Key() []byte
495 Value() []byte
496 }
497
498 func testKeyVal(t *testing.T, kv keyValue, want string) {
499 res := string(kv.Key()) + "->" + string(kv.Value())
500 if res != want {
501 t.Errorf("invalid key/value, want=%q, got=%q", want, res)
502 }
503 }
504
505 func numKey(num int) string {
506 return fmt.Sprintf("key%06d", num)
507 }
508
509 var testingBloomFilter = filter.NewBloomFilter(10)
510
511 func truno(t *testing.T, o *opt.Options, f func(h *dbHarness)) {
512 for i := 0; i < 4; i++ {
513 func() {
514 switch i {
515 case 0:
516 case 1:
517 if o == nil {
518 o = &opt.Options{
519 DisableLargeBatchTransaction: true,
520 Filter: testingBloomFilter,
521 }
522 } else {
523 old := o
524 o = &opt.Options{}
525 *o = *old
526 o.Filter = testingBloomFilter
527 }
528 case 2:
529 if o == nil {
530 o = &opt.Options{
531 DisableLargeBatchTransaction: true,
532 Compression: opt.NoCompression,
533 }
534 } else {
535 old := o
536 o = &opt.Options{}
537 *o = *old
538 o.Compression = opt.NoCompression
539 }
540 }
541 h := newDbHarnessWopt(t, o)
542 defer h.close()
543 if i == 3 {
544 h.reopenDB()
545 }
546 f(h)
547 }()
548 }
549 }
550
551 func trun(t *testing.T, f func(h *dbHarness)) {
552 truno(t, nil, f)
553 }
554
555 func testAligned(t *testing.T, name string, offset uintptr) {
556 if offset%8 != 0 {
557 t.Errorf("field %s offset is not 64-bit aligned", name)
558 }
559 }
560
561 func Test_FieldsAligned(t *testing.T) {
562 p1 := new(DB)
563 testAligned(t, "DB.seq", unsafe.Offsetof(p1.seq))
564 p2 := new(session)
565 testAligned(t, "session.stNextFileNum", unsafe.Offsetof(p2.stNextFileNum))
566 testAligned(t, "session.stJournalNum", unsafe.Offsetof(p2.stJournalNum))
567 testAligned(t, "session.stPrevJournalNum", unsafe.Offsetof(p2.stPrevJournalNum))
568 testAligned(t, "session.stSeqNum", unsafe.Offsetof(p2.stSeqNum))
569 }
570
571 func TestDB_Locking(t *testing.T) {
572 h := newDbHarness(t)
573 defer h.stor.Close()
574 h.openAssert(false)
575 h.closeDB()
576 h.openAssert(true)
577 }
578
579 func TestDB_Empty(t *testing.T) {
580 trun(t, func(h *dbHarness) {
581 h.get("foo", false)
582
583 h.reopenDB()
584 h.get("foo", false)
585 })
586 }
587
588 func TestDB_ReadWrite(t *testing.T) {
589 trun(t, func(h *dbHarness) {
590 h.put("foo", "v1")
591 h.getVal("foo", "v1")
592 h.put("bar", "v2")
593 h.put("foo", "v3")
594 h.getVal("foo", "v3")
595 h.getVal("bar", "v2")
596
597 h.reopenDB()
598 h.getVal("foo", "v3")
599 h.getVal("bar", "v2")
600 })
601 }
602
603 func TestDB_PutDeleteGet(t *testing.T) {
604 trun(t, func(h *dbHarness) {
605 h.put("foo", "v1")
606 h.getVal("foo", "v1")
607 h.put("foo", "v2")
608 h.getVal("foo", "v2")
609 h.delete("foo")
610 h.get("foo", false)
611
612 h.reopenDB()
613 h.get("foo", false)
614 })
615 }
616
617 func TestDB_EmptyBatch(t *testing.T) {
618 h := newDbHarness(t)
619 defer h.close()
620
621 h.get("foo", false)
622 err := h.db.Write(new(Batch), h.wo)
623 if err != nil {
624 t.Error("writing empty batch yield error: ", err)
625 }
626 h.get("foo", false)
627 }
628
629 func TestDB_GetFromFrozen(t *testing.T) {
630 h := newDbHarnessWopt(t, &opt.Options{
631 DisableLargeBatchTransaction: true,
632 WriteBuffer: 100100,
633 })
634 defer h.close()
635
636 h.put("foo", "v1")
637 h.getVal("foo", "v1")
638
639 h.stor.Stall(testutil.ModeSync, storage.TypeTable)
640 h.put("k1", strings.Repeat("x", 100000))
641 h.put("k2", strings.Repeat("y", 100000))
642 for i := 0; h.db.getFrozenMem() == nil && i < 100; i++ {
643 time.Sleep(10 * time.Microsecond)
644 }
645 if h.db.getFrozenMem() == nil {
646 h.stor.Release(testutil.ModeSync, storage.TypeTable)
647 t.Fatal("No frozen mem")
648 }
649 h.getVal("foo", "v1")
650 h.stor.Release(testutil.ModeSync, storage.TypeTable)
651
652 h.reopenDB()
653 h.getVal("foo", "v1")
654 h.get("k1", true)
655 h.get("k2", true)
656 }
657
658 func TestDB_GetFromTable(t *testing.T) {
659 trun(t, func(h *dbHarness) {
660 h.put("foo", "v1")
661 h.compactMem()
662 h.getVal("foo", "v1")
663 })
664 }
665
666 func TestDB_GetSnapshot(t *testing.T) {
667 trun(t, func(h *dbHarness) {
668 bar := strings.Repeat("b", 200)
669 h.put("foo", "v1")
670 h.put(bar, "v1")
671
672 snap, err := h.db.GetSnapshot()
673 if err != nil {
674 t.Fatal("GetSnapshot: got error: ", err)
675 }
676
677 h.put("foo", "v2")
678 h.put(bar, "v2")
679
680 h.getVal("foo", "v2")
681 h.getVal(bar, "v2")
682 h.getValr(snap, "foo", "v1")
683 h.getValr(snap, bar, "v1")
684
685 h.compactMem()
686
687 h.getVal("foo", "v2")
688 h.getVal(bar, "v2")
689 h.getValr(snap, "foo", "v1")
690 h.getValr(snap, bar, "v1")
691
692 snap.Release()
693
694 h.reopenDB()
695 h.getVal("foo", "v2")
696 h.getVal(bar, "v2")
697 })
698 }
699
700 func TestDB_GetLevel0Ordering(t *testing.T) {
701 trun(t, func(h *dbHarness) {
702 h.db.memdbMaxLevel = 2
703
704 for i := 0; i < 4; i++ {
705 h.put("bar", fmt.Sprintf("b%d", i))
706 h.put("foo", fmt.Sprintf("v%d", i))
707 h.compactMem()
708 }
709 h.getVal("foo", "v3")
710 h.getVal("bar", "b3")
711
712 v := h.db.s.version()
713 t0len := v.tLen(0)
714 v.release()
715 if t0len < 2 {
716 t.Errorf("level-0 tables is less than 2, got %d", t0len)
717 }
718
719 h.reopenDB()
720 h.getVal("foo", "v3")
721 h.getVal("bar", "b3")
722 })
723 }
724
725 func TestDB_GetOrderedByLevels(t *testing.T) {
726 trun(t, func(h *dbHarness) {
727 h.put("foo", "v1")
728 h.compactMem()
729 h.compactRange("a", "z")
730 h.getVal("foo", "v1")
731 h.put("foo", "v2")
732 h.compactMem()
733 h.getVal("foo", "v2")
734 })
735 }
736
737 func TestDB_GetPicksCorrectFile(t *testing.T) {
738 trun(t, func(h *dbHarness) {
739
740 h.put("a", "va")
741 h.compactMem()
742 h.compactRange("a", "b")
743 h.put("x", "vx")
744 h.compactMem()
745 h.compactRange("x", "y")
746 h.put("f", "vf")
747 h.compactMem()
748 h.compactRange("f", "g")
749
750 h.getVal("a", "va")
751 h.getVal("f", "vf")
752 h.getVal("x", "vx")
753
754 h.compactRange("", "")
755 h.getVal("a", "va")
756 h.getVal("f", "vf")
757 h.getVal("x", "vx")
758 })
759 }
760
761 func TestDB_GetEncountersEmptyLevel(t *testing.T) {
762 trun(t, func(h *dbHarness) {
763 h.db.memdbMaxLevel = 2
764
765
766
767
768
769
770
771
772
773
774 for i := 0; ; i++ {
775 if i >= 100 {
776 t.Fatal("could not fill levels-0 and level-2")
777 }
778 v := h.db.s.version()
779 if v.tLen(0) > 0 && v.tLen(2) > 0 {
780 v.release()
781 break
782 }
783 v.release()
784 h.put("a", "begin")
785 h.put("z", "end")
786 h.compactMem()
787
788 h.getVal("a", "begin")
789 h.getVal("z", "end")
790 }
791
792
793 h.compactRangeAt(1, "", "")
794 h.tablesPerLevel("1,0,1")
795
796 h.getVal("a", "begin")
797 h.getVal("z", "end")
798
799
800 for i := 0; i < 200; i++ {
801 h.get("missing", false)
802 }
803
804
805 h.waitCompaction()
806
807 v := h.db.s.version()
808 if v.tLen(0) > 0 {
809 t.Errorf("level-0 tables more than 0, got %d", v.tLen(0))
810 }
811 v.release()
812
813 h.getVal("a", "begin")
814 h.getVal("z", "end")
815 })
816 }
817
818 func TestDB_IterMultiWithDelete(t *testing.T) {
819 trun(t, func(h *dbHarness) {
820 h.put("a", "va")
821 h.put("b", "vb")
822 h.put("c", "vc")
823 h.delete("b")
824 h.get("b", false)
825
826 iter := h.db.NewIterator(nil, nil)
827 iter.Seek([]byte("c"))
828 testKeyVal(t, iter, "c->vc")
829 iter.Prev()
830 testKeyVal(t, iter, "a->va")
831 iter.Release()
832
833 h.compactMem()
834
835 iter = h.db.NewIterator(nil, nil)
836 iter.Seek([]byte("c"))
837 testKeyVal(t, iter, "c->vc")
838 iter.Prev()
839 testKeyVal(t, iter, "a->va")
840 iter.Release()
841 })
842 }
843
844 func TestDB_IteratorPinsRef(t *testing.T) {
845 h := newDbHarness(t)
846 defer h.close()
847
848 h.put("foo", "hello")
849
850
851 iter := h.db.NewIterator(nil, nil)
852
853
854 h.put("foo", "newvalue1")
855 for i := 0; i < 100; i++ {
856 h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10))
857 }
858 h.put("foo", "newvalue2")
859
860 iter.First()
861 testKeyVal(t, iter, "foo->hello")
862 if iter.Next() {
863 t.Errorf("expect eof")
864 }
865 iter.Release()
866 }
867
868 func TestDB_Recover(t *testing.T) {
869 trun(t, func(h *dbHarness) {
870 h.put("foo", "v1")
871 h.put("baz", "v5")
872
873 h.reopenDB()
874 h.getVal("foo", "v1")
875
876 h.getVal("foo", "v1")
877 h.getVal("baz", "v5")
878 h.put("bar", "v2")
879 h.put("foo", "v3")
880
881 h.reopenDB()
882 h.getVal("foo", "v3")
883 h.put("foo", "v4")
884 h.getVal("foo", "v4")
885 h.getVal("bar", "v2")
886 h.getVal("baz", "v5")
887 })
888 }
889
890 func TestDB_RecoverWithEmptyJournal(t *testing.T) {
891 trun(t, func(h *dbHarness) {
892 h.put("foo", "v1")
893 h.put("foo", "v2")
894
895 h.reopenDB()
896 h.reopenDB()
897 h.put("foo", "v3")
898
899 h.reopenDB()
900 h.getVal("foo", "v3")
901 })
902 }
903
904 func TestDB_RecoverDuringMemtableCompaction(t *testing.T) {
905 truno(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 1000000}, func(h *dbHarness) {
906
907 h.stor.Stall(testutil.ModeSync, storage.TypeTable)
908 h.put("big1", strings.Repeat("x", 10000000))
909 h.put("big2", strings.Repeat("y", 1000))
910 h.put("bar", "v2")
911 h.stor.Release(testutil.ModeSync, storage.TypeTable)
912
913 h.reopenDB()
914 h.getVal("bar", "v2")
915 h.getVal("big1", strings.Repeat("x", 10000000))
916 h.getVal("big2", strings.Repeat("y", 1000))
917 })
918 }
919
920 func TestDB_MinorCompactionsHappen(t *testing.T) {
921 h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 10000})
922 defer h.close()
923
924 n := 500
925
926 key := func(i int) string {
927 return fmt.Sprintf("key%06d", i)
928 }
929
930 for i := 0; i < n; i++ {
931 h.put(key(i), key(i)+strings.Repeat("v", 1000))
932 }
933
934 for i := 0; i < n; i++ {
935 h.getVal(key(i), key(i)+strings.Repeat("v", 1000))
936 }
937
938 h.reopenDB()
939 for i := 0; i < n; i++ {
940 h.getVal(key(i), key(i)+strings.Repeat("v", 1000))
941 }
942 }
943
944 func TestDB_RecoverWithLargeJournal(t *testing.T) {
945 h := newDbHarness(t)
946 defer h.close()
947
948 h.put("big1", strings.Repeat("1", 200000))
949 h.put("big2", strings.Repeat("2", 200000))
950 h.put("small3", strings.Repeat("3", 10))
951 h.put("small4", strings.Repeat("4", 10))
952 h.tablesPerLevel("")
953
954
955
956 h.o.WriteBuffer = 100000
957 h.reopenDB()
958 h.getVal("big1", strings.Repeat("1", 200000))
959 h.getVal("big2", strings.Repeat("2", 200000))
960 h.getVal("small3", strings.Repeat("3", 10))
961 h.getVal("small4", strings.Repeat("4", 10))
962 v := h.db.s.version()
963 if v.tLen(0) <= 1 {
964 t.Errorf("tables-0 less than one")
965 }
966 v.release()
967 }
968
969 func TestDB_CompactionsGenerateMultipleFiles(t *testing.T) {
970 h := newDbHarnessWopt(t, &opt.Options{
971 DisableLargeBatchTransaction: true,
972 WriteBuffer: 10000000,
973 Compression: opt.NoCompression,
974 })
975 defer h.close()
976
977 v := h.db.s.version()
978 if v.tLen(0) > 0 {
979 t.Errorf("level-0 tables more than 0, got %d", v.tLen(0))
980 }
981 v.release()
982
983 n := 80
984
985
986 for i := 0; i < n; i++ {
987 h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10))
988 }
989
990
991 h.reopenDB()
992 h.compactRangeAt(0, "", "")
993
994 v = h.db.s.version()
995 if v.tLen(0) > 0 {
996 t.Errorf("level-0 tables more than 0, got %d", v.tLen(0))
997 }
998 if v.tLen(1) <= 1 {
999 t.Errorf("level-1 tables less than 1, got %d", v.tLen(1))
1000 }
1001 v.release()
1002
1003 for i := 0; i < n; i++ {
1004 h.getVal(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10))
1005 }
1006 }
1007
1008 func TestDB_RepeatedWritesToSameKey(t *testing.T) {
1009 h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 100000})
1010 defer h.close()
1011
1012 maxTables := h.o.GetWriteL0PauseTrigger() + 7
1013
1014 value := strings.Repeat("v", 2*h.o.GetWriteBuffer())
1015 for i := 0; i < 5*maxTables; i++ {
1016 h.put("key", value)
1017 n := h.totalTables()
1018 if n > maxTables {
1019 t.Errorf("total tables exceed %d, got=%d, iter=%d", maxTables, n, i)
1020 }
1021 }
1022 }
1023
1024 func TestDB_RepeatedWritesToSameKeyAfterReopen(t *testing.T) {
1025 h := newDbHarnessWopt(t, &opt.Options{
1026 DisableLargeBatchTransaction: true,
1027 WriteBuffer: 100000,
1028 })
1029 defer h.close()
1030
1031 h.reopenDB()
1032
1033 maxTables := h.o.GetWriteL0PauseTrigger() + 7
1034
1035 value := strings.Repeat("v", 2*h.o.GetWriteBuffer())
1036 for i := 0; i < 5*maxTables; i++ {
1037 h.put("key", value)
1038 n := h.totalTables()
1039 if n > maxTables {
1040 t.Errorf("total tables exceed %d, got=%d, iter=%d", maxTables, n, i)
1041 }
1042 }
1043 }
1044
1045 func TestDB_SparseMerge(t *testing.T) {
1046 h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, Compression: opt.NoCompression})
1047 defer h.close()
1048
1049 h.putMulti(7, "A", "Z")
1050
1051
1052
1053
1054
1055
1056
1057 h.put("A", "va")
1058 value := strings.Repeat("x", 1000)
1059 for i := 0; i < 100000; i++ {
1060 h.put(fmt.Sprintf("B%010d", i), value)
1061 }
1062 h.put("C", "vc")
1063 h.compactMem()
1064 h.compactRangeAt(0, "", "")
1065 h.waitCompaction()
1066
1067
1068 h.put("A", "va2")
1069 h.put("B100", "bvalue2")
1070 h.put("C", "vc2")
1071 h.compactMem()
1072
1073 h.waitCompaction()
1074 h.maxNextLevelOverlappingBytes(20 * 1048576)
1075 h.compactRangeAt(0, "", "")
1076 h.waitCompaction()
1077 h.maxNextLevelOverlappingBytes(20 * 1048576)
1078 h.compactRangeAt(1, "", "")
1079 h.waitCompaction()
1080 h.maxNextLevelOverlappingBytes(20 * 1048576)
1081 }
1082
1083 func TestDB_SizeOf(t *testing.T) {
1084 h := newDbHarnessWopt(t, &opt.Options{
1085 DisableLargeBatchTransaction: true,
1086 Compression: opt.NoCompression,
1087 WriteBuffer: 10000000,
1088 })
1089 defer h.close()
1090
1091 h.sizeAssert("", "xyz", 0, 0)
1092 h.reopenDB()
1093 h.sizeAssert("", "xyz", 0, 0)
1094
1095
1096 n := 80
1097 s1 := 100000
1098 s2 := 105000
1099
1100 for i := 0; i < n; i++ {
1101 h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), s1/10))
1102 }
1103
1104
1105 h.sizeAssert("", numKey(50), 0, 0)
1106
1107 for r := 0; r < 3; r++ {
1108 h.reopenDB()
1109
1110 for cs := 0; cs < n; cs += 10 {
1111 for i := 0; i < n; i += 10 {
1112 h.sizeAssert("", numKey(i), int64(s1*i), int64(s2*i))
1113 h.sizeAssert("", numKey(i)+".suffix", int64(s1*(i+1)), int64(s2*(i+1)))
1114 h.sizeAssert(numKey(i), numKey(i+10), int64(s1*10), int64(s2*10))
1115 }
1116
1117 h.sizeAssert("", numKey(50), int64(s1*50), int64(s2*50))
1118 h.sizeAssert("", numKey(50)+".suffix", int64(s1*50), int64(s2*50))
1119
1120 h.compactRangeAt(0, numKey(cs), numKey(cs+9))
1121 }
1122
1123 v := h.db.s.version()
1124 if v.tLen(0) != 0 {
1125 t.Errorf("level-0 tables was not zero, got %d", v.tLen(0))
1126 }
1127 if v.tLen(1) == 0 {
1128 t.Error("level-1 tables was zero")
1129 }
1130 v.release()
1131 }
1132 }
1133
1134 func TestDB_SizeOf_MixOfSmallAndLarge(t *testing.T) {
1135 h := newDbHarnessWopt(t, &opt.Options{
1136 DisableLargeBatchTransaction: true,
1137 Compression: opt.NoCompression,
1138 })
1139 defer h.close()
1140
1141 sizes := []int64{
1142 10000,
1143 10000,
1144 100000,
1145 10000,
1146 100000,
1147 10000,
1148 300000,
1149 10000,
1150 }
1151
1152 for i, n := range sizes {
1153 h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), int(n)/10))
1154 }
1155
1156 for r := 0; r < 3; r++ {
1157 h.reopenDB()
1158
1159 var x int64
1160 for i, n := range sizes {
1161 y := x
1162 if i > 0 {
1163 y += 1000
1164 }
1165 h.sizeAssert("", numKey(i), x, y)
1166 x += n
1167 }
1168
1169 h.sizeAssert(numKey(3), numKey(5), 110000, 111000)
1170
1171 h.compactRangeAt(0, "", "")
1172 }
1173 }
1174
1175 func TestDB_Snapshot(t *testing.T) {
1176 trun(t, func(h *dbHarness) {
1177 h.put("foo", "v1")
1178 s1 := h.getSnapshot()
1179 h.put("foo", "v2")
1180 s2 := h.getSnapshot()
1181 h.put("foo", "v3")
1182 s3 := h.getSnapshot()
1183 h.put("foo", "v4")
1184
1185 h.getValr(s1, "foo", "v1")
1186 h.getValr(s2, "foo", "v2")
1187 h.getValr(s3, "foo", "v3")
1188 h.getVal("foo", "v4")
1189
1190 s3.Release()
1191 h.getValr(s1, "foo", "v1")
1192 h.getValr(s2, "foo", "v2")
1193 h.getVal("foo", "v4")
1194
1195 s1.Release()
1196 h.getValr(s2, "foo", "v2")
1197 h.getVal("foo", "v4")
1198
1199 s2.Release()
1200 h.getVal("foo", "v4")
1201 })
1202 }
1203
1204 func TestDB_SnapshotList(t *testing.T) {
1205 db := &DB{snapsList: list.New()}
1206 e0a := db.acquireSnapshot()
1207 e0b := db.acquireSnapshot()
1208 db.seq = 1
1209 e1 := db.acquireSnapshot()
1210 db.seq = 2
1211 e2 := db.acquireSnapshot()
1212
1213 if db.minSeq() != 0 {
1214 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1215 }
1216 db.releaseSnapshot(e0a)
1217 if db.minSeq() != 0 {
1218 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1219 }
1220 db.releaseSnapshot(e2)
1221 if db.minSeq() != 0 {
1222 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1223 }
1224 db.releaseSnapshot(e0b)
1225 if db.minSeq() != 1 {
1226 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1227 }
1228 e2 = db.acquireSnapshot()
1229 if db.minSeq() != 1 {
1230 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1231 }
1232 db.releaseSnapshot(e1)
1233 if db.minSeq() != 2 {
1234 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1235 }
1236 db.releaseSnapshot(e2)
1237 if db.minSeq() != 2 {
1238 t.Fatalf("invalid sequence number, got=%d", db.minSeq())
1239 }
1240 }
1241
1242 func TestDB_HiddenValuesAreRemoved(t *testing.T) {
1243 trun(t, func(h *dbHarness) {
1244 s := h.db.s
1245
1246 m := 2
1247 h.db.memdbMaxLevel = m
1248
1249 h.put("foo", "v1")
1250 h.compactMem()
1251 v := s.version()
1252 num := v.tLen(m)
1253 v.release()
1254 if num != 1 {
1255 t.Errorf("invalid level-%d len, want=1 got=%d", m, num)
1256 }
1257
1258
1259 h.put("a", "begin")
1260 h.put("z", "end")
1261 h.compactMem()
1262 v = s.version()
1263 if v.tLen(m) != 1 {
1264 t.Errorf("invalid level-%d len, want=1 got=%d", m, v.tLen(m))
1265 }
1266 if v.tLen(m-1) != 1 {
1267 t.Errorf("invalid level-%d len, want=1 got=%d", m-1, v.tLen(m-1))
1268 }
1269 v.release()
1270
1271 h.delete("foo")
1272 h.put("foo", "v2")
1273 h.allEntriesFor("foo", "[ v2, DEL, v1 ]")
1274 h.compactMem()
1275 h.allEntriesFor("foo", "[ v2, DEL, v1 ]")
1276 h.compactRangeAt(m-2, "", "z")
1277
1278
1279 h.allEntriesFor("foo", "[ v2, v1 ]")
1280 h.compactRangeAt(m-1, "", "")
1281
1282
1283 h.allEntriesFor("foo", "[ v2 ]")
1284 })
1285 }
1286
1287 func TestDB_DeletionMarkers2(t *testing.T) {
1288 h := newDbHarness(t)
1289 defer h.close()
1290 s := h.db.s
1291
1292 m := 2
1293 h.db.memdbMaxLevel = m
1294
1295 h.put("foo", "v1")
1296 h.compactMem()
1297 v := s.version()
1298 num := v.tLen(m)
1299 v.release()
1300 if num != 1 {
1301 t.Errorf("invalid level-%d len, want=1 got=%d", m, num)
1302 }
1303
1304
1305 h.put("a", "begin")
1306 h.put("z", "end")
1307 h.compactMem()
1308 v = s.version()
1309 if v.tLen(m) != 1 {
1310 t.Errorf("invalid level-%d len, want=1 got=%d", m, v.tLen(m))
1311 }
1312 if v.tLen(m-1) != 1 {
1313 t.Errorf("invalid level-%d len, want=1 got=%d", m-1, v.tLen(m-1))
1314 }
1315 v.release()
1316
1317 h.delete("foo")
1318 h.allEntriesFor("foo", "[ DEL, v1 ]")
1319 h.compactMem()
1320 h.allEntriesFor("foo", "[ DEL, v1 ]")
1321 h.compactRangeAt(m-2, "", "")
1322
1323 h.allEntriesFor("foo", "[ DEL, v1 ]")
1324 h.compactRangeAt(m-1, "", "")
1325
1326
1327 h.allEntriesFor("foo", "[ ]")
1328 }
1329
1330 func TestDB_CompactionTableOpenError(t *testing.T) {
1331 h := newDbHarnessWopt(t, &opt.Options{
1332 DisableLargeBatchTransaction: true,
1333 OpenFilesCacheCapacity: -1,
1334 })
1335 defer h.close()
1336
1337 h.db.memdbMaxLevel = 2
1338
1339 im := 10
1340 jm := 10
1341 for r := 0; r < 2; r++ {
1342 for i := 0; i < im; i++ {
1343 for j := 0; j < jm; j++ {
1344 h.put(fmt.Sprintf("k%d,%d", i, j), fmt.Sprintf("v%d,%d", i, j))
1345 }
1346 h.compactMem()
1347 }
1348 }
1349
1350 if n := h.totalTables(); n != im*2 {
1351 t.Errorf("total tables is %d, want %d", n, im*2)
1352 }
1353
1354 h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, errors.New("open error during table compaction"))
1355 go func() {
1356 if err := h.db.CompactRange(util.Range{}); err != nil && err != ErrClosed {
1357 t.Error("CompactRange error: ", err)
1358 }
1359 }()
1360 if err := h.db.compTriggerWait(h.db.tcompCmdC); err != nil {
1361 t.Log("compaction error: ", err)
1362 }
1363 if err := h.closeDB0(); err != nil {
1364 t.Error("Close: got error: ", err)
1365 }
1366 h.openDB()
1367 h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, nil)
1368
1369 for i := 0; i < im; i++ {
1370 for j := 0; j < jm; j++ {
1371 h.getVal(fmt.Sprintf("k%d,%d", i, j), fmt.Sprintf("v%d,%d", i, j))
1372 }
1373 }
1374 }
1375
1376 func TestDB_OverlapInLevel0(t *testing.T) {
1377 trun(t, func(h *dbHarness) {
1378 h.db.memdbMaxLevel = 2
1379
1380
1381 h.put("100", "v100")
1382 h.put("999", "v999")
1383 h.compactMem()
1384 h.delete("100")
1385 h.delete("999")
1386 h.compactMem()
1387 h.tablesPerLevel("0,1,1")
1388
1389
1390
1391
1392
1393 h.put("300", "v300")
1394 h.put("500", "v500")
1395 h.compactMem()
1396 h.put("200", "v200")
1397 h.put("600", "v600")
1398 h.put("900", "v900")
1399 h.compactMem()
1400 h.tablesPerLevel("2,1,1")
1401
1402
1403 h.compactRangeAt(1, "", "")
1404 h.compactRangeAt(2, "", "")
1405 h.tablesPerLevel("2")
1406
1407
1408
1409
1410 h.delete("600")
1411 h.compactMem()
1412 h.tablesPerLevel("3")
1413 h.get("600", false)
1414 })
1415 }
1416
1417 func TestDB_L0_CompactionBug_Issue44_a(t *testing.T) {
1418 h := newDbHarness(t)
1419 defer h.close()
1420
1421 h.reopenDB()
1422 h.put("b", "v")
1423 h.reopenDB()
1424 h.delete("b")
1425 h.delete("a")
1426 h.reopenDB()
1427 h.delete("a")
1428 h.reopenDB()
1429 h.put("a", "v")
1430 h.reopenDB()
1431 h.reopenDB()
1432 h.getKeyVal("(a->v)")
1433 h.waitCompaction()
1434 h.getKeyVal("(a->v)")
1435 }
1436
1437 func TestDB_L0_CompactionBug_Issue44_b(t *testing.T) {
1438 h := newDbHarness(t)
1439 defer h.close()
1440
1441 h.reopenDB()
1442 h.put("", "")
1443 h.reopenDB()
1444 h.delete("e")
1445 h.put("", "")
1446 h.reopenDB()
1447 h.put("c", "cv")
1448 h.reopenDB()
1449 h.put("", "")
1450 h.reopenDB()
1451 h.put("", "")
1452 h.waitCompaction()
1453 h.reopenDB()
1454 h.put("d", "dv")
1455 h.reopenDB()
1456 h.put("", "")
1457 h.reopenDB()
1458 h.delete("d")
1459 h.delete("b")
1460 h.reopenDB()
1461 h.getKeyVal("(->)(c->cv)")
1462 h.waitCompaction()
1463 h.getKeyVal("(->)(c->cv)")
1464 }
1465
1466 func TestDB_SingleEntryMemCompaction(t *testing.T) {
1467 trun(t, func(h *dbHarness) {
1468 for i := 0; i < 10; i++ {
1469 h.put("big", strings.Repeat("v", opt.DefaultWriteBuffer))
1470 h.compactMem()
1471 h.put("key", strings.Repeat("v", opt.DefaultBlockSize))
1472 h.compactMem()
1473 h.put("k", "v")
1474 h.compactMem()
1475 h.put("", "")
1476 h.compactMem()
1477 h.put("verybig", strings.Repeat("v", opt.DefaultWriteBuffer*2))
1478 h.compactMem()
1479 }
1480 })
1481 }
1482
1483 func TestDB_ManifestWriteError(t *testing.T) {
1484 for i := 0; i < 2; i++ {
1485 func() {
1486 h := newDbHarness(t)
1487 defer h.close()
1488
1489 h.put("foo", "bar")
1490 h.getVal("foo", "bar")
1491
1492
1493 h.compactMem()
1494 h.getVal("foo", "bar")
1495 v := h.db.s.version()
1496 if n := v.tLen(0); n != 1 {
1497 t.Errorf("invalid total tables, want=1 got=%d", n)
1498 }
1499 v.release()
1500
1501 if i == 0 {
1502 h.stor.EmulateError(testutil.ModeWrite, storage.TypeManifest, errors.New("manifest write error"))
1503 } else {
1504 h.stor.EmulateError(testutil.ModeSync, storage.TypeManifest, errors.New("manifest sync error"))
1505 }
1506
1507
1508 h.compactRangeAtErr(0, "", "", true)
1509
1510 h.db.Close()
1511 h.stor.EmulateError(testutil.ModeWrite, storage.TypeManifest, nil)
1512 h.stor.EmulateError(testutil.ModeSync, storage.TypeManifest, nil)
1513
1514
1515 h.openDB()
1516 h.getVal("foo", "bar")
1517 }()
1518 }
1519 }
1520
1521 func assertErr(t *testing.T, err error, wanterr bool) {
1522 if err != nil {
1523 if wanterr {
1524 t.Log("AssertErr: got error (expected): ", err)
1525 } else {
1526 t.Error("AssertErr: got error: ", err)
1527 }
1528 } else if wanterr {
1529 t.Error("AssertErr: expect error")
1530 }
1531 }
1532
1533 func TestDB_ClosedIsClosed(t *testing.T) {
1534 h := newDbHarness(t)
1535 db := h.db
1536
1537 var iter, iter2 iterator.Iterator
1538 var snap *Snapshot
1539 func() {
1540 defer h.close()
1541
1542 h.put("k", "v")
1543 h.getVal("k", "v")
1544
1545 iter = db.NewIterator(nil, h.ro)
1546 iter.Seek([]byte("k"))
1547 testKeyVal(t, iter, "k->v")
1548
1549 var err error
1550 snap, err = db.GetSnapshot()
1551 if err != nil {
1552 t.Fatal("GetSnapshot: got error: ", err)
1553 }
1554
1555 h.getValr(snap, "k", "v")
1556
1557 iter2 = snap.NewIterator(nil, h.ro)
1558 iter2.Seek([]byte("k"))
1559 testKeyVal(t, iter2, "k->v")
1560
1561 h.put("foo", "v2")
1562 h.delete("foo")
1563
1564
1565 iter.Release()
1566 iter2.Release()
1567 }()
1568
1569 assertErr(t, db.Put([]byte("x"), []byte("y"), h.wo), true)
1570 _, err := db.Get([]byte("k"), h.ro)
1571 assertErr(t, err, true)
1572
1573 if iter.Valid() {
1574 t.Errorf("iter.Valid should false")
1575 }
1576 assertErr(t, iter.Error(), false)
1577 testKeyVal(t, iter, "->")
1578 if iter.Seek([]byte("k")) {
1579 t.Errorf("iter.Seek should false")
1580 }
1581 assertErr(t, iter.Error(), true)
1582
1583 assertErr(t, iter2.Error(), false)
1584
1585 _, err = snap.Get([]byte("k"), h.ro)
1586 assertErr(t, err, true)
1587
1588 _, err = db.GetSnapshot()
1589 assertErr(t, err, true)
1590
1591 iter3 := db.NewIterator(nil, h.ro)
1592 assertErr(t, iter3.Error(), true)
1593
1594 iter3 = snap.NewIterator(nil, h.ro)
1595 assertErr(t, iter3.Error(), true)
1596
1597 assertErr(t, db.Delete([]byte("k"), h.wo), true)
1598
1599 _, err = db.GetProperty("leveldb.stats")
1600 assertErr(t, err, true)
1601
1602 _, err = db.SizeOf([]util.Range{{Start: []byte("a"), Limit: []byte("z")}})
1603 assertErr(t, err, true)
1604
1605 assertErr(t, db.CompactRange(util.Range{}), true)
1606
1607 assertErr(t, db.Close(), true)
1608 }
1609
1610 type numberComparer struct{}
1611
1612 func (numberComparer) num(x []byte) (n int) {
1613 fmt.Sscan(string(x[1:len(x)-1]), &n)
1614 return
1615 }
1616
1617 func (numberComparer) Name() string {
1618 return "test.NumberComparer"
1619 }
1620
1621 func (p numberComparer) Compare(a, b []byte) int {
1622 return p.num(a) - p.num(b)
1623 }
1624
1625 func (numberComparer) Separator(dst, a, b []byte) []byte { return nil }
1626 func (numberComparer) Successor(dst, b []byte) []byte { return nil }
1627
1628 func TestDB_CustomComparer(t *testing.T) {
1629 h := newDbHarnessWopt(t, &opt.Options{
1630 DisableLargeBatchTransaction: true,
1631 Comparer: numberComparer{},
1632 WriteBuffer: 1000,
1633 })
1634 defer h.close()
1635
1636 h.put("[10]", "ten")
1637 h.put("[0x14]", "twenty")
1638 for i := 0; i < 2; i++ {
1639 h.getVal("[10]", "ten")
1640 h.getVal("[0xa]", "ten")
1641 h.getVal("[20]", "twenty")
1642 h.getVal("[0x14]", "twenty")
1643 h.get("[15]", false)
1644 h.get("[0xf]", false)
1645 h.compactMem()
1646 h.compactRange("[0]", "[9999]")
1647 }
1648
1649 for n := 0; n < 2; n++ {
1650 for i := 0; i < 100; i++ {
1651 v := fmt.Sprintf("[%d]", i*10)
1652 h.put(v, v)
1653 }
1654 h.compactMem()
1655 h.compactRange("[0]", "[1000000]")
1656 }
1657 }
1658
1659 func TestDB_ManualCompaction(t *testing.T) {
1660 h := newDbHarness(t)
1661 defer h.close()
1662
1663 h.db.memdbMaxLevel = 2
1664
1665 h.putMulti(3, "p", "q")
1666 h.tablesPerLevel("1,1,1")
1667
1668
1669 h.compactRange("", "c")
1670 h.tablesPerLevel("1,1,1")
1671
1672
1673 h.compactRange("r", "z")
1674 h.tablesPerLevel("1,1,1")
1675
1676
1677 h.compactRange("p1", "p9")
1678 h.tablesPerLevel("0,0,1")
1679
1680
1681 h.putMulti(3, "c", "e")
1682 h.tablesPerLevel("1,1,2")
1683
1684
1685 h.compactRange("b", "f")
1686 h.tablesPerLevel("0,0,2")
1687
1688
1689 h.putMulti(1, "a", "z")
1690 h.tablesPerLevel("0,1,2")
1691 h.compactRange("", "")
1692 h.tablesPerLevel("0,0,1")
1693 }
1694
1695 func TestDB_BloomFilter(t *testing.T) {
1696 h := newDbHarnessWopt(t, &opt.Options{
1697 DisableLargeBatchTransaction: true,
1698 DisableBlockCache: true,
1699 Filter: filter.NewBloomFilter(10),
1700 })
1701 defer h.close()
1702
1703 key := func(i int) string {
1704 return fmt.Sprintf("key%06d", i)
1705 }
1706
1707 const n = 10000
1708
1709
1710 for i := 0; i < n; i++ {
1711 h.put(key(i), key(i))
1712 }
1713 h.compactMem()
1714 h.compactRange("a", "z")
1715 for i := 0; i < n; i += 100 {
1716 h.put(key(i), key(i))
1717 }
1718 h.compactMem()
1719
1720
1721 h.stor.Stall(testutil.ModeSync, storage.TypeTable)
1722
1723
1724 h.stor.ResetCounter(testutil.ModeRead, storage.TypeTable)
1725 for i := 0; i < n; i++ {
1726 h.getVal(key(i), key(i))
1727 }
1728 cnt, _ := h.stor.Counter(testutil.ModeRead, storage.TypeTable)
1729 t.Logf("lookup of %d present keys yield %d sstable I/O reads", n, cnt)
1730 if min, max := n, n+2*n/100; cnt < min || cnt > max {
1731 t.Errorf("num of sstable I/O reads of present keys not in range of %d - %d, got %d", min, max, cnt)
1732 }
1733
1734
1735 h.stor.ResetCounter(testutil.ModeRead, storage.TypeTable)
1736 for i := 0; i < n; i++ {
1737 h.get(key(i)+".missing", false)
1738 }
1739 cnt, _ = h.stor.Counter(testutil.ModeRead, storage.TypeTable)
1740 t.Logf("lookup of %d missing keys yield %d sstable I/O reads", n, cnt)
1741 if max := 3 * n / 100; cnt > max {
1742 t.Errorf("num of sstable I/O reads of missing keys was more than %d, got %d", max, cnt)
1743 }
1744
1745 h.stor.Release(testutil.ModeSync, storage.TypeTable)
1746 }
1747
1748 func TestDB_Concurrent(t *testing.T) {
1749 const n, secs, maxkey = 4, 6, 1000
1750 h := newDbHarness(t)
1751 defer h.close()
1752
1753 runtime.GOMAXPROCS(runtime.NumCPU())
1754
1755 var (
1756 closeWg sync.WaitGroup
1757 stop uint32
1758 cnt [n]uint32
1759 )
1760
1761 for i := 0; i < n; i++ {
1762 closeWg.Add(1)
1763 go func(i int) {
1764 var put, get, found uint
1765 defer func() {
1766 t.Logf("goroutine %d stopped after %d ops, put=%d get=%d found=%d missing=%d",
1767 i, cnt[i], put, get, found, get-found)
1768 closeWg.Done()
1769 }()
1770
1771 rnd := rand.New(rand.NewSource(int64(1000 + i)))
1772 for atomic.LoadUint32(&stop) == 0 {
1773 x := cnt[i]
1774
1775 k := rnd.Intn(maxkey)
1776 kstr := fmt.Sprintf("%016d", k)
1777
1778 if (rnd.Int() % 2) > 0 {
1779 put++
1780 h.put(kstr, fmt.Sprintf("%d.%d.%-1000d", k, i, x))
1781 } else {
1782 get++
1783 v, err := h.db.Get([]byte(kstr), h.ro)
1784 if err == nil {
1785 found++
1786 rk, ri, rx := 0, -1, uint32(0)
1787 fmt.Sscanf(string(v), "%d.%d.%d", &rk, &ri, &rx)
1788 if rk != k {
1789 t.Errorf("invalid key want=%d got=%d", k, rk)
1790 }
1791 if ri < 0 || ri >= n {
1792 t.Error("invalid goroutine number: ", ri)
1793 } else {
1794 tx := atomic.LoadUint32(&(cnt[ri]))
1795 if rx > tx {
1796 t.Errorf("invalid seq number, %d > %d ", rx, tx)
1797 }
1798 }
1799 } else if err != ErrNotFound {
1800 t.Error("Get: got error: ", err)
1801 return
1802 }
1803 }
1804 atomic.AddUint32(&cnt[i], 1)
1805 }
1806 }(i)
1807 }
1808
1809 time.Sleep(secs * time.Second)
1810 atomic.StoreUint32(&stop, 1)
1811 closeWg.Wait()
1812 }
1813
1814 func TestDB_ConcurrentIterator(t *testing.T) {
1815 const n, n2 = 4, 1000
1816 h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 30})
1817 defer h.close()
1818
1819 runtime.GOMAXPROCS(runtime.NumCPU())
1820
1821 var (
1822 closeWg sync.WaitGroup
1823 stop uint32
1824 )
1825
1826 for i := 0; i < n; i++ {
1827 closeWg.Add(1)
1828 go func(i int) {
1829 for k := 0; atomic.LoadUint32(&stop) == 0; k++ {
1830 h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
1831 }
1832 closeWg.Done()
1833 }(i)
1834 }
1835
1836 for i := 0; i < n; i++ {
1837 closeWg.Add(1)
1838 go func(i int) {
1839 for k := 1000000; k < 0 || atomic.LoadUint32(&stop) == 0; k-- {
1840 h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
1841 }
1842 closeWg.Done()
1843 }(i)
1844 }
1845
1846 cmp := comparer.DefaultComparer
1847 for i := 0; i < n2; i++ {
1848 closeWg.Add(1)
1849 go func(i int) {
1850 it := h.db.NewIterator(nil, nil)
1851 var pk []byte
1852 for it.Next() {
1853 kk := it.Key()
1854 if cmp.Compare(kk, pk) <= 0 {
1855 t.Errorf("iter %d: %q is successor of %q", i, pk, kk)
1856 }
1857 pk = append(pk[:0], kk...)
1858 var k, vk, vi int
1859 if n, err := fmt.Sscanf(string(it.Key()), "k%d", &k); err != nil {
1860 t.Errorf("iter %d: Scanf error on key %q: %v", i, it.Key(), err)
1861 } else if n < 1 {
1862 t.Errorf("iter %d: Cannot parse key %q", i, it.Key())
1863 }
1864 if n, err := fmt.Sscanf(string(it.Value()), "%d.%d", &vk, &vi); err != nil {
1865 t.Errorf("iter %d: Scanf error on value %q: %v", i, it.Value(), err)
1866 } else if n < 2 {
1867 t.Errorf("iter %d: Cannot parse value %q", i, it.Value())
1868 }
1869
1870 if vk != k {
1871 t.Errorf("iter %d: invalid value i=%d, want=%d got=%d", i, vi, k, vk)
1872 }
1873 }
1874 if err := it.Error(); err != nil {
1875 t.Errorf("iter %d: Got error: %v", i, err)
1876 }
1877 it.Release()
1878 closeWg.Done()
1879 }(i)
1880 }
1881
1882 atomic.StoreUint32(&stop, 1)
1883 closeWg.Wait()
1884 }
1885
1886 func TestDB_ConcurrentWrite(t *testing.T) {
1887 const n, bk, niter = 10, 3, 10000
1888 h := newDbHarness(t)
1889 defer h.close()
1890
1891 runtime.GOMAXPROCS(runtime.NumCPU())
1892
1893 var wg sync.WaitGroup
1894 for i := 0; i < n; i++ {
1895 wg.Add(1)
1896 go func(i int) {
1897 defer wg.Done()
1898 for k := 0; k < niter; k++ {
1899 kstr := fmt.Sprintf("put-%d.%d", i, k)
1900 vstr := fmt.Sprintf("v%d", k)
1901 h.put(kstr, vstr)
1902
1903 h.getVal(kstr, vstr)
1904 }
1905 }(i)
1906 }
1907 for i := 0; i < n; i++ {
1908 wg.Add(1)
1909 batch := &Batch{}
1910 go func(i int) {
1911 defer wg.Done()
1912 for k := 0; k < niter; k++ {
1913 batch.Reset()
1914 for j := 0; j < bk; j++ {
1915 batch.Put([]byte(fmt.Sprintf("batch-%d.%d.%d", i, k, j)), []byte(fmt.Sprintf("v%d", k)))
1916 }
1917 h.write(batch)
1918
1919 for j := 0; j < bk; j++ {
1920 h.getVal(fmt.Sprintf("batch-%d.%d.%d", i, k, j), fmt.Sprintf("v%d", k))
1921 }
1922 }
1923 }(i)
1924 }
1925 wg.Wait()
1926 }
1927
1928 func TestDB_CreateReopenDbOnFile(t *testing.T) {
1929 dbpath := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldbtestCreateReopenDbOnFile-%d", os.Getuid()))
1930 if err := os.RemoveAll(dbpath); err != nil {
1931 t.Fatal("cannot remove old db: ", err)
1932 }
1933 defer os.RemoveAll(dbpath)
1934
1935 for i := 0; i < 3; i++ {
1936 stor, err := storage.OpenFile(dbpath, false)
1937 if err != nil {
1938 t.Fatalf("(%d) cannot open storage: %s", i, err)
1939 }
1940 db, err := Open(stor, nil)
1941 if err != nil {
1942 t.Fatalf("(%d) cannot open db: %s", i, err)
1943 }
1944 if err := db.Put([]byte("foo"), []byte("bar"), nil); err != nil {
1945 t.Fatalf("(%d) cannot write to db: %s", i, err)
1946 }
1947 if err := db.Close(); err != nil {
1948 t.Fatalf("(%d) cannot close db: %s", i, err)
1949 }
1950 if err := stor.Close(); err != nil {
1951 t.Fatalf("(%d) cannot close storage: %s", i, err)
1952 }
1953 }
1954 }
1955
1956 func TestDB_CreateReopenDbOnFile2(t *testing.T) {
1957 dbpath := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldbtestCreateReopenDbOnFile2-%d", os.Getuid()))
1958 if err := os.RemoveAll(dbpath); err != nil {
1959 t.Fatal("cannot remove old db: ", err)
1960 }
1961 defer os.RemoveAll(dbpath)
1962
1963 for i := 0; i < 3; i++ {
1964 db, err := OpenFile(dbpath, nil)
1965 if err != nil {
1966 t.Fatalf("(%d) cannot open db: %s", i, err)
1967 }
1968 if err := db.Put([]byte("foo"), []byte("bar"), nil); err != nil {
1969 t.Fatalf("(%d) cannot write to db: %s", i, err)
1970 }
1971 if err := db.Close(); err != nil {
1972 t.Fatalf("(%d) cannot close db: %s", i, err)
1973 }
1974 }
1975 }
1976
1977 func TestDB_DeletionMarkersOnMemdb(t *testing.T) {
1978 h := newDbHarness(t)
1979 defer h.close()
1980
1981 h.put("foo", "v1")
1982 h.compactMem()
1983 h.delete("foo")
1984 h.get("foo", false)
1985 h.getKeyVal("")
1986 }
1987
1988 func TestDB_LeveldbIssue178(t *testing.T) {
1989 nKeys := (opt.DefaultCompactionTableSize / 30) * 5
1990 key1 := func(i int) string {
1991 return fmt.Sprintf("my_key_%d", i)
1992 }
1993 key2 := func(i int) string {
1994 return fmt.Sprintf("my_key_%d_xxx", i)
1995 }
1996
1997
1998
1999 h := newDbHarnessWopt(t, &opt.Options{
2000 DisableLargeBatchTransaction: true,
2001 Compression: opt.NoCompression,
2002 })
2003 defer h.close()
2004
2005
2006 batch := new(Batch)
2007 for i := 0; i < nKeys; i++ {
2008 batch.Put([]byte(key1(i)), []byte("value for range 1 key"))
2009 }
2010 h.write(batch)
2011
2012
2013 batch.Reset()
2014 for i := 0; i < nKeys; i++ {
2015 batch.Put([]byte(key2(i)), []byte("value for range 2 key"))
2016 }
2017 h.write(batch)
2018
2019
2020 batch.Reset()
2021 for i := 0; i < nKeys; i++ {
2022 batch.Delete([]byte(key2(i)))
2023 }
2024 h.write(batch)
2025 h.waitMemCompaction()
2026
2027
2028 h.compactRange(key1(0), key1(nKeys-1))
2029
2030
2031 h.assertNumKeys(nKeys)
2032 }
2033
2034 func TestDB_LeveldbIssue200(t *testing.T) {
2035 h := newDbHarness(t)
2036 defer h.close()
2037
2038 h.put("1", "b")
2039 h.put("2", "c")
2040 h.put("3", "d")
2041 h.put("4", "e")
2042 h.put("5", "f")
2043
2044 iter := h.db.NewIterator(nil, h.ro)
2045
2046
2047 h.put("25", "cd")
2048
2049 iter.Seek([]byte("5"))
2050 assertBytes(t, []byte("5"), iter.Key())
2051 iter.Prev()
2052 assertBytes(t, []byte("4"), iter.Key())
2053 iter.Prev()
2054 assertBytes(t, []byte("3"), iter.Key())
2055 iter.Next()
2056 assertBytes(t, []byte("4"), iter.Key())
2057 iter.Next()
2058 assertBytes(t, []byte("5"), iter.Key())
2059 }
2060
2061 func TestDB_GoleveldbIssue74(t *testing.T) {
2062 h := newDbHarnessWopt(t, &opt.Options{
2063 DisableLargeBatchTransaction: true,
2064 WriteBuffer: 1 * opt.MiB,
2065 })
2066 defer h.close()
2067
2068 const n, dur = 10000, 5 * time.Second
2069
2070 runtime.GOMAXPROCS(runtime.NumCPU())
2071
2072 until := time.Now().Add(dur)
2073 wg := new(sync.WaitGroup)
2074 wg.Add(2)
2075 var done uint32
2076 go func() {
2077 var i int
2078 defer func() {
2079 t.Logf("WRITER DONE #%d", i)
2080 atomic.StoreUint32(&done, 1)
2081 wg.Done()
2082 }()
2083
2084 b := new(Batch)
2085 for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
2086 if t.Failed() {
2087 return
2088 }
2089
2090 iv := fmt.Sprintf("VAL%010d", i)
2091 for k := 0; k < n; k++ {
2092 key := fmt.Sprintf("KEY%06d", k)
2093 b.Put([]byte(key), []byte(key+iv))
2094 b.Put([]byte(fmt.Sprintf("PTR%06d", k)), []byte(key))
2095 }
2096 h.write(b)
2097
2098 b.Reset()
2099 snap := h.getSnapshot()
2100 iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil)
2101 var k int
2102 for ; iter.Next(); k++ {
2103 if t.Failed() {
2104 return
2105 }
2106
2107 ptrKey := iter.Key()
2108 key := iter.Value()
2109
2110 if _, err := snap.Get(ptrKey, nil); err != nil {
2111 t.Errorf("WRITER #%d snapshot.Get %q: %v", i, ptrKey, err)
2112 return
2113 }
2114 if value, err := snap.Get(key, nil); err != nil {
2115 t.Errorf("WRITER #%d snapshot.Get %q: %v", i, key, err)
2116 return
2117 } else if string(value) != string(key)+iv {
2118 t.Errorf("WRITER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+iv, value)
2119 return
2120 }
2121
2122 b.Delete(key)
2123 b.Delete(ptrKey)
2124 }
2125 h.write(b)
2126 iter.Release()
2127 snap.Release()
2128 if k != n {
2129 t.Errorf("#%d %d != %d", i, k, n)
2130 }
2131 }
2132 }()
2133 go func() {
2134 var i int
2135 defer func() {
2136 t.Logf("READER DONE #%d", i)
2137 atomic.StoreUint32(&done, 1)
2138 wg.Done()
2139 }()
2140 for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
2141 if t.Failed() {
2142 return
2143 }
2144
2145 snap := h.getSnapshot()
2146 iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil)
2147 var prevValue string
2148 var k int
2149 for ; iter.Next(); k++ {
2150 if t.Failed() {
2151 return
2152 }
2153
2154 ptrKey := iter.Key()
2155 key := iter.Value()
2156
2157 if _, err := snap.Get(ptrKey, nil); err != nil {
2158 t.Errorf("READER #%d snapshot.Get %q: %v", i, ptrKey, err)
2159 return
2160 }
2161
2162 if value, err := snap.Get(key, nil); err != nil {
2163 t.Errorf("READER #%d snapshot.Get %q: %v", i, key, err)
2164 return
2165 } else if prevValue != "" && string(value) != string(key)+prevValue {
2166 t.Errorf("READER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+prevValue, value)
2167 return
2168 } else {
2169 prevValue = string(value[len(key):])
2170 }
2171 }
2172 iter.Release()
2173 snap.Release()
2174 if k > 0 && k != n {
2175 t.Errorf("#%d %d != %d", i, k, n)
2176 }
2177 }
2178 }()
2179 wg.Wait()
2180 }
2181
2182 func TestDB_GetProperties(t *testing.T) {
2183 h := newDbHarness(t)
2184 defer h.close()
2185
2186 _, err := h.db.GetProperty("leveldb.num-files-at-level")
2187 if err == nil {
2188 t.Error("GetProperty() failed to detect missing level")
2189 }
2190
2191 _, err = h.db.GetProperty("leveldb.num-files-at-level0")
2192 if err != nil {
2193 t.Error("got unexpected error", err)
2194 }
2195
2196 _, err = h.db.GetProperty("leveldb.num-files-at-level0x")
2197 if err == nil {
2198 t.Error("GetProperty() failed to detect invalid level")
2199 }
2200 }
2201
2202 func TestDB_GoleveldbIssue72and83(t *testing.T) {
2203 h := newDbHarnessWopt(t, &opt.Options{
2204 DisableLargeBatchTransaction: true,
2205 WriteBuffer: 1 * opt.MiB,
2206 OpenFilesCacheCapacity: 3,
2207 })
2208 defer h.close()
2209
2210 const n, wn, dur = 10000, 100, 30 * time.Second
2211
2212 runtime.GOMAXPROCS(runtime.NumCPU())
2213
2214 randomData := func(prefix byte, i int) []byte {
2215 data := make([]byte, 1+4+32+64+32)
2216 _, err := crand.Reader.Read(data[1 : len(data)-8])
2217 if err != nil {
2218 panic(err)
2219 }
2220 data[0] = prefix
2221 binary.LittleEndian.PutUint32(data[len(data)-8:], uint32(i))
2222 binary.LittleEndian.PutUint32(data[len(data)-4:], util.NewCRC(data[:len(data)-4]).Value())
2223 return data
2224 }
2225
2226 keys := make([][]byte, n)
2227 for i := range keys {
2228 keys[i] = randomData(1, 0)
2229 }
2230
2231 until := time.Now().Add(dur)
2232 wg := new(sync.WaitGroup)
2233 wg.Add(3)
2234 var done uint32
2235 go func() {
2236 i := 0
2237 defer func() {
2238 t.Logf("WRITER DONE #%d", i)
2239 wg.Done()
2240 }()
2241
2242 b := new(Batch)
2243 for ; i < wn && atomic.LoadUint32(&done) == 0; i++ {
2244 if t.Failed() {
2245 return
2246 }
2247
2248 b.Reset()
2249 for _, k1 := range keys {
2250 k2 := randomData(2, i)
2251 b.Put(k2, randomData(42, i))
2252 b.Put(k1, k2)
2253 }
2254 if err := h.db.Write(b, h.wo); err != nil {
2255 atomic.StoreUint32(&done, 1)
2256 t.Errorf("WRITER #%d db.Write: %v", i, err)
2257 return
2258 }
2259 }
2260 }()
2261 go func() {
2262 var i int
2263 defer func() {
2264 t.Logf("READER0 DONE #%d", i)
2265 atomic.StoreUint32(&done, 1)
2266 wg.Done()
2267 }()
2268 for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
2269 if t.Failed() {
2270 return
2271 }
2272
2273 snap := h.getSnapshot()
2274 seq := snap.elem.seq
2275 if seq == 0 {
2276 snap.Release()
2277 continue
2278 }
2279 iter := snap.NewIterator(util.BytesPrefix([]byte{1}), nil)
2280 writei := int(seq/(n*2) - 1)
2281 var k int
2282 for ; iter.Next(); k++ {
2283 if t.Failed() {
2284 return
2285 }
2286
2287 k1 := iter.Key()
2288 k2 := iter.Value()
2289 k1checksum0 := binary.LittleEndian.Uint32(k1[len(k1)-4:])
2290 k1checksum1 := util.NewCRC(k1[:len(k1)-4]).Value()
2291 if k1checksum0 != k1checksum1 {
2292 t.Errorf("READER0 #%d.%d W#%d invalid K1 checksum: %#x != %#x", i, k, writei, k1checksum0, k1checksum0)
2293 return
2294 }
2295 k2checksum0 := binary.LittleEndian.Uint32(k2[len(k2)-4:])
2296 k2checksum1 := util.NewCRC(k2[:len(k2)-4]).Value()
2297 if k2checksum0 != k2checksum1 {
2298 t.Errorf("READER0 #%d.%d W#%d invalid K2 checksum: %#x != %#x", i, k, writei, k2checksum0, k2checksum1)
2299 return
2300 }
2301 kwritei := int(binary.LittleEndian.Uint32(k2[len(k2)-8:]))
2302 if writei != kwritei {
2303 t.Errorf("READER0 #%d.%d W#%d invalid write iteration num: %d", i, k, writei, kwritei)
2304 return
2305 }
2306 if _, err := snap.Get(k2, nil); err != nil {
2307 t.Errorf("READER0 #%d.%d W#%d snap.Get: %v\nk1: %x\n -> k2: %x", i, k, writei, err, k1, k2)
2308 return
2309 }
2310 }
2311 if err := iter.Error(); err != nil {
2312 t.Errorf("READER0 #%d.%d W#%d snap.Iterator: %v", i, k, writei, err)
2313 return
2314 }
2315 iter.Release()
2316 snap.Release()
2317 if k > 0 && k != n {
2318 t.Errorf("READER0 #%d W#%d short read, got=%d want=%d", i, writei, k, n)
2319 }
2320 }
2321 }()
2322 go func() {
2323 var i int
2324 defer func() {
2325 t.Logf("READER1 DONE #%d", i)
2326 atomic.StoreUint32(&done, 1)
2327 wg.Done()
2328 }()
2329 for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
2330 if t.Failed() {
2331 return
2332 }
2333
2334 iter := h.db.NewIterator(nil, nil)
2335 seq := iter.(*dbIter).seq
2336 if seq == 0 {
2337 iter.Release()
2338 continue
2339 }
2340 writei := int(seq/(n*2) - 1)
2341 var k int
2342 for ok := iter.Last(); ok; ok = iter.Prev() {
2343 k++
2344 }
2345 if err := iter.Error(); err != nil {
2346 t.Errorf("READER1 #%d.%d W#%d db.Iterator: %v", i, k, writei, err)
2347 return
2348 }
2349 iter.Release()
2350 if m := (writei+1)*n + n; k != m {
2351 t.Errorf("READER1 #%d W#%d short read, got=%d want=%d", i, writei, k, m)
2352 return
2353 }
2354 }
2355 }()
2356
2357 wg.Wait()
2358 }
2359
2360 func TestDB_TransientError(t *testing.T) {
2361 h := newDbHarnessWopt(t, &opt.Options{
2362 DisableLargeBatchTransaction: true,
2363 WriteBuffer: 128 * opt.KiB,
2364 OpenFilesCacheCapacity: 3,
2365 DisableCompactionBackoff: true,
2366 })
2367 defer h.close()
2368
2369 const (
2370 nSnap = 20
2371 nKey = 10000
2372 )
2373
2374 var (
2375 snaps [nSnap]*Snapshot
2376 b = &Batch{}
2377 )
2378 for i := range snaps {
2379 vtail := fmt.Sprintf("VAL%030d", i)
2380 b.Reset()
2381 for k := 0; k < nKey; k++ {
2382 key := fmt.Sprintf("KEY%8d", k)
2383 b.Put([]byte(key), []byte(key+vtail))
2384 }
2385 h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, errors.New("table transient read error"))
2386 if err := h.db.Write(b, nil); err != nil {
2387 t.Logf("WRITE #%d error: %v", i, err)
2388 h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)
2389 for {
2390 if err := h.db.Write(b, nil); err == nil {
2391 break
2392 } else if errors.IsCorrupted(err) {
2393 t.Fatalf("WRITE #%d corrupted: %v", i, err)
2394 }
2395 }
2396 }
2397
2398 snaps[i] = h.db.newSnapshot()
2399 b.Reset()
2400 for k := 0; k < nKey; k++ {
2401 key := fmt.Sprintf("KEY%8d", k)
2402 b.Delete([]byte(key))
2403 }
2404 h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, errors.New("table transient read error"))
2405 if err := h.db.Write(b, nil); err != nil {
2406 t.Logf("WRITE #%d error: %v", i, err)
2407 h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)
2408 for {
2409 if err := h.db.Write(b, nil); err == nil {
2410 break
2411 } else if errors.IsCorrupted(err) {
2412 t.Fatalf("WRITE #%d corrupted: %v", i, err)
2413 }
2414 }
2415 }
2416 }
2417 h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)
2418
2419 runtime.GOMAXPROCS(runtime.NumCPU())
2420
2421 rnd := rand.New(rand.NewSource(0xecafdaed))
2422 wg := &sync.WaitGroup{}
2423 for i, snap := range snaps {
2424 wg.Add(2)
2425
2426 go func(i int, snap *Snapshot, sk []int) {
2427 defer wg.Done()
2428
2429 vtail := fmt.Sprintf("VAL%030d", i)
2430 for _, k := range sk {
2431 if t.Failed() {
2432 return
2433 }
2434
2435 key := fmt.Sprintf("KEY%8d", k)
2436 xvalue, err := snap.Get([]byte(key), nil)
2437 if err != nil {
2438 t.Errorf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
2439 return
2440 }
2441 value := key + vtail
2442 if !bytes.Equal([]byte(value), xvalue) {
2443 t.Errorf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
2444 return
2445 }
2446 }
2447 }(i, snap, rnd.Perm(nKey))
2448
2449 go func(i int, snap *Snapshot) {
2450 defer wg.Done()
2451
2452 vtail := fmt.Sprintf("VAL%030d", i)
2453 iter := snap.NewIterator(nil, nil)
2454 defer iter.Release()
2455 for k := 0; k < nKey; k++ {
2456 if t.Failed() {
2457 return
2458 }
2459
2460 if !iter.Next() {
2461 if err := iter.Error(); err != nil {
2462 t.Errorf("READER_ITER #%d K%d error: %v", i, k, err)
2463 return
2464 }
2465 t.Errorf("READER_ITER #%d K%d eoi", i, k)
2466 return
2467 }
2468 key := fmt.Sprintf("KEY%8d", k)
2469 xkey := iter.Key()
2470 if !bytes.Equal([]byte(key), xkey) {
2471 t.Errorf("READER_ITER #%d K%d invalid key: want %q, got %q", i, k, key, xkey)
2472 return
2473 }
2474 value := key + vtail
2475 xvalue := iter.Value()
2476 if !bytes.Equal([]byte(value), xvalue) {
2477 t.Errorf("READER_ITER #%d K%d invalid value: want %q, got %q", i, k, value, xvalue)
2478 return
2479 }
2480 }
2481 }(i, snap)
2482 }
2483
2484 wg.Wait()
2485 }
2486
2487 func TestDB_UkeyShouldntHopAcrossTable(t *testing.T) {
2488 h := newDbHarnessWopt(t, &opt.Options{
2489 DisableLargeBatchTransaction: true,
2490 WriteBuffer: 112 * opt.KiB,
2491 CompactionTableSize: 90 * opt.KiB,
2492 CompactionExpandLimitFactor: 1,
2493 })
2494 defer h.close()
2495
2496 const (
2497 nSnap = 190
2498 nKey = 140
2499 )
2500
2501 var (
2502 snaps [nSnap]*Snapshot
2503 b = &Batch{}
2504 )
2505 for i := range snaps {
2506 vtail := fmt.Sprintf("VAL%030d", i)
2507 b.Reset()
2508 for k := 0; k < nKey; k++ {
2509 key := fmt.Sprintf("KEY%08d", k)
2510 b.Put([]byte(key), []byte(key+vtail))
2511 }
2512 if err := h.db.Write(b, nil); err != nil {
2513 t.Fatalf("WRITE #%d error: %v", i, err)
2514 }
2515
2516 snaps[i] = h.db.newSnapshot()
2517 b.Reset()
2518 for k := 0; k < nKey; k++ {
2519 key := fmt.Sprintf("KEY%08d", k)
2520 b.Delete([]byte(key))
2521 }
2522 if err := h.db.Write(b, nil); err != nil {
2523 t.Fatalf("WRITE #%d error: %v", i, err)
2524 }
2525 }
2526
2527 h.compactMem()
2528
2529 h.waitCompaction()
2530 for level, tables := range h.db.s.stVersion.levels {
2531 for _, table := range tables {
2532 t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
2533 }
2534 }
2535
2536 h.compactRangeAt(0, "", "")
2537 h.waitCompaction()
2538 for level, tables := range h.db.s.stVersion.levels {
2539 for _, table := range tables {
2540 t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
2541 }
2542 }
2543 h.compactRangeAt(1, "", "")
2544 h.waitCompaction()
2545 for level, tables := range h.db.s.stVersion.levels {
2546 for _, table := range tables {
2547 t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
2548 }
2549 }
2550 runtime.GOMAXPROCS(runtime.NumCPU())
2551
2552 wg := &sync.WaitGroup{}
2553 for i, snap := range snaps {
2554 wg.Add(1)
2555
2556 go func(i int, snap *Snapshot) {
2557 defer wg.Done()
2558
2559 vtail := fmt.Sprintf("VAL%030d", i)
2560 for k := 0; k < nKey; k++ {
2561 if t.Failed() {
2562 return
2563 }
2564
2565 key := fmt.Sprintf("KEY%08d", k)
2566 xvalue, err := snap.Get([]byte(key), nil)
2567 if err != nil {
2568 t.Errorf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
2569 return
2570 }
2571 value := key + vtail
2572 if !bytes.Equal([]byte(value), xvalue) {
2573 t.Errorf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
2574 return
2575 }
2576 }
2577 }(i, snap)
2578 }
2579
2580 wg.Wait()
2581 }
2582
2583 func TestDB_TableCompactionBuilder(t *testing.T) {
2584 gomega.RegisterTestingT(t)
2585 stor := testutil.NewStorage()
2586 stor.OnLog(testingLogger(t))
2587 stor.OnClose(testingPreserveOnFailed(t))
2588 defer stor.Close()
2589
2590 const nSeq = 99
2591
2592 o := &opt.Options{
2593 DisableLargeBatchTransaction: true,
2594 WriteBuffer: 112 * opt.KiB,
2595 CompactionTableSize: 43 * opt.KiB,
2596 CompactionExpandLimitFactor: 1,
2597 CompactionGPOverlapsFactor: 1,
2598 DisableBlockCache: true,
2599 }
2600 s, err := newSession(stor, o)
2601 if err != nil {
2602 t.Fatal(err)
2603 }
2604 if err := s.create(); err != nil {
2605 t.Fatal(err)
2606 }
2607 defer s.close()
2608 var (
2609 seq uint64
2610 targetSize = 5 * o.CompactionTableSize
2611 value = bytes.Repeat([]byte{'0'}, 100)
2612 )
2613 for i := 0; i < 2; i++ {
2614 tw, err := s.tops.create(0)
2615 if err != nil {
2616 t.Fatal(err)
2617 }
2618 for k := 0; tw.tw.BytesLen() < targetSize; k++ {
2619 key := []byte(fmt.Sprintf("%09d", k))
2620 seq += nSeq - 1
2621 for x := uint64(0); x < nSeq; x++ {
2622 if err := tw.append(makeInternalKey(nil, key, seq-x, keyTypeVal), value); err != nil {
2623 t.Fatal(err)
2624 }
2625 }
2626 }
2627 tf, err := tw.finish()
2628 if err != nil {
2629 t.Fatal(err)
2630 }
2631 rec := &sessionRecord{}
2632 rec.addTableFile(i, tf)
2633 if err := s.commit(rec, false); err != nil {
2634 t.Fatal(err)
2635 }
2636 }
2637
2638
2639 v := s.version()
2640 c := newCompaction(s, v, 1, append(tFiles{}, v.levels[1]...), undefinedCompaction)
2641 rec := &sessionRecord{}
2642 b := &tableCompactionBuilder{
2643 s: s,
2644 c: c,
2645 rec: rec,
2646 stat1: new(cStatStaging),
2647 minSeq: 0,
2648 strict: true,
2649 tableSize: o.CompactionTableSize/3 + 961,
2650 }
2651 if err := b.run(new(compactionTransactCounter)); err != nil {
2652 t.Fatal(err)
2653 }
2654 for _, t := range c.levels[0] {
2655 rec.delTable(c.sourceLevel, t.fd.Num)
2656 }
2657 if err := s.commit(rec, false); err != nil {
2658 t.Fatal(err)
2659 }
2660 c.release()
2661
2662
2663 v = s.version()
2664 c = newCompaction(s, v, 0, append(tFiles{}, v.levels[0]...), undefinedCompaction)
2665 rec = &sessionRecord{}
2666 b = &tableCompactionBuilder{
2667 s: s,
2668 c: c,
2669 rec: rec,
2670 stat1: new(cStatStaging),
2671 minSeq: 0,
2672 strict: true,
2673 tableSize: o.CompactionTableSize,
2674 }
2675 if err := b.run(new(compactionTransactCounter)); err != nil {
2676 t.Fatal(err)
2677 }
2678 for _, t := range c.levels[0] {
2679 rec.delTable(c.sourceLevel, t.fd.Num)
2680 }
2681
2682 for _, t := range v.levels[2] {
2683 rec.delTable(2, t.fd.Num)
2684 rec.addTableFile(3, t)
2685 }
2686 if err := s.commit(rec, false); err != nil {
2687 t.Fatal(err)
2688 }
2689 c.release()
2690
2691 v = s.version()
2692 for level, want := range []bool{false, true, false, true} {
2693 got := len(v.levels[level]) > 0
2694 if want != got {
2695 t.Fatalf("invalid level-%d tables len: want %v, got %v", level, want, got)
2696 }
2697 }
2698 for i, f := range v.levels[1][:len(v.levels[1])-1] {
2699 nf := v.levels[1][i+1]
2700 if bytes.Equal(f.imax.ukey(), nf.imin.ukey()) {
2701 t.Fatalf("KEY %q hop across table %d .. %d", f.imax.ukey(), f.fd.Num, nf.fd.Num)
2702 }
2703 }
2704 v.release()
2705
2706
2707 v = s.version()
2708 c = newCompaction(s, v, 1, append(tFiles{}, v.levels[1]...), undefinedCompaction)
2709 rec = &sessionRecord{}
2710 b = &tableCompactionBuilder{
2711 s: s,
2712 c: c,
2713 rec: rec,
2714 stat1: new(cStatStaging),
2715 minSeq: 0,
2716 strict: true,
2717 tableSize: o.CompactionTableSize,
2718 }
2719 stor.EmulateErrorOnce(testutil.ModeSync, storage.TypeTable, errors.New("table sync error (once)"))
2720 stor.EmulateRandomError(testutil.ModeRead|testutil.ModeWrite, storage.TypeTable, 0.01, errors.New("table random IO error"))
2721 for {
2722 if err := b.run(new(compactionTransactCounter)); err != nil {
2723 t.Logf("(expected) b.run: %v", err)
2724 } else {
2725 break
2726 }
2727 }
2728 if err := s.commit(rec, false); err != nil {
2729 t.Fatal(err)
2730 }
2731 c.release()
2732
2733 stor.EmulateErrorOnce(testutil.ModeSync, storage.TypeTable, nil)
2734 stor.EmulateRandomError(testutil.ModeRead|testutil.ModeWrite, storage.TypeTable, 0, nil)
2735
2736 v = s.version()
2737 if len(v.levels[1]) != len(v.levels[2]) {
2738 t.Fatalf("invalid tables length, want %d, got %d", len(v.levels[1]), len(v.levels[2]))
2739 }
2740 for i, f0 := range v.levels[1] {
2741 f1 := v.levels[2][i]
2742 iter0 := s.tops.newIterator(f0, nil, nil)
2743 iter1 := s.tops.newIterator(f1, nil, nil)
2744 for j := 0; true; j++ {
2745 next0 := iter0.Next()
2746 next1 := iter1.Next()
2747 if next0 != next1 {
2748 t.Fatalf("#%d.%d invalid eoi: want %v, got %v", i, j, next0, next1)
2749 }
2750 key0 := iter0.Key()
2751 key1 := iter1.Key()
2752 if !bytes.Equal(key0, key1) {
2753 t.Fatalf("#%d.%d invalid key: want %q, got %q", i, j, key0, key1)
2754 }
2755 if next0 == false {
2756 break
2757 }
2758 }
2759 iter0.Release()
2760 iter1.Release()
2761 }
2762 v.release()
2763 }
2764
2765 func testDB_IterTriggeredCompaction(t *testing.T, limitDiv int) {
2766 const (
2767 vSize = 200 * opt.KiB
2768 tSize = 100 * opt.MiB
2769 mIter = 100
2770 n = tSize / vSize
2771 )
2772
2773 h := newDbHarnessWopt(t, &opt.Options{
2774 DisableLargeBatchTransaction: true,
2775 Compression: opt.NoCompression,
2776 DisableBlockCache: true,
2777 })
2778 defer h.close()
2779
2780 h.db.memdbMaxLevel = 2
2781
2782 key := func(x int) string {
2783 return fmt.Sprintf("v%06d", x)
2784 }
2785
2786
2787 value := strings.Repeat("x", vSize)
2788 for i := 0; i < n; i++ {
2789 h.put(key(i), value)
2790 }
2791 h.compactMem()
2792
2793
2794 for i := 0; i < n; i++ {
2795 h.delete(key(i))
2796 }
2797 h.compactMem()
2798
2799 var (
2800 limit = n / limitDiv
2801
2802 startKey = key(0)
2803 limitKey = key(limit)
2804 maxKey = key(n)
2805 slice = &util.Range{Limit: []byte(limitKey)}
2806
2807 initialSize0 = h.sizeOf(startKey, limitKey)
2808 initialSize1 = h.sizeOf(limitKey, maxKey)
2809 )
2810
2811 t.Logf("initial size %s [rest %s]", shortenb(initialSize0), shortenb(initialSize1))
2812
2813 for r := 0; true; r++ {
2814 if r >= mIter {
2815 t.Fatal("taking too long to compact")
2816 }
2817
2818
2819 iter := h.db.NewIterator(slice, h.ro)
2820 for iter.Next() {
2821 }
2822 if err := iter.Error(); err != nil {
2823 t.Fatalf("Iter err: %v", err)
2824 }
2825 iter.Release()
2826
2827
2828 h.waitCompaction()
2829
2830
2831 size0 := h.sizeOf(startKey, limitKey)
2832 size1 := h.sizeOf(limitKey, maxKey)
2833 t.Logf("#%03d size %s [rest %s]", r, shortenb(size0), shortenb(size1))
2834 if size0 < initialSize0/10 {
2835 break
2836 }
2837 }
2838
2839 if initialSize1 > 0 {
2840 h.sizeAssert(limitKey, maxKey, initialSize1/4-opt.MiB, initialSize1+opt.MiB)
2841 }
2842 }
2843
2844 func TestDB_IterTriggeredCompaction(t *testing.T) {
2845 testDB_IterTriggeredCompaction(t, 1)
2846 }
2847
2848 func TestDB_IterTriggeredCompactionHalf(t *testing.T) {
2849 testDB_IterTriggeredCompaction(t, 2)
2850 }
2851
2852 func TestDB_ReadOnly(t *testing.T) {
2853 h := newDbHarness(t)
2854 defer h.close()
2855
2856 h.put("foo", "v1")
2857 h.put("bar", "v2")
2858 h.compactMem()
2859
2860 h.put("xfoo", "v1")
2861 h.put("xbar", "v2")
2862
2863 t.Log("Trigger read-only")
2864 if err := h.db.SetReadOnly(); err != nil {
2865 h.close()
2866 t.Fatalf("SetReadOnly error: %v", err)
2867 }
2868
2869 mode := testutil.ModeCreate | testutil.ModeRemove | testutil.ModeRename | testutil.ModeWrite | testutil.ModeSync
2870 h.stor.EmulateError(mode, storage.TypeAll, errors.New("read-only DB shouldn't writes"))
2871
2872 ro := func(key, value, wantValue string) {
2873 if err := h.db.Put([]byte(key), []byte(value), h.wo); err != ErrReadOnly {
2874 t.Fatalf("unexpected error: %v", err)
2875 }
2876 h.getVal(key, wantValue)
2877 }
2878
2879 ro("foo", "vx", "v1")
2880
2881 h.o.ReadOnly = true
2882 h.reopenDB()
2883
2884 ro("foo", "vx", "v1")
2885 ro("bar", "vx", "v2")
2886 h.assertNumKeys(4)
2887 }
2888
2889 func TestDB_BulkInsertDelete(t *testing.T) {
2890 h := newDbHarnessWopt(t, &opt.Options{
2891 DisableLargeBatchTransaction: true,
2892 Compression: opt.NoCompression,
2893 CompactionTableSize: 128 * opt.KiB,
2894 CompactionTotalSize: 1 * opt.MiB,
2895 WriteBuffer: 256 * opt.KiB,
2896 })
2897 defer h.close()
2898
2899 const R = 100
2900 const N = 2500
2901 key := make([]byte, 4)
2902 value := make([]byte, 256)
2903 for i := 0; i < R; i++ {
2904 offset := N * i
2905 for j := 0; j < N; j++ {
2906 binary.BigEndian.PutUint32(key, uint32(offset+j))
2907 if err := h.db.Put(key, value, nil); err != nil {
2908 t.Fatal("Put error: ", err)
2909 }
2910 }
2911 for j := 0; j < N; j++ {
2912 binary.BigEndian.PutUint32(key, uint32(offset+j))
2913 if err := h.db.Delete(key, nil); err != nil {
2914 t.Fatal("Delete error: ", err)
2915 }
2916 }
2917 }
2918
2919 h.waitCompaction()
2920 if tot := h.totalTables(); tot > 10 {
2921 t.Fatalf("too many uncompacted tables: %d (%s)", tot, h.getTablesPerLevel())
2922 }
2923 }
2924
2925 func TestDB_GracefulClose(t *testing.T) {
2926 runtime.GOMAXPROCS(4)
2927 h := newDbHarnessWopt(t, &opt.Options{
2928 DisableLargeBatchTransaction: true,
2929 Compression: opt.NoCompression,
2930 CompactionTableSize: 1 * opt.MiB,
2931 WriteBuffer: 1 * opt.MiB,
2932 })
2933 defer h.close()
2934
2935 var closeWait sync.WaitGroup
2936
2937
2938 n := 0
2939 closing := false
2940 for i := 0; i < 1000000; i++ {
2941 if !closing && h.totalTables() > 3 {
2942 t.Logf("close db during write, index=%d", i)
2943 closeWait.Add(1)
2944 go func() {
2945 h.closeDB()
2946 closeWait.Done()
2947 }()
2948 closing = true
2949 }
2950 if err := h.db.Put([]byte(fmt.Sprintf("%09d", i)), []byte(fmt.Sprintf("VAL-%09d", i)), h.wo); err != nil {
2951 t.Logf("Put error: %s (expected)", err)
2952 n = i
2953 break
2954 }
2955 }
2956 closeWait.Wait()
2957
2958
2959 h.openDB()
2960 closing = false
2961 for i := 0; i < n; i++ {
2962 if !closing && i > n/2 {
2963 t.Logf("close db during read, index=%d", i)
2964 closeWait.Add(1)
2965 go func() {
2966 h.closeDB()
2967 closeWait.Done()
2968 }()
2969 closing = true
2970 }
2971 if _, err := h.db.Get([]byte(fmt.Sprintf("%09d", i)), h.ro); err != nil {
2972 t.Logf("Get error: %s (expected)", err)
2973 break
2974 }
2975 }
2976 closeWait.Wait()
2977
2978
2979 h.openDB()
2980 closing = false
2981 iter := h.db.NewIterator(nil, h.ro)
2982 for i := 0; iter.Next(); i++ {
2983 if len(iter.Key()) == 0 || len(iter.Value()) == 0 {
2984 t.Error("Key or value has zero length")
2985 }
2986 if !closing {
2987 t.Logf("close db during iter, index=%d", i)
2988 closeWait.Add(1)
2989 go func() {
2990 h.closeDB()
2991 closeWait.Done()
2992 }()
2993 closing = true
2994 }
2995 time.Sleep(time.Millisecond)
2996 }
2997 if err := iter.Error(); err != nil {
2998 t.Logf("Iter error: %s (expected)", err)
2999 }
3000 iter.Release()
3001 closeWait.Wait()
3002 }
3003
View as plain text