...

Source file src/github.com/syndtr/goleveldb/leveldb/db_test.go

Documentation: github.com/syndtr/goleveldb/leveldb

     1  // Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reserved.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package leveldb
     8  
     9  import (
    10  	"bytes"
    11  	"container/list"
    12  	crand "crypto/rand"
    13  	"encoding/binary"
    14  	"fmt"
    15  	"math/rand"
    16  	"os"
    17  	"path/filepath"
    18  	"runtime"
    19  	"strings"
    20  	"sync"
    21  	"sync/atomic"
    22  	"testing"
    23  	"time"
    24  	"unsafe"
    25  
    26  	"github.com/onsi/gomega"
    27  
    28  	"github.com/syndtr/goleveldb/leveldb/comparer"
    29  	"github.com/syndtr/goleveldb/leveldb/errors"
    30  	"github.com/syndtr/goleveldb/leveldb/filter"
    31  	"github.com/syndtr/goleveldb/leveldb/iterator"
    32  	"github.com/syndtr/goleveldb/leveldb/opt"
    33  	"github.com/syndtr/goleveldb/leveldb/storage"
    34  	"github.com/syndtr/goleveldb/leveldb/testutil"
    35  	"github.com/syndtr/goleveldb/leveldb/util"
    36  )
    37  
    38  func tkey(i int) []byte {
    39  	return []byte(fmt.Sprintf("%016d", i))
    40  }
    41  
// tval returns a deterministic pseudo-random value of length n; the same
// seed always yields the same bytes, which lets tests re-derive expected
// values after reopen/compaction.
func tval(seed, n int) []byte {
	r := rand.New(rand.NewSource(int64(seed)))
	return randomString(r, n)
}
    46  
    47  func testingLogger(t *testing.T) func(log string) {
    48  	return func(log string) {
    49  		t.Log(log)
    50  	}
    51  }
    52  
    53  func testingPreserveOnFailed(t *testing.T) func() (preserve bool, err error) {
    54  	return func() (preserve bool, err error) {
    55  		preserve = t.Failed()
    56  		return
    57  	}
    58  }
    59  
// dbHarness bundles a DB under test with its backing in-memory test
// storage and the options used to (re)open it.
type dbHarness struct {
	t *testing.T

	stor *testutil.Storage // simulated storage; supports stalls and fault injection
	db   *DB               // currently open DB, nil after close
	o    *opt.Options      // open options, reused across reopenDB
	ro   *opt.ReadOptions  // default read options (nil = defaults)
	wo   *opt.WriteOptions // default write options (nil = defaults)
}
    69  
    70  func newDbHarnessWopt(t *testing.T, o *opt.Options) *dbHarness {
    71  	h := new(dbHarness)
    72  	h.init(t, o)
    73  	return h
    74  }
    75  
// newDbHarness creates a harness with default test options. Large-batch
// transactions are disabled so writes stay on the plain memdb path.
func newDbHarness(t *testing.T) *dbHarness {
	return newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true})
}
    79  
// init wires up the test storage (logging + preserve-on-failure hooks)
// and opens the DB; an open failure is fatal for the test.
func (h *dbHarness) init(t *testing.T, o *opt.Options) {
	gomega.RegisterTestingT(t)
	h.t = t
	h.stor = testutil.NewStorage()
	h.stor.OnLog(testingLogger(t))
	h.stor.OnClose(testingPreserveOnFailed(t))
	h.o = o
	h.ro = nil
	h.wo = nil

	if err := h.openDB0(); err != nil {
		// So that it will come after fatal message.
		defer h.stor.Close()
		h.t.Fatal("Open (init): got error: ", err)
	}
}
    96  
// openDB0 opens the DB with the harness options and returns any error,
// leaving the decision of fatal-vs-expected to the caller.
func (h *dbHarness) openDB0() (err error) {
	h.t.Log("opening DB")
	h.db, err = Open(h.stor, h.o)
	return
}
   102  
// openDB opens the DB and fails the test immediately on error.
func (h *dbHarness) openDB() {
	if err := h.openDB0(); err != nil {
		h.t.Fatal("Open: got error: ", err)
	}
}
   108  
// closeDB0 closes the DB and returns the raw error (may be ErrClosed).
func (h *dbHarness) closeDB0() error {
	h.t.Log("closing DB")
	return h.db.Close()
}
   113  
// closeDB closes the DB (if open), tolerating an already-closed DB, then
// verifies via the test storage that no files were left open. The GC run
// helps flush finalizer-held resources before the leak check settles.
func (h *dbHarness) closeDB() {
	if h.db != nil {
		if err := h.closeDB0(); err != nil && err != ErrClosed {
			h.t.Error("Close: got error: ", err)
		}
	}
	h.stor.CloseCheck()
	runtime.GC()
}
   123  
// reopenDB closes the DB if open and opens it again with the same
// options, exercising the recovery path.
func (h *dbHarness) reopenDB() {
	if h.db != nil {
		h.closeDB()
	}
	h.openDB()
}
   130  
// close tears down the whole harness: closes the DB (if still open) and
// releases the test storage. Unlike closeDB it nils out both fields, so
// the harness cannot be reused afterwards.
func (h *dbHarness) close() {
	if h.db != nil {
		if err := h.closeDB0(); err != nil && err != ErrClosed {
			h.t.Error("Close: got error: ", err)
		}
		h.db = nil
	}
	h.stor.Close()
	h.stor = nil
	runtime.GC()
}
   142  
   143  func (h *dbHarness) openAssert(want bool) {
   144  	db, err := Open(h.stor, h.o)
   145  	if err != nil {
   146  		if want {
   147  			h.t.Error("Open: assert: got error: ", err)
   148  		} else {
   149  			h.t.Log("Open: assert: got error (expected): ", err)
   150  		}
   151  	} else {
   152  		if !want {
   153  			h.t.Error("Open: assert: expect error")
   154  		}
   155  		db.Close()
   156  	}
   157  }
   158  
// write applies a batch with the harness write options, reporting any
// error as a test failure.
func (h *dbHarness) write(batch *Batch) {
	if err := h.db.Write(batch, h.wo); err != nil {
		h.t.Error("Write: got error: ", err)
	}
}
   164  
// put stores key->value with the harness write options, reporting any
// error as a test failure.
func (h *dbHarness) put(key, value string) {
	if err := h.db.Put([]byte(key), []byte(value), h.wo); err != nil {
		h.t.Error("Put: got error: ", err)
	}
}
   170  
// putMulti writes the (low, hi) boundary keys and forces a memdb
// compaction n times, producing n overlapping tables spanning [low, hi].
func (h *dbHarness) putMulti(n int, low, hi string) {
	for i := 0; i < n; i++ {
		h.put(low, "begin")
		h.put(hi, "end")
		h.compactMem()
	}
}
   178  
// maxNextLevelOverlappingBytes asserts that no single table in levels
// 1..N-2 overlaps more than want bytes of tables in the next level down.
// Used to verify that compactions keep per-table overlap bounded.
func (h *dbHarness) maxNextLevelOverlappingBytes(want int64) {
	t := h.t
	db := h.db

	var (
		maxOverlaps int64
		maxLevel    int
	)
	// Take a version reference; must be released before returning.
	v := db.s.version()
	if len(v.levels) > 2 {
		// Skip level 0 (tables there may overlap each other by design)
		// and the last level (it has no "next" level).
		for i, tt := range v.levels[1 : len(v.levels)-1] {
			level := i + 1
			next := v.levels[level+1]
			for _, t := range tt {
				r := next.getOverlaps(nil, db.s.icmp, t.imin.ukey(), t.imax.ukey(), false)
				sum := r.size()
				if sum > maxOverlaps {
					maxOverlaps = sum
					maxLevel = level
				}
			}
		}
	}
	v.release()

	if maxOverlaps > want {
		t.Errorf("next level most overlapping bytes is more than %d, got=%d level=%d", want, maxOverlaps, maxLevel)
	} else {
		t.Logf("next level most overlapping bytes is %d, level=%d want=%d", maxOverlaps, maxLevel, want)
	}
}
   210  
   211  func (h *dbHarness) delete(key string) {
   212  	t := h.t
   213  	db := h.db
   214  
   215  	err := db.Delete([]byte(key), h.wo)
   216  	if err != nil {
   217  		t.Error("Delete: got error: ", err)
   218  	}
   219  }
   220  
// assertNumKeys iterates the whole DB and asserts that exactly want keys
// are visible.
func (h *dbHarness) assertNumKeys(want int) {
	iter := h.db.NewIterator(nil, h.ro)
	defer iter.Release()
	got := 0
	for iter.Next() {
		got++
	}
	// Check for iteration errors; a short count could otherwise mask one.
	if err := iter.Error(); err != nil {
		h.t.Error("assertNumKeys: ", err)
	}
	if want != got {
		h.t.Errorf("assertNumKeys: want=%d got=%d", want, got)
	}
}
   235  
// getr fetches key from an arbitrary Reader (DB or Snapshot) and asserts
// whether it should exist. Returns whether it was found and its value.
func (h *dbHarness) getr(db Reader, key string, expectFound bool) (found bool, v []byte) {
	t := h.t
	v, err := db.Get([]byte(key), h.ro)
	switch err {
	case ErrNotFound:
		if expectFound {
			t.Errorf("Get: key '%s' not found, want found", key)
		}
	case nil:
		found = true
		if !expectFound {
			t.Errorf("Get: key '%s' found, want not found", key)
		}
	default:
		// Any other error (corruption, I/O) is always a failure.
		t.Error("Get: got error: ", err)
	}
	return
}
   254  
// get is getr against the harness DB itself.
func (h *dbHarness) get(key string, expectFound bool) (found bool, v []byte) {
	return h.getr(h.db, key, expectFound)
}
   258  
   259  func (h *dbHarness) getValr(db Reader, key, value string) {
   260  	t := h.t
   261  	found, r := h.getr(db, key, true)
   262  	if !found {
   263  		return
   264  	}
   265  	rval := string(r)
   266  	if rval != value {
   267  		t.Errorf("Get: invalid value, got '%s', want '%s'", rval, value)
   268  	}
   269  }
   270  
// getVal is getValr against the harness DB itself.
func (h *dbHarness) getVal(key, value string) {
	h.getValr(h.db, key, value)
}
   274  
   275  func (h *dbHarness) allEntriesFor(key, want string) {
   276  	t := h.t
   277  	db := h.db
   278  	s := db.s
   279  
   280  	ikey := makeInternalKey(nil, []byte(key), keyMaxSeq, keyTypeVal)
   281  	iter := db.newRawIterator(nil, nil, nil, nil)
   282  	if !iter.Seek(ikey) && iter.Error() != nil {
   283  		t.Error("AllEntries: error during seek, err: ", iter.Error())
   284  		return
   285  	}
   286  	res := "[ "
   287  	first := true
   288  	for iter.Valid() {
   289  		if ukey, _, kt, kerr := parseInternalKey(iter.Key()); kerr == nil {
   290  			if s.icmp.uCompare(ikey.ukey(), ukey) != 0 {
   291  				break
   292  			}
   293  			if !first {
   294  				res += ", "
   295  			}
   296  			first = false
   297  			switch kt {
   298  			case keyTypeVal:
   299  				res += string(iter.Value())
   300  			case keyTypeDel:
   301  				res += "DEL"
   302  			}
   303  		} else {
   304  			if !first {
   305  				res += ", "
   306  			}
   307  			first = false
   308  			res += "CORRUPTED"
   309  		}
   310  		iter.Next()
   311  	}
   312  	if !first {
   313  		res += " "
   314  	}
   315  	res += "]"
   316  	if res != want {
   317  		t.Errorf("AllEntries: assert failed for key %q, got=%q want=%q", key, res, want)
   318  	}
   319  }
   320  
   321  // Return a string that contains all key,value pairs in order,
   322  // formatted like "(k1->v1)(k2->v2)".
   323  func (h *dbHarness) getKeyVal(want string) {
   324  	t := h.t
   325  	db := h.db
   326  
   327  	s, err := db.GetSnapshot()
   328  	if err != nil {
   329  		t.Fatal("GetSnapshot: got error: ", err)
   330  	}
   331  	res := ""
   332  	iter := s.NewIterator(nil, nil)
   333  	for iter.Next() {
   334  		res += fmt.Sprintf("(%s->%s)", string(iter.Key()), string(iter.Value()))
   335  	}
   336  	iter.Release()
   337  
   338  	if res != want {
   339  		t.Errorf("GetKeyVal: invalid key/value pair, got=%q want=%q", res, want)
   340  	}
   341  	s.Release()
   342  }
   343  
// waitCompaction triggers a table compaction and blocks until it is done.
func (h *dbHarness) waitCompaction() {
	t := h.t
	db := h.db
	if err := db.compTriggerWait(db.tcompCmdC); err != nil {
		t.Error("compaction error: ", err)
	}
}
   351  
   352  func (h *dbHarness) waitMemCompaction() {
   353  	t := h.t
   354  	db := h.db
   355  
   356  	if err := db.compTriggerWait(db.mcompCmdC); err != nil {
   357  		t.Error("compaction error: ", err)
   358  	}
   359  }
   360  
// compactMem forces the current memdb to be rotated out and flushed to a
// table, then asserts at least one table exists.
func (h *dbHarness) compactMem() {
	t := h.t
	db := h.db

	t.Log("starting memdb compaction")

	// Hold the write lock for the duration: rotateMem must not race with
	// concurrent writers.
	db.writeLockC <- struct{}{}
	defer func() {
		<-db.writeLockC
	}()

	// wait=true: block until the frozen memdb has been compacted.
	if _, err := db.rotateMem(0, true); err != nil {
		t.Error("compaction error: ", err)
	}

	if h.totalTables() == 0 {
		t.Error("zero tables after mem compaction")
	}

	t.Log("memdb compaction done")
}
   382  
