1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package mvcc
16
17 import (
18 "hash"
19 "hash/crc32"
20 "sort"
21 "sync"
22
23 "go.etcd.io/etcd/server/v3/mvcc/backend"
24 "go.etcd.io/etcd/server/v3/mvcc/buckets"
25 "go.uber.org/zap"
26 )
27
// hashStorageMaxSize is the maximum number of hashes that hashStorage keeps
// after a call to Store; older (lower-revision) entries are dropped first.
const hashStorageMaxSize = 10
31
32 func unsafeHashByRev(tx backend.ReadTx, compactRevision, revision int64, keep map[revision]struct{}) (KeyValueHash, error) {
33 h := newKVHasher(compactRevision, revision, keep)
34 err := tx.UnsafeForEach(buckets.Key, func(k, v []byte) error {
35 h.WriteKeyValue(k, v)
36 return nil
37 })
38 return h.Hash(), err
39 }
40
41 type kvHasher struct {
42 hash hash.Hash32
43 compactRevision int64
44 revision int64
45 keep map[revision]struct{}
46 }
47
48 func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher {
49 h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
50 h.Write(buckets.Key.Name())
51 return kvHasher{
52 hash: h,
53 compactRevision: compactRev,
54 revision: rev,
55 keep: keep,
56 }
57 }
58
59 func (h *kvHasher) WriteKeyValue(k, v []byte) {
60 kr := bytesToRev(k)
61 upper := revision{main: h.revision + 1}
62 if !upper.GreaterThan(kr) {
63 return
64 }
65 lower := revision{main: h.compactRevision + 1}
66
67
68 if lower.GreaterThan(kr) && len(h.keep) > 0 {
69 if _, ok := h.keep[kr]; !ok {
70 return
71 }
72 }
73 h.hash.Write(k)
74 h.hash.Write(v)
75 }
76
77 func (h *kvHasher) Hash() KeyValueHash {
78 return KeyValueHash{Hash: h.hash.Sum32(), CompactRevision: h.compactRevision, Revision: h.revision}
79 }
80
// KeyValueHash is a checksum over key/value pairs together with the revision
// bounds it was computed for.
type KeyValueHash struct {
	// Hash is the 32-bit checksum value.
	Hash uint32
	// CompactRevision is the compact revision the hasher was configured with.
	CompactRevision int64
	// Revision is the revision the hasher was configured with.
	Revision int64
}
86
87 type HashStorage interface {
88
89 Hash() (hash uint32, revision int64, err error)
90
91
92 HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error)
93
94
95 Store(valueHash KeyValueHash)
96
97
98 Hashes() []KeyValueHash
99 }
100
101 type hashStorage struct {
102 store *store
103 hashMu sync.RWMutex
104 hashes []KeyValueHash
105 lg *zap.Logger
106 }
107
108 func newHashStorage(lg *zap.Logger, s *store) *hashStorage {
109 return &hashStorage{
110 store: s,
111 lg: lg,
112 }
113 }
114
115 func (s *hashStorage) Hash() (hash uint32, revision int64, err error) {
116 return s.store.hash()
117 }
118
119 func (s *hashStorage) HashByRev(rev int64) (KeyValueHash, int64, error) {
120 s.hashMu.RLock()
121 for _, h := range s.hashes {
122 if rev == h.Revision {
123 s.hashMu.RUnlock()
124
125 s.store.revMu.RLock()
126 currentRev := s.store.currentRev
127 s.store.revMu.RUnlock()
128 return h, currentRev, nil
129 }
130 }
131 s.hashMu.RUnlock()
132
133 return s.store.hashByRev(rev)
134 }
135
136 func (s *hashStorage) Store(hash KeyValueHash) {
137 s.lg.Info("storing new hash",
138 zap.Uint32("hash", hash.Hash),
139 zap.Int64("revision", hash.Revision),
140 zap.Int64("compact-revision", hash.CompactRevision),
141 )
142 s.hashMu.Lock()
143 defer s.hashMu.Unlock()
144 s.hashes = append(s.hashes, hash)
145 sort.Slice(s.hashes, func(i, j int) bool {
146 return s.hashes[i].Revision < s.hashes[j].Revision
147 })
148 if len(s.hashes) > hashStorageMaxSize {
149 s.hashes = s.hashes[len(s.hashes)-hashStorageMaxSize:]
150 }
151 }
152
153 func (s *hashStorage) Hashes() []KeyValueHash {
154 s.hashMu.RLock()
155
156 hashes := make([]KeyValueHash, 0, len(s.hashes))
157 for _, hash := range s.hashes {
158 hashes = append(hashes, hash)
159 }
160 s.hashMu.RUnlock()
161 return hashes
162 }
163
View as plain text