1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package ocgrpc
17
18 import (
19 "context"
20 "reflect"
21 "testing"
22
23 "google.golang.org/grpc/codes"
24 "google.golang.org/grpc/stats"
25 "google.golang.org/grpc/status"
26
27 "go.opencensus.io/metric/metricdata"
28 "go.opencensus.io/stats/view"
29 "go.opencensus.io/tag"
30 "go.opencensus.io/trace"
31 )
32
33 func TestServerDefaultCollections(t *testing.T) {
34 k1 := tag.MustNewKey("k1")
35 k2 := tag.MustNewKey("k2")
36
37 type tagPair struct {
38 k tag.Key
39 v string
40 }
41
42 type wantData struct {
43 v func() *view.View
44 rows []*view.Row
45 }
46 type rpc struct {
47 tags []tagPair
48 tagInfo *stats.RPCTagInfo
49 inPayloads []*stats.InPayload
50 outPayloads []*stats.OutPayload
51 end *stats.End
52 }
53
54 type testCase struct {
55 label string
56 rpcs []*rpc
57 wants []*wantData
58 }
59
60 tcs := []testCase{
61 {
62 "1",
63 []*rpc{
64 {
65 []tagPair{{k1, "v1"}},
66 &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
67 []*stats.InPayload{
68 {Length: 10},
69 },
70 []*stats.OutPayload{
71 {Length: 10},
72 },
73 &stats.End{Error: nil},
74 },
75 },
76 []*wantData{
77 {
78 func() *view.View { return ServerReceivedMessagesPerRPCView },
79 []*view.Row{
80 {
81 Tags: []tag.Tag{
82 {Key: KeyServerMethod, Value: "package.service/method"},
83 },
84 Data: newDistributionData([]int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 1, 1, 1, 0),
85 },
86 },
87 },
88 {
89 func() *view.View { return ServerSentMessagesPerRPCView },
90 []*view.Row{
91 {
92 Tags: []tag.Tag{
93 {Key: KeyServerMethod, Value: "package.service/method"},
94 },
95 Data: newDistributionData([]int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 1, 1, 1, 0),
96 },
97 },
98 },
99 {
100 func() *view.View { return ServerReceivedBytesPerRPCView },
101 []*view.Row{
102 {
103 Tags: []tag.Tag{
104 {Key: KeyServerMethod, Value: "package.service/method"},
105 },
106 Data: newDistributionData([]int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 10, 10, 10, 0),
107 },
108 },
109 },
110 {
111 func() *view.View { return ServerSentBytesPerRPCView },
112 []*view.Row{
113 {
114 Tags: []tag.Tag{
115 {Key: KeyServerMethod, Value: "package.service/method"},
116 },
117 Data: newDistributionData([]int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 10, 10, 10, 0),
118 },
119 },
120 },
121 },
122 },
123 {
124 "2",
125 []*rpc{
126 {
127 []tagPair{{k1, "v1"}},
128 &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
129 []*stats.InPayload{
130 {Length: 10},
131 },
132 []*stats.OutPayload{
133 {Length: 10},
134 {Length: 10},
135 {Length: 10},
136 },
137 &stats.End{Error: nil},
138 },
139 {
140 []tagPair{{k1, "v11"}},
141 &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
142 []*stats.InPayload{
143 {Length: 10},
144 {Length: 10},
145 },
146 []*stats.OutPayload{
147 {Length: 10},
148 {Length: 10},
149 },
150 &stats.End{Error: status.Error(codes.Canceled, "canceled")},
151 },
152 },
153 []*wantData{
154 {
155 func() *view.View { return ServerReceivedMessagesPerRPCView },
156 []*view.Row{
157 {
158 Tags: []tag.Tag{
159 {Key: KeyServerMethod, Value: "package.service/method"},
160 },
161 Data: newDistributionData([]int64{0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 2, 1, 2, 1.5, 0.5),
162 },
163 },
164 },
165 {
166 func() *view.View { return ServerSentMessagesPerRPCView },
167 []*view.Row{
168 {
169 Tags: []tag.Tag{
170 {Key: KeyServerMethod, Value: "package.service/method"},
171 },
172 Data: newDistributionData([]int64{0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 2, 2, 3, 2.5, 0.5),
173 },
174 },
175 },
176 },
177 },
178 {
179 "3",
180 []*rpc{
181 {
182 []tagPair{{k1, "v1"}},
183 &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
184 []*stats.InPayload{
185 {Length: 1},
186 },
187 []*stats.OutPayload{
188 {Length: 1},
189 {Length: 1024},
190 {Length: 65536},
191 },
192 &stats.End{Error: nil},
193 },
194 {
195 []tagPair{{k1, "v1"}, {k2, "v2"}},
196 &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
197 []*stats.InPayload{
198 {Length: 1024},
199 },
200 []*stats.OutPayload{
201 {Length: 4096},
202 {Length: 16384},
203 },
204 &stats.End{Error: status.Error(codes.Aborted, "aborted")},
205 },
206 {
207 []tagPair{{k1, "v11"}, {k2, "v22"}},
208 &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
209 []*stats.InPayload{
210 {Length: 2048},
211 {Length: 16384},
212 },
213 []*stats.OutPayload{
214 {Length: 2048},
215 {Length: 4096},
216 {Length: 16384},
217 },
218 &stats.End{Error: status.Error(codes.Canceled, "canceled")},
219 },
220 },
221 []*wantData{
222 {
223 func() *view.View { return ServerReceivedMessagesPerRPCView },
224 []*view.Row{
225 {
226 Tags: []tag.Tag{
227 {Key: KeyServerMethod, Value: "package.service/method"},
228 },
229 Data: newDistributionData([]int64{0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 1, 2, 1.333333333, 0.333333333*2),
230 },
231 },
232 },
233 {
234 func() *view.View { return ServerSentMessagesPerRPCView },
235 []*view.Row{
236 {
237 Tags: []tag.Tag{
238 {Key: KeyServerMethod, Value: "package.service/method"},
239 },
240 Data: newDistributionData([]int64{0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 2, 3, 2.666666666, 0.333333333*2),
241 },
242 },
243 },
244 {
245 func() *view.View { return ServerReceivedBytesPerRPCView },
246 []*view.Row{
247 {
248 Tags: []tag.Tag{
249 {Key: KeyServerMethod, Value: "package.service/method"},
250 },
251 Data: newDistributionData([]int64{1, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 1, 18432, 6485.6666667, 2.1459558466666667e+08),
252 },
253 },
254 },
255 {
256 func() *view.View { return ServerSentBytesPerRPCView },
257 []*view.Row{
258 {
259 Tags: []tag.Tag{
260 {Key: KeyServerMethod, Value: "package.service/method"},
261 },
262 Data: newDistributionData([]int64{0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 20480, 66561, 36523, 1.355519318e+09),
263 },
264 },
265 },
266 },
267 },
268 }
269
270 views := append(DefaultServerViews[:], ServerReceivedMessagesPerRPCView, ServerSentMessagesPerRPCView)
271
272 for _, tc := range tcs {
273 if err := view.Register(views...); err != nil {
274 t.Fatal(err)
275 }
276
277 h := &ServerHandler{}
278 h.StartOptions.Sampler = trace.NeverSample()
279 for _, rpc := range tc.rpcs {
280 mods := []tag.Mutator{}
281 for _, t := range rpc.tags {
282 mods = append(mods, tag.Upsert(t.k, t.v))
283 }
284 ctx, err := tag.New(context.Background(), mods...)
285 if err != nil {
286 t.Errorf("%q: NewMap = %v", tc.label, err)
287 }
288 encoded := tag.Encode(tag.FromContext(ctx))
289 ctx = stats.SetTags(context.Background(), encoded)
290 ctx = h.TagRPC(ctx, rpc.tagInfo)
291
292 for _, in := range rpc.inPayloads {
293 h.HandleRPC(ctx, in)
294 }
295 for _, out := range rpc.outPayloads {
296 h.HandleRPC(ctx, out)
297 }
298 h.HandleRPC(ctx, rpc.end)
299 }
300
301 for _, wantData := range tc.wants {
302 gotRows, err := view.RetrieveData(wantData.v().Name)
303 if err != nil {
304 t.Errorf("%q: RetrieveData (%q) = %v", tc.label, wantData.v().Name, err)
305 continue
306 }
307
308 for i := range gotRows {
309 view.ClearStart(gotRows[i].Data)
310 }
311
312 for _, gotRow := range gotRows {
313 if !containsRow(wantData.rows, gotRow) {
314 t.Errorf("%q: unwanted row for view %q: %v", tc.label, wantData.v().Name, gotRow)
315 break
316 }
317 }
318
319 for _, wantRow := range wantData.rows {
320 if !containsRow(gotRows, wantRow) {
321 t.Errorf("%q: missing row for view %q: %v", tc.label, wantData.v().Name, wantRow)
322 break
323 }
324 }
325 }
326
327
328 view.Unregister(views...)
329 }
330 }
331
332 func newDistributionData(countPerBucket []int64, count int64, min, max, mean, sumOfSquaredDev float64) *view.DistributionData {
333 return &view.DistributionData{
334 Count: count,
335 Min: min,
336 Max: max,
337 Mean: mean,
338 SumOfSquaredDev: sumOfSquaredDev,
339 CountPerBucket: countPerBucket,
340 }
341 }
342
343 func TestServerRecordExemplar(t *testing.T) {
344 key := tag.MustNewKey("test_key")
345 tagInfo := &stats.RPCTagInfo{FullMethodName: "/package.service/method"}
346 out := &stats.OutPayload{Length: 2000}
347 end := &stats.End{Error: nil}
348
349 if err := view.Register(ServerSentBytesPerRPCView); err != nil {
350 t.Error(err)
351 }
352 h := &ServerHandler{}
353 h.StartOptions.Sampler = trace.AlwaysSample()
354 ctx, err := tag.New(context.Background(), tag.Upsert(key, "test_val"))
355 if err != nil {
356 t.Error(err)
357 }
358 encoded := tag.Encode(tag.FromContext(ctx))
359 ctx = stats.SetTags(context.Background(), encoded)
360 ctx = h.TagRPC(ctx, tagInfo)
361
362 out.Client = false
363 h.HandleRPC(ctx, out)
364 end.Client = false
365 h.HandleRPC(ctx, end)
366
367 span := trace.FromContext(ctx)
368 if span == nil {
369 t.Fatal("expected non-nil span, got nil")
370 }
371 if !span.IsRecordingEvents() {
372 t.Errorf("span should be sampled")
373 }
374 attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: span.SpanContext()}
375 wantExemplar := &metricdata.Exemplar{Value: 2000, Attachments: attachments}
376
377 rows, err := view.RetrieveData(ServerSentBytesPerRPCView.Name)
378 if err != nil {
379 t.Fatal("Error RetrieveData ", err)
380 }
381 if len(rows) == 0 {
382 t.Fatal("No data was recorded.")
383 }
384 data := rows[0].Data
385 dis, ok := data.(*view.DistributionData)
386 if !ok {
387 t.Fatal("want DistributionData, got ", data)
388 }
389
390 wantBuckets := []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
391 if !reflect.DeepEqual(dis.CountPerBucket, wantBuckets) {
392 t.Errorf("want buckets %v, got %v", wantBuckets, dis.CountPerBucket)
393 }
394 for i, e := range dis.ExemplarsPerBucket {
395
396 if i == 1 {
397 if diff := cmpExemplar(e, wantExemplar); diff != "" {
398 t.Fatalf("Unexpected Exemplar -got +want: %s", diff)
399 }
400 } else if e != nil {
401 t.Errorf("want nil exemplar, got %v", e)
402 }
403 }
404
405
406 view.Unregister(ServerSentBytesPerRPCView)
407 }
408
View as plain text