...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package v3compactor
16
import (
	"context"
	"errors"
	"sync"
	"time"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/server/v3/mvcc"

	"github.com/jonboulle/clockwork"
	"go.uber.org/zap"
)
28
29
30
31 type Periodic struct {
32 lg *zap.Logger
33 clock clockwork.Clock
34 period time.Duration
35
36 rg RevGetter
37 c Compactable
38
39 revs []int64
40 ctx context.Context
41 cancel context.CancelFunc
42
43
44 mu sync.RWMutex
45 paused bool
46 }
47
48
49
50 func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
51 pc := &Periodic{
52 lg: lg,
53 clock: clock,
54 period: h,
55 rg: rg,
56 c: c,
57 revs: make([]int64, 0),
58 }
59 pc.ctx, pc.cancel = context.WithCancel(context.Background())
60 return pc
61 }
62
63
96
97
98 func (pc *Periodic) Run() {
99 compactInterval := pc.getCompactInterval()
100 retryInterval := pc.getRetryInterval()
101 retentions := pc.getRetentions()
102
103 go func() {
104 lastSuccess := pc.clock.Now()
105 baseInterval := pc.period
106 for {
107 pc.revs = append(pc.revs, pc.rg.Rev())
108 if len(pc.revs) > retentions {
109 pc.revs = pc.revs[1:]
110 }
111
112 select {
113 case <-pc.ctx.Done():
114 return
115 case <-pc.clock.After(retryInterval):
116 pc.mu.Lock()
117 p := pc.paused
118 pc.mu.Unlock()
119 if p {
120 continue
121 }
122 }
123
124 if pc.clock.Now().Sub(lastSuccess) < baseInterval {
125 continue
126 }
127
128
129 if baseInterval == pc.period {
130 baseInterval = compactInterval
131 }
132 rev := pc.revs[0]
133
134 pc.lg.Info(
135 "starting auto periodic compaction",
136 zap.Int64("revision", rev),
137 zap.Duration("compact-period", pc.period),
138 )
139 startTime := pc.clock.Now()
140 _, err := pc.c.Compact(pc.ctx, &pb.CompactionRequest{Revision: rev})
141 if err == nil || err == mvcc.ErrCompacted {
142 pc.lg.Info(
143 "completed auto periodic compaction",
144 zap.Int64("revision", rev),
145 zap.Duration("compact-period", pc.period),
146 zap.Duration("took", pc.clock.Now().Sub(startTime)),
147 )
148 lastSuccess = pc.clock.Now()
149 } else {
150 pc.lg.Warn(
151 "failed auto periodic compaction",
152 zap.Int64("revision", rev),
153 zap.Duration("compact-period", pc.period),
154 zap.Duration("retry-interval", retryInterval),
155 zap.Error(err),
156 )
157 }
158 }
159 }()
160 }
161
162
163
164
165
166 func (pc *Periodic) getCompactInterval() time.Duration {
167 itv := pc.period
168 if itv > time.Hour {
169 itv = time.Hour
170 }
171 return itv
172 }
173
174 func (pc *Periodic) getRetentions() int {
175 return int(pc.period/pc.getRetryInterval()) + 1
176 }
177
178 const retryDivisor = 10
179
180 func (pc *Periodic) getRetryInterval() time.Duration {
181 itv := pc.period
182 if itv > time.Hour {
183 itv = time.Hour
184 }
185 return itv / retryDivisor
186 }
187
188
189 func (pc *Periodic) Stop() {
190 pc.cancel()
191 }
192
193
194 func (pc *Periodic) Pause() {
195 pc.mu.Lock()
196 pc.paused = true
197 pc.mu.Unlock()
198 }
199
200
201 func (pc *Periodic) Resume() {
202 pc.mu.Lock()
203 pc.paused = false
204 pc.mu.Unlock()
205 }
206
View as plain text