// compactRangeAtErr compacts [min, max] at the given level and asserts
// whether the trigger should fail. Empty min/max mean an unbounded edge.
func (h *dbHarness) compactRangeAtErr(level int, min, max string, wanterr bool) {
	t := h.t
	db := h.db

	// Translate "" to nil so the compaction treats the edge as open.
	var _min, _max []byte
	if min != "" {
		_min = []byte(min)
	}
	if max != "" {
		_max = []byte(max)
	}

	t.Logf("starting table range compaction: level=%d, min=%q, max=%q", level, min, max)

	if err := db.compTriggerRange(db.tcompCmdC, level, _min, _max); err != nil {
		if wanterr {
			t.Log("CompactRangeAt: got error (expected): ", err)
		} else {
			t.Error("CompactRangeAt: got error: ", err)
		}
	} else if wanterr {
		t.Error("CompactRangeAt: expect error")
	}

	t.Log("table range compaction done")
}
   409  
// compactRangeAt compacts [min, max] at the given level, expecting
// success.
func (h *dbHarness) compactRangeAt(level int, min, max string) {
	h.compactRangeAtErr(level, min, max, false)
}
   413  
   414  func (h *dbHarness) compactRange(min, max string) {
   415  	t := h.t
   416  	db := h.db
   417  
   418  	t.Logf("starting DB range compaction: min=%q, max=%q", min, max)
   419  
   420  	var r util.Range
   421  	if min != "" {
   422  		r.Start = []byte(min)
   423  	}
   424  	if max != "" {
   425  		r.Limit = []byte(max)
   426  	}
   427  	if err := db.CompactRange(r); err != nil {
   428  		t.Error("CompactRange: got error: ", err)
   429  	}
   430  
   431  	t.Log("DB range compaction done")
   432  }
   433  
// sizeOf returns the approximate on-disk size of keys in [start, limit).
func (h *dbHarness) sizeOf(start, limit string) int64 {
	sz, err := h.db.SizeOf([]util.Range{
		{Start: []byte(start), Limit: []byte(limit)},
	})
	if err != nil {
		h.t.Error("SizeOf: got error: ", err)
	}
	return sz.Sum()
}
   443  
// sizeAssert asserts the approximate size of [start, limit) lies within
// [low, hi]; SizeOf is an estimate, hence the tolerance band.
func (h *dbHarness) sizeAssert(start, limit string, low, hi int64) {
	sz := h.sizeOf(start, limit)
	if sz < low || sz > hi {
		h.t.Errorf("sizeOf %q to %q not in range, want %d - %d, got %d",
			shorten(start), shorten(limit), low, hi, sz)
	}
}
   451  
// getSnapshot takes a snapshot, failing the test on error. The caller is
// responsible for releasing it.
func (h *dbHarness) getSnapshot() (s *Snapshot) {
	s, err := h.db.GetSnapshot()
	if err != nil {
		h.t.Fatal("GetSnapshot: got error: ", err)
	}
	return
}
   459  
   460  func (h *dbHarness) getTablesPerLevel() string {
   461  	res := ""
   462  	nz := 0
   463  	v := h.db.s.version()
   464  	for level, tables := range v.levels {
   465  		if level > 0 {
   466  			res += ","
   467  		}
   468  		res += fmt.Sprint(len(tables))
   469  		if len(tables) > 0 {
   470  			nz = len(res)
   471  		}
   472  	}
   473  	v.release()
   474  	return res[:nz]
   475  }
   476  
// tablesPerLevel asserts the per-level table-count string (see
// getTablesPerLevel) equals want.
func (h *dbHarness) tablesPerLevel(want string) {
	res := h.getTablesPerLevel()
	if res != want {
		h.t.Errorf("invalid tables len, want=%s, got=%s", want, res)
	}
}
   483  
   484  func (h *dbHarness) totalTables() (n int) {
   485  	v := h.db.s.version()
   486  	for _, tables := range v.levels {
   487  		n += len(tables)
   488  	}
   489  	v.release()
   490  	return
   491  }
   492  
// keyValue is the minimal iterator-position interface needed by
// testKeyVal; satisfied by iterator.Iterator.
type keyValue interface {
	Key() []byte
	Value() []byte
}
   497  
   498  func testKeyVal(t *testing.T, kv keyValue, want string) {
   499  	res := string(kv.Key()) + "->" + string(kv.Value())
   500  	if res != want {
   501  		t.Errorf("invalid key/value, want=%q, got=%q", want, res)
   502  	}
   503  }
   504  
   505  func numKey(num int) string {
   506  	return fmt.Sprintf("key%06d", num)
   507  }
   508  
// testingBloomFilter is the shared bloom filter (10 bits per key) used by
// the option-permutation runs in truno.
var testingBloomFilter = filter.NewBloomFilter(10)
   510  
// truno runs f four times under option permutations: (0) the options as
// given, (1) with a bloom filter, (2) with compression disabled, and
// (3) same options as run 2 but against a reopened DB. Caller-supplied
// options are copied, not mutated in place, before each override.
func truno(t *testing.T, o *opt.Options, f func(h *dbHarness)) {
	for i := 0; i < 4; i++ {
		// Anonymous func so each harness's deferred close runs per
		// iteration, not at truno's return.
		func() {
			switch i {
			case 0:
			case 1:
				if o == nil {
					o = &opt.Options{
						DisableLargeBatchTransaction: true,
						Filter:                       testingBloomFilter,
					}
				} else {
					old := o
					o = &opt.Options{}
					*o = *old
					o.Filter = testingBloomFilter
				}
			case 2:
				if o == nil {
					o = &opt.Options{
						DisableLargeBatchTransaction: true,
						Compression:                  opt.NoCompression,
					}
				} else {
					old := o
					o = &opt.Options{}
					*o = *old
					o.Compression = opt.NoCompression
				}
			}
			h := newDbHarnessWopt(t, o)
			defer h.close()
			if i == 3 {
				h.reopenDB()
			}
			f(h)
		}()
	}
}
   550  
// trun is truno with default options.
func trun(t *testing.T, f func(h *dbHarness)) {
	truno(t, nil, f)
}
   554  
   555  func testAligned(t *testing.T, name string, offset uintptr) {
   556  	if offset%8 != 0 {
   557  		t.Errorf("field %s offset is not 64-bit aligned", name)
   558  	}
   559  }
   560  
// Test_FieldsAligned verifies that struct fields accessed via sync/atomic
// 64-bit operations sit at 8-byte-aligned offsets (required on 32-bit
// platforms).
func Test_FieldsAligned(t *testing.T) {
	p1 := new(DB) //nolint:staticcheck
	testAligned(t, "DB.seq", unsafe.Offsetof(p1.seq))
	p2 := new(session) //nolint:staticcheck
	testAligned(t, "session.stNextFileNum", unsafe.Offsetof(p2.stNextFileNum))
	testAligned(t, "session.stJournalNum", unsafe.Offsetof(p2.stJournalNum))
	testAligned(t, "session.stPrevJournalNum", unsafe.Offsetof(p2.stPrevJournalNum))
	testAligned(t, "session.stSeqNum", unsafe.Offsetof(p2.stSeqNum))
}
   570  
// TestDB_Locking verifies the storage lock: a second Open must fail while
// the DB is open and succeed once it is closed.
func TestDB_Locking(t *testing.T) {
	h := newDbHarness(t)
	// Only the storage is deferred; the DB itself is closed mid-test.
	defer h.stor.Close()
	h.openAssert(false)
	h.closeDB()
	h.openAssert(true)
}
   578  
// TestDB_Empty verifies a missing key stays missing across a reopen of an
// empty DB.
func TestDB_Empty(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.get("foo", false)

		h.reopenDB()
		h.get("foo", false)
	})
}
   587  
// TestDB_ReadWrite covers basic put/overwrite/get behavior and its
// persistence across a reopen.
func TestDB_ReadWrite(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.getVal("foo", "v1")
		h.put("bar", "v2")
		h.put("foo", "v3")
		h.getVal("foo", "v3")
		h.getVal("bar", "v2")

		h.reopenDB()
		h.getVal("foo", "v3")
		h.getVal("bar", "v2")
	})
}
   602  
// TestDB_PutDeleteGet verifies a deleted key stays deleted, including
// after a reopen replays the journal.
func TestDB_PutDeleteGet(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.getVal("foo", "v1")
		h.put("foo", "v2")
		h.getVal("foo", "v2")
		h.delete("foo")
		h.get("foo", false)

		h.reopenDB()
		h.get("foo", false)
	})
}
   616  
// TestDB_EmptyBatch verifies that writing an empty batch succeeds and has
// no observable effect.
func TestDB_EmptyBatch(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.get("foo", false)
	err := h.db.Write(new(Batch), h.wo)
	if err != nil {
		t.Error("writing empty batch yield error: ", err)
	}
	h.get("foo", false)
}
   628  
// TestDB_GetFromFrozen verifies reads are served from the frozen
// (immutable, not-yet-flushed) memtable: table syncs are stalled so the
// rotated memtable cannot be compacted away before the Get.
func TestDB_GetFromFrozen(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  100100,
	})
	defer h.close()

	h.put("foo", "v1")
	h.getVal("foo", "v1")

	h.stor.Stall(testutil.ModeSync, storage.TypeTable) // Block sync calls
	h.put("k1", strings.Repeat("x", 100000))           // Fill memtable
	h.put("k2", strings.Repeat("y", 100000))           // Trigger compaction
	// Poll briefly for the rotation to produce a frozen memtable.
	// NOTE(review): 100 x 10µs is only ~1ms of total waiting — looks very
	// tight; possibly Millisecond was intended. Confirm against upstream.
	for i := 0; h.db.getFrozenMem() == nil && i < 100; i++ {
		time.Sleep(10 * time.Microsecond)
	}
	if h.db.getFrozenMem() == nil {
		h.stor.Release(testutil.ModeSync, storage.TypeTable)
		t.Fatal("No frozen mem")
	}
	h.getVal("foo", "v1")
	h.stor.Release(testutil.ModeSync, storage.TypeTable) // Release sync calls

	h.reopenDB()
	h.getVal("foo", "v1")
	h.get("k1", true)
	h.get("k2", true)
}
   657  
// TestDB_GetFromTable verifies a value flushed to a table (not just the
// memtable) is still readable.
func TestDB_GetFromTable(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.compactMem()
		h.getVal("foo", "v1")
	})
}
   665  
// TestDB_GetSnapshot verifies snapshot isolation: reads through a
// snapshot keep seeing the old values after overwrites, both before and
// after a memdb compaction. Uses a long key to hit non-inline paths too.
func TestDB_GetSnapshot(t *testing.T) {
	trun(t, func(h *dbHarness) {
		bar := strings.Repeat("b", 200)
		h.put("foo", "v1")
		h.put(bar, "v1")

		snap, err := h.db.GetSnapshot()
		if err != nil {
			t.Fatal("GetSnapshot: got error: ", err)
		}

		h.put("foo", "v2")
		h.put(bar, "v2")

		h.getVal("foo", "v2")
		h.getVal(bar, "v2")
		h.getValr(snap, "foo", "v1")
		h.getValr(snap, bar, "v1")

		h.compactMem()

		h.getVal("foo", "v2")
		h.getVal(bar, "v2")
		h.getValr(snap, "foo", "v1")
		h.getValr(snap, bar, "v1")

		snap.Release()

		h.reopenDB()
		h.getVal("foo", "v2")
		h.getVal(bar, "v2")
	})
}
   699  
// TestDB_GetLevel0Ordering verifies Get prefers the newest overlapping
// level-0 table: repeated flushes of the same keys create several
// overlapping level-0 tables, and the latest values must win.
func TestDB_GetLevel0Ordering(t *testing.T) {
	trun(t, func(h *dbHarness) {
		// Keep the memdb skiplist shallow so flushes stay small.
		h.db.memdbMaxLevel = 2

		for i := 0; i < 4; i++ {
			h.put("bar", fmt.Sprintf("b%d", i))
			h.put("foo", fmt.Sprintf("v%d", i))
			h.compactMem()
		}
		h.getVal("foo", "v3")
		h.getVal("bar", "b3")

		v := h.db.s.version()
		t0len := v.tLen(0)
		v.release()
		if t0len < 2 {
			t.Errorf("level-0 tables is less than 2, got %d", t0len)
		}

		h.reopenDB()
		h.getVal("foo", "v3")
		h.getVal("bar", "b3")
	})
}
   724  
// TestDB_GetOrderedByLevels verifies that a newer value in a shallower
// level shadows an older value pushed to a deeper level by compaction.
func TestDB_GetOrderedByLevels(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.compactMem()
		h.compactRange("a", "z")
		h.getVal("foo", "v1")
		h.put("foo", "v2")
		h.compactMem()
		h.getVal("foo", "v2")
	})
}
   736  
// TestDB_GetPicksCorrectFile verifies Get selects the right table when a
// non-level-0 level holds multiple disjoint files.
func TestDB_GetPicksCorrectFile(t *testing.T) {
	trun(t, func(h *dbHarness) {
		// Arrange to have multiple files in a non-level-0 level.
		h.put("a", "va")
		h.compactMem()
		h.compactRange("a", "b")
		h.put("x", "vx")
		h.compactMem()
		h.compactRange("x", "y")
		h.put("f", "vf")
		h.compactMem()
		h.compactRange("f", "g")

		h.getVal("a", "va")
		h.getVal("f", "vf")
		h.getVal("x", "vx")

		h.compactRange("", "")
		h.getVal("a", "va")
		h.getVal("f", "vf")
		h.getVal("x", "vx")
	})
}
   760  
