...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package v3compactor
16
17 import (
18 "context"
19 "sync"
20 "time"
21
22 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
23 "go.etcd.io/etcd/server/v3/mvcc"
24
25 "github.com/jonboulle/clockwork"
26 "go.uber.org/zap"
27 )
28
29
30
31 type Revision struct {
32 lg *zap.Logger
33
34 clock clockwork.Clock
35 retention int64
36
37 rg RevGetter
38 c Compactable
39
40 ctx context.Context
41 cancel context.CancelFunc
42
43 mu sync.Mutex
44 paused bool
45 }
46
47
48
49 func newRevision(lg *zap.Logger, clock clockwork.Clock, retention int64, rg RevGetter, c Compactable) *Revision {
50 rc := &Revision{
51 lg: lg,
52 clock: clock,
53 retention: retention,
54 rg: rg,
55 c: c,
56 }
57 rc.ctx, rc.cancel = context.WithCancel(context.Background())
58 return rc
59 }
60
61 const revInterval = 5 * time.Minute
62
63
64 func (rc *Revision) Run() {
65 prev := int64(0)
66 go func() {
67 for {
68 select {
69 case <-rc.ctx.Done():
70 return
71 case <-rc.clock.After(revInterval):
72 rc.mu.Lock()
73 p := rc.paused
74 rc.mu.Unlock()
75 if p {
76 continue
77 }
78 }
79
80 rev := rc.rg.Rev() - rc.retention
81 if rev <= 0 || rev == prev {
82 continue
83 }
84
85 now := time.Now()
86 rc.lg.Info(
87 "starting auto revision compaction",
88 zap.Int64("revision", rev),
89 zap.Int64("revision-compaction-retention", rc.retention),
90 )
91 _, err := rc.c.Compact(rc.ctx, &pb.CompactionRequest{Revision: rev})
92 if err == nil || err == mvcc.ErrCompacted {
93 prev = rev
94 rc.lg.Info(
95 "completed auto revision compaction",
96 zap.Int64("revision", rev),
97 zap.Int64("revision-compaction-retention", rc.retention),
98 zap.Duration("took", time.Since(now)),
99 )
100 } else {
101 rc.lg.Warn(
102 "failed auto revision compaction",
103 zap.Int64("revision", rev),
104 zap.Int64("revision-compaction-retention", rc.retention),
105 zap.Duration("retry-interval", revInterval),
106 zap.Error(err),
107 )
108 }
109 }
110 }()
111 }
112
113
114 func (rc *Revision) Stop() {
115 rc.cancel()
116 }
117
118
119 func (rc *Revision) Pause() {
120 rc.mu.Lock()
121 rc.paused = true
122 rc.mu.Unlock()
123 }
124
125
126 func (rc *Revision) Resume() {
127 rc.mu.Lock()
128 rc.paused = false
129 rc.mu.Unlock()
130 }
131
View as plain text