...

Source file src/go.etcd.io/etcd/server/v3/mvcc/backend/batch_tx_test.go

Documentation: go.etcd.io/etcd/server/v3/mvcc/backend

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package backend_test
    16  
    17  import (
    18  	"reflect"
    19  	"testing"
    20  	"time"
    21  
    22  	"github.com/google/go-cmp/cmp"
    23  	bolt "go.etcd.io/bbolt"
    24  	"go.etcd.io/etcd/server/v3/mvcc/backend"
    25  	betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
    26  	"go.etcd.io/etcd/server/v3/mvcc/buckets"
    27  )
    28  
    29  func TestBatchTxPut(t *testing.T) {
    30  	b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
    31  	defer betesting.Close(t, b)
    32  
    33  	tx := b.BatchTx()
    34  
    35  	tx.Lock()
    36  
    37  	// create bucket
    38  	tx.UnsafeCreateBucket(buckets.Test)
    39  
    40  	// put
    41  	v := []byte("bar")
    42  	tx.UnsafePut(buckets.Test, []byte("foo"), v)
    43  
    44  	tx.Unlock()
    45  
    46  	// check put result before and after tx is committed
    47  	for k := 0; k < 2; k++ {
    48  		tx.Lock()
    49  		_, gv := tx.UnsafeRange(buckets.Test, []byte("foo"), nil, 0)
    50  		tx.Unlock()
    51  		if !reflect.DeepEqual(gv[0], v) {
    52  			t.Errorf("v = %s, want %s", string(gv[0]), string(v))
    53  		}
    54  		tx.Commit()
    55  	}
    56  }
    57  
    58  func TestBatchTxRange(t *testing.T) {
    59  	b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
    60  	defer betesting.Close(t, b)
    61  
    62  	tx := b.BatchTx()
    63  	tx.Lock()
    64  	defer tx.Unlock()
    65  
    66  	tx.UnsafeCreateBucket(buckets.Test)
    67  	// put keys
    68  	allKeys := [][]byte{[]byte("foo"), []byte("foo1"), []byte("foo2")}
    69  	allVals := [][]byte{[]byte("bar"), []byte("bar1"), []byte("bar2")}
    70  	for i := range allKeys {
    71  		tx.UnsafePut(buckets.Test, allKeys[i], allVals[i])
    72  	}
    73  
    74  	tests := []struct {
    75  		key    []byte
    76  		endKey []byte
    77  		limit  int64
    78  
    79  		wkeys [][]byte
    80  		wvals [][]byte
    81  	}{
    82  		// single key
    83  		{
    84  			[]byte("foo"), nil, 0,
    85  			allKeys[:1], allVals[:1],
    86  		},
    87  		// single key, bad
    88  		{
    89  			[]byte("doo"), nil, 0,
    90  			nil, nil,
    91  		},
    92  		// key range
    93  		{
    94  			[]byte("foo"), []byte("foo1"), 0,
    95  			allKeys[:1], allVals[:1],
    96  		},
    97  		// key range, get all keys
    98  		{
    99  			[]byte("foo"), []byte("foo3"), 0,
   100  			allKeys, allVals,
   101  		},
   102  		// key range, bad
   103  		{
   104  			[]byte("goo"), []byte("goo3"), 0,
   105  			nil, nil,
   106  		},
   107  		// key range with effective limit
   108  		{
   109  			[]byte("foo"), []byte("foo3"), 1,
   110  			allKeys[:1], allVals[:1],
   111  		},
   112  		// key range with limit
   113  		{
   114  			[]byte("foo"), []byte("foo3"), 4,
   115  			allKeys, allVals,
   116  		},
   117  	}
   118  	for i, tt := range tests {
   119  		keys, vals := tx.UnsafeRange(buckets.Test, tt.key, tt.endKey, tt.limit)
   120  		if !reflect.DeepEqual(keys, tt.wkeys) {
   121  			t.Errorf("#%d: keys = %+v, want %+v", i, keys, tt.wkeys)
   122  		}
   123  		if !reflect.DeepEqual(vals, tt.wvals) {
   124  			t.Errorf("#%d: vals = %+v, want %+v", i, vals, tt.wvals)
   125  		}
   126  	}
   127  }
   128  
   129  func TestBatchTxDelete(t *testing.T) {
   130  	b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
   131  	defer betesting.Close(t, b)
   132  
   133  	tx := b.BatchTx()
   134  	tx.Lock()
   135  
   136  	tx.UnsafeCreateBucket(buckets.Test)
   137  	tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
   138  
   139  	tx.UnsafeDelete(buckets.Test, []byte("foo"))
   140  
   141  	tx.Unlock()
   142  
   143  	// check put result before and after tx is committed
   144  	for k := 0; k < 2; k++ {
   145  		tx.Lock()
   146  		ks, _ := tx.UnsafeRange(buckets.Test, []byte("foo"), nil, 0)
   147  		tx.Unlock()
   148  		if len(ks) != 0 {
   149  			t.Errorf("keys on foo = %v, want nil", ks)
   150  		}
   151  		tx.Commit()
   152  	}
   153  }
   154  
   155  func TestBatchTxCommit(t *testing.T) {
   156  	b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
   157  	defer betesting.Close(t, b)
   158  
   159  	tx := b.BatchTx()
   160  	tx.Lock()
   161  	tx.UnsafeCreateBucket(buckets.Test)
   162  	tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
   163  	tx.Unlock()
   164  
   165  	tx.Commit()
   166  
   167  	// check whether put happens via db view
   168  	backend.DbFromBackendForTest(b).View(func(tx *bolt.Tx) error {
   169  		bucket := tx.Bucket(buckets.Test.Name())
   170  		if bucket == nil {
   171  			t.Errorf("bucket test does not exit")
   172  			return nil
   173  		}
   174  		v := bucket.Get([]byte("foo"))
   175  		if v == nil {
   176  			t.Errorf("foo key failed to written in backend")
   177  		}
   178  		return nil
   179  	})
   180  }
   181  
   182  func TestBatchTxBatchLimitCommit(t *testing.T) {
   183  	// start backend with batch limit 1 so one write can
   184  	// trigger a commit
   185  	b, _ := betesting.NewTmpBackend(t, time.Hour, 1)
   186  	defer betesting.Close(t, b)
   187  
   188  	tx := b.BatchTx()
   189  	tx.Lock()
   190  	tx.UnsafeCreateBucket(buckets.Test)
   191  	tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
   192  	tx.Unlock()
   193  
   194  	// batch limit commit should have been triggered
   195  	// check whether put happens via db view
   196  	backend.DbFromBackendForTest(b).View(func(tx *bolt.Tx) error {
   197  		bucket := tx.Bucket(buckets.Test.Name())
   198  		if bucket == nil {
   199  			t.Errorf("bucket test does not exit")
   200  			return nil
   201  		}
   202  		v := bucket.Get([]byte("foo"))
   203  		if v == nil {
   204  			t.Errorf("foo key failed to written in backend")
   205  		}
   206  		return nil
   207  	})
   208  }
   209  
   210  func TestRangeAfterDeleteBucketMatch(t *testing.T) {
   211  	b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
   212  	defer betesting.Close(t, b)
   213  
   214  	tx := b.BatchTx()
   215  	tx.Lock()
   216  	tx.UnsafeCreateBucket(buckets.Test)
   217  	tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
   218  	tx.Unlock()
   219  	tx.Commit()
   220  
   221  	checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")})
   222  
   223  	tx.Lock()
   224  	tx.UnsafeDeleteBucket(buckets.Test)
   225  	tx.Unlock()
   226  
   227  	checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil)
   228  }
   229  
   230  func TestRangeAfterDeleteMatch(t *testing.T) {
   231  	b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
   232  	defer betesting.Close(t, b)
   233  
   234  	tx := b.BatchTx()
   235  
   236  	tx.Lock()
   237  	tx.UnsafeCreateBucket(buckets.Test)
   238  	tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
   239  	tx.Unlock()
   240  	tx.Commit()
   241  
   242  	checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
   243  	checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")})
   244  
   245  	tx.Lock()
   246  	tx.UnsafeDelete(buckets.Test, []byte("foo"))
   247  	tx.Unlock()
   248  
   249  	checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
   250  	checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil)
   251  }
   252  
   253  func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, key, endKey []byte, limit int64) {
   254  	tx.Lock()
   255  	ks1, vs1 := tx.UnsafeRange(buckets.Test, key, endKey, limit)
   256  	tx.Unlock()
   257  
   258  	rtx.RLock()
   259  	ks2, vs2 := rtx.UnsafeRange(buckets.Test, key, endKey, limit)
   260  	rtx.RUnlock()
   261  
   262  	if diff := cmp.Diff(ks1, ks2); diff != "" {
   263  		t.Errorf("keys on read and batch transaction doesn't match, diff: %s", diff)
   264  	}
   265  	if diff := cmp.Diff(vs1, vs2); diff != "" {
   266  		t.Errorf("values on read and batch transaction doesn't match, diff: %s", diff)
   267  	}
   268  }
   269  
   270  func checkForEach(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, expectedKeys, expectedValues [][]byte) {
   271  	tx.Lock()
   272  	checkUnsafeForEach(t, tx, expectedKeys, expectedValues)
   273  	tx.Unlock()
   274  
   275  	rtx.RLock()
   276  	checkUnsafeForEach(t, rtx, expectedKeys, expectedValues)
   277  	rtx.RUnlock()
   278  }
   279  
   280  func checkUnsafeForEach(t *testing.T, tx backend.ReadTx, expectedKeys, expectedValues [][]byte) {
   281  	var ks, vs [][]byte
   282  	tx.UnsafeForEach(buckets.Test, func(k, v []byte) error {
   283  		ks = append(ks, k)
   284  		vs = append(vs, v)
   285  		return nil
   286  	})
   287  
   288  	if diff := cmp.Diff(ks, expectedKeys); diff != "" {
   289  		t.Errorf("keys on transaction doesn't match expected, diff: %s", diff)
   290  	}
   291  	if diff := cmp.Diff(vs, expectedValues); diff != "" {
   292  		t.Errorf("values on transaction doesn't match expected, diff: %s", diff)
   293  	}
   294  }
   295  

View as plain text