...

Source file src/go.etcd.io/etcd/server/v3/mvcc/kvstore_test.go

Documentation: go.etcd.io/etcd/server/v3/mvcc

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package mvcc
    16  
    17  import (
    18  	"bytes"
    19  	"context"
    20  	"crypto/rand"
    21  	"encoding/binary"
    22  	"fmt"
    23  	"math"
    24  	mrand "math/rand"
    25  	"os"
    26  	"reflect"
    27  	"sort"
    28  	"strconv"
    29  	"sync"
    30  	"testing"
    31  	"time"
    32  
    33  	"go.etcd.io/etcd/api/v3/mvccpb"
    34  	"go.etcd.io/etcd/client/pkg/v3/testutil"
    35  	"go.etcd.io/etcd/pkg/v3/schedule"
    36  	"go.etcd.io/etcd/pkg/v3/traceutil"
    37  	"go.etcd.io/etcd/server/v3/lease"
    38  	"go.etcd.io/etcd/server/v3/mvcc/backend"
    39  	betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
    40  	"go.etcd.io/etcd/server/v3/mvcc/buckets"
    41  
    42  	"go.uber.org/zap"
    43  	"go.uber.org/zap/zaptest"
    44  )
    45  
    46  func TestStoreRev(t *testing.T) {
    47  	b, _ := betesting.NewDefaultTmpBackend(t)
    48  	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
    49  	defer s.Close()
    50  
    51  	for i := 1; i <= 3; i++ {
    52  		s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
    53  		if r := s.Rev(); r != int64(i+1) {
    54  			t.Errorf("#%d: rev = %d, want %d", i, r, i+1)
    55  		}
    56  	}
    57  }
    58  
    59  func TestStorePut(t *testing.T) {
    60  	kv := mvccpb.KeyValue{
    61  		Key:            []byte("foo"),
    62  		Value:          []byte("bar"),
    63  		CreateRevision: 1,
    64  		ModRevision:    2,
    65  		Version:        1,
    66  	}
    67  	kvb, err := kv.Marshal()
    68  	if err != nil {
    69  		t.Fatal(err)
    70  	}
    71  
    72  	tests := []struct {
    73  		rev revision
    74  		r   indexGetResp
    75  		rr  *rangeResp
    76  
    77  		wrev    revision
    78  		wkey    []byte
    79  		wkv     mvccpb.KeyValue
    80  		wputrev revision
    81  	}{
    82  		{
    83  			revision{1, 0},
    84  			indexGetResp{revision{}, revision{}, 0, ErrRevisionNotFound},
    85  			nil,
    86  
    87  			revision{2, 0},
    88  			newTestKeyBytes(revision{2, 0}, false),
    89  			mvccpb.KeyValue{
    90  				Key:            []byte("foo"),
    91  				Value:          []byte("bar"),
    92  				CreateRevision: 2,
    93  				ModRevision:    2,
    94  				Version:        1,
    95  				Lease:          1,
    96  			},
    97  			revision{2, 0},
    98  		},
    99  		{
   100  			revision{1, 1},
   101  			indexGetResp{revision{2, 0}, revision{2, 0}, 1, nil},
   102  			&rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}},
   103  
   104  			revision{2, 0},
   105  			newTestKeyBytes(revision{2, 0}, false),
   106  			mvccpb.KeyValue{
   107  				Key:            []byte("foo"),
   108  				Value:          []byte("bar"),
   109  				CreateRevision: 2,
   110  				ModRevision:    2,
   111  				Version:        2,
   112  				Lease:          2,
   113  			},
   114  			revision{2, 0},
   115  		},
   116  		{
   117  			revision{2, 0},
   118  			indexGetResp{revision{2, 1}, revision{2, 0}, 2, nil},
   119  			&rangeResp{[][]byte{newTestKeyBytes(revision{2, 1}, false)}, [][]byte{kvb}},
   120  
   121  			revision{3, 0},
   122  			newTestKeyBytes(revision{3, 0}, false),
   123  			mvccpb.KeyValue{
   124  				Key:            []byte("foo"),
   125  				Value:          []byte("bar"),
   126  				CreateRevision: 2,
   127  				ModRevision:    3,
   128  				Version:        3,
   129  				Lease:          3,
   130  			},
   131  			revision{3, 0},
   132  		},
   133  	}
   134  	for i, tt := range tests {
   135  		s := newFakeStore()
   136  		b := s.b.(*fakeBackend)
   137  		fi := s.kvindex.(*fakeIndex)
   138  
   139  		s.currentRev = tt.rev.main
   140  		fi.indexGetRespc <- tt.r
   141  		if tt.rr != nil {
   142  			b.tx.rangeRespc <- *tt.rr
   143  		}
   144  
   145  		s.Put([]byte("foo"), []byte("bar"), lease.LeaseID(i+1))
   146  
   147  		data, err := tt.wkv.Marshal()
   148  		if err != nil {
   149  			t.Errorf("#%d: marshal err = %v, want nil", i, err)
   150  		}
   151  
   152  		wact := []testutil.Action{
   153  			{Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}},
   154  		}
   155  
   156  		if tt.rr != nil {
   157  			wact = []testutil.Action{
   158  				{Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}},
   159  			}
   160  		}
   161  
   162  		if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
   163  			t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
   164  		}
   165  		wact = []testutil.Action{
   166  			{Name: "get", Params: []interface{}{[]byte("foo"), tt.wputrev.main}},
   167  			{Name: "put", Params: []interface{}{[]byte("foo"), tt.wputrev}},
   168  		}
   169  		if g := fi.Action(); !reflect.DeepEqual(g, wact) {
   170  			t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
   171  		}
   172  		if s.currentRev != tt.wrev.main {
   173  			t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
   174  		}
   175  
   176  		s.Close()
   177  	}
   178  }
   179  
   180  func TestStoreRange(t *testing.T) {
   181  	key := newTestKeyBytes(revision{2, 0}, false)
   182  	kv := mvccpb.KeyValue{
   183  		Key:            []byte("foo"),
   184  		Value:          []byte("bar"),
   185  		CreateRevision: 1,
   186  		ModRevision:    2,
   187  		Version:        1,
   188  	}
   189  	kvb, err := kv.Marshal()
   190  	if err != nil {
   191  		t.Fatal(err)
   192  	}
   193  	wrev := int64(2)
   194  
   195  	tests := []struct {
   196  		idxr indexRangeResp
   197  		r    rangeResp
   198  	}{
   199  		{
   200  			indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
   201  			rangeResp{[][]byte{key}, [][]byte{kvb}},
   202  		},
   203  		{
   204  			indexRangeResp{[][]byte{[]byte("foo"), []byte("foo1")}, []revision{{2, 0}, {3, 0}}},
   205  			rangeResp{[][]byte{key}, [][]byte{kvb}},
   206  		},
   207  	}
   208  
   209  	ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
   210  	for i, tt := range tests {
   211  		s := newFakeStore()
   212  		b := s.b.(*fakeBackend)
   213  		fi := s.kvindex.(*fakeIndex)
   214  
   215  		s.currentRev = 2
   216  		b.tx.rangeRespc <- tt.r
   217  		fi.indexRangeRespc <- tt.idxr
   218  
   219  		ret, err := s.Range(context.TODO(), []byte("foo"), []byte("goo"), ro)
   220  		if err != nil {
   221  			t.Errorf("#%d: err = %v, want nil", i, err)
   222  		}
   223  		if w := []mvccpb.KeyValue{kv}; !reflect.DeepEqual(ret.KVs, w) {
   224  			t.Errorf("#%d: kvs = %+v, want %+v", i, ret.KVs, w)
   225  		}
   226  		if ret.Rev != wrev {
   227  			t.Errorf("#%d: rev = %d, want %d", i, ret.Rev, wrev)
   228  		}
   229  
   230  		wstart := newRevBytes()
   231  		revToBytes(tt.idxr.revs[0], wstart)
   232  		wact := []testutil.Action{
   233  			{Name: "range", Params: []interface{}{buckets.Key, wstart, []byte(nil), int64(0)}},
   234  		}
   235  		if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
   236  			t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
   237  		}
   238  		wact = []testutil.Action{
   239  			{Name: "range", Params: []interface{}{[]byte("foo"), []byte("goo"), wrev}},
   240  		}
   241  		if g := fi.Action(); !reflect.DeepEqual(g, wact) {
   242  			t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
   243  		}
   244  		if s.currentRev != 2 {
   245  			t.Errorf("#%d: current rev = %+v, want %+v", i, s.currentRev, 2)
   246  		}
   247  
   248  		s.Close()
   249  	}
   250  }
   251  
   252  func TestStoreDeleteRange(t *testing.T) {
   253  	key := newTestKeyBytes(revision{2, 0}, false)
   254  	kv := mvccpb.KeyValue{
   255  		Key:            []byte("foo"),
   256  		Value:          []byte("bar"),
   257  		CreateRevision: 1,
   258  		ModRevision:    2,
   259  		Version:        1,
   260  	}
   261  	kvb, err := kv.Marshal()
   262  	if err != nil {
   263  		t.Fatal(err)
   264  	}
   265  
   266  	tests := []struct {
   267  		rev revision
   268  		r   indexRangeResp
   269  		rr  rangeResp
   270  
   271  		wkey    []byte
   272  		wrev    revision
   273  		wrrev   int64
   274  		wdelrev revision
   275  	}{
   276  		{
   277  			revision{2, 0},
   278  			indexRangeResp{[][]byte{[]byte("foo")}, []revision{{2, 0}}},
   279  			rangeResp{[][]byte{key}, [][]byte{kvb}},
   280  
   281  			newTestKeyBytes(revision{3, 0}, true),
   282  			revision{3, 0},
   283  			2,
   284  			revision{3, 0},
   285  		},
   286  	}
   287  	for i, tt := range tests {
   288  		s := newFakeStore()
   289  		b := s.b.(*fakeBackend)
   290  		fi := s.kvindex.(*fakeIndex)
   291  
   292  		s.currentRev = tt.rev.main
   293  		fi.indexRangeRespc <- tt.r
   294  		b.tx.rangeRespc <- tt.rr
   295  
   296  		n, _ := s.DeleteRange([]byte("foo"), []byte("goo"))
   297  		if n != 1 {
   298  			t.Errorf("#%d: n = %d, want 1", i, n)
   299  		}
   300  
   301  		data, err := (&mvccpb.KeyValue{
   302  			Key: []byte("foo"),
   303  		}).Marshal()
   304  		if err != nil {
   305  			t.Errorf("#%d: marshal err = %v, want nil", i, err)
   306  		}
   307  		wact := []testutil.Action{
   308  			{Name: "seqput", Params: []interface{}{buckets.Key, tt.wkey, data}},
   309  		}
   310  		if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
   311  			t.Errorf("#%d: tx action = %+v, want %+v", i, g, wact)
   312  		}
   313  		wact = []testutil.Action{
   314  			{Name: "range", Params: []interface{}{[]byte("foo"), []byte("goo"), tt.wrrev}},
   315  			{Name: "tombstone", Params: []interface{}{[]byte("foo"), tt.wdelrev}},
   316  		}
   317  		if g := fi.Action(); !reflect.DeepEqual(g, wact) {
   318  			t.Errorf("#%d: index action = %+v, want %+v", i, g, wact)
   319  		}
   320  		if s.currentRev != tt.wrev.main {
   321  			t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev)
   322  		}
   323  	}
   324  }
   325  
   326  func TestStoreCompact(t *testing.T) {
   327  	s := newFakeStore()
   328  	defer s.Close()
   329  	b := s.b.(*fakeBackend)
   330  	fi := s.kvindex.(*fakeIndex)
   331  
   332  	s.currentRev = 3
   333  	fi.indexCompactRespc <- map[revision]struct{}{{1, 0}: {}}
   334  	key1 := newTestKeyBytes(revision{1, 0}, false)
   335  	key2 := newTestKeyBytes(revision{2, 0}, false)
   336  	b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
   337  	b.tx.rangeRespc <- rangeResp{[][]byte{}, [][]byte{}}
   338  	b.tx.rangeRespc <- rangeResp{[][]byte{key1, key2}, [][]byte{[]byte("alice"), []byte("bob")}}
   339  
   340  	s.Compact(traceutil.TODO(), 3)
   341  	s.fifoSched.WaitFinish(1)
   342  
   343  	if s.compactMainRev != 3 {
   344  		t.Errorf("compact main rev = %d, want 3", s.compactMainRev)
   345  	}
   346  	end := make([]byte, 8)
   347  	binary.BigEndian.PutUint64(end, uint64(4))
   348  	wact := []testutil.Action{
   349  		{Name: "range", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, []uint8(nil), int64(0)}},
   350  		{Name: "range", Params: []interface{}{buckets.Meta, finishedCompactKeyName, []uint8(nil), int64(0)}},
   351  		{Name: "put", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, newTestRevBytes(revision{3, 0})}},
   352  		{Name: "range", Params: []interface{}{buckets.Key, make([]byte, 17), end, int64(10000)}},
   353  		{Name: "delete", Params: []interface{}{buckets.Key, key2}},
   354  		{Name: "put", Params: []interface{}{buckets.Meta, finishedCompactKeyName, newTestRevBytes(revision{3, 0})}},
   355  	}
   356  	if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
   357  		t.Errorf("tx actions = %+v, want %+v", g, wact)
   358  	}
   359  	wact = []testutil.Action{
   360  		{Name: "compact", Params: []interface{}{int64(3)}},
   361  	}
   362  	if g := fi.Action(); !reflect.DeepEqual(g, wact) {
   363  		t.Errorf("index action = %+v, want %+v", g, wact)
   364  	}
   365  }
   366  
   367  func TestStoreRestore(t *testing.T) {
   368  	s := newFakeStore()
   369  	b := s.b.(*fakeBackend)
   370  	fi := s.kvindex.(*fakeIndex)
   371  
   372  	putkey := newTestKeyBytes(revision{3, 0}, false)
   373  	putkv := mvccpb.KeyValue{
   374  		Key:            []byte("foo"),
   375  		Value:          []byte("bar"),
   376  		CreateRevision: 4,
   377  		ModRevision:    4,
   378  		Version:        1,
   379  	}
   380  	putkvb, err := putkv.Marshal()
   381  	if err != nil {
   382  		t.Fatal(err)
   383  	}
   384  	delkey := newTestKeyBytes(revision{5, 0}, true)
   385  	delkv := mvccpb.KeyValue{
   386  		Key: []byte("foo"),
   387  	}
   388  	delkvb, err := delkv.Marshal()
   389  	if err != nil {
   390  		t.Fatal(err)
   391  	}
   392  	b.tx.rangeRespc <- rangeResp{[][]byte{finishedCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
   393  	b.tx.rangeRespc <- rangeResp{[][]byte{scheduledCompactKeyName}, [][]byte{newTestRevBytes(revision{3, 0})}}
   394  
   395  	b.tx.rangeRespc <- rangeResp{[][]byte{putkey, delkey}, [][]byte{putkvb, delkvb}}
   396  	b.tx.rangeRespc <- rangeResp{nil, nil}
   397  
   398  	s.restore()
   399  
   400  	if s.compactMainRev != 3 {
   401  		t.Errorf("compact rev = %d, want 3", s.compactMainRev)
   402  	}
   403  	if s.currentRev != 5 {
   404  		t.Errorf("current rev = %v, want 5", s.currentRev)
   405  	}
   406  	wact := []testutil.Action{
   407  		{Name: "range", Params: []interface{}{buckets.Meta, finishedCompactKeyName, []byte(nil), int64(0)}},
   408  		{Name: "range", Params: []interface{}{buckets.Meta, scheduledCompactKeyName, []byte(nil), int64(0)}},
   409  		{Name: "range", Params: []interface{}{buckets.Key, newTestRevBytes(revision{1, 0}), newTestRevBytes(revision{math.MaxInt64, math.MaxInt64}), int64(restoreChunkKeys)}},
   410  	}
   411  	if g := b.tx.Action(); !reflect.DeepEqual(g, wact) {
   412  		t.Errorf("tx actions = %+v, want %+v", g, wact)
   413  	}
   414  
   415  	gens := []generation{
   416  		{created: revision{4, 0}, ver: 2, revs: []revision{{3, 0}, {5, 0}}},
   417  		{created: revision{0, 0}, ver: 0, revs: nil},
   418  	}
   419  	ki := &keyIndex{key: []byte("foo"), modified: revision{5, 0}, generations: gens}
   420  	wact = []testutil.Action{
   421  		{Name: "keyIndex", Params: []interface{}{ki}},
   422  		{Name: "insert", Params: []interface{}{ki}},
   423  	}
   424  	if g := fi.Action(); !reflect.DeepEqual(g, wact) {
   425  		t.Errorf("index action = %+v, want %+v", g, wact)
   426  	}
   427  }
   428  
   429  func TestRestoreDelete(t *testing.T) {
   430  	oldChunk := restoreChunkKeys
   431  	restoreChunkKeys = mrand.Intn(3) + 2
   432  	defer func() { restoreChunkKeys = oldChunk }()
   433  
   434  	b, _ := betesting.NewDefaultTmpBackend(t)
   435  	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
   436  
   437  	keys := make(map[string]struct{})
   438  	for i := 0; i < 20; i++ {
   439  		ks := fmt.Sprintf("foo-%d", i)
   440  		k := []byte(ks)
   441  		s.Put(k, []byte("bar"), lease.NoLease)
   442  		keys[ks] = struct{}{}
   443  		switch mrand.Intn(3) {
   444  		case 0:
   445  			// put random key from past via random range on map
   446  			ks = fmt.Sprintf("foo-%d", mrand.Intn(i+1))
   447  			s.Put([]byte(ks), []byte("baz"), lease.NoLease)
   448  			keys[ks] = struct{}{}
   449  		case 1:
   450  			// delete random key via random range on map
   451  			for k := range keys {
   452  				s.DeleteRange([]byte(k), nil)
   453  				delete(keys, k)
   454  				break
   455  			}
   456  		}
   457  	}
   458  	s.Close()
   459  
   460  	s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
   461  	defer s.Close()
   462  	for i := 0; i < 20; i++ {
   463  		ks := fmt.Sprintf("foo-%d", i)
   464  		r, err := s.Range(context.TODO(), []byte(ks), nil, RangeOptions{})
   465  		if err != nil {
   466  			t.Fatal(err)
   467  		}
   468  		if _, ok := keys[ks]; ok {
   469  			if len(r.KVs) == 0 {
   470  				t.Errorf("#%d: expected %q, got deleted", i, ks)
   471  			}
   472  		} else if len(r.KVs) != 0 {
   473  			t.Errorf("#%d: expected deleted, got %q", i, ks)
   474  		}
   475  	}
   476  }
   477  
   478  func TestRestoreContinueUnfinishedCompaction(t *testing.T) {
   479  	tests := []string{"recreate", "restore"}
   480  	for _, test := range tests {
   481  		b, _ := betesting.NewDefaultTmpBackend(t)
   482  		s0 := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
   483  
   484  		s0.Put([]byte("foo"), []byte("bar"), lease.NoLease)
   485  		s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease)
   486  		s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease)
   487  
   488  		// write scheduled compaction, but not do compaction
   489  		rbytes := newRevBytes()
   490  		revToBytes(revision{main: 2}, rbytes)
   491  		tx := s0.b.BatchTx()
   492  		tx.Lock()
   493  		tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes)
   494  		tx.Unlock()
   495  
   496  		s0.Close()
   497  
   498  		var s *store
   499  		switch test {
   500  		case "recreate":
   501  			s = NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
   502  		case "restore":
   503  			s0.Restore(b)
   504  			s = s0
   505  		}
   506  
   507  		// wait for scheduled compaction to be finished
   508  		time.Sleep(100 * time.Millisecond)
   509  
   510  		if _, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted {
   511  			t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted)
   512  		}
   513  		// check the key in backend is deleted
   514  		revbytes := newRevBytes()
   515  		revToBytes(revision{main: 1}, revbytes)
   516  
   517  		// The disk compaction is done asynchronously and requires more time on slow disk.
   518  		// try 5 times for CI with slow IO.
   519  		for i := 0; i < 5; i++ {
   520  			tx = s.b.BatchTx()
   521  			tx.Lock()
   522  			ks, _ := tx.UnsafeRange(buckets.Key, revbytes, nil, 0)
   523  			tx.Unlock()
   524  			if len(ks) != 0 {
   525  				time.Sleep(100 * time.Millisecond)
   526  				continue
   527  			}
   528  			return
   529  		}
   530  
   531  		t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes))
   532  	}
   533  }
   534  
