1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package backend_test
16
17 import (
18 "context"
19 "testing"
20 "time"
21
22 "github.com/stretchr/testify/assert"
23 "go.etcd.io/etcd/server/v3/mvcc/backend"
24 betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
25 "go.etcd.io/etcd/server/v3/mvcc/buckets"
26 )
27
28 var (
29 bucket = buckets.Test
30 key = []byte("key")
31 )
32
33 func TestBackendPreCommitHook(t *testing.T) {
34 be := newTestHooksBackend(t, backend.DefaultBackendConfig())
35
36 tx := be.BatchTx()
37 prepareBuckenAndKey(tx)
38 tx.Commit()
39
40
41 tx.Commit()
42
43 assert.Equal(t, ">cc", getCommitsKey(t, be), "expected 2 explict commits")
44 tx.Commit()
45 assert.Equal(t, ">ccc", getCommitsKey(t, be), "expected 3 explict commits")
46 }
47
48 func TestBackendAutoCommitLimitHook(t *testing.T) {
49 cfg := backend.DefaultBackendConfig()
50 cfg.BatchLimit = 3
51 be := newTestHooksBackend(t, cfg)
52
53 tx := be.BatchTx()
54 prepareBuckenAndKey(tx)
55
56 for i := 3; i <= 9; i++ {
57 write(tx, []byte("i"), []byte{byte(i)})
58 }
59
60 assert.Equal(t, ">ccc", getCommitsKey(t, be))
61 }
62
63 func write(tx backend.BatchTx, k, v []byte) {
64 tx.Lock()
65 defer tx.Unlock()
66 tx.UnsafePut(bucket, k, v)
67 }
68
69 func TestBackendAutoCommitBatchIntervalHook(t *testing.T) {
70 ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
71 defer cancel()
72
73 cfg := backend.DefaultBackendConfig()
74 cfg.BatchInterval = 10 * time.Millisecond
75 be := newTestHooksBackend(t, cfg)
76 tx := be.BatchTx()
77 prepareBuckenAndKey(tx)
78
79
80 waitUntil(ctx, t, func() bool { return getCommitsKey(t, be) == ">c" })
81
82 time.Sleep(time.Second)
83
84 assert.Equal(t, ">c", getCommitsKey(t, be))
85
86 write(tx, []byte("foo"), []byte("bar1"))
87
88 waitUntil(ctx, t, func() bool { return getCommitsKey(t, be) == ">cc" })
89
90 write(tx, []byte("foo"), []byte("bar1"))
91
92 waitUntil(ctx, t, func() bool { return getCommitsKey(t, be) == ">ccc" })
93 }
94
// waitUntil polls f every 10ms until it returns true. f is always evaluated
// first, so a condition that already holds returns immediately even when ctx
// is done. If f is false and ctx is done, the test is failed via t.Fatalf
// with the context's error.
func waitUntil(ctx context.Context, t testing.TB, f func() bool) {
	for {
		if f() {
			return
		}
		if err := ctx.Err(); err != nil {
			t.Fatalf("Context cancelled/timedout without condition met: %v", err)
		}
		time.Sleep(10 * time.Millisecond)
	}
}
105
106 func prepareBuckenAndKey(tx backend.BatchTx) {
107 tx.Lock()
108 defer tx.Unlock()
109 tx.UnsafeCreateBucket(bucket)
110 tx.UnsafePut(bucket, key, []byte(">"))
111 }
112
113 func newTestHooksBackend(t testing.TB, baseConfig backend.BackendConfig) backend.Backend {
114 cfg := baseConfig
115 cfg.Hooks = backend.NewHooks(func(tx backend.BatchTx) {
116 k, v := tx.UnsafeRange(bucket, key, nil, 1)
117 t.Logf("OnPreCommit executed: %v %v", string(k[0]), string(v[0]))
118 assert.Len(t, k, 1)
119 assert.Len(t, v, 1)
120 tx.UnsafePut(bucket, key, append(v[0], byte('c')))
121 })
122
123 be, _ := betesting.NewTmpBackendFromCfg(t, cfg)
124 t.Cleanup(func() {
125 betesting.Close(t, be)
126 })
127 return be
128 }
129
130 func getCommitsKey(t testing.TB, be backend.Backend) string {
131 rtx := be.BatchTx()
132 rtx.Lock()
133 defer rtx.Unlock()
134 _, v := rtx.UnsafeRange(bucket, key, nil, 1)
135 assert.Len(t, v, 1)
136 return string(v[0])
137 }
138
View as plain text