...

Source file src/go.etcd.io/bbolt/concurrent_test.go

Documentation: go.etcd.io/bbolt

     1  package bbolt_test
     2  
     3  import (
     4  	"bytes"
     5  	crand "crypto/rand"
     6  	"encoding/hex"
     7  	"encoding/json"
     8  	"fmt"
     9  	"io"
    10  	mrand "math/rand"
    11  	"os"
    12  	"path/filepath"
    13  	"sort"
    14  	"strings"
    15  	"sync"
    16  	"testing"
    17  	"time"
    18  	"unicode/utf8"
    19  
    20  	"github.com/stretchr/testify/require"
    21  	"golang.org/x/sync/errgroup"
    22  
    23  	bolt "go.etcd.io/bbolt"
    24  )
    25  
const (
	// bucketPrefix and keyPrefix are the name prefixes used by bucketName
	// and worker.pickKey to build names such as "bucket_0" and "key_12".
	// noopTxKey is a sentinel key recorded for no-op write transactions,
	// which bump the txid without modifying any data.
	bucketPrefix = "bucket"
	keyPrefix    = "key"
	noopTxKey    = "%magic-no-op-key%"

	// TestConcurrentCaseDuration is used as a env variable to specify the
	// concurrent test duration.
	testConcurrentCaseDuration    = "TEST_CONCURRENT_CASE_DURATION"
	defaultConcurrentTestDuration = 30 * time.Second
)
    36  
// duration defines a [min, max] range from which a random sleep
// interval is picked (see randomDurationInRange). The zero value is
// used to mean "no interval at all".
type duration struct {
	min time.Duration
	max time.Duration
}
    41  
// bytesRange defines the [min, max] range for the number of random
// bytes a write operation puts as a value (see executeWrite).
type bytesRange struct {
	min int
	max int
}
    46  
// operationChance pairs an operation type with its relative weight;
// worker.pickOperation performs a weighted random selection over these.
type operationChance struct {
	operation OperationType
	chance    int
}
    51  
// concurrentConfig bundles all tunables for one concurrent test run.
type concurrentConfig struct {
	bucketCount    int               // number of buckets pre-created before workers start
	keyCount       int               // size of the per-bucket key space workers pick from
	workInterval   duration          // optional pause between two operations of a worker
	operationRatio []operationChance // relative weights of Read/Write/Delete
	readInterval   duration   // only used by readOperation
	noopWriteRatio int        // only used by writeOperation
	writeBytes     bytesRange // only used by writeOperation
}
    61  
    62  /*
    63  TestConcurrentGenericReadAndWrite verifies:
    64   1. Repeatable read: a read transaction should always see the same data
    65      view during its lifecycle.
    66   2. Any data written by a writing transaction should be visible to any
    67      following reading transactions (with txid >= previous writing txid).
    68   3. The txid should never decrease.
    69  */
    70  func TestConcurrentGenericReadAndWrite(t *testing.T) {
    71  	if testing.Short() {
    72  		t.Skip("skipping test in short mode.")
    73  	}
    74  
    75  	testDuration := concurrentTestDuration(t)
    76  	conf := concurrentConfig{
    77  		bucketCount:  5,
    78  		keyCount:     10000,
    79  		workInterval: duration{},
    80  		operationRatio: []operationChance{
    81  			{operation: Read, chance: 60},
    82  			{operation: Write, chance: 20},
    83  			{operation: Delete, chance: 20},
    84  		},
    85  		readInterval: duration{
    86  			min: 50 * time.Millisecond,
    87  			max: 100 * time.Millisecond,
    88  		},
    89  		noopWriteRatio: 20,
    90  		writeBytes: bytesRange{
    91  			min: 200,
    92  			max: 16000,
    93  		},
    94  	}
    95  
    96  	testCases := []struct {
    97  		name         string
    98  		workerCount  int
    99  		conf         concurrentConfig
   100  		testDuration time.Duration
   101  	}{
   102  		{
   103  			name:         "1 worker",
   104  			workerCount:  1,
   105  			conf:         conf,
   106  			testDuration: testDuration,
   107  		},
   108  		{
   109  			name:         "10 workers",
   110  			workerCount:  10,
   111  			conf:         conf,
   112  			testDuration: testDuration,
   113  		},
   114  		{
   115  			name:         "50 workers",
   116  			workerCount:  50,
   117  			conf:         conf,
   118  			testDuration: testDuration,
   119  		},
   120  		{
   121  			name:         "100 workers",
   122  			workerCount:  100,
   123  			conf:         conf,
   124  			testDuration: testDuration,
   125  		},
   126  		{
   127  			name:         "200 workers",
   128  			workerCount:  200,
   129  			conf:         conf,
   130  			testDuration: testDuration,
   131  		},
   132  	}
   133  
   134  	for _, tc := range testCases {
   135  		tc := tc
   136  		t.Run(tc.name, func(t *testing.T) {
   137  			concurrentReadAndWrite(t,
   138  				tc.workerCount,
   139  				tc.conf,
   140  				tc.testDuration)
   141  		})
   142  	}
   143  }
   144  
   145  func concurrentTestDuration(t *testing.T) time.Duration {
   146  	durationInEnv := strings.ToLower(os.Getenv(testConcurrentCaseDuration))
   147  	if durationInEnv == "" {
   148  		t.Logf("%q not set, defaults to %s", testConcurrentCaseDuration, defaultConcurrentTestDuration)
   149  		return defaultConcurrentTestDuration
   150  	}
   151  
   152  	d, err := time.ParseDuration(durationInEnv)
   153  	if err != nil {
   154  		t.Logf("Failed to parse %s=%s, error: %v, defaults to %s", testConcurrentCaseDuration, durationInEnv, err, defaultConcurrentTestDuration)
   155  		return defaultConcurrentTestDuration
   156  	}
   157  
   158  	t.Logf("Concurrent test duration set by %s=%s", testConcurrentCaseDuration, d)
   159  	return d
   160  }
   161  