// hashKVResult is the value passed from the hashing goroutines to the checker
// goroutine in TestHashKVWhenCompacting.
type hashKVResult struct {
	// hash is the Hash field of the value returned by HashByRev.
	hash       uint32
	// compactRev is the CompactRevision field of that same value.
	compactRev int64
}
   539  
   540  // TestHashKVWhenCompacting ensures that HashKV returns correct hash when compacting.
   541  func TestHashKVWhenCompacting(t *testing.T) {
   542  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   543  	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
   544  	defer cleanup(s, b, tmpPath)
   545  
   546  	rev := 10000
   547  	for i := 2; i <= rev; i++ {
   548  		s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
   549  	}
   550  
   551  	hashCompactc := make(chan hashKVResult, 1)
   552  	var wg sync.WaitGroup
   553  	donec := make(chan struct{})
   554  	stopc := make(chan struct{})
   555  
   556  	// Call HashByRev(10000) in multiple goroutines until donec is closed
   557  	for i := 0; i < 10; i++ {
   558  		wg.Add(1)
   559  		go func() {
   560  			defer wg.Done()
   561  			for {
   562  				hash, _, err := s.HashStorage().HashByRev(int64(rev))
   563  				if err != nil {
   564  					t.Error(err)
   565  				}
   566  				select {
   567  				case <-stopc:
   568  					return
   569  				case <-donec:
   570  					return
   571  				case hashCompactc <- hashKVResult{hash.Hash, hash.CompactRevision}:
   572  				}
   573  			}
   574  		}()
   575  	}
   576  
   577  	// Check computed hashes by HashByRev are correct in a goroutine, until donec is closed
   578  	wg.Add(1)
   579  	go func() {
   580  		defer wg.Done()
   581  		revHash := make(map[int64]uint32)
   582  		for {
   583  			r := <-hashCompactc
   584  			if revHash[r.compactRev] == 0 {
   585  				revHash[r.compactRev] = r.hash
   586  			}
   587  			if r.hash != revHash[r.compactRev] {
   588  				t.Errorf("Hashes differ (current %v) != (saved %v)", r.hash, revHash[r.compactRev])
   589  			}
   590  
   591  			select {
   592  			case <-stopc:
   593  				return
   594  			case <-donec:
   595  				return
   596  			default:
   597  			}
   598  		}
   599  	}()
   600  
   601  	// Compact the store in a goroutine, using revision 9900 to 10000 and close donec when finished
   602  	wg.Add(1)
   603  	go func() {
   604  		defer func() {
   605  			close(donec)
   606  			wg.Done()
   607  		}()
   608  
   609  		for i := 100; i >= 0; i-- {
   610  			select {
   611  			case <-stopc:
   612  				return
   613  			default:
   614  			}
   615  
   616  			_, err := s.Compact(traceutil.TODO(), int64(rev-i))
   617  			if err != nil {
   618  				t.Error(err)
   619  			}
   620  			// Wait for the compaction job to finish
   621  			s.fifoSched.WaitFinish(1)
   622  			// Leave time for calls to HashByRev to take place after each compaction
   623  			time.Sleep(10 * time.Millisecond)
   624  		}
   625  	}()
   626  
   627  	select {
   628  	case <-donec:
   629  	case <-time.After(10 * time.Second):
   630  		close(stopc)
   631  		wg.Wait()
   632  		testutil.FatalStack(t, "timeout")
   633  	}
   634  
   635  	close(stopc)
   636  	wg.Wait()
   637  }
   638  
   639  // TestHashKVWithCompactedAndFutureRevisions ensures that HashKV returns a correct hash when called
   640  // with a past revision (lower than compacted), a future revision, and the exact compacted revision
   641  func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) {
   642  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   643  	s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
   644  	defer cleanup(s, b, tmpPath)
   645  
   646  	rev := 10000
   647  	compactRev := rev / 2
   648  
   649  	for i := 2; i <= rev; i++ {
   650  		s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
   651  	}
   652  	if _, err := s.Compact(traceutil.TODO(), int64(compactRev)); err != nil {
   653  		t.Fatal(err)
   654  	}
   655  
   656  	_, _, errFutureRev := s.HashStorage().HashByRev(int64(rev + 1))
   657  	if errFutureRev != ErrFutureRev {
   658  		t.Error(errFutureRev)
   659  	}
   660  
   661  	_, _, errPastRev := s.HashStorage().HashByRev(int64(compactRev - 1))
   662  	if errPastRev != ErrCompacted {
   663  		t.Error(errPastRev)
   664  	}
   665  
   666  	_, _, errCompactRev := s.HashStorage().HashByRev(int64(compactRev))
   667  	if errCompactRev != nil {
   668  		t.Error(errCompactRev)
   669  	}
   670  }
   671  
   672  // TestHashKVZeroRevision ensures that "HashByRev(0)" computes
   673  // correct hash value with latest revision.
   674  func TestHashKVZeroRevision(t *testing.T) {
   675  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   676  	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
   677  	defer cleanup(s, b, tmpPath)
   678  
   679  	rev := 10000
   680  	for i := 2; i <= rev; i++ {
   681  		s.Put([]byte("foo"), []byte(fmt.Sprintf("bar%d", i)), lease.NoLease)
   682  	}
   683  	if _, err := s.Compact(traceutil.TODO(), int64(rev/2)); err != nil {
   684  		t.Fatal(err)
   685  	}
   686  
   687  	hash1, _, err := s.HashStorage().HashByRev(int64(rev))
   688  	if err != nil {
   689  		t.Fatal(err)
   690  	}
   691  	var hash2 KeyValueHash
   692  	hash2, _, err = s.HashStorage().HashByRev(0)
   693  	if err != nil {
   694  		t.Fatal(err)
   695  	}
   696  	if hash1 != hash2 {
   697  		t.Errorf("hash %d (rev %d) != hash %d (rev 0)", hash1, rev, hash2)
   698  	}
   699  }
   700  
   701  func TestTxnPut(t *testing.T) {
   702  	// assign arbitrary size
   703  	bytesN := 30
   704  	sliceN := 100
   705  	keys := createBytesSlice(bytesN, sliceN)
   706  	vals := createBytesSlice(bytesN, sliceN)
   707  
   708  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   709  	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
   710  	defer cleanup(s, b, tmpPath)
   711  
   712  	for i := 0; i < sliceN; i++ {
   713  		txn := s.Write(traceutil.TODO())
   714  		base := int64(i + 2)
   715  		if rev := txn.Put(keys[i], vals[i], lease.NoLease); rev != base {
   716  			t.Errorf("#%d: rev = %d, want %d", i, rev, base)
   717  		}
   718  		txn.End()
   719  	}
   720  }
   721  
   722  // TestConcurrentReadNotBlockingWrite ensures Read does not blocking Write after its creation
   723  func TestConcurrentReadNotBlockingWrite(t *testing.T) {
   724  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   725  	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
   726  	defer os.Remove(tmpPath)
   727  
   728  	// write something to read later
   729  	s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
   730  
   731  	// readTx simulates a long read request
   732  	readTx1 := s.Read(ConcurrentReadTxMode, traceutil.TODO())
   733  
   734  	// write should not be blocked by reads
   735  	done := make(chan struct{}, 1)
   736  	go func() {
   737  		s.Put([]byte("foo"), []byte("newBar"), lease.NoLease) // this is a write Txn
   738  		done <- struct{}{}
   739  	}()
   740  	select {
   741  	case <-done:
   742  	case <-time.After(1 * time.Second):
   743  		t.Fatalf("write should not be blocked by read")
   744  	}
   745  
   746  	// readTx2 simulates a short read request
   747  	readTx2 := s.Read(ConcurrentReadTxMode, traceutil.TODO())
   748  	ro := RangeOptions{Limit: 1, Rev: 0, Count: false}
   749  	ret, err := readTx2.Range(context.TODO(), []byte("foo"), nil, ro)
   750  	if err != nil {
   751  		t.Fatalf("failed to range: %v", err)
   752  	}
   753  	// readTx2 should see the result of new write
   754  	w := mvccpb.KeyValue{
   755  		Key:            []byte("foo"),
   756  		Value:          []byte("newBar"),
   757  		CreateRevision: 2,
   758  		ModRevision:    3,
   759  		Version:        2,
   760  	}
   761  	if !reflect.DeepEqual(ret.KVs[0], w) {
   762  		t.Fatalf("range result = %+v, want = %+v", ret.KVs[0], w)
   763  	}
   764  	readTx2.End()
   765  
   766  	ret, err = readTx1.Range(context.TODO(), []byte("foo"), nil, ro)
   767  	if err != nil {
   768  		t.Fatalf("failed to range: %v", err)
   769  	}
   770  	// readTx1 should not see the result of new write
   771  	w = mvccpb.KeyValue{
   772  		Key:            []byte("foo"),
   773  		Value:          []byte("bar"),
   774  		CreateRevision: 2,
   775  		ModRevision:    2,
   776  		Version:        1,
   777  	}
   778  	if !reflect.DeepEqual(ret.KVs[0], w) {
   779  		t.Fatalf("range result = %+v, want = %+v", ret.KVs[0], w)
   780  	}
   781  	readTx1.End()
   782  }
   783  
   784  // TestConcurrentReadTxAndWrite creates random concurrent Reads and Writes, and ensures Reads always see latest Writes
   785  func TestConcurrentReadTxAndWrite(t *testing.T) {
   786  	var (
   787  		numOfReads           = 100
   788  		numOfWrites          = 100
   789  		maxNumOfPutsPerWrite = 10
   790  		committedKVs         kvs        // committedKVs records the key-value pairs written by the finished Write Txns
   791  		mu                   sync.Mutex // mu protects committedKVs
   792  	)
   793  	b, tmpPath := betesting.NewDefaultTmpBackend(t)
   794  	s := NewStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})
   795  	defer os.Remove(tmpPath)
   796  
   797  	var wg sync.WaitGroup
   798  	wg.Add(numOfWrites)
   799  	for i := 0; i < numOfWrites; i++ {
   800  		go func() {
   801  			defer wg.Done()
   802  			time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time
   803  
   804  			tx := s.Write(traceutil.TODO())
   805  			numOfPuts := mrand.Intn(maxNumOfPutsPerWrite) + 1
   806  			var pendingKvs kvs
   807  			for j := 0; j < numOfPuts; j++ {
   808  				k := []byte(strconv.Itoa(mrand.Int()))
   809  				v := []byte(strconv.Itoa(mrand.Int()))
   810  				tx.Put(k, v, lease.NoLease)
   811  				pendingKvs = append(pendingKvs, kv{k, v})
   812  			}
   813  			// reads should not see above Puts until write is finished
   814  			mu.Lock()
   815  			committedKVs = merge(committedKVs, pendingKvs) // update shared data structure
   816  			tx.End()
   817  			mu.Unlock()
   818  		}()
   819  	}
   820  
   821  	wg.Add(numOfReads)
   822  	for i := 0; i < numOfReads; i++ {
   823  		go func() {
   824  			defer wg.Done()
   825  			time.Sleep(time.Duration(mrand.Intn(100)) * time.Millisecond) // random starting time
   826  
   827  			mu.Lock()
   828  			wKVs := make(kvs, len(committedKVs))
   829  			copy(wKVs, committedKVs)
   830  			tx := s.Read(ConcurrentReadTxMode, traceutil.TODO())
   831  			mu.Unlock()
   832  			// get all keys in backend store, and compare with wKVs
   833  			ret, err := tx.Range(context.TODO(), []byte("\x00000000"), []byte("\xffffffff"), RangeOptions{})
   834  			tx.End()
   835  			if err != nil {
   836  				t.Errorf("failed to range keys: %v", err)
   837  				return
   838  			}
   839  			if len(wKVs) == 0 && len(ret.KVs) == 0 { // no committed KVs yet
   840  				return
   841  			}
   842  			var result kvs
   843  			for _, keyValue := range ret.KVs {
   844  				result = append(result, kv{keyValue.Key, keyValue.Value})
   845  			}
   846  			if !reflect.DeepEqual(wKVs, result) {
   847  				t.Errorf("unexpected range result") // too many key value pairs, skip printing them
   848  			}
   849  		}()
   850  	}
   851  
   852  	// wait until goroutines finish or timeout
   853  	doneC := make(chan struct{})
   854  	go func() {
   855  		wg.Wait()
   856  		close(doneC)
   857  	}()
   858  	select {
   859  	case <-doneC:
   860  	case <-time.After(5 * time.Minute):
   861  		testutil.FatalStack(t, "timeout")
   862  	}
   863  }
   864  
   865  type kv struct {
   866  	key []byte
   867  	val []byte
   868  }
   869  
   870  type kvs []kv
   871  
   872  func (kvs kvs) Len() int           { return len(kvs) }
   873  func (kvs kvs) Less(i, j int) bool { return bytes.Compare(kvs[i].key, kvs[j].key) < 0 }
   874  func (kvs kvs) Swap(i, j int)      { kvs[i], kvs[j] = kvs[j], kvs[i] }
   875  
   876  func merge(dst, src kvs) kvs {
   877  	dst = append(dst, src...)
   878  	sort.Stable(dst)
   879  	// remove duplicates, using only the newest value
   880  	// ref: tx_buffer.go
   881  	widx := 0
   882  	for ridx := 1; ridx < len(dst); ridx++ {
   883  		if !bytes.Equal(dst[widx].key, dst[ridx].key) {
   884  			widx++
   885  		}
   886  		dst[widx] = dst[ridx]
   887  	}
   888  	return dst[:widx+1]
   889  }
   890  
   891  // TODO: test attach key to lessor
   892  
   893  func newTestRevBytes(rev revision) []byte {
   894  	bytes := newRevBytes()
   895  	revToBytes(rev, bytes)
   896  	return bytes
   897  }
   898  
   899  func newTestKeyBytes(rev revision, tombstone bool) []byte {
   900  	bytes := newRevBytes()
   901  	revToBytes(rev, bytes)
   902  	if tombstone {
   903  		bytes = appendMarkTombstone(zap.NewExample(), bytes)
   904  	}
   905  	return bytes
   906  }
   907  
   908  func newFakeStore() *store {
   909  	b := &fakeBackend{&fakeBatchTx{
   910  		Recorder:   &testutil.RecorderBuffered{},
   911  		rangeRespc: make(chan rangeResp, 5)}}
   912  	s := &store{
   913  		cfg:            StoreConfig{CompactionBatchLimit: 10000},
   914  		b:              b,
   915  		le:             &lease.FakeLessor{},
   916  		kvindex:        newFakeIndex(),
   917  		currentRev:     0,
   918  		compactMainRev: -1,
   919  		fifoSched:      schedule.NewFIFOScheduler(),
   920  		stopc:          make(chan struct{}),
   921  		lg:             zap.NewExample(),
   922  	}
   923  	s.ReadView, s.WriteView = &readView{s}, &writeView{s}
   924  	s.hashes = newHashStorage(zap.NewExample(), s)
   925  	return s
   926  }
   927  
   928  func newFakeIndex() *fakeIndex {
   929  	return &fakeIndex{
   930  		Recorder:              &testutil.RecorderBuffered{},
   931  		indexGetRespc:         make(chan indexGetResp, 1),
   932  		indexRangeRespc:       make(chan indexRangeResp, 1),
   933  		indexRangeEventsRespc: make(chan indexRangeEventsResp, 1),
   934  		indexCompactRespc:     make(chan map[revision]struct{}, 1),
   935  	}
   936  }
   937  
   938  type rangeResp struct {
   939  	keys [][]byte
   940  	vals [][]byte
   941  }
   942  
   943  type fakeBatchTx struct {
   944  	testutil.Recorder
   945  	rangeRespc chan rangeResp
   946  }
   947  
   948  func (b *fakeBatchTx) LockInsideApply()                         {}
   949  func (b *fakeBatchTx) LockOutsideApply()                        {}
   950  func (b *fakeBatchTx) Lock()                                    {}
   951  func (b *fakeBatchTx) Unlock()                                  {}
   952  func (b *fakeBatchTx) RLock()                                   {}
   953  func (b *fakeBatchTx) RUnlock()                                 {}
   954  func (b *fakeBatchTx) UnsafeCreateBucket(bucket backend.Bucket) {}
   955  func (b *fakeBatchTx) UnsafeDeleteBucket(bucket backend.Bucket) {}
   956  func (b *fakeBatchTx) UnsafePut(bucket backend.Bucket, key []byte, value []byte) {
   957  	b.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{bucket, key, value}})
   958  }
   959  func (b *fakeBatchTx) UnsafeSeqPut(bucket backend.Bucket, key []byte, value []byte) {
   960  	b.Recorder.Record(testutil.Action{Name: "seqput", Params: []interface{}{bucket, key, value}})
   961  }
   962  func (b *fakeBatchTx) UnsafeRange(bucket backend.Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
   963  	b.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{bucket, key, endKey, limit}})
   964  	r := <-b.rangeRespc
   965  	return r.keys, r.vals
   966  }
   967  func (b *fakeBatchTx) UnsafeDelete(bucket backend.Bucket, key []byte) {
   968  	b.Recorder.Record(testutil.Action{Name: "delete", Params: []interface{}{bucket, key}})
   969  }
   970  func (b *fakeBatchTx) UnsafeForEach(bucket backend.Bucket, visitor func(k, v []byte) error) error {
   971  	return nil
   972  }
   973  func (b *fakeBatchTx) Commit()        {}
   974  func (b *fakeBatchTx) CommitAndStop() {}
   975  
   976  type fakeBackend struct {
   977  	tx *fakeBatchTx
   978  }
   979  
   980  func (b *fakeBackend) BatchTx() backend.BatchTx                                   { return b.tx }
   981  func (b *fakeBackend) ReadTx() backend.ReadTx                                     { return b.tx }
   982  func (b *fakeBackend) ConcurrentReadTx() backend.ReadTx                           { return b.tx }
   983  func (b *fakeBackend) Hash(func(bucketName, keyName []byte) bool) (uint32, error) { return 0, nil }
   984  func (b *fakeBackend) Size() int64                                                { return 0 }
   985  func (b *fakeBackend) SizeInUse() int64                                           { return 0 }
   986  func (b *fakeBackend) OpenReadTxN() int64                                         { return 0 }
   987  func (b *fakeBackend) Snapshot() backend.Snapshot                                 { return nil }
   988  func (b *fakeBackend) ForceCommit()                                               {}
   989  func (b *fakeBackend) Defrag() error                                              { return nil }
   990  func (b *fakeBackend) Close() error                                               { return nil }
   991  func (b *fakeBackend) SetTxPostLockInsideApplyHook(func())                        {}
   992  
   993  type indexGetResp struct {
   994  	rev     revision
   995  	created revision
   996  	ver     int64
   997  	err     error
   998  }
   999  
