
Source file src/go.etcd.io/etcd/server/v3/mvcc/backend/batch_tx.go

Documentation: go.etcd.io/etcd/server/v3/mvcc/backend

// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backend

import (
	"bytes"
	"math"
	"sync"
	"sync/atomic"
	"time"

	bolt "go.etcd.io/bbolt"
	"go.uber.org/zap"
)

type BucketID int

type Bucket interface {
	// ID returns a unique identifier of a bucket.
	// The id must NOT be persisted and can be used as a lightweight identifier
	// in the in-memory maps.
	ID() BucketID
	Name() []byte
	// String implements Stringer (human-readable name).
	String() string

	// IsSafeRangeBucket is a hack to avoid inadvertently reading duplicate keys;
	// overwrites on a bucket should only fetch with limit=1, but safeRangeBucket
	// is known to never overwrite any key so range is safe.
	IsSafeRangeBucket() bool
}
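
// The type below is an illustrative sketch, not part of upstream etcd
// (the real buckets are defined elsewhere, in the buckets package); it
// shows the minimum a Bucket implementation has to provide.
type exampleBucket struct {
	id   BucketID
	name []byte
	safe bool
}

func (b exampleBucket) ID() BucketID            { return b.id }
func (b exampleBucket) Name() []byte            { return b.name }
func (b exampleBucket) String() string          { return string(b.name) }
func (b exampleBucket) IsSafeRangeBucket() bool { return b.safe }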

type BatchTx interface {
	ReadTx
	UnsafeCreateBucket(bucket Bucket)
	UnsafeDeleteBucket(bucket Bucket)
	UnsafePut(bucket Bucket, key []byte, value []byte)
	UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
	UnsafeDelete(bucket Bucket, key []byte)
	// Commit commits a previous tx and begins a new writable one.
	Commit()
	// CommitAndStop commits the previous tx and does not create a new one.
	CommitAndStop()
	LockInsideApply()
	LockOutsideApply()
}

type batchTx struct {
	sync.Mutex
	tx      *bolt.Tx
	backend *backend

	pending int
}

// Lock is supposed to be called only by unit tests.
func (t *batchTx) Lock() {
	ValidateCalledInsideUnittest(t.backend.lg)
	t.lock()
}

func (t *batchTx) lock() {
	t.Mutex.Lock()
}

func (t *batchTx) LockInsideApply() {
	t.lock()
	if t.backend.txPostLockInsideApplyHook != nil {
		// The callers of some methods (e.g., (*RaftCluster).AddMember)
		// can come from both InsideApply and OutsideApply, but the
		// callers from OutsideApply will have a nil txPostLockInsideApplyHook.
		// So we should check txPostLockInsideApplyHook before validating
		// the callstack.
		ValidateCalledInsideApply(t.backend.lg)
		t.backend.txPostLockInsideApplyHook()
	}
}

func (t *batchTx) LockOutsideApply() {
	ValidateCalledOutSideApply(t.backend.lg)
	t.lock()
}

func (t *batchTx) Unlock() {
	if t.pending >= t.backend.batchLimit {
		t.commit(false)
	}
	t.Mutex.Unlock()
}
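
// The helper below is an illustrative sketch, not part of upstream etcd.
// It shows the intended calling pattern: take the lock, apply Unsafe*
// mutations, then Unlock. Unlock itself commits the current bbolt
// transaction once the number of pending changes reaches the backend's
// batchLimit, so callers on this path never commit explicitly.
func exampleBatchWrite(tx BatchTx, bucket Bucket, key, value []byte) {
	tx.LockOutsideApply()
	defer tx.Unlock() // may call commit(false) when pending >= batchLimit
	tx.UnsafePut(bucket, key, value)
}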

// BatchTx interface embeds the ReadTx interface, but RLock() and RUnlock()
// do not have appropriate semantics for a BatchTx, so they must not be called.
// TODO: might want to decouple ReadTx and BatchTx

func (t *batchTx) RLock() {
	panic("unexpected RLock")
}

func (t *batchTx) RUnlock() {
	panic("unexpected RUnlock")
}

func (t *batchTx) UnsafeCreateBucket(bucket Bucket) {
	_, err := t.tx.CreateBucket(bucket.Name())
	if err != nil && err != bolt.ErrBucketExists {
		t.backend.lg.Fatal(
			"failed to create a bucket",
			zap.Stringer("bucket-name", bucket),
			zap.Error(err),
		)
	}
	t.pending++
}

func (t *batchTx) UnsafeDeleteBucket(bucket Bucket) {
	err := t.tx.DeleteBucket(bucket.Name())
	if err != nil && err != bolt.ErrBucketNotFound {
		t.backend.lg.Fatal(
			"failed to delete a bucket",
			zap.Stringer("bucket-name", bucket),
			zap.Error(err),
		)
	}
	t.pending++
}

// UnsafePut must be called holding the lock on the tx.
func (t *batchTx) UnsafePut(bucket Bucket, key []byte, value []byte) {
	t.unsafePut(bucket, key, value, false)
}

// UnsafeSeqPut must be called holding the lock on the tx.
func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
	t.unsafePut(bucket, key, value, true)
}

func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {
	bucket := t.tx.Bucket(bucketType.Name())
	if bucket == nil {
		t.backend.lg.Fatal(
			"failed to find a bucket",
			zap.Stringer("bucket-name", bucketType),
			zap.Stack("stack"),
		)
	}
	if seq {
		// It is useful to increase the fill percent when the workload is
		// mostly append-only. This can delay page splits and reduce space usage.
		bucket.FillPercent = 0.9
	}
	if err := bucket.Put(key, value); err != nil {
		t.backend.lg.Fatal(
			"failed to write to a bucket",
			zap.Stringer("bucket-name", bucketType),
			zap.Error(err),
		)
	}
	t.pending++
}

// UnsafeRange must be called holding the lock on the tx.
func (t *batchTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
	bucket := t.tx.Bucket(bucketType.Name())
	if bucket == nil {
		t.backend.lg.Fatal(
			"failed to find a bucket",
			zap.Stringer("bucket-name", bucketType),
			zap.Stack("stack"),
		)
	}
	return unsafeRange(bucket.Cursor(), key, endKey, limit)
}

func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
	if limit <= 0 {
		limit = math.MaxInt64
	}
	var isMatch func(b []byte) bool
	if len(endKey) > 0 {
		isMatch = func(b []byte) bool { return bytes.Compare(b, endKey) < 0 }
	} else {
		isMatch = func(b []byte) bool { return bytes.Equal(b, key) }
		limit = 1
	}

	for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {
		vs = append(vs, cv)
		keys = append(keys, ck)
		if limit == int64(len(keys)) {
			break
		}
	}
	return keys, vs
}
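
// The function below is an illustrative sketch, not part of upstream etcd.
// It demonstrates the two modes of unsafeRange: with an empty endKey the
// scan degenerates to an exact-match point lookup (limit is forced to 1),
// while a non-empty endKey returns keys in the half-open interval
// [key, endKey), up to limit (limit <= 0 means "no limit").
func exampleUnsafeRangeModes(c *bolt.Cursor) {
	// Point lookup: only a key exactly equal to "foo" can match.
	keys, _ := unsafeRange(c, []byte("foo"), nil, 0)
	_ = keys // at most one key

	// Range scan: every key k with "a" <= k < "b", unbounded count.
	keys, _ = unsafeRange(c, []byte("a"), []byte("b"), 0)
	_ = keys
}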

// UnsafeDelete must be called holding the lock on the tx.
func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) {
	bucket := t.tx.Bucket(bucketType.Name())
	if bucket == nil {
		t.backend.lg.Fatal(
			"failed to find a bucket",
			zap.Stringer("bucket-name", bucketType),
			zap.Stack("stack"),
		)
	}
	err := bucket.Delete(key)
	if err != nil {
		t.backend.lg.Fatal(
			"failed to delete a key",
			zap.Stringer("bucket-name", bucketType),
			zap.Error(err),
		)
	}
	t.pending++
}

// UnsafeForEach must be called holding the lock on the tx.
func (t *batchTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
	return unsafeForEach(t.tx, bucket, visitor)
}

func unsafeForEach(tx *bolt.Tx, bucket Bucket, visitor func(k, v []byte) error) error {
	if b := tx.Bucket(bucket.Name()); b != nil {
		return b.ForEach(visitor)
	}
	return nil
}

// Commit commits a previous tx and begins a new writable one.
func (t *batchTx) Commit() {
	t.lock()
	t.commit(false)
	t.Unlock()
}

// CommitAndStop commits the previous tx and does not create a new one.
func (t *batchTx) CommitAndStop() {
	t.lock()
	t.commit(true)
	t.Unlock()
}

func (t *batchTx) safePending() int {
	t.Mutex.Lock()
	defer t.Mutex.Unlock()
	return t.pending
}
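
// The loop below is an illustrative sketch, not code from this file: it
// approximates the backend's periodic flusher (backend.run in backend.go),
// which is the main caller of safePending. Batched writes are therefore
// bounded in two ways: Unlock commits once pending reaches batchLimit, and
// a timer commits whatever is pending on each batch interval. The parameter
// names here (pending, interval, stopc) are assumptions for the sketch.
func exampleFlushLoop(tx BatchTx, pending func() int, interval time.Duration, stopc <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if pending() != 0 {
				tx.Commit() // commit and begin a fresh writable tx
			}
		case <-stopc:
			tx.CommitAndStop() // final commit, no new tx
			return
		}
	}
}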

func (t *batchTx) commit(stop bool) {
	// commit the last tx
	if t.tx != nil {
		if t.pending == 0 && !stop {
			return
		}

		start := time.Now()

		// gofail: var beforeCommit struct{}
		err := t.tx.Commit()
		// gofail: var afterCommit struct{}

		rebalanceSec.Observe(t.tx.Stats().RebalanceTime.Seconds())
		spillSec.Observe(t.tx.Stats().SpillTime.Seconds())
		writeSec.Observe(t.tx.Stats().WriteTime.Seconds())
		commitSec.Observe(time.Since(start).Seconds())
		atomic.AddInt64(&t.backend.commits, 1)

		t.pending = 0
		if err != nil {
			t.backend.lg.Fatal("failed to commit tx", zap.Error(err))
		}
	}
	if !stop {
		t.tx = t.backend.begin(true)
	}
}

type batchTxBuffered struct {
	batchTx
	buf                     txWriteBuffer
	pendingDeleteOperations int
}

func newBatchTxBuffered(backend *backend) *batchTxBuffered {
	tx := &batchTxBuffered{
		batchTx: batchTx{backend: backend},
		buf: txWriteBuffer{
			txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
			bucket2seq: make(map[BucketID]bool),
		},
	}
	tx.Commit()
	return tx
}

func (t *batchTxBuffered) Unlock() {
	if t.pending != 0 {
		t.backend.readTx.Lock() // blocks txReadBuffer for writing.
		// gofail: var beforeWritebackBuf struct{}
		t.buf.writeback(&t.backend.readTx.buf)
		t.backend.readTx.Unlock()
		// We commit the transaction when the number of pending operations
		// reaches the configured limit (batchLimit) to prevent it from
		// becoming excessively large.
		//
		// But we also need to commit the transaction immediately if there
		// is any pending delete operation; otherwise etcd might not have
		// finished committing the data to the backend storage (note: etcd
		// commits the bbolt transactions periodically instead of on each
		// request) by the time it applies the next request. In that case,
		// etcd may still read stale data from bbolt when processing the
		// next request, which breaks linearizability.
		//
		// Note we don't need to commit the transaction for put requests
		// that don't exceed the batch limit, because there is a buffer on
		// top of bbolt. Each time etcd reads data from the backend storage,
		// it reads from both bbolt and the buffer. But there is no such
		// buffer for delete requests.
		//
		// Please also refer to
		// https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158
		if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 {
			t.commit(false)
		}
	}
	t.batchTx.Unlock()
}
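
// The function below is an illustrative sketch, not part of upstream etcd,
// condensing the comment above: puts are mirrored into the write buffer,
// so readers see them before the bbolt commit, while deletes only touch
// bbolt and therefore force an immediate commit on Unlock.
func exampleDeleteForcesCommit(tx *batchTxBuffered, bucket Bucket, key []byte) {
	tx.LockInsideApply()
	tx.UnsafeDelete(bucket, key) // bbolt-only; bumps pendingDeleteOperations
	tx.Unlock()                  // commits right away: pendingDeleteOperations > 0
}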

func (t *batchTxBuffered) Commit() {
	t.lock()
	t.commit(false)
	t.Unlock()
}

func (t *batchTxBuffered) CommitAndStop() {
	t.lock()
	t.commit(true)
	t.Unlock()
}

func (t *batchTxBuffered) commit(stop bool) {
	// all read txs must be closed to acquire boltdb commit rwlock
	t.backend.readTx.Lock()
	t.unsafeCommit(stop)
	t.backend.readTx.Unlock()
}

func (t *batchTxBuffered) unsafeCommit(stop bool) {
	if t.backend.hooks != nil {
		t.backend.hooks.OnPreCommitUnsafe(t)
	}
	if t.backend.readTx.tx != nil {
		// wait for all store read transactions using the current boltdb tx
		// to finish, then close the boltdb tx
		go func(tx *bolt.Tx, wg *sync.WaitGroup) {
			wg.Wait()
			if err := tx.Rollback(); err != nil {
				t.backend.lg.Fatal("failed to rollback tx", zap.Error(err))
			}
		}(t.backend.readTx.tx, t.backend.readTx.txWg)
		t.backend.readTx.reset()
	}

	t.batchTx.commit(stop)
	t.pendingDeleteOperations = 0

	if !stop {
		t.backend.readTx.tx = t.backend.begin(false)
	}
}
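
// The function below is an illustrative sketch of the handoff pattern used
// in unsafeCommit, not code from this package: each in-flight reader is
// assumed to register with the WaitGroup that accompanies the bbolt read
// tx, so the rollback goroutine runs only after the last reader finishes,
// without ever blocking the commit path itself.
func exampleDeferredRollback(tx *bolt.Tx, readers *sync.WaitGroup) {
	readers.Add(1) // a reader starts using tx (done by the read-tx machinery)
	go func() {
		defer readers.Done() // the reader finishes
		// ... serve reads from tx ...
	}()
	go func() {
		readers.Wait()    // all readers gone
		_ = tx.Rollback() // now it is safe to release the old read tx
	}()
}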

func (t *batchTxBuffered) UnsafePut(bucket Bucket, key []byte, value []byte) {
	t.batchTx.UnsafePut(bucket, key, value)
	t.buf.put(bucket, key, value)
}

func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
	t.batchTx.UnsafeSeqPut(bucket, key, value)
	t.buf.putSeq(bucket, key, value)
}

func (t *batchTxBuffered) UnsafeDelete(bucketType Bucket, key []byte) {
	t.batchTx.UnsafeDelete(bucketType, key)
	t.pendingDeleteOperations++
}

func (t *batchTxBuffered) UnsafeDeleteBucket(bucket Bucket) {
	t.batchTx.UnsafeDeleteBucket(bucket)
	t.pendingDeleteOperations++
}