// concurrentReadAndWrite runs workerCount workers against a fresh db
// for roughly testDuration, then validates the collected history
// records and the on-disk consistency of the db. On test failure or
// panic, the db file and the history records are persisted for offline
// analysis.
func concurrentReadAndWrite(t *testing.T,
	workerCount int,
	conf concurrentConfig,
	testDuration time.Duration) {

	t.Log("Preparing db.")
	db := mustCreateDB(t, nil)
	defer db.Close()
	// Pre-create all buckets so workers can assume they exist.
	err := db.Update(func(tx *bolt.Tx) error {
		for i := 0; i < conf.bucketCount; i++ {
			if _, err := tx.CreateBucketIfNotExists(bucketName(i)); err != nil {
				return err
			}
		}
		return nil
	})
	require.NoError(t, err)

	var records historyRecords
	// t.Failed() returns false during panicking. We need to forcibly
	// save data on panicking.
	// Refer to: https://github.com/golang/go/issues/49929
	panicked := true
	defer func() {
		t.Log("Save data if failed.")
		saveDataIfFailed(t, db, records, panicked)
	}()

	t.Log("Starting workers.")
	records = runWorkers(t,
		db,
		workerCount,
		conf,
		testDuration)

	t.Log("Analyzing the history records.")
	if err := validateSequential(records); err != nil {
		t.Errorf("The history records are not sequential:\n %v", err)
	}

	t.Log("Checking database consistency.")
	if err := checkConsistency(t, db); err != nil {
		t.Errorf("The data isn't consistency: %v", err)
	}

	// Reaching this point means no panic happened; the deferred
	// saveDataIfFailed now only persists data when the test has failed.
	panicked = false
	// TODO (ahrtr):
	//   1. intentionally inject a random failpoint.
}
   211  
   212  // mustCreateDB is created in place of `btesting.MustCreateDB`, and it's
   213  // only supposed to be used by the concurrent test case. The purpose is
   214  // to ensure the test case can be executed on old branches or versions,
   215  // e.g. `release-1.3` or `1.3.[5-7]`.
   216  func mustCreateDB(t *testing.T, o *bolt.Options) *bolt.DB {
   217  	f := filepath.Join(t.TempDir(), "db")
   218  
   219  	return mustOpenDB(t, f, o)
   220  }
   221  
   222  func mustReOpenDB(t *testing.T, db *bolt.DB, o *bolt.Options) *bolt.DB {
   223  	f := db.Path()
   224  
   225  	t.Logf("Closing bbolt DB at: %s", f)
   226  	err := db.Close()
   227  	require.NoError(t, err)
   228  
   229  	return mustOpenDB(t, f, o)
   230  }
   231  
   232  func mustOpenDB(t *testing.T, dbPath string, o *bolt.Options) *bolt.DB {
   233  	t.Logf("Opening bbolt DB at: %s", dbPath)
   234  	if o == nil {
   235  		o = bolt.DefaultOptions
   236  	}
   237  
   238  	freelistType := bolt.FreelistArrayType
   239  	if env := os.Getenv("TEST_FREELIST_TYPE"); env == string(bolt.FreelistMapType) {
   240  		freelistType = bolt.FreelistMapType
   241  	}
   242  
   243  	o.FreelistType = freelistType
   244  
   245  	db, err := bolt.Open(dbPath, 0600, o)
   246  	require.NoError(t, err)
   247  
   248  	return db
   249  }
   250  
   251  func checkConsistency(t *testing.T, db *bolt.DB) error {
   252  	return db.View(func(tx *bolt.Tx) error {
   253  		cnt := 0
   254  		for err := range tx.Check() {
   255  			t.Errorf("Consistency error: %v", err)
   256  			cnt++
   257  		}
   258  		if cnt > 0 {
   259  			return fmt.Errorf("%d consistency errors found", cnt)
   260  		}
   261  		return nil
   262  	})
   263  }
   264  
   265  /*
   266  *********************************************************
   267  Data structures and functions/methods for running concurrent
   268  workers, which execute different operations, including `Read`,
   269  `Write` and `Delete`.
   270  *********************************************************
   271  */