// TestDB_GetEncountersEmptyLevel reproduces a historical bug where a
// seek-triggered compaction picked the wrong level when the level between
// two populated ones was empty.
func TestDB_GetEncountersEmptyLevel(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.db.memdbMaxLevel = 2

		// Arrange for the following to happen:
		//   * sstable A in level 0
		//   * nothing in level 1
		//   * sstable B in level 2
		// Then do enough Get() calls to arrange for an automatic compaction
		// of sstable A.  A bug would cause the compaction to be marked as
		// occurring at level 1 (instead of the correct level 0).

		// Step 1: First place sstables in levels 0 and 2
		for i := 0; ; i++ {
			if i >= 100 {
				t.Fatal("could not fill levels-0 and level-2")
			}
			v := h.db.s.version()
			if v.tLen(0) > 0 && v.tLen(2) > 0 {
				v.release()
				break
			}
			v.release()
			h.put("a", "begin")
			h.put("z", "end")
			h.compactMem()

			h.getVal("a", "begin")
			h.getVal("z", "end")
		}

		// Step 2: clear level 1 if necessary.
		h.compactRangeAt(1, "", "")
		h.tablesPerLevel("1,0,1")

		h.getVal("a", "begin")
		h.getVal("z", "end")

		// Step 3: read a bunch of times
		for i := 0; i < 200; i++ {
			h.get("missing", false)
		}

		// Step 4: Wait for compaction to finish
		h.waitCompaction()

		v := h.db.s.version()
		if v.tLen(0) > 0 {
			t.Errorf("level-0 tables more than 0, got %d", v.tLen(0))
		}
		v.release()

		h.getVal("a", "begin")
		h.getVal("z", "end")
	})
}
   817  
// TestDB_IterMultiWithDelete verifies backward iteration correctly skips
// a deleted key, both in the memtable and after the data is flushed to a
// table.
func TestDB_IterMultiWithDelete(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("a", "va")
		h.put("b", "vb")
		h.put("c", "vc")
		h.delete("b")
		h.get("b", false)

		iter := h.db.NewIterator(nil, nil)
		iter.Seek([]byte("c"))
		testKeyVal(t, iter, "c->vc")
		iter.Prev()
		// Prev must skip the deleted "b" and land on "a".
		testKeyVal(t, iter, "a->va")
		iter.Release()

		h.compactMem()

		iter = h.db.NewIterator(nil, nil)
		iter.Seek([]byte("c"))
		testKeyVal(t, iter, "c->vc")
		iter.Prev()
		testKeyVal(t, iter, "a->va")
		iter.Release()
	})
}
   843  
// TestDB_IteratorPinsRef verifies a live iterator pins the state it was
// created from: heavy subsequent writes (forcing compactions) must not
// change what the iterator observes.
func TestDB_IteratorPinsRef(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.put("foo", "hello")

	// Get iterator that will yield the current contents of the DB.
	iter := h.db.NewIterator(nil, nil)

	// Write to force compactions
	h.put("foo", "newvalue1")
	for i := 0; i < 100; i++ {
		h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10))
	}
	h.put("foo", "newvalue2")

	iter.First()
	testKeyVal(t, iter, "foo->hello")
	if iter.Next() {
		t.Errorf("expect eof")
	}
	iter.Release()
}
   867  
// TestDB_Recover verifies journal recovery across multiple reopen cycles
// with interleaved writes.
func TestDB_Recover(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.put("baz", "v5")

		h.reopenDB()
		h.getVal("foo", "v1")

		h.getVal("foo", "v1")
		h.getVal("baz", "v5")
		h.put("bar", "v2")
		h.put("foo", "v3")

		h.reopenDB()
		h.getVal("foo", "v3")
		h.put("foo", "v4")
		h.getVal("foo", "v4")
		h.getVal("bar", "v2")
		h.getVal("baz", "v5")
	})
}
   889  
// TestDB_RecoverWithEmptyJournal verifies recovery when a reopen leaves
// behind an empty journal (the back-to-back reopen writes nothing).
func TestDB_RecoverWithEmptyJournal(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		h.put("foo", "v2")

		h.reopenDB()
		h.reopenDB()
		h.put("foo", "v3")

		h.reopenDB()
		h.getVal("foo", "v3")
	})
}
   903  
// TestDB_RecoverDuringMemtableCompaction verifies no data is lost when
// the DB is reopened while a memtable compaction was stalled mid-flight
// (table syncs blocked during the writes).
func TestDB_RecoverDuringMemtableCompaction(t *testing.T) {
	truno(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 1000000}, func(h *dbHarness) {

		h.stor.Stall(testutil.ModeSync, storage.TypeTable)
		h.put("big1", strings.Repeat("x", 10000000))
		h.put("big2", strings.Repeat("y", 1000))
		h.put("bar", "v2")
		h.stor.Release(testutil.ModeSync, storage.TypeTable)

		h.reopenDB()
		h.getVal("bar", "v2")
		h.getVal("big1", strings.Repeat("x", 10000000))
		h.getVal("big2", strings.Repeat("y", 1000))
	})
}
   919  
// TestDB_MinorCompactionsHappen verifies that with a tiny write buffer,
// writes exceeding it still remain fully readable (implying minor
// compactions flushed them), including after a reopen.
func TestDB_MinorCompactionsHappen(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 10000})
	defer h.close()

	n := 500

	key := func(i int) string {
		return fmt.Sprintf("key%06d", i)
	}

	for i := 0; i < n; i++ {
		h.put(key(i), key(i)+strings.Repeat("v", 1000))
	}

	for i := 0; i < n; i++ {
		h.getVal(key(i), key(i)+strings.Repeat("v", 1000))
	}

	h.reopenDB()
	for i := 0; i < n; i++ {
		h.getVal(key(i), key(i)+strings.Repeat("v", 1000))
	}
}
   943  
// TestDB_RecoverWithLargeJournal verifies that recovering a journal much
// larger than the (shrunken) write buffer flushes multiple level-0 tables
// mid-recovery rather than buffering everything.
func TestDB_RecoverWithLargeJournal(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.put("big1", strings.Repeat("1", 200000))
	h.put("big2", strings.Repeat("2", 200000))
	h.put("small3", strings.Repeat("3", 10))
	h.put("small4", strings.Repeat("4", 10))
	h.tablesPerLevel("")

	// Make sure that if we re-open with a small write buffer size that
	// we flush table files in the middle of a large journal file.
	h.o.WriteBuffer = 100000
	h.reopenDB()
	h.getVal("big1", strings.Repeat("1", 200000))
	h.getVal("big2", strings.Repeat("2", 200000))
	h.getVal("small3", strings.Repeat("3", 10))
	h.getVal("small4", strings.Repeat("4", 10))
	v := h.db.s.version()
	if v.tLen(0) <= 1 {
		t.Errorf("tables-0 less than one")
	}
	v.release()
}
   968  
// TestDB_CompactionsGenerateMultipleFiles verifies that compacting a
// large level-0 dataset splits the output into multiple level-1 tables
// rather than one oversized file.
func TestDB_CompactionsGenerateMultipleFiles(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  10000000,
		Compression:                  opt.NoCompression,
	})
	defer h.close()

	v := h.db.s.version()
	if v.tLen(0) > 0 {
		t.Errorf("level-0 tables more than 0, got %d", v.tLen(0))
	}
	v.release()

	n := 80

	// Write 8MB (80 values, each 100K)
	for i := 0; i < n; i++ {
		h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10))
	}

	// Reopening moves updates to level-0
	h.reopenDB()
	h.compactRangeAt(0, "", "")

	v = h.db.s.version()
	if v.tLen(0) > 0 {
		t.Errorf("level-0 tables more than 0, got %d", v.tLen(0))
	}
	if v.tLen(1) <= 1 {
		t.Errorf("level-1 tables less than 1, got %d", v.tLen(1))
	}
	v.release()

	for i := 0; i < n; i++ {
		h.getVal(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), 100000/10))
	}
}
  1007  
// TestDB_RepeatedWritesToSameKey verifies that repeatedly overwriting a
// single key does not grow the table count without bound (obsolete
// versions get compacted away).
func TestDB_RepeatedWritesToSameKey(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 100000})
	defer h.close()

	// Allow some slack above the L0 pause trigger.
	maxTables := h.o.GetWriteL0PauseTrigger() + 7

	// Each value is twice the write buffer, forcing a flush per put.
	value := strings.Repeat("v", 2*h.o.GetWriteBuffer())
	for i := 0; i < 5*maxTables; i++ {
		h.put("key", value)
		n := h.totalTables()
		if n > maxTables {
			t.Errorf("total tables exceed %d, got=%d, iter=%d", maxTables, n, i)
		}
	}
}
  1023  
// TestDB_RepeatedWritesToSameKeyAfterReopen is the same bounded-table
// check as TestDB_RepeatedWritesToSameKey, but run after a reopen so the
// recovered state is exercised.
func TestDB_RepeatedWritesToSameKeyAfterReopen(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  100000,
	})
	defer h.close()

	h.reopenDB()

	maxTables := h.o.GetWriteL0PauseTrigger() + 7

	value := strings.Repeat("v", 2*h.o.GetWriteBuffer())
	for i := 0; i < 5*maxTables; i++ {
		h.put("key", value)
		n := h.totalTables()
		if n > maxTables {
			t.Errorf("total tables exceed %d, got=%d, iter=%d", maxTables, n, i)
		}
	}
}
  1044  
// TestDB_SparseMerge verifies that sparse updates across a keyspace with
// one very dense region (prefix B) do not trigger compactions that merge
// the whole dense region at once; per-table next-level overlap must stay
// bounded (20MB).
func TestDB_SparseMerge(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, Compression: opt.NoCompression})
	defer h.close()

	h.putMulti(7, "A", "Z")

	// Suppose there is:
	//    small amount of data with prefix A
	//    large amount of data with prefix B
	//    small amount of data with prefix C
	// and that recent updates have made small changes to all three prefixes.
	// Check that we do not do a compaction that merges all of B in one shot.
	h.put("A", "va")
	value := strings.Repeat("x", 1000)
	for i := 0; i < 100000; i++ {
		h.put(fmt.Sprintf("B%010d", i), value)
	}
	h.put("C", "vc")
	h.compactMem()
	h.compactRangeAt(0, "", "")
	h.waitCompaction()

	// Make sparse update
	h.put("A", "va2")
	h.put("B100", "bvalue2")
	h.put("C", "vc2")
	h.compactMem()

	h.waitCompaction()
	h.maxNextLevelOverlappingBytes(20 * 1048576)
	h.compactRangeAt(0, "", "")
	h.waitCompaction()
	h.maxNextLevelOverlappingBytes(20 * 1048576)
	h.compactRangeAt(1, "", "")
	h.waitCompaction()
	h.maxNextLevelOverlappingBytes(20 * 1048576)
}
  1082  
// TestDB_SizeOf writes ~8 MB of uncompressed data and checks that SizeOf
// range estimates fall between lower (s1) and upper (s2) byte bounds, both
// before and after reopening and while compacting successive key windows.
func TestDB_SizeOf(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		Compression:                  opt.NoCompression,
		WriteBuffer:                  10000000,
	})
	defer h.close()

	// Empty DB: size of any range is zero, including after a reopen.
	h.sizeAssert("", "xyz", 0, 0)
	h.reopenDB()
	h.sizeAssert("", "xyz", 0, 0)

	// Write 8MB (80 values, each 100K)
	n := 80
	s1 := 100000 // lower bound per entry
	s2 := 105000 // upper bound per entry (allows for table overhead)

	for i := 0; i < n; i++ {
		h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), s1/10))
	}

	// 0 because SizeOf() does not account for memtable space
	h.sizeAssert("", numKey(50), 0, 0)

	for r := 0; r < 3; r++ {
		h.reopenDB()

		// Compact one 10-key window per pass; size estimates must hold
		// regardless of how much of the data has been compacted.
		for cs := 0; cs < n; cs += 10 {
			for i := 0; i < n; i += 10 {
				h.sizeAssert("", numKey(i), int64(s1*i), int64(s2*i))
				h.sizeAssert("", numKey(i)+".suffix", int64(s1*(i+1)), int64(s2*(i+1)))
				h.sizeAssert(numKey(i), numKey(i+10), int64(s1*10), int64(s2*10))
			}

			h.sizeAssert("", numKey(50), int64(s1*50), int64(s2*50))
			h.sizeAssert("", numKey(50)+".suffix", int64(s1*50), int64(s2*50))

			h.compactRangeAt(0, numKey(cs), numKey(cs+9))
		}

		// After a full sweep, everything must have left level 0.
		v := h.db.s.version()
		if v.tLen(0) != 0 {
			t.Errorf("level-0 tables was not zero, got %d", v.tLen(0))
		}
		if v.tLen(1) == 0 {
			t.Error("level-1 tables was zero")
		}
		v.release()
	}
}
  1133  
// TestDB_SizeOf_MixOfSmallAndLarge checks SizeOf estimates when entry sizes
// vary widely (10 KB to 300 KB): the cumulative estimate up to each key must
// match the sum of the preceding entry sizes within a 1000-byte tolerance.
func TestDB_SizeOf_MixOfSmallAndLarge(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		Compression:                  opt.NoCompression,
	})
	defer h.close()

	// Per-entry value sizes; index doubles as the key number.
	sizes := []int64{
		10000,
		10000,
		100000,
		10000,
		100000,
		10000,
		300000,
		10000,
	}

	for i, n := range sizes {
		h.put(numKey(i), strings.Repeat(fmt.Sprintf("v%09d", i), int(n)/10))
	}

	for r := 0; r < 3; r++ {
		h.reopenDB()

		// x is the exact cumulative size; y adds 1000 bytes of slack per
		// assertion (except for the empty prefix).
		var x int64
		for i, n := range sizes {
			y := x
			if i > 0 {
				y += 1000
			}
			h.sizeAssert("", numKey(i), x, y)
			x += n
		}

		// Range covering entries 3 and 4 (10000 + 100000).
		h.sizeAssert(numKey(3), numKey(5), 110000, 111000)

		h.compactRangeAt(0, "", "")
	}
}
  1174  
