package bbolt_test

import (
	"bytes"
	crand "crypto/rand"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	mrand "math/rand"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"testing"
	"time"
	"unicode/utf8"

	"github.com/stretchr/testify/require"
	"golang.org/x/sync/errgroup"

	bolt "go.etcd.io/bbolt"
)

const (
	bucketPrefix = "bucket"
	keyPrefix    = "key"
	noopTxKey    = "%magic-no-op-key%"

	// testConcurrentCaseDuration is the name of the environment variable
	// which overrides the default duration of each concurrent test case.
	testConcurrentCaseDuration    = "TEST_CONCURRENT_CASE_DURATION"
	defaultConcurrentTestDuration = 30 * time.Second
)

type duration struct {
	min time.Duration
	max time.Duration
}

type bytesRange struct {
	min int
	max int
}

type operationChance struct {
	operation OperationType
	chance    int
}

type concurrentConfig struct {
	bucketCount    int
	keyCount       int
	workInterval   duration
	operationRatio []operationChance
	readInterval   duration
	noopWriteRatio int
	writeBytes     bytesRange
}

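// TestConcurrentGenericReadAndWrite starts a configurable number of workers
// which concurrently read, write and delete random keys for a given duration.
// Afterwards it verifies that the collected history records are sequentially
// consistent and that the database passes a consistency check.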
func TestConcurrentGenericReadAndWrite(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping test in short mode.")
	}

	testDuration := concurrentTestDuration(t)
	conf := concurrentConfig{
		bucketCount:  5,
		keyCount:     10000,
		workInterval: duration{},
		operationRatio: []operationChance{
			{operation: Read, chance: 60},
			{operation: Write, chance: 20},
			{operation: Delete, chance: 20},
		},
		readInterval: duration{
			min: 50 * time.Millisecond,
			max: 100 * time.Millisecond,
		},
		noopWriteRatio: 20,
		writeBytes: bytesRange{
			min: 200,
			max: 16000,
		},
	}

	testCases := []struct {
		name         string
		workerCount  int
		conf         concurrentConfig
		testDuration time.Duration
	}{
		{
			name:         "1 worker",
			workerCount:  1,
			conf:         conf,
			testDuration: testDuration,
		},
		{
			name:         "10 workers",
			workerCount:  10,
			conf:         conf,
			testDuration: testDuration,
		},
		{
			name:         "50 workers",
			workerCount:  50,
			conf:         conf,
			testDuration: testDuration,
		},
		{
			name:         "100 workers",
			workerCount:  100,
			conf:         conf,
			testDuration: testDuration,
		},
		{
			name:         "200 workers",
			workerCount:  200,
			conf:         conf,
			testDuration: testDuration,
		},
	}

	for _, tc := range testCases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			concurrentReadAndWrite(t,
				tc.workerCount,
				tc.conf,
				tc.testDuration)
		})
	}
}

func concurrentTestDuration(t *testing.T) time.Duration {
	durationInEnv := strings.ToLower(os.Getenv(testConcurrentCaseDuration))
	if durationInEnv == "" {
		t.Logf("%q not set, defaults to %s", testConcurrentCaseDuration, defaultConcurrentTestDuration)
		return defaultConcurrentTestDuration
	}

	d, err := time.ParseDuration(durationInEnv)
	if err != nil {
		t.Logf("Failed to parse %s=%s, error: %v, defaults to %s", testConcurrentCaseDuration, durationInEnv, err, defaultConcurrentTestDuration)
		return defaultConcurrentTestDuration
	}

	t.Logf("Concurrent test duration set by %s=%s", testConcurrentCaseDuration, d)
	return d
}

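// concurrentReadAndWrite creates the test buckets, runs the given number of
// workers for the given duration, and then validates the recorded history and
// the consistency of the database. On failure or panic the db file and the
// history records are saved for further analysis.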
func concurrentReadAndWrite(t *testing.T,
	workerCount int,
	conf concurrentConfig,
	testDuration time.Duration) {

	t.Log("Preparing db.")
	db := mustCreateDB(t, nil)
	defer db.Close()
	err := db.Update(func(tx *bolt.Tx) error {
		for i := 0; i < conf.bucketCount; i++ {
			if _, err := tx.CreateBucketIfNotExists(bucketName(i)); err != nil {
				return err
			}
		}
		return nil
	})
	require.NoError(t, err)

	var records historyRecords

	// Save the history records and the db file for debugging if the test
	// fails or panics. Note t.Failed() isn't set while panicking, so the
	// `panicked` flag forces saving in that case.
	panicked := true
	defer func() {
		t.Log("Save data if failed.")
		saveDataIfFailed(t, db, records, panicked)
	}()

	t.Log("Starting workers.")
	records = runWorkers(t,
		db,
		workerCount,
		conf,
		testDuration)

	t.Log("Analyzing the history records.")
	if err := validateSequential(records); err != nil {
		t.Errorf("The history records are not sequential:\n %v", err)
	}

	t.Log("Checking database consistency.")
	if err := checkConsistency(t, db); err != nil {
		t.Errorf("The data isn't consistent: %v", err)
	}

	panicked = false
}

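// Helpers for creating, reopening and checking the test database.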
func mustCreateDB(t *testing.T, o *bolt.Options) *bolt.DB {
	f := filepath.Join(t.TempDir(), "db")

	return mustOpenDB(t, f, o)
}

func mustReOpenDB(t *testing.T, db *bolt.DB, o *bolt.Options) *bolt.DB {
	f := db.Path()

	t.Logf("Closing bbolt DB at: %s", f)
	err := db.Close()
	require.NoError(t, err)

	return mustOpenDB(t, f, o)
}

func mustOpenDB(t *testing.T, dbPath string, o *bolt.Options) *bolt.DB {
	t.Logf("Opening bbolt DB at: %s", dbPath)
	if o == nil {
		o = bolt.DefaultOptions
	}

	freelistType := bolt.FreelistArrayType
	if env := os.Getenv("TEST_FREELIST_TYPE"); env == string(bolt.FreelistMapType) {
		freelistType = bolt.FreelistMapType
	}

	o.FreelistType = freelistType

	db, err := bolt.Open(dbPath, 0600, o)
	require.NoError(t, err)

	return db
}

func checkConsistency(t *testing.T, db *bolt.DB) error {
	return db.View(func(tx *bolt.Tx) error {
		cnt := 0
		for err := range tx.Check() {
			t.Errorf("Consistency error: %v", err)
			cnt++
		}
		if cnt > 0 {
			return fmt.Errorf("%d consistency errors found", cnt)
		}
		return nil
	})
}

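// runWorkers starts `workerCount` workers, keeps them running for roughly
// `testDuration` (or until the first worker reports an error), and returns
// the merged history records collected by all workers.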
func runWorkers(t *testing.T,
	db *bolt.DB,
	workerCount int,
	conf concurrentConfig,
	testDuration time.Duration) historyRecords {
	stopCh := make(chan struct{}, 1)
	errCh := make(chan error, workerCount)

	var mu sync.Mutex
	var rs historyRecords

	g := new(errgroup.Group)
	for i := 0; i < workerCount; i++ {
		w := &worker{
			id: i,
			db: db,

			conf: conf,

			errCh:  errCh,
			stopCh: stopCh,
			t:      t,
		}
		g.Go(func() error {
			wrs, err := runWorker(t, w, errCh)
			mu.Lock()
			rs = append(rs, wrs...)
			mu.Unlock()
			return err
		})
	}

	t.Logf("Keep all workers running for about %s.", testDuration)
	select {
	case <-time.After(testDuration):
	case <-errCh:
	}

	close(stopCh)
	t.Log("Waiting for all workers to finish.")
	if err := g.Wait(); err != nil {
		t.Errorf("Received error: %v", err)
	}

	return rs
}

func runWorker(t *testing.T, w *worker, errCh chan error) (historyRecords, error) {
	rs, err := w.run()
	if len(rs) > 0 && err == nil {
		if terr := validateIncrementalTxid(rs); terr != nil {
			txidErr := fmt.Errorf("[%s]: %w", w.name(), terr)
			t.Error(txidErr)
			errCh <- txidErr
			return rs, txidErr
		}
	}
	return rs, err
}

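// worker is a single test goroutine which repeatedly executes randomly picked
// operations against randomly picked buckets and keys until it is stopped.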
type worker struct {
	id int
	db *bolt.DB

	conf concurrentConfig

	errCh  chan error
	stopCh chan struct{}

	t *testing.T
}

func (w *worker) name() string {
	return fmt.Sprintf("worker-%d", w.id)
}

func (w *worker) run() (historyRecords, error) {
	var rs historyRecords
	for {
		select {
		case <-w.stopCh:
			w.t.Logf("%q finished.", w.name())
			return rs, nil
		default:
		}

		op := w.pickOperation()
		bucket, key := w.pickBucket(), w.pickKey()
		rec, err := executeOperation(op, w.db, bucket, key, w.conf)
		if err != nil {
			readErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, err)
			w.t.Error(readErr)
			w.errCh <- readErr
			return rs, readErr
		}

		rs = append(rs, rec)
		if w.conf.workInterval != (duration{}) {
			time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max))
		}
	}
}

func (w *worker) pickBucket() []byte {
	return bucketName(mrand.Intn(w.conf.bucketCount))
}

func bucketName(index int) []byte {
	bucket := fmt.Sprintf("%s_%d", bucketPrefix, index)
	return []byte(bucket)
}

func (w *worker) pickKey() []byte {
	key := fmt.Sprintf("%s_%d", keyPrefix, mrand.Intn(w.conf.keyCount))
	return []byte(key)
}

func (w *worker) pickOperation() OperationType {
	sum := 0
	for _, op := range w.conf.operationRatio {
		sum += op.chance
	}
	roll := mrand.Int() % sum
	for _, op := range w.conf.operationRatio {
		if roll < op.chance {
			return op.operation
		}
		roll -= op.chance
	}
	panic("unexpected")
}

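// executeOperation dispatches the given operation type to the corresponding
// read, write or delete implementation.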
func executeOperation(op OperationType, db *bolt.DB, bucket []byte, key []byte, conf concurrentConfig) (historyRecord, error) {
	switch op {
	case Read:
		return executeRead(db, bucket, key, conf.readInterval)
	case Write:
		return executeWrite(db, bucket, key, conf.writeBytes, conf.noopWriteRatio)
	case Delete:
		return executeDelete(db, bucket, key)
	default:
		panic(fmt.Sprintf("unexpected operation type: %s", op))
	}
}

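// executeRead reads the same key twice within one read transaction, separated
// by a random sleep, and fails if the two values differ (a repeatable-read
// violation). It records the observed value together with the txid.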
func executeRead(db *bolt.DB, bucket []byte, key []byte, readInterval duration) (historyRecord, error) {
	var rec historyRecord
	err := db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket(bucket)

		initialVal := b.Get(key)
		time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
		val := b.Get(key)

		if !bytes.Equal(initialVal, val) {
			return fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
				string(key), formatBytes(initialVal), formatBytes(val))
		}

		clonedVal := make([]byte, len(val))
		copy(clonedVal, val)

		rec = historyRecord{
			OperationType: Read,
			Bucket:        string(bucket),
			Key:           string(key),
			Value:         clonedVal,
			Txid:          tx.ID(),
		}

		return nil
	})

	return rec, err
}

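// executeWrite writes a random value with a size within `writeBytes` to the
// given key. With a probability of `noopWriteRatio` percent it instead commits
// a no-op write transaction which doesn't touch any data.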
func executeWrite(db *bolt.DB, bucket []byte, key []byte, writeBytes bytesRange, noopWriteRatio int) (historyRecord, error) {
	var rec historyRecord

	err := db.Update(func(tx *bolt.Tx) error {
		if mrand.Intn(100) < noopWriteRatio {
			// A no-op write transaction: commit without writing anything,
			// recorded under the magic no-op key.
			rec = historyRecord{
				OperationType: Write,
				Bucket:        string(bucket),
				Key:           noopTxKey,
				Value:         nil,
				Txid:          tx.ID(),
			}
			return nil
		}

		b := tx.Bucket(bucket)

		valueBytes := randomIntInRange(writeBytes.min, writeBytes.max)
		v := make([]byte, valueBytes)
		if _, cErr := crand.Read(v); cErr != nil {
			return cErr
		}

		putErr := b.Put(key, v)
		if putErr == nil {
			rec = historyRecord{
				OperationType: Write,
				Bucket:        string(bucket),
				Key:           string(key),
				Value:         v,
				Txid:          tx.ID(),
			}
		}

		return putErr
	})

	return rec, err
}

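// executeDelete removes the given key in a write transaction and records the
// deletion together with the txid.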
func executeDelete(db *bolt.DB, bucket []byte, key []byte) (historyRecord, error) {
	var rec historyRecord

	err := db.Update(func(tx *bolt.Tx) error {
		b := tx.Bucket(bucket)

		deleteErr := b.Delete(key)
		if deleteErr == nil {
			rec = historyRecord{
				OperationType: Delete,
				Bucket:        string(bucket),
				Key:           string(key),
				Txid:          tx.ID(),
			}
		}

		return deleteErr
	})

	return rec, err
}

func randomDurationInRange(min, max time.Duration) time.Duration {
	d := int64(max) - int64(min)
	d = int64(mrand.Intn(int(d))) + int64(min)
	return time.Duration(d)
}

func randomIntInRange(min, max int) int {
	return mrand.Intn(max-min) + min
}

func formatBytes(val []byte) string {
	if utf8.ValidString(string(val)) {
		return string(val)
	}

	return hex.EncodeToString(val)
}

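// saveDataIfFailed backs up the db file and persists the history records when
// the test has failed or saving is forced (e.g. on panic), so that the failure
// can be analyzed afterwards.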
func saveDataIfFailed(t *testing.T, db *bolt.DB, rs historyRecords, force bool) {
	if t.Failed() || force {
		t.Log("Saving data...")
		dbPath := db.Path()
		if err := db.Close(); err != nil {
			t.Errorf("Failed to close db: %v", err)
		}
		backupPath := testResultsDirectory(t)
		backupDB(t, dbPath, backupPath)
		persistHistoryRecords(t, rs, backupPath)
	}
}

func backupDB(t *testing.T, srcPath string, dstPath string) {
	targetFile := filepath.Join(dstPath, "db.bak")
	t.Logf("Saving the DB file to %s", targetFile)
	err := copyFile(srcPath, targetFile)
	require.NoError(t, err)
	t.Logf("DB file saved to %s", targetFile)
}

func copyFile(srcPath, dstPath string) error {
	// Ensure the source file exists.
	_, err := os.Stat(srcPath)
	if os.IsNotExist(err) {
		return fmt.Errorf("source file %q not found", srcPath)
	} else if err != nil {
		return err
	}

	// Ensure the output file doesn't already exist.
	_, err = os.Stat(dstPath)
	if err == nil {
		return fmt.Errorf("output file %q already exists", dstPath)
	} else if !os.IsNotExist(err) {
		return err
	}

	srcDB, err := os.Open(srcPath)
	if err != nil {
		return fmt.Errorf("failed to open source file %q: %w", srcPath, err)
	}
	defer srcDB.Close()
	dstDB, err := os.Create(dstPath)
	if err != nil {
		return fmt.Errorf("failed to create output file %q: %w", dstPath, err)
	}
	defer dstDB.Close()
	written, err := io.Copy(dstDB, srcDB)
	if err != nil {
		return fmt.Errorf("failed to copy database file from %q to %q: %w", srcPath, dstPath, err)
	}

	srcFi, err := srcDB.Stat()
	if err != nil {
		return fmt.Errorf("failed to get source file info %q: %w", srcPath, err)
	}
	initialSize := srcFi.Size()
	if initialSize != written {
		return fmt.Errorf("the bytes copied (%q: %d) aren't equal to the initial db size (%q: %d)", dstPath, written, srcPath, initialSize)
	}

	return nil
}

func persistHistoryRecords(t *testing.T, rs historyRecords, path string) {
	recordFilePath := filepath.Join(path, "history_records.json")
	t.Logf("Saving history records to %s", recordFilePath)
	recordFile, err := os.OpenFile(recordFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
	require.NoError(t, err)
	defer recordFile.Close()
	encoder := json.NewEncoder(recordFile)
	for _, rec := range rs {
		err := encoder.Encode(rec)
		require.NoError(t, err)
	}
}

func testResultsDirectory(t *testing.T) string {
	resultsDirectory, ok := os.LookupEnv("RESULTS_DIR")
	var err error
	if !ok {
		resultsDirectory, err = os.MkdirTemp("", "*.db")
		require.NoError(t, err)
	}
	resultsDirectory, err = filepath.Abs(resultsDirectory)
	require.NoError(t, err)

	path, err := filepath.Abs(filepath.Join(resultsDirectory, strings.ReplaceAll(t.Name(), "/", "_")))
	require.NoError(t, err)

	err = os.RemoveAll(path)
	require.NoError(t, err)

	err = os.MkdirAll(path, 0700)
	require.NoError(t, err)

	return path
}

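// Data structures used to record and validate the history of operations.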
type OperationType string

const (
	Read   OperationType = "read"
	Write  OperationType = "write"
	Delete OperationType = "delete"
)

type historyRecord struct {
	OperationType OperationType `json:"operationType,omitempty"`
	Txid          int           `json:"txid,omitempty"`
	Bucket        string        `json:"bucket,omitempty"`
	Key           string        `json:"key,omitempty"`
	Value         []byte        `json:"value,omitempty"`
}

type historyRecords []historyRecord

func (rs historyRecords) Len() int {
	return len(rs)
}

func (rs historyRecords) Less(i, j int) bool {
	// Sort by bucket and key first.
	bucketCmp := strings.Compare(rs[i].Bucket, rs[j].Bucket)
	if bucketCmp != 0 {
		return bucketCmp < 0
	}
	keyCmp := strings.Compare(rs[i].Key, rs[j].Key)
	if keyCmp != 0 {
		return keyCmp < 0
	}

	// Then sort by txid.
	if rs[i].Txid != rs[j].Txid {
		return rs[i].Txid < rs[j].Txid
	}

	// For records with the same txid, write/delete operations come before
	// read operations.
	if rs[i].OperationType == Read {
		return false
	}

	return true
}

func (rs historyRecords) Swap(i, j int) {
	rs[i], rs[j] = rs[j], rs[i]
}

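// validateIncrementalTxid verifies that the txids observed by one worker never
// decrease: a read may reuse the previous txid, while a write or delete must
// always get a strictly bigger txid.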
func validateIncrementalTxid(rs historyRecords) error {
	lastTxid := rs[0].Txid

	for i := 1; i < len(rs); i++ {
		if (rs[i].OperationType == Read && rs[i].Txid < lastTxid) || (rs[i].OperationType != Read && rs[i].Txid <= lastTxid) {
			return fmt.Errorf("detected non-incremental txid(%d, %d) in %s mode", lastTxid, rs[i].Txid, rs[i].OperationType)
		}
		lastTxid = rs[i].Txid
	}

	return nil
}

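// validateSequential sorts the history records and verifies, per bucket/key,
// that every read returns the value written by the most recent preceding
// write (or nil if there was no prior write or the key was deleted).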
func validateSequential(rs historyRecords) error {
	sort.Sort(rs)

	type bucketAndKey struct {
		bucket string
		key    string
	}
	lastWriteKeyValueMap := make(map[bucketAndKey]*historyRecord)

	for _, rec := range rs {
		bk := bucketAndKey{
			bucket: rec.Bucket,
			key:    rec.Key,
		}
		if v, ok := lastWriteKeyValueMap[bk]; ok {
			if rec.OperationType == Write {
				v.Txid = rec.Txid
				if rec.Key != noopTxKey {
					v.Value = rec.Value
				}
			} else if rec.OperationType == Delete {
				delete(lastWriteKeyValueMap, bk)
			} else {
				if !bytes.Equal(v.Value, rec.Value) {
					return fmt.Errorf("readOperation[txid: %d, bucket: %s, key: %s] read %x, \nbut writer[txid: %d] wrote %x",
						rec.Txid, rec.Bucket, rec.Key, rec.Value, v.Txid, v.Value)
				}
			}
		} else {
			if rec.OperationType == Write && rec.Key != noopTxKey {
				lastWriteKeyValueMap[bk] = &historyRecord{
					OperationType: Write,
					Bucket:        rec.Bucket,
					Key:           rec.Key,
					Value:         rec.Value,
					Txid:          rec.Txid,
				}
			} else if rec.OperationType == Read {
				if len(rec.Value) != 0 {
					return fmt.Errorf("expected the first readOperation[txid: %d, bucket: %s, key: %s] read nil, \nbut got %x",
						rec.Txid, rec.Bucket, rec.Key, rec.Value)
				}
			}
		}
	}

	return nil
}

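// TestConcurrentRepeatableRead verifies repeatable read for long running read
// transactions: pages freed by earlier write transactions may be reused by
// later writes, but a read transaction opened before those writes must keep
// seeing its original snapshot. It is run against both freelist types, with
// and without freelist sync.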
func TestConcurrentRepeatableRead(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping test in short mode.")
	}

	testCases := []struct {
		name           string
		noFreelistSync bool
		freelistType   bolt.FreelistType
	}{
		// array freelist
		{
			name:           "sync array freelist",
			noFreelistSync: false,
			freelistType:   bolt.FreelistArrayType,
		},
		{
			name:           "not sync array freelist",
			noFreelistSync: true,
			freelistType:   bolt.FreelistArrayType,
		},
		// map freelist
		{
			name:           "sync map freelist",
			noFreelistSync: false,
			freelistType:   bolt.FreelistMapType,
		},
		{
			name:           "not sync map freelist",
			noFreelistSync: true,
			freelistType:   bolt.FreelistMapType,
		},
	}

	for _, tc := range testCases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {

			t.Log("Preparing db.")
			var (
				bucket = []byte("data")
				key    = []byte("mykey")

				option = &bolt.Options{
					PageSize:       4096,
					NoFreelistSync: tc.noFreelistSync,
					FreelistType:   tc.freelistType,
				}
			)

			db := mustCreateDB(t, option)
			defer func() {
				// The db is reopened below (mustReOpenDB), so close it in a
				// closure to make sure the latest *bolt.DB instance is closed.
				db.Close()
			}()

			// Create lots of K/V entries to allocate some pages.
			err := db.Update(func(tx *bolt.Tx) error {
				b, err := tx.CreateBucketIfNotExists(bucket)
				if err != nil {
					return err
				}
				for i := 0; i < 1000; i++ {
					k := fmt.Sprintf("key_%d", i)
					if err := b.Put([]byte(k), make([]byte, 1024)); err != nil {
						return err
					}
				}
				return nil
			})
			require.NoError(t, err)

			// Delete all the K/V entries again so that the allocated pages
			// become free, and write the key the readers will watch.
			err = db.Update(func(tx *bolt.Tx) error {
				b := tx.Bucket(bucket)
				for i := 0; i < 1000; i++ {
					k := fmt.Sprintf("key_%d", i)
					if err := b.Delete([]byte(k)); err != nil {
						return err
					}
				}
				return b.Put(key, []byte("randomValue"))
			})
			require.NoError(t, err)

			// Reopen the db before starting the long running read
			// transactions and the concurrent writes below.
			db = mustReOpenDB(t, db, option)

			var (
				wg                     sync.WaitGroup
				longRunningReaderCount = 10
				stopCh                 = make(chan struct{})
				errCh                  = make(chan error, longRunningReaderCount)
				readInterval           = duration{5 * time.Millisecond, 10 * time.Millisecond}

				writeOperationCountInBetween = 5
				writeBytes                   = bytesRange{10, 20}

				testDuration = 10 * time.Second
			)

			for i := 0; i < longRunningReaderCount; i++ {
				readWorkerName := fmt.Sprintf("reader_%d", i)
				t.Logf("Starting long running read operation: %s", readWorkerName)
				wg.Add(1)
				go func() {
					defer wg.Done()
					rErr := executeLongRunningRead(t, readWorkerName, db, bucket, key, readInterval, stopCh)
					if rErr != nil {
						errCh <- rErr
					}
				}()
				time.Sleep(500 * time.Millisecond)

				t.Logf("Perform %d write operations after starting a long running read operation", writeOperationCountInBetween)
				for j := 0; j < writeOperationCountInBetween; j++ {
					_, err := executeWrite(db, bucket, key, writeBytes, 0)
					require.NoError(t, err)
				}
			}

			t.Log("Perform lots of write operations to check whether the long running read operations will read dirty data")
			wg.Add(1)
			go func() {
				defer wg.Done()
				cnt := longRunningReaderCount * writeOperationCountInBetween
				for i := 0; i < cnt; i++ {
					select {
					case <-stopCh:
						return
					default:
					}
					_, err := executeWrite(db, bucket, key, writeBytes, 0)
					require.NoError(t, err)
				}
			}()

			t.Log("Waiting for result")
			select {
			case err := <-errCh:
				close(stopCh)
				t.Errorf("Detected dirty read: %v", err)
			case <-time.After(testDuration):
				close(stopCh)
			}

			wg.Wait()
		})
	}
}

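// executeLongRunningRead opens a single read transaction, reads the key once,
// and then keeps re-reading it until stopped, returning an error if the value
// ever changes within the transaction.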
func executeLongRunningRead(t *testing.T, name string, db *bolt.DB, bucket []byte, key []byte, readInterval duration, stopCh chan struct{}) error {
	err := db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket(bucket)

		initialVal := b.Get(key)

		for {
			select {
			case <-stopCh:
				t.Logf("%q finished.", name)
				return nil
			default:
			}

			time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
			val := b.Get(key)

			if !bytes.Equal(initialVal, val) {
				dirtyReadErr := fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
					string(key), formatBytes(initialVal), formatBytes(val))
				return dirtyReadErr
			}
		}
	})

	return err
}