1
16
17 package workqueue
18
19 import (
20 "sync"
21 "testing"
22 "time"
23
24 testingclock "k8s.io/utils/clock/testing"
25 )
26
27 type testMetrics struct {
28 added, gotten, finished int64
29
30 updateCalled chan<- struct{}
31 }
32
33 func (m *testMetrics) add(item t) { m.added++ }
34 func (m *testMetrics) get(item t) { m.gotten++ }
35 func (m *testMetrics) done(item t) { m.finished++ }
36 func (m *testMetrics) updateUnfinishedWork() { m.updateCalled <- struct{}{} }
37
38 func TestMetricShutdown(t *testing.T) {
39 ch := make(chan struct{})
40 m := &testMetrics{
41 updateCalled: ch,
42 }
43 c := testingclock.NewFakeClock(time.Now())
44 q := newQueue(c, m, time.Millisecond)
45 for !c.HasWaiters() {
46
47 time.Sleep(time.Millisecond)
48 }
49
50 c.Step(time.Millisecond)
51 <-ch
52 q.ShutDown()
53
54 c.Step(time.Hour)
55 select {
56 default:
57 return
58 case <-ch:
59 t.Errorf("Unexpected update after shutdown was called.")
60 }
61 }
62
// testMetric is an in-memory recorder that offers counter-, gauge- and
// histogram-style methods so tests can inspect what was reported.
type testMetric struct {
	// inc and dec count the calls to Inc and Dec.
	inc int64
	dec int64
	// set holds the value passed to the most recent Set call.
	set float64

	// observedValue is the most recent value passed to Observe and
	// observedCount is the number of Observe calls.
	observedValue float64
	observedCount int

	// notifyCh, when non-nil, receives one value after every
	// Inc, Dec, Set or Observe call.
	notifyCh chan<- struct{}

	// lock serializes the recording and accessor methods below.
	lock sync.Mutex
}

// Inc records one increment and then notifies.
func (m *testMetric) Inc() {
	m.lock.Lock()
	defer m.lock.Unlock()

	m.inc++
	m.notify()
}

// Dec records one decrement and then notifies.
func (m *testMetric) Dec() {
	m.lock.Lock()
	defer m.lock.Unlock()

	m.dec++
	m.notify()
}

// Set stores f as the explicitly set value and then notifies.
func (m *testMetric) Set(f float64) {
	m.lock.Lock()
	defer m.lock.Unlock()

	m.set = f
	m.notify()
}

// Observe stores f as the latest observation, bumps the observation count
// and then notifies.
func (m *testMetric) Observe(f float64) {
	m.lock.Lock()
	defer m.lock.Unlock()

	m.observedCount++
	m.observedValue = f
	m.notify()
}

// gaugeValue returns the explicitly set value when it is non-zero; otherwise
// it returns the number of increments minus the number of decrements.
func (m *testMetric) gaugeValue() float64 {
	m.lock.Lock()
	defer m.lock.Unlock()

	if m.set == 0 {
		return float64(m.inc - m.dec)
	}
	return m.set
}

// observationValue returns the most recently observed value.
func (m *testMetric) observationValue() float64 {
	m.lock.Lock()
	latest := m.observedValue
	m.lock.Unlock()
	return latest
}

// observationCount returns how many times Observe has been called.
func (m *testMetric) observationCount() int {
	m.lock.Lock()
	count := m.observedCount
	m.lock.Unlock()
	return count
}

