1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package mvcc
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "math"
22 "sync"
23 "time"
24
25 "go.etcd.io/etcd/api/v3/mvccpb"
26 "go.etcd.io/etcd/pkg/v3/schedule"
27 "go.etcd.io/etcd/pkg/v3/traceutil"
28 "go.etcd.io/etcd/server/v3/lease"
29 "go.etcd.io/etcd/server/v3/mvcc/backend"
30 "go.etcd.io/etcd/server/v3/mvcc/buckets"
31
32 "go.uber.org/zap"
33 )
34
// Meta-bucket keys under which compaction progress is persisted: the
// revision a compaction was scheduled for, and the revision of the last
// compaction that ran to completion.
var (
	scheduledCompactKeyName = []byte("scheduledCompactRev")
	finishedCompactKeyName  = []byte("finishedCompactRev")
)

var (
	// ErrCompacted is returned when a requested revision is older than the
	// store's compacted revision.
	ErrCompacted = errors.New("mvcc: required revision has been compacted")
	// ErrFutureRev is returned when a requested revision is newer than the
	// store's current revision.
	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
)
42
43 const (
44
45
46
47 markedRevBytesLen = revBytesLen + 1
48 markBytePosition = markedRevBytesLen - 1
49 markTombstone byte = 't'
50 )
51
var (
	// restoreChunkKeys is how many keys restore reads per range call; it also
	// sizes the index-restore channel buffer and key cache. It is a var rather
	// than a const, presumably so tests can shrink it.
	restoreChunkKeys = 10000
	// defaultCompactBatchLimit replaces a zero StoreConfig.CompactionBatchLimit.
	defaultCompactBatchLimit = 1000
)
54
// StoreConfig carries tunables for a store created by NewStore.
type StoreConfig struct {
	// CompactionBatchLimit presumably bounds how much work one compaction
	// batch does (its consumer is not in this file). Zero selects
	// defaultCompactBatchLimit.
	CompactionBatchLimit int
}
58
59 type store struct {
60 ReadView
61 WriteView
62
63 cfg StoreConfig
64
65
66 mu sync.RWMutex
67
68 b backend.Backend
69 kvindex index
70
71 le lease.Lessor
72
73
74
75
76 revMu sync.RWMutex
77
78 currentRev int64
79
80 compactMainRev int64
81
82 fifoSched schedule.Scheduler
83
84 stopc chan struct{}
85
86 lg *zap.Logger
87 hashes HashStorage
88 }
89
90
91
92 func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store {
93 if lg == nil {
94 lg = zap.NewNop()
95 }
96 if cfg.CompactionBatchLimit == 0 {
97 cfg.CompactionBatchLimit = defaultCompactBatchLimit
98 }
99 s := &store{
100 cfg: cfg,
101 b: b,
102 kvindex: newTreeIndex(lg),
103
104 le: le,
105
106 currentRev: 1,
107 compactMainRev: -1,
108
109 fifoSched: schedule.NewFIFOScheduler(),
110
111 stopc: make(chan struct{}),
112
113 lg: lg,
114 }
115 s.hashes = newHashStorage(lg, s)
116 s.ReadView = &readView{s}
117 s.WriteView = &writeView{s}
118 if s.le != nil {
119 s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write(traceutil.TODO()) })
120 }
121
122 tx := s.b.BatchTx()
123 tx.LockOutsideApply()
124 tx.UnsafeCreateBucket(buckets.Key)
125 tx.UnsafeCreateBucket(buckets.Meta)
126 tx.Unlock()
127 s.b.ForceCommit()
128
129 s.mu.Lock()
130 defer s.mu.Unlock()
131 if err := s.restore(); err != nil {
132
133 panic("failed to recover store from backend")
134 }
135
136 return s
137 }
138
139 func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
140 if ctx == nil || ctx.Err() != nil {
141 select {
142 case <-s.stopc:
143 default:
144
145
146
147
148 s.mu.Lock()
149 f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
150 s.fifoSched.Schedule(f)
151 s.mu.Unlock()
152 }
153 return
154 }
155 close(ch)
156 }
157
158 func (s *store) hash() (hash uint32, revision int64, err error) {
159
160 start := time.Now()
161
162 s.b.ForceCommit()
163 h, err := s.b.Hash(buckets.DefaultIgnores)
164
165 hashSec.Observe(time.Since(start).Seconds())
166 return h, s.currentRev, err
167 }
168
169 func (s *store) hashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error) {
170 var compactRev int64
171 start := time.Now()
172
173 s.mu.RLock()
174 s.revMu.RLock()
175 compactRev, currentRev = s.compactMainRev, s.currentRev
176 s.revMu.RUnlock()
177
178 if rev > 0 && rev < compactRev {
179 s.mu.RUnlock()
180 return KeyValueHash{}, 0, ErrCompacted
181 } else if rev > 0 && rev > currentRev {
182 s.mu.RUnlock()
183 return KeyValueHash{}, currentRev, ErrFutureRev
184 }
185 if rev == 0 {
186 rev = currentRev
187 }
188 keep := s.kvindex.Keep(rev)
189
190 tx := s.b.ReadTx()
191 tx.RLock()
192 defer tx.RUnlock()
193 s.mu.RUnlock()
194 hash, err = unsafeHashByRev(tx, compactRev, rev, keep)
195 hashRevSec.Observe(time.Since(start).Seconds())
196 return hash, currentRev, err
197 }
198
199 func (s *store) updateCompactRev(rev int64) (<-chan struct{}, int64, error) {
200 s.revMu.Lock()
201 if rev <= s.compactMainRev {
202 ch := make(chan struct{})
203 f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
204 s.fifoSched.Schedule(f)
205 s.revMu.Unlock()
206 return ch, 0, ErrCompacted
207 }
208 if rev > s.currentRev {
209 s.revMu.Unlock()
210 return nil, 0, ErrFutureRev
211 }
212 compactMainRev := s.compactMainRev
213 s.compactMainRev = rev
214
215 rbytes := newRevBytes()
216 revToBytes(revision{main: rev}, rbytes)
217
218 tx := s.b.BatchTx()
219 tx.LockInsideApply()
220 tx.UnsafePut(buckets.Meta, scheduledCompactKeyName, rbytes)
221 tx.Unlock()
222
223 s.b.ForceCommit()
224
225 s.revMu.Unlock()
226
227 return nil, compactMainRev, nil
228 }
229
230
231 func (s *store) checkPrevCompactionCompleted() bool {
232 tx := s.b.ReadTx()
233 tx.Lock()
234 defer tx.Unlock()
235 scheduledCompact, scheduledCompactFound := UnsafeReadScheduledCompact(tx)
236 finishedCompact, finishedCompactFound := UnsafeReadFinishedCompact(tx)
237 return scheduledCompact == finishedCompact && scheduledCompactFound == finishedCompactFound
238 }
239
240 func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64, prevCompactionCompleted bool) (<-chan struct{}, error) {
241 ch := make(chan struct{})
242 var j = func(ctx context.Context) {
243 if ctx.Err() != nil {
244 s.compactBarrier(ctx, ch)
245 return
246 }
247 hash, err := s.scheduleCompaction(rev, prevCompactRev)
248 if err != nil {
249 s.lg.Warn("Failed compaction", zap.Error(err))
250 s.compactBarrier(context.TODO(), ch)
251 return
252 }
253
254
255 if prevCompactionCompleted {
256 s.hashes.Store(hash)
257 } else {
258 s.lg.Info("previous compaction was interrupted, skip storing compaction hash value")
259 }
260 close(ch)
261 }
262
263 s.fifoSched.Schedule(j)
264 trace.Step("schedule compaction")
265 return ch, nil
266 }
267
268 func (s *store) compactLockfree(rev int64) (<-chan struct{}, error) {
269 prevCompactionCompleted := s.checkPrevCompactionCompleted()
270 ch, prevCompactRev, err := s.updateCompactRev(rev)
271 if err != nil {
272 return ch, err
273 }
274
275 return s.compact(traceutil.TODO(), rev, prevCompactRev, prevCompactionCompleted)
276 }
277
278 func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
279 s.mu.Lock()
280
281 prevCompactionCompleted := s.checkPrevCompactionCompleted()
282 ch, prevCompactRev, err := s.updateCompactRev(rev)
283 trace.Step("check and update compact revision")
284 if err != nil {
285 s.mu.Unlock()
286 return ch, err
287 }
288 s.mu.Unlock()
289
290 return s.compact(trace, rev, prevCompactRev, prevCompactionCompleted)
291 }
292
293 func (s *store) Commit() {
294 s.mu.Lock()
295 defer s.mu.Unlock()
296 s.b.ForceCommit()
297 }
298
299 func (s *store) Restore(b backend.Backend) error {
300 s.mu.Lock()
301 defer s.mu.Unlock()
302
303 close(s.stopc)
304 s.fifoSched.Stop()
305
306 s.b = b
307 s.kvindex = newTreeIndex(s.lg)
308
309 {
310
311 s.revMu.Lock()
312 s.currentRev = 1
313 s.compactMainRev = -1
314 s.revMu.Unlock()
315 }
316
317 s.fifoSched = schedule.NewFIFOScheduler()
318 s.stopc = make(chan struct{})
319
320 return s.restore()
321 }
322
323 func (s *store) restore() error {
324 s.setupMetricsReporter()
325
326 min, max := newRevBytes(), newRevBytes()
327 revToBytes(revision{main: 1}, min)
328 revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
329
330 keyToLease := make(map[string]lease.LeaseID)
331
332
333 tx := s.b.ReadTx()
334 tx.Lock()
335
336 finishedCompact, found := UnsafeReadFinishedCompact(tx)
337 if found {
338 s.revMu.Lock()
339 s.compactMainRev = finishedCompact
340
341 s.lg.Info(
342 "restored last compact revision",
343 zap.Stringer("meta-bucket-name", buckets.Meta),
344 zap.String("meta-bucket-name-key", string(finishedCompactKeyName)),
345 zap.Int64("restored-compact-revision", s.compactMainRev),
346 )
347 s.revMu.Unlock()
348 }
349 scheduledCompact, _ := UnsafeReadScheduledCompact(tx)
350
351
352 keysGauge.Set(0)
353 rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
354 for {
355 keys, vals := tx.UnsafeRange(buckets.Key, min, max, int64(restoreChunkKeys))
356 if len(keys) == 0 {
357 break
358 }
359
360
361 restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
362 if len(keys) < restoreChunkKeys {
363
364 break
365 }
366
367 newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
368 newMin.sub++
369 revToBytes(newMin, min)
370 }
371 close(rkvc)
372
373 {
374 s.revMu.Lock()
375 s.currentRev = <-revc
376
377
378
379
380 if s.currentRev < s.compactMainRev {
381 s.currentRev = s.compactMainRev
382 }
383 s.revMu.Unlock()
384 }
385
386 if scheduledCompact <= s.compactMainRev {
387 scheduledCompact = 0
388 }
389
390 for key, lid := range keyToLease {
391 if s.le == nil {
392 tx.Unlock()
393 panic("no lessor to attach lease")
394 }
395 err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
396 if err != nil {
397 s.lg.Error(
398 "failed to attach a lease",
399 zap.String("lease-id", fmt.Sprintf("%016x", lid)),
400 zap.Error(err),
401 )
402 }
403 }
404
405 tx.Unlock()
406
407 s.lg.Info("kvstore restored", zap.Int64("current-rev", s.currentRev))
408
409 if scheduledCompact != 0 {
410 if _, err := s.compactLockfree(scheduledCompact); err != nil {
411 s.lg.Warn("compaction encountered error", zap.Error(err))
412 }
413
414 s.lg.Info(
415 "resume scheduled compaction",
416 zap.Stringer("meta-bucket-name", buckets.Meta),
417 zap.String("meta-bucket-name-key", string(scheduledCompactKeyName)),
418 zap.Int64("scheduled-compact-revision", scheduledCompact),
419 )
420 }
421
422 return nil
423 }
424
425 type revKeyValue struct {
426 key []byte
427 kv mvccpb.KeyValue
428 kstr string
429 }
430
431 func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64) {
432 rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
433 go func() {
434 currentRev := int64(1)
435 defer func() { revc <- currentRev }()
436
437 kiCache := make(map[string]*keyIndex, restoreChunkKeys)
438 for rkv := range rkvc {
439 ki, ok := kiCache[rkv.kstr]
440
441 if !ok && len(kiCache) >= restoreChunkKeys {
442 i := 10
443 for k := range kiCache {
444 delete(kiCache, k)
445 if i--; i == 0 {
446 break
447 }
448 }
449 }
450
451 if !ok {
452 ki = &keyIndex{key: rkv.kv.Key}
453 if idxKey := idx.KeyIndex(ki); idxKey != nil {
454 kiCache[rkv.kstr], ki = idxKey, idxKey
455 ok = true
456 }
457 }
458 rev := bytesToRev(rkv.key)
459 currentRev = rev.main
460 if ok {
461 if isTombstone(rkv.key) {
462 if err := ki.tombstone(lg, rev.main, rev.sub); err != nil {
463 lg.Warn("tombstone encountered error", zap.Error(err))
464 }
465 continue
466 }
467 ki.put(lg, rev.main, rev.sub)
468 } else if !isTombstone(rkv.key) {
469 ki.restore(lg, revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
470 idx.Insert(ki)
471 kiCache[rkv.kstr] = ki
472 }
473 }
474 }()
475 return rkvc, revc
476 }
477
478 func restoreChunk(lg *zap.Logger, kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
479 for i, key := range keys {
480 rkv := revKeyValue{key: key}
481 if err := rkv.kv.Unmarshal(vals[i]); err != nil {
482 lg.Fatal("failed to unmarshal mvccpb.KeyValue", zap.Error(err))
483 }
484 rkv.kstr = string(rkv.kv.Key)
485 if isTombstone(key) {
486 delete(keyToLease, rkv.kstr)
487 } else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
488 keyToLease[rkv.kstr] = lid
489 } else {
490 delete(keyToLease, rkv.kstr)
491 }
492 kvc <- rkv
493 }
494 }
495
496 func (s *store) Close() error {
497 close(s.stopc)
498 s.fifoSched.Stop()
499 return nil
500 }
501
502 func (s *store) setupMetricsReporter() {
503 b := s.b
504 reportDbTotalSizeInBytesMu.Lock()
505 reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
506 reportDbTotalSizeInBytesMu.Unlock()
507 reportDbTotalSizeInBytesDebugMu.Lock()
508 reportDbTotalSizeInBytesDebug = func() float64 { return float64(b.Size()) }
509 reportDbTotalSizeInBytesDebugMu.Unlock()
510 reportDbTotalSizeInUseInBytesMu.Lock()
511 reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
512 reportDbTotalSizeInUseInBytesMu.Unlock()
513 reportDbOpenReadTxNMu.Lock()
514 reportDbOpenReadTxN = func() float64 { return float64(b.OpenReadTxN()) }
515 reportDbOpenReadTxNMu.Unlock()
516 reportCurrentRevMu.Lock()
517 reportCurrentRev = func() float64 {
518 s.revMu.RLock()
519 defer s.revMu.RUnlock()
520 return float64(s.currentRev)
521 }
522 reportCurrentRevMu.Unlock()
523 reportCompactRevMu.Lock()
524 reportCompactRev = func() float64 {
525 s.revMu.RLock()
526 defer s.revMu.RUnlock()
527 return float64(s.compactMainRev)
528 }
529 reportCompactRevMu.Unlock()
530 }
531
532
533 func appendMarkTombstone(lg *zap.Logger, b []byte) []byte {
534 if len(b) != revBytesLen {
535 lg.Panic(
536 "cannot append tombstone mark to non-normal revision bytes",
537 zap.Int("expected-revision-bytes-size", revBytesLen),
538 zap.Int("given-revision-bytes-size", len(b)),
539 )
540 }
541 return append(b, markTombstone)
542 }
543
544
545 func isTombstone(b []byte) bool {
546 return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
547 }
548
549 func (s *store) HashStorage() HashStorage {
550 return s.hashes
551 }
552
View as plain text