1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package view
17
18 import (
19 "context"
20 "errors"
21 "sort"
22 "sync"
23 "testing"
24 "time"
25
26 "github.com/google/go-cmp/cmp"
27 "go.opencensus.io/resource"
28
29 "go.opencensus.io/metric/metricdata"
30 "go.opencensus.io/metric/metricexport"
31 "go.opencensus.io/stats"
32 "go.opencensus.io/tag"
33 )
34
35 func Test_Worker_ViewRegistration(t *testing.T) {
36 someError := errors.New("some error")
37
38 sc1 := make(chan *Data)
39
40 type registration struct {
41 c chan *Data
42 vID string
43 err error
44 }
45 type testCase struct {
46 label string
47 registrations []registration
48 }
49 tcs := []testCase{
50 {
51 "register v1ID",
52 []registration{
53 {
54 sc1,
55 "v1ID",
56 nil,
57 },
58 },
59 },
60 {
61 "register v1ID+v2ID",
62 []registration{
63 {
64 sc1,
65 "v1ID",
66 nil,
67 },
68 },
69 },
70 {
71 "register to v1ID; ??? to v1ID and view with same ID",
72 []registration{
73 {
74 sc1,
75 "v1ID",
76 nil,
77 },
78 {
79 sc1,
80 "v1SameNameID",
81 someError,
82 },
83 },
84 },
85 }
86
87 mf1 := stats.Float64("MF1/Test_Worker_ViewSubscription", "desc MF1", "unit")
88 mf2 := stats.Float64("MF2/Test_Worker_ViewSubscription", "desc MF2", "unit")
89
90 for _, tc := range tcs {
91 t.Run(tc.label, func(t *testing.T) {
92 restart()
93
94 views := map[string]*View{
95 "v1ID": {
96 Name: "VF1",
97 Measure: mf1,
98 Aggregation: Count(),
99 },
100 "v1SameNameID": {
101 Name: "VF1",
102 Description: "desc duplicate name VF1",
103 Measure: mf1,
104 Aggregation: Sum(),
105 },
106 "v2ID": {
107 Name: "VF2",
108 Measure: mf2,
109 Aggregation: Count(),
110 },
111 "vNilID": nil,
112 }
113
114 for _, r := range tc.registrations {
115 v := views[r.vID]
116 err := Register(v)
117 if (err != nil) != (r.err != nil) {
118 t.Errorf("%v: Register() = %v, want %v", tc.label, err, r.err)
119 }
120 }
121 })
122 }
123 }
124
125 func Test_Worker_MultiExport(t *testing.T) {
126 restart()
127
128
129
130 extraResource := resource.Resource{
131 Type: "additional",
132 Labels: map[string]string{"key1": "value1", "key2": "value2"},
133 }
134 worker2 := NewMeter().(*worker)
135 worker2.Start()
136 worker2.SetResource(&extraResource)
137
138 m := stats.Float64("Test_Worker_MultiExport/MF1", "desc MF1", "unit")
139 key := tag.MustNewKey(("key"))
140 count := &View{"VF1", "description", []tag.Key{key}, m, Count()}
141 sum := &View{"VF2", "description", []tag.Key{}, m, Sum()}
142
143 Register(count, sum)
144 worker2.Register(count)
145 data := []struct {
146 w Meter
147 tags string
148 value float64
149 }{{
150 tags: "a",
151 value: 2.0,
152 }, {
153 tags: "b",
154 value: 3.0,
155 }, {
156 tags: "a", value: 2.5,
157 }, {
158 w: worker2, tags: "b", value: 1.0,
159 },
160 }
161
162 for _, d := range data {
163 ctx, err := tag.New(context.Background(), tag.Upsert(key, d.tags))
164 if err != nil {
165 t.Fatalf("%s: failed to add tag %q: %v", d.w, key.Name(), err)
166 }
167 if d.w != nil {
168 d.w.Record(tag.FromContext(ctx), []stats.Measurement{m.M(d.value)}, nil)
169 } else {
170 stats.Record(ctx, m.M(d.value))
171 }
172 }
173
174 makeKey := func(r *resource.Resource, view string) string {
175 if r == nil {
176 r = &resource.Resource{}
177 }
178 return resource.EncodeLabels(r.Labels) + "/" + view
179 }
180
181
182 wantPartialData := map[string][]*Row{
183 makeKey(nil, count.Name): {
184 {[]tag.Tag{{Key: key, Value: "a"}}, &CountData{Value: 2}},
185 {[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}},
186 },
187 makeKey(nil, sum.Name): {
188 {nil, &SumData{Value: 7.5}},
189 },
190 makeKey(&extraResource, count.Name): {
191 {[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}},
192 },
193 }
194
195 te := &testExporter{}
196 metricexport.NewReader().ReadAndExport(te)
197 for _, m := range te.metrics {
198 key := makeKey(m.Resource, m.Descriptor.Name)
199 want, ok := wantPartialData[key]
200 if !ok {
201 t.Errorf("Unexpected data for %q: %v", key, m)
202 continue
203 }
204 gotTs := m.TimeSeries
205 sort.Sort(byLabel(gotTs))
206
207 for i, ts := range gotTs {
208 for j, label := range ts.LabelValues {
209 if want[i].Tags[j].Value != label.Value {
210 t.Errorf("Mismatched tag values (want %q, got %q) for %v in %q", want[i].Tags[j].Value, label.Value, ts, key)
211 }
212 }
213 switch wantValue := want[i].Data.(type) {
214 case *CountData:
215 got := ts.Points[0].Value.(int64)
216 if wantValue.Value != got {
217 t.Errorf("Mismatched value (want %d, got %d) for %v in %q", wantValue.Value, got, ts, key)
218 }
219 case *SumData:
220 got := ts.Points[0].Value.(float64)
221 if wantValue.Value != got {
222 t.Errorf("Mismatched value (want %f, got %f) for %v in %q", wantValue.Value, got, ts, key)
223 }
224 default:
225 t.Errorf("Unexpected type of data: %T for %v in %q", wantValue, want[i], key)
226 }
227 }
228 }
229
230
231 got, err := worker2.RetrieveData(sum.Name)
232 if err == nil {
233 t.Errorf("%s: expected no data because it was not registered, got %#v", sum.Name, got)
234 }
235
236 Unregister(count, sum)
237 worker2.Unregister(count)
238 worker2.Stop()
239 }
240
241 func Test_Worker_RecordFloat64(t *testing.T) {
242 restart()
243
244 someError := errors.New("some error")
245 m := stats.Float64("Test_Worker_RecordFloat64/MF1", "desc MF1", "unit")
246
247 k1 := tag.MustNewKey("k1")
248 k2 := tag.MustNewKey("k2")
249 ctx, err := tag.New(context.Background(),
250 tag.Insert(k1, "v1"),
251 tag.Insert(k2, "v2"),
252 )
253 if err != nil {
254 t.Fatal(err)
255 }
256
257 v1 := &View{"VF1", "desc VF1", []tag.Key{k1, k2}, m, Count()}
258 v2 := &View{"VF2", "desc VF2", []tag.Key{k1, k2}, m, Count()}
259
260 type want struct {
261 v *View
262 rows []*Row
263 err error
264 }
265 type testCase struct {
266 label string
267 registrations []*View
268 records []float64
269 wants []want
270 }
271
272 tcs := []testCase{
273 {
274 label: "0",
275 registrations: []*View{},
276 records: []float64{1, 1},
277 wants: []want{{v1, nil, someError}, {v2, nil, someError}},
278 },
279 {
280 label: "1",
281 registrations: []*View{v1},
282 records: []float64{1, 1},
283 wants: []want{
284 {
285 v1,
286 []*Row{
287 {
288 []tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}},
289 &CountData{Value: 2},
290 },
291 },
292 nil,
293 },
294 {v2, nil, someError},
295 },
296 },
297 {
298 label: "2",
299 registrations: []*View{v1, v2},
300 records: []float64{1, 1},
301 wants: []want{
302 {
303 v1,
304 []*Row{
305 {
306 []tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}},
307 &CountData{Value: 2},
308 },
309 },
310 nil,
311 },
312 {
313 v2,
314 []*Row{
315 {
316 []tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}},
317 &CountData{Value: 2},
318 },
319 },
320 nil,
321 },
322 },
323 },
324 }
325
326 for _, tc := range tcs {
327 for _, v := range tc.registrations {
328 if err := Register(v); err != nil {
329 t.Fatalf("%v: Register(%v) = %v; want no errors", tc.label, v.Name, err)
330 }
331 }
332
333 for _, value := range tc.records {
334 stats.Record(ctx, m.M(value))
335 }
336
337 for _, w := range tc.wants {
338 gotRows, err := RetrieveData(w.v.Name)
339 for i := range gotRows {
340 switch data := gotRows[i].Data.(type) {
341 case *CountData:
342 data.Start = time.Time{}
343 case *SumData:
344 data.Start = time.Time{}
345 case *DistributionData:
346 data.Start = time.Time{}
347 }
348 }
349 if (err != nil) != (w.err != nil) {
350 t.Fatalf("%s: RetrieveData(%v) = %v; want error = %v", tc.label, w.v.Name, err, w.err)
351 }
352 if diff := cmp.Diff(gotRows, w.rows); diff != "" {
353 t.Errorf("%v: unexpected row (got-, want+): %s", tc.label, diff)
354 break
355 }
356 }
357
358
359 Unregister(tc.registrations...)
360 }
361 }
362
363 func TestReportUsage(t *testing.T) {
364 ctx := context.Background()
365
366 m := stats.Int64("measure", "desc", "unit")
367
368 tests := []struct {
369 name string
370 view *View
371 wantMaxCount int64
372 }{
373 {
374 name: "cum",
375 view: &View{Name: "cum1", Measure: m, Aggregation: Count()},
376 wantMaxCount: 8,
377 },
378 {
379 name: "cum2",
380 view: &View{Name: "cum1", Measure: m, Aggregation: Count()},
381 wantMaxCount: 8,
382 },
383 }
384
385 for _, tt := range tests {
386 restart()
387 SetReportingPeriod(25 * time.Millisecond)
388
389 if err := Register(tt.view); err != nil {
390 t.Fatalf("%v: cannot register: %v", tt.name, err)
391 }
392
393 e := &countExporter{}
394 RegisterExporter(e)
395
396 stats.Record(ctx, m.M(1))
397 stats.Record(ctx, m.M(1))
398 stats.Record(ctx, m.M(1))
399 stats.Record(ctx, m.M(1))
400
401 time.Sleep(50 * time.Millisecond)
402
403 stats.Record(ctx, m.M(1))
404 stats.Record(ctx, m.M(1))
405 stats.Record(ctx, m.M(1))
406 stats.Record(ctx, m.M(1))
407
408 time.Sleep(50 * time.Millisecond)
409
410 e.Lock()
411 count := e.count
412 e.Unlock()
413 if got, want := count, tt.wantMaxCount; got > want {
414 t.Errorf("%v: got count data = %v; want at most %v", tt.name, got, want)
415 }
416 }
417
418 }
419
420 func Test_SetReportingPeriodReqNeverBlocks(t *testing.T) {
421 t.Parallel()
422
423 worker := NewMeter().(*worker)
424 durations := []time.Duration{-1, 0, 10, 100 * time.Millisecond}
425 for i, duration := range durations {
426 ackChan := make(chan bool, 1)
427 cmd := &setReportingPeriodReq{c: ackChan, d: duration}
428 cmd.handleCommand(worker)
429
430 select {
431 case <-ackChan:
432 case <-time.After(500 * time.Millisecond):
433 t.Errorf("#%d: duration %v blocks", i, duration)
434 }
435 }
436 }
437
438 func TestWorkerStarttime(t *testing.T) {
439 restart()
440
441 ctx := context.Background()
442 m := stats.Int64("measure/TestWorkerStarttime", "desc", "unit")
443 v := &View{
444 Name: "testview",
445 Measure: m,
446 Aggregation: Count(),
447 }
448
449 SetReportingPeriod(25 * time.Millisecond)
450 if err := Register(v); err != nil {
451 t.Fatalf("cannot register to %v: %v", v.Name, err)
452 }
453
454 e := &vdExporter{}
455 RegisterExporter(e)
456 defer UnregisterExporter(e)
457
458 stats.Record(ctx, m.M(1))
459 stats.Record(ctx, m.M(1))
460 stats.Record(ctx, m.M(1))
461 stats.Record(ctx, m.M(1))
462
463 time.Sleep(50 * time.Millisecond)
464
465 stats.Record(ctx, m.M(1))
466 stats.Record(ctx, m.M(1))
467 stats.Record(ctx, m.M(1))
468 stats.Record(ctx, m.M(1))
469
470 time.Sleep(50 * time.Millisecond)
471
472 e.Lock()
473 if len(e.vds) == 0 {
474 t.Fatal("Got no view data; want at least one")
475 }
476
477 var start time.Time
478 for _, vd := range e.vds {
479 if start.IsZero() {
480 start = vd.Start
481 }
482 if !vd.Start.Equal(start) {
483 t.Errorf("Cumulative view data start time = %v; want %v", vd.Start, start)
484 }
485 }
486 e.Unlock()
487 }
488
489 func TestUnregisterReportsUsage(t *testing.T) {
490 restart()
491 ctx := context.Background()
492
493 m1 := stats.Int64("measure", "desc", "unit")
494 view1 := &View{Name: "count", Measure: m1, Aggregation: Count()}
495 m2 := stats.Int64("measure2", "desc", "unit")
496 view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()}
497
498 SetReportingPeriod(time.Hour)
499
500 if err := Register(view1, view2); err != nil {
501 t.Fatalf("cannot register: %v", err)
502 }
503
504 e := &countExporter{}
505 RegisterExporter(e)
506
507 stats.Record(ctx, m1.M(1))
508 stats.Record(ctx, m2.M(1))
509 stats.Record(ctx, m2.M(1))
510
511 Unregister(view2)
512
513
514 want := int64(2)
515
516 e.Lock()
517 got := e.totalCount
518 e.Unlock()
519 if got != want {
520 t.Errorf("got count data = %v; want %v", got, want)
521 }
522 }
523
524 func TestWorkerRace(t *testing.T) {
525 restart()
526 ctx := context.Background()
527
528 m1 := stats.Int64("measure", "desc", "unit")
529 view1 := &View{Name: "count", Measure: m1, Aggregation: Count()}
530 m2 := stats.Int64("measure2", "desc", "unit")
531 view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()}
532
533
534 SetReportingPeriod(time.Microsecond)
535
536 if err := Register(view1, view2); err != nil {
537 t.Fatalf("cannot register: %v", err)
538 }
539
540 e := &countExporter{}
541 RegisterExporter(e)
542
543
544 var waiter sync.WaitGroup
545 waiter.Add(3)
546 defer waiter.Wait()
547
548 doneCh := make(chan bool)
549
550 go func() {
551 defer waiter.Done()
552 tick := time.NewTicker(700 * time.Nanosecond)
553 defer tick.Stop()
554
555 defer func() {
556 close(doneCh)
557 }()
558
559 for i := 0; i < 1e3; i++ {
560 stats.Record(ctx, m1.M(1))
561 stats.Record(ctx, m2.M(1))
562 stats.Record(ctx, m2.M(1))
563 <-tick.C
564 }
565 }()
566
567
568 go func() {
569 defer waiter.Done()
570 tick := time.NewTicker(900 * time.Nanosecond)
571 defer tick.Stop()
572
573 for {
574 select {
575 case <-doneCh:
576 return
577 case <-tick.C:
578 RetrieveData(view1.Name)
579 }
580 }
581 }()
582
583
584 go func() {
585 defer waiter.Done()
586 tick := time.NewTicker(800 * time.Nanosecond)
587 defer tick.Stop()
588
589 reader := metricexport.Reader{}
590 for {
591 select {
592 case <-doneCh:
593 return
594 case <-tick.C:
595
596 reader.ReadAndExport(&testExporter{})
597 }
598 }
599 }()
600 }
601
602 type testExporter struct {
603 metrics []*metricdata.Metric
604 }
605
606 func (te *testExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error {
607 te.metrics = metrics
608 return nil
609 }
610
611 type countExporter struct {
612 sync.Mutex
613 count int64
614 totalCount int64
615 }
616
617 func (e *countExporter) ExportView(vd *Data) {
618 if len(vd.Rows) == 0 {
619 return
620 }
621 d := vd.Rows[0].Data.(*CountData)
622
623 e.Lock()
624 defer e.Unlock()
625 e.count = d.Value
626 e.totalCount += d.Value
627 }
628
629 type vdExporter struct {
630 sync.Mutex
631 vds []*Data
632 }
633
634 func (e *vdExporter) ExportView(vd *Data) {
635 e.Lock()
636 defer e.Unlock()
637
638 e.vds = append(e.vds, vd)
639 }
640
641
642 func restart() {
643 defaultWorker.Stop()
644 defaultWorker = NewMeter().(*worker)
645 go defaultWorker.start()
646 }
647
648
649 type byLabel []*metricdata.TimeSeries
650
651 func (ts byLabel) Len() int { return len(ts) }
652 func (ts byLabel) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
653 func (ts byLabel) Less(i, j int) bool {
654 if len(ts[i].LabelValues) != len(ts[j].LabelValues) {
655 return len(ts[i].LabelValues) < len(ts[j].LabelValues)
656 }
657 for k := range ts[i].LabelValues {
658 if ts[i].LabelValues[k].Value != ts[j].LabelValues[k].Value {
659 return ts[i].LabelValues[k].Value < ts[j].LabelValues[k].Value
660 }
661 }
662 return false
663 }
664
View as plain text