1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package metricexport
17
18 import (
19 "context"
20 "sync"
21 "testing"
22 "time"
23
24 "go.opencensus.io/metric"
25 "go.opencensus.io/metric/metricdata"
26 "go.opencensus.io/metric/metricproducer"
27 )
28
29 var (
30 ir1 *IntervalReader
31 ir2 *IntervalReader
32 reader1 = NewReader(WithSpanName("test-export-span"))
33 exporter1 = &metricExporter{}
34 exporter2 = &metricExporter{}
35 gaugeEntry *metric.Int64GaugeEntry
36 duration1 = 1000 * time.Millisecond
37 duration2 = 2000 * time.Millisecond
38 )
39
40 type metricExporter struct {
41 sync.Mutex
42 metrics []*metricdata.Metric
43 }
44
45 func (e *metricExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error {
46 e.Lock()
47 defer e.Unlock()
48
49 e.metrics = append(e.metrics, metrics...)
50 return nil
51 }
52
53 func init() {
54 r := metric.NewRegistry()
55 metricproducer.GlobalManager().AddProducer(r)
56 g, _ := r.AddInt64Gauge("active_request",
57 metric.WithDescription("Number of active requests, per method."),
58 metric.WithUnit(metricdata.UnitDimensionless),
59 metric.WithLabelKeys("method"))
60 gaugeEntry, _ = g.GetEntry(metricdata.NewLabelValue("foo"))
61 }
62
63 func TestNewReaderWitDefaultOptions(t *testing.T) {
64 r := NewReader()
65
66 if r.spanName != defaultSpanName {
67 t.Errorf("span name: got %v, want %v\n", r.spanName, defaultSpanName)
68 }
69 }
70
71 func TestNewReaderWitSpanName(t *testing.T) {
72 spanName := "test-span"
73 r := NewReader(WithSpanName(spanName))
74
75 if r.spanName != spanName {
76 t.Errorf("span name: got %+v, want %v\n", r.spanName, spanName)
77 }
78 }
79
80 func TestNewReader(t *testing.T) {
81 r := NewReader()
82
83 gaugeEntry.Add(1)
84
85 r.ReadAndExport(exporter1)
86
87 checkExportedCount(exporter1, 1, t)
88 checkExportedMetricDesc(exporter1, "active_request", t)
89 resetExporter(exporter1)
90 }
91
92 func TestNewIntervalReader(t *testing.T) {
93 ir1 = createAndStart(exporter1, duration1, t)
94
95 gaugeEntry.Add(1)
96
97 time.Sleep(1500 * time.Millisecond)
98 checkExportedCount(exporter1, 1, t)
99 checkExportedMetricDesc(exporter1, "active_request", t)
100 ir1.Stop()
101 resetExporter(exporter1)
102 }
103
104 func TestManualReadForIntervalReader(t *testing.T) {
105 ir1 = createAndStart(exporter1, duration1, t)
106
107 gaugeEntry.Set(1)
108 reader1.ReadAndExport(exporter1)
109 gaugeEntry.Set(4)
110
111 time.Sleep(1500 * time.Millisecond)
112
113 checkExportedCount(exporter1, 2, t)
114 checkExportedValues(exporter1, []int64{1, 4}, t)
115 checkExportedMetricDesc(exporter1, "active_request", t)
116 ir1.Stop()
117 resetExporter(exporter1)
118 }
119
120 func TestFlushNoOpForIntervalReader(t *testing.T) {
121 ir1 = createAndStart(exporter1, duration1, t)
122
123 gaugeEntry.Set(1)
124
125
126 ir1.Flush()
127
128
129 checkExportedCount(exporter1, 0, t)
130 checkExportedMetricDesc(exporter1, "active_request", t)
131 ir1.Stop()
132 resetExporter(exporter1)
133 }
134
135 func TestFlushAllowMultipleForIntervalReader(t *testing.T) {
136 ir1 = createAndStart(exporter1, duration1, t)
137
138 gaugeEntry.Set(1)
139
140 ir1.Stop()
141 ir1.Flush()
142
143
144 gaugeEntry.Add(1)
145
146
147 ir1.Flush()
148
149
150 checkExportedCount(exporter1, 2, t)
151 checkExportedValues(exporter1, []int64{1, 2}, t)
152 checkExportedMetricDesc(exporter1, "active_request", t)
153
154 resetExporter(exporter1)
155 }
156
157 func TestFlushRestartForIntervalReader(t *testing.T) {
158 ir1 = createAndStart(exporter1, duration1, t)
159
160 gaugeEntry.Set(1)
161 ir1.Stop()
162 ir1.Flush()
163
164
165 err := ir1.Start()
166 if err != nil {
167 t.Fatalf("error starting reader %v\n", err)
168 }
169
170 gaugeEntry.Add(1)
171
172 ir1.Stop()
173 ir1.Flush()
174
175
176 checkExportedCount(exporter1, 2, t)
177 checkExportedValues(exporter1, []int64{1, 2}, t)
178 checkExportedMetricDesc(exporter1, "active_request", t)
179
180 resetExporter(exporter1)
181 }
182
183 func TestProducerWithIntervalReaderStop(t *testing.T) {
184 ir1 = createAndStart(exporter1, duration1, t)
185 ir1.Stop()
186
187 gaugeEntry.Add(1)
188
189 time.Sleep(1500 * time.Millisecond)
190
191 checkExportedCount(exporter1, 0, t)
192 checkExportedMetricDesc(exporter1, "active_request", t)
193 resetExporter(exporter1)
194 }
195
196 func TestProducerWithMultipleIntervalReaders(t *testing.T) {
197 ir1 = createAndStart(exporter1, duration1, t)
198 ir2 = createAndStart(exporter2, duration2, t)
199
200 gaugeEntry.Add(1)
201
202 time.Sleep(2500 * time.Millisecond)
203
204 checkExportedCount(exporter1, 2, t)
205 checkExportedMetricDesc(exporter1, "active_request", t)
206 checkExportedCount(exporter2, 1, t)
207 checkExportedMetricDesc(exporter2, "active_request", t)
208 ir1.Stop()
209 ir2.Stop()
210 resetExporter(exporter1)
211 resetExporter(exporter1)
212 }
213
214 func TestIntervalReaderMultipleStop(t *testing.T) {
215 ir1 = createAndStart(exporter1, duration1, t)
216 stop := make(chan bool, 1)
217 go func() {
218 ir1.Stop()
219 ir1.Stop()
220 stop <- true
221 }()
222
223 select {
224 case _ = <-stop:
225 case <-time.After(1 * time.Second):
226 t.Fatalf("ir1 stop got blocked")
227 }
228 }
229
230 func TestIntervalReaderMultipleStart(t *testing.T) {
231 ir1 = createAndStart(exporter1, duration1, t)
232 err := ir1.Start()
233 if err == nil {
234 t.Fatalf("expected error but got nil\n")
235 }
236
237 gaugeEntry.Add(1)
238
239 time.Sleep(1500 * time.Millisecond)
240
241 checkExportedCount(exporter1, 1, t)
242 checkExportedMetricDesc(exporter1, "active_request", t)
243 ir1.Stop()
244 resetExporter(exporter1)
245 }
246
247 func TestNewIntervalReaderWithNilReader(t *testing.T) {
248 _, err := NewIntervalReader(nil, exporter1)
249 if err == nil {
250 t.Fatalf("expected error but got nil\n")
251 }
252 }
253
254 func TestNewIntervalReaderWithNilExporter(t *testing.T) {
255 _, err := NewIntervalReader(reader1, nil)
256 if err == nil {
257 t.Fatalf("expected error but got nil\n")
258 }
259 }
260
261 func TestNewIntervalReaderStartWithInvalidInterval(t *testing.T) {
262 ir, err := NewIntervalReader(reader1, exporter1)
263 ir.ReportingInterval = 500 * time.Millisecond
264 err = ir.Start()
265 if err == nil {
266 t.Fatalf("expected error but got nil\n")
267 }
268 }
269
270 func checkExportedCount(exporter *metricExporter, wantCount int, t *testing.T) {
271 exporter.Lock()
272 defer exporter.Unlock()
273 gotCount := len(exporter.metrics)
274 if gotCount != wantCount {
275 t.Fatalf("exported metric count: got %d, want %d\n", gotCount, wantCount)
276 }
277 }
278
279 func checkExportedValues(exporter *metricExporter, wantValues []int64, t *testing.T) {
280 exporter.Lock()
281 defer exporter.Unlock()
282 gotCount := len(exporter.metrics)
283 wantCount := len(wantValues)
284 if gotCount != wantCount {
285 t.Errorf("exported metric count: got %d, want %d\n", gotCount, wantCount)
286 return
287 }
288 for i, wantValue := range wantValues {
289 var gotValue int64
290 switch v := exporter.metrics[i].TimeSeries[0].Points[0].Value.(type) {
291 case int64:
292 gotValue = v
293 default:
294 t.Errorf("expected float64 value but found other %T", exporter.metrics[i].TimeSeries[0].Points[0].Value)
295 }
296 if gotValue != wantValue {
297 t.Errorf("values idx %d, got: %v, want %v", i, gotValue, wantValue)
298 }
299 }
300 }
301
302 func checkExportedMetricDesc(exporter *metricExporter, wantMdName string, t *testing.T) {
303 exporter.Lock()
304 defer exporter.Unlock()
305 for _, metric := range exporter.metrics {
306 gotMdName := metric.Descriptor.Name
307 if gotMdName != wantMdName {
308 t.Errorf("got %s, want %s\n", gotMdName, wantMdName)
309 }
310 }
311 exporter.metrics = nil
312 }
313
314 func resetExporter(exporter *metricExporter) {
315 exporter.Lock()
316 defer exporter.Unlock()
317 exporter.metrics = nil
318 }
319
320
321 func createAndStart(exporter *metricExporter, d time.Duration, t *testing.T) *IntervalReader {
322 ir, _ := NewIntervalReader(reader1, exporter)
323 ir.ReportingInterval = d
324 err := ir.Start()
325 if err != nil {
326 t.Fatalf("error creating reader %v\n", err)
327 }
328 return ir
329 }
330
View as plain text