...
1
16
17 package workqueue
18
19 import (
20 "container/heap"
21 "sync"
22 "time"
23
24 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
25 "k8s.io/utils/clock"
26 )
27
28
29
30 type DelayingInterface interface {
31 Interface
32
33 AddAfter(item interface{}, duration time.Duration)
34 }
35
36
37 type DelayingQueueConfig struct {
38
39 Name string
40
41
42
43 MetricsProvider MetricsProvider
44
45
46 Clock clock.WithTicker
47
48
49 Queue Interface
50 }
51
52
53
54
55 func NewDelayingQueue() DelayingInterface {
56 return NewDelayingQueueWithConfig(DelayingQueueConfig{})
57 }
58
59
60
61 func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface {
62 if config.Clock == nil {
63 config.Clock = clock.RealClock{}
64 }
65
66 if config.Queue == nil {
67 config.Queue = NewWithConfig(QueueConfig{
68 Name: config.Name,
69 MetricsProvider: config.MetricsProvider,
70 Clock: config.Clock,
71 })
72 }
73
74 return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider)
75 }
76
77
78
79
80 func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
81 return NewDelayingQueueWithConfig(DelayingQueueConfig{
82 Name: name,
83 Queue: q,
84 })
85 }
86
87
88
89 func NewNamedDelayingQueue(name string) DelayingInterface {
90 return NewDelayingQueueWithConfig(DelayingQueueConfig{Name: name})
91 }
92
93
94
95
96 func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface {
97 return NewDelayingQueueWithConfig(DelayingQueueConfig{
98 Name: name,
99 Clock: clock,
100 })
101 }
102
103 func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider MetricsProvider) *delayingType {
104 ret := &delayingType{
105 Interface: q,
106 clock: clock,
107 heartbeat: clock.NewTicker(maxWait),
108 stopCh: make(chan struct{}),
109 waitingForAddCh: make(chan *waitFor, 1000),
110 metrics: newRetryMetrics(name, provider),
111 }
112
113 go ret.waitingLoop()
114 return ret
115 }
116
117
118 type delayingType struct {
119 Interface
120
121
122 clock clock.Clock
123
124
125 stopCh chan struct{}
126
127 stopOnce sync.Once
128
129
130 heartbeat clock.Ticker
131
132
133 waitingForAddCh chan *waitFor
134
135
136 metrics retryMetrics
137 }
138
139
140 type waitFor struct {
141 data t
142 readyAt time.Time
143
144 index int
145 }
146
147
148
149
150
151
152
153
154
155 type waitForPriorityQueue []*waitFor
156
157 func (pq waitForPriorityQueue) Len() int {
158 return len(pq)
159 }
160 func (pq waitForPriorityQueue) Less(i, j int) bool {
161 return pq[i].readyAt.Before(pq[j].readyAt)
162 }
163 func (pq waitForPriorityQueue) Swap(i, j int) {
164 pq[i], pq[j] = pq[j], pq[i]
165 pq[i].index = i
166 pq[j].index = j
167 }
168
169
170
171 func (pq *waitForPriorityQueue) Push(x interface{}) {
172 n := len(*pq)
173 item := x.(*waitFor)
174 item.index = n
175 *pq = append(*pq, item)
176 }
177
178
179
180 func (pq *waitForPriorityQueue) Pop() interface{} {
181 n := len(*pq)
182 item := (*pq)[n-1]
183 item.index = -1
184 *pq = (*pq)[0:(n - 1)]
185 return item
186 }
187
188
189
190 func (pq waitForPriorityQueue) Peek() interface{} {
191 return pq[0]
192 }
193
194
195
196 func (q *delayingType) ShutDown() {
197 q.stopOnce.Do(func() {
198 q.Interface.ShutDown()
199 close(q.stopCh)
200 q.heartbeat.Stop()
201 })
202 }
203
204
205 func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
206
207 if q.ShuttingDown() {
208 return
209 }
210
211 q.metrics.retry()
212
213
214 if duration <= 0 {
215 q.Add(item)
216 return
217 }
218
219 select {
220 case <-q.stopCh:
221
222 case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
223 }
224 }
225
226
227
228
229 const maxWait = 10 * time.Second
230
231
232 func (q *delayingType) waitingLoop() {
233 defer utilruntime.HandleCrash()
234
235
236 never := make(<-chan time.Time)
237
238
239 var nextReadyAtTimer clock.Timer
240
241 waitingForQueue := &waitForPriorityQueue{}
242 heap.Init(waitingForQueue)
243
244 waitingEntryByData := map[t]*waitFor{}
245
246 for {
247 if q.Interface.ShuttingDown() {
248 return
249 }
250
251 now := q.clock.Now()
252
253
254 for waitingForQueue.Len() > 0 {
255 entry := waitingForQueue.Peek().(*waitFor)
256 if entry.readyAt.After(now) {
257 break
258 }
259
260 entry = heap.Pop(waitingForQueue).(*waitFor)
261 q.Add(entry.data)
262 delete(waitingEntryByData, entry.data)
263 }
264
265
266 nextReadyAt := never
267 if waitingForQueue.Len() > 0 {
268 if nextReadyAtTimer != nil {
269 nextReadyAtTimer.Stop()
270 }
271 entry := waitingForQueue.Peek().(*waitFor)
272 nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
273 nextReadyAt = nextReadyAtTimer.C()
274 }
275
276 select {
277 case <-q.stopCh:
278 return
279
280 case <-q.heartbeat.C():
281
282
283 case <-nextReadyAt:
284
285
286 case waitEntry := <-q.waitingForAddCh:
287 if waitEntry.readyAt.After(q.clock.Now()) {
288 insert(waitingForQueue, waitingEntryByData, waitEntry)
289 } else {
290 q.Add(waitEntry.data)
291 }
292
293 drained := false
294 for !drained {
295 select {
296 case waitEntry := <-q.waitingForAddCh:
297 if waitEntry.readyAt.After(q.clock.Now()) {
298 insert(waitingForQueue, waitingEntryByData, waitEntry)
299 } else {
300 q.Add(waitEntry.data)
301 }
302 default:
303 drained = true
304 }
305 }
306 }
307 }
308 }
309
310
311 func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
312
313 existing, exists := knownEntries[entry.data]
314 if exists {
315 if existing.readyAt.After(entry.readyAt) {
316 existing.readyAt = entry.readyAt
317 heap.Fix(q, existing.index)
318 }
319
320 return
321 }
322
323 heap.Push(q, entry)
324 knownEntries[entry.data] = entry
325 }
326
View as plain text