...
1
16
17 package workqueue
18
19 import (
20 "sync"
21 "time"
22
23 "k8s.io/utils/clock"
24 )
25
26 type Interface interface {
27 Add(item interface{})
28 Len() int
29 Get() (item interface{}, shutdown bool)
30 Done(item interface{})
31 ShutDown()
32 ShutDownWithDrain()
33 ShuttingDown() bool
34 }
35
36
37 type QueueConfig struct {
38
39 Name string
40
41
42
43 MetricsProvider MetricsProvider
44
45
46 Clock clock.WithTicker
47 }
48
49
50 func New() *Type {
51 return NewWithConfig(QueueConfig{
52 Name: "",
53 })
54 }
55
56
57
58 func NewWithConfig(config QueueConfig) *Type {
59 return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod)
60 }
61
62
63
64 func NewNamed(name string) *Type {
65 return NewWithConfig(QueueConfig{
66 Name: name,
67 })
68 }
69
70
71
72 func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type {
73 var metricsFactory *queueMetricsFactory
74 if config.MetricsProvider != nil {
75 metricsFactory = &queueMetricsFactory{
76 metricsProvider: config.MetricsProvider,
77 }
78 } else {
79 metricsFactory = &globalMetricsFactory
80 }
81
82 if config.Clock == nil {
83 config.Clock = clock.RealClock{}
84 }
85
86 return newQueue(
87 config.Clock,
88 metricsFactory.newQueueMetrics(config.Name, config.Clock),
89 updatePeriod,
90 )
91 }
92
93 func newQueue(c clock.WithTicker, metrics queueMetrics, updatePeriod time.Duration) *Type {
94 t := &Type{
95 clock: c,
96 dirty: set{},
97 processing: set{},
98 cond: sync.NewCond(&sync.Mutex{}),
99 metrics: metrics,
100 unfinishedWorkUpdatePeriod: updatePeriod,
101 }
102
103
104
105 if _, ok := metrics.(noMetrics); !ok {
106 go t.updateUnfinishedWorkLoop()
107 }
108
109 return t
110 }
111
112 const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond
113
114
115 type Type struct {
116
117
118
119 queue []t
120
121
122 dirty set
123
124
125
126
127
128 processing set
129
130 cond *sync.Cond
131
132 shuttingDown bool
133 drain bool
134
135 metrics queueMetrics
136
137 unfinishedWorkUpdatePeriod time.Duration
138 clock clock.WithTicker
139 }
140
141 type empty struct{}
142 type t interface{}
143 type set map[t]empty
144
145 func (s set) has(item t) bool {
146 _, exists := s[item]
147 return exists
148 }
149
150 func (s set) insert(item t) {
151 s[item] = empty{}
152 }
153
154 func (s set) delete(item t) {
155 delete(s, item)
156 }
157
158 func (s set) len() int {
159 return len(s)
160 }
161
162
163 func (q *Type) Add(item interface{}) {
164 q.cond.L.Lock()
165 defer q.cond.L.Unlock()
166 if q.shuttingDown {
167 return
168 }
169 if q.dirty.has(item) {
170 return
171 }
172
173 q.metrics.add(item)
174
175 q.dirty.insert(item)
176 if q.processing.has(item) {
177 return
178 }
179
180 q.queue = append(q.queue, item)
181 q.cond.Signal()
182 }
183
184
185
186
187 func (q *Type) Len() int {
188 q.cond.L.Lock()
189 defer q.cond.L.Unlock()
190 return len(q.queue)
191 }
192
193
194
195
196 func (q *Type) Get() (item interface{}, shutdown bool) {
197 q.cond.L.Lock()
198 defer q.cond.L.Unlock()
199 for len(q.queue) == 0 && !q.shuttingDown {
200 q.cond.Wait()
201 }
202 if len(q.queue) == 0 {
203
204 return nil, true
205 }
206
207 item = q.queue[0]
208
209 q.queue[0] = nil
210 q.queue = q.queue[1:]
211
212 q.metrics.get(item)
213
214 q.processing.insert(item)
215 q.dirty.delete(item)
216
217 return item, false
218 }
219
220
221
222
223 func (q *Type) Done(item interface{}) {
224 q.cond.L.Lock()
225 defer q.cond.L.Unlock()
226
227 q.metrics.done(item)
228
229 q.processing.delete(item)
230 if q.dirty.has(item) {
231 q.queue = append(q.queue, item)
232 q.cond.Signal()
233 } else if q.processing.len() == 0 {
234 q.cond.Signal()
235 }
236 }
237
238
239
240 func (q *Type) ShutDown() {
241 q.cond.L.Lock()
242 defer q.cond.L.Unlock()
243
244 q.drain = false
245 q.shuttingDown = true
246 q.cond.Broadcast()
247 }
248
249
250
251
252
253
254
255
256
257
258 func (q *Type) ShutDownWithDrain() {
259 q.cond.L.Lock()
260 defer q.cond.L.Unlock()
261
262 q.drain = true
263 q.shuttingDown = true
264 q.cond.Broadcast()
265
266 for q.processing.len() != 0 && q.drain {
267 q.cond.Wait()
268 }
269 }
270
271 func (q *Type) ShuttingDown() bool {
272 q.cond.L.Lock()
273 defer q.cond.L.Unlock()
274
275 return q.shuttingDown
276 }
277
278 func (q *Type) updateUnfinishedWorkLoop() {
279 t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
280 defer t.Stop()
281 for range t.C() {
282 if !func() bool {
283 q.cond.L.Lock()
284 defer q.cond.L.Unlock()
285 if !q.shuttingDown {
286 q.metrics.updateUnfinishedWork()
287 return true
288 }
289 return false
290
291 }() {
292 return
293 }
294 }
295 }
296
View as plain text