...

Source file src/go.etcd.io/etcd/server/v3/mvcc/hash.go

Documentation: go.etcd.io/etcd/server/v3/mvcc

     1  // Copyright 2022 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package mvcc
    16  
    17  import (
    18  	"hash"
    19  	"hash/crc32"
    20  	"sort"
    21  	"sync"
    22  
    23  	"go.etcd.io/etcd/server/v3/mvcc/backend"
    24  	"go.etcd.io/etcd/server/v3/mvcc/buckets"
    25  	"go.uber.org/zap"
    26  )
    27  
    28  const (
    29  	hashStorageMaxSize = 10
    30  )
    31  
    32  func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) {
    33  	h := newKVHasher(compactRevision, revision, keep)
    34  	err := tx.UnsafeForEach(buckets.Key, func(k, v []byte) error {
    35  		h.WriteKeyValue(k, v)
    36  		return nil
    37  	})
    38  	return h.Hash(), err
    39  }
    40  
    41  type kvHasher struct {
    42  	hash            hash.Hash32
    43  	compactRevision int64
    44  	revision        int64
    45  	keep            map[revision]struct{}
    46  }
    47  
    48  func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher {
    49  	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
    50  	h.Write(buckets.Key.Name())
    51  	return kvHasher{
    52  		hash:            h,
    53  		compactRevision: compactRev,
    54  		revision:        rev,
    55  		keep:            keep,
    56  	}
    57  }
    58  
    59  func (h *kvHasher) WriteKeyValue(k, v []byte) {
    60  	kr := bytesToRev(k)
    61  	upper := revision{main: h.revision + 1}
    62  	if !upper.GreaterThan(kr) {
    63  		return
    64  	}
    65  	lower := revision{main: h.compactRevision + 1}
    66  	// skip revisions that are scheduled for deletion
    67  	// due to compacting; don't skip if there isn't one.
    68  	if lower.GreaterThan(kr) && len(h.keep) > 0 {
    69  		if _, ok := h.keep[kr]; !ok {
    70  			return
    71  		}
    72  	}
    73  	h.hash.Write(k)
    74  	h.hash.Write(v)
    75  }
    76  
    77  func (h *kvHasher) Hash() KeyValueHash {
    78  	return KeyValueHash{Hash: h.hash.Sum32(), CompactRevision: h.compactRevision, Revision: h.revision}
    79  }
    80  
    81  type KeyValueHash struct {
    82  	Hash            uint32
    83  	CompactRevision int64
    84  	Revision        int64
    85  }
    86  
    87  type HashStorage interface {
    88  	// Hash computes the hash of the KV's backend.
    89  	Hash() (hash uint32, revision int64, err error)
    90  
    91  	// HashByRev computes the hash of all MVCC revisions up to a given revision.
    92  	HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error)
    93  
    94  	// Store adds hash value in local cache, allowing it can be returned by HashByRev.
    95  	Store(valueHash KeyValueHash)
    96  
    97  	// Hashes returns list of up to `hashStorageMaxSize` newest previously stored hashes.
    98  	Hashes() []KeyValueHash
    99  }
   100  
   101  type hashStorage struct {
   102  	store  *store
   103  	hashMu sync.RWMutex
   104  	hashes []KeyValueHash
   105  	lg     *zap.Logger
   106  }
   107  
   108  func newHashStorage(lg *zap.Logger, s *store) *hashStorage {
   109  	return &hashStorage{
   110  		store: s,
   111  		lg:    lg,
   112  	}
   113  }
   114  
   115  func (s *hashStorage) Hash() (hash uint32, revision int64, err error) {
   116  	return s.store.hash()
   117  }
   118  
   119  func (s *hashStorage) HashByRev(rev int64) (KeyValueHash, int64, error) {
   120  	s.hashMu.RLock()
   121  	for _, h := range s.hashes {
   122  		if rev == h.Revision {
   123  			s.hashMu.RUnlock()
   124  
   125  			s.store.revMu.RLock()
   126  			currentRev := s.store.currentRev
   127  			s.store.revMu.RUnlock()
   128  			return h, currentRev, nil
   129  		}
   130  	}
   131  	s.hashMu.RUnlock()
   132  
   133  	return s.store.hashByRev(rev)
   134  }
   135  
   136  func (s *hashStorage) Store(hash KeyValueHash) {
   137  	s.lg.Info("storing new hash",
   138  		zap.Uint32("hash", hash.Hash),
   139  		zap.Int64("revision", hash.Revision),
   140  		zap.Int64("compact-revision", hash.CompactRevision),
   141  	)
   142  	s.hashMu.Lock()
   143  	defer s.hashMu.Unlock()
   144  	s.hashes = append(s.hashes, hash)
   145  	sort.Slice(s.hashes, func(i, j int) bool {
   146  		return s.hashes[i].Revision < s.hashes[j].Revision
   147  	})
   148  	if len(s.hashes) > hashStorageMaxSize {
   149  		s.hashes = s.hashes[len(s.hashes)-hashStorageMaxSize:]
   150  	}
   151  }
   152  
   153  func (s *hashStorage) Hashes() []KeyValueHash {
   154  	s.hashMu.RLock()
   155  	// Copy out hashes under lock just to be safe
   156  	hashes := make([]KeyValueHash, 0, len(s.hashes))
   157  	for _, hash := range s.hashes {
   158  		hashes = append(hashes, hash)
   159  	}
   160  	s.hashMu.RUnlock()
   161  	return hashes
   162  }
   163  

View as plain text