// notify sends on notifyCh if one is configured. The recording methods call
// it while holding lock, so a send that blocks also blocks the metric.
func (m *testMetric) notify() {
	if m.notifyCh == nil {
		return
	}
	m.notifyCh <- struct{}{}
}
131
132 type testMetricsProvider struct {
133 depth testMetric
134 adds testMetric
135 latency testMetric
136 duration testMetric
137 unfinished testMetric
138 longest testMetric
139 retries testMetric
140 }
141
142 func (m *testMetricsProvider) NewDepthMetric(name string) GaugeMetric {
143 return &m.depth
144 }
145
146 func (m *testMetricsProvider) NewAddsMetric(name string) CounterMetric {
147 return &m.adds
148 }
149
150 func (m *testMetricsProvider) NewLatencyMetric(name string) HistogramMetric {
151 return &m.latency
152 }
153
154 func (m *testMetricsProvider) NewWorkDurationMetric(name string) HistogramMetric {
155 return &m.duration
156 }
157
158 func (m *testMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
159 return &m.unfinished
160 }
161
162 func (m *testMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric {
163 return &m.longest
164 }
165
166 func (m *testMetricsProvider) NewRetriesMetric(name string) CounterMetric {
167 return &m.retries
168 }
169
170 func TestMetrics(t *testing.T) {
171 mp := testMetricsProvider{}
172 t0 := time.Unix(0, 0)
173 c := testingclock.NewFakeClock(t0)
174 config := QueueConfig{
175 Name: "test",
176 Clock: c,
177 MetricsProvider: &mp,
178 }
179 q := newQueueWithConfig(config, time.Millisecond)
180 defer q.ShutDown()
181 for !c.HasWaiters() {
182
183 time.Sleep(time.Millisecond)
184 }
185
186 q.Add("foo")
187 if e, a := 1.0, mp.adds.gaugeValue(); e != a {
188 t.Errorf("expected %v, got %v", e, a)
189 }
190
191 if e, a := 1.0, mp.depth.gaugeValue(); e != a {
192 t.Errorf("expected %v, got %v", e, a)
193 }
194
195 c.Step(50 * time.Microsecond)
196
197
198 i, _ := q.Get()
199 if i != "foo" {
200 t.Errorf("Expected %v, got %v", "foo", i)
201 }
202
203 if e, a := 5e-05, mp.latency.observationValue(); e != a {
204 t.Errorf("expected %v, got %v", e, a)
205 }
206 if e, a := 1, mp.latency.observationCount(); e != a {
207 t.Errorf("expected %v, got %v", e, a)
208 }
209
210
211
212 q.Add(i)
213 q.Add(i)
214 q.Add(i)
215 q.Add(i)
216 q.Add(i)
217 if e, a := 2.0, mp.adds.gaugeValue(); e != a {
218 t.Errorf("expected %v, got %v", e, a)
219 }
220
221 if e, a := 1.0, mp.depth.gaugeValue(); e != a {
222 t.Errorf("expected %v, got %v", e, a)
223 }
224
225 c.Step(25 * time.Microsecond)
226
227
228 q.Done(i)
229
230 if e, a := 2.5e-05, mp.duration.observationValue(); e != a {
231 t.Errorf("expected %v, got %v", e, a)
232 }
233 if e, a := 1, mp.duration.observationCount(); e != a {
234 t.Errorf("expected %v, got %v", e, a)
235 }
236
237
238 if e, a := 1.0, mp.depth.gaugeValue(); e != a {
239 t.Errorf("expected %v, got %v", e, a)
240 }
241
242
243 i, _ = q.Get()
244 if i != "foo" {
245 t.Errorf("Expected %v, got %v", "foo", i)
246 }
247
248 if e, a := 2.5e-05, mp.latency.observationValue(); e != a {
249 t.Errorf("expected %v, got %v", e, a)
250 }
251 if e, a := 2, mp.latency.observationCount(); e != a {
252 t.Errorf("expected %v, got %v", e, a)
253 }
254
255
256
257 ch := make(chan struct{}, 1)
258 longestCh := make(chan struct{}, 1)
259 mp.unfinished.notifyCh = ch
260 mp.longest.notifyCh = longestCh
261 c.Step(time.Millisecond)
262 <-ch
263 mp.unfinished.notifyCh = nil
264 if e, a := .001, mp.unfinished.gaugeValue(); e != a {
265 t.Errorf("expected %v, got %v", e, a)
266 }
267 <-longestCh
268 mp.longest.notifyCh = nil
269 if e, a := .001, mp.longest.gaugeValue(); e != a {
270 t.Errorf("expected %v, got %v", e, a)
271 }
272
273
274 q.Done(i)
275 if e, a := .001, mp.duration.observationValue(); e != a {
276 t.Errorf("expected %v, got %v", e, a)
277 }
278 if e, a := 2, mp.duration.observationCount(); e != a {
279 t.Errorf("expected %v, got %v", e, a)
280 }
281 }
282
View as plain text