...

Source file src/go.etcd.io/bbolt/simulation_test.go

Documentation: go.etcd.io/bbolt

     1  package bbolt_test
     2  
     3  import (
     4  	"bytes"
     5  	"fmt"
     6  	"math/rand"
     7  	"sync"
     8  	"sync/atomic"
     9  	"testing"
    10  
    11  	bolt "go.etcd.io/bbolt"
    12  	"go.etcd.io/bbolt/internal/btesting"
    13  )
    14  
    15  func TestSimulate_1op_1p(t *testing.T)     { testSimulate(t, nil, 1, 1, 1) }
    16  func TestSimulate_10op_1p(t *testing.T)    { testSimulate(t, nil, 1, 10, 1) }
    17  func TestSimulate_100op_1p(t *testing.T)   { testSimulate(t, nil, 1, 100, 1) }
    18  func TestSimulate_1000op_1p(t *testing.T)  { testSimulate(t, nil, 1, 1000, 1) }
    19  func TestSimulate_10000op_1p(t *testing.T) { testSimulate(t, nil, 1, 10000, 1) }
    20  
    21  func TestSimulate_10op_10p(t *testing.T)    { testSimulate(t, nil, 1, 10, 10) }
    22  func TestSimulate_100op_10p(t *testing.T)   { testSimulate(t, nil, 1, 100, 10) }
    23  func TestSimulate_1000op_10p(t *testing.T)  { testSimulate(t, nil, 1, 1000, 10) }
    24  func TestSimulate_10000op_10p(t *testing.T) { testSimulate(t, nil, 1, 10000, 10) }
    25  
    26  func TestSimulate_100op_100p(t *testing.T)   { testSimulate(t, nil, 1, 100, 100) }
    27  func TestSimulate_1000op_100p(t *testing.T)  { testSimulate(t, nil, 1, 1000, 100) }
    28  func TestSimulate_10000op_100p(t *testing.T) { testSimulate(t, nil, 1, 10000, 100) }
    29  
    30  func TestSimulate_10000op_1000p(t *testing.T) { testSimulate(t, nil, 1, 10000, 1000) }
    31  
    32  // Randomly generate operations on a given database with multiple clients to ensure consistency and thread safety.
    33  func testSimulate(t *testing.T, openOption *bolt.Options, round, threadCount, parallelism int) {
    34  	if testing.Short() {
    35  		t.Skip("skipping test in short mode.")
    36  	}
    37  
    38  	rand.Seed(int64(qseed))
    39  
    40  	// A list of operations that readers and writers can perform.
    41  	var readerHandlers = []simulateHandler{simulateGetHandler}
    42  	var writerHandlers = []simulateHandler{simulateGetHandler, simulatePutHandler}
    43  
    44  	var versions = make(map[int]*QuickDB)
    45  	versions[1] = NewQuickDB()
    46  
    47  	db := btesting.MustCreateDBWithOption(t, openOption)
    48  
    49  	var mutex sync.Mutex
    50  
    51  	for n := 0; n < round; n++ {
    52  		// Run n threads in parallel, each with their own operation.
    53  		var threads = make(chan bool, parallelism)
    54  		var wg sync.WaitGroup
    55  
    56  		// counter for how many goroutines were fired
    57  		var opCount int64
    58  
    59  		// counter for ignored operations
    60  		var igCount int64
    61  
    62  		var errCh = make(chan error, threadCount)
    63  
    64  		var i int
    65  		for {
    66  			// this buffered channel will keep accepting booleans
    67  			// until it hits the limit defined by the parallelism
    68  			// argument to testSimulate()
    69  			threads <- true
    70  
    71  			// this wait group can only be marked "done" from inside
    72  			// the subsequent goroutine
    73  			wg.Add(1)
    74  			writable := ((rand.Int() % 100) < 20) // 20% writers
    75  
    76  			// Choose an operation to execute.
    77  			var handler simulateHandler
    78  			if writable {
    79  				handler = writerHandlers[rand.Intn(len(writerHandlers))]
    80  			} else {
    81  				handler = readerHandlers[rand.Intn(len(readerHandlers))]
    82  			}
    83  
    84  			// Execute a thread for the given operation.
    85  			go func(writable bool, handler simulateHandler) {
    86  				defer wg.Done()
    87  				atomic.AddInt64(&opCount, 1)
    88  				// Start transaction.
    89  				tx, err := db.Begin(writable)
    90  				if err != nil {
    91  					errCh <- fmt.Errorf("error tx begin: %v", err)
    92  					return
    93  				}
    94  
    95  				// Obtain current state of the dataset.
    96  				mutex.Lock()
    97  				var qdb = versions[tx.ID()]
    98  				if writable {
    99  					qdb = versions[tx.ID()-1].Copy()
   100  				}
   101  				mutex.Unlock()
   102  
   103  				// Make sure we commit/rollback the tx at the end and update the state.
   104  				if writable {
   105  					defer func() {
   106  						mutex.Lock()
   107  						versions[tx.ID()] = qdb
   108  						mutex.Unlock()
   109  
   110  						if err := tx.Commit(); err != nil {
   111  							errCh <- err
   112  							return
   113  						}
   114  					}()
   115  				} else {
   116  					defer func() { _ = tx.Rollback() }()
   117  				}
   118  
   119  				// Ignore operation if we don't have data yet.
   120  				if qdb == nil {
   121  					atomic.AddInt64(&igCount, 1)
   122  					return
   123  				}
   124  
   125  				// Execute handler.
   126  				handler(tx, qdb)
   127  
   128  				// Release a thread back to the scheduling loop.
   129  				<-threads
   130  			}(writable, handler)
   131  
   132  			i++
   133  			if i >= threadCount {
   134  				break
   135  			}
   136  		}
   137  
   138  		// Wait until all threads are done.
   139  		wg.Wait()
   140  		t.Logf("transactions:%d ignored:%d", opCount, igCount)
   141  		close(errCh)
   142  		for err := range errCh {
   143  			if err != nil {
   144  				t.Fatalf("error from inside goroutine: %v", err)
   145  			}
   146  		}
   147  
   148  		db.MustClose()
   149  		// I have doubts the DB drop is indented here (as 'versions' is not being reset).
   150  		// But I'm preserving for now the original behavior.
   151  		db.MustDeleteFile()
   152  		db.MustReopen()
   153  	}
   154  
   155  }
   156  