// runWorkers starts workerCount workers sharing the same db, lets them
// run for roughly testDuration (or until the first worker reports an
// error), then stops them all and returns the merged history records
// of every worker.
func runWorkers(t *testing.T,
	db *bolt.DB,
	workerCount int,
	conf concurrentConfig,
	testDuration time.Duration) historyRecords {
	stopCh := make(chan struct{}, 1)
	errCh := make(chan error, workerCount)

	// mu guards rs, which all workers append to concurrently.
	var mu sync.Mutex
	var rs historyRecords

	g := new(errgroup.Group)
	for i := 0; i < workerCount; i++ {
		w := &worker{
			id: i,
			db: db,

			conf: conf,

			errCh:  errCh,
			stopCh: stopCh,
			t:      t,
		}
		g.Go(func() error {
			wrs, err := runWorker(t, w, errCh)
			mu.Lock()
			rs = append(rs, wrs...)
			mu.Unlock()
			return err
		})
	}

	t.Logf("Keep all workers running for about %s.", testDuration)
	// Stop early when any worker reports an error. errCh has one slot
	// per worker, so a worker never blocks sending its error.
	select {
	case <-time.After(testDuration):
	case <-errCh:
	}

	close(stopCh)
	t.Log("Waiting for all workers to finish.")
	if err := g.Wait(); err != nil {
		t.Errorf("Received error: %v", err)
	}

	return rs
}
   318  
   319  func runWorker(t *testing.T, w *worker, errCh chan error) (historyRecords, error) {
   320  	rs, err := w.run()
   321  	if len(rs) > 0 && err == nil {
   322  		if terr := validateIncrementalTxid(rs); terr != nil {
   323  			txidErr := fmt.Errorf("[%s]: %w", w.name(), terr)
   324  			t.Error(txidErr)
   325  			errCh <- txidErr
   326  			return rs, txidErr
   327  		}
   328  	}
   329  	return rs, err
   330  }
   331  
// worker is one concurrent actor of the test. It repeatedly picks a
// random (bucket, key) pair plus a weighted-random operation and
// executes it against db until stopCh is closed.
type worker struct {
	id int
	db *bolt.DB

	conf concurrentConfig

	errCh  chan error   // receives the worker's first fatal error
	stopCh chan struct{} // closed by runWorkers to stop all workers

	t *testing.T
}
   343  
// name returns a human-readable worker identifier, e.g. "worker-3".
func (w *worker) name() string {
	return fmt.Sprintf("worker-%d", w.id)
}
   347  
// run executes random operations until stopCh is closed, recording
// every successful operation. The first failing operation is reported
// on t, pushed to errCh and terminates the worker.
func (w *worker) run() (historyRecords, error) {
	var rs historyRecords
	for {
		// Non-blocking check whether we have been asked to stop.
		select {
		case <-w.stopCh:
			w.t.Logf("%q finished.", w.name())
			return rs, nil
		default:
		}

		op := w.pickOperation()
		bucket, key := w.pickBucket(), w.pickKey()
		rec, err := executeOperation(op, w.db, bucket, key, w.conf)
		if err != nil {
			readErr := fmt.Errorf("[%s: %s]: %w", w.name(), op, err)
			w.t.Error(readErr)
			w.errCh <- readErr
			return rs, readErr
		}

		rs = append(rs, rec)
		// Optionally pause between operations to vary the interleaving.
		if w.conf.workInterval != (duration{}) {
			time.Sleep(randomDurationInRange(w.conf.workInterval.min, w.conf.workInterval.max))
		}
	}
}
   374  
