...
1
16
17 package scheduler
18
19 import (
20 "container/heap"
21 "sync"
22 "time"
23
24 "k8s.io/apimachinery/pkg/util/sets"
25 "k8s.io/client-go/util/flowcontrol"
26 "k8s.io/klog/v2"
27 )
28
29 const (
30
31
32 NodeHealthUpdateRetry = 5
33
34
35 NodeEvictionPeriod = 100 * time.Millisecond
36
37
38 EvictionRateLimiterBurst = 1
39 )
40
41
42 type TimedValue struct {
43 Value string
44
45 UID interface{}
46 AddedAt time.Time
47 ProcessAt time.Time
48 }
49
50
51 var now = time.Now
52
53
54 type TimedQueue []*TimedValue
55
56
57 func (h TimedQueue) Len() int { return len(h) }
58
59
60 func (h TimedQueue) Less(i, j int) bool { return h[i].ProcessAt.Before(h[j].ProcessAt) }
61
62
63 func (h TimedQueue) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
64
65
66 func (h *TimedQueue) Push(x interface{}) {
67 *h = append(*h, x.(*TimedValue))
68 }
69
70
71 func (h *TimedQueue) Pop() interface{} {
72 old := *h
73 n := len(old)
74 x := old[n-1]
75 *h = old[0 : n-1]
76 return x
77 }
78
79
80
81 type UniqueQueue struct {
82 lock sync.Mutex
83 queue TimedQueue
84 set sets.String
85 }
86
87
88
89
90 func (q *UniqueQueue) Add(value TimedValue) bool {
91 q.lock.Lock()
92 defer q.lock.Unlock()
93
94 if q.set.Has(value.Value) {
95 return false
96 }
97 heap.Push(&q.queue, &value)
98 q.set.Insert(value.Value)
99 return true
100 }
101
102
103
104
105 func (q *UniqueQueue) Replace(value TimedValue) bool {
106 q.lock.Lock()
107 defer q.lock.Unlock()
108
109 for i := range q.queue {
110 if q.queue[i].Value != value.Value {
111 continue
112 }
113 heap.Remove(&q.queue, i)
114 heap.Push(&q.queue, &value)
115 return true
116 }
117 return false
118 }
119
120
121
122
123 func (q *UniqueQueue) RemoveFromQueue(value string) bool {
124 q.lock.Lock()
125 defer q.lock.Unlock()
126
127 if !q.set.Has(value) {
128 return false
129 }
130 for i, val := range q.queue {
131 if val.Value == value {
132 heap.Remove(&q.queue, i)
133 return true
134 }
135 }
136 return false
137 }
138
139
140
141
142 func (q *UniqueQueue) Remove(value string) bool {
143 q.lock.Lock()
144 defer q.lock.Unlock()
145
146 if !q.set.Has(value) {
147 return false
148 }
149 q.set.Delete(value)
150 for i, val := range q.queue {
151 if val.Value == value {
152 heap.Remove(&q.queue, i)
153 return true
154 }
155 }
156 return true
157 }
158
159
160 func (q *UniqueQueue) Get() (TimedValue, bool) {
161 q.lock.Lock()
162 defer q.lock.Unlock()
163 if len(q.queue) == 0 {
164 return TimedValue{}, false
165 }
166 result := heap.Pop(&q.queue).(*TimedValue)
167 q.set.Delete(result.Value)
168 return *result, true
169 }
170
171
172
173 func (q *UniqueQueue) Head() (TimedValue, bool) {
174 q.lock.Lock()
175 defer q.lock.Unlock()
176 if len(q.queue) == 0 {
177 return TimedValue{}, false
178 }
179 result := q.queue[0]
180 return *result, true
181 }
182
183
184
185 func (q *UniqueQueue) Clear() {
186 q.lock.Lock()
187 defer q.lock.Unlock()
188 if q.queue.Len() > 0 {
189 q.queue = make(TimedQueue, 0)
190 }
191 if len(q.set) > 0 {
192 q.set = sets.NewString()
193 }
194 }
195
196
197
198 type RateLimitedTimedQueue struct {
199 queue UniqueQueue
200 limiterLock sync.Mutex
201 limiter flowcontrol.RateLimiter
202 }
203
204
205
206 func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimedQueue {
207 return &RateLimitedTimedQueue{
208 queue: UniqueQueue{
209 queue: TimedQueue{},
210 set: sets.NewString(),
211 },
212 limiter: limiter,
213 }
214 }
215
216
217
218
219 type ActionFunc func(TimedValue) (bool, time.Duration)
220
221
222
223
224
225
226
227
228
229
230
231 func (q *RateLimitedTimedQueue) Try(logger klog.Logger, fn ActionFunc) {
232 val, ok := q.queue.Head()
233 q.limiterLock.Lock()
234 defer q.limiterLock.Unlock()
235 for ok {
236
237 if !q.limiter.TryAccept() {
238 logger.V(10).Info("Try rate limited", "value", val)
239
240 break
241 }
242
243 now := now()
244 if now.Before(val.ProcessAt) {
245 break
246 }
247
248 if ok, wait := fn(val); !ok {
249 val.ProcessAt = now.Add(wait + 1)
250 q.queue.Replace(val)
251 } else {
252 q.queue.RemoveFromQueue(val.Value)
253 }
254 val, ok = q.queue.Head()
255 }
256 }
257
258
259
260
261 func (q *RateLimitedTimedQueue) Add(value string, uid interface{}) bool {
262 now := now()
263 return q.queue.Add(TimedValue{
264 Value: value,
265 UID: uid,
266 AddedAt: now,
267 ProcessAt: now,
268 })
269 }
270
271
272
273 func (q *RateLimitedTimedQueue) Remove(value string) bool {
274 return q.queue.Remove(value)
275 }
276
277
278 func (q *RateLimitedTimedQueue) Clear() {
279 q.queue.Clear()
280 }
281
282
283
284 func (q *RateLimitedTimedQueue) SwapLimiter(newQPS float32) {
285 q.limiterLock.Lock()
286 defer q.limiterLock.Unlock()
287 if q.limiter.QPS() == newQPS {
288 return
289 }
290 var newLimiter flowcontrol.RateLimiter
291 if newQPS <= 0 {
292 newLimiter = flowcontrol.NewFakeNeverRateLimiter()
293 } else {
294 newLimiter = flowcontrol.NewTokenBucketRateLimiter(newQPS, EvictionRateLimiterBurst)
295
296
297
298
299
300
301
302 if q.limiter.TryAccept() == false {
303 newLimiter.TryAccept()
304 }
305 }
306 q.limiter.Stop()
307 q.limiter = newLimiter
308 }
309
View as plain text