// simulateHandler is one simulated operation. It receives the open
// transaction tx and the in-memory QuickDB qdb that testSimulate selected for
// that transaction.
type simulateHandler func(tx *bolt.Tx, qdb *QuickDB)
   158  
   159  // Retrieves a key from the database and verifies that it is what is expected.
   160  func simulateGetHandler(tx *bolt.Tx, qdb *QuickDB) {
   161  	// Randomly retrieve an existing exist.
   162  	keys := qdb.Rand()
   163  	if len(keys) == 0 {
   164  		return
   165  	}
   166  
   167  	// Retrieve root bucket.
   168  	b := tx.Bucket(keys[0])
   169  	if b == nil {
   170  		panic(fmt.Sprintf("bucket[0] expected: %08x\n", trunc(keys[0], 4)))
   171  	}
   172  
   173  	// Drill into nested buckets.
   174  	for _, key := range keys[1 : len(keys)-1] {
   175  		b = b.Bucket(key)
   176  		if b == nil {
   177  			panic(fmt.Sprintf("bucket[n] expected: %v -> %v\n", keys, key))
   178  		}
   179  	}
   180  
   181  	// Verify key/value on the final bucket.
   182  	expected := qdb.Get(keys)
   183  	actual := b.Get(keys[len(keys)-1])
   184  	if !bytes.Equal(actual, expected) {
   185  		fmt.Println("=== EXPECTED ===")
   186  		fmt.Println(expected)
   187  		fmt.Println("=== ACTUAL ===")
   188  		fmt.Println(actual)
   189  		fmt.Println("=== END ===")
   190  		panic("value mismatch")
   191  	}
   192  }
   193  
   194  // Inserts a key into the database.
   195  func simulatePutHandler(tx *bolt.Tx, qdb *QuickDB) {
   196  	var err error
   197  	keys, value := randKeys(), randValue()
   198  
   199  	// Retrieve root bucket.
   200  	b := tx.Bucket(keys[0])
   201  	if b == nil {
   202  		b, err = tx.CreateBucket(keys[0])
   203  		if err != nil {
   204  			panic("create bucket: " + err.Error())
   205  		}
   206  	}
   207  
   208  	// Create nested buckets, if necessary.
   209  	for _, key := range keys[1 : len(keys)-1] {
   210  		child := b.Bucket(key)
   211  		if child != nil {
   212  			b = child
   213  		} else {
   214  			b, err = b.CreateBucket(key)
   215  			if err != nil {
   216  				panic("create bucket: " + err.Error())
   217  			}
   218  		}
   219  	}
   220  
   221  	// Insert into database.
   222  	if err := b.Put(keys[len(keys)-1], value); err != nil {
   223  		panic("put: " + err.Error())
   224  	}
   225  
   226  	// Insert into in-memory database.
   227  	qdb.Put(keys, value)
   228  }
   229  
   230  // QuickDB is an in-memory database that replicates the functionality of the
   231  // Bolt DB type except that it is entirely in-memory. It is meant for testing
   232  // that the Bolt database is consistent.
   233  type QuickDB struct {
   234  	sync.RWMutex
   235  	m map[string]interface{}
   236  }
   237  
   238  // NewQuickDB returns an instance of QuickDB.
   239  func NewQuickDB() *QuickDB {
   240  	return &QuickDB{m: make(map[string]interface{})}
   241  }
   242  
   243  // Get retrieves the value at a key path.
   244  func (db *QuickDB) Get(keys [][]byte) []byte {
   245  	db.RLock()
   246  	defer db.RUnlock()
   247  
   248  	m := db.m
   249  	for _, key := range keys[:len(keys)-1] {
   250  		value := m[string(key)]
   251  		if value == nil {
   252  			return nil
   253  		}
   254  		switch value := value.(type) {
   255  		case map[string]interface{}:
   256  			m = value
   257  		case []byte:
   258  			return nil
   259  		}
   260  	}
   261  
   262  	// Only return if it's a simple value.
   263  	if value, ok := m[string(keys[len(keys)-1])].([]byte); ok {
   264  		return value
   265  	}
   266  	return nil
   267  }
   268  
   269  // Put inserts a value into a key path.
   270  func (db *QuickDB) Put(keys [][]byte, value []byte) {
   271  	db.Lock()
   272  	defer db.Unlock()
   273  
   274  	// Build buckets all the way down the key path.
   275  	m := db.m
   276  	for _, key := range keys[:len(keys)-1] {
   277  		if _, ok := m[string(key)].([]byte); ok {
   278  			return // Keypath intersects with a simple value. Do nothing.
   279  		}
   280  
   281  		if m[string(key)] == nil {
   282  			m[string(key)] = make(map[string]interface{})
   283  		}
   284  		m = m[string(key)].(map[string]interface{})
   285  	}
   286  
   287  	// Insert value into the last key.
   288  	m[string(keys[len(keys)-1])] = value
   289  }
   290  
   291  // Rand returns a random key path that points to a simple value.
   292  func (db *QuickDB) Rand() [][]byte {
   293  	db.RLock()
   294  	defer db.RUnlock()
   295  	if len(db.m) == 0 {
   296  		return nil
   297  	}
   298  	var keys [][]byte
   299  	db.rand(db.m, &keys)
   300  	return keys
   301  }
   302  
   303  func (db *QuickDB) rand(m map[string]interface{}, keys *[][]byte) {
   304  	i, index := 0, rand.Intn(len(m))
   305  	for k, v := range m {
   306  		if i == index {
   307  			*keys = append(*keys, []byte(k))
   308  			if v, ok := v.(map[string]interface{}); ok {
   309  				db.rand(v, keys)
   310  			}
   311  			return
   312  		}
   313  		i++
   314  	}
   315  	panic("quickdb rand: out-of-range")
   316  }
   317  
   318  // Copy copies the entire database.
   319  func (db *QuickDB) Copy() *QuickDB {
   320  	db.RLock()
   321  	defer db.RUnlock()
   322  	return &QuickDB{m: db.copy(db.m)}
   323  }
   324  
   325  func (db *QuickDB) copy(m map[string]interface{}) map[string]interface{} {
   326  	clone := make(map[string]interface{}, len(m))
   327  	for k, v := range m {
   328  		switch v := v.(type) {
   329  		case map[string]interface{}:
   330  			clone[k] = db.copy(v)
   331  		default:
   332  			clone[k] = v
   333  		}
   334  	}
   335  	return clone
   336  }
   337  
   338  func randKey() []byte {
   339  	var min, max = 1, 1024
   340  	n := rand.Intn(max-min) + min
   341  	b := make([]byte, n)
   342  	for i := 0; i < n; i++ {
   343  		b[i] = byte(rand.Intn(255))
   344  	}
   345  	return b
   346  }
   347  
   348  func randKeys() [][]byte {
   349  	var keys [][]byte
   350  	var count = rand.Intn(2) + 2
   351  	for i := 0; i < count; i++ {
   352  		keys = append(keys, randKey())
   353  	}
   354  	return keys
   355  }
   356  
   357  func randValue() []byte {
   358  	n := rand.Intn(8192)
   359  	b := make([]byte, n)
   360  	for i := 0; i < n; i++ {
   361  		b[i] = byte(rand.Intn(255))
   362  	}
   363  	return b
   364  }
   365  

View as plain text