// pickBucket returns the name of a randomly chosen pre-created bucket.
func (w *worker) pickBucket() []byte {
	return bucketName(mrand.Intn(w.conf.bucketCount))
}
   378  
   379  func bucketName(index int) []byte {
   380  	bucket := fmt.Sprintf("%s_%d", bucketPrefix, index)
   381  	return []byte(bucket)
   382  }
   383  
// pickKey returns a random key within the configured key space,
// e.g. "key_42".
func (w *worker) pickKey() []byte {
	key := fmt.Sprintf("%s_%d", keyPrefix, mrand.Intn(w.conf.keyCount))
	return []byte(key)
}
   388  
   389  func (w *worker) pickOperation() OperationType {
   390  	sum := 0
   391  	for _, op := range w.conf.operationRatio {
   392  		sum += op.chance
   393  	}
   394  	roll := mrand.Int() % sum
   395  	for _, op := range w.conf.operationRatio {
   396  		if roll < op.chance {
   397  			return op.operation
   398  		}
   399  		roll -= op.chance
   400  	}
   401  	panic("unexpected")
   402  }
   403  
// executeOperation dispatches op to the matching execute* helper and
// returns the resulting history record. It panics on an unknown
// operation type, which would indicate a bug in the test itself.
func executeOperation(op OperationType, db *bolt.DB, bucket []byte, key []byte, conf concurrentConfig) (historyRecord, error) {
	switch op {
	case Read:
		return executeRead(db, bucket, key, conf.readInterval)
	case Write:
		return executeWrite(db, bucket, key, conf.writeBytes, conf.noopWriteRatio)
	case Delete:
		return executeDelete(db, bucket, key)
	default:
		panic(fmt.Sprintf("unexpected operation type: %s", op))
	}
}
   416  
// executeRead reads the same key twice within one read transaction,
// sleeping a random interval in between, and fails if the two reads
// disagree — that would be a repeatable-read violation.
func executeRead(db *bolt.DB, bucket []byte, key []byte, readInterval duration) (historyRecord, error) {
	var rec historyRecord
	err := db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket(bucket)

		initialVal := b.Get(key)
		time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
		val := b.Get(key)

		if !bytes.Equal(initialVal, val) {
			return fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
				string(key), formatBytes(initialVal), formatBytes(val))
		}

		// Clone the value before the record escapes this transaction;
		// slices returned by Get are only valid while the tx is open
		// (per bbolt's Bucket.Get documentation).
		clonedVal := make([]byte, len(val))
		copy(clonedVal, val)

		rec = historyRecord{
			OperationType: Read,
			Bucket:        string(bucket),
			Key:           string(key),
			Value:         clonedVal,
			Txid:          tx.ID(),
		}

		return nil
	})

	return rec, err
}
   447  
// executeWrite commits one write transaction. With a probability of
// noopWriteRatio percent the transaction commits without touching any
// data (recorded under noopTxKey); otherwise it puts a random value
// whose size falls within writeBytes at the given key.
func executeWrite(db *bolt.DB, bucket []byte, key []byte, writeBytes bytesRange, noopWriteRatio int) (historyRecord, error) {
	var rec historyRecord

	err := db.Update(func(tx *bolt.Tx) error {
		if mrand.Intn(100) < noopWriteRatio {
			// A no-op write transaction has two consequences:
			//    1. The txid increases by 1;
			//    2. Two meta pages point to the same root page.
			rec = historyRecord{
				OperationType: Write,
				Bucket:        string(bucket),
				Key:           noopTxKey,
				Value:         nil,
				Txid:          tx.ID(),
			}
			return nil
		}

		b := tx.Bucket(bucket)

		// Generate a random payload of a random size within writeBytes.
		valueBytes := randomIntInRange(writeBytes.min, writeBytes.max)
		v := make([]byte, valueBytes)
		if _, cErr := crand.Read(v); cErr != nil {
			return cErr
		}

		putErr := b.Put(key, v)
		if putErr == nil {
			rec = historyRecord{
				OperationType: Write,
				Bucket:        string(bucket),
				Key:           string(key),
				Value:         v,
				Txid:          tx.ID(),
			}
		}

		return putErr
	})

	return rec, err
}
   490  
   491  func executeDelete(db *bolt.DB, bucket []byte, key []byte) (historyRecord, error) {
   492  	var rec historyRecord
   493  
   494  	err := db.Update(func(tx *bolt.Tx) error {
   495  		b := tx.Bucket(bucket)
   496  
   497  		deleteErr := b.Delete(key)
   498  		if deleteErr == nil {
   499  			rec = historyRecord{
   500  				OperationType: Delete,
   501  				Bucket:        string(bucket),
   502  				Key:           string(key),
   503  				Txid:          tx.ID(),
   504  			}
   505  		}
   506  
   507  		return deleteErr
   508  	})
   509  
   510  	return rec, err
   511  }
   512  
   513  func randomDurationInRange(min, max time.Duration) time.Duration {
   514  	d := int64(max) - int64(min)
   515  	d = int64(mrand.Intn(int(d))) + int64(min)
   516  	return time.Duration(d)
   517  }
   518  
   519  func randomIntInRange(min, max int) int {
   520  	return mrand.Intn(max-min) + min
   521  }
   522  
   523  func formatBytes(val []byte) string {
   524  	if utf8.ValidString(string(val)) {
   525  		return string(val)
   526  	}
   527  
   528  	return hex.EncodeToString(val)
   529  }
   530  
   531  /*
   532  *********************************************************
   533  Functions for persisting test data, including db file
   534  and operation history
   535  *********************************************************
   536  */
