1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package mvcc
16
17 import (
18 "context"
19 "fmt"
20
21 "go.etcd.io/etcd/api/v3/mvccpb"
22 "go.etcd.io/etcd/pkg/v3/traceutil"
23 "go.etcd.io/etcd/server/v3/lease"
24 "go.etcd.io/etcd/server/v3/mvcc/backend"
25 "go.etcd.io/etcd/server/v3/mvcc/buckets"
26 "go.uber.org/zap"
27 )
28
29 type storeTxnRead struct {
30 s *store
31 tx backend.ReadTx
32
33 firstRev int64
34 rev int64
35
36 trace *traceutil.Trace
37 }
38
39 func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
40 s.mu.RLock()
41 s.revMu.RLock()
42
43
44
45
46 var tx backend.ReadTx
47 if mode == ConcurrentReadTxMode {
48 tx = s.b.ConcurrentReadTx()
49 } else {
50 tx = s.b.ReadTx()
51 }
52
53 tx.RLock()
54 firstRev, rev := s.compactMainRev, s.currentRev
55 s.revMu.RUnlock()
56 return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace})
57 }
58
59 func (tr *storeTxnRead) FirstRev() int64 { return tr.firstRev }
60 func (tr *storeTxnRead) Rev() int64 { return tr.rev }
61
62 func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
63 return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
64 }
65
66 func (tr *storeTxnRead) End() {
67 tr.tx.RUnlock()
68 tr.s.mu.RUnlock()
69 }
70
71 type storeTxnWrite struct {
72 storeTxnRead
73 tx backend.BatchTx
74
75 beginRev int64
76 changes []mvccpb.KeyValue
77 }
78
79 func (s *store) Write(trace *traceutil.Trace) TxnWrite {
80 s.mu.RLock()
81 tx := s.b.BatchTx()
82 tx.LockInsideApply()
83 tw := &storeTxnWrite{
84 storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
85 tx: tx,
86 beginRev: s.currentRev,
87 changes: make([]mvccpb.KeyValue, 0, 4),
88 }
89 return newMetricsTxnWrite(tw)
90 }
91
92 func (tw *storeTxnWrite) Rev() int64 { return tw.beginRev }
93
94 func (tw *storeTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
95 rev := tw.beginRev
96 if len(tw.changes) > 0 {
97 rev++
98 }
99 return tw.rangeKeys(ctx, key, end, rev, ro)
100 }
101
102 func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
103 if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
104 return n, tw.beginRev + 1
105 }
106 return 0, tw.beginRev
107 }
108
109 func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
110 tw.put(key, value, lease)
111 return tw.beginRev + 1
112 }
113
114 func (tw *storeTxnWrite) End() {
115
116 if len(tw.changes) != 0 {
117
118 tw.s.revMu.Lock()
119 tw.s.currentRev++
120 }
121 tw.tx.Unlock()
122 if len(tw.changes) != 0 {
123 tw.s.revMu.Unlock()
124 }
125 tw.s.mu.RUnlock()
126 }
127
128 func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
129 rev := ro.Rev
130 if rev > curRev {
131 return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
132 }
133 if rev <= 0 {
134 rev = curRev
135 }
136 if rev < tr.s.compactMainRev {
137 return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
138 }
139 if ro.Count {
140 total := tr.s.kvindex.CountRevisions(key, end, rev)
141 tr.trace.Step("count revisions from in-memory index tree")
142 return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
143 }
144 revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
145 tr.trace.Step("range keys from in-memory index tree")
146 if len(revpairs) == 0 {
147 return &RangeResult{KVs: nil, Count: total, Rev: curRev}, nil
148 }
149
150 limit := int(ro.Limit)
151 if limit <= 0 || limit > len(revpairs) {
152 limit = len(revpairs)
153 }
154
155 kvs := make([]mvccpb.KeyValue, limit)
156 revBytes := newRevBytes()
157 for i, revpair := range revpairs[:len(kvs)] {
158 select {
159 case <-ctx.Done():
160 return nil, fmt.Errorf("rangeKeys: context cancelled: %w", ctx.Err())
161 default:
162 }
163 revToBytes(revpair, revBytes)
164 _, vs := tr.tx.UnsafeRange(buckets.Key, revBytes, nil, 0)
165 if len(vs) != 1 {
166 tr.s.lg.Fatal(
167 "range failed to find revision pair",
168 zap.Int64("revision-main", revpair.main),
169 zap.Int64("revision-sub", revpair.sub),
170 zap.Int64("revision-current", curRev),
171 zap.Int64("range-option-rev", ro.Rev),
172 zap.Int64("range-option-limit", ro.Limit),
173 zap.Binary("key", key),
174 zap.Binary("end", end),
175 zap.Int("len-revpairs", len(revpairs)),
176 zap.Int("len-values", len(vs)),
177 )
178 }
179 if err := kvs[i].Unmarshal(vs[0]); err != nil {
180 tr.s.lg.Fatal(
181 "failed to unmarshal mvccpb.KeyValue",
182 zap.Error(err),
183 )
184 }
185 }
186 tr.trace.Step("range keys from bolt db")
187 return &RangeResult{KVs: kvs, Count: total, Rev: curRev}, nil
188 }
189
190 func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
191 rev := tw.beginRev + 1
192 c := rev
193 oldLease := lease.NoLease
194
195
196
197 _, created, ver, err := tw.s.kvindex.Get(key, rev)
198 if err == nil {
199 c = created.main
200 oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
201 tw.trace.Step("get key's previous created_revision and leaseID")
202 }
203 ibytes := newRevBytes()
204 idxRev := revision{main: rev, sub: int64(len(tw.changes))}
205 revToBytes(idxRev, ibytes)
206
207 ver = ver + 1
208 kv := mvccpb.KeyValue{
209 Key: key,
210 Value: value,
211 CreateRevision: c,
212 ModRevision: rev,
213 Version: ver,
214 Lease: int64(leaseID),
215 }
216
217 d, err := kv.Marshal()
218 if err != nil {
219 tw.storeTxnRead.s.lg.Fatal(
220 "failed to marshal mvccpb.KeyValue",
221 zap.Error(err),
222 )
223 }
224
225 tw.trace.Step("marshal mvccpb.KeyValue")
226 tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d)
227 tw.s.kvindex.Put(key, idxRev)
228 tw.changes = append(tw.changes, kv)
229 tw.trace.Step("store kv pair into bolt db")
230
231 if oldLease != lease.NoLease {
232 if tw.s.le == nil {
233 panic("no lessor to detach lease")
234 }
235 err = tw.s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
236 if err != nil {
237 tw.storeTxnRead.s.lg.Error(
238 "failed to detach old lease from a key",
239 zap.Error(err),
240 )
241 }
242 }
243 if leaseID != lease.NoLease {
244 if tw.s.le == nil {
245 panic("no lessor to attach lease")
246 }
247 err = tw.s.le.Attach(leaseID, []lease.LeaseItem{{Key: string(key)}})
248 if err != nil {
249 panic("unexpected error from lease Attach")
250 }
251 }
252 tw.trace.Step("attach lease to kv pair")
253 }
254
255 func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
256 rrev := tw.beginRev
257 if len(tw.changes) > 0 {
258 rrev++
259 }
260 keys, _ := tw.s.kvindex.Range(key, end, rrev)
261 if len(keys) == 0 {
262 return 0
263 }
264 for _, key := range keys {
265 tw.delete(key)
266 }
267 return int64(len(keys))
268 }
269
270 func (tw *storeTxnWrite) delete(key []byte) {
271 ibytes := newRevBytes()
272 idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
273 revToBytes(idxRev, ibytes)
274
275 ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
276
277 kv := mvccpb.KeyValue{Key: key}
278
279 d, err := kv.Marshal()
280 if err != nil {
281 tw.storeTxnRead.s.lg.Fatal(
282 "failed to marshal mvccpb.KeyValue",
283 zap.Error(err),
284 )
285 }
286
287 tw.tx.UnsafeSeqPut(buckets.Key, ibytes, d)
288 err = tw.s.kvindex.Tombstone(key, idxRev)
289 if err != nil {
290 tw.storeTxnRead.s.lg.Fatal(
291 "failed to tombstone an existing key",
292 zap.String("key", string(key)),
293 zap.Error(err),
294 )
295 }
296 tw.changes = append(tw.changes, kv)
297
298 item := lease.LeaseItem{Key: string(key)}
299 leaseID := tw.s.le.GetLease(item)
300
301 if leaseID != lease.NoLease {
302 err = tw.s.le.Detach(leaseID, []lease.LeaseItem{item})
303 if err != nil {
304 tw.storeTxnRead.s.lg.Error(
305 "failed to detach old lease from a key",
306 zap.Error(err),
307 )
308 }
309 }
310 }
311
312 func (tw *storeTxnWrite) Changes() []mvccpb.KeyValue { return tw.changes }
313
View as plain text