// TestDB_Snapshot verifies snapshot isolation: each snapshot continues to
// observe the value current at its creation, independent of later writes
// and of the release order of other snapshots.
func TestDB_Snapshot(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.put("foo", "v1")
		s1 := h.getSnapshot()
		h.put("foo", "v2")
		s2 := h.getSnapshot()
		h.put("foo", "v3")
		s3 := h.getSnapshot()
		h.put("foo", "v4")

		// Each snapshot sees its own version; the live DB sees the latest.
		h.getValr(s1, "foo", "v1")
		h.getValr(s2, "foo", "v2")
		h.getValr(s3, "foo", "v3")
		h.getVal("foo", "v4")

		// Releasing one snapshot must not disturb the others.
		s3.Release()
		h.getValr(s1, "foo", "v1")
		h.getValr(s2, "foo", "v2")
		h.getVal("foo", "v4")

		s1.Release()
		h.getValr(s2, "foo", "v2")
		h.getVal("foo", "v4")

		s2.Release()
		h.getVal("foo", "v4")
	})
}
  1203  
// TestDB_SnapshotList exercises the internal snapshot bookkeeping directly:
// minSeq must always report the smallest sequence number among the live
// snapshot elements, across interleaved acquire/release at seq 0, 1 and 2.
func TestDB_SnapshotList(t *testing.T) {
	// Bare DB with only the snapshot list initialized; no storage needed.
	db := &DB{snapsList: list.New()}
	e0a := db.acquireSnapshot() // seq 0
	e0b := db.acquireSnapshot() // seq 0 (duplicate)
	db.seq = 1
	e1 := db.acquireSnapshot() // seq 1
	db.seq = 2
	e2 := db.acquireSnapshot() // seq 2

	if db.minSeq() != 0 {
		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
	}
	// One seq-0 element remains, so the minimum is unchanged.
	db.releaseSnapshot(e0a)
	if db.minSeq() != 0 {
		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
	}
	db.releaseSnapshot(e2)
	if db.minSeq() != 0 {
		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
	}
	// Last seq-0 element gone: minimum advances to 1.
	db.releaseSnapshot(e0b)
	if db.minSeq() != 1 {
		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
	}
	// Re-acquiring at seq 2 must not lower the minimum below e1.
	e2 = db.acquireSnapshot()
	if db.minSeq() != 1 {
		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
	}
	db.releaseSnapshot(e1)
	if db.minSeq() != 2 {
		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
	}
	db.releaseSnapshot(e2)
	if db.minSeq() != 2 {
		t.Fatalf("invalid sequence number, got=%d", db.minSeq())
	}
}
  1241  
// TestDB_HiddenValuesAreRemoved verifies that a deletion marker and the
// value it shadows are dropped during compaction only when it is safe:
// the DEL goes first (because v2 hides v1), and v1 itself goes only once
// its level is merged with the base level for the key.
func TestDB_HiddenValuesAreRemoved(t *testing.T) {
	trun(t, func(h *dbHarness) {
		s := h.db.s

		// Force memtable flushes to land at level m.
		m := 2
		h.db.memdbMaxLevel = m

		h.put("foo", "v1")
		h.compactMem()
		v := s.version()
		num := v.tLen(m)
		v.release()
		if num != 1 {
			t.Errorf("invalid level-%d len, want=1 got=%d", m, num)
		}

		// Place a table at level last-1 to prevent merging with preceding mutation
		h.put("a", "begin")
		h.put("z", "end")
		h.compactMem()
		v = s.version()
		if v.tLen(m) != 1 {
			t.Errorf("invalid level-%d len, want=1 got=%d", m, v.tLen(m))
		}
		if v.tLen(m-1) != 1 {
			t.Errorf("invalid level-%d len, want=1 got=%d", m-1, v.tLen(m-1))
		}
		v.release()

		h.delete("foo")
		h.put("foo", "v2")
		h.allEntriesFor("foo", "[ v2, DEL, v1 ]")
		h.compactMem()
		h.allEntriesFor("foo", "[ v2, DEL, v1 ]")
		h.compactRangeAt(m-2, "", "z")
		// DEL eliminated, but v1 remains because we aren't compacting that level
		// (DEL can be eliminated because v2 hides v1).
		h.allEntriesFor("foo", "[ v2, v1 ]")
		h.compactRangeAt(m-1, "", "")
		// Merging last-1 w/ last, so we are the base level for "foo", so
		// DEL is removed.  (as is v1).
		h.allEntriesFor("foo", "[ v2 ]")
	})
}
  1286  
// TestDB_DeletionMarkers2 is the companion to TestDB_HiddenValuesAreRemoved
// for the case where the deleted key is NOT re-written: the DEL marker must
// be kept while a lower-level table still overlaps, and both DEL and v1
// disappear only when merged into the key's base level.
func TestDB_DeletionMarkers2(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()
	s := h.db.s

	// Force memtable flushes to land at level m.
	m := 2
	h.db.memdbMaxLevel = m

	h.put("foo", "v1")
	h.compactMem()
	v := s.version()
	num := v.tLen(m)
	v.release()
	if num != 1 {
		t.Errorf("invalid level-%d len, want=1 got=%d", m, num)
	}

	// Place a table at level last-1 to prevent merging with preceding mutation
	h.put("a", "begin")
	h.put("z", "end")
	h.compactMem()
	v = s.version()
	if v.tLen(m) != 1 {
		t.Errorf("invalid level-%d len, want=1 got=%d", m, v.tLen(m))
	}
	if v.tLen(m-1) != 1 {
		t.Errorf("invalid level-%d len, want=1 got=%d", m-1, v.tLen(m-1))
	}
	v.release()

	h.delete("foo")
	h.allEntriesFor("foo", "[ DEL, v1 ]")
	h.compactMem() // Moves to level last-2
	h.allEntriesFor("foo", "[ DEL, v1 ]")
	h.compactRangeAt(m-2, "", "")
	// DEL kept: "last" file overlaps
	h.allEntriesFor("foo", "[ DEL, v1 ]")
	h.compactRangeAt(m-1, "", "")
	// Merging last-1 w/ last, so we are the base level for "foo", so
	// DEL is removed.  (as is v1).
	h.allEntriesFor("foo", "[ ]")
}
  1329  
// TestDB_CompactionTableOpenError injects a table-open error during a
// manual compaction and verifies that the DB can be closed, reopened with
// the error cleared, and still serve every previously written key — i.e.
// a failed compaction must not lose data.
func TestDB_CompactionTableOpenError(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		// Negative capacity — presumably disables the open-files cache so
		// every table access actually opens the file; verify against opt.
		OpenFilesCacheCapacity: -1,
	})
	defer h.close()

	h.db.memdbMaxLevel = 2

	// Write im*jm keys twice, flushing after each group of jm, producing
	// im*2 tables in total.
	im := 10
	jm := 10
	for r := 0; r < 2; r++ {
		for i := 0; i < im; i++ {
			for j := 0; j < jm; j++ {
				h.put(fmt.Sprintf("k%d,%d", i, j), fmt.Sprintf("v%d,%d", i, j))
			}
			h.compactMem()
		}
	}

	if n := h.totalTables(); n != im*2 {
		t.Errorf("total tables is %d, want %d", n, im*2)
	}

	// From here on, opening any table file fails.
	h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, errors.New("open error during table compaction"))
	go func() {
		// ErrClosed is acceptable: the DB may be closed below while this
		// compaction is still in flight.
		if err := h.db.CompactRange(util.Range{}); err != nil && err != ErrClosed {
			t.Error("CompactRange error: ", err)
		}
	}()
	if err := h.db.compTriggerWait(h.db.tcompCmdC); err != nil {
		t.Log("compaction error: ", err)
	}
	if err := h.closeDB0(); err != nil {
		t.Error("Close: got error: ", err)
	}
	// Reopen with the injected error cleared; all keys must still be there.
	h.openDB()
	h.stor.EmulateError(testutil.ModeOpen, storage.TypeTable, nil)

	for i := 0; i < im; i++ {
		for j := 0; j < jm; j++ {
			h.getVal(fmt.Sprintf("k%d,%d", i, j), fmt.Sprintf("v%d,%d", i, j))
		}
	}
}
  1375  
// TestDB_OverlapInLevel0 is a regression test: a memtable compaction must
// detect overlap with existing level-0 files so a deletion is not pushed to
// a deeper level where it would fail to shadow the value it deletes.
func TestDB_OverlapInLevel0(t *testing.T) {
	trun(t, func(h *dbHarness) {
		h.db.memdbMaxLevel = 2

		// Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0.
		h.put("100", "v100")
		h.put("999", "v999")
		h.compactMem()
		h.delete("100")
		h.delete("999")
		h.compactMem()
		h.tablesPerLevel("0,1,1")

		// Make files spanning the following ranges in level-0:
		//  files[0]  200 .. 900
		//  files[1]  300 .. 500
		// Note that files are sorted by min key.
		h.put("300", "v300")
		h.put("500", "v500")
		h.compactMem()
		h.put("200", "v200")
		h.put("600", "v600")
		h.put("900", "v900")
		h.compactMem()
		h.tablesPerLevel("2,1,1")

		// Compact away the placeholder files we created initially
		h.compactRangeAt(1, "", "")
		h.compactRangeAt(2, "", "")
		h.tablesPerLevel("2")

		// Do a memtable compaction.  Before bug-fix, the compaction would
		// not detect the overlap with level-0 files and would incorrectly place
		// the deletion in a deeper level.
		h.delete("600")
		h.compactMem()
		h.tablesPerLevel("3")
		h.get("600", false)
	})
}
  1416  
// TestDB_L0_CompactionBug_Issue44_a replays the exact write/delete/reopen
// sequence from LevelDB issue 44 (case a): after repeated reopens, the key
// "a" must survive with value "v", both before and after a compaction.
// The operation order is the regression fixture — do not reorder.
func TestDB_L0_CompactionBug_Issue44_a(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.reopenDB()
	h.put("b", "v")
	h.reopenDB()
	h.delete("b")
	h.delete("a")
	h.reopenDB()
	h.delete("a")
	h.reopenDB()
	h.put("a", "v")
	h.reopenDB()
	h.reopenDB()
	h.getKeyVal("(a->v)")
	h.waitCompaction()
	h.getKeyVal("(a->v)")
}
  1436  
// TestDB_L0_CompactionBug_Issue44_b replays the second fixture from LevelDB
// issue 44 (case b), mixing empty-key writes, deletes and reopens; the DB
// must end up with exactly the empty key and "c"->"cv", before and after
// compaction. The operation order is the regression fixture — do not reorder.
func TestDB_L0_CompactionBug_Issue44_b(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.reopenDB()
	h.put("", "")
	h.reopenDB()
	h.delete("e")
	h.put("", "")
	h.reopenDB()
	h.put("c", "cv")
	h.reopenDB()
	h.put("", "")
	h.reopenDB()
	h.put("", "")
	h.waitCompaction()
	h.reopenDB()
	h.put("d", "dv")
	h.reopenDB()
	h.put("", "")
	h.reopenDB()
	h.delete("d")
	h.delete("b")
	h.reopenDB()
	h.getKeyVal("(->)(c->cv)")
	h.waitCompaction()
	h.getKeyVal("(->)(c->cv)")
}
  1465  
  1466  func TestDB_SingleEntryMemCompaction(t *testing.T) {
  1467  	trun(t, func(h *dbHarness) {
  1468  		for i := 0; i < 10; i++ {
  1469  			h.put("big", strings.Repeat("v", opt.DefaultWriteBuffer))
  1470  			h.compactMem()
  1471  			h.put("key", strings.Repeat("v", opt.DefaultBlockSize))
  1472  			h.compactMem()
  1473  			h.put("k", "v")
  1474  			h.compactMem()
  1475  			h.put("", "")
  1476  			h.compactMem()
  1477  			h.put("verybig", strings.Repeat("v", opt.DefaultWriteBuffer*2))
  1478  			h.compactMem()
  1479  		}
  1480  	})
  1481  }
  1482  
// TestDB_ManifestWriteError injects a manifest write error (iteration 0)
// and a manifest sync error (iteration 1) during a merging compaction, and
// verifies that the compaction fails but no data is lost after the error is
// cleared and the DB is reopened.
func TestDB_ManifestWriteError(t *testing.T) {
	for i := 0; i < 2; i++ {
		// Each iteration runs in its own closure so the deferred harness
		// close fires per iteration, not at test end.
		func() {
			h := newDbHarness(t)
			defer h.close()

			h.put("foo", "bar")
			h.getVal("foo", "bar")

			// Mem compaction (will succeed)
			h.compactMem()
			h.getVal("foo", "bar")
			v := h.db.s.version()
			if n := v.tLen(0); n != 1 {
				t.Errorf("invalid total tables, want=1 got=%d", n)
			}
			v.release()

			if i == 0 {
				h.stor.EmulateError(testutil.ModeWrite, storage.TypeManifest, errors.New("manifest write error"))
			} else {
				h.stor.EmulateError(testutil.ModeSync, storage.TypeManifest, errors.New("manifest sync error"))
			}

			// Merging compaction (will fail)
			h.compactRangeAtErr(0, "", "", true)

			// Close error deliberately ignored: closing after the injected
			// manifest failure may itself report an error.
			h.db.Close()
			h.stor.EmulateError(testutil.ModeWrite, storage.TypeManifest, nil)
			h.stor.EmulateError(testutil.ModeSync, storage.TypeManifest, nil)

			// Should not lose data
			h.openDB()
			h.getVal("foo", "bar")
		}()
	}
}
  1520  
  1521  func assertErr(t *testing.T, err error, wanterr bool) {
  1522  	if err != nil {
  1523  		if wanterr {
  1524  			t.Log("AssertErr: got error (expected): ", err)
  1525  		} else {
  1526  			t.Error("AssertErr: got error: ", err)
  1527  		}
  1528  	} else if wanterr {
  1529  		t.Error("AssertErr: expect error")
  1530  	}
  1531  }
  1532  
