...

Source file src/go.opencensus.io/plugin/ocgrpc/client_stats_handler_test.go

Documentation: go.opencensus.io/plugin/ocgrpc

     1  // Copyright 2017, OpenCensus Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package ocgrpc
    17  
    18  import (
    19  	"reflect"
    20  	"testing"
    21  
    22  	"github.com/google/go-cmp/cmp"
    23  	"github.com/google/go-cmp/cmp/cmpopts"
    24  
    25  	"go.opencensus.io/trace"
    26  	"google.golang.org/grpc/codes"
    27  	"google.golang.org/grpc/status"
    28  
    29  	"context"
    30  
    31  	"go.opencensus.io/metric/metricdata"
    32  	"go.opencensus.io/stats/view"
    33  	"go.opencensus.io/tag"
    34  
    35  	"google.golang.org/grpc/stats"
    36  )
    37  
    38  func TestClientDefaultCollections(t *testing.T) {
    39  	k1 := tag.MustNewKey("k1")
    40  	k2 := tag.MustNewKey("k2")
    41  
    42  	type tagPair struct {
    43  		k tag.Key
    44  		v string
    45  	}
    46  
    47  	type wantData struct {
    48  		v    func() *view.View
    49  		rows []*view.Row
    50  	}
    51  	type rpc struct {
    52  		tags        []tagPair
    53  		tagInfo     *stats.RPCTagInfo
    54  		inPayloads  []*stats.InPayload
    55  		outPayloads []*stats.OutPayload
    56  		end         *stats.End
    57  	}
    58  
    59  	type testCase struct {
    60  		label string
    61  		rpcs  []*rpc
    62  		wants []*wantData
    63  	}
    64  	tcs := []testCase{
    65  		{
    66  			label: "1",
    67  			rpcs: []*rpc{
    68  				{
    69  					[]tagPair{{k1, "v1"}},
    70  					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
    71  					[]*stats.InPayload{
    72  						{Length: 10},
    73  					},
    74  					[]*stats.OutPayload{
    75  						{Length: 10},
    76  					},
    77  					&stats.End{Error: nil},
    78  				},
    79  			},
    80  			wants: []*wantData{
    81  				{
    82  					func() *view.View { return ClientSentMessagesPerRPCView },
    83  					[]*view.Row{
    84  						{
    85  							Tags: []tag.Tag{
    86  								{Key: KeyClientMethod, Value: "package.service/method"},
    87  							},
    88  							Data: newDistributionData([]int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 1, 1, 1, 0),
    89  						},
    90  					},
    91  				},
    92  				{
    93  					func() *view.View { return ClientReceivedMessagesPerRPCView },
    94  					[]*view.Row{
    95  						{
    96  							Tags: []tag.Tag{
    97  								{Key: KeyClientMethod, Value: "package.service/method"},
    98  							},
    99  							Data: newDistributionData([]int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 1, 1, 1, 0),
   100  						},
   101  					},
   102  				},
   103  				{
   104  					func() *view.View { return ClientSentBytesPerRPCView },
   105  					[]*view.Row{
   106  						{
   107  							Tags: []tag.Tag{
   108  								{Key: KeyClientMethod, Value: "package.service/method"},
   109  							},
   110  							Data: newDistributionData([]int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 10, 10, 10, 0),
   111  						},
   112  					},
   113  				},
   114  				{
   115  					func() *view.View { return ClientReceivedBytesPerRPCView },
   116  					[]*view.Row{
   117  						{
   118  							Tags: []tag.Tag{
   119  								{Key: KeyClientMethod, Value: "package.service/method"},
   120  							},
   121  							Data: newDistributionData([]int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 10, 10, 10, 0),
   122  						},
   123  					},
   124  				},
   125  			},
   126  		},
   127  		{
   128  			label: "2",
   129  			rpcs: []*rpc{
   130  				{
   131  					[]tagPair{{k1, "v1"}},
   132  					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
   133  					[]*stats.InPayload{
   134  						{Length: 10},
   135  					},
   136  					[]*stats.OutPayload{
   137  						{Length: 10},
   138  						{Length: 10},
   139  						{Length: 10},
   140  					},
   141  					&stats.End{Error: nil},
   142  				},
   143  				{
   144  					[]tagPair{{k1, "v11"}},
   145  					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
   146  					[]*stats.InPayload{
   147  						{Length: 10},
   148  						{Length: 10},
   149  					},
   150  					[]*stats.OutPayload{
   151  						{Length: 10},
   152  						{Length: 10},
   153  					},
   154  					&stats.End{Error: status.Error(codes.Canceled, "canceled")},
   155  				},
   156  			},
   157  			wants: []*wantData{
   158  				{
   159  					func() *view.View { return ClientSentMessagesPerRPCView },
   160  					[]*view.Row{
   161  						{
   162  							Tags: []tag.Tag{
   163  								{Key: KeyClientMethod, Value: "package.service/method"},
   164  							},
   165  							Data: newDistributionData([]int64{0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 2, 2, 3, 2.5, 0.5),
   166  						},
   167  					},
   168  				},
   169  				{
   170  					func() *view.View { return ClientReceivedMessagesPerRPCView },
   171  					[]*view.Row{
   172  						{
   173  							Tags: []tag.Tag{
   174  								{Key: KeyClientMethod, Value: "package.service/method"},
   175  							},
   176  							Data: newDistributionData([]int64{0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 2, 1, 2, 1.5, 0.5),
   177  						},
   178  					},
   179  				},
   180  			},
   181  		},
   182  		{
   183  			label: "3",
   184  			rpcs: []*rpc{
   185  				{
   186  					[]tagPair{{k1, "v1"}},
   187  					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
   188  					[]*stats.InPayload{
   189  						{Length: 1},
   190  					},
   191  					[]*stats.OutPayload{
   192  						{Length: 1},
   193  						{Length: 1024},
   194  						{Length: 65536},
   195  					},
   196  					&stats.End{Error: nil},
   197  				},
   198  				{
   199  					[]tagPair{{k1, "v1"}, {k2, "v2"}},
   200  					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
   201  					[]*stats.InPayload{
   202  						{Length: 1024},
   203  					},
   204  					[]*stats.OutPayload{
   205  						{Length: 4096},
   206  						{Length: 16384},
   207  					},
   208  					&stats.End{Error: status.Error(codes.Canceled, "canceled")},
   209  				},
   210  				{
   211  					[]tagPair{{k1, "v11"}, {k2, "v22"}},
   212  					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
   213  					[]*stats.InPayload{
   214  						{Length: 2048},
   215  						{Length: 16384},
   216  					},
   217  					[]*stats.OutPayload{
   218  						{Length: 2048},
   219  						{Length: 4096},
   220  						{Length: 16384},
   221  					},
   222  					&stats.End{Error: status.Error(codes.Aborted, "aborted")},
   223  				},
   224  			},
   225  			wants: []*wantData{
   226  				{
   227  					func() *view.View { return ClientSentMessagesPerRPCView },
   228  					[]*view.Row{
   229  						{
   230  							Tags: []tag.Tag{
   231  								{Key: KeyClientMethod, Value: "package.service/method"},
   232  							},
   233  							Data: newDistributionData([]int64{0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 2, 3, 2.666666666, 0.333333333*2),
   234  						},
   235  					},
   236  				},
   237  				{
   238  					func() *view.View { return ClientReceivedMessagesPerRPCView },
   239  					[]*view.Row{
   240  						{
   241  							Tags: []tag.Tag{
   242  								{Key: KeyClientMethod, Value: "package.service/method"},
   243  							},
   244  							Data: newDistributionData([]int64{0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 1, 2, 1.333333333, 0.333333333*2),
   245  						},
   246  					},
   247  				},
   248  				{
   249  					func() *view.View { return ClientSentBytesPerRPCView },
   250  					[]*view.Row{
   251  						{
   252  							Tags: []tag.Tag{
   253  								{Key: KeyClientMethod, Value: "package.service/method"},
   254  							},
   255  							Data: newDistributionData([]int64{0, 0, 0, 0, 2 /*16384*/, 1 /*65536*/, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 20480, 66561, 36523, 1.355519318e+09),
   256  						},
   257  					},
   258  				},
   259  				{
   260  					func() *view.View { return ClientReceivedBytesPerRPCView },
   261  					[]*view.Row{
   262  						{
   263  							Tags: []tag.Tag{
   264  								{Key: KeyClientMethod, Value: "package.service/method"},
   265  							},
   266  							Data: newDistributionData([]int64{1, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 1, 18432, 6485.666667, 2.1459558466666666e+08),
   267  						},
   268  					},
   269  				},
   270  			},
   271  		},
   272  	}
   273  
   274  	views := []*view.View{
   275  		ClientSentBytesPerRPCView,
   276  		ClientReceivedBytesPerRPCView,
   277  		ClientRoundtripLatencyView,
   278  		ClientCompletedRPCsView,
   279  		ClientSentMessagesPerRPCView,
   280  		ClientReceivedMessagesPerRPCView,
   281  	}
   282  
   283  	for _, tc := range tcs {
   284  		// Register views.
   285  		if err := view.Register(views...); err != nil {
   286  			t.Error(err)
   287  		}
   288  
   289  		h := &ClientHandler{}
   290  		h.StartOptions.Sampler = trace.NeverSample()
   291  		for _, rpc := range tc.rpcs {
   292  			var mods []tag.Mutator
   293  			for _, t := range rpc.tags {
   294  				mods = append(mods, tag.Upsert(t.k, t.v))
   295  			}
   296  			ctx, err := tag.New(context.Background(), mods...)
   297  			if err != nil {
   298  				t.Errorf("%q: NewMap = %v", tc.label, err)
   299  			}
   300  			encoded := tag.Encode(tag.FromContext(ctx))
   301  			ctx = stats.SetTags(context.Background(), encoded)
   302  			ctx = h.TagRPC(ctx, rpc.tagInfo)
   303  			for _, out := range rpc.outPayloads {
   304  				out.Client = true
   305  				h.HandleRPC(ctx, out)
   306  			}
   307  			for _, in := range rpc.inPayloads {
   308  				in.Client = true
   309  				h.HandleRPC(ctx, in)
   310  			}
   311  			rpc.end.Client = true
   312  			h.HandleRPC(ctx, rpc.end)
   313  		}
   314  
   315  		for _, wantData := range tc.wants {
   316  			gotRows, err := view.RetrieveData(wantData.v().Name)
   317  			if err != nil {
   318  				t.Errorf("%q: RetrieveData(%q) = %v", tc.label, wantData.v().Name, err)
   319  				continue
   320  			}
   321  			for i := range gotRows {
   322  				view.ClearStart(gotRows[i].Data)
   323  			}
   324  
   325  			for _, gotRow := range gotRows {
   326  				if !containsRow(wantData.rows, gotRow) {
   327  					t.Errorf("%q: unwanted row for view %q = %v", tc.label, wantData.v().Name, gotRow)
   328  					break
   329  				}
   330  			}
   331  
   332  			for _, wantRow := range wantData.rows {
   333  				if !containsRow(gotRows, wantRow) {
   334  					t.Errorf("%q: row missing for view %q; want %v", tc.label, wantData.v().Name, wantRow)
   335  					break
   336  				}
   337  			}
   338  		}
   339  
   340  		// Unregister views to cleanup.
   341  		view.Unregister(views...)
   342  	}
   343  }
   344  
   345  func TestClientRecordExemplar(t *testing.T) {
   346  	key := tag.MustNewKey("test_key")
   347  	tagInfo := &stats.RPCTagInfo{FullMethodName: "/package.service/method"}
   348  	out := &stats.OutPayload{Length: 2000}
   349  	end := &stats.End{Error: nil}
   350  
   351  	if err := view.Register(ClientSentBytesPerRPCView); err != nil {
   352  		t.Error(err)
   353  	}
   354  	h := &ClientHandler{}
   355  	h.StartOptions.Sampler = trace.AlwaysSample()
   356  	ctx, err := tag.New(context.Background(), tag.Upsert(key, "test_val"))
   357  	if err != nil {
   358  		t.Error(err)
   359  	}
   360  	encoded := tag.Encode(tag.FromContext(ctx))
   361  	ctx = stats.SetTags(context.Background(), encoded)
   362  	ctx = h.TagRPC(ctx, tagInfo)
   363  
   364  	out.Client = true
   365  	h.HandleRPC(ctx, out)
   366  	end.Client = true
   367  	h.HandleRPC(ctx, end)
   368  
   369  	span := trace.FromContext(ctx)
   370  	if span == nil {
   371  		t.Fatal("expected non-nil span, got nil")
   372  	}
   373  	if !span.IsRecordingEvents() {
   374  		t.Errorf("span should be sampled")
   375  	}
   376  	attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: span.SpanContext()}
   377  	wantExemplar := &metricdata.Exemplar{Value: 2000, Attachments: attachments}
   378  
   379  	rows, err := view.RetrieveData(ClientSentBytesPerRPCView.Name)
   380  	if err != nil {
   381  		t.Fatal("Error RetrieveData ", err)
   382  	}
   383  	if len(rows) == 0 {
   384  		t.Fatal("No data was recorded.")
   385  	}
   386  	data := rows[0].Data
   387  	dis, ok := data.(*view.DistributionData)
   388  	if !ok {
   389  		t.Fatal("want DistributionData, got ", data)
   390  	}
   391  	// Only recorded value is 2000, which falls into the second bucket (1024, 2048].
   392  	wantBuckets := []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
   393  	if !reflect.DeepEqual(dis.CountPerBucket, wantBuckets) {
   394  		t.Errorf("want buckets %v, got %v", wantBuckets, dis.CountPerBucket)
   395  	}
   396  	for i, e := range dis.ExemplarsPerBucket {
   397  		// Only the second bucket should have an exemplar.
   398  		if i == 1 {
   399  			if diff := cmpExemplar(e, wantExemplar); diff != "" {
   400  				t.Fatalf("Unexpected Exemplar -got +want: %s", diff)
   401  			}
   402  		} else if e != nil {
   403  			t.Errorf("want nil exemplar, got %v", e)
   404  		}
   405  	}
   406  
   407  	// Unregister views to cleanup.
   408  	view.Unregister(ClientSentBytesPerRPCView)
   409  }
   410  
   411  // containsRow returns true if rows contain r.
   412  func containsRow(rows []*view.Row, r *view.Row) bool {
   413  	for _, x := range rows {
   414  		if r.Equal(x) {
   415  			return true
   416  		}
   417  	}
   418  	return false
   419  }
   420  
   421  // Compare exemplars while ignoring exemplar timestamp, since timestamp is non-deterministic.
   422  func cmpExemplar(got, want *metricdata.Exemplar) string {
   423  	return cmp.Diff(got, want, cmpopts.IgnoreFields(metricdata.Exemplar{}, "Timestamp"), cmpopts.IgnoreUnexported(metricdata.Exemplar{}))
   424  }
   425  

View as plain text