1 package bbolt_test
2
3 import (
4 "bytes"
5 "fmt"
6 "math/rand"
7 "sync"
8 "sync/atomic"
9 "testing"
10
11 bolt "go.etcd.io/bbolt"
12 "go.etcd.io/bbolt/internal/btesting"
13 )
14
15 func TestSimulate_1op_1p(t *testing.T) { testSimulate(t, nil, 1, 1, 1) }
16 func TestSimulate_10op_1p(t *testing.T) { testSimulate(t, nil, 1, 10, 1) }
17 func TestSimulate_100op_1p(t *testing.T) { testSimulate(t, nil, 1, 100, 1) }
18 func TestSimulate_1000op_1p(t *testing.T) { testSimulate(t, nil, 1, 1000, 1) }
19 func TestSimulate_10000op_1p(t *testing.T) { testSimulate(t, nil, 1, 10000, 1) }
20
21 func TestSimulate_10op_10p(t *testing.T) { testSimulate(t, nil, 1, 10, 10) }
22 func TestSimulate_100op_10p(t *testing.T) { testSimulate(t, nil, 1, 100, 10) }
23 func TestSimulate_1000op_10p(t *testing.T) { testSimulate(t, nil, 1, 1000, 10) }
24 func TestSimulate_10000op_10p(t *testing.T) { testSimulate(t, nil, 1, 10000, 10) }
25
26 func TestSimulate_100op_100p(t *testing.T) { testSimulate(t, nil, 1, 100, 100) }
27 func TestSimulate_1000op_100p(t *testing.T) { testSimulate(t, nil, 1, 1000, 100) }
28 func TestSimulate_10000op_100p(t *testing.T) { testSimulate(t, nil, 1, 10000, 100) }
29
30 func TestSimulate_10000op_1000p(t *testing.T) { testSimulate(t, nil, 1, 10000, 1000) }
31
32
33 func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, parallelism int) {
34 if testing.Short() {
35 t.Skip("skipping test in short mode.")
36 }
37
38 rand.Seed(int64(qseed))
39
40
41 var readerHandlers = []simulateHandler{simulateGetHandler}
42 var writerHandlers = []simulateHandler{simulateGetHandler, simulatePutHandler}
43
44 var versions = make(map[int]*QuickDB)
45 versions[1] = NewQuickDB()
46
47 db := btesting.MustCreateDBWithOption(t, openOption)
48
49 var mutex sync.Mutex
50
51 for n := 0; n < round; n++ {
52
53 var threads = make(chan bool, parallelism)
54 var wg sync.WaitGroup
55
56
57 var opCount int64
58
59
60 var igCount int64
61
62 var errCh = make(chan error, threadCount)
63
64 var i int
65 for {
66
67
68
69 threads <- true
70
71
72
73 wg.Add(1)
74 writable := ((rand.Int() % 100) < 20)
75
76
77 var handler simulateHandler
78 if writable {
79 handler = writerHandlers[rand.Intn(len(writerHandlers))]
80 } else {
81 handler = readerHandlers[rand.Intn(len(readerHandlers))]
82 }
83
84
85 go func(writable bool, handler simulateHandler) {
86 defer wg.Done()
87 atomic.AddInt64(&opCount, 1)
88
89 tx, err := db.Begin(writable)
90 if err != nil {
91 errCh <- fmt.Errorf("error tx begin: %v", err)
92 return
93 }
94
95
96 mutex.Lock()
97 var qdb = versions[tx.ID()]
98 if writable {
99 qdb = versions[tx.ID()-1].Copy()
100 }
101 mutex.Unlock()
102
103
104 if writable {
105 defer func() {
106 mutex.Lock()
107 versions[tx.ID()] = qdb
108 mutex.Unlock()
109
110 if err := tx.Commit(); err != nil {
111 errCh <- err
112 return
113 }
114 }()
115 } else {
116 defer func() { _ = tx.Rollback() }()
117 }
118
119
120 if qdb == nil {
121 atomic.AddInt64(&igCount, 1)
122 return
123 }
124
125
126 handler(tx, qdb)
127
128
129 <-threads
130 }(writable, handler)
131
132 i++
133 if i >= threadCount {
134 break
135 }
136 }
137
138
139 wg.Wait()
140 t.Logf("transactions:%d ignored:%d", opCount, igCount)
141 close(errCh)
142 for err := range errCh {
143 if err != nil {
144 t.Fatalf("error from inside goroutine: %v", err)
145 }
146 }
147
148 db.MustClose()
149
150
151 db.MustDeleteFile()
152 db.MustReopen()
153 }
154
155 }
156
157 type simulateHandler func(tx *bolt.Tx, qdb *QuickDB)
158
159
160 func simulateGetHandler(tx *bolt.Tx, qdb *QuickDB) {
161
162 keys := qdb.Rand()
163 if len(keys) == 0 {
164 return
165 }
166
167
168 b := tx.Bucket(keys[0])
169 if b == nil {
170 panic(fmt.Sprintf("bucket[0] expected: %08x\n", trunc(keys[0], 4)))
171 }
172
173
174 for _, key := range keys[1 : len(keys)-1] {
175 b = b.Bucket(key)
176 if b == nil {
177 panic(fmt.Sprintf("bucket[n] expected: %v -> %v\n", keys, key))
178 }
179 }
180
181
182 expected := qdb.Get(keys)
183 actual := b.Get(keys[len(keys)-1])
184 if !bytes.Equal(actual, expected) {
185 fmt.Println("=== EXPECTED ===")
186 fmt.Println(expected)
187 fmt.Println("=== ACTUAL ===")
188 fmt.Println(actual)
189 fmt.Println("=== END ===")
190 panic("value mismatch")
191 }
192 }
193
194
195 func simulatePutHandler(tx *bolt.Tx, qdb *QuickDB) {
196 var err error
197 keys, value := randKeys(), randValue()
198
199
200 b := tx.Bucket(keys[0])
201 if b == nil {
202 b, err = tx.CreateBucket(keys[0])
203 if err != nil {
204 panic("create bucket: " + err.Error())
205 }
206 }
207
208
209 for _, key := range keys[1 : len(keys)-1] {
210 child := b.Bucket(key)
211 if child != nil {
212 b = child
213 } else {
214 b, err = b.CreateBucket(key)
215 if err != nil {
216 panic("create bucket: " + err.Error())
217 }
218 }
219 }
220
221
222 if err := b.Put(keys[len(keys)-1], value); err != nil {
223 panic("put: " + err.Error())
224 }
225
226
227 qdb.Put(keys, value)
228 }
229
230
231
232
// QuickDB is an in-memory tree of nested maps used as the expected state by
// the simulation handlers. Every entry of m is either a nested
// map[string]interface{} or a []byte value. The embedded RWMutex guards m.
type QuickDB struct {
	sync.RWMutex
	m map[string]interface{}
}

