...

Source file src/go.etcd.io/etcd/server/v3/mvcc/backend/read_tx.go

Documentation: go.etcd.io/etcd/server/v3/mvcc/backend

     1  // Copyright 2017 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package backend
    16  
    17  import (
    18  	"math"
    19  	"sync"
    20  
    21  	bolt "go.etcd.io/bbolt"
    22  )
    23  
// IsSafeRangeBucket (a method on Bucket) is a hack to avoid inadvertently
// reading duplicate keys: a bucket whose keys may be overwritten should only
// be fetched with limit=1, but a bucket for which IsSafeRangeBucket reports
// true is known to never overwrite any key, so ranging over it is safe.
    27  
    28  type ReadTx interface {
    29  	Lock()
    30  	Unlock()
    31  	RLock()
    32  	RUnlock()
    33  
    34  	UnsafeRange(bucket Bucket, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
    35  	UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error
    36  }
    37  
// baseReadTx is the base type for readTx and concurrentReadTx; it exists to
// eliminate duplicate functions between these. Both types embed it and
// inherit UnsafeRange and UnsafeForEach from it.
type baseReadTx struct {
	// mu protects accesses to the txReadBuffer
	// (readTx exposes it through Lock/Unlock/RLock/RUnlock).
	// buf is the read buffer that UnsafeRange consults before falling
	// through to tx, and that UnsafeForEach merges with the contents of tx.
	mu  sync.RWMutex
	buf txReadBuffer

	// TODO: group and encapsulate {txMu, tx, buckets, txWg}, as they share the same lifecycle.
	// txMu protects accesses to buckets and tx on Range requests.
	// tx is the bolt transaction that reads fall through to.
	// buckets caches the result of tx.Bucket lookups keyed by bucket ID; a
	// nil entry records that the bucket was not found in tx.
	txMu    *sync.RWMutex
	tx      *bolt.Tx
	buckets map[BucketID]*bolt.Bucket
	// txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
	// concurrentReadTx.RUnlock calls Done on it.
	txWg *sync.WaitGroup
}
    52  
    53  func (baseReadTx *baseReadTx) UnsafeForEach(bucket Bucket, visitor func(k, v []byte) error) error {
    54  	dups := make(map[string]struct{})
    55  	getDups := func(k, v []byte) error {
    56  		dups[string(k)] = struct{}{}
    57  		return nil
    58  	}
    59  	visitNoDup := func(k, v []byte) error {
    60  		if _, ok := dups[string(k)]; ok {
    61  			return nil
    62  		}
    63  		return visitor(k, v)
    64  	}
    65  	if err := baseReadTx.buf.ForEach(bucket, getDups); err != nil {
    66  		return err
    67  	}
    68  	baseReadTx.txMu.Lock()
    69  	err := unsafeForEach(baseReadTx.tx, bucket, visitNoDup)
    70  	baseReadTx.txMu.Unlock()
    71  	if err != nil {
    72  		return err
    73  	}
    74  	return baseReadTx.buf.ForEach(bucket, visitor)
    75  }
    76  
    77  func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
    78  	if endKey == nil {
    79  		// forbid duplicates for single keys
    80  		limit = 1
    81  	}
    82  	if limit <= 0 {
    83  		limit = math.MaxInt64
    84  	}
    85  	if limit > 1 && !bucketType.IsSafeRangeBucket() {
    86  		panic("do not use unsafeRange on non-keys bucket")
    87  	}
    88  	keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
    89  	if int64(len(keys)) == limit {
    90  		return keys, vals
    91  	}
    92  
    93  	// find/cache bucket
    94  	bn := bucketType.ID()
    95  	baseReadTx.txMu.RLock()
    96  	bucket, ok := baseReadTx.buckets[bn]
    97  	baseReadTx.txMu.RUnlock()
    98  	lockHeld := false
    99  	if !ok {
   100  		baseReadTx.txMu.Lock()
   101  		lockHeld = true
   102  		bucket = baseReadTx.tx.Bucket(bucketType.Name())
   103  		baseReadTx.buckets[bn] = bucket
   104  	}
   105  
   106  	// ignore missing bucket since may have been created in this batch
   107  	if bucket == nil {
   108  		if lockHeld {
   109  			baseReadTx.txMu.Unlock()
   110  		}
   111  		return keys, vals
   112  	}
   113  	if !lockHeld {
   114  		baseReadTx.txMu.Lock()
   115  	}
   116  	c := bucket.Cursor()
   117  	baseReadTx.txMu.Unlock()
   118  
   119  	k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
   120  	return append(k2, keys...), append(v2, vals...)
   121  }
   122  
   123  type readTx struct {
   124  	baseReadTx
   125  }
   126  
   127  func (rt *readTx) Lock()    { rt.mu.Lock() }
   128  func (rt *readTx) Unlock()  { rt.mu.Unlock() }
   129  func (rt *readTx) RLock()   { rt.mu.RLock() }
   130  func (rt *readTx) RUnlock() { rt.mu.RUnlock() }
   131  
   132  func (rt *readTx) reset() {
   133  	rt.buf.reset()
   134  	rt.buckets = make(map[BucketID]*bolt.Bucket)
   135  	rt.tx = nil
   136  	rt.txWg = new(sync.WaitGroup)
   137  }
   138  
   139  type concurrentReadTx struct {
   140  	baseReadTx
   141  }
   142  
   143  func (rt *concurrentReadTx) Lock()   {}
   144  func (rt *concurrentReadTx) Unlock() {}
   145  
   146  // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
   147  func (rt *concurrentReadTx) RLock() {}
   148  
   149  // RUnlock signals the end of concurrentReadTx.
   150  func (rt *concurrentReadTx) RUnlock() { rt.txWg.Done() }
   151  

View as plain text