...

Source file src/github.com/syndtr/goleveldb/leveldb/version_test.go

Documentation: github.com/syndtr/goleveldb/leveldb

     1  package leveldb
     2  
     3  import (
     4  	"encoding/binary"
     5  	"math/rand"
     6  	"reflect"
     7  	"sync"
     8  	"testing"
     9  	"time"
    10  
    11  	"github.com/onsi/gomega"
    12  	"github.com/syndtr/goleveldb/leveldb/storage"
    13  	"github.com/syndtr/goleveldb/leveldb/testutil"
    14  )
    15  
    16  type testFileRec struct {
    17  	level int
    18  	num   int64
    19  }
    20  
    21  func TestVersionStaging(t *testing.T) {
    22  	gomega.RegisterTestingT(t)
    23  	stor := testutil.NewStorage()
    24  	defer stor.Close()
    25  	s, err := newSession(stor, nil)
    26  	if err != nil {
    27  		t.Fatal(err)
    28  	}
    29  	defer func() {
    30  		s.close()
    31  		s.release()
    32  	}()
    33  
    34  	v := newVersion(s)
    35  	v.newStaging()
    36  
    37  	tmp := make([]byte, 4)
    38  	mik := func(i uint64) []byte {
    39  		binary.BigEndian.PutUint32(tmp, uint32(i))
    40  		return []byte(makeInternalKey(nil, tmp, 0, keyTypeVal))
    41  	}
    42  
    43  	for i, x := range []struct {
    44  		add, del []testFileRec
    45  		trivial  bool
    46  		levels   [][]int64
    47  	}{
    48  		{
    49  			add: []testFileRec{
    50  				{1, 1},
    51  			},
    52  			levels: [][]int64{
    53  				{},
    54  				{1},
    55  			},
    56  		},
    57  		{
    58  			add: []testFileRec{
    59  				{1, 1},
    60  			},
    61  			levels: [][]int64{
    62  				{},
    63  				{1},
    64  			},
    65  		},
    66  		{
    67  			del: []testFileRec{
    68  				{1, 1},
    69  			},
    70  			levels: [][]int64{},
    71  		},
    72  		{
    73  			add: []testFileRec{
    74  				{0, 1},
    75  				{0, 3},
    76  				{0, 2},
    77  				{2, 5},
    78  				{1, 4},
    79  			},
    80  			levels: [][]int64{
    81  				{3, 2, 1},
    82  				{4},
    83  				{5},
    84  			},
    85  		},
    86  		{
    87  			add: []testFileRec{
    88  				{1, 6},
    89  				{2, 5},
    90  			},
    91  			del: []testFileRec{
    92  				{0, 1},
    93  				{0, 4},
    94  			},
    95  			levels: [][]int64{
    96  				{3, 2},
    97  				{4, 6},
    98  				{5},
    99  			},
   100  		},
   101  		{
   102  			del: []testFileRec{
   103  				{0, 3},
   104  				{0, 2},
   105  				{1, 4},
   106  				{1, 6},
   107  				{2, 5},
   108  			},
   109  			levels: [][]int64{},
   110  		},
   111  		{
   112  			add: []testFileRec{
   113  				{0, 1},
   114  			},
   115  			levels: [][]int64{
   116  				{1},
   117  			},
   118  		},
   119  		{
   120  			add: []testFileRec{
   121  				{1, 2},
   122  			},
   123  			levels: [][]int64{
   124  				{1},
   125  				{2},
   126  			},
   127  		},
   128  		{
   129  			add: []testFileRec{
   130  				{0, 3},
   131  			},
   132  			levels: [][]int64{
   133  				{3, 1},
   134  				{2},
   135  			},
   136  		},
   137  		{
   138  			add: []testFileRec{
   139  				{6, 9},
   140  			},
   141  			levels: [][]int64{
   142  				{3, 1},
   143  				{2},
   144  				{},
   145  				{},
   146  				{},
   147  				{},
   148  				{9},
   149  			},
   150  		},
   151  		{
   152  			del: []testFileRec{
   153  				{6, 9},
   154  			},
   155  			levels: [][]int64{
   156  				{3, 1},
   157  				{2},
   158  			},
   159  		},
   160  		// memory compaction
   161  		{
   162  			add: []testFileRec{
   163  				{0, 5},
   164  			},
   165  			trivial: true,
   166  			levels: [][]int64{
   167  				{5, 3, 1},
   168  				{2},
   169  			},
   170  		},
   171  		// memory compaction
   172  		{
   173  			add: []testFileRec{
   174  				{0, 4},
   175  			},
   176  			trivial: true,
   177  			levels: [][]int64{
   178  				{5, 4, 3, 1},
   179  				{2},
   180  			},
   181  		},
   182  		// table compaction
   183  		{
   184  			add: []testFileRec{
   185  				{1, 6},
   186  				{1, 7},
   187  				{1, 8},
   188  			},
   189  			del: []testFileRec{
   190  				{0, 3},
   191  				{0, 4},
   192  				{0, 5},
   193  			},
   194  			trivial: true,
   195  			levels: [][]int64{
   196  				{1},
   197  				{2, 6, 7, 8},
   198  			},
   199  		},
   200  	} {
   201  		rec := &sessionRecord{}
   202  		for _, f := range x.add {
   203  			ik := mik(uint64(f.num))
   204  			rec.addTable(f.level, f.num, 1, ik, ik)
   205  		}
   206  		for _, f := range x.del {
   207  			rec.delTable(f.level, f.num)
   208  		}
   209  		vs := v.newStaging()
   210  		vs.commit(rec)
   211  		v = vs.finish(x.trivial)
   212  		if len(v.levels) != len(x.levels) {
   213  			t.Fatalf("#%d: invalid level count: want=%d got=%d", i, len(x.levels), len(v.levels))
   214  		}
   215  		for j, want := range x.levels {
   216  			tables := v.levels[j]
   217  			if len(want) != len(tables) {
   218  				t.Fatalf("#%d.%d: invalid tables count: want=%d got=%d", i, j, len(want), len(tables))
   219  			}
   220  			got := make([]int64, len(tables))
   221  			for k, t := range tables {
   222  				got[k] = t.fd.Num
   223  			}
   224  			if !reflect.DeepEqual(want, got) {
   225  				t.Fatalf("#%d.%d: invalid tables: want=%v got=%v", i, j, want, got)
   226  			}
   227  		}
   228  	}
   229  }
   230  
   231  func TestVersionReference(t *testing.T) {
   232  	gomega.RegisterTestingT(t)
   233  	stor := testutil.NewStorage()
   234  	defer stor.Close()
   235  	s, err := newSession(stor, nil)
   236  	if err != nil {
   237  		t.Fatal(err)
   238  	}
   239  	defer func() {
   240  		s.close()
   241  		s.release()
   242  	}()
   243  
   244  	tmp := make([]byte, 4)
   245  	mik := func(i uint64) []byte {
   246  		binary.BigEndian.PutUint32(tmp, uint32(i))
   247  		return []byte(makeInternalKey(nil, tmp, 0, keyTypeVal))
   248  	}
   249  
   250  	// Test normal version task correctness
   251  	refc := make(chan map[int64]int)
   252  
   253  	for i, x := range []struct {
   254  		add, del []testFileRec
   255  		expect   map[int64]int
   256  		failed   bool
   257  	}{
   258  		{
   259  			[]testFileRec{{0, 1}, {0, 2}},
   260  			nil,
   261  			map[int64]int{1: 1, 2: 1},
   262  			false,
   263  		},
   264  		{
   265  			[]testFileRec{{0, 3}, {0, 4}},
   266  			[]testFileRec{{0, 1}},
   267  			map[int64]int{2: 1, 3: 1, 4: 1},
   268  			false,
   269  		},
   270  		{
   271  			[]testFileRec{{0, 1}, {0, 5}, {0, 6}, {0, 7}},
   272  			[]testFileRec{{0, 2}, {0, 3}, {0, 4}},
   273  			map[int64]int{1: 1, 5: 1, 6: 1, 7: 1},
   274  			false,
   275  		},
   276  		{
   277  			nil,
   278  			nil,
   279  			map[int64]int{1: 1, 5: 1, 6: 1, 7: 1},
   280  			true,
   281  		},
   282  		{
   283  			[]testFileRec{{0, 1}, {0, 5}, {0, 6}, {0, 7}},
   284  			nil,
   285  			map[int64]int{1: 2, 5: 2, 6: 2, 7: 2},
   286  			false,
   287  		},
   288  		{
   289  			nil,
   290  			[]testFileRec{{0, 1}, {0, 5}, {0, 6}, {0, 7}},
   291  			map[int64]int{1: 1, 5: 1, 6: 1, 7: 1},
   292  			false,
   293  		},
   294  		{
   295  			[]testFileRec{{0, 0}},
   296  			[]testFileRec{{0, 1}, {0, 5}, {0, 6}, {0, 7}},
   297  			map[int64]int{0: 1},
   298  			false,
   299  		},
   300  	} {
   301  		rec := &sessionRecord{}
   302  		for n, f := range x.add {
   303  			rec.addTable(f.level, f.num, 1, mik(uint64(i+n)), mik(uint64(i+n)))
   304  		}
   305  		for _, f := range x.del {
   306  			rec.delTable(f.level, f.num)
   307  		}
   308  
   309  		// Simulate some read operations
   310  		var wg sync.WaitGroup
   311  		readN := rand.Intn(300)
   312  		for i := 0; i < readN; i++ {
   313  			wg.Add(1)
   314  			go func() {
   315  				v := s.version()
   316  				time.Sleep(time.Millisecond * time.Duration(rand.Intn(300)))
   317  				v.release()
   318  				wg.Done()
   319  			}()
   320  		}
   321  
   322  		v := s.version()
   323  		vs := v.newStaging()
   324  		vs.commit(rec)
   325  		nv := vs.finish(false)
   326  
   327  		if x.failed {
   328  			s.abandon <- nv.id
   329  		} else {
   330  			s.setVersion(rec, nv)
   331  		}
   332  		v.release()
   333  
   334  		// Wait all read operations
   335  		wg.Wait()
   336  
   337  		time.Sleep(100 * time.Millisecond) // Wait lazy reference finish tasks
   338  
   339  		s.fileRefCh <- refc
   340  		ref := <-refc
   341  		if !reflect.DeepEqual(ref, x.expect) {
   342  			t.Errorf("case %d failed, file reference mismatch, GOT %v, WANT %v", i, ref, x.expect)
   343  		}
   344  	}
   345  
   346  	// Test version task overflow
   347  	var longV = s.version() // This version is held by some long-time operation
   348  	var exp = map[int64]int{0: 1, maxCachedNumber: 1}
   349  	for i := 1; i <= maxCachedNumber; i++ {
   350  		rec := &sessionRecord{}
   351  		rec.addTable(0, int64(i), 1, mik(uint64(i)), mik(uint64(i)))
   352  		rec.delTable(0, int64(i-1))
   353  		v := s.version()
   354  		vs := v.newStaging()
   355  		vs.commit(rec)
   356  		nv := vs.finish(false)
   357  		s.setVersion(rec, nv)
   358  		v.release()
   359  	}
   360  	time.Sleep(100 * time.Millisecond) // Wait lazy reference finish tasks
   361  
   362  	s.fileRefCh <- refc
   363  	ref := <-refc
   364  	if !reflect.DeepEqual(exp, ref) {
   365  		t.Errorf("file reference mismatch, GOT %v, WANT %v", ref, exp)
   366  	}
   367  
   368  	longV.release()
   369  	s.fileRefCh <- refc
   370  	ref = <-refc
   371  	delete(exp, 0)
   372  	if !reflect.DeepEqual(exp, ref) {
   373  		t.Errorf("file reference mismatch, GOT %v, WANT %v", ref, exp)
   374  	}
   375  }
   376  
   377  func BenchmarkVersionStagingNonTrivial(b *testing.B) {
   378  	benchmarkVersionStaging(b, false, 100000)
   379  }
   380  
   381  func BenchmarkVersionStagingTrivial(b *testing.B) {
   382  	benchmarkVersionStaging(b, true, 100000)
   383  }
   384  
   385  func benchmarkVersionStaging(b *testing.B, trivial bool, size int) {
   386  	stor := storage.NewMemStorage()
   387  	defer stor.Close()
   388  	s, err := newSession(stor, nil)
   389  	if err != nil {
   390  		b.Fatal(err)
   391  	}
   392  	defer func() {
   393  		s.close()
   394  		s.release()
   395  	}()
   396  
   397  	tmp := make([]byte, 4)
   398  	mik := func(i uint64) []byte {
   399  		binary.BigEndian.PutUint32(tmp, uint32(i))
   400  		return []byte(makeInternalKey(nil, tmp, 0, keyTypeVal))
   401  	}
   402  
   403  	rec := &sessionRecord{}
   404  	for i := 0; i < size; i++ {
   405  		ik := mik(uint64(i))
   406  		rec.addTable(1, int64(i), 1, ik, ik)
   407  	}
   408  
   409  	v := newVersion(s)
   410  	vs := v.newStaging()
   411  	vs.commit(rec)
   412  	v = vs.finish(false)
   413  
   414  	b.ResetTimer()
   415  	b.ReportAllocs()
   416  
   417  	for i := 0; i < b.N; i++ {
   418  		rec := &sessionRecord{}
   419  		index := rand.Intn(size)
   420  		ik := mik(uint64(index))
   421  
   422  		cnt := 0
   423  		for j := index; j < size && cnt <= 3; j++ {
   424  			rec.addTable(1, int64(i), 1, ik, ik)
   425  			cnt++
   426  		}
   427  		vs := v.newStaging()
   428  		vs.commit(rec)
   429  		vs.finish(trivial)
   430  	}
   431  }
   432  

View as plain text