...
1
16
17 package workqueue
18
19 import (
20 "math"
21 "sync"
22 "time"
23
24 "golang.org/x/time/rate"
25 )
26
// RateLimiter decides how long an item has to wait before it may be processed
// and exposes the per-item bookkeeping behind that decision.
type RateLimiter interface {
	// When takes an item and returns the duration that item should wait.
	When(item interface{}) time.Duration

	// Forget tells the limiter that it may drop any state it keeps for item.
	Forget(item interface{})

	// NumRequeues returns the limiter's requeue count for item. The item-based
	// implementations in this file report the number of When calls made for
	// the item since it was last forgotten.
	NumRequeues(item interface{}) int
}
36
37
38
39 func DefaultControllerRateLimiter() RateLimiter {
40 return NewMaxOfRateLimiter(
41 NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
42
43 &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
44 )
45 }
46
47
48 type BucketRateLimiter struct {
49 *rate.Limiter
50 }
51
52 var _ RateLimiter = &BucketRateLimiter{}
53
54 func (r *BucketRateLimiter) When(item interface{}) time.Duration {
55 return r.Limiter.Reserve().Delay()
56 }
57
58 func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
59 return 0
60 }
61
62 func (r *BucketRateLimiter) Forget(item interface{}) {
63 }
64
65
66
67 type ItemExponentialFailureRateLimiter struct {
68 failuresLock sync.Mutex
69 failures map[interface{}]int
70
71 baseDelay time.Duration
72 maxDelay time.Duration
73 }
74
75 var _ RateLimiter = &ItemExponentialFailureRateLimiter{}
76
77 func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
78 return &ItemExponentialFailureRateLimiter{
79 failures: map[interface{}]int{},
80 baseDelay: baseDelay,
81 maxDelay: maxDelay,
82 }
83 }
84
85 func DefaultItemBasedRateLimiter() RateLimiter {
86 return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
87 }
88
89 func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
90 r.failuresLock.Lock()
91 defer r.failuresLock.Unlock()
92
93 exp := r.failures[item]
94 r.failures[item] = r.failures[item] + 1
95
96
97 backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
98 if backoff > math.MaxInt64 {
99 return r.maxDelay
100 }
101
102 calculated := time.Duration(backoff)
103 if calculated > r.maxDelay {
104 return r.maxDelay
105 }
106
107 return calculated
108 }
109
110 func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
111 r.failuresLock.Lock()
112 defer r.failuresLock.Unlock()
113
114 return r.failures[item]
115 }
116
117 func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
118 r.failuresLock.Lock()
119 defer r.failuresLock.Unlock()
120
121 delete(r.failures, item)
122 }
123
124
125 type ItemFastSlowRateLimiter struct {
126 failuresLock sync.Mutex
127 failures map[interface{}]int
128
129 maxFastAttempts int
130 fastDelay time.Duration
131 slowDelay time.Duration
132 }
133
134 var _ RateLimiter = &ItemFastSlowRateLimiter{}
135
136 func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
137 return &ItemFastSlowRateLimiter{
138 failures: map[interface{}]int{},
139 fastDelay: fastDelay,
140 slowDelay: slowDelay,
141 maxFastAttempts: maxFastAttempts,
142 }
143 }
144
145 func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
146 r.failuresLock.Lock()
147 defer r.failuresLock.Unlock()
148
149 r.failures[item] = r.failures[item] + 1
150
151 if r.failures[item] <= r.maxFastAttempts {
152 return r.fastDelay
153 }
154
155 return r.slowDelay
156 }
157
158 func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
159 r.failuresLock.Lock()
160 defer r.failuresLock.Unlock()
161
162 return r.failures[item]
163 }
164
165 func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
166 r.failuresLock.Lock()
167 defer r.failuresLock.Unlock()
168
169 delete(r.failures, item)
170 }
171
172
173
174
175 type MaxOfRateLimiter struct {
176 limiters []RateLimiter
177 }
178
179 func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
180 ret := time.Duration(0)
181 for _, limiter := range r.limiters {
182 curr := limiter.When(item)
183 if curr > ret {
184 ret = curr
185 }
186 }
187
188 return ret
189 }
190
191 func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
192 return &MaxOfRateLimiter{limiters: limiters}
193 }
194
195 func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
196 ret := 0
197 for _, limiter := range r.limiters {
198 curr := limiter.NumRequeues(item)
199 if curr > ret {
200 ret = curr
201 }
202 }
203
204 return ret
205 }
206
207 func (r *MaxOfRateLimiter) Forget(item interface{}) {
208 for _, limiter := range r.limiters {
209 limiter.Forget(item)
210 }
211 }
212
213
214 type WithMaxWaitRateLimiter struct {
215 limiter RateLimiter
216 maxDelay time.Duration
217 }
218
219 func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter {
220 return &WithMaxWaitRateLimiter{limiter: limiter, maxDelay: maxDelay}
221 }
222
223 func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration {
224 delay := w.limiter.When(item)
225 if delay > w.maxDelay {
226 return w.maxDelay
227 }
228
229 return delay
230 }
231
232 func (w WithMaxWaitRateLimiter) Forget(item interface{}) {
233 w.limiter.Forget(item)
234 }
235
236 func (w WithMaxWaitRateLimiter) NumRequeues(item interface{}) int {
237 return w.limiter.NumRequeues(item)
238 }
239
View as plain text