1
2
3
4
5
6 package rate
7
8 import (
9 "context"
10 "fmt"
11 "math"
12 "sync"
13 "time"
14 )
15
16
17
18
// Limit is the maximum frequency of events, expressed as a number of events
// per second.
type Limit float64

// Inf is the largest representable Limit. Every returns it for non-positive
// intervals.
const Inf Limit = math.MaxFloat64

// Every converts a minimum spacing between events into a Limit.
// A zero or negative interval yields Inf.
func Every(interval time.Duration) Limit {
	if interval > 0 {
		return 1 / Limit(interval.Seconds())
	}
	return Inf
}
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 type Limiter struct {
58 mu sync.Mutex
59 limit Limit
60 burst int
61 tokens float64
62
63 last time.Time
64
65 lastEvent time.Time
66 }
67
68
69 func (lim *Limiter) Limit() Limit {
70 lim.mu.Lock()
71 defer lim.mu.Unlock()
72 return lim.limit
73 }
74
75
76
77
78
79 func (lim *Limiter) Burst() int {
80 lim.mu.Lock()
81 defer lim.mu.Unlock()
82 return lim.burst
83 }
84
85
86 func (lim *Limiter) TokensAt(t time.Time) float64 {
87 lim.mu.Lock()
88 _, tokens := lim.advance(t)
89 lim.mu.Unlock()
90 return tokens
91 }
92
93
94 func (lim *Limiter) Tokens() float64 {
95 return lim.TokensAt(time.Now())
96 }
97
98
99
100 func NewLimiter(r Limit, b int) *Limiter {
101 return &Limiter{
102 limit: r,
103 burst: b,
104 }
105 }
106
107
108 func (lim *Limiter) Allow() bool {
109 return lim.AllowN(time.Now(), 1)
110 }
111
112
113
114
115 func (lim *Limiter) AllowN(t time.Time, n int) bool {
116 return lim.reserveN(t, n, 0).ok
117 }
118
119
120
121 type Reservation struct {
122 ok bool
123 lim *Limiter
124 tokens int
125 timeToAct time.Time
126
127 limit Limit
128 }
129
130
131
132
133 func (r *Reservation) OK() bool {
134 return r.ok
135 }
136
137
138 func (r *Reservation) Delay() time.Duration {
139 return r.DelayFrom(time.Now())
140 }
141
142
// InfDuration is the largest representable time.Duration. DelayFrom returns
// it for a reservation that was not granted.
const InfDuration time.Duration = math.MaxInt64
144
145
146
147
148
149 func (r *Reservation) DelayFrom(t time.Time) time.Duration {
150 if !r.ok {
151 return InfDuration
152 }
153 delay := r.timeToAct.Sub(t)
154 if delay < 0 {
155 return 0
156 }
157 return delay
158 }
159
160
161 func (r *Reservation) Cancel() {
162 r.CancelAt(time.Now())
163 }
164
165
166
167
168 func (r *Reservation) CancelAt(t time.Time) {
169 if !r.ok {
170 return
171 }
172
173 r.lim.mu.Lock()
174 defer r.lim.mu.Unlock()
175
176 if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(t) {
177 return
178 }
179
180
181
182
183 restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
184 if restoreTokens <= 0 {
185 return
186 }
187
188 t, tokens := r.lim.advance(t)
189
190 tokens += restoreTokens
191 if burst := float64(r.lim.burst); tokens > burst {
192 tokens = burst
193 }
194
195 r.lim.last = t
196 r.lim.tokens = tokens
197 if r.timeToAct == r.lim.lastEvent {
198 prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))
199 if !prevEvent.Before(t) {
200 r.lim.lastEvent = prevEvent
201 }
202 }
203 }
204
205
206 func (lim *Limiter) Reserve() *Reservation {
207 return lim.ReserveN(time.Now(), 1)
208 }
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226 func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation {
227 r := lim.reserveN(t, n, InfDuration)
228 return &r
229 }
230
231
232 func (lim *Limiter) Wait(ctx context.Context) (err error) {
233 return lim.WaitN(ctx, 1)
234 }
235
236
237
238
239
240 func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
241
242
243 newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) {
244 timer := time.NewTimer(d)
245 return timer.C, timer.Stop, func() {}
246 }
247
248 return lim.wait(ctx, n, time.Now(), newTimer)
249 }
250
251
252 func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error {
253 lim.mu.Lock()
254 burst := lim.burst
255 limit := lim.limit
256 lim.mu.Unlock()
257
258 if n > burst && limit != Inf {
259 return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
260 }
261
262 select {
263 case <-ctx.Done():
264 return ctx.Err()
265 default:
266 }
267
268 waitLimit := InfDuration
269 if deadline, ok := ctx.Deadline(); ok {
270 waitLimit = deadline.Sub(t)
271 }
272
273 r := lim.reserveN(t, n, waitLimit)
274 if !r.ok {
275 return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
276 }
277
278 delay := r.DelayFrom(t)
279 if delay == 0 {
280 return nil
281 }
282 ch, stop, advance := newTimer(delay)
283 defer stop()
284 advance()
285 select {
286 case <-ch:
287
288 return nil
289 case <-ctx.Done():
290
291
292 r.Cancel()
293 return ctx.Err()
294 }
295 }
296
297
298 func (lim *Limiter) SetLimit(newLimit Limit) {
299 lim.SetLimitAt(time.Now(), newLimit)
300 }
301
302
303
304
305 func (lim *Limiter) SetLimitAt(t time.Time, newLimit Limit) {
306 lim.mu.Lock()
307 defer lim.mu.Unlock()
308
309 t, tokens := lim.advance(t)
310
311 lim.last = t
312 lim.tokens = tokens
313 lim.limit = newLimit
314 }
315
316
317 func (lim *Limiter) SetBurst(newBurst int) {
318 lim.SetBurstAt(time.Now(), newBurst)
319 }
320
321
322 func (lim *Limiter) SetBurstAt(t time.Time, newBurst int) {
323 lim.mu.Lock()
324 defer lim.mu.Unlock()
325
326 t, tokens := lim.advance(t)
327
328 lim.last = t
329 lim.tokens = tokens
330 lim.burst = newBurst
331 }
332
333
334
335
336 func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation {
337 lim.mu.Lock()
338 defer lim.mu.Unlock()
339
340 if lim.limit == Inf {
341 return Reservation{
342 ok: true,
343 lim: lim,
344 tokens: n,
345 timeToAct: t,
346 }
347 } else if lim.limit == 0 {
348 var ok bool
349 if lim.burst >= n {
350 ok = true
351 lim.burst -= n
352 }
353 return Reservation{
354 ok: ok,
355 lim: lim,
356 tokens: lim.burst,
357 timeToAct: t,
358 }
359 }
360
361 t, tokens := lim.advance(t)
362
363
364 tokens -= float64(n)
365
366
367 var waitDuration time.Duration
368 if tokens < 0 {
369 waitDuration = lim.limit.durationFromTokens(-tokens)
370 }
371
372
373 ok := n <= lim.burst && waitDuration <= maxFutureReserve
374
375
376 r := Reservation{
377 ok: ok,
378 lim: lim,
379 limit: lim.limit,
380 }
381 if ok {
382 r.tokens = n
383 r.timeToAct = t.Add(waitDuration)
384
385
386 lim.last = t
387 lim.tokens = tokens
388 lim.lastEvent = r.timeToAct
389 }
390
391 return r
392 }
393
394
395
396
397 func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) {
398 last := lim.last
399 if t.Before(last) {
400 last = t
401 }
402
403
404 elapsed := t.Sub(last)
405 delta := lim.limit.tokensFromDuration(elapsed)
406 tokens := lim.tokens + delta
407 if burst := float64(lim.burst); tokens > burst {
408 tokens = burst
409 }
410 return t, tokens
411 }
412
413
414
415 func (limit Limit) durationFromTokens(tokens float64) time.Duration {
416 if limit <= 0 {
417 return InfDuration
418 }
419 seconds := tokens / float64(limit)
420 return time.Duration(float64(time.Second) * seconds)
421 }
422
423
424
425 func (limit Limit) tokensFromDuration(d time.Duration) float64 {
426 if limit <= 0 {
427 return 0
428 }
429 return d.Seconds() * float64(limit)
430 }
431
View as plain text