...

Source file src/go.etcd.io/etcd/server/v3/mvcc/kvstore_txn.go

Documentation: go.etcd.io/etcd/server/v3/mvcc

     1  // Copyright 2017 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package mvcc
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  
    21  	"go.etcd.io/etcd/api/v3/mvccpb"
    22  	"go.etcd.io/etcd/pkg/v3/traceutil"
    23  	"go.etcd.io/etcd/server/v3/lease"
    24  	"go.etcd.io/etcd/server/v3/mvcc/backend"
    25  	"go.etcd.io/etcd/server/v3/mvcc/buckets"
    26  	"go.uber.org/zap"
    27  )
    28  
// storeTxnRead is a read transaction over a store. It bundles the backend
// read transaction with the two revisions recorded when the transaction was
// created.
type storeTxnRead struct {
	// s is the store this transaction operates on.
	s  *store
	// tx is the backend read transaction; End calls RUnlock on it.
	tx backend.ReadTx

	// firstRev and rev are set by store.Read from s.compactMainRev and
	// s.currentRev respectively; store.Write leaves both at zero.
	firstRev int64
	rev      int64

	// trace receives a Step call after each phase of a range request.
	trace *traceutil.Trace
}
    38  
    39  func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
    40  	s.mu.RLock()
    41  	s.revMu.RLock()
    42  	// For read-only workloads, we use shared buffer by copying transaction read buffer
    43  	// for higher concurrency with ongoing blocking writes.
    44  	// For write/write-read transactions, we use the shared buffer
    45  	// rather than duplicating transaction read buffer to avoid transaction overhead.
    46  	var tx backend.ReadTx
    47  	if mode == ConcurrentReadTxMode {
    48  		tx = s.b.ConcurrentReadTx()
    49  	} else {
    50  		tx = s.b.ReadTx()
    51  	}
    52  
    53  	tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
    54  	firstRev, rev := s.compactMainRev, s.currentRev
    55  	s.revMu.RUnlock()
    56  	return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace})
    57  }
    58  
    59  func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
    60  func (tr *storeTxnRead) Rev() int64      { return tr.rev }
    61  
    62  func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
    63  	return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
    64  }
    65  
// End finishes the read transaction. It releases the backend read
// transaction first and then the store's mu read lock, which store.Read
// acquired when the transaction was opened.
func (tr *storeTxnRead) End() {
	tr.tx.RUnlock() // RUnlock signals the end of concurrentReadTx.
	tr.s.mu.RUnlock()
}
    70  