// indexRangeResp is the canned result that fakeIndex.Range returns: the keys
// and the revisions handed back to the caller.
type indexRangeResp struct {
	keys [][]byte
	revs []revision
}
  1004  
// indexRangeEventsResp is the canned result that fakeIndex.RangeSince
// returns.
type indexRangeEventsResp struct {
	revs []revision
}
  1008  
// fakeIndex is a scripted index for tests. Calls are logged on the embedded
// Recorder, and query methods answer with whatever the test has queued on the
// matching response channel.
type fakeIndex struct {
	testutil.Recorder
	indexGetRespc         chan indexGetResp         // read by Get
	indexRangeRespc       chan indexRangeResp       // read by Range
	indexRangeEventsRespc chan indexRangeEventsResp // read by RangeSince
	indexCompactRespc     chan map[revision]struct{} // read by Compact and Keep
}
  1016  
  1017  func (i *fakeIndex) Revisions(key, end []byte, atRev int64, limit int) ([]revision, int) {
  1018  	_, rev := i.Range(key, end, atRev)
  1019  	if len(rev) >= limit {
  1020  		rev = rev[:limit]
  1021  	}
  1022  	return rev, len(rev)
  1023  }
  1024  
  1025  func (i *fakeIndex) CountRevisions(key, end []byte, atRev int64) int {
  1026  	_, rev := i.Range(key, end, atRev)
  1027  	return len(rev)
  1028  }
  1029  
  1030  func (i *fakeIndex) Get(key []byte, atRev int64) (rev, created revision, ver int64, err error) {
  1031  	i.Recorder.Record(testutil.Action{Name: "get", Params: []interface{}{key, atRev}})
  1032  	r := <-i.indexGetRespc
  1033  	return r.rev, r.created, r.ver, r.err
  1034  }
  1035  func (i *fakeIndex) Range(key, end []byte, atRev int64) ([][]byte, []revision) {
  1036  	i.Recorder.Record(testutil.Action{Name: "range", Params: []interface{}{key, end, atRev}})
  1037  	r := <-i.indexRangeRespc
  1038  	return r.keys, r.revs
  1039  }
  1040  func (i *fakeIndex) Put(key []byte, rev revision) {
  1041  	i.Recorder.Record(testutil.Action{Name: "put", Params: []interface{}{key, rev}})
  1042  }
  1043  func (i *fakeIndex) Tombstone(key []byte, rev revision) error {
  1044  	i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []interface{}{key, rev}})
  1045  	return nil
  1046  }
  1047  func (i *fakeIndex) RangeSince(key, end []byte, rev int64) []revision {
  1048  	i.Recorder.Record(testutil.Action{Name: "rangeEvents", Params: []interface{}{key, end, rev}})
  1049  	r := <-i.indexRangeEventsRespc
  1050  	return r.revs
  1051  }
  1052  func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
  1053  	i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}})
  1054  	return <-i.indexCompactRespc
  1055  }
  1056  func (i *fakeIndex) Keep(rev int64) map[revision]struct{} {
  1057  	i.Recorder.Record(testutil.Action{Name: "keep", Params: []interface{}{rev}})
  1058  	return <-i.indexCompactRespc
  1059  }
  1060  func (i *fakeIndex) Equal(b index) bool { return false }
  1061  
  1062  func (i *fakeIndex) Insert(ki *keyIndex) {
  1063  	i.Recorder.Record(testutil.Action{Name: "insert", Params: []interface{}{ki}})
  1064  }
  1065  
  1066  func (i *fakeIndex) KeyIndex(ki *keyIndex) *keyIndex {
  1067  	i.Recorder.Record(testutil.Action{Name: "keyIndex", Params: []interface{}{ki}})
  1068  	return nil
  1069  }
  1070  
  1071  func createBytesSlice(bytesN, sliceN int) [][]byte {
  1072  	rs := [][]byte{}
  1073  	for len(rs) != sliceN {
  1074  		v := make([]byte, bytesN)
  1075  		if _, err := rand.Read(v); err != nil {
  1076  			panic(err)
  1077  		}
  1078  		rs = append(rs, v)
  1079  	}
  1080  	return rs
  1081  }
  1082  

View as plain text