...
1
16
17 package workqueue
18
19 import (
20 "sync"
21 "time"
22
23 "k8s.io/utils/clock"
24 )
25
26
27
28
29 type queueMetrics interface {
30 add(item t)
31 get(item t)
32 done(item t)
33 updateUnfinishedWork()
34 }
35
36
37
38 type GaugeMetric interface {
39 Inc()
40 Dec()
41 }
42
43
44
45 type SettableGaugeMetric interface {
46 Set(float64)
47 }
48
49
50
51 type CounterMetric interface {
52 Inc()
53 }
54
55
56 type SummaryMetric interface {
57 Observe(float64)
58 }
59
60
61 type HistogramMetric interface {
62 Observe(float64)
63 }
64
65 type noopMetric struct{}
66
67 func (noopMetric) Inc() {}
68 func (noopMetric) Dec() {}
69 func (noopMetric) Set(float64) {}
70 func (noopMetric) Observe(float64) {}
71
72
73 type defaultQueueMetrics struct {
74 clock clock.Clock
75
76
77 depth GaugeMetric
78
79 adds CounterMetric
80
81 latency HistogramMetric
82
83 workDuration HistogramMetric
84 addTimes map[t]time.Time
85 processingStartTimes map[t]time.Time
86
87
88 unfinishedWorkSeconds SettableGaugeMetric
89 longestRunningProcessor SettableGaugeMetric
90 }
91
92 func (m *defaultQueueMetrics) add(item t) {
93 if m == nil {
94 return
95 }
96
97 m.adds.Inc()
98 m.depth.Inc()
99 if _, exists := m.addTimes[item]; !exists {
100 m.addTimes[item] = m.clock.Now()
101 }
102 }
103
104 func (m *defaultQueueMetrics) get(item t) {
105 if m == nil {
106 return
107 }
108
109 m.depth.Dec()
110 m.processingStartTimes[item] = m.clock.Now()
111 if startTime, exists := m.addTimes[item]; exists {
112 m.latency.Observe(m.sinceInSeconds(startTime))
113 delete(m.addTimes, item)
114 }
115 }
116
117 func (m *defaultQueueMetrics) done(item t) {
118 if m == nil {
119 return
120 }
121
122 if startTime, exists := m.processingStartTimes[item]; exists {
123 m.workDuration.Observe(m.sinceInSeconds(startTime))
124 delete(m.processingStartTimes, item)
125 }
126 }
127
128 func (m *defaultQueueMetrics) updateUnfinishedWork() {
129
130
131 var total float64
132 var oldest float64
133 for _, t := range m.processingStartTimes {
134 age := m.sinceInSeconds(t)
135 total += age
136 if age > oldest {
137 oldest = age
138 }
139 }
140 m.unfinishedWorkSeconds.Set(total)
141 m.longestRunningProcessor.Set(oldest)
142 }
143
144 type noMetrics struct{}
145
146 func (noMetrics) add(item t) {}
147 func (noMetrics) get(item t) {}
148 func (noMetrics) done(item t) {}
149 func (noMetrics) updateUnfinishedWork() {}
150
151
152 func (m *defaultQueueMetrics) sinceInSeconds(start time.Time) float64 {
153 return m.clock.Since(start).Seconds()
154 }
155
156 type retryMetrics interface {
157 retry()
158 }
159
160 type defaultRetryMetrics struct {
161 retries CounterMetric
162 }
163
164 func (m *defaultRetryMetrics) retry() {
165 if m == nil {
166 return
167 }
168
169 m.retries.Inc()
170 }
171
172
173 type MetricsProvider interface {
174 NewDepthMetric(name string) GaugeMetric
175 NewAddsMetric(name string) CounterMetric
176 NewLatencyMetric(name string) HistogramMetric
177 NewWorkDurationMetric(name string) HistogramMetric
178 NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
179 NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
180 NewRetriesMetric(name string) CounterMetric
181 }
182
183 type noopMetricsProvider struct{}
184
185 func (_ noopMetricsProvider) NewDepthMetric(name string) GaugeMetric {
186 return noopMetric{}
187 }
188
189 func (_ noopMetricsProvider) NewAddsMetric(name string) CounterMetric {
190 return noopMetric{}
191 }
192
193 func (_ noopMetricsProvider) NewLatencyMetric(name string) HistogramMetric {
194 return noopMetric{}
195 }
196
197 func (_ noopMetricsProvider) NewWorkDurationMetric(name string) HistogramMetric {
198 return noopMetric{}
199 }
200
201 func (_ noopMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric {
202 return noopMetric{}
203 }
204
205 func (_ noopMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric {
206 return noopMetric{}
207 }
208
209 func (_ noopMetricsProvider) NewRetriesMetric(name string) CounterMetric {
210 return noopMetric{}
211 }
212
213 var globalMetricsFactory = queueMetricsFactory{
214 metricsProvider: noopMetricsProvider{},
215 }
216
217 type queueMetricsFactory struct {
218 metricsProvider MetricsProvider
219
220 onlyOnce sync.Once
221 }
222
223 func (f *queueMetricsFactory) setProvider(mp MetricsProvider) {
224 f.onlyOnce.Do(func() {
225 f.metricsProvider = mp
226 })
227 }
228
229 func (f *queueMetricsFactory) newQueueMetrics(name string, clock clock.Clock) queueMetrics {
230 mp := f.metricsProvider
231 if len(name) == 0 || mp == (noopMetricsProvider{}) {
232 return noMetrics{}
233 }
234 return &defaultQueueMetrics{
235 clock: clock,
236 depth: mp.NewDepthMetric(name),
237 adds: mp.NewAddsMetric(name),
238 latency: mp.NewLatencyMetric(name),
239 workDuration: mp.NewWorkDurationMetric(name),
240 unfinishedWorkSeconds: mp.NewUnfinishedWorkSecondsMetric(name),
241 longestRunningProcessor: mp.NewLongestRunningProcessorSecondsMetric(name),
242 addTimes: map[t]time.Time{},
243 processingStartTimes: map[t]time.Time{},
244 }
245 }
246
247 func newRetryMetrics(name string, provider MetricsProvider) retryMetrics {
248 var ret *defaultRetryMetrics
249 if len(name) == 0 {
250 return ret
251 }
252
253 if provider == nil {
254 provider = globalMetricsFactory.metricsProvider
255 }
256
257 return &defaultRetryMetrics{
258 retries: provider.NewRetriesMetric(name),
259 }
260 }
261
262
263
264 func SetProvider(metricsProvider MetricsProvider) {
265 globalMetricsFactory.setProvider(metricsProvider)
266 }
267
View as plain text