...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package mvcc
16
17 import (
18 "encoding/binary"
19 "fmt"
20 "time"
21
22 humanize "github.com/dustin/go-humanize"
23 "go.etcd.io/etcd/server/v3/mvcc/buckets"
24 "go.uber.org/zap"
25 )
26
27 func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyValueHash, error) {
28 totalStart := time.Now()
29 keep := s.kvindex.Compact(compactMainRev)
30 indexCompactionPauseMs.Observe(float64(time.Since(totalStart) / time.Millisecond))
31
32 totalStart = time.Now()
33 defer func() { dbCompactionTotalMs.Observe(float64(time.Since(totalStart) / time.Millisecond)) }()
34 keyCompactions := 0
35 defer func() { dbCompactionKeysCounter.Add(float64(keyCompactions)) }()
36 defer func() { dbCompactionLast.Set(float64(time.Now().Unix())) }()
37
38 end := make([]byte, 8)
39 binary.BigEndian.PutUint64(end, uint64(compactMainRev+1))
40
41 batchNum := s.cfg.CompactionBatchLimit
42 h := newKVHasher(prevCompactRev, compactMainRev, keep)
43 last := make([]byte, 8+1+8)
44 for {
45 var rev revision
46
47 start := time.Now()
48
49 tx := s.b.BatchTx()
50 tx.LockOutsideApply()
51 keys, values := tx.UnsafeRange(buckets.Key, last, end, int64(batchNum))
52 for i := range keys {
53 rev = bytesToRev(keys[i])
54 if _, ok := keep[rev]; !ok {
55 tx.UnsafeDelete(buckets.Key, keys[i])
56 keyCompactions++
57 }
58 h.WriteKeyValue(keys[i], values[i])
59 }
60
61 if len(keys) < s.cfg.CompactionBatchLimit {
62 rbytes := make([]byte, 8+1+8)
63 revToBytes(revision{main: compactMainRev}, rbytes)
64 tx.UnsafePut(buckets.Meta, finishedCompactKeyName, rbytes)
65 tx.Unlock()
66 hash := h.Hash()
67 size, sizeInUse := s.b.Size(), s.b.SizeInUse()
68 s.lg.Info(
69 "finished scheduled compaction",
70 zap.Int64("compact-revision", compactMainRev),
71 zap.Duration("took", time.Since(totalStart)),
72 zap.Uint32("hash", hash.Hash),
73 zap.Int64("current-db-size-bytes", size),
74 zap.String("current-db-size", humanize.Bytes(uint64(size))),
75 zap.Int64("current-db-size-in-use-bytes", sizeInUse),
76 zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse))),
77 )
78 return hash, nil
79 }
80
81
82 revToBytes(revision{main: rev.main, sub: rev.sub + 1}, last)
83 tx.Unlock()
84
85 s.b.ForceCommit()
86 dbCompactionPauseMs.Observe(float64(time.Since(start) / time.Millisecond))
87
88 select {
89 case <-time.After(10 * time.Millisecond):
90 case <-s.stopc:
91 return KeyValueHash{}, fmt.Errorf("interrupted due to stop signal")
92 }
93 }
94 }
95
View as plain text