1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package backend_test
16
17 import (
18 "fmt"
19 "io/ioutil"
20 "reflect"
21 "testing"
22 "time"
23
24 "github.com/stretchr/testify/assert"
25 bolt "go.etcd.io/bbolt"
26 "go.etcd.io/etcd/server/v3/mvcc/backend"
27 betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
28 "go.etcd.io/etcd/server/v3/mvcc/buckets"
29 )
30
31 func TestBackendClose(t *testing.T) {
32 b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
33
34
35 done := make(chan struct{})
36 go func() {
37 err := b.Close()
38 if err != nil {
39 t.Errorf("close error = %v, want nil", err)
40 }
41 done <- struct{}{}
42 }()
43 select {
44 case <-done:
45 case <-time.After(10 * time.Second):
46 t.Errorf("failed to close database in 10s")
47 }
48 }
49
50 func TestBackendSnapshot(t *testing.T) {
51 b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
52 defer betesting.Close(t, b)
53
54 tx := b.BatchTx()
55 tx.Lock()
56 tx.UnsafeCreateBucket(buckets.Test)
57 tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
58 tx.Unlock()
59 b.ForceCommit()
60
61
62 f, err := ioutil.TempFile(t.TempDir(), "etcd_backend_test")
63 if err != nil {
64 t.Fatal(err)
65 }
66 snap := b.Snapshot()
67 defer func() { assert.NoError(t, snap.Close()) }()
68 if _, err := snap.WriteTo(f); err != nil {
69 t.Fatal(err)
70 }
71 assert.NoError(t, f.Close())
72
73
74 bcfg := backend.DefaultBackendConfig()
75 bcfg.Path, bcfg.BatchInterval, bcfg.BatchLimit = f.Name(), time.Hour, 10000
76 nb := backend.New(bcfg)
77 defer betesting.Close(t, nb)
78
79 newTx := nb.BatchTx()
80 newTx.Lock()
81 ks, _ := newTx.UnsafeRange(buckets.Test, []byte("foo"), []byte("goo"), 0)
82 if len(ks) != 1 {
83 t.Errorf("len(kvs) = %d, want 1", len(ks))
84 }
85 newTx.Unlock()
86 }
87
88 func TestBackendBatchIntervalCommit(t *testing.T) {
89
90
91 b, _ := betesting.NewTmpBackend(t, time.Nanosecond, 10000)
92 defer betesting.Close(t, b)
93
94 pc := backend.CommitsForTest(b)
95
96 tx := b.BatchTx()
97 tx.Lock()
98 tx.UnsafeCreateBucket(buckets.Test)
99 tx.UnsafePut(buckets.Test, []byte("foo"), []byte("bar"))
100 tx.Unlock()
101
102 for i := 0; i < 10; i++ {
103 if backend.CommitsForTest(b) >= pc+1 {
104 break
105 }
106 time.Sleep(time.Duration(i*100) * time.Millisecond)
107 }
108
109
110 assert.NoError(t, backend.DbFromBackendForTest(b).View(func(tx *bolt.Tx) error {
111 bucket := tx.Bucket([]byte("test"))
112 if bucket == nil {
113 t.Errorf("bucket test does not exit")
114 return nil
115 }
116 v := bucket.Get([]byte("foo"))
117 if v == nil {
118 t.Errorf("foo key failed to written in backend")
119 }
120 return nil
121 }))
122 }
123
124 func TestBackendDefrag(t *testing.T) {
125 bcfg := backend.DefaultBackendConfig()
126
127
128 if bcfg.BackendFreelistType == bolt.FreelistMapType {
129 bcfg.BackendFreelistType = bolt.FreelistArrayType
130 } else {
131 bcfg.BackendFreelistType = bolt.FreelistMapType
132 }
133
134 b, _ := betesting.NewTmpBackendFromCfg(t, bcfg)
135
136 defer betesting.Close(t, b)
137
138 tx := b.BatchTx()
139 tx.Lock()
140 tx.UnsafeCreateBucket(buckets.Test)
141 for i := 0; i < backend.DefragLimitForTest()+100; i++ {
142 tx.UnsafePut(buckets.Test, []byte(fmt.Sprintf("foo_%d", i)), []byte("bar"))
143 }
144 tx.Unlock()
145 b.ForceCommit()
146
147
148 tx = b.BatchTx()
149 tx.Lock()
150 for i := 0; i < 50; i++ {
151 tx.UnsafeDelete(buckets.Test, []byte(fmt.Sprintf("foo_%d", i)))
152 }
153 tx.Unlock()
154 b.ForceCommit()
155
156 size := b.Size()
157
158
159 oh, err := b.Hash(nil)
160 if err != nil {
161 t.Fatal(err)
162 }
163
164 err = b.Defrag()
165 if err != nil {
166 t.Fatal(err)
167 }
168
169 nh, err := b.Hash(nil)
170 if err != nil {
171 t.Fatal(err)
172 }
173 if oh != nh {
174 t.Errorf("hash = %v, want %v", nh, oh)
175 }
176
177 nsize := b.Size()
178 if nsize >= size {
179 t.Errorf("new size = %v, want < %d", nsize, size)
180 }
181 db := backend.DbFromBackendForTest(b)
182 if db.FreelistType != bcfg.BackendFreelistType {
183 t.Errorf("db FreelistType = [%v], want [%v]", db.FreelistType, bcfg.BackendFreelistType)
184 }
185
186
187 tx = b.BatchTx()
188 tx.Lock()
189 tx.UnsafeCreateBucket(buckets.Test)
190 tx.UnsafePut(buckets.Test, []byte("more"), []byte("bar"))
191 tx.Unlock()
192 b.ForceCommit()
193 }
194
195
196 func TestBackendWriteback(t *testing.T) {
197 b, _ := betesting.NewDefaultTmpBackend(t)
198 defer betesting.Close(t, b)
199
200 tx := b.BatchTx()
201 tx.Lock()
202 tx.UnsafeCreateBucket(buckets.Key)
203 tx.UnsafePut(buckets.Key, []byte("abc"), []byte("bar"))
204 tx.UnsafePut(buckets.Key, []byte("def"), []byte("baz"))
205 tx.UnsafePut(buckets.Key, []byte("overwrite"), []byte("1"))
206 tx.Unlock()
207
208
209 tx.Lock()
210 tx.UnsafePut(buckets.Key, []byte("overwrite"), []byte("2"))
211 tx.Unlock()
212
213 keys := []struct {
214 key []byte
215 end []byte
216 limit int64
217
218 wkey [][]byte
219 wval [][]byte
220 }{
221 {
222 key: []byte("abc"),
223 end: nil,
224
225 wkey: [][]byte{[]byte("abc")},
226 wval: [][]byte{[]byte("bar")},
227 },
228 {
229 key: []byte("abc"),
230 end: []byte("def"),
231
232 wkey: [][]byte{[]byte("abc")},
233 wval: [][]byte{[]byte("bar")},
234 },
235 {
236 key: []byte("abc"),
237 end: []byte("deg"),
238
239 wkey: [][]byte{[]byte("abc"), []byte("def")},
240 wval: [][]byte{[]byte("bar"), []byte("baz")},
241 },
242 {
243 key: []byte("abc"),
244 end: []byte("\xff"),
245 limit: 1,
246
247 wkey: [][]byte{[]byte("abc")},
248 wval: [][]byte{[]byte("bar")},
249 },
250 {
251 key: []byte("abc"),
252 end: []byte("\xff"),
253
254 wkey: [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")},
255 wval: [][]byte{[]byte("bar"), []byte("baz"), []byte("2")},
256 },
257 }
258 rtx := b.ReadTx()
259 for i, tt := range keys {
260 func() {
261 rtx.RLock()
262 defer rtx.RUnlock()
263 k, v := rtx.UnsafeRange(buckets.Key, tt.key, tt.end, tt.limit)
264 if !reflect.DeepEqual(tt.wkey, k) || !reflect.DeepEqual(tt.wval, v) {
265 t.Errorf("#%d: want k=%+v, v=%+v; got k=%+v, v=%+v", i, tt.wkey, tt.wval, k, v)
266 }
267 }()
268 }
269 }
270
271
272 func TestConcurrentReadTx(t *testing.T) {
273 b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
274 defer betesting.Close(t, b)
275
276 wtx1 := b.BatchTx()
277 wtx1.Lock()
278 wtx1.UnsafeCreateBucket(buckets.Key)
279 wtx1.UnsafePut(buckets.Key, []byte("abc"), []byte("ABC"))
280 wtx1.UnsafePut(buckets.Key, []byte("overwrite"), []byte("1"))
281 wtx1.Unlock()
282
283 wtx2 := b.BatchTx()
284 wtx2.Lock()
285 wtx2.UnsafePut(buckets.Key, []byte("def"), []byte("DEF"))
286 wtx2.UnsafePut(buckets.Key, []byte("overwrite"), []byte("2"))
287 wtx2.Unlock()
288
289 rtx := b.ConcurrentReadTx()
290 rtx.RLock()
291 k, v := rtx.UnsafeRange(buckets.Key, []byte("abc"), []byte("\xff"), 0)
292 rtx.RUnlock()
293 wKey := [][]byte{[]byte("abc"), []byte("def"), []byte("overwrite")}
294 wVal := [][]byte{[]byte("ABC"), []byte("DEF"), []byte("2")}
295 if !reflect.DeepEqual(wKey, k) || !reflect.DeepEqual(wVal, v) {
296 t.Errorf("want k=%+v, v=%+v; got k=%+v, v=%+v", wKey, wVal, k, v)
297 }
298 }
299
300
301
302 func TestBackendWritebackForEach(t *testing.T) {
303 b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
304 defer betesting.Close(t, b)
305
306 tx := b.BatchTx()
307 tx.Lock()
308 tx.UnsafeCreateBucket(buckets.Key)
309 for i := 0; i < 5; i++ {
310 k := []byte(fmt.Sprintf("%04d", i))
311 tx.UnsafePut(buckets.Key, k, []byte("bar"))
312 }
313 tx.Unlock()
314
315
316 b.ForceCommit()
317
318 tx.Lock()
319 tx.UnsafeCreateBucket(buckets.Key)
320 for i := 5; i < 20; i++ {
321 k := []byte(fmt.Sprintf("%04d", i))
322 tx.UnsafePut(buckets.Key, k, []byte("bar"))
323 }
324 tx.Unlock()
325
326 seq := ""
327 getSeq := func(k, v []byte) error {
328 seq += string(k)
329 return nil
330 }
331 rtx := b.ReadTx()
332 rtx.RLock()
333 assert.NoError(t, rtx.UnsafeForEach(buckets.Key, getSeq))
334 rtx.RUnlock()
335
336 partialSeq := seq
337
338 seq = ""
339 b.ForceCommit()
340
341 tx.Lock()
342 assert.NoError(t, tx.UnsafeForEach(buckets.Key, getSeq))
343 tx.Unlock()
344
345 if seq != partialSeq {
346 t.Fatalf("expected %q, got %q", seq, partialSeq)
347 }
348 }
349
View as plain text