...
1
16
17 package async
18
19 import (
20 "fmt"
21 "sync"
22 "time"
23
24 "k8s.io/client-go/util/flowcontrol"
25
26 "k8s.io/klog/v2"
27 )
28
29
30
31 type BoundedFrequencyRunner struct {
32 name string
33 minInterval time.Duration
34 maxInterval time.Duration
35
36 run chan struct{}
37
38 mu sync.Mutex
39 fn func()
40 lastRun time.Time
41 timer timer
42 limiter rateLimiter
43
44 retry chan struct{}
45 retryMu sync.Mutex
46 retryTime time.Time
47 }
48
49
// rateLimiter is the admission gate the runner consults before invoking the
// managed function. construct installs either a nullLimiter or the limiter
// returned by flowcontrol.NewTokenBucketRateLimiterWithClock.
type rateLimiter interface {
	// Stop is invoked from the runner's stop path when Loop shuts down.
	Stop()
	// TryAccept reports whether a run is permitted at this moment.
	TryAccept() bool
}
54
55 type nullLimiter struct{}
56
57 func (nullLimiter) TryAccept() bool {
58 return true
59 }
60
61 func (nullLimiter) Stop() {}
62
63 var _ rateLimiter = nullLimiter{}
64
65
// timer abstracts both the resettable timer and the clock used by the
// runner, so an alternative implementation can be passed to construct in
// place of realTimer. construct also hands the same value to the token-bucket
// limiter as its clock; presumably that is why Now, Since and Sleep are part
// of this interface (confirm against the flowcontrol package).
type timer interface {
	// C returns the channel on which the timer delivers its tick.
	C() <-chan time.Time
	// Reset re-arms the timer to fire after d. See time.Timer.Reset for the
	// meaning of the result.
	Reset(d time.Duration) bool
	// Stop disarms the timer. See time.Timer.Stop for the meaning of the
	// result.
	Stop() bool
	// Remaining returns how long until the timer is due to fire.
	Remaining() time.Duration

	// Now returns the current time according to this clock. See time.Now.
	Now() time.Time
	// Since returns the time elapsed since t. See time.Since.
	Since(t time.Time) time.Duration
	// Sleep pauses the caller for d. See time.Sleep.
	Sleep(d time.Duration)
}
88
89
90 type realTimer struct {
91 timer *time.Timer
92 next time.Time
93 }
94
95 func (rt *realTimer) C() <-chan time.Time {
96 return rt.timer.C
97 }
98
99 func (rt *realTimer) Reset(d time.Duration) bool {
100 rt.next = time.Now().Add(d)
101 return rt.timer.Reset(d)
102 }
103
104 func (rt *realTimer) Stop() bool {
105 return rt.timer.Stop()
106 }
107
108 func (rt *realTimer) Now() time.Time {
109 return time.Now()
110 }
111
112 func (rt *realTimer) Remaining() time.Duration {
113 return rt.next.Sub(time.Now())
114 }
115
116 func (rt *realTimer) Since(t time.Time) time.Duration {
117 return time.Since(t)
118 }
119
120 func (rt *realTimer) Sleep(d time.Duration) {
121 time.Sleep(d)
122 }
123
124 var _ timer = &realTimer{}
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155 func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
156 timer := &realTimer{timer: time.NewTimer(0)}
157 <-timer.C()
158 return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
159 }
160
161
162 func construct(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int, timer timer) *BoundedFrequencyRunner {
163 if maxInterval < minInterval {
164 panic(fmt.Sprintf("%s: maxInterval (%v) must be >= minInterval (%v)", name, maxInterval, minInterval))
165 }
166 if timer == nil {
167 panic(fmt.Sprintf("%s: timer must be non-nil", name))
168 }
169
170 bfr := &BoundedFrequencyRunner{
171 name: name,
172 fn: fn,
173 minInterval: minInterval,
174 maxInterval: maxInterval,
175 run: make(chan struct{}, 1),
176 retry: make(chan struct{}, 1),
177 timer: timer,
178 }
179 if minInterval == 0 {
180 bfr.limiter = nullLimiter{}
181 } else {
182
183 qps := float32(time.Second) / float32(minInterval)
184 bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
185 }
186 return bfr
187 }
188
189
190
191 func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
192 klog.V(3).Infof("%s Loop running", bfr.name)
193 bfr.timer.Reset(bfr.maxInterval)
194 for {
195 select {
196 case <-stop:
197 bfr.stop()
198 klog.V(3).Infof("%s Loop stopping", bfr.name)
199 return
200 case <-bfr.timer.C():
201 bfr.tryRun()
202 case <-bfr.run:
203 bfr.tryRun()
204 case <-bfr.retry:
205 bfr.doRetry()
206 }
207 }
208 }
209
210
211
212
213
214
215 func (bfr *BoundedFrequencyRunner) Run() {
216
217
218
219
220 select {
221 case bfr.run <- struct{}{}:
222 default:
223 }
224 }
225
226
227
228
229 func (bfr *BoundedFrequencyRunner) RetryAfter(interval time.Duration) {
230
231
232
233
234
235
236
237 retryTime := bfr.timer.Now().Add(interval)
238
239
240
241
242
243 bfr.retryMu.Lock()
244 defer bfr.retryMu.Unlock()
245 if !bfr.retryTime.IsZero() && bfr.retryTime.Before(retryTime) {
246 return
247 }
248 bfr.retryTime = retryTime
249
250 select {
251 case bfr.retry <- struct{}{}:
252 default:
253 }
254 }
255
256
257 func (bfr *BoundedFrequencyRunner) stop() {
258 bfr.mu.Lock()
259 defer bfr.mu.Unlock()
260 bfr.limiter.Stop()
261 bfr.timer.Stop()
262 }
263
264
265 func (bfr *BoundedFrequencyRunner) doRetry() {
266 bfr.mu.Lock()
267 defer bfr.mu.Unlock()
268 bfr.retryMu.Lock()
269 defer bfr.retryMu.Unlock()
270
271 if bfr.retryTime.IsZero() {
272 return
273 }
274
275
276 retryInterval := bfr.retryTime.Sub(bfr.timer.Now())
277 bfr.retryTime = time.Time{}
278 if retryInterval < bfr.timer.Remaining() {
279 klog.V(3).Infof("%s: retrying in %v", bfr.name, retryInterval)
280 bfr.timer.Stop()
281 bfr.timer.Reset(retryInterval)
282 }
283 }
284
285
286 func (bfr *BoundedFrequencyRunner) tryRun() {
287 bfr.mu.Lock()
288 defer bfr.mu.Unlock()
289
290 if bfr.limiter.TryAccept() {
291
292 bfr.fn()
293 bfr.lastRun = bfr.timer.Now()
294 bfr.timer.Stop()
295 bfr.timer.Reset(bfr.maxInterval)
296 klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
297 return
298 }
299
300
301 elapsed := bfr.timer.Since(bfr.lastRun)
302 nextPossible := bfr.minInterval - elapsed
303 nextScheduled := bfr.timer.Remaining()
304 klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
305
306
307
308 if nextPossible < nextScheduled {
309 nextScheduled = nextPossible
310 }
311 bfr.timer.Stop()
312 bfr.timer.Reset(nextScheduled)
313 }
314
View as plain text