...

Source file src/go.opencensus.io/stats/view/worker_test.go

Documentation: go.opencensus.io/stats/view

     1  // Copyright 2017, OpenCensus Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package view
    17  
    18  import (
    19  	"context"
    20  	"errors"
    21  	"sort"
    22  	"sync"
    23  	"testing"
    24  	"time"
    25  
    26  	"github.com/google/go-cmp/cmp"
    27  	"go.opencensus.io/resource"
    28  
    29  	"go.opencensus.io/metric/metricdata"
    30  	"go.opencensus.io/metric/metricexport"
    31  	"go.opencensus.io/stats"
    32  	"go.opencensus.io/tag"
    33  )
    34  
    35  func Test_Worker_ViewRegistration(t *testing.T) {
    36  	someError := errors.New("some error")
    37  
    38  	sc1 := make(chan *Data)
    39  
    40  	type registration struct {
    41  		c   chan *Data
    42  		vID string
    43  		err error
    44  	}
    45  	type testCase struct {
    46  		label         string
    47  		registrations []registration
    48  	}
    49  	tcs := []testCase{
    50  		{
    51  			"register v1ID",
    52  			[]registration{
    53  				{
    54  					sc1,
    55  					"v1ID",
    56  					nil,
    57  				},
    58  			},
    59  		},
    60  		{
    61  			"register v1ID+v2ID",
    62  			[]registration{
    63  				{
    64  					sc1,
    65  					"v1ID",
    66  					nil,
    67  				},
    68  			},
    69  		},
    70  		{
    71  			"register to v1ID; ??? to v1ID and view with same ID",
    72  			[]registration{
    73  				{
    74  					sc1,
    75  					"v1ID",
    76  					nil,
    77  				},
    78  				{
    79  					sc1,
    80  					"v1SameNameID",
    81  					someError,
    82  				},
    83  			},
    84  		},
    85  	}
    86  
    87  	mf1 := stats.Float64("MF1/Test_Worker_ViewSubscription", "desc MF1", "unit")
    88  	mf2 := stats.Float64("MF2/Test_Worker_ViewSubscription", "desc MF2", "unit")
    89  
    90  	for _, tc := range tcs {
    91  		t.Run(tc.label, func(t *testing.T) {
    92  			restart()
    93  
    94  			views := map[string]*View{
    95  				"v1ID": {
    96  					Name:        "VF1",
    97  					Measure:     mf1,
    98  					Aggregation: Count(),
    99  				},
   100  				"v1SameNameID": {
   101  					Name:        "VF1",
   102  					Description: "desc duplicate name VF1",
   103  					Measure:     mf1,
   104  					Aggregation: Sum(),
   105  				},
   106  				"v2ID": {
   107  					Name:        "VF2",
   108  					Measure:     mf2,
   109  					Aggregation: Count(),
   110  				},
   111  				"vNilID": nil,
   112  			}
   113  
   114  			for _, r := range tc.registrations {
   115  				v := views[r.vID]
   116  				err := Register(v)
   117  				if (err != nil) != (r.err != nil) {
   118  					t.Errorf("%v: Register() = %v, want %v", tc.label, err, r.err)
   119  				}
   120  			}
   121  		})
   122  	}
   123  }
   124  
   125  func Test_Worker_MultiExport(t *testing.T) {
   126  	restart()
   127  
   128  	// This test reports the same data for the default worker and a secondary
   129  	// worker, and ensures that the stats are kept independently.
   130  	extraResource := resource.Resource{
   131  		Type:   "additional",
   132  		Labels: map[string]string{"key1": "value1", "key2": "value2"},
   133  	}
   134  	worker2 := NewMeter().(*worker)
   135  	worker2.Start()
   136  	worker2.SetResource(&extraResource)
   137  
   138  	m := stats.Float64("Test_Worker_MultiExport/MF1", "desc MF1", "unit")
   139  	key := tag.MustNewKey(("key"))
   140  	count := &View{"VF1", "description", []tag.Key{key}, m, Count()}
   141  	sum := &View{"VF2", "description", []tag.Key{}, m, Sum()}
   142  
   143  	Register(count, sum)
   144  	worker2.Register(count) // Don't compute the sum for worker2, to verify independence of computation.
   145  	data := []struct {
   146  		w     Meter
   147  		tags  string // Tag values
   148  		value float64
   149  	}{{
   150  		tags:  "a",
   151  		value: 2.0,
   152  	}, {
   153  		tags:  "b",
   154  		value: 3.0,
   155  	}, {
   156  		tags: "a", value: 2.5,
   157  	}, {
   158  		w: worker2, tags: "b", value: 1.0,
   159  	},
   160  	}
   161  
   162  	for _, d := range data {
   163  		ctx, err := tag.New(context.Background(), tag.Upsert(key, d.tags))
   164  		if err != nil {
   165  			t.Fatalf("%s: failed to add tag %q: %v", d.w, key.Name(), err)
   166  		}
   167  		if d.w != nil {
   168  			d.w.Record(tag.FromContext(ctx), []stats.Measurement{m.M(d.value)}, nil)
   169  		} else {
   170  			stats.Record(ctx, m.M(d.value))
   171  		}
   172  	}
   173  
   174  	makeKey := func(r *resource.Resource, view string) string {
   175  		if r == nil {
   176  			r = &resource.Resource{}
   177  		}
   178  		return resource.EncodeLabels(r.Labels) + "/" + view
   179  	}
   180  
   181  	// Format is Resource.Labels encoded as string, then
   182  	wantPartialData := map[string][]*Row{
   183  		makeKey(nil, count.Name): {
   184  			{[]tag.Tag{{Key: key, Value: "a"}}, &CountData{Value: 2}},
   185  			{[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}},
   186  		},
   187  		makeKey(nil, sum.Name): {
   188  			{nil, &SumData{Value: 7.5}},
   189  		},
   190  		makeKey(&extraResource, count.Name): {
   191  			{[]tag.Tag{{Key: key, Value: "b"}}, &CountData{Value: 1}},
   192  		},
   193  	}
   194  
   195  	te := &testExporter{}
   196  	metricexport.NewReader().ReadAndExport(te)
   197  	for _, m := range te.metrics {
   198  		key := makeKey(m.Resource, m.Descriptor.Name)
   199  		want, ok := wantPartialData[key]
   200  		if !ok {
   201  			t.Errorf("Unexpected data for %q: %v", key, m)
   202  			continue
   203  		}
   204  		gotTs := m.TimeSeries
   205  		sort.Sort(byLabel(gotTs))
   206  
   207  		for i, ts := range gotTs {
   208  			for j, label := range ts.LabelValues {
   209  				if want[i].Tags[j].Value != label.Value {
   210  					t.Errorf("Mismatched tag values (want %q, got %q) for %v in %q", want[i].Tags[j].Value, label.Value, ts, key)
   211  				}
   212  			}
   213  			switch wantValue := want[i].Data.(type) {
   214  			case *CountData:
   215  				got := ts.Points[0].Value.(int64)
   216  				if wantValue.Value != got {
   217  					t.Errorf("Mismatched value (want %d, got %d) for %v in %q", wantValue.Value, got, ts, key)
   218  				}
   219  			case *SumData:
   220  				got := ts.Points[0].Value.(float64)
   221  				if wantValue.Value != got {
   222  					t.Errorf("Mismatched value (want %f, got %f) for %v in %q", wantValue.Value, got, ts, key)
   223  				}
   224  			default:
   225  				t.Errorf("Unexpected type of data: %T for %v in %q", wantValue, want[i], key)
   226  			}
   227  		}
   228  	}
   229  
   230  	// Verify that worker has not been computing sum:
   231  	got, err := worker2.RetrieveData(sum.Name)
   232  	if err == nil {
   233  		t.Errorf("%s: expected no data because it was not registered, got %#v", sum.Name, got)
   234  	}
   235  
   236  	Unregister(count, sum)
   237  	worker2.Unregister(count)
   238  	worker2.Stop()
   239  }
   240  
   241  func Test_Worker_RecordFloat64(t *testing.T) {
   242  	restart()
   243  
   244  	someError := errors.New("some error")
   245  	m := stats.Float64("Test_Worker_RecordFloat64/MF1", "desc MF1", "unit")
   246  
   247  	k1 := tag.MustNewKey("k1")
   248  	k2 := tag.MustNewKey("k2")
   249  	ctx, err := tag.New(context.Background(),
   250  		tag.Insert(k1, "v1"),
   251  		tag.Insert(k2, "v2"),
   252  	)
   253  	if err != nil {
   254  		t.Fatal(err)
   255  	}
   256  
   257  	v1 := &View{"VF1", "desc VF1", []tag.Key{k1, k2}, m, Count()}
   258  	v2 := &View{"VF2", "desc VF2", []tag.Key{k1, k2}, m, Count()}
   259  
   260  	type want struct {
   261  		v    *View
   262  		rows []*Row
   263  		err  error
   264  	}
   265  	type testCase struct {
   266  		label         string
   267  		registrations []*View
   268  		records       []float64
   269  		wants         []want
   270  	}
   271  
   272  	tcs := []testCase{
   273  		{
   274  			label:         "0",
   275  			registrations: []*View{},
   276  			records:       []float64{1, 1},
   277  			wants:         []want{{v1, nil, someError}, {v2, nil, someError}},
   278  		},
   279  		{
   280  			label:         "1",
   281  			registrations: []*View{v1},
   282  			records:       []float64{1, 1},
   283  			wants: []want{
   284  				{
   285  					v1,
   286  					[]*Row{
   287  						{
   288  							[]tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}},
   289  							&CountData{Value: 2},
   290  						},
   291  					},
   292  					nil,
   293  				},
   294  				{v2, nil, someError},
   295  			},
   296  		},
   297  		{
   298  			label:         "2",
   299  			registrations: []*View{v1, v2},
   300  			records:       []float64{1, 1},
   301  			wants: []want{
   302  				{
   303  					v1,
   304  					[]*Row{
   305  						{
   306  							[]tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}},
   307  							&CountData{Value: 2},
   308  						},
   309  					},
   310  					nil,
   311  				},
   312  				{
   313  					v2,
   314  					[]*Row{
   315  						{
   316  							[]tag.Tag{{Key: k1, Value: "v1"}, {Key: k2, Value: "v2"}},
   317  							&CountData{Value: 2},
   318  						},
   319  					},
   320  					nil,
   321  				},
   322  			},
   323  		},
   324  	}
   325  
   326  	for _, tc := range tcs {
   327  		for _, v := range tc.registrations {
   328  			if err := Register(v); err != nil {
   329  				t.Fatalf("%v: Register(%v) = %v; want no errors", tc.label, v.Name, err)
   330  			}
   331  		}
   332  
   333  		for _, value := range tc.records {
   334  			stats.Record(ctx, m.M(value))
   335  		}
   336  
   337  		for _, w := range tc.wants {
   338  			gotRows, err := RetrieveData(w.v.Name)
   339  			for i := range gotRows {
   340  				switch data := gotRows[i].Data.(type) {
   341  				case *CountData:
   342  					data.Start = time.Time{}
   343  				case *SumData:
   344  					data.Start = time.Time{}
   345  				case *DistributionData:
   346  					data.Start = time.Time{}
   347  				}
   348  			}
   349  			if (err != nil) != (w.err != nil) {
   350  				t.Fatalf("%s: RetrieveData(%v) = %v; want error = %v", tc.label, w.v.Name, err, w.err)
   351  			}
   352  			if diff := cmp.Diff(gotRows, w.rows); diff != "" {
   353  				t.Errorf("%v: unexpected row (got-, want+): %s", tc.label, diff)
   354  				break
   355  			}
   356  		}
   357  
   358  		// Cleaning up.
   359  		Unregister(tc.registrations...)
   360  	}
   361  }
   362  
   363  func TestReportUsage(t *testing.T) {
   364  	ctx := context.Background()
   365  
   366  	m := stats.Int64("measure", "desc", "unit")
   367  
   368  	tests := []struct {
   369  		name         string
   370  		view         *View
   371  		wantMaxCount int64
   372  	}{
   373  		{
   374  			name:         "cum",
   375  			view:         &View{Name: "cum1", Measure: m, Aggregation: Count()},
   376  			wantMaxCount: 8,
   377  		},
   378  		{
   379  			name:         "cum2",
   380  			view:         &View{Name: "cum1", Measure: m, Aggregation: Count()},
   381  			wantMaxCount: 8,
   382  		},
   383  	}
   384  
   385  	for _, tt := range tests {
   386  		restart()
   387  		SetReportingPeriod(25 * time.Millisecond)
   388  
   389  		if err := Register(tt.view); err != nil {
   390  			t.Fatalf("%v: cannot register: %v", tt.name, err)
   391  		}
   392  
   393  		e := &countExporter{}
   394  		RegisterExporter(e)
   395  
   396  		stats.Record(ctx, m.M(1))
   397  		stats.Record(ctx, m.M(1))
   398  		stats.Record(ctx, m.M(1))
   399  		stats.Record(ctx, m.M(1))
   400  
   401  		time.Sleep(50 * time.Millisecond)
   402  
   403  		stats.Record(ctx, m.M(1))
   404  		stats.Record(ctx, m.M(1))
   405  		stats.Record(ctx, m.M(1))
   406  		stats.Record(ctx, m.M(1))
   407  
   408  		time.Sleep(50 * time.Millisecond)
   409  
   410  		e.Lock()
   411  		count := e.count
   412  		e.Unlock()
   413  		if got, want := count, tt.wantMaxCount; got > want {
   414  			t.Errorf("%v: got count data = %v; want at most %v", tt.name, got, want)
   415  		}
   416  	}
   417  
   418  }
   419  
   420  func Test_SetReportingPeriodReqNeverBlocks(t *testing.T) {
   421  	t.Parallel()
   422  
   423  	worker := NewMeter().(*worker)
   424  	durations := []time.Duration{-1, 0, 10, 100 * time.Millisecond}
   425  	for i, duration := range durations {
   426  		ackChan := make(chan bool, 1)
   427  		cmd := &setReportingPeriodReq{c: ackChan, d: duration}
   428  		cmd.handleCommand(worker)
   429  
   430  		select {
   431  		case <-ackChan:
   432  		case <-time.After(500 * time.Millisecond): // Arbitrarily using 500ms as the timeout duration.
   433  			t.Errorf("#%d: duration %v blocks", i, duration)
   434  		}
   435  	}
   436  }
   437  
   438  func TestWorkerStarttime(t *testing.T) {
   439  	restart()
   440  
   441  	ctx := context.Background()
   442  	m := stats.Int64("measure/TestWorkerStarttime", "desc", "unit")
   443  	v := &View{
   444  		Name:        "testview",
   445  		Measure:     m,
   446  		Aggregation: Count(),
   447  	}
   448  
   449  	SetReportingPeriod(25 * time.Millisecond)
   450  	if err := Register(v); err != nil {
   451  		t.Fatalf("cannot register to %v: %v", v.Name, err)
   452  	}
   453  
   454  	e := &vdExporter{}
   455  	RegisterExporter(e)
   456  	defer UnregisterExporter(e)
   457  
   458  	stats.Record(ctx, m.M(1))
   459  	stats.Record(ctx, m.M(1))
   460  	stats.Record(ctx, m.M(1))
   461  	stats.Record(ctx, m.M(1))
   462  
   463  	time.Sleep(50 * time.Millisecond)
   464  
   465  	stats.Record(ctx, m.M(1))
   466  	stats.Record(ctx, m.M(1))
   467  	stats.Record(ctx, m.M(1))
   468  	stats.Record(ctx, m.M(1))
   469  
   470  	time.Sleep(50 * time.Millisecond)
   471  
   472  	e.Lock()
   473  	if len(e.vds) == 0 {
   474  		t.Fatal("Got no view data; want at least one")
   475  	}
   476  
   477  	var start time.Time
   478  	for _, vd := range e.vds {
   479  		if start.IsZero() {
   480  			start = vd.Start
   481  		}
   482  		if !vd.Start.Equal(start) {
   483  			t.Errorf("Cumulative view data start time = %v; want %v", vd.Start, start)
   484  		}
   485  	}
   486  	e.Unlock()
   487  }
   488  
   489  func TestUnregisterReportsUsage(t *testing.T) {
   490  	restart()
   491  	ctx := context.Background()
   492  
   493  	m1 := stats.Int64("measure", "desc", "unit")
   494  	view1 := &View{Name: "count", Measure: m1, Aggregation: Count()}
   495  	m2 := stats.Int64("measure2", "desc", "unit")
   496  	view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()}
   497  
   498  	SetReportingPeriod(time.Hour)
   499  
   500  	if err := Register(view1, view2); err != nil {
   501  		t.Fatalf("cannot register: %v", err)
   502  	}
   503  
   504  	e := &countExporter{}
   505  	RegisterExporter(e)
   506  
   507  	stats.Record(ctx, m1.M(1))
   508  	stats.Record(ctx, m2.M(1))
   509  	stats.Record(ctx, m2.M(1))
   510  
   511  	Unregister(view2)
   512  
   513  	// Unregister should only flush view2, so expect the count of 2.
   514  	want := int64(2)
   515  
   516  	e.Lock()
   517  	got := e.totalCount
   518  	e.Unlock()
   519  	if got != want {
   520  		t.Errorf("got count data = %v; want %v", got, want)
   521  	}
   522  }
   523  
   524  func TestWorkerRace(t *testing.T) {
   525  	restart()
   526  	ctx := context.Background()
   527  
   528  	m1 := stats.Int64("measure", "desc", "unit")
   529  	view1 := &View{Name: "count", Measure: m1, Aggregation: Count()}
   530  	m2 := stats.Int64("measure2", "desc", "unit")
   531  	view2 := &View{Name: "count2", Measure: m2, Aggregation: Count()}
   532  
   533  	// 1. This will export every microsecond.
   534  	SetReportingPeriod(time.Microsecond)
   535  
   536  	if err := Register(view1, view2); err != nil {
   537  		t.Fatalf("cannot register: %v", err)
   538  	}
   539  
   540  	e := &countExporter{}
   541  	RegisterExporter(e)
   542  
   543  	// Synchronize and make sure every goroutine has terminated before we exit
   544  	var waiter sync.WaitGroup
   545  	waiter.Add(3)
   546  	defer waiter.Wait()
   547  
   548  	doneCh := make(chan bool)
   549  	// 2. Record write routine at 700ns
   550  	go func() {
   551  		defer waiter.Done()
   552  		tick := time.NewTicker(700 * time.Nanosecond)
   553  		defer tick.Stop()
   554  
   555  		defer func() {
   556  			close(doneCh)
   557  		}()
   558  
   559  		for i := 0; i < 1e3; i++ {
   560  			stats.Record(ctx, m1.M(1))
   561  			stats.Record(ctx, m2.M(1))
   562  			stats.Record(ctx, m2.M(1))
   563  			<-tick.C
   564  		}
   565  	}()
   566  
   567  	// 2. Simulating RetrieveData 900ns
   568  	go func() {
   569  		defer waiter.Done()
   570  		tick := time.NewTicker(900 * time.Nanosecond)
   571  		defer tick.Stop()
   572  
   573  		for {
   574  			select {
   575  			case <-doneCh:
   576  				return
   577  			case <-tick.C:
   578  				RetrieveData(view1.Name)
   579  			}
   580  		}
   581  	}()
   582  
   583  	// 4. Export via Reader routine at 800ns
   584  	go func() {
   585  		defer waiter.Done()
   586  		tick := time.NewTicker(800 * time.Nanosecond)
   587  		defer tick.Stop()
   588  
   589  		reader := metricexport.Reader{}
   590  		for {
   591  			select {
   592  			case <-doneCh:
   593  				return
   594  			case <-tick.C:
   595  				// Perform some collection here
   596  				reader.ReadAndExport(&testExporter{})
   597  			}
   598  		}
   599  	}()
   600  }
   601  
   602  type testExporter struct {
   603  	metrics []*metricdata.Metric
   604  }
   605  
   606  func (te *testExporter) ExportMetrics(ctx context.Context, metrics []*metricdata.Metric) error {
   607  	te.metrics = metrics
   608  	return nil
   609  }
   610  
   611  type countExporter struct {
   612  	sync.Mutex
   613  	count      int64
   614  	totalCount int64
   615  }
   616  
   617  func (e *countExporter) ExportView(vd *Data) {
   618  	if len(vd.Rows) == 0 {
   619  		return
   620  	}
   621  	d := vd.Rows[0].Data.(*CountData)
   622  
   623  	e.Lock()
   624  	defer e.Unlock()
   625  	e.count = d.Value
   626  	e.totalCount += d.Value
   627  }
   628  
   629  type vdExporter struct {
   630  	sync.Mutex
   631  	vds []*Data
   632  }
   633  
   634  func (e *vdExporter) ExportView(vd *Data) {
   635  	e.Lock()
   636  	defer e.Unlock()
   637  
   638  	e.vds = append(e.vds, vd)
   639  }
   640  
   641  // restart stops the current processors and creates a new one.
   642  func restart() {
   643  	defaultWorker.Stop()
   644  	defaultWorker = NewMeter().(*worker)
   645  	go defaultWorker.start()
   646  }
   647  
   648  // byTag implements sort.Interface for *metricdata.TimeSeries by Labels.
   649  type byLabel []*metricdata.TimeSeries
   650  
   651  func (ts byLabel) Len() int      { return len(ts) }
   652  func (ts byLabel) Swap(i, j int) { ts[i], ts[j] = ts[j], ts[i] }
   653  func (ts byLabel) Less(i, j int) bool {
   654  	if len(ts[i].LabelValues) != len(ts[j].LabelValues) {
   655  		return len(ts[i].LabelValues) < len(ts[j].LabelValues)
   656  	}
   657  	for k := range ts[i].LabelValues {
   658  		if ts[i].LabelValues[k].Value != ts[j].LabelValues[k].Value {
   659  			return ts[i].LabelValues[k].Value < ts[j].LabelValues[k].Value
   660  		}
   661  	}
   662  	return false
   663  }
   664  

View as plain text