// storeTxnWrite is a write transaction over a store. It embeds storeTxnRead
// for the read path and records every key-value it writes or tombstones.
type storeTxnWrite struct {
	storeTxnRead
	// tx is the backend batch transaction; it shadows the embedded
	// storeTxnRead.tx field.
	tx backend.BatchTx
	// beginRev is the revision where the txn begins; it will write to the next revision.
	beginRev int64
	// changes accumulates the KeyValues appended by put and delete.
	changes  []mvccpb.KeyValue
}
    78  
    79  func (s *store) Write(trace *traceutil.Trace) TxnWrite {
    80  	s.mu.RLock()
    81  	tx := s.b.BatchTx()
    82  	tx.LockInsideApply()
    83  	tw := &storeTxnWrite{
    84  		storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
    85  		tx:           tx,
    86  		beginRev:     s.currentRev,
    87  		changes:      make([]mvccpb.KeyValue, 0, 4),
    88  	}
    89  	return newMetricsTxnWrite(tw)
    90  }
    91  
    92  func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }
    93  
    94  func (tw *storeTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
    95  	rev := tw.beginRev
    96  	if len(tw.changes) > 0 {
    97  		rev++
    98  	}
    99  	return tw.rangeKeys(ctx, key, end, rev, ro)
   100  }
   101  
   102  func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
   103  	if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
   104  		return n, tw.beginRev + 1
   105  	}
   106  	return 0, tw.beginRev
   107  }
   108  
   109  func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
   110  	tw.put(key, value, lease)
   111  	return tw.beginRev + 1
   112  }
   113  
   114  func (tw *storeTxnWrite) End() {
   115  	// only update index if the txn modifies the mvcc state.
   116  	if len(tw.changes) != 0 {
   117  		// hold revMu lock to prevent new read txns from opening until writeback.
   118  		tw.s.revMu.Lock()
   119  		tw.s.currentRev++
   120  	}
   121  	tw.tx.Unlock()
   122  	if len(tw.changes) != 0 {
   123  		tw.s.revMu.Unlock()
   124  	}
   125  	tw.s.mu.RUnlock()
   126  }
   127  
   128  func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
   129  	rev := ro.Rev
   130  	if rev > curRev {
   131  		return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
   132  	}
   133  	if rev <= 0 {
   134  		rev = curRev
   135  	}
   136  	if rev < tr.s.compactMainRev {
   137  		return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
   138  	}
   139  	if ro.Count {
   140  		total := tr.s.kvindex.CountRevisions(key, end, rev)
   141  		tr.trace.Step("count revisions from in-memory index tree")
   142  		return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
   143  	}
   144  	revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
   145  	tr.trace.Step("range keys from in-memory index tree")
   146  	if len(revpairs) == 0 {
   147  		return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
   148  	}
   149  
   150  	limit := int(ro.Limit)
   151  	if limit <= 0 || limit > len(revpairs) {
   152  		limit = len(revpairs)
   153  	}
   154  
   155  	kvs := make([]mvccpb.KeyValue, limit)
   156  	revBytes := newRevBytes()
   157  	for i, revpair := range revpairs[:len(kvs)] {
   158  		select {
   159  		case <-ctx.Done():
   160  			return nil, fmt.Errorf("rangeKeys: context cancelled: %w", ctx.Err())
   161  		default:
   162  		}
   163  		revToBytes(revpair, revBytes)
   164  		_, vs := tr.tx.UnsafeRange(buckets.Key, revBytes, nil, 0)
   165  		if len(vs) != 1 {
   166  			tr.s.lg.Fatal(
   167  				"range failed to find revision pair",
   168  				zap.Int64("revision-main", revpair.main),
   169  				zap.Int64("revision-sub", revpair.sub),
   170  				zap.Int64("revision-current", curRev),
   171  				zap.Int64("range-option-rev", ro.Rev),
   172  				zap.Int64("range-option-limit", ro.Limit),
   173  				zap.Binary("key", key),
   174  				zap.Binary("end", end),
   175  				zap.Int("len-revpairs", len(revpairs)),
   176  				zap.Int("len-values", len(vs)),
   177  			)
   178  		}
   179  		if err := kvs[i].Unmarshal(vs[0]); err != nil {
   180  			tr.s.lg.Fatal(
   181  				"failed to unmarshal mvccpb.KeyValue",
   182  				zap.Error(err),
   183  			)
   184  		}
   185  	}
   186  	tr.trace.Step("range keys from bolt db")
   187  	return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil
   188  }
   189  