// TestDB_ClosedIsClosed verifies that after the DB is closed, every public
// entry point — Put/Get/Delete, snapshots, iterators (pre-existing and newly
// created), GetProperty, SizeOf, CompactRange and a second Close — returns
// an error instead of touching freed state.
func TestDB_ClosedIsClosed(t *testing.T) {
	h := newDbHarness(t)
	db := h.db

	// iter/iter2/snap are created while the DB is open and probed after
	// it is closed, so they are declared outside the closure.
	var iter, iter2 iterator.Iterator
	var snap *Snapshot
	func() {
		defer h.close()

		h.put("k", "v")
		h.getVal("k", "v")

		iter = db.NewIterator(nil, h.ro)
		iter.Seek([]byte("k"))
		testKeyVal(t, iter, "k->v")

		var err error
		snap, err = db.GetSnapshot()
		if err != nil {
			t.Fatal("GetSnapshot: got error: ", err)
		}

		h.getValr(snap, "k", "v")

		iter2 = snap.NewIterator(nil, h.ro)
		iter2.Seek([]byte("k"))
		testKeyVal(t, iter2, "k->v")

		h.put("foo", "v2")
		h.delete("foo")

		// closing DB
		iter.Release()
		iter2.Release()
	}()

	// From here on the DB is closed; every operation must error.
	assertErr(t, db.Put([]byte("x"), []byte("y"), h.wo), true)
	_, err := db.Get([]byte("k"), h.ro)
	assertErr(t, err, true)

	// A released iterator is invalid but not itself in error; only a new
	// operation (Seek) surfaces the closed state.
	if iter.Valid() {
		t.Errorf("iter.Valid should false")
	}
	assertErr(t, iter.Error(), false)
	testKeyVal(t, iter, "->")
	if iter.Seek([]byte("k")) {
		t.Errorf("iter.Seek should false")
	}
	assertErr(t, iter.Error(), true)

	assertErr(t, iter2.Error(), false)

	_, err = snap.Get([]byte("k"), h.ro)
	assertErr(t, err, true)

	_, err = db.GetSnapshot()
	assertErr(t, err, true)

	// Iterators created after close must be born in the error state.
	iter3 := db.NewIterator(nil, h.ro)
	assertErr(t, iter3.Error(), true)

	iter3 = snap.NewIterator(nil, h.ro)
	assertErr(t, iter3.Error(), true)

	assertErr(t, db.Delete([]byte("k"), h.wo), true)

	_, err = db.GetProperty("leveldb.stats")
	assertErr(t, err, true)

	_, err = db.SizeOf([]util.Range{{Start: []byte("a"), Limit: []byte("z")}})
	assertErr(t, err, true)

	assertErr(t, db.CompactRange(util.Range{}), true)

	assertErr(t, db.Close(), true)
}
  1609  
  1610  type numberComparer struct{}
  1611  
  1612  func (numberComparer) num(x []byte) (n int) {
  1613  	fmt.Sscan(string(x[1:len(x)-1]), &n)
  1614  	return
  1615  }
  1616  
  1617  func (numberComparer) Name() string {
  1618  	return "test.NumberComparer"
  1619  }
  1620  
  1621  func (p numberComparer) Compare(a, b []byte) int {
  1622  	return p.num(a) - p.num(b)
  1623  }
  1624  
  1625  func (numberComparer) Separator(dst, a, b []byte) []byte { return nil }
  1626  func (numberComparer) Successor(dst, b []byte) []byte    { return nil }
  1627  
// TestDB_CustomComparer runs the DB with numberComparer, checking that
// numerically-equal keys in different bases ("[10]" vs "[0xa]") alias each
// other across memtable and table compactions, and that bulk writes under
// the custom ordering compact cleanly.
func TestDB_CustomComparer(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		Comparer:                     numberComparer{},
		// Tiny write buffer so the bulk loop below actually flushes.
		WriteBuffer: 1000,
	})
	defer h.close()

	h.put("[10]", "ten")
	h.put("[0x14]", "twenty")
	// Pass 0 reads from the memtable, pass 1 from tables after compaction.
	for i := 0; i < 2; i++ {
		h.getVal("[10]", "ten")
		h.getVal("[0xa]", "ten")
		h.getVal("[20]", "twenty")
		h.getVal("[0x14]", "twenty")
		h.get("[15]", false)
		h.get("[0xf]", false)
		h.compactMem()
		h.compactRange("[0]", "[9999]")
	}

	for n := 0; n < 2; n++ {
		for i := 0; i < 100; i++ {
			v := fmt.Sprintf("[%d]", i*10)
			h.put(v, v)
		}
		h.compactMem()
		h.compactRange("[0]", "[1000000]")
	}
}
  1658  
// TestDB_ManualCompaction checks CompactRange edge cases against the table
// layout: ranges entirely before or after the data must be no-ops, while
// overlapping ranges push only the touched tables to the bottom level.
func TestDB_ManualCompaction(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.db.memdbMaxLevel = 2

	h.putMulti(3, "p", "q")
	h.tablesPerLevel("1,1,1")

	// Compaction range falls before files
	h.compactRange("", "c")
	h.tablesPerLevel("1,1,1")

	// Compaction range falls after files
	h.compactRange("r", "z")
	h.tablesPerLevel("1,1,1")

	// Compaction range overlaps files
	h.compactRange("p1", "p9")
	h.tablesPerLevel("0,0,1")

	// Populate a different range
	h.putMulti(3, "c", "e")
	h.tablesPerLevel("1,1,2")

	// Compact just the new range
	h.compactRange("b", "f")
	h.tablesPerLevel("0,0,2")

	// Compact all
	h.putMulti(1, "a", "z")
	h.tablesPerLevel("0,1,2")
	h.compactRange("", "")
	h.tablesPerLevel("0,0,1")
}
  1694  
// TestDB_BloomFilter checks the effectiveness of the bloom filter by
// counting actual sstable read I/Os: lookups of present keys should cost
// roughly one read each, and lookups of missing keys should almost always
// be filtered out (<= 3% of n reads).
func TestDB_BloomFilter(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		// Block cache disabled so every block access shows up as an I/O read.
		DisableBlockCache: true,
		Filter:            filter.NewBloomFilter(10),
	})
	defer h.close()

	key := func(i int) string {
		return fmt.Sprintf("key%06d", i)
	}

	const n = 10000

	// Populate multiple layers
	for i := 0; i < n; i++ {
		h.put(key(i), key(i))
	}
	h.compactMem()
	h.compactRange("a", "z")
	for i := 0; i < n; i += 100 {
		h.put(key(i), key(i))
	}
	h.compactMem()

	// Prevent auto compactions triggered by seeks
	h.stor.Stall(testutil.ModeSync, storage.TypeTable)

	// Lookup present keys. Should rarely read from small sstable.
	h.stor.ResetCounter(testutil.ModeRead, storage.TypeTable)
	for i := 0; i < n; i++ {
		h.getVal(key(i), key(i))
	}
	cnt, _ := h.stor.Counter(testutil.ModeRead, storage.TypeTable)
	t.Logf("lookup of %d present keys yield %d sstable I/O reads", n, cnt)
	// Between one read per key and 2% overhead from the second layer.
	if min, max := n, n+2*n/100; cnt < min || cnt > max {
		t.Errorf("num of sstable I/O reads of present keys not in range of %d - %d, got %d", min, max, cnt)
	}

	// Lookup missing keys. Should rarely read from either sstable.
	h.stor.ResetCounter(testutil.ModeRead, storage.TypeTable)
	for i := 0; i < n; i++ {
		h.get(key(i)+".missing", false)
	}
	cnt, _ = h.stor.Counter(testutil.ModeRead, storage.TypeTable)
	t.Logf("lookup of %d missing keys yield %d sstable I/O reads", n, cnt)
	if max := 3 * n / 100; cnt > max {
		t.Errorf("num of sstable I/O reads of missing keys was more than %d, got %d", max, cnt)
	}

	// Un-stall table syncs so the harness can shut down cleanly.
	h.stor.Release(testutil.ModeSync, storage.TypeTable)
}
  1747  
// TestDB_Concurrent runs n goroutines that randomly read and write a shared
// key space for a fixed duration; values encode (key, writer, op-count) so
// readers can verify a value is internally consistent and not from the
// future of any writer's counter.
func TestDB_Concurrent(t *testing.T) {
	const n, secs, maxkey = 4, 6, 1000
	h := newDbHarness(t)
	defer h.close()

	runtime.GOMAXPROCS(runtime.NumCPU())

	var (
		closeWg sync.WaitGroup
		stop    uint32
		// cnt[i] is the op counter of goroutine i; written only by i (via
		// atomic add), read atomically by the other goroutines.
		cnt [n]uint32
	)

	for i := 0; i < n; i++ {
		closeWg.Add(1)
		go func(i int) {
			var put, get, found uint
			defer func() {
				t.Logf("goroutine %d stopped after %d ops, put=%d get=%d found=%d missing=%d",
					i, cnt[i], put, get, found, get-found)
				closeWg.Done()
			}()

			rnd := rand.New(rand.NewSource(int64(1000 + i)))
			for atomic.LoadUint32(&stop) == 0 {
				// Plain read is fine: only this goroutine writes cnt[i].
				x := cnt[i]

				k := rnd.Intn(maxkey)
				kstr := fmt.Sprintf("%016d", k)

				if (rnd.Int() % 2) > 0 {
					put++
					// Value format "key.writer.count", padded to >= 1000 chars.
					h.put(kstr, fmt.Sprintf("%d.%d.%-1000d", k, i, x))
				} else {
					get++
					v, err := h.db.Get([]byte(kstr), h.ro)
					if err == nil {
						found++
						rk, ri, rx := 0, -1, uint32(0)
						fmt.Sscanf(string(v), "%d.%d.%d", &rk, &ri, &rx)
						if rk != k {
							t.Errorf("invalid key want=%d got=%d", k, rk)
						}
						if ri < 0 || ri >= n {
							t.Error("invalid goroutine number: ", ri)
						} else {
							// The embedded count must not exceed the writer's
							// current counter — a value cannot come from the future.
							tx := atomic.LoadUint32(&(cnt[ri]))
							if rx > tx {
								t.Errorf("invalid seq number, %d > %d ", rx, tx)
							}
						}
					} else if err != ErrNotFound {
						t.Error("Get: got error: ", err)
						return
					}
				}
				atomic.AddUint32(&cnt[i], 1)
			}
		}(i)
	}

	time.Sleep(secs * time.Second)
	atomic.StoreUint32(&stop, 1)
	closeWg.Wait()
}
  1813  
// TestDB_ConcurrentIterator runs ascending and descending writers alongside
// many concurrent iterators and checks that each iterator observes keys in
// strictly increasing order with self-consistent values.
func TestDB_ConcurrentIterator(t *testing.T) {
	const n, n2 = 4, 1000
	h := newDbHarnessWopt(t, &opt.Options{DisableLargeBatchTransaction: true, WriteBuffer: 30})
	defer h.close()

	runtime.GOMAXPROCS(runtime.NumCPU())

	var (
		closeWg sync.WaitGroup
		stop    uint32
	)

	// Writers counting up from 0.
	for i := 0; i < n; i++ {
		closeWg.Add(1)
		go func(i int) {
			for k := 0; atomic.LoadUint32(&stop) == 0; k++ {
				h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
			}
			closeWg.Done()
		}(i)
	}

	// Writers counting down from 1000000.
	for i := 0; i < n; i++ {
		closeWg.Add(1)
		go func(i int) {
			// NOTE(review): `k < 0 ||` looks suspicious — if k ever reached
			// -1 the condition would be permanently true and the loop would
			// never stop; `k > 0 &&` was probably intended. Harmless here
			// only because stop is set immediately after the iterator
			// goroutines are spawned below.
			for k := 1000000; k < 0 || atomic.LoadUint32(&stop) == 0; k-- {
				h.put(fmt.Sprintf("k%d", k), fmt.Sprintf("%d.%d.", k, i)+strings.Repeat("x", 10))
			}
			closeWg.Done()
		}(i)
	}

	cmp := comparer.DefaultComparer
	for i := 0; i < n2; i++ {
		closeWg.Add(1)
		go func(i int) {
			it := h.db.NewIterator(nil, nil)
			// pk holds the previous key to assert strict ordering.
			var pk []byte
			for it.Next() {
				kk := it.Key()
				if cmp.Compare(kk, pk) <= 0 {
					t.Errorf("iter %d: %q is successor of %q", i, pk, kk)
				}
				pk = append(pk[:0], kk...)
				var k, vk, vi int
				if n, err := fmt.Sscanf(string(it.Key()), "k%d", &k); err != nil {
					t.Errorf("iter %d: Scanf error on key %q: %v", i, it.Key(), err)
				} else if n < 1 {
					t.Errorf("iter %d: Cannot parse key %q", i, it.Key())
				}
				if n, err := fmt.Sscanf(string(it.Value()), "%d.%d", &vk, &vi); err != nil {
					t.Errorf("iter %d: Scanf error on value %q: %v", i, it.Value(), err)
				} else if n < 2 {
					t.Errorf("iter %d: Cannot parse value %q", i, it.Value())
				}

				// The value must embed the key number it was written under.
				if vk != k {
					t.Errorf("iter %d: invalid value i=%d, want=%d got=%d", i, vi, k, vk)
				}
			}
			if err := it.Error(); err != nil {
				t.Errorf("iter %d: Got error: %v", i, err)
			}
			it.Release()
			closeWg.Done()
		}(i)
	}

	// Stop the writers right away; the iterators finish their sweeps.
	atomic.StoreUint32(&stop, 1)
	closeWg.Wait()
}
  1885  
