...

Source file src/go.opencensus.io/plugin/ocgrpc/server_stats_handler_test.go

Documentation: go.opencensus.io/plugin/ocgrpc

     1  // Copyright 2017, OpenCensus Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  //
    15  
    16  package ocgrpc
    17  
    18  import (
    19  	"context"
    20  	"reflect"
    21  	"testing"
    22  
    23  	"google.golang.org/grpc/codes"
    24  	"google.golang.org/grpc/stats"
    25  	"google.golang.org/grpc/status"
    26  
    27  	"go.opencensus.io/metric/metricdata"
    28  	"go.opencensus.io/stats/view"
    29  	"go.opencensus.io/tag"
    30  	"go.opencensus.io/trace"
    31  )
    32  
    33  func TestServerDefaultCollections(t *testing.T) {
    34  	k1 := tag.MustNewKey("k1")
    35  	k2 := tag.MustNewKey("k2")
    36  
    37  	type tagPair struct {
    38  		k tag.Key
    39  		v string
    40  	}
    41  
    42  	type wantData struct {
    43  		v    func() *view.View
    44  		rows []*view.Row
    45  	}
    46  	type rpc struct {
    47  		tags        []tagPair
    48  		tagInfo     *stats.RPCTagInfo
    49  		inPayloads  []*stats.InPayload
    50  		outPayloads []*stats.OutPayload
    51  		end         *stats.End
    52  	}
    53  
    54  	type testCase struct {
    55  		label string
    56  		rpcs  []*rpc
    57  		wants []*wantData
    58  	}
    59  
    60  	tcs := []testCase{
    61  		{
    62  			"1",
    63  			[]*rpc{
    64  				{
    65  					[]tagPair{{k1, "v1"}},
    66  					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
    67  					[]*stats.InPayload{
    68  						{Length: 10},
    69  					},
    70  					[]*stats.OutPayload{
    71  						{Length: 10},
    72  					},
    73  					&stats.End{Error: nil},
    74  				},
    75  			},
    76  			[]*wantData{
    77  				{
    78  					func() *view.View { return ServerReceivedMessagesPerRPCView },
    79  					[]*view.Row{
    80  						{
    81  							Tags: []tag.Tag{
    82  								{Key: KeyServerMethod, Value: "package.service/method"},
    83  							},
    84  							Data: newDistributionData([]int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 1, 1, 1, 0),
    85  						},
    86  					},
    87  				},
    88  				{
    89  					func() *view.View { return ServerSentMessagesPerRPCView },
    90  					[]*view.Row{
    91  						{
    92  							Tags: []tag.Tag{
    93  								{Key: KeyServerMethod, Value: "package.service/method"},
    94  							},
    95  							Data: newDistributionData([]int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 1, 1, 1, 0),
    96  						},
    97  					},
    98  				},
    99  				{
   100  					func() *view.View { return ServerReceivedBytesPerRPCView },
   101  					[]*view.Row{
   102  						{
   103  							Tags: []tag.Tag{
   104  								{Key: KeyServerMethod, Value: "package.service/method"},
   105  							},
   106  							Data: newDistributionData([]int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 10, 10, 10, 0),
   107  						},
   108  					},
   109  				},
   110  				{
   111  					func() *view.View { return ServerSentBytesPerRPCView },
   112  					[]*view.Row{
   113  						{
   114  							Tags: []tag.Tag{
   115  								{Key: KeyServerMethod, Value: "package.service/method"},
   116  							},
   117  							Data: newDistributionData([]int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 10, 10, 10, 0),
   118  						},
   119  					},
   120  				},
   121  			},
   122  		},
   123  		{
   124  			"2",
   125  			[]*rpc{
   126  				{
   127  					[]tagPair{{k1, "v1"}},
   128  					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
   129  					[]*stats.InPayload{
   130  						{Length: 10},
   131  					},
   132  					[]*stats.OutPayload{
   133  						{Length: 10},
   134  						{Length: 10},
   135  						{Length: 10},
   136  					},
   137  					&stats.End{Error: nil},
   138  				},
   139  				{
   140  					[]tagPair{{k1, "v11"}},
   141  					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
   142  					[]*stats.InPayload{
   143  						{Length: 10},
   144  						{Length: 10},
   145  					},
   146  					[]*stats.OutPayload{
   147  						{Length: 10},
   148  						{Length: 10},
   149  					},
   150  					&stats.End{Error: status.Error(codes.Canceled, "canceled")},
   151  				},
   152  			},
   153  			[]*wantData{
   154  				{
   155  					func() *view.View { return ServerReceivedMessagesPerRPCView },
   156  					[]*view.Row{
   157  						{
   158  							Tags: []tag.Tag{
   159  								{Key: KeyServerMethod, Value: "package.service/method"},
   160  							},
   161  							Data: newDistributionData([]int64{0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 2, 1, 2, 1.5, 0.5),
   162  						},
   163  					},
   164  				},
   165  				{
   166  					func() *view.View { return ServerSentMessagesPerRPCView },
   167  					[]*view.Row{
   168  						{
   169  							Tags: []tag.Tag{
   170  								{Key: KeyServerMethod, Value: "package.service/method"},
   171  							},
   172  							Data: newDistributionData([]int64{0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 2, 2, 3, 2.5, 0.5),
   173  						},
   174  					},
   175  				},
   176  			},
   177  		},
   178  		{
   179  			"3",
   180  			[]*rpc{
   181  				{
   182  					[]tagPair{{k1, "v1"}},
   183  					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
   184  					[]*stats.InPayload{
   185  						{Length: 1},
   186  					},
   187  					[]*stats.OutPayload{
   188  						{Length: 1},
   189  						{Length: 1024},
   190  						{Length: 65536},
   191  					},
   192  					&stats.End{Error: nil},
   193  				},
   194  				{
   195  					[]tagPair{{k1, "v1"}, {k2, "v2"}},
   196  					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
   197  					[]*stats.InPayload{
   198  						{Length: 1024},
   199  					},
   200  					[]*stats.OutPayload{
   201  						{Length: 4096},
   202  						{Length: 16384},
   203  					},
   204  					&stats.End{Error: status.Error(codes.Aborted, "aborted")},
   205  				},
   206  				{
   207  					[]tagPair{{k1, "v11"}, {k2, "v22"}},
   208  					&stats.RPCTagInfo{FullMethodName: "/package.service/method"},
   209  					[]*stats.InPayload{
   210  						{Length: 2048},
   211  						{Length: 16384},
   212  					},
   213  					[]*stats.OutPayload{
   214  						{Length: 2048},
   215  						{Length: 4096},
   216  						{Length: 16384},
   217  					},
   218  					&stats.End{Error: status.Error(codes.Canceled, "canceled")},
   219  				},
   220  			},
   221  			[]*wantData{
   222  				{
   223  					func() *view.View { return ServerReceivedMessagesPerRPCView },
   224  					[]*view.Row{
   225  						{
   226  							Tags: []tag.Tag{
   227  								{Key: KeyServerMethod, Value: "package.service/method"},
   228  							},
   229  							Data: newDistributionData([]int64{0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 1, 2, 1.333333333, 0.333333333*2),
   230  						},
   231  					},
   232  				},
   233  				{
   234  					func() *view.View { return ServerSentMessagesPerRPCView },
   235  					[]*view.Row{
   236  						{
   237  							Tags: []tag.Tag{
   238  								{Key: KeyServerMethod, Value: "package.service/method"},
   239  							},
   240  							Data: newDistributionData([]int64{0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 2, 3, 2.666666666, 0.333333333*2),
   241  						},
   242  					},
   243  				},
   244  				{
   245  					func() *view.View { return ServerReceivedBytesPerRPCView },
   246  					[]*view.Row{
   247  						{
   248  							Tags: []tag.Tag{
   249  								{Key: KeyServerMethod, Value: "package.service/method"},
   250  							},
   251  							Data: newDistributionData([]int64{1, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 1, 18432, 6485.6666667, 2.1459558466666667e+08),
   252  						},
   253  					},
   254  				},
   255  				{
   256  					func() *view.View { return ServerSentBytesPerRPCView },
   257  					[]*view.Row{
   258  						{
   259  							Tags: []tag.Tag{
   260  								{Key: KeyServerMethod, Value: "package.service/method"},
   261  							},
   262  							Data: newDistributionData([]int64{0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 20480, 66561, 36523, 1.355519318e+09),
   263  						},
   264  					},
   265  				},
   266  			},
   267  		},
   268  	}
   269  
   270  	views := append(DefaultServerViews[:], ServerReceivedMessagesPerRPCView, ServerSentMessagesPerRPCView)
   271  
   272  	for _, tc := range tcs {
   273  		if err := view.Register(views...); err != nil {
   274  			t.Fatal(err)
   275  		}
   276  
   277  		h := &ServerHandler{}
   278  		h.StartOptions.Sampler = trace.NeverSample()
   279  		for _, rpc := range tc.rpcs {
   280  			mods := []tag.Mutator{}
   281  			for _, t := range rpc.tags {
   282  				mods = append(mods, tag.Upsert(t.k, t.v))
   283  			}
   284  			ctx, err := tag.New(context.Background(), mods...)
   285  			if err != nil {
   286  				t.Errorf("%q: NewMap = %v", tc.label, err)
   287  			}
   288  			encoded := tag.Encode(tag.FromContext(ctx))
   289  			ctx = stats.SetTags(context.Background(), encoded)
   290  			ctx = h.TagRPC(ctx, rpc.tagInfo)
   291  
   292  			for _, in := range rpc.inPayloads {
   293  				h.HandleRPC(ctx, in)
   294  			}
   295  			for _, out := range rpc.outPayloads {
   296  				h.HandleRPC(ctx, out)
   297  			}
   298  			h.HandleRPC(ctx, rpc.end)
   299  		}
   300  
   301  		for _, wantData := range tc.wants {
   302  			gotRows, err := view.RetrieveData(wantData.v().Name)
   303  			if err != nil {
   304  				t.Errorf("%q: RetrieveData (%q) = %v", tc.label, wantData.v().Name, err)
   305  				continue
   306  			}
   307  
   308  			for i := range gotRows {
   309  				view.ClearStart(gotRows[i].Data)
   310  			}
   311  
   312  			for _, gotRow := range gotRows {
   313  				if !containsRow(wantData.rows, gotRow) {
   314  					t.Errorf("%q: unwanted row for view %q: %v", tc.label, wantData.v().Name, gotRow)
   315  					break
   316  				}
   317  			}
   318  
   319  			for _, wantRow := range wantData.rows {
   320  				if !containsRow(gotRows, wantRow) {
   321  					t.Errorf("%q: missing row for view %q: %v", tc.label, wantData.v().Name, wantRow)
   322  					break
   323  				}
   324  			}
   325  		}
   326  
   327  		// Unregister views to cleanup.
   328  		view.Unregister(views...)
   329  	}
   330  }
   331  
   332  func newDistributionData(countPerBucket []int64, count int64, min, max, mean, sumOfSquaredDev float64) *view.DistributionData {
   333  	return &view.DistributionData{
   334  		Count:           count,
   335  		Min:             min,
   336  		Max:             max,
   337  		Mean:            mean,
   338  		SumOfSquaredDev: sumOfSquaredDev,
   339  		CountPerBucket:  countPerBucket,
   340  	}
   341  }
   342  
   343  func TestServerRecordExemplar(t *testing.T) {
   344  	key := tag.MustNewKey("test_key")
   345  	tagInfo := &stats.RPCTagInfo{FullMethodName: "/package.service/method"}
   346  	out := &stats.OutPayload{Length: 2000}
   347  	end := &stats.End{Error: nil}
   348  
   349  	if err := view.Register(ServerSentBytesPerRPCView); err != nil {
   350  		t.Error(err)
   351  	}
   352  	h := &ServerHandler{}
   353  	h.StartOptions.Sampler = trace.AlwaysSample()
   354  	ctx, err := tag.New(context.Background(), tag.Upsert(key, "test_val"))
   355  	if err != nil {
   356  		t.Error(err)
   357  	}
   358  	encoded := tag.Encode(tag.FromContext(ctx))
   359  	ctx = stats.SetTags(context.Background(), encoded)
   360  	ctx = h.TagRPC(ctx, tagInfo)
   361  
   362  	out.Client = false
   363  	h.HandleRPC(ctx, out)
   364  	end.Client = false
   365  	h.HandleRPC(ctx, end)
   366  
   367  	span := trace.FromContext(ctx)
   368  	if span == nil {
   369  		t.Fatal("expected non-nil span, got nil")
   370  	}
   371  	if !span.IsRecordingEvents() {
   372  		t.Errorf("span should be sampled")
   373  	}
   374  	attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: span.SpanContext()}
   375  	wantExemplar := &metricdata.Exemplar{Value: 2000, Attachments: attachments}
   376  
   377  	rows, err := view.RetrieveData(ServerSentBytesPerRPCView.Name)
   378  	if err != nil {
   379  		t.Fatal("Error RetrieveData ", err)
   380  	}
   381  	if len(rows) == 0 {
   382  		t.Fatal("No data was recorded.")
   383  	}
   384  	data := rows[0].Data
   385  	dis, ok := data.(*view.DistributionData)
   386  	if !ok {
   387  		t.Fatal("want DistributionData, got ", data)
   388  	}
   389  	// Only recorded value is 2000, which falls into the second bucket (1024, 2048].
   390  	wantBuckets := []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
   391  	if !reflect.DeepEqual(dis.CountPerBucket, wantBuckets) {
   392  		t.Errorf("want buckets %v, got %v", wantBuckets, dis.CountPerBucket)
   393  	}
   394  	for i, e := range dis.ExemplarsPerBucket {
   395  		// Only the second bucket should have an exemplar.
   396  		if i == 1 {
   397  			if diff := cmpExemplar(e, wantExemplar); diff != "" {
   398  				t.Fatalf("Unexpected Exemplar -got +want: %s", diff)
   399  			}
   400  		} else if e != nil {
   401  			t.Errorf("want nil exemplar, got %v", e)
   402  		}
   403  	}
   404  
   405  	// Unregister views to cleanup.
   406  	view.Unregister(ServerSentBytesPerRPCView)
   407  }
   408  

View as plain text