Source file src/go.etcd.io/etcd/server/v3/mvcc/kvstore.go

Documentation: go.etcd.io/etcd/server/v3/mvcc

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package mvcc
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"math"
    22  	"sync"
    23  	"time"
    24  
    25  	"go.etcd.io/etcd/api/v3/mvccpb"
    26  	"go.etcd.io/etcd/pkg/v3/schedule"
    27  	"go.etcd.io/etcd/pkg/v3/traceutil"
    28  	"go.etcd.io/etcd/server/v3/lease"
    29  	"go.etcd.io/etcd/server/v3/mvcc/backend"
    30  	"go.etcd.io/etcd/server/v3/mvcc/buckets"
    31  
    32  	"go.uber.org/zap"
    33  )
    34  
    35  var (
    36  	scheduledCompactKeyName = []byte("scheduledCompactRev")
    37  	finishedCompactKeyName  = []byte("finishedCompactRev")
    38  
    39  	ErrCompacted = errors.New("mvcc: required revision has been compacted")
    40  	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
    41  )
    42  
    43  const (
    44  	// markedRevBytesLen is the byte length of a marked revision.
    45  	// The first `revBytesLen` bytes represent a normal revision; the
    46  	// final byte is the mark.
    47  	markedRevBytesLen      = revBytesLen + 1
    48  	markBytePosition       = markedRevBytesLen - 1
    49  	markTombstone     byte = 't'
    50  )
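
For orientation: revToBytes (in revision.go) serializes a revision as an 8-byte big-endian main revision, a '_' separator byte, and an 8-byte big-endian sub revision, so a marked revision adds one trailing mark byte on top of that. A minimal sketch of the tombstone round trip, using appendMarkTombstone and isTombstone from the end of this file (the layout assumption comes from revision.go, not from this file; lg is an assumed *zap.Logger):

	rbytes := newRevBytes()                       // revBytesLen bytes: main + '_' + sub
	revToBytes(revision{main: 5, sub: 2}, rbytes) // big-endian encoding
	marked := appendMarkTombstone(lg, rbytes)     // appends markTombstone ('t')
	_ = isTombstone(marked)                       // true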
    51  
    52  var restoreChunkKeys = 10000 // non-const for testing
    53  var defaultCompactBatchLimit = 1000
    54  
    55  type StoreConfig struct {
    56  	CompactionBatchLimit int
    57  }
    58  
    59  type store struct {
    60  	ReadView
    61  	WriteView
    62  
    63  	cfg StoreConfig
    64  
    65  	// mu is read-locked for txns and write-locked for non-txn store changes.
    66  	mu sync.RWMutex
    67  
    68  	b       backend.Backend
    69  	kvindex index
    70  
    71  	le lease.Lessor
    72  
    73  	// revMu protects currentRev and compactMainRev.
    74  	// Locked at the end of a write txn and released after the write txn unlocks.
    75  	// Locked before locking a read txn and released once it is locked.
    76  	revMu sync.RWMutex
    77  	// currentRev is the revision of the last completed transaction.
    78  	currentRev int64
    79  	// compactMainRev is the main revision of the last compaction.
    80  	compactMainRev int64
    81  
    82  	fifoSched schedule.Scheduler
    83  
    84  	stopc chan struct{}
    85  
    86  	lg     *zap.Logger
    87  	hashes HashStorage
    88  }
    89  
    90  // NewStore returns a new store. It is useful for creating a store inside
    91  // the mvcc package; externally it should only be used for testing.
    92  func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store {
    93  	if lg == nil {
    94  		lg = zap.NewNop()
    95  	}
    96  	if cfg.CompactionBatchLimit == 0 {
    97  		cfg.CompactionBatchLimit = defaultCompactBatchLimit
    98  	}
    99  	s := &store{
   100  		cfg:     cfg,
   101  		b:       b,
   102  		kvindex: newTreeIndex(lg),
   103  
   104  		le: le,
   105  
   106  		currentRev:     1,
   107  		compactMainRev: -1,
   108  
   109  		fifoSched: schedule.NewFIFOScheduler(),
   110  
   111  		stopc: make(chan struct{}),
   112  
   113  		lg: lg,
   114  	}
   115  	s.hashes = newHashStorage(lg, s)
   116  	s.ReadView = &readView{s}
   117  	s.WriteView = &writeView{s}
   118  	if s.le != nil {
   119  		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
   120  	}
   121  
   122  	tx := s.b.BatchTx()
   123  	tx.LockOutsideApply()
   124  	tx.UnsafeCreateBucket(buckets.Key)
   125  	tx.UnsafeCreateBucket(buckets.Meta)
   126  	tx.Unlock()
   127  	s.b.ForceCommit()
   128  
   129  	s.mu.Lock()
   130  	defer s.mu.Unlock()
   131  	if err := s.restore(); err != nil {
   132  		// TODO: return the error instead of panic here?
   133  		panic("failed to recover store from backend")
   134  	}
   135  
   136  	return s
   137  }
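
A minimal construction sketch, mirroring this package's own tests; it assumes a test context with t *testing.T, the helper package betesting (go.etcd.io/etcd/server/v3/mvcc/backend/testing), and zaptest (go.uber.org/zap/zaptest), none of which are imported by this file:

	b, _ := betesting.NewDefaultTmpBackend(t)
	s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
	defer s.Close()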
   138  
   139  func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
   140  	if ctx == nil || ctx.Err() != nil {
   141  		select {
   142  		case <-s.stopc:
   143  		default:
   144  			// Fix deadlock in mvcc; for more information, please refer to PR 11817.
   145  			// s.stopc is only updated in the restore operation, which is called by the
   146  			// apply-snapshot call; compaction and apply-snapshot requests are serialized
   147  			// by raft and do not happen at the same time.
   148  			s.mu.Lock()
   149  			f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
   150  			s.fifoSched.Schedule(f)
   151  			s.mu.Unlock()
   152  		}
   153  		return
   154  	}
   155  	close(ch)
   156  }
   157  
   158  func (s *store) hash() (hash uint32, revision int64, err error) {
   159  	// TODO: hash and revision could be inconsistent; one possible fix is to add s.revMu.RLock() at the beginning of this function, but that is costly.
   160  	start := time.Now()
   161  
   162  	s.b.ForceCommit()
   163  	h, err := s.b.Hash(buckets.DefaultIgnores)
   164  
   165  	hashSec.Observe(time.Since(start).Seconds())
   166  	return h, s.currentRev, err
   167  }
   168  
   169  func (s *store) hashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error) {
   170  	var compactRev int64
   171  	start := time.Now()
   172  
   173  	s.mu.RLock()
   174  	s.revMu.RLock()
   175  	compactRev, currentRev = s.compactMainRev, s.currentRev
   176  	s.revMu.RUnlock()
   177  
   178  	if rev > 0 && rev < compactRev {
   179  		s.mu.RUnlock()
   180  		return KeyValueHash{}, 0, ErrCompacted
   181  	} else if rev > 0 && rev > currentRev {
   182  		s.mu.RUnlock()
   183  		return KeyValueHash{}, currentRev, ErrFutureRev
   184  	}
   185  	if rev == 0 {
   186  		rev = currentRev
   187  	}
   188  	keep := s.kvindex.Keep(rev)
   189  
   190  	tx := s.b.ReadTx()
   191  	tx.RLock()
   192  	defer tx.RUnlock()
   193  	s.mu.RUnlock()
   194  	hash, err = unsafeHashByRev(tx, compactRev, rev, keep)
   195  	hashRevSec.Observe(time.Since(start).Seconds())
   196  	return hash, currentRev, err
   197  }
   198  
   199  func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
   200  	s.revMu.Lock()
   201  	if rev <= s.compactMainRev {
   202  		ch := make(chan struct{})
   203  		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
   204  		s.fifoSched.Schedule(f)
   205  		s.revMu.Unlock()
   206  		return ch, 0, ErrCompacted
   207  	}
   208  	if rev > s.currentRev {
   209  		s.revMu.Unlock()
   210  		return nil, 0, ErrFutureRev
   211  	}
   212  	compactMainRev := s.compactMainRev
   213  	s.compactMainRev = rev
   214  
   215  	rbytes := newRevBytes()
   216  	revToBytes(revision{main: rev}, rbytes)
   217  
   218  	tx := s.b.BatchTx()
   219  	tx.LockInsideApply()
   220  	tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes)
   221  	tx.Unlock()
   222  	// ensure that desired compaction is persisted
   223  	s.b.ForceCommit()
   224  
   225  	s.revMu.Unlock()
   226  
   227  	return nil, compactMainRev, nil
   228  }
   229  
   230  // checkPrevCompactionCompleted checks whether the previously scheduled compaction has completed.
   231  func (s *store) checkPrevCompactionCompleted() bool {
   232  	tx := s.b.ReadTx()
   233  	tx.Lock()
   234  	defer tx.Unlock()
   235  	scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx)
   236  	finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx)
   237  	return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
   238  }
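
The scheduled/finished pair in the Meta bucket acts as a write-ahead marker: updateCompactRev persists scheduledCompactRev before any key is purged, and the compaction job writes finishedCompactRev only after its final batch (see kvstore_compaction.go). The check above therefore reduces to:

	scheduled == finished (or both keys absent)  ->  previous compaction completed
	scheduled ahead of finished                  ->  previous compaction was interrupted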
   239  
   240  func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) {
   241  	ch := make(chan struct{})
   242  	var j = func(ctx context.Context) {
   243  		if ctx.Err() != nil {
   244  			s.compactBarrier(ctx, ch)
   245  			return
   246  		}
   247  		hash, err := s.scheduleCompaction(rev, prevCompactRev)
   248  		if err != nil {
   249  			s.lg.Warn("Failed compaction", zap.Error(err))
   250  			s.compactBarrier(context.TODO(), ch)
   251  			return
   252  		}
   253  		// Only store the hash value if the previous compaction completed, i.e. this
   254  		// compaction hashes every revision since the last compaction. For more details, see #15919.
   255  		if prevCompactionCompleted {
   256  			s.hashes.Store(hash)
   257  		} else {
   258  			s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
   259  		}
   260  		close(ch)
   261  	}
   262  
   263  	s.fifoSched.Schedule(j)
   264  	trace.Step("schedule compaction")
   265  	return ch, nil
   266  }
   267  
   268  func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
   269  	prevCompactionCompleted := s.checkPrevCompactionCompleted()
   270  	ch, prevCompactRev, err := s.updateCompactRev(rev)
   271  	if err != nil {
   272  		return ch, err
   273  	}
   274  
   275  	return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted)
   276  }
   277  
   278  func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
   279  	s.mu.Lock()
   280  
   281  	prevCompactionCompleted := s.checkPrevCompactionCompleted()
   282  	ch, prevCompactRev, err := s.updateCompactRev(rev)
   283  	trace.Step("check and update compact revision")
   284  	if err != nil {
   285  		s.mu.Unlock()
   286  		return ch, err
   287  	}
   288  	s.mu.Unlock()
   289  
   290  	return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted)
   291  }
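
Compact only schedules the work on the FIFO scheduler; the returned channel is closed once the compaction has actually run (or is aborted through compactBarrier). A hedged caller sketch, where compactAndWait is a hypothetical helper rather than part of etcd:

	func compactAndWait(s *store, rev int64) error {
		ch, err := s.Compact(traceutil.TODO(), rev)
		if err != nil {
			return err // e.g. ErrCompacted or ErrFutureRev
		}
		<-ch // closed when the scheduled compaction finishes
		return nil
	}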
   292  
   293  func (s *store) Commit() {
   294  	s.mu.Lock()
   295  	defer s.mu.Unlock()
   296  	s.b.ForceCommit()
   297  }
   298  
   299  func (s *store) Restore(b backend.Backend) error {
   300  	s.mu.Lock()
   301  	defer s.mu.Unlock()
   302  
   303  	close(s.stopc)
   304  	s.fifoSched.Stop()
   305  
   306  	s.b = b
   307  	s.kvindex = newTreeIndex(s.lg)
   308  
   309  	{
   310  		// During restore the metrics might report 'special' values
   311  		s.revMu.Lock()
   312  		s.currentRev = 1
   313  		s.compactMainRev = -1
   314  		s.revMu.Unlock()
   315  	}
   316  
   317  	s.fifoSched = schedule.NewFIFOScheduler()
   318  	s.stopc = make(chan struct{})
   319  
   320  	return s.restore()
   321  }
   322  
   323  func (s *store) restore() error {
   324  	s.setupMetricsReporter()
   325  
   326  	min, max := newRevBytes(), newRevBytes()
   327  	revToBytes(revision{main: 1}, min)
   328  	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
   329  
   330  	keyToLease := make(map[string]lease.LeaseID)
   331  
   332  	// restore index
   333  	tx := s.b.ReadTx()
   334  	tx.Lock()
   335  
   336  	finishedCompact, found := UnsafeReadFinishedCompact(tx)
   337  	if found {
   338  		s.revMu.Lock()
   339  		s.compactMainRev = finishedCompact
   340  
   341  		s.lg.Info(
   342  			"restored last compact revision",
   343  			zap.Stringer("meta-bucket-name", buckets.Meta),
   344  			zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
   345  			zap.Int64("restored-compact-revision", s.compactMainRev),
   346  		)
   347  		s.revMu.Unlock()
   348  	}
   349  	scheduledCompact, _ := UnsafeReadScheduledCompact(tx)
   350  
   351  	// index keys concurrently as they're loaded in from tx
   352  	keysGauge.Set(0)
   353  	rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
   354  	for {
   355  		keys, vals := tx.UnsafeRange(buckets.Key, min, max, int64(restoreChunkKeys))
   356  		if len(keys) == 0 {
   357  			break
   358  		}
   359  		// rkvc blocks if the total pending keys exceeds the restore
   360  		// chunk size to keep keys from consuming too much memory.
   361  		restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
   362  		if len(keys) < restoreChunkKeys {
   363  			// a partial chunk implies this was the final chunk
   364  			break
   365  		}
   366  		// the next chunk begins just after where this one ended
   367  		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
   368  		newMin.sub++
   369  		revToBytes(newMin, min)
   370  	}
   371  	close(rkvc)
   372  
   373  	{
   374  		s.revMu.Lock()
   375  		s.currentRev = <-revc
   376  
   377  		// Keys in the range [compacted revision - N, compaction revision] might all have been
   378  		// deleted due to compaction. In that case the current revision should be set to the
   379  		// compaction revision, not to the largest revision we have seen.
   380  		if s.currentRev < s.compactMainRev {
   381  			s.currentRev = s.compactMainRev
   382  		}
   383  		s.revMu.Unlock()
   384  	}
   385  
   386  	if scheduledCompact <= s.compactMainRev {
   387  		scheduledCompact = 0
   388  	}
   389  
   390  	for key, lid := range keyToLease {
   391  		if s.le == nil {
   392  			tx.Unlock()
   393  			panic("no lessor to attach lease")
   394  		}
   395  		err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
   396  		if err != nil {
   397  			s.lg.Error(
   398  				"failed to attach a lease",
   399  				zap.String("lease-id", fmt.Sprintf("%016x", lid)),
   400  				zap.Error(err),
   401  			)
   402  		}
   403  	}
   404  
   405  	tx.Unlock()
   406  
   407  	s.lg.Info("kvstore restored", zap.Int64("current-rev", s.currentRev))
   408  
   409  	if scheduledCompact != 0 {
   410  		if _, err := s.compactLockfree(scheduledCompact); err != nil {
   411  			s.lg.Warn("compaction encountered error", zap.Error(err))
   412  		}
   413  
   414  		s.lg.Info(
   415  			"resume scheduled compaction",
   416  			zap.Stringer("meta-bucket-name", buckets.Meta),
   417  			zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
   418  			zap.Int64("scheduled-compact-revision", scheduledCompact),
   419  		)
   420  	}
   421  
   422  	return nil
   423  }
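
One detail worth calling out in the chunked scan above: each iteration resumes at the successor of the last revision read, so no revision is visited twice. A worked example of the resume arithmetic, where lastKey stands in for the final key of a chunk:

	last := bytesToRev(lastKey[:revBytesLen]) // e.g. {main: 42, sub: 7}
	last.sub++                                // resume at {main: 42, sub: 8}
	revToBytes(last, min)                     // lower bound for the next chunk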
   424  
   425  type revKeyValue struct {
   426  	key  []byte
   427  	kv   mvccpb.KeyValue
   428  	kstr string
   429  }
   430  
   431  func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
   432  	rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
   433  	go func() {
   434  		currentRev := int64(1)
   435  		defer func() { revc <- currentRev }()
   436  		// restore the tree index from the stream of unordered key revisions.
   437  		kiCache := make(map[string]*keyIndex, restoreChunkKeys)
   438  		for rkv := range rkvc {
   439  			ki, ok := kiCache[rkv.kstr]
   440  			// purge some of kiCache if it has grown large and this key is still missing
   441  			if !ok && len(kiCache) >= restoreChunkKeys {
   442  				i := 10
   443  				for k := range kiCache {
   444  					delete(kiCache, k)
   445  					if i--; i == 0 {
   446  						break
   447  					}
   448  				}
   449  			}
   450  			// cache miss; fetch from the tree index if the key is already there
   451  			if !ok {
   452  				ki = &keyIndex{key: rkv.kv.Key}
   453  				if idxKey := idx.KeyIndex(ki); idxKey != nil {
   454  					kiCache[rkv.kstr], ki = idxKey, idxKey
   455  					ok = true
   456  				}
   457  			}
   458  			rev := bytesToRev(rkv.key)
   459  			currentRev = rev.main
   460  			if ok {
   461  				if isTombstone(rkv.key) {
   462  					if err := ki.tombstone(lg, rev.main, rev.sub); err != nil {
   463  						lg.Warn("tombstone encountered error", zap.Error(err))
   464  					}
   465  					continue
   466  				}
   467  				ki.put(lg, rev.main, rev.sub)
   468  			} else if !isTombstone(rkv.key) {
   469  				ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
   470  				idx.Insert(ki)
   471  				kiCache[rkv.kstr] = ki
   472  			}
   473  		}
   474  	}()
   475  	return rkvc, revc
   476  }
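
restoreIntoIndex returns the send side of the pipeline plus a one-element channel that yields the largest main revision observed once the send side is closed. The handshake, as restore uses it above:

	rkvc, revc := restoreIntoIndex(lg, idx)
	restoreChunk(lg, rkvc, keys, vals, keyToLease) // may block if the indexer lags
	close(rkvc)          // end of stream
	currentRev := <-revc // largest main revision seen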
   477  
   478  func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
   479  	for i, key := range keys {
   480  		rkv := revKeyValue{key: key}
   481  		if err := rkv.kv.Unmarshal(vals[i]); err != nil {
   482  			lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
   483  		}
   484  		rkv.kstr = string(rkv.kv.Key)
   485  		if isTombstone(key) {
   486  			delete(keyToLease, rkv.kstr)
   487  		} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
   488  			keyToLease[rkv.kstr] = lid
   489  		} else {
   490  			delete(keyToLease, rkv.kstr)
   491  		}
   492  		kvc <- rkv
   493  	}
   494  }
   495  
   496  func (s *store) Close() error {
   497  	close(s.stopc)
   498  	s.fifoSched.Stop()
   499  	return nil
   500  }
   501  
   502  func (s *store) setupMetricsReporter() {
   503  	b := s.b
   504  	reportDbTotalSizeInBytesMu.Lock()
   505  	reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
   506  	reportDbTotalSizeInBytesMu.Unlock()
   507  	reportDbTotalSizeInBytesDebugMu.Lock()
   508  	reportDbTotalSizeInBytesDebug = func() float64 { return float64(b.Size()) }
   509  	reportDbTotalSizeInBytesDebugMu.Unlock()
   510  	reportDbTotalSizeInUseInBytesMu.Lock()
   511  	reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
   512  	reportDbTotalSizeInUseInBytesMu.Unlock()
   513  	reportDbOpenReadTxNMu.Lock()
   514  	reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) }
   515  	reportDbOpenReadTxNMu.Unlock()
   516  	reportCurrentRevMu.Lock()
   517  	reportCurrentRev = func() float64 {
   518  		s.revMu.RLock()
   519  		defer s.revMu.RUnlock()
   520  		return float64(s.currentRev)
   521  	}
   522  	reportCurrentRevMu.Unlock()
   523  	reportCompactRevMu.Lock()
   524  	reportCompactRev = func() float64 {
   525  		s.revMu.RLock()
   526  		defer s.revMu.RUnlock()
   527  		return float64(s.compactMainRev)
   528  	}
   529  	reportCompactRevMu.Unlock()
   530  }
   531  
   532  // appendMarkTombstone appends the tombstone mark to normal revision bytes.
   533  func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
   534  	if len(b) != revBytesLen {
   535  		lg.Panic(
   536  			"cannot append tombstone mark to non-normal revision bytes",
   537  			zap.Int("expected-revision-bytes-size", revBytesLen),
   538  			zap.Int("given-revision-bytes-size", len(b)),
   539  		)
   540  	}
   541  	return append(b, markTombstone)
   542  }
   543  
   544  // isTombstone checks whether the given revision bytes carry the tombstone mark.
   545  func isTombstone(b []byte) bool {
   546  	return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
   547  }
   548  
   549  func (s *store) HashStorage() HashStorage {
   550  	return s.hashes
   551  }
   552  
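
External callers reach hashing through this accessor rather than the unexported hash and hashByRev above. A hedged usage sketch, assuming the HashStorage interface and KeyValueHash type declared in hash.go of this package:

	kvHash, currentRev, err := s.HashStorage().HashByRev(0) // rev 0 means "at the current revision"
	if err == nil {
		_ = kvHash.Hash            // uint32 hash over the kept revisions
		_ = kvHash.CompactRevision // compaction floor the hash was computed against
		_ = currentRev
	}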
