1
16
17 package workqueue_test
18
19 import (
20 "runtime"
21 "sync"
22 "sync/atomic"
23 "testing"
24 "time"
25
26 "k8s.io/apimachinery/pkg/util/wait"
27 "k8s.io/client-go/util/workqueue"
28 )
29
30 func TestBasic(t *testing.T) {
31 tests := []struct {
32 queue *workqueue.Type
33 queueShutDown func(workqueue.Interface)
34 }{
35 {
36 queue: workqueue.New(),
37 queueShutDown: workqueue.Interface.ShutDown,
38 },
39 {
40 queue: workqueue.New(),
41 queueShutDown: workqueue.Interface.ShutDownWithDrain,
42 },
43 }
44 for _, test := range tests {
45
46
47
48 const producers = 50
49 producerWG := sync.WaitGroup{}
50 producerWG.Add(producers)
51 for i := 0; i < producers; i++ {
52 go func(i int) {
53 defer producerWG.Done()
54 for j := 0; j < 50; j++ {
55 test.queue.Add(i)
56 time.Sleep(time.Millisecond)
57 }
58 }(i)
59 }
60
61
62 const consumers = 10
63 consumerWG := sync.WaitGroup{}
64 consumerWG.Add(consumers)
65 for i := 0; i < consumers; i++ {
66 go func(i int) {
67 defer consumerWG.Done()
68 for {
69 item, quit := test.queue.Get()
70 if item == "added after shutdown!" {
71 t.Errorf("Got an item added after shutdown.")
72 }
73 if quit {
74 return
75 }
76 t.Logf("Worker %v: begin processing %v", i, item)
77 time.Sleep(3 * time.Millisecond)
78 t.Logf("Worker %v: done processing %v", i, item)
79 test.queue.Done(item)
80 }
81 }(i)
82 }
83
84 producerWG.Wait()
85 test.queueShutDown(test.queue)
86 test.queue.Add("added after shutdown!")
87 consumerWG.Wait()
88 if test.queue.Len() != 0 {
89 t.Errorf("Expected the queue to be empty, had: %v items", test.queue.Len())
90 }
91 }
92 }
93
94 func TestAddWhileProcessing(t *testing.T) {
95 tests := []struct {
96 queue *workqueue.Type
97 queueShutDown func(workqueue.Interface)
98 }{
99 {
100 queue: workqueue.New(),
101 queueShutDown: workqueue.Interface.ShutDown,
102 },
103 {
104 queue: workqueue.New(),
105 queueShutDown: workqueue.Interface.ShutDownWithDrain,
106 },
107 }
108 for _, test := range tests {
109
110
111 const producers = 50
112 producerWG := sync.WaitGroup{}
113 producerWG.Add(producers)
114 for i := 0; i < producers; i++ {
115 go func(i int) {
116 defer producerWG.Done()
117 test.queue.Add(i)
118 }(i)
119 }
120
121
122 const consumers = 10
123 consumerWG := sync.WaitGroup{}
124 consumerWG.Add(consumers)
125 for i := 0; i < consumers; i++ {
126 go func(i int) {
127 defer consumerWG.Done()
128
129
130 counters := map[interface{}]int{}
131 for {
132 item, quit := test.queue.Get()
133 if quit {
134 return
135 }
136 counters[item]++
137 if counters[item] < 2 {
138 test.queue.Add(item)
139 }
140 test.queue.Done(item)
141 }
142 }(i)
143 }
144
145 producerWG.Wait()
146 test.queueShutDown(test.queue)
147 consumerWG.Wait()
148 if test.queue.Len() != 0 {
149 t.Errorf("Expected the queue to be empty, had: %v items", test.queue.Len())
150 }
151 }
152 }
153
154 func TestLen(t *testing.T) {
155 q := workqueue.New()
156 q.Add("foo")
157 if e, a := 1, q.Len(); e != a {
158 t.Errorf("Expected %v, got %v", e, a)
159 }
160 q.Add("bar")
161 if e, a := 2, q.Len(); e != a {
162 t.Errorf("Expected %v, got %v", e, a)
163 }
164 q.Add("foo")
165 if e, a := 2, q.Len(); e != a {
166 t.Errorf("Expected %v, got %v", e, a)
167 }
168 }
169
170 func TestReinsert(t *testing.T) {
171 q := workqueue.New()
172 q.Add("foo")
173
174
175 i, _ := q.Get()
176 if i != "foo" {
177 t.Errorf("Expected %v, got %v", "foo", i)
178 }
179
180
181 q.Add(i)
182
183
184 q.Done(i)
185
186
187 i, _ = q.Get()
188 if i != "foo" {
189 t.Errorf("Expected %v, got %v", "foo", i)
190 }
191
192
193 q.Done(i)
194
195 if a := q.Len(); a != 0 {
196 t.Errorf("Expected queue to be empty. Has %v items", a)
197 }
198 }
199
200 func TestCollapse(t *testing.T) {
201 q := workqueue.New()
202
203 q.Add("bar")
204 q.Add("bar")
205
206
207 i, _ := q.Get()
208 if i != "bar" {
209 t.Errorf("Expected %v, got %v", "bar", i)
210 }
211
212
213 q.Done(i)
214
215
216 if a := q.Len(); a != 0 {
217 t.Errorf("Expected queue to be empty. Has %v items", a)
218 }
219 }
220
221 func TestCollapseWhileProcessing(t *testing.T) {
222 q := workqueue.New()
223 q.Add("foo")
224
225
226 i, _ := q.Get()
227 if i != "foo" {
228 t.Errorf("Expected %v, got %v", "foo", i)
229 }
230
231
232 q.Add("foo")
233 q.Add("foo")
234
235 waitCh := make(chan struct{})
236
237 go func() {
238 defer close(waitCh)
239 i, _ := q.Get()
240 if i != "foo" {
241 t.Errorf("Expected %v, got %v", "foo", i)
242 }
243
244 q.Done(i)
245 }()
246
247
248
249 time.Sleep(100 * time.Millisecond)
250
251 select {
252 case <-waitCh:
253 t.Errorf("worker should be blocked until we are done")
254 default:
255 q.Done("foo")
256 }
257
258
259
260 <-waitCh
261 if a := q.Len(); a != 0 {
262 t.Errorf("Expected queue to be empty. Has %v items", a)
263 }
264 }
265
266 func TestQueueDrainageUsingShutDownWithDrain(t *testing.T) {
267
268 q := workqueue.New()
269
270 q.Add("foo")
271 q.Add("bar")
272
273 firstItem, _ := q.Get()
274 secondItem, _ := q.Get()
275
276 finishedWG := sync.WaitGroup{}
277 finishedWG.Add(1)
278 go func() {
279 defer finishedWG.Done()
280 q.ShutDownWithDrain()
281 }()
282
283
284
285
286 shuttingDown := false
287 for !shuttingDown {
288 _, shuttingDown = q.Get()
289 }
290
291
292 q.Done(firstItem)
293 q.Done(secondItem)
294
295 finishedWG.Wait()
296 }
297
298 func TestNoQueueDrainageUsingShutDown(t *testing.T) {
299
300 q := workqueue.New()
301
302 q.Add("foo")
303 q.Add("bar")
304
305 q.Get()
306 q.Get()
307
308 finishedWG := sync.WaitGroup{}
309 finishedWG.Add(1)
310 go func() {
311 defer finishedWG.Done()
312
313 q.ShutDown()
314 }()
315
316
317
318 finishedWG.Wait()
319 }
320
321 func TestForceQueueShutdownUsingShutDown(t *testing.T) {
322
323 q := workqueue.New()
324
325 q.Add("foo")
326 q.Add("bar")
327
328 q.Get()
329 q.Get()
330
331 finishedWG := sync.WaitGroup{}
332 finishedWG.Add(1)
333 go func() {
334 defer finishedWG.Done()
335 q.ShutDownWithDrain()
336 }()
337
338
339
340 shuttingDown := false
341 for !shuttingDown {
342 _, shuttingDown = q.Get()
343 }
344
345
346
347 q.ShutDown()
348
349
350
351 finishedWG.Wait()
352 }
353
354 func TestQueueDrainageUsingShutDownWithDrainWithDirtyItem(t *testing.T) {
355 q := workqueue.New()
356
357 q.Add("foo")
358 gotten, _ := q.Get()
359 q.Add("foo")
360
361 finishedWG := sync.WaitGroup{}
362 finishedWG.Add(1)
363 go func() {
364 defer finishedWG.Done()
365 q.ShutDownWithDrain()
366 }()
367
368
369 shuttingDown := false
370 for !shuttingDown {
371 _, shuttingDown = q.Get()
372 }
373
374
375 q.Done(gotten)
376
377
378
379 again, shuttingDown := q.Get()
380 if shuttingDown {
381 t.Fatalf("should not have been done")
382 }
383 q.Done(again)
384
385
386 _, shuttingDown = q.Get()
387 if !shuttingDown {
388 t.Fatalf("should have been done")
389 }
390
391 finishedWG.Wait()
392 }
393
394
395
396 func TestGarbageCollection(t *testing.T) {
397 type bigObject struct {
398 data []byte
399 }
400 leakQueue := workqueue.New()
401 t.Cleanup(func() {
402
403 runtime.KeepAlive(leakQueue)
404 })
405 c := &bigObject{data: []byte("hello")}
406 mustGarbageCollect(t, c)
407 leakQueue.Add(c)
408 o, _ := leakQueue.Get()
409 leakQueue.Done(o)
410 }
411
412
413
414 func mustGarbageCollect(t *testing.T, i interface{}) {
415 t.Helper()
416 var collected int32 = 0
417 runtime.SetFinalizer(i, func(x interface{}) {
418 atomic.StoreInt32(&collected, 1)
419 })
420 t.Cleanup(func() {
421 if err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (done bool, err error) {
422
423 runtime.GC()
424 return atomic.LoadInt32(&collected) == 1, nil
425 }); err != nil {
426 t.Errorf("object was not garbage collected")
427 }
428 })
429 }
430
View as plain text