...

Source file src/github.com/syndtr/goleveldb/leveldb/corrupt_test.go

Documentation: github.com/syndtr/goleveldb/leveldb

     1  // Copyright (c) 2013, Suryandaru Triandana <syndtr@gmail.com>
     2  // All rights reserved.
     3  //
     4  // Use of this source code is governed by a BSD-style license that can be
     5  // found in the LICENSE file.
     6  
     7  package leveldb
     8  
     9  import (
    10  	"bytes"
    11  	"fmt"
    12  	"io"
    13  	"math/rand"
    14  	"testing"
    15  	"time"
    16  
    17  	"github.com/syndtr/goleveldb/leveldb/filter"
    18  	"github.com/syndtr/goleveldb/leveldb/opt"
    19  	"github.com/syndtr/goleveldb/leveldb/storage"
    20  )
    21  
    22  const ctValSize = 1000
    23  
    24  type dbCorruptHarness struct {
    25  	dbHarness
    26  }
    27  
    28  func newDbCorruptHarnessWopt(t *testing.T, o *opt.Options) *dbCorruptHarness {
    29  	h := new(dbCorruptHarness)
    30  	h.init(t, o)
    31  	return h
    32  }
    33  
    34  func newDbCorruptHarness(t *testing.T) *dbCorruptHarness {
    35  	return newDbCorruptHarnessWopt(t, &opt.Options{
    36  		BlockCacheCapacity: 100,
    37  		Strict:             opt.StrictJournalChecksum,
    38  	})
    39  }
    40  
    41  func (h *dbCorruptHarness) recover() {
    42  	p := &h.dbHarness
    43  	t := p.t
    44  
    45  	var err error
    46  	p.db, err = Recover(h.stor, h.o)
    47  	if err != nil {
    48  		t.Fatal("Repair: got error: ", err)
    49  	}
    50  }
    51  
    52  func (h *dbCorruptHarness) build(n int) {
    53  	p := &h.dbHarness
    54  	t := p.t
    55  	db := p.db
    56  
    57  	batch := new(Batch)
    58  	for i := 0; i < n; i++ {
    59  		batch.Reset()
    60  		batch.Put(tkey(i), tval(i, ctValSize))
    61  		err := db.Write(batch, p.wo)
    62  		if err != nil {
    63  			t.Fatal("write error: ", err)
    64  		}
    65  	}
    66  }
    67  
    68  func (h *dbCorruptHarness) buildShuffled(n int, rnd *rand.Rand) {
    69  	p := &h.dbHarness
    70  	t := p.t
    71  	db := p.db
    72  
    73  	batch := new(Batch)
    74  	for i := range rnd.Perm(n) {
    75  		batch.Reset()
    76  		batch.Put(tkey(i), tval(i, ctValSize))
    77  		err := db.Write(batch, p.wo)
    78  		if err != nil {
    79  			t.Fatal("write error: ", err)
    80  		}
    81  	}
    82  }
    83  
    84  func (h *dbCorruptHarness) deleteRand(n, max int, rnd *rand.Rand) {
    85  	p := &h.dbHarness
    86  	t := p.t
    87  	db := p.db
    88  
    89  	batch := new(Batch)
    90  	for i := 0; i < n; i++ {
    91  		batch.Reset()
    92  		batch.Delete(tkey(rnd.Intn(max)))
    93  		err := db.Write(batch, p.wo)
    94  		if err != nil {
    95  			t.Fatal("write error: ", err)
    96  		}
    97  	}
    98  }
    99  
   100  func (h *dbCorruptHarness) corrupt(ft storage.FileType, fi, offset, n int) {
   101  	p := &h.dbHarness
   102  	t := p.t
   103  
   104  	fds, _ := p.stor.List(ft)
   105  	sortFds(fds)
   106  	if fi < 0 {
   107  		fi = len(fds) - 1
   108  	}
   109  	if fi >= len(fds) {
   110  		t.Fatalf("no such file with type %q with index %d", ft, fi)
   111  	}
   112  
   113  	fd := fds[fi]
   114  	r, err := h.stor.Open(fd)
   115  	if err != nil {
   116  		t.Fatal("cannot open file: ", err)
   117  	}
   118  	x, err := r.Seek(0, 2)
   119  	if err != nil {
   120  		t.Fatal("cannot query file size: ", err)
   121  	}
   122  	m := int(x)
   123  	if _, err := r.Seek(0, 0); err != nil {
   124  		t.Fatal(err)
   125  	}
   126  
   127  	if offset < 0 {
   128  		if -offset > m {
   129  			offset = 0
   130  		} else {
   131  			offset = m + offset
   132  		}
   133  	}
   134  	if offset > m {
   135  		offset = m
   136  	}
   137  	if offset+n > m {
   138  		n = m - offset
   139  	}
   140  
   141  	buf := make([]byte, m)
   142  	_, err = io.ReadFull(r, buf)
   143  	if err != nil {
   144  		t.Fatal("cannot read file: ", err)
   145  	}
   146  	r.Close()
   147  
   148  	for i := 0; i < n; i++ {
   149  		buf[offset+i] ^= 0x80
   150  	}
   151  
   152  	err = h.stor.Remove(fd)
   153  	if err != nil {
   154  		t.Fatal("cannot remove old file: ", err)
   155  	}
   156  	w, err := h.stor.Create(fd)
   157  	if err != nil {
   158  		t.Fatal("cannot create new file: ", err)
   159  	}
   160  	_, err = w.Write(buf)
   161  	if err != nil {
   162  		t.Fatal("cannot write new file: ", err)
   163  	}
   164  	w.Close()
   165  }
   166  
   167  func (h *dbCorruptHarness) forceRemoveAll(ft storage.FileType) {
   168  	fds, err := h.stor.List(ft)
   169  	if err != nil {
   170  		h.t.Fatal("get files: ", err)
   171  	}
   172  	for _, fd := range fds {
   173  		if err := h.stor.ForceRemove(fd); err != nil {
   174  			h.t.Error("remove file: ", err)
   175  		}
   176  	}
   177  }
   178  
   179  func (h *dbCorruptHarness) removeOne(ft storage.FileType) {
   180  	fds, err := h.stor.List(ft)
   181  	if err != nil {
   182  		h.t.Fatal("get files: ", err)
   183  	}
   184  	fd := fds[rand.Intn(len(fds))]
   185  	h.t.Logf("removing file @%d", fd.Num)
   186  	if err := h.stor.Remove(fd); err != nil {
   187  		h.t.Error("remove file: ", err)
   188  	}
   189  }
   190  
   191  func (h *dbCorruptHarness) check(min, max int) {
   192  	p := &h.dbHarness
   193  	t := p.t
   194  	db := p.db
   195  
   196  	var n, badk, badv, missed, good int
   197  	iter := db.NewIterator(nil, p.ro)
   198  	for iter.Next() {
   199  		k := 0
   200  		fmt.Sscanf(string(iter.Key()), "%d", &k)
   201  		if k < n {
   202  			badk++
   203  			continue
   204  		}
   205  		missed += k - n
   206  		n = k + 1
   207  		if !bytes.Equal(iter.Value(), tval(k, ctValSize)) {
   208  			badv++
   209  		} else {
   210  			good++
   211  		}
   212  	}
   213  	err := iter.Error()
   214  	iter.Release()
   215  	t.Logf("want=%d..%d got=%d badkeys=%d badvalues=%d missed=%d, err=%v",
   216  		min, max, good, badk, badv, missed, err)
   217  	if good < min || good > max {
   218  		t.Errorf("good entries number not in range")
   219  	}
   220  }
   221  
   222  func TestCorruptDB_Journal(t *testing.T) {
   223  	h := newDbCorruptHarness(t)
   224  	defer h.close()
   225  
   226  	h.build(100)
   227  	h.check(100, 100)
   228  	h.closeDB()
   229  	h.corrupt(storage.TypeJournal, -1, 19, 1)
   230  	h.corrupt(storage.TypeJournal, -1, 32*1024+1000, 1)
   231  
   232  	h.openDB()
   233  	h.check(36, 36)
   234  }
   235  
   236  func TestCorruptDB_Table(t *testing.T) {
   237  	h := newDbCorruptHarness(t)
   238  	defer h.close()
   239  
   240  	h.build(100)
   241  	h.compactMem()
   242  	h.compactRangeAt(0, "", "")
   243  	h.compactRangeAt(1, "", "")
   244  	h.closeDB()
   245  	h.corrupt(storage.TypeTable, -1, 100, 1)
   246  
   247  	h.openDB()
   248  	h.check(99, 99)
   249  }
   250  
   251  func TestCorruptDB_TableIndex(t *testing.T) {
   252  	h := newDbCorruptHarness(t)
   253  	defer h.close()
   254  
   255  	h.build(10000)
   256  	h.compactMem()
   257  	h.closeDB()
   258  	h.corrupt(storage.TypeTable, -1, -2000, 500)
   259  
   260  	h.openDB()
   261  	h.check(5000, 9999)
   262  }
   263  
   264  func TestCorruptDB_MissingManifest(t *testing.T) {
   265  	rnd := rand.New(rand.NewSource(0x0badda7a))
   266  	h := newDbCorruptHarnessWopt(t, &opt.Options{
   267  		BlockCacheCapacity: 100,
   268  		Strict:             opt.StrictJournalChecksum,
   269  		WriteBuffer:        1000 * 60,
   270  	})
   271  	defer h.close()
   272  
   273  	h.build(1000)
   274  	h.compactMem()
   275  	h.buildShuffled(1000, rnd)
   276  	h.compactMem()
   277  	h.deleteRand(500, 1000, rnd)
   278  	h.compactMem()
   279  	h.buildShuffled(1000, rnd)
   280  	h.compactMem()
   281  	h.deleteRand(500, 1000, rnd)
   282  	h.compactMem()
   283  	h.buildShuffled(1000, rnd)
   284  	h.compactMem()
   285  	h.closeDB()
   286  
   287  	h.forceRemoveAll(storage.TypeManifest)
   288  	h.openAssert(false)
   289  
   290  	h.recover()
   291  	h.check(1000, 1000)
   292  	h.build(1000)
   293  	h.compactMem()
   294  	h.compactRange("", "")
   295  	h.closeDB()
   296  
   297  	h.recover()
   298  	h.check(1000, 1000)
   299  }
   300  
   301  func TestCorruptDB_SequenceNumberRecovery(t *testing.T) {
   302  	h := newDbCorruptHarness(t)
   303  	defer h.close()
   304  
   305  	h.put("foo", "v1")
   306  	h.put("foo", "v2")
   307  	h.put("foo", "v3")
   308  	h.put("foo", "v4")
   309  	h.put("foo", "v5")
   310  	h.closeDB()
   311  
   312  	h.recover()
   313  	h.getVal("foo", "v5")
   314  	h.put("foo", "v6")
   315  	h.getVal("foo", "v6")
   316  
   317  	h.reopenDB()
   318  	h.getVal("foo", "v6")
   319  }
   320  
   321  func TestCorruptDB_SequenceNumberRecoveryTable(t *testing.T) {
   322  	h := newDbCorruptHarness(t)
   323  	defer h.close()
   324  
   325  	h.put("foo", "v1")
   326  	h.put("foo", "v2")
   327  	h.put("foo", "v3")
   328  	h.compactMem()
   329  	h.put("foo", "v4")
   330  	h.put("foo", "v5")
   331  	h.compactMem()
   332  	h.closeDB()
   333  
   334  	h.recover()
   335  	h.getVal("foo", "v5")
   336  	h.put("foo", "v6")
   337  	h.getVal("foo", "v6")
   338  
   339  	h.reopenDB()
   340  	h.getVal("foo", "v6")
   341  }
   342  
   343  func TestCorruptDB_CorruptedManifest(t *testing.T) {
   344  	h := newDbCorruptHarness(t)
   345  	defer h.close()
   346  
   347  	h.put("foo", "hello")
   348  	h.compactMem()
   349  	h.compactRange("", "")
   350  	h.closeDB()
   351  	h.corrupt(storage.TypeManifest, -1, 0, 1000)
   352  	h.openAssert(false)
   353  
   354  	h.recover()
   355  	h.getVal("foo", "hello")
   356  }
   357  
   358  func TestCorruptDB_CompactionInputError(t *testing.T) {
   359  	h := newDbCorruptHarness(t)
   360  	defer h.close()
   361  
   362  	h.build(10)
   363  	h.compactMem()
   364  	h.closeDB()
   365  	h.corrupt(storage.TypeTable, -1, 100, 1)
   366  
   367  	h.openDB()
   368  	h.check(9, 9)
   369  
   370  	h.build(10000)
   371  	h.check(10000, 10000)
   372  }
   373  
   374  func TestCorruptDB_UnrelatedKeys(t *testing.T) {
   375  	h := newDbCorruptHarness(t)
   376  	defer h.close()
   377  
   378  	h.build(10)
   379  	h.compactMem()
   380  	h.closeDB()
   381  	h.corrupt(storage.TypeTable, -1, 100, 1)
   382  
   383  	h.openDB()
   384  	h.put(string(tkey(1000)), string(tval(1000, ctValSize)))
   385  	h.getVal(string(tkey(1000)), string(tval(1000, ctValSize)))
   386  	h.compactMem()
   387  	h.getVal(string(tkey(1000)), string(tval(1000, ctValSize)))
   388  }
   389  
   390  func TestCorruptDB_Level0NewerFileHasOlderSeqnum(t *testing.T) {
   391  	h := newDbCorruptHarness(t)
   392  	defer h.close()
   393  
   394  	h.put("a", "v1")
   395  	h.put("b", "v1")
   396  	h.compactMem()
   397  	h.put("a", "v2")
   398  	h.put("b", "v2")
   399  	h.compactMem()
   400  	h.put("a", "v3")
   401  	h.put("b", "v3")
   402  	h.compactMem()
   403  	h.put("c", "v0")
   404  	h.put("d", "v0")
   405  	h.compactMem()
   406  	h.compactRangeAt(1, "", "")
   407  	h.closeDB()
   408  
   409  	h.recover()
   410  	h.getVal("a", "v3")
   411  	h.getVal("b", "v3")
   412  	h.getVal("c", "v0")
   413  	h.getVal("d", "v0")
   414  }
   415  
   416  func TestCorruptDB_RecoverInvalidSeq_Issue53(t *testing.T) {
   417  	h := newDbCorruptHarness(t)
   418  	defer h.close()
   419  
   420  	h.put("a", "v1")
   421  	h.put("b", "v1")
   422  	h.compactMem()
   423  	h.put("a", "v2")
   424  	h.put("b", "v2")
   425  	h.compactMem()
   426  	h.put("a", "v3")
   427  	h.put("b", "v3")
   428  	h.compactMem()
   429  	h.put("c", "v0")
   430  	h.put("d", "v0")
   431  	h.compactMem()
   432  	h.compactRangeAt(0, "", "")
   433  	h.closeDB()
   434  
   435  	h.recover()
   436  	h.getVal("a", "v3")
   437  	h.getVal("b", "v3")
   438  	h.getVal("c", "v0")
   439  	h.getVal("d", "v0")
   440  }
   441  
   442  func TestCorruptDB_MissingTableFiles(t *testing.T) {
   443  	h := newDbCorruptHarness(t)
   444  	defer h.close()
   445  
   446  	h.put("a", "v1")
   447  	h.put("b", "v1")
   448  	h.compactMem()
   449  	h.put("c", "v2")
   450  	h.put("d", "v2")
   451  	h.compactMem()
   452  	h.put("e", "v3")
   453  	h.put("f", "v3")
   454  	h.closeDB()
   455  
   456  	h.removeOne(storage.TypeTable)
   457  	h.openAssert(false)
   458  }
   459  
   460  func TestCorruptDB_RecoverTable(t *testing.T) {
   461  	h := newDbCorruptHarnessWopt(t, &opt.Options{
   462  		WriteBuffer:         112 * opt.KiB,
   463  		CompactionTableSize: 90 * opt.KiB,
   464  		Filter:              filter.NewBloomFilter(10),
   465  	})
   466  	defer h.close()
   467  
   468  	h.build(1000)
   469  	h.compactMem()
   470  	h.compactRangeAt(0, "", "")
   471  	h.compactRangeAt(1, "", "")
   472  	seq := h.db.seq
   473  	time.Sleep(100 * time.Millisecond) // Wait lazy reference finish tasks
   474  	h.closeDB()
   475  	h.corrupt(storage.TypeTable, 0, 1000, 1)
   476  	h.corrupt(storage.TypeTable, 3, 10000, 1)
   477  	// Corrupted filter shouldn't affect recovery.
   478  	h.corrupt(storage.TypeTable, 3, 113888, 10)
   479  	h.corrupt(storage.TypeTable, -1, 20000, 1)
   480  
   481  	h.recover()
   482  	if h.db.seq != seq {
   483  		t.Errorf("invalid seq, want=%d got=%d", seq, h.db.seq)
   484  	}
   485  	h.check(985, 985)
   486  }
   487  

View as plain text