...

Source file src/go.etcd.io/etcd/server/v3/mvcc/backend/backend_test.go

Documentation: go.etcd.io/etcd/server/v3/mvcc/backend

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package backend_test
    16  
    17  import (
    18  	"fmt"
    19  	"io/ioutil"
    20  	"reflect"
    21  	"testing"
    22  	"time"
    23  
    24  	"github.com/stretchr/testify/assert"
    25  	bolt "go.etcd.io/bbolt"
    26  	"go.etcd.io/etcd/server/v3/mvcc/backend"
    27  	betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
    28  	"go.etcd.io/etcd/server/v3/mvcc/buckets"
    29  )
    30  
    31  func TestBackendClose(t *testing.T) {
    32  	b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
    33  
    34  	// check close could work
    35  	done := make(chan struct{})
    36  	go func() {
    37  		err := b.Close()
    38  		if err != nil {
    39  			t.Errorf("close error = %v, want nil", err)
    40  		}
    41  		done <- struct{}{}
    42  	}()
    43  	select {
    44  	case <-done:
    45  	case <-time.After(10 * time.Second):
    46  		t.Errorf("failed to close database in 10s")
    47  	}
    48  }
    49  
    50  func TestBackendSnapshot(t *testing.T) {
    51  	b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
    52  	defer betesting.Close(t, b)
    53  
    54  	tx := b.BatchTx()
    55  	tx.Lock()
    56  	tx.UnsafeCreateBucket(buckets.Test)
    57  	tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
    58  	tx.Unlock()
    59  	b.ForceCommit()
    60  
    61  	// write snapshot to a new file
    62  	f, err := ioutil.TempFile(t.TempDir(), "etcd_backend_test")
    63  	if err != nil {
    64  		t.Fatal(err)
    65  	}
    66  	snap := b.Snapshot()
    67  	defer func() { assert.NoError(t, snap.Close()) }()
    68  	if _, err := snap.WriteTo(f); err != nil {
    69  		t.Fatal(err)
    70  	}
    71  	assert.NoError(t, f.Close())
    72  
    73  	// bootstrap new backend from the snapshot
    74  	bcfg := backend.DefaultBackendConfig()
    75  	bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = f.Name(), time.Hour, 10000
    76  	nb := backend.New(bcfg)
    77  	defer betesting.Close(t, nb)
    78  
    79  	newTx := nb.BatchTx()
    80  	newTx.Lock()
    81  	ks, _ := newTx.UnsafeRange(buckets.Test, []byte("foo"), []byte("goo"), 0)
    82  	if len(ks) != 1 {
    83  		t.Errorf("len(kvs) = %d, want 1", len(ks))
    84  	}
    85  	newTx.Unlock()
    86  }
    87  
    88  func TestBackendBatchIntervalCommit(t *testing.T) {
    89  	// start backend with super short batch interval so
    90  	// we do not need to wait long before commit to happen.
    91  	b, _ := betesting.NewTmpBackend(t, time.Nanosecond, 10000)
    92  	defer betesting.Close(t, b)
    93  
    94  	pc := backend.CommitsForTest(b)
    95  
    96  	tx := b.BatchTx()
    97  	tx.Lock()
    98  	tx.UnsafeCreateBucket(buckets.Test)
    99  	tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
   100  	tx.Unlock()
   101  
   102  	for i := 0; i < 10; i++ {
   103  		if backend.CommitsForTest(b) >= pc+1 {
   104  			break
   105  		}
   106  		time.Sleep(time.Duration(i*100) * time.Millisecond)
   107  	}
   108  
   109  	// check whether put happens via db view
   110  	assert.NoError(t, backend.DbFromBackendForTest(b).View(func(tx *bolt.Tx) error {
   111  		bucket := tx.Bucket([]byte("test"))
   112  		if bucket == nil {
   113  			t.Errorf("bucket test does not exit")
   114  			return nil
   115  		}
   116  		v := bucket.Get([]byte("foo"))
   117  		if v == nil {
   118  			t.Errorf("foo key failed to written in backend")
   119  		}
   120  		return nil
   121  	}))
   122  }
   123  
   124  func TestBackendDefrag(t *testing.T) {
   125  	bcfg := backend.DefaultBackendConfig()
   126  	// Make sure we change BackendFreelistType
   127  	// The goal is to verify that we restore config option after defrag.
   128  	if bcfg.BackendFreelistType == bolt.FreelistMapType {
   129  		bcfg.BackendFreelistType = bolt.FreelistArrayType
   130  	} else {
   131  		bcfg.BackendFreelistType = bolt.FreelistMapType
   132  	}
   133  
   134  	b, _ := betesting.NewTmpBackendFromCfg(t, bcfg)
   135  
   136  	defer betesting.Close(t, b)
   137  
   138  	tx := b.BatchTx()
   139  	tx.Lock()
   140  	tx.UnsafeCreateBucket(buckets.Test)
   141  	for i := 0; i < backend.DefragLimitForTest()+100; i++ {
   142  		tx.UnsafePut(buckets.Test, []byte(fmt.Sprintf("foo_%d", i)), []byte("bar"))
   143  	}
   144  	tx.Unlock()
   145  	b.ForceCommit()
   146  
   147  	// remove some keys to ensure the disk space will be reclaimed after defrag
   148  	tx = b.BatchTx()
   149  	tx.Lock()
   150  	for i := 0; i < 50; i++ {
   151  		tx.UnsafeDelete(buckets.Test, []byte(fmt.Sprintf("foo_%d", i)))
   152  	}
   153  	tx.Unlock()
   154  	b.ForceCommit()
   155  
   156  	size := b.Size()
   157  
   158  	// shrink and check hash
   159  	oh, err := b.Hash(nil)
   160  	if err != nil {
   161  		t.Fatal(err)
   162  	}
   163  
   164  	err = b.Defrag()
   165  	if err != nil {
   166  		t.Fatal(err)
   167  	}
   168  
   169  	nh, err := b.Hash(nil)
   170  	if err != nil {
   171  		t.Fatal(err)
   172  	}
   173  	if oh != nh {
   174  		t.Errorf("hash = %v, want %v", nh, oh)
   175  	}
   176  
   177  	nsize := b.Size()
   178  	if nsize >= size {
   179  		t.Errorf("new size = %v, want < %d", nsize, size)
   180  	}
   181  	db := backend.DbFromBackendForTest(b)
   182  	if db.FreelistType != bcfg.BackendFreelistType {
   183  		t.Errorf("db FreelistType = [%v], want [%v]", db.FreelistType, bcfg.BackendFreelistType)
   184  	}
   185  
   186  	// try put more keys after shrink.
   187  	tx = b.BatchTx()
   188  	tx.Lock()
   189  	tx.UnsafeCreateBucket(buckets.Test)
   190  	tx.UnsafePut(buckets.Test, []byte("more"), []byte("bar"))
   191  	tx.Unlock()
   192  	b.ForceCommit()
   193  }
   194  
   195  // TestBackendWriteback ensures writes are stored to the read txn on write txn unlock.
   196  func TestBackendWriteback(t *testing.T) {
   197  	b, _ := betesting.NewDefaultTmpBackend(t)
   198  	defer betesting.Close(t, b)
   199  
   200  	tx := b.BatchTx()
   201  	tx.Lock()
   202  	tx.UnsafeCreateBucket(buckets.Key)
   203  	tx.UnsafePut(buckets.Key, []byte("abc"), []byte("bar"))
   204  	tx.UnsafePut(buckets.Key, []byte("def"), []byte("baz"))
   205  	tx.UnsafePut(buckets.Key, []byte("overwrite"), []byte("1"))
   206  	tx.Unlock()
   207  
   208  	// overwrites should be propagated too
   209  	tx.Lock()
   210  	tx.UnsafePut(buckets.Key, []byte("overwrite"), []byte("2"))
   211  	tx.Unlock()
   212  
   213  	keys := []struct {
   214  		key   []byte
   215  		end   []byte
   216  		limit int64
   217  
   218  		wkey [][]byte
   219  		wval [][]byte
   220  	}{
   221  		{
   222  			key: []byte("abc"),
   223  			end: nil,
   224  
   225  			wkey: [][]byte{[]byte("abc")},
   226  			wval: [][]byte{[]byte("bar")},
   227  		},
   228  		{
   229  			key: []byte("abc"),
   230  			end: []byte("def"),
   231  
   232  			wkey: [][]byte{[]byte("abc")},
   233  			wval: [][]byte{[]byte("bar")},
   234  		},
   235  		{
   236  			key: []byte("abc"),
   237  			end: []byte("deg"),
   238  
   239  			wkey: [][]byte{[]byte("abc"), []byte("def")},
   240  			wval: [][]byte{[]byte("bar"), []byte("baz")},
   241  		},
   242  		{
   243  			key:   []byte("abc"),
   244  			end:   []byte("\xff"),
   245  			limit: 1,
   246  
   247  			wkey: [][]byte{[]byte("abc")},
   248  			wval: [][]byte{[]byte("bar")},
   249  		},
   250  		{
   251  			key: []byte("abc"),
   252  			end: []byte("\xff"),
   253  
   254  			wkey: [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")},
   255  			wval: [][]byte{[]byte("bar"), []byte("baz"), []byte("2")},
   256  		},
   257  	}
   258  	rtx := b.ReadTx()
   259  	for i, tt := range keys {
   260  		func() {
   261  			rtx.RLock()
   262  			defer rtx.RUnlock()
   263  			k, v := rtx.UnsafeRange(buckets.Key, tt.key, tt.end, tt.limit)
   264  			if !reflect.DeepEqual(tt.wkey, k) || !reflect.DeepEqual(tt.wval, v) {
   265  				t.Errorf("#%d: want k=%+v, v=%+v; got k=%+v, v=%+v", i, tt.wkey, tt.wval, k, v)
   266  			}
   267  		}()
   268  	}
   269  }
   270  
   271  // TestConcurrentReadTx ensures that current read transaction can see all prior writes stored in read buffer
   272  func TestConcurrentReadTx(t *testing.T) {
   273  	b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
   274  	defer betesting.Close(t, b)
   275  
   276  	wtx1 := b.BatchTx()
   277  	wtx1.Lock()
   278  	wtx1.UnsafeCreateBucket(buckets.Key)
   279  	wtx1.UnsafePut(buckets.Key, []byte("abc"), []byte("ABC"))
   280  	wtx1.UnsafePut(buckets.Key, []byte("overwrite"), []byte("1"))
   281  	wtx1.Unlock()
   282  
   283  	wtx2 := b.BatchTx()
   284  	wtx2.Lock()
   285  	wtx2.UnsafePut(buckets.Key, []byte("def"), []byte("DEF"))
   286  	wtx2.UnsafePut(buckets.Key, []byte("overwrite"), []byte("2"))
   287  	wtx2.Unlock()
   288  
   289  	rtx := b.ConcurrentReadTx()
   290  	rtx.RLock() // no-op
   291  	k, v := rtx.UnsafeRange(buckets.Key, []byte("abc"), []byte("\xff"), 0)
   292  	rtx.RUnlock()
   293  	wKey := [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")}
   294  	wVal := [][]byte{[]byte("ABC"), []byte("DEF"), []byte("2")}
   295  	if !reflect.DeepEqual(wKey, k) || !reflect.DeepEqual(wVal, v) {
   296  		t.Errorf("want k=%+v, v=%+v; got k=%+v, v=%+v", wKey, wVal, k, v)
   297  	}
   298  }
   299  
   300  // TestBackendWritebackForEach checks that partially written / buffered
   301  // data is visited in the same order as fully committed data.
   302  func TestBackendWritebackForEach(t *testing.T) {
   303  	b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
   304  	defer betesting.Close(t, b)
   305  
   306  	tx := b.BatchTx()
   307  	tx.Lock()
   308  	tx.UnsafeCreateBucket(buckets.Key)
   309  	for i := 0; i < 5; i++ {
   310  		k := []byte(fmt.Sprintf("%04d", i))
   311  		tx.UnsafePut(buckets.Key, k, []byte("bar"))
   312  	}
   313  	tx.Unlock()
   314  
   315  	// writeback
   316  	b.ForceCommit()
   317  
   318  	tx.Lock()
   319  	tx.UnsafeCreateBucket(buckets.Key)
   320  	for i := 5; i < 20; i++ {
   321  		k := []byte(fmt.Sprintf("%04d", i))
   322  		tx.UnsafePut(buckets.Key, k, []byte("bar"))
   323  	}
   324  	tx.Unlock()
   325  
   326  	seq := ""
   327  	getSeq := func(k, v []byte) error {
   328  		seq += string(k)
   329  		return nil
   330  	}
   331  	rtx := b.ReadTx()
   332  	rtx.RLock()
   333  	assert.NoError(t, rtx.UnsafeForEach(buckets.Key, getSeq))
   334  	rtx.RUnlock()
   335  
   336  	partialSeq := seq
   337  
   338  	seq = ""
   339  	b.ForceCommit()
   340  
   341  	tx.Lock()
   342  	assert.NoError(t, tx.UnsafeForEach(buckets.Key, getSeq))
   343  	tx.Unlock()
   344  
   345  	if seq != partialSeq {
   346  		t.Fatalf("expected %q, got %q", seq, partialSeq)
   347  	}
   348  }
   349  

View as plain text