...

Source file src/go.opencensus.io/metric/metricexport/reader_test.go

Documentation: go.opencensus.io/metric/metricexport

     1  // Copyright 2019, OpenCensus Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package metricexport
    17  
    18  import (
    19  	"context"
    20  	"sync"
    21  	"testing"
    22  	"time"
    23  
    24  	"go.opencensus.io/metric"
    25  	"go.opencensus.io/metric/metricdata"
    26  	"go.opencensus.io/metric/metricproducer"
    27  )
    28  
    29  var (
    30  	ir1        *IntervalReader
    31  	ir2        *IntervalReader
    32  	reader1    = NewReader(WithSpanName("test-export-span"))
    33  	exporter1  = &metricExporter{}
    34  	exporter2  = &metricExporter{}
    35  	gaugeEntry *metric.Int64GaugeEntry
    36  	duration1  = 1000 * time.Millisecond
    37  	duration2  = 2000 * time.Millisecond
    38  )
    39  
    40  type metricExporter struct {
    41  	sync.Mutex
    42  	metrics []*metricdata.Metric
    43  }
    44  
    45  func (e *metricExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error {
    46  	e.Lock()
    47  	defer e.Unlock()
    48  
    49  	e.metrics = append(e.metrics, metrics...)
    50  	return nil
    51  }
    52  
    53  func init() {
    54  	r := metric.NewRegistry()
    55  	metricproducer.GlobalManager().AddProducer(r)
    56  	g, _ := r.AddInt64Gauge("active_request",
    57  		metric.WithDescription("Number of active requests, per method."),
    58  		metric.WithUnit(metricdata.UnitDimensionless),
    59  		metric.WithLabelKeys("method"))
    60  	gaugeEntry, _ = g.GetEntry(metricdata.NewLabelValue("foo"))
    61  }
    62  
    63  func TestNewReaderWitDefaultOptions(t *testing.T) {
    64  	r := NewReader()
    65  
    66  	if r.spanName != defaultSpanName {
    67  		t.Errorf("span name: got %v, want %v\n", r.spanName, defaultSpanName)
    68  	}
    69  }
    70  
    71  func TestNewReaderWitSpanName(t *testing.T) {
    72  	spanName := "test-span"
    73  	r := NewReader(WithSpanName(spanName))
    74  
    75  	if r.spanName != spanName {
    76  		t.Errorf("span name: got %+v, want %v\n", r.spanName, spanName)
    77  	}
    78  }
    79  
    80  func TestNewReader(t *testing.T) {
    81  	r := NewReader()
    82  
    83  	gaugeEntry.Add(1)
    84  
    85  	r.ReadAndExport(exporter1)
    86  
    87  	checkExportedCount(exporter1, 1, t)
    88  	checkExportedMetricDesc(exporter1, "active_request", t)
    89  	resetExporter(exporter1)
    90  }
    91  
    92  func TestNewIntervalReader(t *testing.T) {
    93  	ir1 = createAndStart(exporter1, duration1, t)
    94  
    95  	gaugeEntry.Add(1)
    96  
    97  	time.Sleep(1500 * time.Millisecond)
    98  	checkExportedCount(exporter1, 1, t)
    99  	checkExportedMetricDesc(exporter1, "active_request", t)
   100  	ir1.Stop()
   101  	resetExporter(exporter1)
   102  }
   103  
   104  func TestManualReadForIntervalReader(t *testing.T) {
   105  	ir1 = createAndStart(exporter1, duration1, t)
   106  
   107  	gaugeEntry.Set(1)
   108  	reader1.ReadAndExport(exporter1)
   109  	gaugeEntry.Set(4)
   110  
   111  	time.Sleep(1500 * time.Millisecond)
   112  
   113  	checkExportedCount(exporter1, 2, t)
   114  	checkExportedValues(exporter1, []int64{1, 4}, t) // one for manual read other for time based.
   115  	checkExportedMetricDesc(exporter1, "active_request", t)
   116  	ir1.Stop()
   117  	resetExporter(exporter1)
   118  }
   119  
   120  func TestFlushNoOpForIntervalReader(t *testing.T) {
   121  	ir1 = createAndStart(exporter1, duration1, t)
   122  
   123  	gaugeEntry.Set(1)
   124  
   125  	// since IR is not stopped, flush does nothing
   126  	ir1.Flush()
   127  
   128  	// expect no data points
   129  	checkExportedCount(exporter1, 0, t)
   130  	checkExportedMetricDesc(exporter1, "active_request", t)
   131  	ir1.Stop()
   132  	resetExporter(exporter1)
   133  }
   134  
   135  func TestFlushAllowMultipleForIntervalReader(t *testing.T) {
   136  	ir1 = createAndStart(exporter1, duration1, t)
   137  
   138  	gaugeEntry.Set(1)
   139  
   140  	ir1.Stop()
   141  	ir1.Flush()
   142  
   143  	// metric is still coming in
   144  	gaugeEntry.Add(1)
   145  
   146  	// one more flush after IR stopped
   147  	ir1.Flush()
   148  
   149  	// expect 2 data point, one from each flush
   150  	checkExportedCount(exporter1, 2, t)
   151  	checkExportedValues(exporter1, []int64{1, 2}, t)
   152  	checkExportedMetricDesc(exporter1, "active_request", t)
   153  
   154  	resetExporter(exporter1)
   155  }
   156  
   157  func TestFlushRestartForIntervalReader(t *testing.T) {
   158  	ir1 = createAndStart(exporter1, duration1, t)
   159  
   160  	gaugeEntry.Set(1)
   161  	ir1.Stop()
   162  	ir1.Flush()
   163  
   164  	// restart the IR
   165  	err := ir1.Start()
   166  	if err != nil {
   167  		t.Fatalf("error starting reader %v\n", err)
   168  	}
   169  
   170  	gaugeEntry.Add(1)
   171  
   172  	ir1.Stop()
   173  	ir1.Flush()
   174  
   175  	// expect 2 data point, one from each flush
   176  	checkExportedCount(exporter1, 2, t)
   177  	checkExportedValues(exporter1, []int64{1, 2}, t)
   178  	checkExportedMetricDesc(exporter1, "active_request", t)
   179  
   180  	resetExporter(exporter1)
   181  }
   182  
   183  func TestProducerWithIntervalReaderStop(t *testing.T) {
   184  	ir1 = createAndStart(exporter1, duration1, t)
   185  	ir1.Stop()
   186  
   187  	gaugeEntry.Add(1)
   188  
   189  	time.Sleep(1500 * time.Millisecond)
   190  
   191  	checkExportedCount(exporter1, 0, t)
   192  	checkExportedMetricDesc(exporter1, "active_request", t)
   193  	resetExporter(exporter1)
   194  }
   195  
   196  func TestProducerWithMultipleIntervalReaders(t *testing.T) {
   197  	ir1 = createAndStart(exporter1, duration1, t)
   198  	ir2 = createAndStart(exporter2, duration2, t)
   199  
   200  	gaugeEntry.Add(1)
   201  
   202  	time.Sleep(2500 * time.Millisecond)
   203  
   204  	checkExportedCount(exporter1, 2, t)
   205  	checkExportedMetricDesc(exporter1, "active_request", t)
   206  	checkExportedCount(exporter2, 1, t)
   207  	checkExportedMetricDesc(exporter2, "active_request", t)
   208  	ir1.Stop()
   209  	ir2.Stop()
   210  	resetExporter(exporter1)
   211  	resetExporter(exporter1)
   212  }
   213  
   214  func TestIntervalReaderMultipleStop(t *testing.T) {
   215  	ir1 = createAndStart(exporter1, duration1, t)
   216  	stop := make(chan bool, 1)
   217  	go func() {
   218  		ir1.Stop()
   219  		ir1.Stop()
   220  		stop <- true
   221  	}()
   222  
   223  	select {
   224  	case _ = <-stop:
   225  	case <-time.After(1 * time.Second):
   226  		t.Fatalf("ir1 stop got blocked")
   227  	}
   228  }
   229  
   230  func TestIntervalReaderMultipleStart(t *testing.T) {
   231  	ir1 = createAndStart(exporter1, duration1, t)
   232  	err := ir1.Start()
   233  	if err == nil {
   234  		t.Fatalf("expected error but got nil\n")
   235  	}
   236  
   237  	gaugeEntry.Add(1)
   238  
   239  	time.Sleep(1500 * time.Millisecond)
   240  
   241  	checkExportedCount(exporter1, 1, t)
   242  	checkExportedMetricDesc(exporter1, "active_request", t)
   243  	ir1.Stop()
   244  	resetExporter(exporter1)
   245  }
   246  
   247  func TestNewIntervalReaderWithNilReader(t *testing.T) {
   248  	_, err := NewIntervalReader(nil, exporter1)
   249  	if err == nil {
   250  		t.Fatalf("expected error but got nil\n")
   251  	}
   252  }
   253  
   254  func TestNewIntervalReaderWithNilExporter(t *testing.T) {
   255  	_, err := NewIntervalReader(reader1, nil)
   256  	if err == nil {
   257  		t.Fatalf("expected error but got nil\n")
   258  	}
   259  }
   260  
   261  func TestNewIntervalReaderStartWithInvalidInterval(t *testing.T) {
   262  	ir, err := NewIntervalReader(reader1, exporter1)
   263  	ir.ReportingInterval = 500 * time.Millisecond
   264  	err = ir.Start()
   265  	if err == nil {
   266  		t.Fatalf("expected error but got nil\n")
   267  	}
   268  }
   269  
   270  func checkExportedCount(exporter *metricExporter, wantCount int, t *testing.T) {
   271  	exporter.Lock()
   272  	defer exporter.Unlock()
   273  	gotCount := len(exporter.metrics)
   274  	if gotCount != wantCount {
   275  		t.Fatalf("exported metric count: got %d, want %d\n", gotCount, wantCount)
   276  	}
   277  }
   278  
   279  func checkExportedValues(exporter *metricExporter, wantValues []int64, t *testing.T) {
   280  	exporter.Lock()
   281  	defer exporter.Unlock()
   282  	gotCount := len(exporter.metrics)
   283  	wantCount := len(wantValues)
   284  	if gotCount != wantCount {
   285  		t.Errorf("exported metric count: got %d, want %d\n", gotCount, wantCount)
   286  		return
   287  	}
   288  	for i, wantValue := range wantValues {
   289  		var gotValue int64
   290  		switch v := exporter.metrics[i].TimeSeries[0].Points[0].Value.(type) {
   291  		case int64:
   292  			gotValue = v
   293  		default:
   294  			t.Errorf("expected float64 value but found other %T", exporter.metrics[i].TimeSeries[0].Points[0].Value)
   295  		}
   296  		if gotValue != wantValue {
   297  			t.Errorf("values idx %d, got: %v, want %v", i, gotValue, wantValue)
   298  		}
   299  	}
   300  }
   301  
   302  func checkExportedMetricDesc(exporter *metricExporter, wantMdName string, t *testing.T) {
   303  	exporter.Lock()
   304  	defer exporter.Unlock()
   305  	for _, metric := range exporter.metrics {
   306  		gotMdName := metric.Descriptor.Name
   307  		if gotMdName != wantMdName {
   308  			t.Errorf("got %s, want %s\n", gotMdName, wantMdName)
   309  		}
   310  	}
   311  	exporter.metrics = nil
   312  }
   313  
   314  func resetExporter(exporter *metricExporter) {
   315  	exporter.Lock()
   316  	defer exporter.Unlock()
   317  	exporter.metrics = nil
   318  }
   319  
   320  // createAndStart stops the current processors and creates a new one.
   321  func createAndStart(exporter *metricExporter, d time.Duration, t *testing.T) *IntervalReader {
   322  	ir, _ := NewIntervalReader(reader1, exporter)
   323  	ir.ReportingInterval = d
   324  	err := ir.Start()
   325  	if err != nil {
   326  		t.Fatalf("error creating reader %v\n", err)
   327  	}
   328  	return ir
   329  }
   330  

View as plain text