1
16
17 package workqueue
18
19 import (
20 "fmt"
21 "math/rand"
22 "reflect"
23 "testing"
24 "time"
25
26 "k8s.io/apimachinery/pkg/util/wait"
27 testingclock "k8s.io/utils/clock/testing"
28 )
29
30 func TestSimpleQueue(t *testing.T) {
31 fakeClock := testingclock.NewFakeClock(time.Now())
32 q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
33
34 first := "foo"
35
36 q.AddAfter(first, 50*time.Millisecond)
37 if err := waitForWaitingQueueToFill(q); err != nil {
38 t.Fatalf("unexpected err: %v", err)
39 }
40
41 if q.Len() != 0 {
42 t.Errorf("should not have added")
43 }
44
45 fakeClock.Step(60 * time.Millisecond)
46
47 if err := waitForAdded(q, 1); err != nil {
48 t.Errorf("should have added")
49 }
50 item, _ := q.Get()
51 q.Done(item)
52
53
54 fakeClock.Step(10 * time.Second)
55
56 err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) {
57 if q.Len() > 0 {
58 return false, fmt.Errorf("added to queue")
59 }
60
61 return false, nil
62 })
63 if err != wait.ErrWaitTimeout {
64 t.Errorf("expected timeout, got: %v", err)
65 }
66
67 if q.Len() != 0 {
68 t.Errorf("should not have added")
69 }
70 }
71
72 func TestDeduping(t *testing.T) {
73 fakeClock := testingclock.NewFakeClock(time.Now())
74 q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
75
76 first := "foo"
77
78 q.AddAfter(first, 50*time.Millisecond)
79 if err := waitForWaitingQueueToFill(q); err != nil {
80 t.Fatalf("unexpected err: %v", err)
81 }
82 q.AddAfter(first, 70*time.Millisecond)
83 if err := waitForWaitingQueueToFill(q); err != nil {
84 t.Fatalf("unexpected err: %v", err)
85 }
86 if q.Len() != 0 {
87 t.Errorf("should not have added")
88 }
89
90
91 fakeClock.Step(60 * time.Millisecond)
92 if err := waitForAdded(q, 1); err != nil {
93 t.Errorf("should have added")
94 }
95 item, _ := q.Get()
96 q.Done(item)
97
98
99 fakeClock.Step(20 * time.Millisecond)
100 if q.Len() != 0 {
101 t.Errorf("should not have added")
102 }
103
104
105 q.AddAfter(first, 50*time.Millisecond)
106 q.AddAfter(first, 30*time.Millisecond)
107 if err := waitForWaitingQueueToFill(q); err != nil {
108 t.Fatalf("unexpected err: %v", err)
109 }
110 if q.Len() != 0 {
111 t.Errorf("should not have added")
112 }
113
114 fakeClock.Step(40 * time.Millisecond)
115 if err := waitForAdded(q, 1); err != nil {
116 t.Errorf("should have added")
117 }
118 item, _ = q.Get()
119 q.Done(item)
120
121
122 fakeClock.Step(20 * time.Millisecond)
123 if q.Len() != 0 {
124 t.Errorf("should not have added")
125 }
126 }
127
128 func TestAddTwoFireEarly(t *testing.T) {
129 fakeClock := testingclock.NewFakeClock(time.Now())
130 q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
131
132 first := "foo"
133 second := "bar"
134 third := "baz"
135
136 q.AddAfter(first, 1*time.Second)
137 q.AddAfter(second, 50*time.Millisecond)
138 if err := waitForWaitingQueueToFill(q); err != nil {
139 t.Fatalf("unexpected err: %v", err)
140 }
141
142 if q.Len() != 0 {
143 t.Errorf("should not have added")
144 }
145
146 fakeClock.Step(60 * time.Millisecond)
147
148 if err := waitForAdded(q, 1); err != nil {
149 t.Fatalf("unexpected err: %v", err)
150 }
151 item, _ := q.Get()
152 if !reflect.DeepEqual(item, second) {
153 t.Errorf("expected %v, got %v", second, item)
154 }
155
156 q.AddAfter(third, 2*time.Second)
157
158 fakeClock.Step(1 * time.Second)
159 if err := waitForAdded(q, 1); err != nil {
160 t.Fatalf("unexpected err: %v", err)
161 }
162 item, _ = q.Get()
163 if !reflect.DeepEqual(item, first) {
164 t.Errorf("expected %v, got %v", first, item)
165 }
166
167 fakeClock.Step(2 * time.Second)
168 if err := waitForAdded(q, 1); err != nil {
169 t.Fatalf("unexpected err: %v", err)
170 }
171 item, _ = q.Get()
172 if !reflect.DeepEqual(item, third) {
173 t.Errorf("expected %v, got %v", third, item)
174 }
175 }
176
177 func TestCopyShifting(t *testing.T) {
178 fakeClock := testingclock.NewFakeClock(time.Now())
179 q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
180
181 first := "foo"
182 second := "bar"
183 third := "baz"
184
185 q.AddAfter(first, 1*time.Second)
186 q.AddAfter(second, 500*time.Millisecond)
187 q.AddAfter(third, 250*time.Millisecond)
188 if err := waitForWaitingQueueToFill(q); err != nil {
189 t.Fatalf("unexpected err: %v", err)
190 }
191
192 if q.Len() != 0 {
193 t.Errorf("should not have added")
194 }
195
196 fakeClock.Step(2 * time.Second)
197
198 if err := waitForAdded(q, 3); err != nil {
199 t.Fatalf("unexpected err: %v", err)
200 }
201 actualFirst, _ := q.Get()
202 if !reflect.DeepEqual(actualFirst, third) {
203 t.Errorf("expected %v, got %v", third, actualFirst)
204 }
205 actualSecond, _ := q.Get()
206 if !reflect.DeepEqual(actualSecond, second) {
207 t.Errorf("expected %v, got %v", second, actualSecond)
208 }
209 actualThird, _ := q.Get()
210 if !reflect.DeepEqual(actualThird, first) {
211 t.Errorf("expected %v, got %v", first, actualThird)
212 }
213 }
214
215 func BenchmarkDelayingQueue_AddAfter(b *testing.B) {
216 fakeClock := testingclock.NewFakeClock(time.Now())
217 q := NewDelayingQueueWithConfig(DelayingQueueConfig{Clock: fakeClock})
218
219
220 for n := 0; n < b.N; n++ {
221 data := fmt.Sprintf("%d", n)
222 q.AddAfter(data, time.Duration(rand.Int63n(int64(10*time.Minute))))
223 }
224
225
226 fakeClock.Step(11 * time.Minute)
227 for n := 0; n < b.N; n++ {
228 _, _ = q.Get()
229 }
230 }
231
232 func waitForAdded(q DelayingInterface, depth int) error {
233 return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
234 if q.Len() == depth {
235 return true, nil
236 }
237
238 return false, nil
239 })
240 }
241
242 func waitForWaitingQueueToFill(q DelayingInterface) error {
243 return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {
244 if len(q.(*delayingType).waitingForAddCh) == 0 {
245 return true, nil
246 }
247
248 return false, nil
249 })
250 }
251
View as plain text