// TestDB_ConcurrentWrite runs n single-put writers and n batch writers
// concurrently, asserting that every key is readable immediately after its
// Put or Write returns (write visibility under concurrency).
func TestDB_ConcurrentWrite(t *testing.T) {
	const n, bk, niter = 10, 3, 10000
	h := newDbHarness(t)
	defer h.close()

	runtime.GOMAXPROCS(runtime.NumCPU())

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for k := 0; k < niter; k++ {
				kstr := fmt.Sprintf("put-%d.%d", i, k)
				vstr := fmt.Sprintf("v%d", k)
				h.put(kstr, vstr)
				// Key should immediately available after put returns.
				h.getVal(kstr, vstr)
			}
		}(i)
	}
	for i := 0; i < n; i++ {
		wg.Add(1)
		// One batch per goroutine, created outside and captured; it is
		// reused (Reset) across iterations but never shared between goroutines.
		batch := &Batch{}
		go func(i int) {
			defer wg.Done()
			for k := 0; k < niter; k++ {
				batch.Reset()
				for j := 0; j < bk; j++ {
					batch.Put([]byte(fmt.Sprintf("batch-%d.%d.%d", i, k, j)), []byte(fmt.Sprintf("v%d", k)))
				}
				h.write(batch)
				// Key should immediately available after put returns.
				for j := 0; j < bk; j++ {
					h.getVal(fmt.Sprintf("batch-%d.%d.%d", i, k, j), fmt.Sprintf("v%d", k))
				}
			}
		}(i)
	}
	wg.Wait()
}
  1927  
// TestDB_CreateReopenDbOnFile creates, writes to, and fully closes an
// on-disk DB three times through the explicit storage.OpenFile API,
// verifying create-then-reopen works against real files.
func TestDB_CreateReopenDbOnFile(t *testing.T) {
	// Per-user temp path so parallel users on one machine don't collide.
	dbpath := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldbtestCreateReopenDbOnFile-%d", os.Getuid()))
	if err := os.RemoveAll(dbpath); err != nil {
		t.Fatal("cannot remove old db: ", err)
	}
	defer os.RemoveAll(dbpath)

	for i := 0; i < 3; i++ {
		stor, err := storage.OpenFile(dbpath, false)
		if err != nil {
			t.Fatalf("(%d) cannot open storage: %s", i, err)
		}
		db, err := Open(stor, nil)
		if err != nil {
			t.Fatalf("(%d) cannot open db: %s", i, err)
		}
		if err := db.Put([]byte("foo"), []byte("bar"), nil); err != nil {
			t.Fatalf("(%d) cannot write to db: %s", i, err)
		}
		if err := db.Close(); err != nil {
			t.Fatalf("(%d) cannot close db: %s", i, err)
		}
		// The storage is owned by the caller here, so close it separately.
		if err := stor.Close(); err != nil {
			t.Fatalf("(%d) cannot close storage: %s", i, err)
		}
	}
}
  1955  
  1956  func TestDB_CreateReopenDbOnFile2(t *testing.T) {
  1957  	dbpath := filepath.Join(os.TempDir(), fmt.Sprintf("goleveldbtestCreateReopenDbOnFile2-%d", os.Getuid()))
  1958  	if err := os.RemoveAll(dbpath); err != nil {
  1959  		t.Fatal("cannot remove old db: ", err)
  1960  	}
  1961  	defer os.RemoveAll(dbpath)
  1962  
  1963  	for i := 0; i < 3; i++ {
  1964  		db, err := OpenFile(dbpath, nil)
  1965  		if err != nil {
  1966  			t.Fatalf("(%d) cannot open db: %s", i, err)
  1967  		}
  1968  		if err := db.Put([]byte("foo"), []byte("bar"), nil); err != nil {
  1969  			t.Fatalf("(%d) cannot write to db: %s", i, err)
  1970  		}
  1971  		if err := db.Close(); err != nil {
  1972  			t.Fatalf("(%d) cannot close db: %s", i, err)
  1973  		}
  1974  	}
  1975  }
  1976  
// TestDB_DeletionMarkersOnMemdb checks that a deletion sitting only in the
// memtable correctly shadows a value that was already flushed to a table:
// the key must be gone and a full scan must be empty.
func TestDB_DeletionMarkersOnMemdb(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.put("foo", "v1")
	h.compactMem() // value now lives in a table
	h.delete("foo") // deletion marker lives only in the memtable
	h.get("foo", false)
	h.getKeyVal("")
}
  1987  
// TestDB_LeveldbIssue178 is a regression test for LevelDB issue 178:
// deleted keys from a second range must not reappear after a manual
// compaction of the first range. Only the nKeys keys of range 1 may survive.
func TestDB_LeveldbIssue178(t *testing.T) {
	// Enough keys to span several compaction tables (~30 bytes per entry).
	nKeys := (opt.DefaultCompactionTableSize / 30) * 5
	key1 := func(i int) string {
		return fmt.Sprintf("my_key_%d", i)
	}
	key2 := func(i int) string {
		return fmt.Sprintf("my_key_%d_xxx", i)
	}

	// Disable compression since it affects the creation of layers and the
	// code below is trying to test against a very specific scenario.
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		Compression:                  opt.NoCompression,
	})
	defer h.close()

	// Create first key range.
	batch := new(Batch)
	for i := 0; i < nKeys; i++ {
		batch.Put([]byte(key1(i)), []byte("value for range 1 key"))
	}
	h.write(batch)

	// Create second key range.
	batch.Reset()
	for i := 0; i < nKeys; i++ {
		batch.Put([]byte(key2(i)), []byte("value for range 2 key"))
	}
	h.write(batch)

	// Delete second key range.
	batch.Reset()
	for i := 0; i < nKeys; i++ {
		batch.Delete([]byte(key2(i)))
	}
	h.write(batch)
	h.waitMemCompaction()

	// Run manual compaction.
	h.compactRange(key1(0), key1(nKeys-1))

	// Checking the keys.
	h.assertNumKeys(nKeys)
}
  2033  
// TestDB_LeveldbIssue200 is a regression test for LevelDB issue 200: an
// iterator created before a write must not observe the new key ("25"), and
// Prev/Next navigation around it must still visit exactly "3", "4", "5".
func TestDB_LeveldbIssue200(t *testing.T) {
	h := newDbHarness(t)
	defer h.close()

	h.put("1", "b")
	h.put("2", "c")
	h.put("3", "d")
	h.put("4", "e")
	h.put("5", "f")

	iter := h.db.NewIterator(nil, h.ro)

	// Add an element that should not be reflected in the iterator.
	h.put("25", "cd")

	// "25" sorts between "2" and "3"; stepping backward from "5" must skip it.
	iter.Seek([]byte("5"))
	assertBytes(t, []byte("5"), iter.Key())
	iter.Prev()
	assertBytes(t, []byte("4"), iter.Key())
	iter.Prev()
	assertBytes(t, []byte("3"), iter.Key())
	iter.Next()
	assertBytes(t, []byte("4"), iter.Key())
	iter.Next()
	assertBytes(t, []byte("5"), iter.Key())
}
  2060  
// TestDB_GoleveldbIssue74 stresses snapshot consistency (goleveldb issue #74).
// A writer goroutine repeatedly batch-writes n KEY->value and PTR->KEY pairs,
// then verifies and deletes them through a fresh snapshot; a reader goroutine
// concurrently takes snapshots and checks that every PTR entry resolves to a
// KEY entry carrying a single, consistent iteration tag. Either goroutine
// finishing (deadline, iteration budget, or a test failure) stops the other
// via the shared atomic `done` flag.
func TestDB_GoleveldbIssue74(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  1 * opt.MiB,
	})
	defer h.close()

	// n keys per batch; the whole test runs for at most dur.
	const n, dur = 10000, 5 * time.Second

	runtime.GOMAXPROCS(runtime.NumCPU())

	until := time.Now().Add(dur)
	wg := new(sync.WaitGroup)
	wg.Add(2)
	var done uint32
	go func() {
		// Writer: write a full generation, then read it back through a
		// snapshot while queueing deletions for the next write.
		var i int
		defer func() {
			t.Logf("WRITER DONE #%d", i)
			atomic.StoreUint32(&done, 1)
			wg.Done()
		}()

		b := new(Batch)
		for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
			if t.Failed() {
				return
			}

			// Every KEY value is suffixed with this iteration tag so a torn
			// or stale generation is detectable.
			iv := fmt.Sprintf("VAL%010d", i)
			for k := 0; k < n; k++ {
				key := fmt.Sprintf("KEY%06d", k)
				b.Put([]byte(key), []byte(key+iv))
				b.Put([]byte(fmt.Sprintf("PTR%06d", k)), []byte(key))
			}
			h.write(b)

			b.Reset()
			snap := h.getSnapshot()
			iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil)
			var k int
			for ; iter.Next(); k++ {
				if t.Failed() {
					return
				}

				ptrKey := iter.Key()
				key := iter.Value()

				if _, err := snap.Get(ptrKey, nil); err != nil {
					t.Errorf("WRITER #%d snapshot.Get %q: %v", i, ptrKey, err)
					return
				}
				if value, err := snap.Get(key, nil); err != nil {
					t.Errorf("WRITER #%d snapshot.Get %q: %v", i, key, err)
					return
				} else if string(value) != string(key)+iv {
					t.Errorf("WRITER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+iv, value)
					return
				}

				// Queue deletion of the pair just verified.
				b.Delete(key)
				b.Delete(ptrKey)
			}
			h.write(b)
			iter.Release()
			snap.Release()
			// The snapshot must have seen the complete generation.
			if k != n {
				t.Errorf("#%d %d != %d", i, k, n)
			}
		}
	}()
	go func() {
		// Reader: independently snapshot and verify PTR->KEY->value chains.
		var i int
		defer func() {
			t.Logf("READER DONE #%d", i)
			atomic.StoreUint32(&done, 1)
			wg.Done()
		}()
		for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
			if t.Failed() {
				return
			}

			snap := h.getSnapshot()
			iter := snap.NewIterator(util.BytesPrefix([]byte("PTR")), nil)
			// All KEY values within one snapshot must share one iteration
			// tag; remember the first tag seen and compare the rest to it.
			var prevValue string
			var k int
			for ; iter.Next(); k++ {
				if t.Failed() {
					return
				}

				ptrKey := iter.Key()
				key := iter.Value()

				if _, err := snap.Get(ptrKey, nil); err != nil {
					t.Errorf("READER #%d snapshot.Get %q: %v", i, ptrKey, err)
					return
				}

				if value, err := snap.Get(key, nil); err != nil {
					t.Errorf("READER #%d snapshot.Get %q: %v", i, key, err)
					return
				} else if prevValue != "" && string(value) != string(key)+prevValue {
					t.Errorf("READER #%d snapshot.Get %q got invalid value, want %q got %q", i, key, string(key)+prevValue, value)
					return
				} else {
					prevValue = string(value[len(key):])
				}
			}
			iter.Release()
			snap.Release()
			// k == 0 is fine (snapshot taken between delete and write);
			// anything else must be a complete generation.
			if k > 0 && k != n {
				t.Errorf("#%d %d != %d", i, k, n)
			}
		}
	}()
	wg.Wait()
}
  2181  
  2182  func TestDB_GetProperties(t *testing.T) {
  2183  	h := newDbHarness(t)
  2184  	defer h.close()
  2185  
  2186  	_, err := h.db.GetProperty("leveldb.num-files-at-level")
  2187  	if err == nil {
  2188  		t.Error("GetProperty() failed to detect missing level")
  2189  	}
  2190  
  2191  	_, err = h.db.GetProperty("leveldb.num-files-at-level0")
  2192  	if err != nil {
  2193  		t.Error("got unexpected error", err)
  2194  	}
  2195  
  2196  	_, err = h.db.GetProperty("leveldb.num-files-at-level0x")
  2197  	if err == nil {
  2198  		t.Error("GetProperty() failed to detect invalid level")
  2199  	}
  2200  }
  2201  
  2202  func TestDB_GoleveldbIssue72and83(t *testing.T) {
  2203  	h := newDbHarnessWopt(t, &opt.Options{
  2204  		DisableLargeBatchTransaction: true,
  2205  		WriteBuffer:                  1 * opt.MiB,
  2206  		OpenFilesCacheCapacity:       3,
  2207  	})
  2208  	defer h.close()
  2209  
  2210  	const n, wn, dur = 10000, 100, 30 * time.Second
  2211  
  2212  	runtime.GOMAXPROCS(runtime.NumCPU())
  2213  
  2214  	randomData := func(prefix byte, i int) []byte {
  2215  		data := make([]byte, 1+4+32+64+32)
  2216  		_, err := crand.Reader.Read(data[1 : len(data)-8])
  2217  		if err != nil {
  2218  			panic(err)
  2219  		}
  2220  		data[0] = prefix
  2221  		binary.LittleEndian.PutUint32(data[len(data)-8:], uint32(i))
  2222  		binary.LittleEndian.PutUint32(data[len(data)-4:], util.NewCRC(data[:len(data)-4]).Value())
  2223  		return data
  2224  	}
  2225  
  2226  	keys := make([][]byte, n)
  2227  	for i := range keys {
  2228  		keys[i] = randomData(1, 0)
  2229  	}
  2230  
  2231  	until := time.Now().Add(dur)
  2232  	wg := new(sync.WaitGroup)
  2233  	wg.Add(3)
  2234  	var done uint32
  2235  	go func() {
  2236  		i := 0
  2237  		defer func() {
  2238  			t.Logf("WRITER DONE #%d", i)
  2239  			wg.Done()
  2240  		}()
  2241  
  2242  		b := new(Batch)
  2243  		for ; i < wn && atomic.LoadUint32(&done) == 0; i++ {
  2244  			if t.Failed() {
  2245  				return
  2246  			}
  2247  
  2248  			b.Reset()
  2249  			for _, k1 := range keys {
  2250  				k2 := randomData(2, i)
  2251  				b.Put(k2, randomData(42, i))
  2252  				b.Put(k1, k2)
  2253  			}
  2254  			if err := h.db.Write(b, h.wo); err != nil {
  2255  				atomic.StoreUint32(&done, 1)
  2256  				t.Errorf("WRITER #%d db.Write: %v", i, err)
  2257  				return
  2258  			}
  2259  		}
  2260  	}()
  2261  	go func() {
  2262  		var i int
  2263  		defer func() {
  2264  			t.Logf("READER0 DONE #%d", i)
  2265  			atomic.StoreUint32(&done, 1)
  2266  			wg.Done()
  2267  		}()
  2268  		for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
  2269  			if t.Failed() {
  2270  				return
  2271  			}
  2272  
  2273  			snap := h.getSnapshot()
  2274  			seq := snap.elem.seq
  2275  			if seq == 0 {
  2276  				snap.Release()
  2277  				continue
  2278  			}
  2279  			iter := snap.NewIterator(util.BytesPrefix([]byte{1}), nil)
  2280  			writei := int(seq/(n*2) - 1)
  2281  			var k int
  2282  			for ; iter.Next(); k++ {
  2283  				if t.Failed() {
  2284  					return
  2285  				}
  2286  
  2287  				k1 := iter.Key()
  2288  				k2 := iter.Value()
  2289  				k1checksum0 := binary.LittleEndian.Uint32(k1[len(k1)-4:])
  2290  				k1checksum1 := util.NewCRC(k1[:len(k1)-4]).Value()
  2291  				if k1checksum0 != k1checksum1 {
  2292  					t.Errorf("READER0 #%d.%d W#%d invalid K1 checksum: %#x != %#x", i, k, writei, k1checksum0, k1checksum0)
  2293  					return
  2294  				}
  2295  				k2checksum0 := binary.LittleEndian.Uint32(k2[len(k2)-4:])
  2296  				k2checksum1 := util.NewCRC(k2[:len(k2)-4]).Value()
  2297  				if k2checksum0 != k2checksum1 {
  2298  					t.Errorf("READER0 #%d.%d W#%d invalid K2 checksum: %#x != %#x", i, k, writei, k2checksum0, k2checksum1)
  2299  					return
  2300  				}
  2301  				kwritei := int(binary.LittleEndian.Uint32(k2[len(k2)-8:]))
  2302  				if writei != kwritei {
  2303  					t.Errorf("READER0 #%d.%d W#%d invalid write iteration num: %d", i, k, writei, kwritei)
  2304  					return
  2305  				}
  2306  				if _, err := snap.Get(k2, nil); err != nil {
  2307  					t.Errorf("READER0 #%d.%d W#%d snap.Get: %v\nk1: %x\n -> k2: %x", i, k, writei, err, k1, k2)
  2308  					return
  2309  				}
  2310  			}
  2311  			if err := iter.Error(); err != nil {
  2312  				t.Errorf("READER0 #%d.%d W#%d snap.Iterator: %v", i, k, writei, err)
  2313  				return
  2314  			}
  2315  			iter.Release()
  2316  			snap.Release()
  2317  			if k > 0 && k != n {
  2318  				t.Errorf("READER0 #%d W#%d short read, got=%d want=%d", i, writei, k, n)
  2319  			}
  2320  		}
  2321  	}()
  2322  	go func() {
  2323  		var i int
  2324  		defer func() {
  2325  			t.Logf("READER1 DONE #%d", i)
  2326  			atomic.StoreUint32(&done, 1)
  2327  			wg.Done()
  2328  		}()
  2329  		for ; time.Now().Before(until) && atomic.LoadUint32(&done) == 0; i++ {
  2330  			if t.Failed() {
  2331  				return
  2332  			}
  2333  
  2334  			iter := h.db.NewIterator(nil, nil)
  2335  			seq := iter.(*dbIter).seq
  2336  			if seq == 0 {
  2337  				iter.Release()
  2338  				continue
  2339  			}
  2340  			writei := int(seq/(n*2) - 1)
  2341  			var k int
  2342  			for ok := iter.Last(); ok; ok = iter.Prev() {
  2343  				k++
  2344  			}
  2345  			if err := iter.Error(); err != nil {
  2346  				t.Errorf("READER1 #%d.%d W#%d db.Iterator: %v", i, k, writei, err)
  2347  				return
  2348  			}
  2349  			iter.Release()
  2350  			if m := (writei+1)*n + n; k != m {
  2351  				t.Errorf("READER1 #%d W#%d short read, got=%d want=%d", i, writei, k, m)
  2352  				return
  2353  			}
  2354  		}
  2355  	}()
  2356  
  2357  	wg.Wait()
  2358  }
  2359  
