...
1
16
17 package workqueue
18
19 import (
20 "testing"
21 "time"
22
23 testingclock "k8s.io/utils/clock/testing"
24 )
25
26 func TestRateLimitingQueue(t *testing.T) {
27 limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)
28 queue := NewRateLimitingQueue(limiter).(*rateLimitingType)
29 fakeClock := testingclock.NewFakeClock(time.Now())
30 delayingQueue := &delayingType{
31 Interface: New(),
32 clock: fakeClock,
33 heartbeat: fakeClock.NewTicker(maxWait),
34 stopCh: make(chan struct{}),
35 waitingForAddCh: make(chan *waitFor, 1000),
36 metrics: newRetryMetrics("", nil),
37 }
38 queue.DelayingInterface = delayingQueue
39
40 queue.AddRateLimited("one")
41 waitEntry := <-delayingQueue.waitingForAddCh
42 if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
43 t.Errorf("expected %v, got %v", e, a)
44 }
45 queue.AddRateLimited("one")
46 waitEntry = <-delayingQueue.waitingForAddCh
47 if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
48 t.Errorf("expected %v, got %v", e, a)
49 }
50 if e, a := 2, queue.NumRequeues("one"); e != a {
51 t.Errorf("expected %v, got %v", e, a)
52 }
53
54 queue.AddRateLimited("two")
55 waitEntry = <-delayingQueue.waitingForAddCh
56 if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
57 t.Errorf("expected %v, got %v", e, a)
58 }
59 queue.AddRateLimited("two")
60 waitEntry = <-delayingQueue.waitingForAddCh
61 if e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
62 t.Errorf("expected %v, got %v", e, a)
63 }
64
65 queue.Forget("one")
66 if e, a := 0, queue.NumRequeues("one"); e != a {
67 t.Errorf("expected %v, got %v", e, a)
68 }
69 queue.AddRateLimited("one")
70 waitEntry = <-delayingQueue.waitingForAddCh
71 if e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {
72 t.Errorf("expected %v, got %v", e, a)
73 }
74
75 }
76
View as plain text