// NewQuickDB returns an empty QuickDB.
func NewQuickDB() *QuickDB {
	return &QuickDB{m: make(map[string]interface{})}
}

// Get returns the value stored at the key path keys: all elements but the
// last select nested maps and the last element selects the value. It returns
// nil if keys is empty, if any element of the path is missing, or if the
// path does not end at a []byte value.
func (db *QuickDB) Get(keys [][]byte) []byte {
	// An empty path cannot address a value; slicing it below would panic.
	if len(keys) == 0 {
		return nil
	}

	db.RLock()
	defer db.RUnlock()

	last := len(keys) - 1
	m := db.m
	for _, key := range keys[:last] {
		child, ok := m[string(key)].(map[string]interface{})
		if !ok {
			// Missing, or a value sits where a nested map is required.
			return nil
		}
		m = child
	}

	// A failed assertion leaves value nil, which is the "not found" result.
	value, _ := m[string(keys[last])].([]byte)
	return value
}

// Put stores value at the key path keys, creating missing nested maps along
// the way. It does nothing if keys is empty or if an element before the last
// one already holds a []byte value.
func (db *QuickDB) Put(keys [][]byte, value []byte) {
	// An empty path cannot address a value; slicing it below would panic.
	if len(keys) == 0 {
		return
	}

	db.Lock()
	defer db.Unlock()

	last := len(keys) - 1
	m := db.m
	for _, key := range keys[:last] {
		name := string(key)
		switch child := m[name].(type) {
		case []byte:
			// A value blocks the path, so the write is dropped.
			return
		case map[string]interface{}:
			m = child
		case nil:
			created := make(map[string]interface{})
			m[name] = created
			m = created
		default:
			panic("quickdb put: unexpected entry type")
		}
	}

	m[string(keys[last])] = value
}