// TestDB_TransientError checks that transient table read/open errors during
// writes (which can fail memdb compaction internally) do not corrupt data.
// For each of nSnap rounds it writes nKey keys under error emulation,
// retrying the batch until it succeeds (a corrupted-DB error is fatal), takes
// a snapshot, then deletes the keys the same way. Afterwards, with emulation
// cleared, every snapshot is verified concurrently via random-order Gets and
// a full forward iteration.
func TestDB_TransientError(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  128 * opt.KiB,
		OpenFilesCacheCapacity:       3,
		DisableCompactionBackoff:     true,
	})
	defer h.close()

	const (
		nSnap = 20
		nKey  = 10000
	)

	var (
		snaps [nSnap]*Snapshot
		b     = &Batch{}
	)
	for i := range snaps {
		// Values are tagged with the round number so each snapshot's
		// expected contents are distinguishable.
		vtail := fmt.Sprintf("VAL%030d", i)
		b.Reset()
		for k := 0; k < nKey; k++ {
			key := fmt.Sprintf("KEY%8d", k)
			b.Put([]byte(key), []byte(key+vtail))
		}
		// Make table open/read fail, write, and if the write failed retry
		// with emulation cleared until it goes through.
		h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, errors.New("table transient read error"))
		if err := h.db.Write(b, nil); err != nil {
			t.Logf("WRITE #%d error: %v", i, err)
			h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)
			for {
				if err := h.db.Write(b, nil); err == nil {
					break
				} else if errors.IsCorrupted(err) {
					t.Fatalf("WRITE #%d corrupted: %v", i, err)
				}
			}
		}

		// Capture the state with this round's keys present...
		snaps[i] = h.db.newSnapshot()
		b.Reset()
		for k := 0; k < nKey; k++ {
			key := fmt.Sprintf("KEY%8d", k)
			b.Delete([]byte(key))
		}
		// ...then delete them all under the same error emulation/retry
		// scheme, so the next round starts from an empty key range.
		h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, errors.New("table transient read error"))
		if err := h.db.Write(b, nil); err != nil {
			t.Logf("WRITE #%d  error: %v", i, err)
			h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)
			for {
				if err := h.db.Write(b, nil); err == nil {
					break
				} else if errors.IsCorrupted(err) {
					t.Fatalf("WRITE #%d corrupted: %v", i, err)
				}
			}
		}
	}
	// Stop emulating errors before verification.
	h.stor.EmulateError(testutil.ModeOpen|testutil.ModeRead, storage.TypeTable, nil)

	runtime.GOMAXPROCS(runtime.NumCPU())

	rnd := rand.New(rand.NewSource(0xecafdaed))
	wg := &sync.WaitGroup{}
	for i, snap := range snaps {
		wg.Add(2)

		// Verify via point lookups in random key order.
		go func(i int, snap *Snapshot, sk []int) {
			defer wg.Done()

			vtail := fmt.Sprintf("VAL%030d", i)
			for _, k := range sk {
				if t.Failed() {
					return
				}

				key := fmt.Sprintf("KEY%8d", k)
				xvalue, err := snap.Get([]byte(key), nil)
				if err != nil {
					t.Errorf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
					return
				}
				value := key + vtail
				if !bytes.Equal([]byte(value), xvalue) {
					t.Errorf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
					return
				}
			}
		}(i, snap, rnd.Perm(nKey))

		// Verify via full forward iteration: exactly nKey entries, in order,
		// with matching keys and values.
		go func(i int, snap *Snapshot) {
			defer wg.Done()

			vtail := fmt.Sprintf("VAL%030d", i)
			iter := snap.NewIterator(nil, nil)
			defer iter.Release()
			for k := 0; k < nKey; k++ {
				if t.Failed() {
					return
				}

				if !iter.Next() {
					if err := iter.Error(); err != nil {
						t.Errorf("READER_ITER #%d K%d error: %v", i, k, err)
						return
					}
					t.Errorf("READER_ITER #%d K%d eoi", i, k)
					return
				}
				key := fmt.Sprintf("KEY%8d", k)
				xkey := iter.Key()
				if !bytes.Equal([]byte(key), xkey) {
					t.Errorf("READER_ITER #%d K%d invalid key: want %q, got %q", i, k, key, xkey)
					return
				}
				value := key + vtail
				xvalue := iter.Value()
				if !bytes.Equal([]byte(value), xvalue) {
					t.Errorf("READER_ITER #%d K%d invalid value: want %q, got %q", i, k, value, xvalue)
					return
				}
			}
		}(i, snap)
	}

	wg.Wait()
}
  2486  
// TestDB_UkeyShouldntHopAcrossTable writes and deletes the same nKey user
// keys for nSnap rounds (taking a snapshot after each write), then forces
// memdb and level compactions with small table sizes so tables get split.
// Finally it verifies through every snapshot that each key still resolves to
// that round's value — i.e. that splitting tables did not separate versions
// of one user key in a way that breaks snapshot reads.
func TestDB_UkeyShouldntHopAcrossTable(t *testing.T) {
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  112 * opt.KiB,
		CompactionTableSize:          90 * opt.KiB,
		CompactionExpandLimitFactor:  1,
	})
	defer h.close()

	const (
		nSnap = 190
		nKey  = 140
	)

	var (
		snaps [nSnap]*Snapshot
		b     = &Batch{}
	)
	for i := range snaps {
		// Round-tagged values so each snapshot expects distinct contents.
		vtail := fmt.Sprintf("VAL%030d", i)
		b.Reset()
		for k := 0; k < nKey; k++ {
			key := fmt.Sprintf("KEY%08d", k)
			b.Put([]byte(key), []byte(key+vtail))
		}
		if err := h.db.Write(b, nil); err != nil {
			t.Fatalf("WRITE #%d error: %v", i, err)
		}

		// Snapshot the written state, then delete everything so the next
		// round re-creates the same user keys at newer sequence numbers.
		snaps[i] = h.db.newSnapshot()
		b.Reset()
		for k := 0; k < nKey; k++ {
			key := fmt.Sprintf("KEY%08d", k)
			b.Delete([]byte(key))
		}
		if err := h.db.Write(b, nil); err != nil {
			t.Fatalf("WRITE #%d  error: %v", i, err)
		}
	}

	h.compactMem()

	// Log the table layout after each compaction stage for debugging.
	h.waitCompaction()
	for level, tables := range h.db.s.stVersion.levels {
		for _, table := range tables {
			t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
		}
	}

	h.compactRangeAt(0, "", "")
	h.waitCompaction()
	for level, tables := range h.db.s.stVersion.levels {
		for _, table := range tables {
			t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
		}
	}
	h.compactRangeAt(1, "", "")
	h.waitCompaction()
	for level, tables := range h.db.s.stVersion.levels {
		for _, table := range tables {
			t.Logf("L%d@%d %q:%q", level, table.fd.Num, table.imin, table.imax)
		}
	}
	runtime.GOMAXPROCS(runtime.NumCPU())

	// Concurrently verify all snapshots still read their round's values.
	wg := &sync.WaitGroup{}
	for i, snap := range snaps {
		wg.Add(1)

		go func(i int, snap *Snapshot) {
			defer wg.Done()

			vtail := fmt.Sprintf("VAL%030d", i)
			for k := 0; k < nKey; k++ {
				if t.Failed() {
					return
				}

				key := fmt.Sprintf("KEY%08d", k)
				xvalue, err := snap.Get([]byte(key), nil)
				if err != nil {
					t.Errorf("READER_GET #%d SEQ=%d K%d error: %v", i, snap.elem.seq, k, err)
					return
				}
				value := key + vtail
				if !bytes.Equal([]byte(value), xvalue) {
					t.Errorf("READER_GET #%d SEQ=%d K%d invalid value: want %q, got %q", i, snap.elem.seq, k, value, xvalue)
					return
				}
			}
		}(i, snap)
	}

	wg.Wait()
}
  2582  