// put writes key/value into the transaction at main revision beginRev+1,
// using the number of changes recorded so far as the sub-revision. It stores
// the marshaled KeyValue in the backend, updates the in-memory index, appends
// the KeyValue to tw.changes, and finally detaches the key from its previous
// lease (if any) and attaches it to leaseID (unless leaseID is lease.NoLease).
//
// A marshal failure is logged with Fatal. A Detach failure is only logged as
// an error, while an Attach failure or a missing lessor panics.
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
	rev := tw.beginRev + 1
	// c is the create revision; it stays equal to rev unless the index
	// lookup below finds the key.
	c := rev
	oldLease := lease.NoLease

	// if the key exists before, use its previous created and
	// get its previous leaseID
	_, created, ver, err := tw.s.kvindex.Get(key, rev)
	if err == nil {
		c = created.main
		// NOTE(review): tw.s.le is dereferenced here without the nil check
		// that guards the Detach/Attach calls further down — confirm a lessor
		// is always set when a key can already exist.
		oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
		tw.trace.Step("get key's previous created_revision and leaseID")
	}
	// Encode the {main, sub} revision that is used as the backend key.
	ibytes := newRevBytes()
	idxRev := revision{main: rev, sub: int64(len(tw.changes))}
	revToBytes(idxRev, ibytes)

	// The stored version is one more than the version returned by the index.
	ver = ver + 1
	kv := mvccpb.KeyValue{
		Key:            key,
		Value:          value,
		CreateRevision: c,
		ModRevision:    rev,
		Version:        ver,
		Lease:          int64(leaseID),
	}

	d, err := kv.Marshal()
	if err != nil {
		tw.storeTxnRead.s.lg.Fatal(
			"failed to marshal mvccpb.KeyValue",
			zap.Error(err),
		)
	}

	tw.trace.Step("marshal mvccpb.KeyValue")
	// Write to the backend first, then the index, then record the change.
	tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d)
	tw.s.kvindex.Put(key, idxRev)
	tw.changes = append(tw.changes, kv)
	tw.trace.Step("store kv pair into bolt db")

	// Release the key from the lease it was previously attached to.
	if oldLease != lease.NoLease {
		if tw.s.le == nil {
			panic("no lessor to detach lease")
		}
		err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
		if err != nil {
			// A failed detach is logged but does not abort the put.
			tw.storeTxnRead.s.lg.Error(
				"failed to detach old lease from a key",
				zap.Error(err),
			)
		}
	}
	// Attach the key to the requested lease, if one was given.
	if leaseID != lease.NoLease {
		if tw.s.le == nil {
			panic("no lessor to attach lease")
		}
		err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
		if err != nil {
			panic("unexpected error from lease Attach")
		}
	}
	tw.trace.Step("attach lease to kv pair")
}
   254  
   255  func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
   256  	rrev := tw.beginRev
   257  	if len(tw.changes) > 0 {
   258  		rrev++
   259  	}
   260  	keys, _ := tw.s.kvindex.Range(key, end, rrev)
   261  	if len(keys) == 0 {
   262  		return 0
   263  	}
   264  	for _, key := range keys {
   265  		tw.delete(key)
   266  	}
   267  	return int64(len(keys))
   268  }
   269  
   270  func (tw *storeTxnWrite) delete(key []byte) {
   271  	ibytes := newRevBytes()
   272  	idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
   273  	revToBytes(idxRev, ibytes)
   274  
   275  	ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
   276  
   277  	kv := mvccpb.KeyValue{Key: key}
   278  
   279  	d, err := kv.Marshal()
   280  	if err != nil {
   281  		tw.storeTxnRead.s.lg.Fatal(
   282  			"failed to marshal mvccpb.KeyValue",
   283  			zap.Error(err),
   284  		)
   285  	}
   286  
   287  	tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d)
   288  	err = tw.s.kvindex.Tombstone(key, idxRev)
   289  	if err != nil {
   290  		tw.storeTxnRead.s.lg.Fatal(
   291  			"failed to tombstone an existing key",
   292  			zap.String("key", string(key)),
   293  			zap.Error(err),
   294  		)
   295  	}
   296  	tw.changes = append(tw.changes, kv)
   297  
   298  	item := lease.LeaseItem{Key: string(key)}
   299  	leaseID := tw.s.le.GetLease(item)
   300  
   301  	if leaseID != lease.NoLease {
   302  		err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
   303  		if err != nil {
   304  			tw.storeTxnRead.s.lg.Error(
   305  				"failed to detach old lease from a key",
   306  				zap.Error(err),
   307  			)
   308  		}
   309  	}
   310  }
   311  
   312  func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes }
   313  

View as plain text