// saveDataIfFailed backs up the db file and dumps the history records
// when the test has failed, or unconditionally when force is true
// (used to also persist data while panicking, since t.Failed() is
// false at that point — see concurrentReadAndWrite). When it saves,
// the db is closed first so the copied file is in a settled state.
func saveDataIfFailed(t *testing.T, db *bolt.DB, rs historyRecords, force bool) {
	if t.Failed() || force {
		t.Log("Saving data...")
		dbPath := db.Path()
		if err := db.Close(); err != nil {
			t.Errorf("Failed to close db: %v", err)
		}
		backupPath := testResultsDirectory(t)
		backupDB(t, dbPath, backupPath)
		persistHistoryRecords(t, rs, backupPath)
	}
}
   549  
   550  func backupDB(t *testing.T, srcPath string, dstPath string) {
   551  	targetFile := filepath.Join(dstPath, "db.bak")
   552  	t.Logf("Saving the DB file to %s", targetFile)
   553  	err := copyFile(srcPath, targetFile)
   554  	require.NoError(t, err)
   555  	t.Logf("DB file saved to %s", targetFile)
   556  }
   557  
   558  func copyFile(srcPath, dstPath string) error {
   559  	// Ensure source file exists.
   560  	_, err := os.Stat(srcPath)
   561  	if os.IsNotExist(err) {
   562  		return fmt.Errorf("source file %q not found", srcPath)
   563  	} else if err != nil {
   564  		return err
   565  	}
   566  
   567  	// Ensure output file not exist.
   568  	_, err = os.Stat(dstPath)
   569  	if err == nil {
   570  		return fmt.Errorf("output file %q already exists", dstPath)
   571  	} else if !os.IsNotExist(err) {
   572  		return err
   573  	}
   574  
   575  	srcDB, err := os.Open(srcPath)
   576  	if err != nil {
   577  		return fmt.Errorf("failed to open source file %q: %w", srcPath, err)
   578  	}
   579  	defer srcDB.Close()
   580  	dstDB, err := os.Create(dstPath)
   581  	if err != nil {
   582  		return fmt.Errorf("failed to create output file %q: %w", dstPath, err)
   583  	}
   584  	defer dstDB.Close()
   585  	written, err := io.Copy(dstDB, srcDB)
   586  	if err != nil {
   587  		return fmt.Errorf("failed to copy database file from %q to %q: %w", srcPath, dstPath, err)
   588  	}
   589  
   590  	srcFi, err := srcDB.Stat()
   591  	if err != nil {
   592  		return fmt.Errorf("failed to get source file info %q: %w", srcPath, err)
   593  	}
   594  	initialSize := srcFi.Size()
   595  	if initialSize != written {
   596  		return fmt.Errorf("the byte copied (%q: %d) isn't equal to the initial db size (%q: %d)", dstPath, written, srcPath, initialSize)
   597  	}
   598  
   599  	return nil
   600  }
   601  
   602  func persistHistoryRecords(t *testing.T, rs historyRecords, path string) {
   603  	recordFilePath := filepath.Join(path, "history_records.json")
   604  	t.Logf("Saving history records to %s", recordFilePath)
   605  	recordFile, err := os.OpenFile(recordFilePath, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
   606  	require.NoError(t, err)
   607  	defer recordFile.Close()
   608  	encoder := json.NewEncoder(recordFile)
   609  	for _, rec := range rs {
   610  		err := encoder.Encode(rec)
   611  		require.NoError(t, err)
   612  	}
   613  }
   614  
   615  func testResultsDirectory(t *testing.T) string {
   616  	resultsDirectory, ok := os.LookupEnv("RESULTS_DIR")
   617  	var err error
   618  	if !ok {
   619  		resultsDirectory, err = os.MkdirTemp("", "*.db")
   620  		require.NoError(t, err)
   621  	}
   622  	resultsDirectory, err = filepath.Abs(resultsDirectory)
   623  	require.NoError(t, err)
   624  
   625  	path, err := filepath.Abs(filepath.Join(resultsDirectory, strings.ReplaceAll(t.Name(), "/", "_")))
   626  	require.NoError(t, err)
   627  
   628  	err = os.RemoveAll(path)
   629  	require.NoError(t, err)
   630  
   631  	err = os.MkdirAll(path, 0700)
   632  	require.NoError(t, err)
   633  
   634  	return path
   635  }
   636  
   637  /*
   638  *********************************************************
   639  Data structures and functions for analyzing history records
   640  *********************************************************
   641  */
// OperationType is the type of an operation a worker performs against
// the database.
type OperationType string

const (
	Read   OperationType = "read"
	Write  OperationType = "write"
	Delete OperationType = "delete"
)
   649  
// historyRecord captures one executed operation: what was done, on
// which (bucket, key), the value involved (nil for deletes and no-op
// writes), and the id of the transaction that performed it.
type historyRecord struct {
	OperationType OperationType `json:"operationType,omitempty"`
	Txid          int           `json:"txid,omitempty"`
	Bucket        string        `json:"bucket,omitempty"`
	Key           string        `json:"key,omitempty"`
	Value         []byte        `json:"value,omitempty"`
}
   657  
// historyRecords is a sort.Interface implementation ordering records
// by (bucket, key, txid), with Read operations placed after other
// operations sharing the same txid — see Less.
type historyRecords []historyRecord

// Len implements sort.Interface.
func (rs historyRecords) Len() int {
	return len(rs)
}
   663  
   664  func (rs historyRecords) Less(i, j int) bool {
   665  	// Sorted by (bucket, key) firstly: all records in the same
   666  	// (bucket, key) are grouped together.
   667  	bucketCmp := strings.Compare(rs[i].Bucket, rs[j].Bucket)
   668  	if bucketCmp != 0 {
   669  		return bucketCmp < 0
   670  	}
   671  	keyCmp := strings.Compare(rs[i].Key, rs[j].Key)
   672  	if keyCmp != 0 {
   673  		return keyCmp < 0
   674  	}
   675  
   676  	// Sorted by txid
   677  	if rs[i].Txid != rs[j].Txid {
   678  		return rs[i].Txid < rs[j].Txid
   679  	}
   680  
   681  	// Sorted by operation type: put `Read` after other operation types
   682  	// if they operate on the same (bucket, key) and have the same txid.
   683  	if rs[i].OperationType == Read {
   684  		return false
   685  	}
   686  
   687  	return true
   688  }
   689  
// Swap implements sort.Interface by exchanging the records at i and j.
func (rs historyRecords) Swap(i, j int) {
	rs[i], rs[j] = rs[j], rs[i]
}
   693  
   694  func validateIncrementalTxid(rs historyRecords) error {
   695  	lastTxid := rs[0].Txid
   696  
   697  	for i := 1; i < len(rs); i++ {
   698  		if (rs[i].OperationType == Read && rs[i].Txid < lastTxid) || (rs[i].OperationType != Read && rs[i].Txid <= lastTxid) {
   699  			return fmt.Errorf("detected non-incremental txid(%d, %d) in %s mode", lastTxid, rs[i].Txid, rs[i].OperationType)
   700  		}
   701  		lastTxid = rs[i].Txid
   702  	}
   703  
   704  	return nil
   705  }
   706  
// validateSequential checks that every read observed exactly the value
// established by the most recent write/delete on the same (bucket,
// key). It relies on the ordering defined by historyRecords.Less:
// records grouped by (bucket, key), ascending txid, reads after writes
// with the same txid.
func validateSequential(rs historyRecords) error {
	sort.Sort(rs)

	type bucketAndKey struct {
		bucket string
		key    string
	}
	// lastWriteKeyValueMap tracks, per (bucket, key), the most recent
	// written value seen so far while scanning in sorted order.
	lastWriteKeyValueMap := make(map[bucketAndKey]*historyRecord)

	for _, rec := range rs {
		bk := bucketAndKey{
			bucket: rec.Bucket,
			key:    rec.Key,
		}
		if v, ok := lastWriteKeyValueMap[bk]; ok {
			if rec.OperationType == Write {
				v.Txid = rec.Txid
				// A no-op write bumps the txid but keeps the old value.
				if rec.Key != noopTxKey {
					v.Value = rec.Value
				}
			} else if rec.OperationType == Delete {
				delete(lastWriteKeyValueMap, bk)
			} else {
				// Read: must observe exactly the last written value.
				if !bytes.Equal(v.Value, rec.Value) {
					return fmt.Errorf("readOperation[txid: %d, bucket: %s, key: %s] read %x, \nbut writer[txid: %d] wrote %x",
						rec.Txid, rec.Bucket, rec.Key, rec.Value, v.Txid, v.Value)
				}
			}
		} else {
			if rec.OperationType == Write && rec.Key != noopTxKey {
				lastWriteKeyValueMap[bk] = &historyRecord{
					OperationType: Write,
					Bucket:        rec.Bucket,
					Key:           rec.Key,
					Value:         rec.Value,
					Txid:          rec.Txid,
				}
			} else if rec.OperationType == Read {
				// No write seen yet for this key, so a read must see an
				// empty/nonexistent value.
				if len(rec.Value) != 0 {
					return fmt.Errorf("expected the first readOperation[txid: %d, bucket: %s, key: %s] read nil, \nbut got %x",
						rec.Txid, rec.Bucket, rec.Key, rec.Value)
				}
			}
		}
	}

	return nil
}
   755  
   756  /*
   757  TestConcurrentRepeatableRead verifies repeatable read. The case
   758  intentionally creates a scenario that read and write transactions
   759  are interleaved. It performs several writing operations after starting
   760  each long-running read transaction to ensure it has a larger txid
   761  than previous read transaction. It verifies that bbolt correctly
   762  releases free pages, and will not pollute (e.g. prematurely release)
   763  any pages which are still being used by any read transaction.
   764  */
   765  func TestConcurrentRepeatableRead(t *testing.T) {
   766  	if testing.Short() {
   767  		t.Skip("skipping test in short mode.")
   768  	}
   769  
   770  	testCases := []struct {
   771  		name           string
   772  		noFreelistSync bool
   773  		freelistType   bolt.FreelistType
   774  	}{
   775  		// [array] freelist
   776  		{
   777  			name:           "sync array freelist",
   778  			noFreelistSync: false,
   779  			freelistType:   bolt.FreelistArrayType,
   780  		},
   781  		{
   782  			name:           "not sync array freelist",
   783  			noFreelistSync: true,
   784  			freelistType:   bolt.FreelistArrayType,
   785  		},
   786  		// [map] freelist
   787  		{
   788  			name:           "sync map freelist",
   789  			noFreelistSync: false,
   790  			freelistType:   bolt.FreelistMapType,
   791  		},
   792  		{
   793  			name:           "not sync map freelist",
   794  			noFreelistSync: true,
   795  			freelistType:   bolt.FreelistMapType,
   796  		},
   797  	}
   798  
   799  	for _, tc := range testCases {
   800  		tc := tc
   801  		t.Run(tc.name, func(t *testing.T) {
   802  
   803  			t.Log("Preparing db.")
   804  			var (
   805  				bucket = []byte("data")
   806  				key    = []byte("mykey")
   807  
   808  				option = &bolt.Options{
   809  					PageSize:       4096,
   810  					NoFreelistSync: tc.noFreelistSync,
   811  					FreelistType:   tc.freelistType,
   812  				}
   813  			)
   814  
   815  			db := mustCreateDB(t, option)
   816  			defer func() {
   817  				// The db will be reopened later, so put `db.Close()` in a function
   818  				// to avoid premature evaluation of `db`. Note that the execution
   819  				// of a deferred function is deferred to the moment the surrounding
   820  				// function returns, but the function value and parameters to the
   821  				// call are evaluated as usual and saved anew.
   822  				db.Close()
   823  			}()
   824  
   825  			// Create lots of K/V to allocate some pages
   826  			err := db.Update(func(tx *bolt.Tx) error {
   827  				b, err := tx.CreateBucketIfNotExists(bucket)
   828  				if err != nil {
   829  					return err
   830  				}
   831  				for i := 0; i < 1000; i++ {
   832  					k := fmt.Sprintf("key_%d", i)
   833  					if err := b.Put([]byte(k), make([]byte, 1024)); err != nil {
   834  						return err
   835  					}
   836  				}
   837  				return nil
   838  			})
   839  			require.NoError(t, err)
   840  
   841  			// Remove all K/V to create some free pages
   842  			err = db.Update(func(tx *bolt.Tx) error {
   843  				b := tx.Bucket(bucket)
   844  				for i := 0; i < 1000; i++ {
   845  					k := fmt.Sprintf("key_%d", i)
   846  					if err := b.Delete([]byte(k)); err != nil {
   847  						return err
   848  					}
   849  				}
   850  				return b.Put(key, []byte("randomValue"))
   851  			})
   852  			require.NoError(t, err)
   853  
   854  			// bbolt will not release free pages directly after committing
   855  			// a writing transaction; instead all pages freed are putting
   856  			// into a pending list. Accordingly, the free pages might not
   857  			// be able to be reused by following writing transactions. So
   858  			// we reopen the db to completely release all free pages.
   859  			db = mustReOpenDB(t, db, option)
   860  
   861  			var (
   862  				wg                     sync.WaitGroup
   863  				longRunningReaderCount = 10
   864  				stopCh                 = make(chan struct{})
   865  				errCh                  = make(chan error, longRunningReaderCount)
   866  				readInterval           = duration{5 * time.Millisecond, 10 * time.Millisecond}
   867  
   868  				writeOperationCountInBetween = 5
   869  				writeBytes                   = bytesRange{10, 20}
   870  
   871  				testDuration = 10 * time.Second
   872  			)
   873  
   874  			for i := 0; i < longRunningReaderCount; i++ {
   875  				readWorkerName := fmt.Sprintf("reader_%d", i)
   876  				t.Logf("Starting long running read operation: %s", readWorkerName)
   877  				wg.Add(1)
   878  				go func() {
   879  					defer wg.Done()
   880  					rErr := executeLongRunningRead(t, readWorkerName, db, bucket, key, readInterval, stopCh)
   881  					if rErr != nil {
   882  						errCh <- rErr
   883  					}
   884  				}()
   885  				time.Sleep(500 * time.Millisecond)
   886  
   887  				t.Logf("Perform %d write operations after starting a long running read operation", writeOperationCountInBetween)
   888  				for j := 0; j < writeOperationCountInBetween; j++ {
   889  					_, err := executeWrite(db, bucket, key, writeBytes, 0)
   890  					require.NoError(t, err)
   891  				}
   892  			}
   893  
   894  			t.Log("Perform lots of write operations to check whether the long running read operations will read dirty data")
   895  			wg.Add(1)
   896  			go func() {
   897  				defer wg.Done()
   898  				cnt := longRunningReaderCount * writeOperationCountInBetween
   899  				for i := 0; i < cnt; i++ {
   900  					select {
   901  					case <-stopCh:
   902  						return
   903  					default:
   904  					}
   905  					_, err := executeWrite(db, bucket, key, writeBytes, 0)
   906  					require.NoError(t, err)
   907  				}
   908  			}()
   909  
   910  			t.Log("Waiting for result")
   911  			select {
   912  			case err := <-errCh:
   913  				close(stopCh)
   914  				t.Errorf("Detected dirty read: %v", err)
   915  			case <-time.After(testDuration):
   916  				close(stopCh)
   917  			}
   918  
   919  			wg.Wait()
   920  		})
   921  	}
   922  }
   923  
// executeLongRunningRead holds a single read transaction open until
// stopCh is closed, repeatedly re-reading the same key and returning
// an error as soon as the observed value differs from the very first
// read — i.e. a repeatable-read violation.
func executeLongRunningRead(t *testing.T, name string, db *bolt.DB, bucket []byte, key []byte, readInterval duration, stopCh chan struct{}) error {
	err := db.View(func(tx *bolt.Tx) error {
		b := tx.Bucket(bucket)

		initialVal := b.Get(key)

		for {
			// Non-blocking stop check between iterations.
			select {
			case <-stopCh:
				t.Logf("%q finished.", name)
				return nil
			default:
			}

			time.Sleep(randomDurationInRange(readInterval.min, readInterval.max))
			val := b.Get(key)

			if !bytes.Equal(initialVal, val) {
				dirtyReadErr := fmt.Errorf("read different values for the same key (%q), value1: %q, value2: %q",
					string(key), formatBytes(initialVal), formatBytes(val))
				return dirtyReadErr
			}
		}
	})

	return err
}
   951  

View as plain text