// TestDB_TableCompactionBuilder drives tableCompactionBuilder directly
// against a raw session (no DB): it builds two seed tables with many
// sequence-number versions per key, compacts them into a "grandparent"
// level, compacts again into level-1 while checking that no user key's
// versions are split across adjacent output tables, and finally re-runs a
// compaction under emulated transient sync/IO errors and verifies the retried
// output tables are byte-for-byte identical to their inputs.
func TestDB_TableCompactionBuilder(t *testing.T) {
	gomega.RegisterTestingT(t)
	stor := testutil.NewStorage()
	stor.OnLog(testingLogger(t))
	stor.OnClose(testingPreserveOnFailed(t))
	defer stor.Close()

	// Number of versions (distinct seqs) written for each user key.
	const nSeq = 99

	o := &opt.Options{
		DisableLargeBatchTransaction: true,
		WriteBuffer:                  112 * opt.KiB,
		CompactionTableSize:          43 * opt.KiB,
		CompactionExpandLimitFactor:  1,
		CompactionGPOverlapsFactor:   1,
		DisableBlockCache:            true,
	}
	s, err := newSession(stor, o)
	if err != nil {
		t.Fatal(err)
	}
	if err := s.create(); err != nil {
		t.Fatal(err)
	}
	defer s.close()
	var (
		seq        uint64
		targetSize = 5 * o.CompactionTableSize
		value      = bytes.Repeat([]byte{'0'}, 100)
	)
	// Seed: two tables (at levels 0 and 1), each key appearing nSeq times
	// with descending sequence numbers, as the table format requires.
	for i := 0; i < 2; i++ {
		tw, err := s.tops.create(0)
		if err != nil {
			t.Fatal(err)
		}
		for k := 0; tw.tw.BytesLen() < targetSize; k++ {
			key := []byte(fmt.Sprintf("%09d", k))
			seq += nSeq - 1
			for x := uint64(0); x < nSeq; x++ {
				if err := tw.append(makeInternalKey(nil, key, seq-x, keyTypeVal), value); err != nil {
					t.Fatal(err)
				}
			}
		}
		tf, err := tw.finish()
		if err != nil {
			t.Fatal(err)
		}
		rec := &sessionRecord{}
		rec.addTableFile(i, tf)
		if err := s.commit(rec, false); err != nil {
			t.Fatal(err)
		}
	}

	// Build grandparent.
	v := s.version()
	c := newCompaction(s, v, 1, append(tFiles{}, v.levels[1]...), undefinedCompaction)
	rec := &sessionRecord{}
	b := &tableCompactionBuilder{
		s:         s,
		c:         c,
		rec:       rec,
		stat1:     new(cStatStaging),
		minSeq:    0,
		strict:    true,
		tableSize: o.CompactionTableSize/3 + 961,
	}
	if err := b.run(new(compactionTransactCounter)); err != nil {
		t.Fatal(err)
	}
	// Remove the compacted source tables from the version.
	for _, t := range c.levels[0] {
		rec.delTable(c.sourceLevel, t.fd.Num)
	}
	if err := s.commit(rec, false); err != nil {
		t.Fatal(err)
	}
	c.release()

	// Build level-1.
	v = s.version()
	c = newCompaction(s, v, 0, append(tFiles{}, v.levels[0]...), undefinedCompaction)
	rec = &sessionRecord{}
	b = &tableCompactionBuilder{
		s:         s,
		c:         c,
		rec:       rec,
		stat1:     new(cStatStaging),
		minSeq:    0,
		strict:    true,
		tableSize: o.CompactionTableSize,
	}
	if err := b.run(new(compactionTransactCounter)); err != nil {
		t.Fatal(err)
	}
	for _, t := range c.levels[0] {
		rec.delTable(c.sourceLevel, t.fd.Num)
	}
	// Move grandparent to level-3
	for _, t := range v.levels[2] {
		rec.delTable(2, t.fd.Num)
		rec.addTableFile(3, t)
	}
	if err := s.commit(rec, false); err != nil {
		t.Fatal(err)
	}
	c.release()

	// Expected layout now: tables only at levels 1 and 3.
	v = s.version()
	for level, want := range []bool{false, true, false, true} {
		got := len(v.levels[level]) > 0
		if want != got {
			t.Fatalf("invalid level-%d tables len: want %v, got %v", level, want, got)
		}
	}
	// Adjacent level-1 tables must not share a boundary user key — a key's
	// versions must not "hop" across tables.
	for i, f := range v.levels[1][:len(v.levels[1])-1] {
		nf := v.levels[1][i+1]
		if bytes.Equal(f.imax.ukey(), nf.imin.ukey()) {
			t.Fatalf("KEY %q hop across table %d .. %d", f.imax.ukey(), f.fd.Num, nf.fd.Num)
		}
	}
	v.release()

	// Compaction with transient error.
	v = s.version()
	c = newCompaction(s, v, 1, append(tFiles{}, v.levels[1]...), undefinedCompaction)
	rec = &sessionRecord{}
	b = &tableCompactionBuilder{
		s:         s,
		c:         c,
		rec:       rec,
		stat1:     new(cStatStaging),
		minSeq:    0,
		strict:    true,
		tableSize: o.CompactionTableSize,
	}
	// One guaranteed sync failure plus 1% random read/write failures;
	// keep re-running the builder until a pass succeeds end-to-end.
	stor.EmulateErrorOnce(testutil.ModeSync, storage.TypeTable, errors.New("table sync error (once)"))
	stor.EmulateRandomError(testutil.ModeRead|testutil.ModeWrite, storage.TypeTable, 0.01, errors.New("table random IO error"))
	for {
		if err := b.run(new(compactionTransactCounter)); err != nil {
			t.Logf("(expected) b.run: %v", err)
		} else {
			break
		}
	}
	if err := s.commit(rec, false); err != nil {
		t.Fatal(err)
	}
	c.release()

	stor.EmulateErrorOnce(testutil.ModeSync, storage.TypeTable, nil)
	stor.EmulateRandomError(testutil.ModeRead|testutil.ModeWrite, storage.TypeTable, 0, nil)

	// The retried compaction output (level-2) must mirror its input
	// (level-1) table by table, entry by entry.
	v = s.version()
	if len(v.levels[1]) != len(v.levels[2]) {
		t.Fatalf("invalid tables length, want %d, got %d", len(v.levels[1]), len(v.levels[2]))
	}
	for i, f0 := range v.levels[1] {
		f1 := v.levels[2][i]
		iter0 := s.tops.newIterator(f0, nil, nil)
		iter1 := s.tops.newIterator(f1, nil, nil)
		for j := 0; true; j++ {
			next0 := iter0.Next()
			next1 := iter1.Next()
			if next0 != next1 {
				t.Fatalf("#%d.%d invalid eoi: want %v, got %v", i, j, next0, next1)
			}
			key0 := iter0.Key()
			key1 := iter1.Key()
			if !bytes.Equal(key0, key1) {
				t.Fatalf("#%d.%d invalid key: want %q, got %q", i, j, key0, key1)
			}
			if next0 == false {
				break
			}
		}
		iter0.Release()
		iter1.Release()
	}
	v.release()
}
  2764  
// testDB_IterTriggeredCompaction fills the DB with n large values, deletes
// them all, then repeatedly iterates over the first 1/limitDiv of the (now
// empty) key range. The iteration reads should trigger compaction of that
// range; the test passes once its measured size drops below 10% of the
// initial size, and fails after mIter rounds. The un-iterated remainder must
// stay roughly its original size.
func testDB_IterTriggeredCompaction(t *testing.T, limitDiv int) {
	const (
		vSize = 200 * opt.KiB
		tSize = 100 * opt.MiB
		mIter = 100
		n     = tSize / vSize
	)

	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		Compression:                  opt.NoCompression,
		DisableBlockCache:            true,
	})
	defer h.close()

	h.db.memdbMaxLevel = 2

	key := func(x int) string {
		return fmt.Sprintf("v%06d", x)
	}

	// Fill.
	value := strings.Repeat("x", vSize)
	for i := 0; i < n; i++ {
		h.put(key(i), value)
	}
	h.compactMem()

	// Delete all.
	for i := 0; i < n; i++ {
		h.delete(key(i))
	}
	h.compactMem()

	var (
		limit = n / limitDiv

		startKey = key(0)
		limitKey = key(limit)
		maxKey   = key(n)
		// Iterate only up to limitKey; compaction should be triggered for
		// that sub-range.
		slice = &util.Range{Limit: []byte(limitKey)}

		initialSize0 = h.sizeOf(startKey, limitKey)
		initialSize1 = h.sizeOf(limitKey, maxKey)
	)

	t.Logf("initial size %s [rest %s]", shortenb(initialSize0), shortenb(initialSize1))

	for r := 0; true; r++ {
		if r >= mIter {
			t.Fatal("taking too long to compact")
		}

		// Iterates.
		iter := h.db.NewIterator(slice, h.ro)
		for iter.Next() {
		}
		if err := iter.Error(); err != nil {
			t.Fatalf("Iter err: %v", err)
		}
		iter.Release()

		// Wait compaction.
		h.waitCompaction()

		// Check size.
		size0 := h.sizeOf(startKey, limitKey)
		size1 := h.sizeOf(limitKey, maxKey)
		t.Logf("#%03d size %s [rest %s]", r, shortenb(size0), shortenb(size1))
		if size0 < initialSize0/10 {
			break
		}
	}

	// The range that was not iterated must not have shrunk dramatically.
	if initialSize1 > 0 {
		h.sizeAssert(limitKey, maxKey, initialSize1/4-opt.MiB, initialSize1+opt.MiB)
	}
}
  2843  
// TestDB_IterTriggeredCompaction runs the iterator-triggered compaction
// scenario over the entire deleted key range (limitDiv = 1).
func TestDB_IterTriggeredCompaction(t *testing.T) {
	testDB_IterTriggeredCompaction(t, 1)
}
  2847  
// TestDB_IterTriggeredCompactionHalf runs the iterator-triggered compaction
// scenario over only the first half of the deleted key range (limitDiv = 2).
func TestDB_IterTriggeredCompactionHalf(t *testing.T) {
	testDB_IterTriggeredCompaction(t, 2)
}
  2851  
  2852  func TestDB_ReadOnly(t *testing.T) {
  2853  	h := newDbHarness(t)
  2854  	defer h.close()
  2855  
  2856  	h.put("foo", "v1")
  2857  	h.put("bar", "v2")
  2858  	h.compactMem()
  2859  
  2860  	h.put("xfoo", "v1")
  2861  	h.put("xbar", "v2")
  2862  
  2863  	t.Log("Trigger read-only")
  2864  	if err := h.db.SetReadOnly(); err != nil {
  2865  		h.close()
  2866  		t.Fatalf("SetReadOnly error: %v", err)
  2867  	}
  2868  
  2869  	mode := testutil.ModeCreate | testutil.ModeRemove | testutil.ModeRename | testutil.ModeWrite | testutil.ModeSync
  2870  	h.stor.EmulateError(mode, storage.TypeAll, errors.New("read-only DB shouldn't writes"))
  2871  
  2872  	ro := func(key, value, wantValue string) {
  2873  		if err := h.db.Put([]byte(key), []byte(value), h.wo); err != ErrReadOnly {
  2874  			t.Fatalf("unexpected error: %v", err)
  2875  		}
  2876  		h.getVal(key, wantValue)
  2877  	}
  2878  
  2879  	ro("foo", "vx", "v1")
  2880  
  2881  	h.o.ReadOnly = true
  2882  	h.reopenDB()
  2883  
  2884  	ro("foo", "vx", "v1")
  2885  	ro("bar", "vx", "v2")
  2886  	h.assertNumKeys(4)
  2887  }
  2888  
  2889  func TestDB_BulkInsertDelete(t *testing.T) {
  2890  	h := newDbHarnessWopt(t, &opt.Options{
  2891  		DisableLargeBatchTransaction: true,
  2892  		Compression:                  opt.NoCompression,
  2893  		CompactionTableSize:          128 * opt.KiB,
  2894  		CompactionTotalSize:          1 * opt.MiB,
  2895  		WriteBuffer:                  256 * opt.KiB,
  2896  	})
  2897  	defer h.close()
  2898  
  2899  	const R = 100
  2900  	const N = 2500
  2901  	key := make([]byte, 4)
  2902  	value := make([]byte, 256)
  2903  	for i := 0; i < R; i++ {
  2904  		offset := N * i
  2905  		for j := 0; j < N; j++ {
  2906  			binary.BigEndian.PutUint32(key, uint32(offset+j))
  2907  			if err := h.db.Put(key, value, nil); err != nil {
  2908  				t.Fatal("Put error: ", err)
  2909  			}
  2910  		}
  2911  		for j := 0; j < N; j++ {
  2912  			binary.BigEndian.PutUint32(key, uint32(offset+j))
  2913  			if err := h.db.Delete(key, nil); err != nil {
  2914  				t.Fatal("Delete error: ", err)
  2915  			}
  2916  		}
  2917  	}
  2918  
  2919  	h.waitCompaction()
  2920  	if tot := h.totalTables(); tot > 10 {
  2921  		t.Fatalf("too many uncompacted tables: %d (%s)", tot, h.getTablesPerLevel())
  2922  	}
  2923  }
  2924  
// TestDB_GracefulClose closes the DB from a separate goroutine while it is
// (1) being written, (2) being read, and (3) being iterated, and checks that
// in-flight operations fail with errors (logged as expected) rather than
// panicking or returning bogus data.
func TestDB_GracefulClose(t *testing.T) {
	runtime.GOMAXPROCS(4)
	h := newDbHarnessWopt(t, &opt.Options{
		DisableLargeBatchTransaction: true,
		Compression:                  opt.NoCompression,
		CompactionTableSize:          1 * opt.MiB,
		WriteBuffer:                  1 * opt.MiB,
	})
	defer h.close()

	var closeWait sync.WaitGroup

	// During write.
	// n records the index at which Put first failed; it bounds the read
	// phase below.
	n := 0
	closing := false
	for i := 0; i < 1000000; i++ {
		// Once some tables exist, start closing the DB concurrently.
		if !closing && h.totalTables() > 3 {
			t.Logf("close db during write, index=%d", i)
			closeWait.Add(1)
			go func() {
				h.closeDB()
				closeWait.Done()
			}()
			closing = true
		}
		if err := h.db.Put([]byte(fmt.Sprintf("%09d", i)), []byte(fmt.Sprintf("VAL-%09d", i)), h.wo); err != nil {
			t.Logf("Put error: %s (expected)", err)
			n = i
			break
		}
	}
	closeWait.Wait()

	// During read.
	h.openDB()
	closing = false
	for i := 0; i < n; i++ {
		// Trigger the close halfway through the keys written above.
		if !closing && i > n/2 {
			t.Logf("close db during read, index=%d", i)
			closeWait.Add(1)
			go func() {
				h.closeDB()
				closeWait.Done()
			}()
			closing = true
		}
		if _, err := h.db.Get([]byte(fmt.Sprintf("%09d", i)), h.ro); err != nil {
			t.Logf("Get error: %s (expected)", err)
			break
		}
	}
	closeWait.Wait()

	// During iterate.
	h.openDB()
	closing = false
	iter := h.db.NewIterator(nil, h.ro)
	for i := 0; iter.Next(); i++ {
		// Entries yielded before/while closing must still be well-formed.
		if len(iter.Key()) == 0 || len(iter.Value()) == 0 {
			t.Error("Key or value has zero length")
		}
		if !closing {
			t.Logf("close db during iter, index=%d", i)
			closeWait.Add(1)
			go func() {
				h.closeDB()
				closeWait.Done()
			}()
			closing = true
		}
		// Slow the iteration so the close lands mid-iteration.
		time.Sleep(time.Millisecond)
	}
	if err := iter.Error(); err != nil {
		t.Logf("Iter error: %s (expected)", err)
	}
	iter.Release()
	closeWait.Wait()
}
  3003  

View as plain text