// Rand returns a random key path, starting at the top level and following a
// randomly chosen entry of each nested map until it reaches an entry that is
// not a map. It returns nil if the QuickDB is empty.
func (db *QuickDB) Rand() [][]byte {
	db.RLock()
	defer db.RUnlock()
	if len(db.m) == 0 {
		return nil
	}
	var keys [][]byte
	db.rand(db.m, &keys)
	return keys
}

// rand appends the key of a randomly chosen entry of m to keys and recurses
// into that entry if it is a nested map. The caller must hold the lock.
func (db *QuickDB) rand(m map[string]interface{}, keys *[][]byte) {
	// rand.Intn panics for a non-positive argument, so stop at an empty map.
	if len(m) == 0 {
		return
	}

	i, index := 0, rand.Intn(len(m))
	for k, v := range m {
		if i == index {
			*keys = append(*keys, []byte(k))
			if v, ok := v.(map[string]interface{}); ok {
				db.rand(v, keys)
			}
			return
		}
		i++
	}
	panic("quickdb rand: out-of-range")
}

// Copy returns a new QuickDB whose nested maps are all duplicated. The []byte
// values themselves are shared with the original, not copied.
func (db *QuickDB) Copy() *QuickDB {
	db.RLock()
	defer db.RUnlock()
	return &QuickDB{m: db.copy(db.m)}
}

// copy returns a recursive duplicate of m. The caller must hold the lock.
func (db *QuickDB) copy(m map[string]interface{}) map[string]interface{} {
	clone := make(map[string]interface{}, len(m))
	for k, v := range m {
		switch v := v.(type) {
		case map[string]interface{}:
			clone[k] = db.copy(v)
		default:
			clone[k] = v
		}
	}
	return clone
}
337
// randKey returns a random key that is between 1 and 1023 bytes long. Every
// byte is drawn from the full range 0x00-0xFF.
func randKey() []byte {
	// minLen is inclusive and maxLen is exclusive. The names avoid shadowing
	// the min and max builtins.
	const minLen, maxLen = 1, 1024
	n := rand.Intn(maxLen-minLen) + minLen

	b := make([]byte, n)
	for i := range b {
		// Intn's upper bound is exclusive, so 256 is needed to reach 0xFF.
		b[i] = byte(rand.Intn(256))
	}
	return b
}
347
348 func randKeys() [][]byte {
349 var keys [][]byte
350 var count = rand.Intn(2) + 2
351 for i := 0; i < count; i++ {
352 keys = append(keys, randKey())
353 }
354 return keys
355 }
356
// randValue returns a random value that is between 0 and 8191 bytes long.
// Every byte is drawn from the full range 0x00-0xFF.
func randValue() []byte {
	n := rand.Intn(8192)
	b := make([]byte, n)
	for i := range b {
		// Intn's upper bound is exclusive, so 256 is needed to reach 0xFF.
		b[i] = byte(rand.Intn(256))
	}
	return b
}
365
View as plain text