1
18
19 package test
20
21 import (
22 "context"
23 "crypto/tls"
24 "fmt"
25 "net"
26 "regexp"
27 "strings"
28 "sync"
29 "testing"
30 "time"
31
32 "github.com/google/go-cmp/cmp"
33 "golang.org/x/net/http2"
34 "google.golang.org/grpc"
35 _ "google.golang.org/grpc/balancer/grpclb"
36 grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
37 "google.golang.org/grpc/balancer/roundrobin"
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/connectivity"
40 "google.golang.org/grpc/credentials"
41 "google.golang.org/grpc/internal"
42 "google.golang.org/grpc/internal/channelz"
43 "google.golang.org/grpc/internal/stubserver"
44 "google.golang.org/grpc/internal/testutils"
45 "google.golang.org/grpc/keepalive"
46 "google.golang.org/grpc/resolver"
47 "google.golang.org/grpc/resolver/manual"
48 "google.golang.org/grpc/status"
49 "google.golang.org/grpc/testdata"
50
51 testgrpc "google.golang.org/grpc/interop/grpc_testing"
52 testpb "google.golang.org/grpc/interop/grpc_testing"
53 )
54
// verifyResultWithDelay polls f until it reports success.
//
// f is invoked up to 1000 times with a 10ms sleep after every unsuccessful
// attempt. As soon as f returns true the function returns nil, regardless of
// the error f returned alongside it. If no attempt succeeds, the error from
// the final attempt is returned (which is nil if f never produced one).
func verifyResultWithDelay(f func() (bool, error)) error {
	const (
		maxAttempts = 1000
		retryDelay  = 10 * time.Millisecond
	)

	var lastErr error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		done, err := f()
		if done {
			return nil
		}
		lastErr = err
		time.Sleep(retryDelay)
	}
	return lastErr
}
66
67 func (s) TestCZServerRegistrationAndDeletion(t *testing.T) {
68 testcases := []struct {
69 total int
70 start int64
71 max int
72 length int
73 end bool
74 }{
75 {total: int(channelz.EntriesPerPage), start: 0, max: 0, length: channelz.EntriesPerPage, end: true},
76 {total: int(channelz.EntriesPerPage) - 1, start: 0, max: 0, length: channelz.EntriesPerPage - 1, end: true},
77 {total: int(channelz.EntriesPerPage) + 1, start: 0, max: 0, length: channelz.EntriesPerPage, end: false},
78 {total: int(channelz.EntriesPerPage) + 1, start: int64(2*(channelz.EntriesPerPage+1) + 1), max: 0, length: 0, end: true},
79 {total: int(channelz.EntriesPerPage), start: 0, max: 1, length: 1, end: false},
80 {total: int(channelz.EntriesPerPage), start: 0, max: channelz.EntriesPerPage - 1, length: channelz.EntriesPerPage - 1, end: false},
81 }
82
83 for i, c := range testcases {
84
85 channelz.IDGen.Reset()
86
87 e := tcpClearRREnv
88 te := newTest(t, e)
89 te.startServers(&testServer{security: e.security}, c.total)
90
91 ss, end := channelz.GetServers(c.start, c.max)
92 if len(ss) != c.length || end != c.end {
93 t.Fatalf("%d: GetServers(%d) = %+v (len of which: %d), end: %+v, want len(GetServers(%d)) = %d, end: %+v", i, c.start, ss, len(ss), end, c.start, c.length, c.end)
94 }
95 te.tearDown()
96 ss, end = channelz.GetServers(c.start, c.max)
97 if len(ss) != 0 || !end {
98 t.Fatalf("%d: GetServers(0) = %+v (len of which: %d), end: %+v, want len(GetServers(0)) = 0, end: true", i, ss, len(ss), end)
99 }
100 }
101 }
102
103 func (s) TestCZGetChannel(t *testing.T) {
104 e := tcpClearRREnv
105 e.balancer = ""
106 te := newTest(t, e)
107 te.startServer(&testServer{security: e.security})
108 r := manual.NewBuilderWithScheme("whatever")
109 addrs := []resolver.Address{{Addr: te.srvAddr}}
110 r.InitialState(resolver.State{Addresses: addrs})
111 te.resolverScheme = r.Scheme()
112 te.clientConn(grpc.WithResolvers(r))
113 defer te.tearDown()
114 if err := verifyResultWithDelay(func() (bool, error) {
115 tcs, _ := channelz.GetTopChannels(0, 0)
116 if len(tcs) != 1 {
117 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
118 }
119 target := tcs[0].ChannelMetrics.Target.Load()
120 wantTarget := "whatever:///" + te.srvAddr
121 if target == nil || *target != wantTarget {
122 return false, fmt.Errorf("Got channelz target=%v; want %q", target, wantTarget)
123 }
124 state := tcs[0].ChannelMetrics.State.Load()
125 if state == nil || *state != connectivity.Ready {
126 return false, fmt.Errorf("Got channelz state=%v; want %q", state, connectivity.Ready)
127 }
128 return true, nil
129 }); err != nil {
130 t.Fatal(err)
131 }
132 }
133
134 func (s) TestCZGetSubChannel(t *testing.T) {
135 e := tcpClearRREnv
136 e.balancer = ""
137 te := newTest(t, e)
138 te.startServer(&testServer{security: e.security})
139 r := manual.NewBuilderWithScheme("whatever")
140 addrs := []resolver.Address{{Addr: te.srvAddr}}
141 r.InitialState(resolver.State{Addresses: addrs})
142 te.resolverScheme = r.Scheme()
143 te.clientConn(grpc.WithResolvers(r))
144 defer te.tearDown()
145 if err := verifyResultWithDelay(func() (bool, error) {
146 tcs, _ := channelz.GetTopChannels(0, 0)
147 if len(tcs) != 1 {
148 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
149 }
150 scs := tcs[0].SubChans()
151 if len(scs) != 1 {
152 return false, fmt.Errorf("there should be one subchannel, not %d", len(scs))
153 }
154 var scid int64
155 for scid = range scs {
156 }
157 sc := channelz.GetSubChannel(scid)
158 if sc == nil {
159 return false, fmt.Errorf("subchannel with id %v is nil", scid)
160 }
161 target := sc.ChannelMetrics.Target.Load()
162 if target == nil || !strings.HasPrefix(*target, "localhost") {
163 t.Fatalf("subchannel target must never be set incorrectly; got: %v, want <HasPrefix('localhost')>", target)
164 }
165 state := sc.ChannelMetrics.State.Load()
166 if state == nil || *state != connectivity.Ready {
167 return false, fmt.Errorf("Got subchannel state=%v; want %q", state, connectivity.Ready)
168 }
169 return true, nil
170 }); err != nil {
171 t.Fatal(err)
172 }
173 }
174
175 func (s) TestCZGetServer(t *testing.T) {
176 e := tcpClearRREnv
177 te := newTest(t, e)
178 te.startServer(&testServer{security: e.security})
179 defer te.tearDown()
180
181 ss, _ := channelz.GetServers(0, 0)
182 if len(ss) != 1 {
183 t.Fatalf("there should only be one server, not %d", len(ss))
184 }
185
186 serverID := ss[0].ID
187 srv := channelz.GetServer(serverID)
188 if srv == nil {
189 t.Fatalf("server %d does not exist", serverID)
190 }
191 if srv.ID != serverID {
192 t.Fatalf("server want id %d, but got %d", serverID, srv.ID)
193 }
194
195 te.tearDown()
196
197 if err := verifyResultWithDelay(func() (bool, error) {
198 srv := channelz.GetServer(serverID)
199 if srv != nil {
200 return false, fmt.Errorf("server %d should not exist", serverID)
201 }
202
203 return true, nil
204 }); err != nil {
205 t.Fatal(err)
206 }
207 }
208
209 func (s) TestCZGetSocket(t *testing.T) {
210 e := tcpClearRREnv
211 te := newTest(t, e)
212 lis := te.listenAndServe(&testServer{security: e.security}, net.Listen)
213 defer te.tearDown()
214
215 if err := verifyResultWithDelay(func() (bool, error) {
216 ss, _ := channelz.GetServers(0, 0)
217 if len(ss) != 1 {
218 return false, fmt.Errorf("len(ss) = %v; want %v", len(ss), 1)
219 }
220
221 serverID := ss[0].ID
222 srv := channelz.GetServer(serverID)
223 if srv == nil {
224 return false, fmt.Errorf("server %d does not exist", serverID)
225 }
226 if srv.ID != serverID {
227 return false, fmt.Errorf("srv.ID = %d; want %v", srv.ID, serverID)
228 }
229
230 skts := srv.ListenSockets()
231 if got, want := len(skts), 1; got != want {
232 return false, fmt.Errorf("len(skts) = %v; want %v", got, want)
233 }
234 var sktID int64
235 for sktID = range skts {
236 }
237
238 skt := channelz.GetSocket(sktID)
239 if skt == nil {
240 return false, fmt.Errorf("socket %v does not exist", sktID)
241 }
242
243 if got, want := skt.LocalAddr, lis.Addr(); got != want {
244 return false, fmt.Errorf("socket %v LocalAddr=%v; want %v", sktID, got, want)
245 }
246 return true, nil
247 }); err != nil {
248 t.Fatal(err)
249 }
250 }
251
252 func (s) TestCZTopChannelRegistrationAndDeletion(t *testing.T) {
253 testcases := []struct {
254 total int
255 start int64
256 max int
257 length int
258 end bool
259 }{
260 {total: int(channelz.EntriesPerPage), start: 0, max: 0, length: channelz.EntriesPerPage, end: true},
261 {total: int(channelz.EntriesPerPage) - 1, start: 0, max: 0, length: channelz.EntriesPerPage - 1, end: true},
262 {total: int(channelz.EntriesPerPage) + 1, start: 0, max: 0, length: channelz.EntriesPerPage, end: false},
263 {total: int(channelz.EntriesPerPage) + 1, start: int64(2*(channelz.EntriesPerPage+1) + 1), max: 0, length: 0, end: true},
264 {total: int(channelz.EntriesPerPage), start: 0, max: 1, length: 1, end: false},
265 {total: int(channelz.EntriesPerPage), start: 0, max: channelz.EntriesPerPage - 1, length: channelz.EntriesPerPage - 1, end: false},
266 }
267
268 for _, c := range testcases {
269
270 channelz.IDGen.Reset()
271
272 e := tcpClearRREnv
273 te := newTest(t, e)
274 var ccs []*grpc.ClientConn
275 for i := 0; i < c.total; i++ {
276 cc := te.clientConn()
277 te.cc = nil
278
279 te.srvAddr = ""
280 ccs = append(ccs, cc)
281 }
282 if err := verifyResultWithDelay(func() (bool, error) {
283 if tcs, end := channelz.GetTopChannels(c.start, c.max); len(tcs) != c.length || end != c.end {
284 return false, fmt.Errorf("getTopChannels(%d) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(%d)) = %d, end: %+v", c.start, tcs, len(tcs), end, c.start, c.length, c.end)
285 }
286 return true, nil
287 }); err != nil {
288 t.Fatal(err)
289 }
290
291 for _, cc := range ccs {
292 cc.Close()
293 }
294
295 if err := verifyResultWithDelay(func() (bool, error) {
296 if tcs, end := channelz.GetTopChannels(c.start, c.max); len(tcs) != 0 || !end {
297 return false, fmt.Errorf("getTopChannels(0) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(0)) = 0, end: true", tcs, len(tcs), end)
298 }
299 return true, nil
300 }); err != nil {
301 t.Fatal(err)
302 }
303 te.tearDown()
304 }
305 }
306
307 func (s) TestCZTopChannelRegistrationAndDeletionWhenDialFail(t *testing.T) {
308
309 _, err := grpc.NewClient("fake.addr")
310 if err == nil {
311 t.Fatal("expecting dial to fail")
312 }
313 if tcs, end := channelz.GetTopChannels(0, 0); tcs != nil || !end {
314 t.Fatalf("GetTopChannels(0, 0) = %v, %v, want <nil>, true", tcs, end)
315 }
316 }
317
318 func (s) TestCZNestedChannelRegistrationAndDeletion(t *testing.T) {
319 e := tcpClearRREnv
320
321 e.balancer = ""
322 te := newTest(t, e)
323 r := manual.NewBuilderWithScheme("whatever")
324 te.resolverScheme = r.Scheme()
325 te.clientConn(grpc.WithResolvers(r))
326 resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", ServerName: "grpclb.server"}}
327 grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
328 r.UpdateState(grpclbstate.Set(resolver.State{ServiceConfig: grpclbConfig}, &grpclbstate.State{BalancerAddresses: resolvedAddrs}))
329 defer te.tearDown()
330
331 if err := verifyResultWithDelay(func() (bool, error) {
332 tcs, _ := channelz.GetTopChannels(0, 0)
333 if len(tcs) != 1 {
334 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
335 }
336 if nestedChans := tcs[0].NestedChans(); len(nestedChans) != 1 {
337 return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(nestedChans))
338 }
339 return true, nil
340 }); err != nil {
341 t.Fatal(err)
342 }
343
344 r.UpdateState(resolver.State{
345 Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}},
346 ServiceConfig: parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`),
347 })
348
349
350 if err := verifyResultWithDelay(func() (bool, error) {
351 tcs, _ := channelz.GetTopChannels(0, 0)
352 if len(tcs) != 1 {
353 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
354 }
355 if nestedChans := tcs[0].NestedChans(); len(nestedChans) != 0 {
356 return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(nestedChans))
357 }
358 return true, nil
359 }); err != nil {
360 t.Fatal(err)
361 }
362 }
363
364 func (s) TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) {
365 e := tcpClearRREnv
366 num := 3
367 te := newTest(t, e)
368 var svrAddrs []resolver.Address
369 te.startServers(&testServer{security: e.security}, num)
370 r := manual.NewBuilderWithScheme("whatever")
371 for _, a := range te.srvAddrs {
372 svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
373 }
374 r.InitialState(resolver.State{Addresses: svrAddrs})
375 te.resolverScheme = r.Scheme()
376 te.clientConn(grpc.WithResolvers(r))
377 defer te.tearDown()
378
379
380 if err := verifyResultWithDelay(func() (bool, error) {
381 tcs, _ := channelz.GetTopChannels(0, 0)
382 if len(tcs) != 1 {
383 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
384 }
385 subChans := tcs[0].SubChans()
386 if len(subChans) != num {
387 return false, fmt.Errorf("there should be %d subchannel not %d", num, len(subChans))
388 }
389 count := 0
390 for k := range subChans {
391 sc := channelz.GetSubChannel(k)
392 if sc == nil {
393 return false, fmt.Errorf("got <nil> subchannel")
394 }
395 count += len(sc.Sockets())
396 }
397 if count != num {
398 return false, fmt.Errorf("there should be %d sockets not %d", num, count)
399 }
400
401 return true, nil
402 }); err != nil {
403 t.Fatal(err)
404 }
405
406 r.UpdateState(resolver.State{Addresses: svrAddrs[:len(svrAddrs)-1]})
407
408 if err := verifyResultWithDelay(func() (bool, error) {
409 tcs, _ := channelz.GetTopChannels(0, 0)
410 if len(tcs) != 1 {
411 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
412 }
413 subChans := tcs[0].SubChans()
414 if len(subChans) != num-1 {
415 return false, fmt.Errorf("there should be %d subchannel not %d", num-1, len(subChans))
416 }
417 count := 0
418 for k := range subChans {
419 sc := channelz.GetSubChannel(k)
420 if sc == nil {
421 return false, fmt.Errorf("got <nil> subchannel")
422 }
423 count += len(sc.Sockets())
424 }
425 if count != num-1 {
426 return false, fmt.Errorf("there should be %d sockets not %d", num-1, count)
427 }
428
429 return true, nil
430 }); err != nil {
431 t.Fatal(err)
432 }
433 }
434
435 func (s) TestCZServerSocketRegistrationAndDeletion(t *testing.T) {
436 testcases := []struct {
437 total int
438 start int64
439 max int
440 length int
441 end bool
442 }{
443 {total: int(channelz.EntriesPerPage), start: 0, max: 0, length: channelz.EntriesPerPage, end: true},
444 {total: int(channelz.EntriesPerPage) - 1, start: 0, max: 0, length: channelz.EntriesPerPage - 1, end: true},
445 {total: int(channelz.EntriesPerPage) + 1, start: 0, max: 0, length: channelz.EntriesPerPage, end: false},
446 {total: int(channelz.EntriesPerPage), start: 1, max: 0, length: channelz.EntriesPerPage - 1, end: true},
447 {total: int(channelz.EntriesPerPage) + 1, start: int64(channelz.EntriesPerPage) + 1, max: 0, length: 0, end: true},
448 {total: int(channelz.EntriesPerPage), start: 0, max: 1, length: 1, end: false},
449 {total: int(channelz.EntriesPerPage), start: 0, max: channelz.EntriesPerPage - 1, length: channelz.EntriesPerPage - 1, end: false},
450 }
451
452 for _, c := range testcases {
453
454 channelz.IDGen.Reset()
455
456 e := tcpClearRREnv
457 te := newTest(t, e)
458 te.startServer(&testServer{security: e.security})
459 var ccs []*grpc.ClientConn
460 for i := 0; i < c.total; i++ {
461 cc := te.clientConn()
462 te.cc = nil
463 ccs = append(ccs, cc)
464 }
465
466 var svrID int64
467 if err := verifyResultWithDelay(func() (bool, error) {
468 ss, _ := channelz.GetServers(0, 0)
469 if len(ss) != 1 {
470 return false, fmt.Errorf("there should only be one server, not %d", len(ss))
471 }
472 if got := len(ss[0].ListenSockets()); got != 1 {
473 return false, fmt.Errorf("there should only be one server listen socket, not %d", got)
474 }
475
476 startID := c.start
477 if startID != 0 {
478 ns, _ := channelz.GetServerSockets(ss[0].ID, 0, c.total)
479 if int64(len(ns)) < c.start {
480 return false, fmt.Errorf("there should more than %d sockets, not %d", len(ns), c.start)
481 }
482 startID = ns[c.start-1].ID + 1
483 }
484
485 ns, end := channelz.GetServerSockets(ss[0].ID, startID, c.max)
486 if len(ns) != c.length || end != c.end {
487 return false, fmt.Errorf("GetServerSockets(%d) = %+v (len of which: %d), end: %+v, want len(GetServerSockets(%d)) = %d, end: %+v", c.start, ns, len(ns), end, c.start, c.length, c.end)
488 }
489
490 svrID = ss[0].ID
491 return true, nil
492 }); err != nil {
493 t.Fatal(err)
494 }
495
496 for _, cc := range ccs {
497 cc.Close()
498 }
499
500 if err := verifyResultWithDelay(func() (bool, error) {
501 ns, _ := channelz.GetServerSockets(svrID, c.start, c.max)
502 if len(ns) != 0 {
503 return false, fmt.Errorf("there should be %d normal sockets not %d", 0, len(ns))
504 }
505 return true, nil
506 }); err != nil {
507 t.Fatal(err)
508 }
509 te.tearDown()
510 }
511 }
512
513 func (s) TestCZServerListenSocketDeletion(t *testing.T) {
514 s := grpc.NewServer()
515 lis, err := net.Listen("tcp", "localhost:0")
516 if err != nil {
517 t.Fatalf("failed to listen: %v", err)
518 }
519 go s.Serve(lis)
520 if err := verifyResultWithDelay(func() (bool, error) {
521 ss, _ := channelz.GetServers(0, 0)
522 if len(ss) != 1 {
523 return false, fmt.Errorf("there should only be one server, not %d", len(ss))
524 }
525 skts := ss[0].ListenSockets()
526 if len(skts) != 1 {
527 return false, fmt.Errorf("there should only be one server listen socket, not %v", skts)
528 }
529 return true, nil
530 }); err != nil {
531 t.Fatal(err)
532 }
533
534 lis.Close()
535 if err := verifyResultWithDelay(func() (bool, error) {
536 ss, _ := channelz.GetServers(0, 0)
537 if len(ss) != 1 {
538 return false, fmt.Errorf("there should be 1 server, not %d", len(ss))
539 }
540 skts := ss[0].ListenSockets()
541 if len(skts) != 0 {
542 return false, fmt.Errorf("there should only be %d server listen socket, not %v", 0, skts)
543 }
544 return true, nil
545 }); err != nil {
546 t.Fatal(err)
547 }
548 s.Stop()
549 }
550
551 func (s) TestCZRecusivelyDeletionOfEntry(t *testing.T) {
552
553
554
555
556
557
558
559
560 topChan := channelz.RegisterChannel(nil, "")
561 subChan1 := channelz.RegisterSubChannel(topChan, "")
562 subChan2 := channelz.RegisterSubChannel(topChan, "")
563 skt1 := channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: subChan1})
564 skt2 := channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: subChan1})
565
566 tcs, _ := channelz.GetTopChannels(0, 0)
567 if tcs == nil || len(tcs) != 1 {
568 t.Fatalf("There should be one TopChannel entry")
569 }
570 if len(tcs[0].SubChans()) != 2 {
571 t.Fatalf("There should be two SubChannel entries")
572 }
573 sc := channelz.GetSubChannel(subChan1.ID)
574 if sc == nil || len(sc.Sockets()) != 2 {
575 t.Fatalf("There should be two Socket entries")
576 }
577
578 channelz.RemoveEntry(topChan.ID)
579 tcs, _ = channelz.GetTopChannels(0, 0)
580 if tcs == nil || len(tcs) != 1 {
581 t.Fatalf("There should be one TopChannel entry")
582 }
583
584 channelz.RemoveEntry(subChan1.ID)
585 channelz.RemoveEntry(subChan2.ID)
586 tcs, _ = channelz.GetTopChannels(0, 0)
587 if tcs == nil || len(tcs) != 1 {
588 t.Fatalf("There should be one TopChannel entry")
589 }
590 if len(tcs[0].SubChans()) != 1 {
591 t.Fatalf("There should be one SubChannel entry")
592 }
593
594 channelz.RemoveEntry(skt1.ID)
595 channelz.RemoveEntry(skt2.ID)
596 tcs, _ = channelz.GetTopChannels(0, 0)
597 if tcs != nil {
598 t.Fatalf("There should be no TopChannel entry")
599 }
600 }
601
602 func (s) TestCZChannelMetrics(t *testing.T) {
603 e := tcpClearRREnv
604 num := 3
605 te := newTest(t, e)
606 te.maxClientSendMsgSize = newInt(8)
607 var svrAddrs []resolver.Address
608 te.startServers(&testServer{security: e.security}, num)
609 r := manual.NewBuilderWithScheme("whatever")
610 for _, a := range te.srvAddrs {
611 svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
612 }
613 r.InitialState(resolver.State{Addresses: svrAddrs})
614 te.resolverScheme = r.Scheme()
615 cc := te.clientConn(grpc.WithResolvers(r))
616 defer te.tearDown()
617 tc := testgrpc.NewTestServiceClient(cc)
618 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
619 defer cancel()
620 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
621 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
622 }
623
624 const smallSize = 1
625 const largeSize = 8
626
627 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
628 if err != nil {
629 t.Fatal(err)
630 }
631 req := &testpb.SimpleRequest{
632 ResponseType: testpb.PayloadType_COMPRESSABLE,
633 ResponseSize: int32(smallSize),
634 Payload: largePayload,
635 }
636
637 if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
638 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
639 }
640
641 stream, err := tc.FullDuplexCall(ctx)
642 if err != nil {
643 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
644 }
645 defer stream.CloseSend()
646
647
648 if err := verifyResultWithDelay(func() (bool, error) {
649 tcs, _ := channelz.GetTopChannels(0, 0)
650 if len(tcs) != 1 {
651 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
652 }
653 subChans := tcs[0].SubChans()
654 if len(subChans) != num {
655 return false, fmt.Errorf("there should be %d subchannel not %d", num, len(subChans))
656 }
657 var cst, csu, cf int64
658 for k := range subChans {
659 sc := channelz.GetSubChannel(k)
660 if sc == nil {
661 return false, fmt.Errorf("got <nil> subchannel")
662 }
663 cst += sc.ChannelMetrics.CallsStarted.Load()
664 csu += sc.ChannelMetrics.CallsSucceeded.Load()
665 cf += sc.ChannelMetrics.CallsFailed.Load()
666 }
667 if cst != 3 {
668 return false, fmt.Errorf("there should be 3 CallsStarted not %d", cst)
669 }
670 if csu != 1 {
671 return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", csu)
672 }
673 if cf != 1 {
674 return false, fmt.Errorf("there should be 1 CallsFailed not %d", cf)
675 }
676 if got := tcs[0].ChannelMetrics.CallsStarted.Load(); got != 3 {
677 return false, fmt.Errorf("there should be 3 CallsStarted not %d", got)
678 }
679 if got := tcs[0].ChannelMetrics.CallsSucceeded.Load(); got != 1 {
680 return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", got)
681 }
682 if got := tcs[0].ChannelMetrics.CallsFailed.Load(); got != 1 {
683 return false, fmt.Errorf("there should be 1 CallsFailed not %d", got)
684 }
685 return true, nil
686 }); err != nil {
687 t.Fatal(err)
688 }
689 }
690
691 func (s) TestCZServerMetrics(t *testing.T) {
692 e := tcpClearRREnv
693 te := newTest(t, e)
694 te.maxServerReceiveMsgSize = newInt(8)
695 te.startServer(&testServer{security: e.security})
696 defer te.tearDown()
697 cc := te.clientConn()
698 tc := testgrpc.NewTestServiceClient(cc)
699 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
700 defer cancel()
701 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
702 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
703 }
704
705 const smallSize = 1
706 const largeSize = 8
707
708 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
709 if err != nil {
710 t.Fatal(err)
711 }
712 req := &testpb.SimpleRequest{
713 ResponseType: testpb.PayloadType_COMPRESSABLE,
714 ResponseSize: int32(smallSize),
715 Payload: largePayload,
716 }
717 if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
718 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
719 }
720
721 stream, err := tc.FullDuplexCall(ctx)
722 if err != nil {
723 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
724 }
725 defer stream.CloseSend()
726
727 if err := verifyResultWithDelay(func() (bool, error) {
728 ss, _ := channelz.GetServers(0, 0)
729 if len(ss) != 1 {
730 return false, fmt.Errorf("there should only be one server, not %d", len(ss))
731 }
732 if cs := ss[0].ServerMetrics.CallsStarted.Load(); cs != 3 {
733 return false, fmt.Errorf("there should be 3 CallsStarted not %d", cs)
734 }
735 if cs := ss[0].ServerMetrics.CallsSucceeded.Load(); cs != 1 {
736 return false, fmt.Errorf("there should be 1 CallsSucceeded not %d", cs)
737 }
738 if cf := ss[0].ServerMetrics.CallsFailed.Load(); cf != 1 {
739 return false, fmt.Errorf("there should be 1 CallsFailed not %d", cf)
740 }
741 return true, nil
742 }); err != nil {
743 t.Fatal(err)
744 }
745 }
746
747 type testServiceClientWrapper struct {
748 testgrpc.TestServiceClient
749 mu sync.RWMutex
750 streamsCreated int
751 }
752
753 func (t *testServiceClientWrapper) getCurrentStreamID() uint32 {
754 t.mu.RLock()
755 defer t.mu.RUnlock()
756 return uint32(2*t.streamsCreated - 1)
757 }
758
759 func (t *testServiceClientWrapper) EmptyCall(ctx context.Context, in *testpb.Empty, opts ...grpc.CallOption) (*testpb.Empty, error) {
760 t.mu.Lock()
761 defer t.mu.Unlock()
762 t.streamsCreated++
763 return t.TestServiceClient.EmptyCall(ctx, in, opts...)
764 }
765
766 func (t *testServiceClientWrapper) UnaryCall(ctx context.Context, in *testpb.SimpleRequest, opts ...grpc.CallOption) (*testpb.SimpleResponse, error) {
767 t.mu.Lock()
768 defer t.mu.Unlock()
769 t.streamsCreated++
770 return t.TestServiceClient.UnaryCall(ctx, in, opts...)
771 }
772
773 func (t *testServiceClientWrapper) StreamingOutputCall(ctx context.Context, in *testpb.StreamingOutputCallRequest, opts ...grpc.CallOption) (testgrpc.TestService_StreamingOutputCallClient, error) {
774 t.mu.Lock()
775 defer t.mu.Unlock()
776 t.streamsCreated++
777 return t.TestServiceClient.StreamingOutputCall(ctx, in, opts...)
778 }
779
780 func (t *testServiceClientWrapper) StreamingInputCall(ctx context.Context, opts ...grpc.CallOption) (testgrpc.TestService_StreamingInputCallClient, error) {
781 t.mu.Lock()
782 defer t.mu.Unlock()
783 t.streamsCreated++
784 return t.TestServiceClient.StreamingInputCall(ctx, opts...)
785 }
786
787 func (t *testServiceClientWrapper) FullDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testgrpc.TestService_FullDuplexCallClient, error) {
788 t.mu.Lock()
789 defer t.mu.Unlock()
790 t.streamsCreated++
791 return t.TestServiceClient.FullDuplexCall(ctx, opts...)
792 }
793
794 func (t *testServiceClientWrapper) HalfDuplexCall(ctx context.Context, opts ...grpc.CallOption) (testgrpc.TestService_HalfDuplexCallClient, error) {
795 t.mu.Lock()
796 defer t.mu.Unlock()
797 t.streamsCreated++
798 return t.TestServiceClient.HalfDuplexCall(ctx, opts...)
799 }
800
801 func doSuccessfulUnaryCall(tc testgrpc.TestServiceClient, t *testing.T) {
802 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
803 defer cancel()
804 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
805 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
806 }
807 }
808
809 func doStreamingInputCallWithLargePayload(tc testgrpc.TestServiceClient, t *testing.T) {
810 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
811 defer cancel()
812 s, err := tc.StreamingInputCall(ctx)
813 if err != nil {
814 t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want <nil>", err)
815 }
816 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10000)
817 if err != nil {
818 t.Fatal(err)
819 }
820 s.Send(&testpb.StreamingInputCallRequest{Payload: payload})
821 }
822
823 func doServerSideFailedUnaryCall(tc testgrpc.TestServiceClient, t *testing.T) {
824 const smallSize = 1
825 const largeSize = 2000
826
827 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
828 if err != nil {
829 t.Fatal(err)
830 }
831 req := &testpb.SimpleRequest{
832 ResponseType: testpb.PayloadType_COMPRESSABLE,
833 ResponseSize: int32(smallSize),
834 Payload: largePayload,
835 }
836 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
837 defer cancel()
838 if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
839 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
840 }
841 }
842
843 func doClientSideInitiatedFailedStream(tc testgrpc.TestServiceClient, t *testing.T) {
844 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
845 stream, err := tc.FullDuplexCall(ctx)
846 if err != nil {
847 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
848 }
849
850 const smallSize = 1
851 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
852 if err != nil {
853 t.Fatal(err)
854 }
855
856 sreq := &testpb.StreamingOutputCallRequest{
857 ResponseType: testpb.PayloadType_COMPRESSABLE,
858 ResponseParameters: []*testpb.ResponseParameters{
859 {Size: smallSize},
860 },
861 Payload: smallPayload,
862 }
863
864 if err := stream.Send(sreq); err != nil {
865 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
866 }
867 if _, err := stream.Recv(); err != nil {
868 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
869 }
870
871
872 cancel()
873 }
874
875
876 func doServerSideInitiatedFailedStreamWithRSTStream(tc testgrpc.TestServiceClient, t *testing.T, l *listenerWrapper) {
877 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
878 defer cancel()
879 stream, err := tc.FullDuplexCall(ctx)
880 if err != nil {
881 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
882 }
883
884 const smallSize = 1
885 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
886 if err != nil {
887 t.Fatal(err)
888 }
889
890 sreq := &testpb.StreamingOutputCallRequest{
891 ResponseType: testpb.PayloadType_COMPRESSABLE,
892 ResponseParameters: []*testpb.ResponseParameters{
893 {Size: smallSize},
894 },
895 Payload: smallPayload,
896 }
897
898 if err := stream.Send(sreq); err != nil {
899 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
900 }
901 if _, err := stream.Recv(); err != nil {
902 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
903 }
904
905 rcw := l.getLastConn()
906
907 if rcw != nil {
908 rcw.writeRSTStream(tc.(*testServiceClientWrapper).getCurrentStreamID(), http2.ErrCodeCancel)
909 }
910 if _, err := stream.Recv(); err == nil {
911 t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err)
912 }
913 }
914
915
916 func doServerSideInitiatedFailedStreamWithGoAway(ctx context.Context, tc testgrpc.TestServiceClient, t *testing.T, l *listenerWrapper) {
917
918
919 s, err := tc.FullDuplexCall(ctx)
920 if err != nil {
921 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
922 }
923 if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{
924 {
925 Size: 1,
926 },
927 }}); err != nil {
928 t.Fatalf("s.Send() failed with error: %v", err)
929 }
930 if _, err := s.Recv(); err != nil {
931 t.Fatalf("s.Recv() failed with error: %v", err)
932 }
933
934 s, err = tc.FullDuplexCall(ctx)
935 if err != nil {
936 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
937 }
938 if err := s.Send(&testpb.StreamingOutputCallRequest{ResponseParameters: []*testpb.ResponseParameters{
939 {
940 Size: 1,
941 },
942 }}); err != nil {
943 t.Fatalf("s.Send() failed with error: %v", err)
944 }
945 if _, err := s.Recv(); err != nil {
946 t.Fatalf("s.Recv() failed with error: %v", err)
947 }
948
949 rcw := l.getLastConn()
950 if rcw != nil {
951 rcw.writeGoAway(tc.(*testServiceClientWrapper).getCurrentStreamID()-2, http2.ErrCodeCancel, []byte{})
952 }
953 if _, err := s.Recv(); err == nil {
954 t.Fatalf("%v.Recv() = %v, want <non-nil>", s, err)
955 }
956 }
957
958 func (s) TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) {
959 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
960 defer cancel()
961
962 e := tcpClearRREnv
963 te := newTest(t, e)
964 te.maxServerReceiveMsgSize = newInt(20)
965 te.maxClientReceiveMsgSize = newInt(20)
966 rcw := te.startServerWithConnControl(&testServer{security: e.security})
967 defer te.tearDown()
968 cc := te.clientConn()
969 tc := &testServiceClientWrapper{TestServiceClient: testgrpc.NewTestServiceClient(cc)}
970
971 doSuccessfulUnaryCall(tc, t)
972 var scID, skID int64
973 if err := verifyResultWithDelay(func() (bool, error) {
974 tchan, _ := channelz.GetTopChannels(0, 0)
975 if len(tchan) != 1 {
976 return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
977 }
978 subChans := tchan[0].SubChans()
979 if len(subChans) != 1 {
980 return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(subChans))
981 }
982
983 for scID = range subChans {
984 break
985 }
986 sc := channelz.GetSubChannel(scID)
987 if sc == nil {
988 return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", scID)
989 }
990 skts := sc.Sockets()
991 if len(skts) != 1 {
992 return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(skts))
993 }
994 for skID = range skts {
995 break
996 }
997 skt := channelz.GetSocket(skID)
998 sktData := &skt.SocketMetrics
999 if sktData.StreamsStarted.Load() != 1 || sktData.StreamsSucceeded.Load() != 1 || sktData.MessagesSent.Load() != 1 || sktData.MessagesReceived.Load() != 1 {
1000 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted.Load(), StreamsSucceeded.Load(), MessagesSent.Load(), MessagesReceived.Load()) = (1, 1, 1, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.MessagesSent.Load(), sktData.MessagesReceived.Load())
1001 }
1002 return true, nil
1003 }); err != nil {
1004 t.Fatal(err)
1005 }
1006
1007 doServerSideFailedUnaryCall(tc, t)
1008 if err := verifyResultWithDelay(func() (bool, error) {
1009 skt := channelz.GetSocket(skID)
1010 sktData := &skt.SocketMetrics
1011 if sktData.StreamsStarted.Load() != 2 || sktData.StreamsSucceeded.Load() != 2 || sktData.MessagesSent.Load() != 2 || sktData.MessagesReceived.Load() != 1 {
1012 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted.Load(), StreamsSucceeded.Load(), MessagesSent.Load(), MessagesReceived.Load()) = (2, 2, 2, 1), got (%d, %d, %d, %d)", skt.ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.MessagesSent.Load(), sktData.MessagesReceived.Load())
1013 }
1014 return true, nil
1015 }); err != nil {
1016 t.Fatal(err)
1017 }
1018
1019 doClientSideInitiatedFailedStream(tc, t)
1020 if err := verifyResultWithDelay(func() (bool, error) {
1021 skt := channelz.GetSocket(skID)
1022 sktData := &skt.SocketMetrics
1023 if sktData.StreamsStarted.Load() != 3 || sktData.StreamsSucceeded.Load() != 2 || sktData.StreamsFailed.Load() != 1 || sktData.MessagesSent.Load() != 3 || sktData.MessagesReceived.Load() != 2 {
1024 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted.Load(), StreamsSucceeded.Load(), StreamsFailed.Load(), MessagesSent.Load(), MessagesReceived.Load()) = (3, 2, 1, 3, 2), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.StreamsFailed.Load(), sktData.MessagesSent.Load(), sktData.MessagesReceived.Load())
1025 }
1026 return true, nil
1027 }); err != nil {
1028 t.Fatal(err)
1029 }
1030
1031 doServerSideInitiatedFailedStreamWithRSTStream(tc, t, rcw)
1032 if err := verifyResultWithDelay(func() (bool, error) {
1033 skt := channelz.GetSocket(skID)
1034 sktData := &skt.SocketMetrics
1035 if sktData.StreamsStarted.Load() != 4 || sktData.StreamsSucceeded.Load() != 2 || sktData.StreamsFailed.Load() != 2 || sktData.MessagesSent.Load() != 4 || sktData.MessagesReceived.Load() != 3 {
1036 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted.Load(), StreamsSucceeded.Load(), StreamsFailed.Load(), MessagesSent.Load(), MessagesReceived.Load()) = (4, 2, 2, 4, 3), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.StreamsFailed.Load(), sktData.MessagesSent.Load(), sktData.MessagesReceived.Load())
1037 }
1038 return true, nil
1039 }); err != nil {
1040 t.Fatal(err)
1041 }
1042
1043 doServerSideInitiatedFailedStreamWithGoAway(ctx, tc, t, rcw)
1044 if err := verifyResultWithDelay(func() (bool, error) {
1045 skt := channelz.GetSocket(skID)
1046 sktData := &skt.SocketMetrics
1047 if sktData.StreamsStarted.Load() != 6 || sktData.StreamsSucceeded.Load() != 2 || sktData.StreamsFailed.Load() != 3 || sktData.MessagesSent.Load() != 6 || sktData.MessagesReceived.Load() != 5 {
1048 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted.Load(), StreamsSucceeded.Load(), StreamsFailed.Load(), MessagesSent.Load(), MessagesReceived.Load()) = (6, 2, 3, 6, 5), got (%d, %d, %d, %d, %d)", skt.ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.StreamsFailed.Load(), sktData.MessagesSent.Load(), sktData.MessagesReceived.Load())
1049 }
1050 return true, nil
1051 }); err != nil {
1052 t.Fatal(err)
1053 }
1054 }
1055
1056
1057
1058
1059
1060
1061 func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testing.T) {
1062 e := tcpClearRREnv
1063 te := newTest(t, e)
1064 te.serverInitialWindowSize = 65536
1065
1066
1067 te.serverInitialConnWindowSize = 65536 * 2
1068 ts := &stubserver.StubServer{FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
1069 stream.Send(&testpb.StreamingOutputCallResponse{})
1070 <-stream.Context().Done()
1071 return status.Errorf(codes.DeadlineExceeded, "deadline exceeded or cancelled")
1072 }}
1073 te.startServer(ts)
1074 defer te.tearDown()
1075 cc, dw := te.clientConnWithConnControl()
1076 tc := &testServiceClientWrapper{TestServiceClient: testgrpc.NewTestServiceClient(cc)}
1077
1078 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1079 stream, err := tc.FullDuplexCall(ctx)
1080 if err != nil {
1081 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
1082 }
1083 if _, err := stream.Recv(); err != nil {
1084 t.Fatalf("stream.Recv() = %v, want nil", err)
1085 }
1086 go func() {
1087 payload := make([]byte, 16384)
1088 for i := 0; i < 6; i++ {
1089 dw.getRawConnWrapper().writeRawFrame(http2.FrameData, 0, tc.getCurrentStreamID(), payload)
1090 }
1091 }()
1092 if _, err := stream.Recv(); status.Code(err) != codes.ResourceExhausted {
1093 t.Fatalf("stream.Recv() = %v, want error code: %v", err, codes.ResourceExhausted)
1094 }
1095 cancel()
1096
1097 if err := verifyResultWithDelay(func() (bool, error) {
1098 tchan, _ := channelz.GetTopChannels(0, 0)
1099 if len(tchan) != 1 {
1100 return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
1101 }
1102 subChans := tchan[0].SubChans()
1103 if len(subChans) != 1 {
1104 return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(subChans))
1105 }
1106 var id int64
1107 for id = range subChans {
1108 break
1109 }
1110 sc := channelz.GetSubChannel(id)
1111 if sc == nil {
1112 return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
1113 }
1114 skts := sc.Sockets()
1115 if len(skts) != 1 {
1116 return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(skts))
1117 }
1118 for id = range skts {
1119 break
1120 }
1121 skt := channelz.GetSocket(id)
1122 sktData := &skt.SocketMetrics
1123 if sktData.StreamsStarted.Load() != 1 || sktData.StreamsSucceeded.Load() != 0 || sktData.StreamsFailed.Load() != 1 {
1124 return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted.Load(), StreamsSucceeded.Load(), StreamsFailed.Load()) = (1, 0, 1), got (%d, %d, %d)", skt.ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.StreamsFailed.Load())
1125 }
1126 ss, _ := channelz.GetServers(0, 0)
1127 if len(ss) != 1 {
1128 return false, fmt.Errorf("there should only be one server, not %d", len(ss))
1129 }
1130
1131 ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
1132 if len(ns) != 1 {
1133 return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns))
1134 }
1135 sktData = &ns[0].SocketMetrics
1136 if sktData.StreamsStarted.Load() != 1 || sktData.StreamsSucceeded.Load() != 0 || sktData.StreamsFailed.Load() != 1 {
1137 return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted.Load(), StreamsSucceeded.Load(), StreamsFailed.Load()) = (1, 0, 1), got (%d, %d, %d)", ns[0].ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.StreamsFailed.Load())
1138 }
1139 return true, nil
1140 }); err != nil {
1141 t.Fatal(err)
1142 }
1143 }
1144
1145 func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {
1146 e := tcpClearRREnv
1147 te := newTest(t, e)
1148
1149 te.serverInitialWindowSize = 65536
1150 te.serverInitialConnWindowSize = 65536
1151 te.clientInitialWindowSize = 65536
1152 te.clientInitialConnWindowSize = 65536
1153 te.startServer(&testServer{security: e.security})
1154 defer te.tearDown()
1155 cc := te.clientConn()
1156 tc := testgrpc.NewTestServiceClient(cc)
1157
1158 for i := 0; i < 10; i++ {
1159 doSuccessfulUnaryCall(tc, t)
1160 }
1161
1162 var cliSktID, svrSktID int64
1163 if err := verifyResultWithDelay(func() (bool, error) {
1164 tchan, _ := channelz.GetTopChannels(0, 0)
1165 if len(tchan) != 1 {
1166 return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
1167 }
1168 subChans := tchan[0].SubChans()
1169 if len(subChans) != 1 {
1170 return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(subChans))
1171 }
1172 var id int64
1173 for id = range subChans {
1174 break
1175 }
1176 sc := channelz.GetSubChannel(id)
1177 if sc == nil {
1178 return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
1179 }
1180 skts := sc.Sockets()
1181 if len(skts) != 1 {
1182 return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(skts))
1183 }
1184 for id = range skts {
1185 break
1186 }
1187 skt := channelz.GetSocket(id)
1188 sktData := skt.EphemeralMetrics()
1189
1190 if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 {
1191 return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1192 }
1193 ss, _ := channelz.GetServers(0, 0)
1194 if len(ss) != 1 {
1195 return false, fmt.Errorf("there should only be one server, not %d", len(ss))
1196 }
1197 ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
1198 sktData = ns[0].EphemeralMetrics()
1199 if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 {
1200 return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1201 }
1202 cliSktID, svrSktID = id, ss[0].ID
1203 return true, nil
1204 }); err != nil {
1205 t.Fatal(err)
1206 }
1207
1208 doStreamingInputCallWithLargePayload(tc, t)
1209
1210 if err := verifyResultWithDelay(func() (bool, error) {
1211 skt := channelz.GetSocket(cliSktID)
1212 sktData := skt.EphemeralMetrics()
1213
1214
1215 if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 55475 {
1216 return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 55475), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1217 }
1218 ss, _ := channelz.GetServers(0, 0)
1219 if len(ss) != 1 {
1220 return false, fmt.Errorf("there should only be one server, not %d", len(ss))
1221 }
1222 ns, _ := channelz.GetServerSockets(svrSktID, 0, 0)
1223 sktData = ns[0].EphemeralMetrics()
1224 if sktData.LocalFlowControlWindow != 55475 || sktData.RemoteFlowControlWindow != 65486 {
1225 return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (55475, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1226 }
1227 return true, nil
1228 }); err != nil {
1229 t.Fatal(err)
1230 }
1231
1232
1233
1234 doStreamingInputCallWithLargePayload(tc, t)
1235 if err := verifyResultWithDelay(func() (bool, error) {
1236 skt := channelz.GetSocket(cliSktID)
1237 sktData := skt.EphemeralMetrics()
1238
1239
1240 if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65536 {
1241 return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 65536), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1242 }
1243 ss, _ := channelz.GetServers(0, 0)
1244 if len(ss) != 1 {
1245 return false, fmt.Errorf("there should only be one server, not %d", len(ss))
1246 }
1247 ns, _ := channelz.GetServerSockets(svrSktID, 0, 0)
1248 sktData = ns[0].EphemeralMetrics()
1249 if sktData.LocalFlowControlWindow != 65536 || sktData.RemoteFlowControlWindow != 65486 {
1250 return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
1251 }
1252 return true, nil
1253 }); err != nil {
1254 t.Fatal(err)
1255 }
1256 }
1257
1258 func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) {
1259 const keepaliveRate = 50 * time.Millisecond
1260 defer func(t time.Duration) { internal.KeepaliveMinPingTime = t }(internal.KeepaliveMinPingTime)
1261 internal.KeepaliveMinPingTime = keepaliveRate
1262 e := tcpClearRREnv
1263 te := newTest(t, e)
1264 te.customDialOptions = append(te.customDialOptions, grpc.WithKeepaliveParams(
1265 keepalive.ClientParameters{
1266 Time: keepaliveRate,
1267 Timeout: 500 * time.Millisecond,
1268 PermitWithoutStream: true,
1269 }))
1270 te.customServerOptions = append(te.customServerOptions, grpc.KeepaliveEnforcementPolicy(
1271 keepalive.EnforcementPolicy{
1272 MinTime: keepaliveRate,
1273 PermitWithoutStream: true,
1274 }))
1275 te.startServer(&testServer{security: e.security})
1276 cc := te.clientConn()
1277 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1278 defer cancel()
1279 testutils.AwaitState(ctx, t, cc, connectivity.Ready)
1280 start := time.Now()
1281
1282 time.Sleep(2 * keepaliveRate)
1283 defer te.tearDown()
1284 if err := verifyResultWithDelay(func() (bool, error) {
1285 tchan, _ := channelz.GetTopChannels(0, 0)
1286 if len(tchan) != 1 {
1287 return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
1288 }
1289 subChans := tchan[0].SubChans()
1290 if len(subChans) != 1 {
1291 return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(subChans))
1292 }
1293 var id int64
1294 for id = range subChans {
1295 break
1296 }
1297 sc := channelz.GetSubChannel(id)
1298 if sc == nil {
1299 return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
1300 }
1301 skts := sc.Sockets()
1302 if len(skts) != 1 {
1303 return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(skts))
1304 }
1305 for id = range skts {
1306 break
1307 }
1308 skt := channelz.GetSocket(id)
1309 want := int64(time.Since(start) / keepaliveRate)
1310 if got := skt.SocketMetrics.KeepAlivesSent.Load(); got != want {
1311 return false, fmt.Errorf("there should be %v KeepAlives sent, not %d", want, got)
1312 }
1313 return true, nil
1314 }); err != nil {
1315 t.Fatal(err)
1316 }
1317 }
1318
1319 func (s) TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) {
1320 e := tcpClearRREnv
1321 te := newTest(t, e)
1322 te.maxServerReceiveMsgSize = newInt(20)
1323 te.maxClientReceiveMsgSize = newInt(20)
1324 te.startServer(&testServer{security: e.security})
1325 defer te.tearDown()
1326 cc, _ := te.clientConnWithConnControl()
1327 tc := &testServiceClientWrapper{TestServiceClient: testgrpc.NewTestServiceClient(cc)}
1328
1329 var svrID int64
1330 if err := verifyResultWithDelay(func() (bool, error) {
1331 ss, _ := channelz.GetServers(0, 0)
1332 if len(ss) != 1 {
1333 return false, fmt.Errorf("there should only be one server, not %d", len(ss))
1334 }
1335 svrID = ss[0].ID
1336 return true, nil
1337 }); err != nil {
1338 t.Fatal(err)
1339 }
1340
1341 doSuccessfulUnaryCall(tc, t)
1342 if err := verifyResultWithDelay(func() (bool, error) {
1343 ns, _ := channelz.GetServerSockets(svrID, 0, 0)
1344 sktData := &ns[0].SocketMetrics
1345 if sktData.StreamsStarted.Load() != 1 || sktData.StreamsSucceeded.Load() != 1 || sktData.StreamsFailed.Load() != 0 || sktData.MessagesSent.Load() != 1 || sktData.MessagesReceived.Load() != 1 {
1346 return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted.Load(), StreamsSucceeded.Load(), MessagesSent.Load(), MessagesReceived.Load()) = (1, 1, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.StreamsFailed.Load(), sktData.MessagesSent.Load(), sktData.MessagesReceived.Load())
1347 }
1348 return true, nil
1349 }); err != nil {
1350 t.Fatal(err)
1351 }
1352
1353 doServerSideFailedUnaryCall(tc, t)
1354 if err := verifyResultWithDelay(func() (bool, error) {
1355 ns, _ := channelz.GetServerSockets(svrID, 0, 0)
1356 sktData := &ns[0].SocketMetrics
1357 if sktData.StreamsStarted.Load() != 2 || sktData.StreamsSucceeded.Load() != 2 || sktData.StreamsFailed.Load() != 0 || sktData.MessagesSent.Load() != 1 || sktData.MessagesReceived.Load() != 1 {
1358 return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted.Load(), StreamsSucceeded.Load(), StreamsFailed.Load(), MessagesSent.Load(), MessagesReceived.Load()) = (2, 2, 0, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.StreamsFailed.Load(), sktData.MessagesSent.Load(), sktData.MessagesReceived.Load())
1359 }
1360 return true, nil
1361 }); err != nil {
1362 t.Fatal(err)
1363 }
1364
1365 doClientSideInitiatedFailedStream(tc, t)
1366 if err := verifyResultWithDelay(func() (bool, error) {
1367 ns, _ := channelz.GetServerSockets(svrID, 0, 0)
1368 sktData := &ns[0].SocketMetrics
1369 if sktData.StreamsStarted.Load() != 3 || sktData.StreamsSucceeded.Load() != 2 || sktData.StreamsFailed.Load() != 1 || sktData.MessagesSent.Load() != 2 || sktData.MessagesReceived.Load() != 2 {
1370 return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted.Load(), StreamsSucceeded.Load(), StreamsFailed.Load(), MessagesSent.Load(), MessagesReceived.Load()) = (3, 2, 1, 2, 2), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted.Load(), sktData.StreamsSucceeded.Load(), sktData.StreamsFailed.Load(), sktData.MessagesSent.Load(), sktData.MessagesReceived.Load())
1371 }
1372 return true, nil
1373 }); err != nil {
1374 t.Fatal(err)
1375 }
1376 }
1377
1378 func (s) TestCZServerSocketMetricsKeepAlive(t *testing.T) {
1379 defer func(t time.Duration) { internal.KeepaliveMinServerPingTime = t }(internal.KeepaliveMinServerPingTime)
1380 internal.KeepaliveMinServerPingTime = 50 * time.Millisecond
1381
1382 e := tcpClearRREnv
1383 te := newTest(t, e)
1384
1385
1386
1387
1388
1389
1390 kpOption := grpc.KeepaliveParams(keepalive.ServerParameters{
1391 Time: 50 * time.Millisecond,
1392 Timeout: 5 * time.Second,
1393 })
1394 te.customServerOptions = append(te.customServerOptions, kpOption)
1395 te.startServer(&testServer{security: e.security})
1396 defer te.tearDown()
1397 cc := te.clientConn()
1398
1399 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1400 defer cancel()
1401 testutils.AwaitState(ctx, t, cc, connectivity.Ready)
1402
1403
1404 time.Sleep(255 * time.Millisecond)
1405
1406 ss, _ := channelz.GetServers(0, 0)
1407 if len(ss) != 1 {
1408 t.Fatalf("there should be one server, not %d", len(ss))
1409 }
1410 ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
1411 if len(ns) != 1 {
1412 t.Fatalf("there should be one server normal socket, not %d", len(ns))
1413 }
1414 const wantMin, wantMax = 3, 7
1415 if got := ns[0].SocketMetrics.KeepAlivesSent.Load(); got < wantMin || got > wantMax {
1416 t.Fatalf("got keepalivesCount: %v, want keepalivesCount: [%v,%v]", got, wantMin, wantMax)
1417 }
1418 }
1419
// cipherSuites is the set of cipher suite names that
// TestCZSocketGetSecurityValueTLS accepts as the StandardName reported for a
// TLS socket. The order of the entries only matters for the error message
// that prints the whole slice.
var cipherSuites = []string{
	// TLS_RSA_* names.
	"TLS_RSA_WITH_RC4_128_SHA",
	"TLS_RSA_WITH_3DES_EDE_CBC_SHA",
	"TLS_RSA_WITH_AES_128_CBC_SHA",
	"TLS_RSA_WITH_AES_256_CBC_SHA",
	"TLS_RSA_WITH_AES_128_GCM_SHA256",
	"TLS_RSA_WITH_AES_256_GCM_SHA384",

	// TLS_ECDHE_* names.
	"TLS_ECDHE_ECDSA_WITH_RC4_128_SHA",
	"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA",
	"TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_RC4_128_SHA",
	"TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA",
	"TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256",
	"TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256",
	"TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384",
	"TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384",

	"TLS_FALLBACK_SCSV",

	// *_CBC_SHA256 names.
	"TLS_RSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256",
	"TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256",

	// ChaCha20-Poly1305 names with an ECDHE prefix.
	"TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305",
	"TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305",

	// Names without a key-exchange component.
	"TLS_AES_128_GCM_SHA256",
	"TLS_AES_256_GCM_SHA384",
	"TLS_CHACHA20_POLY1305_SHA256",
}
1448
1449 func (s) TestCZSocketGetSecurityValueTLS(t *testing.T) {
1450 e := tcpTLSRREnv
1451 te := newTest(t, e)
1452 te.startServer(&testServer{security: e.security})
1453 defer te.tearDown()
1454 te.clientConn()
1455 if err := verifyResultWithDelay(func() (bool, error) {
1456 tchan, _ := channelz.GetTopChannels(0, 0)
1457 if len(tchan) != 1 {
1458 return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
1459 }
1460 subChans := tchan[0].SubChans()
1461 if len(subChans) != 1 {
1462 return false, fmt.Errorf("there should only be one subchannel under top channel %d, not %d", tchan[0].ID, len(subChans))
1463 }
1464 var id int64
1465 for id = range subChans {
1466 break
1467 }
1468 sc := channelz.GetSubChannel(id)
1469 if sc == nil {
1470 return false, fmt.Errorf("there should only be one socket under subchannel %d, not 0", id)
1471 }
1472 skts := sc.Sockets()
1473 if len(skts) != 1 {
1474 return false, fmt.Errorf("there should only be one socket under subchannel %d, not %d", sc.ID, len(skts))
1475 }
1476 for id = range skts {
1477 break
1478 }
1479 skt := channelz.GetSocket(id)
1480 cert, _ := tls.LoadX509KeyPair(testdata.Path("x509/server1_cert.pem"), testdata.Path("x509/server1_key.pem"))
1481 securityVal, ok := skt.Security.(*credentials.TLSChannelzSecurityValue)
1482 if !ok {
1483 return false, fmt.Errorf("the Security is of type: %T, want: *credentials.TLSChannelzSecurityValue", skt.Security)
1484 }
1485 if !cmp.Equal(securityVal.RemoteCertificate, cert.Certificate[0]) {
1486 return false, fmt.Errorf("Security.RemoteCertificate got: %v, want: %v", securityVal.RemoteCertificate, cert.Certificate[0])
1487 }
1488 for _, v := range cipherSuites {
1489 if v == securityVal.StandardName {
1490 return true, nil
1491 }
1492 }
1493 return false, fmt.Errorf("Security.StandardName got: %v, want it to be one of %v", securityVal.StandardName, cipherSuites)
1494 }); err != nil {
1495 t.Fatal(err)
1496 }
1497 }
1498
1499 func (s) TestCZChannelTraceCreationDeletion(t *testing.T) {
1500 e := tcpClearRREnv
1501
1502 e.balancer = ""
1503 te := newTest(t, e)
1504 r := manual.NewBuilderWithScheme("whatever")
1505 te.resolverScheme = r.Scheme()
1506 te.clientConn(grpc.WithResolvers(r))
1507 resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", ServerName: "grpclb.server"}}
1508 grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
1509 r.UpdateState(grpclbstate.Set(resolver.State{ServiceConfig: grpclbConfig}, &grpclbstate.State{BalancerAddresses: resolvedAddrs}))
1510 defer te.tearDown()
1511
1512 var nestedConn int64
1513 if err := verifyResultWithDelay(func() (bool, error) {
1514 tcs, _ := channelz.GetTopChannels(0, 0)
1515 if len(tcs) != 1 {
1516 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1517 }
1518 nestedChans := tcs[0].NestedChans()
1519 if len(nestedChans) != 1 {
1520 return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(nestedChans))
1521 }
1522 for k := range nestedChans {
1523 nestedConn = k
1524 }
1525 trace := tcs[0].Trace()
1526 for _, e := range trace.Events {
1527 if e.RefID == nestedConn && e.RefType != channelz.RefChannel {
1528 return false, fmt.Errorf("nested channel trace event shoud have RefChannel as RefType")
1529 }
1530 }
1531 ncm := channelz.GetChannel(nestedConn)
1532 ncmTrace := ncm.Trace()
1533 if ncmTrace == nil {
1534 return false, fmt.Errorf("trace for nested channel should not be empty")
1535 }
1536 if len(ncmTrace.Events) == 0 {
1537 return false, fmt.Errorf("there should be at least one trace event for nested channel not 0")
1538 }
1539 pattern := `Channel created`
1540 if ok, _ := regexp.MatchString(pattern, ncmTrace.Events[0].Desc); !ok {
1541 return false, fmt.Errorf("the first trace event should be %q, not %q", pattern, ncmTrace.Events[0].Desc)
1542 }
1543 return true, nil
1544 }); err != nil {
1545 t.Fatal(err)
1546 }
1547
1548 r.UpdateState(resolver.State{
1549 Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}},
1550 ServiceConfig: parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`),
1551 })
1552
1553
1554 if err := verifyResultWithDelay(func() (bool, error) {
1555 tcs, _ := channelz.GetTopChannels(0, 0)
1556 if len(tcs) != 1 {
1557 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1558 }
1559 nestedChans := tcs[0].NestedChans()
1560 if len(nestedChans) != 0 {
1561 return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(nestedChans))
1562 }
1563 ncm := channelz.GetChannel(nestedConn)
1564 if ncm == nil {
1565 return false, fmt.Errorf("nested channel should still exist due to parent's trace reference")
1566 }
1567 trace := ncm.Trace()
1568 if trace == nil {
1569 return false, fmt.Errorf("trace for nested channel should not be empty")
1570 }
1571 if len(trace.Events) == 0 {
1572 return false, fmt.Errorf("there should be at least one trace event for nested channel not 0")
1573 }
1574 pattern := `Channel created`
1575 if ok, _ := regexp.MatchString(pattern, trace.Events[0].Desc); !ok {
1576 return false, fmt.Errorf("the first trace event should be %q, not %q", pattern, trace.Events[0].Desc)
1577 }
1578 return true, nil
1579 }); err != nil {
1580 t.Fatal(err)
1581 }
1582 }
1583
1584 func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) {
1585 e := tcpClearRREnv
1586 te := newTest(t, e)
1587 te.startServer(&testServer{security: e.security})
1588 r := manual.NewBuilderWithScheme("whatever")
1589 r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
1590 te.resolverScheme = r.Scheme()
1591 te.clientConn(grpc.WithResolvers(r))
1592 defer te.tearDown()
1593 var subConn int64
1594
1595
1596 if err := verifyResultWithDelay(func() (bool, error) {
1597 tcs, _ := channelz.GetTopChannels(0, 0)
1598 if len(tcs) != 1 {
1599 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1600 }
1601 subChans := tcs[0].SubChans()
1602 if len(subChans) != 1 {
1603 return false, fmt.Errorf("there should be 1 subchannel not %d", len(subChans))
1604 }
1605 for k := range subChans {
1606 subConn = k
1607 }
1608 trace := tcs[0].Trace()
1609 for _, e := range trace.Events {
1610 if e.RefID == subConn && e.RefType != channelz.RefSubChannel {
1611 return false, fmt.Errorf("subchannel trace event shoud have RefType to be RefSubChannel")
1612 }
1613 }
1614 scm := channelz.GetSubChannel(subConn)
1615 if scm == nil {
1616 return false, fmt.Errorf("subChannel does not exist")
1617 }
1618 scTrace := scm.Trace()
1619 if scTrace == nil {
1620 return false, fmt.Errorf("trace for subChannel should not be empty")
1621 }
1622 if len(scTrace.Events) == 0 {
1623 return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
1624 }
1625 pattern := `Subchannel created`
1626 if ok, _ := regexp.MatchString(pattern, scTrace.Events[0].Desc); !ok {
1627 return false, fmt.Errorf("the first trace event should be %q, not %q", pattern, scTrace.Events[0].Desc)
1628 }
1629 return true, nil
1630 }); err != nil {
1631 t.Fatal(err)
1632 }
1633
1634 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1635 defer cancel()
1636 testutils.AwaitState(ctx, t, te.cc, connectivity.Ready)
1637 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
1638 testutils.AwaitNotState(ctx, t, te.cc, connectivity.Ready)
1639
1640 if err := verifyResultWithDelay(func() (bool, error) {
1641 tcs, _ := channelz.GetTopChannels(0, 0)
1642 if len(tcs) != 1 {
1643 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1644 }
1645 subChans := tcs[0].SubChans()
1646 if len(subChans) != 1 {
1647 return false, fmt.Errorf("there should be 1 subchannel not %d", len(subChans))
1648 }
1649 scm := channelz.GetSubChannel(subConn)
1650 if scm == nil {
1651 return false, fmt.Errorf("subChannel should still exist due to parent's trace reference")
1652 }
1653 trace := scm.Trace()
1654 if trace == nil {
1655 return false, fmt.Errorf("trace for SubChannel should not be empty")
1656 }
1657 if len(trace.Events) == 0 {
1658 return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
1659 }
1660
1661 pattern := `Subchannel deleted`
1662 desc := trace.Events[len(trace.Events)-1].Desc
1663 if ok, _ := regexp.MatchString(pattern, desc); !ok {
1664 return false, fmt.Errorf("the last trace event should be %q, not %q", pattern, desc)
1665 }
1666 return true, nil
1667 }); err != nil {
1668 t.Fatal(err)
1669 }
1670 }
1671
1672 func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
1673 e := tcpClearRREnv
1674 e.balancer = ""
1675 te := newTest(t, e)
1676 te.startServer(&testServer{security: e.security})
1677 r := manual.NewBuilderWithScheme("whatever")
1678 addrs := []resolver.Address{{Addr: te.srvAddr}}
1679 r.InitialState(resolver.State{Addresses: addrs})
1680 te.resolverScheme = r.Scheme()
1681 te.clientConn(grpc.WithResolvers(r))
1682 defer te.tearDown()
1683 var cid int64
1684
1685
1686 if err := verifyResultWithDelay(func() (bool, error) {
1687 tcs, _ := channelz.GetTopChannels(0, 0)
1688 if len(tcs) != 1 {
1689 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1690 }
1691 cid = tcs[0].ID
1692 trace := tcs[0].Trace()
1693 for i := len(trace.Events) - 1; i >= 0; i-- {
1694 if strings.Contains(trace.Events[i].Desc, "resolver returned new addresses") {
1695 break
1696 }
1697 if i == 0 {
1698 return false, fmt.Errorf("events do not contain expected address resolution from empty address state. Got: %+v", trace.Events)
1699 }
1700 }
1701 return true, nil
1702 }); err != nil {
1703 t.Fatal(err)
1704 }
1705 r.UpdateState(resolver.State{
1706 Addresses: addrs,
1707 ServiceConfig: parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`),
1708 })
1709
1710 if err := verifyResultWithDelay(func() (bool, error) {
1711 cm := channelz.GetChannel(cid)
1712 trace := cm.Trace()
1713 for i := len(trace.Events) - 1; i >= 0; i-- {
1714 if strings.Contains(trace.Events[i].Desc, fmt.Sprintf("Channel switches to new LB policy %q", roundrobin.Name)) {
1715 break
1716 }
1717 if i == 0 {
1718 return false, fmt.Errorf("events do not contain expected address resolution change of LB policy")
1719 }
1720 }
1721 return true, nil
1722 }); err != nil {
1723 t.Fatal(err)
1724 }
1725
1726 newSC := parseServiceConfig(t, r, `{
1727 "methodConfig": [
1728 {
1729 "name": [
1730 {
1731 "service": "grpc.testing.TestService",
1732 "method": "EmptyCall"
1733 }
1734 ],
1735 "waitForReady": false,
1736 "timeout": ".001s"
1737 }
1738 ]
1739 }`)
1740 r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: newSC})
1741
1742 if err := verifyResultWithDelay(func() (bool, error) {
1743 cm := channelz.GetChannel(cid)
1744
1745 var es []string
1746 trace := cm.Trace()
1747 for i := len(trace.Events) - 1; i >= 0; i-- {
1748 if strings.Contains(trace.Events[i].Desc, "service config updated") {
1749 break
1750 }
1751 es = append(es, trace.Events[i].Desc)
1752 if i == 0 {
1753 return false, fmt.Errorf("events do not contain expected address resolution of new service config\n Events:\n%v", strings.Join(es, "\n"))
1754 }
1755 }
1756 return true, nil
1757 }); err != nil {
1758 t.Fatal(err)
1759 }
1760
1761 r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: newSC})
1762
1763 if err := verifyResultWithDelay(func() (bool, error) {
1764 cm := channelz.GetChannel(cid)
1765 trace := cm.Trace()
1766 for i := len(trace.Events) - 1; i >= 0; i-- {
1767 if strings.Contains(trace.Events[i].Desc, "resolver returned an empty address list") {
1768 break
1769 }
1770 if i == 0 {
1771 return false, fmt.Errorf("events do not contain expected address resolution of empty address")
1772 }
1773 }
1774 return true, nil
1775 }); err != nil {
1776 t.Fatal(err)
1777 }
1778 }
1779
1780 func (s) TestCZSubChannelPickedNewAddress(t *testing.T) {
1781 e := tcpClearRREnv
1782 e.balancer = ""
1783 te := newTest(t, e)
1784 te.startServers(&testServer{security: e.security}, 3)
1785 r := manual.NewBuilderWithScheme("whatever")
1786 var svrAddrs []resolver.Address
1787 for _, a := range te.srvAddrs {
1788 svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
1789 }
1790 r.InitialState(resolver.State{Addresses: svrAddrs})
1791 te.resolverScheme = r.Scheme()
1792 cc := te.clientConn(grpc.WithResolvers(r))
1793 defer te.tearDown()
1794 tc := testgrpc.NewTestServiceClient(cc)
1795
1796 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1797 defer cancel()
1798 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
1799 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
1800 }
1801 te.srvs[0].Stop()
1802 te.srvs[1].Stop()
1803
1804
1805 done := make(chan struct{})
1806 defer close(done)
1807 go func() {
1808 for {
1809 tc.EmptyCall(ctx, &testpb.Empty{})
1810 select {
1811 case <-time.After(10 * time.Millisecond):
1812 case <-done:
1813 return
1814 }
1815 }
1816 }()
1817 if err := verifyResultWithDelay(func() (bool, error) {
1818 tcs, _ := channelz.GetTopChannels(0, 0)
1819 if len(tcs) != 1 {
1820 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1821 }
1822 subChans := tcs[0].SubChans()
1823 if len(subChans) != 1 {
1824 return false, fmt.Errorf("there should be 1 subchannel not %d", len(subChans))
1825 }
1826 var subConn int64
1827 for k := range subChans {
1828 subConn = k
1829 }
1830 scm := channelz.GetSubChannel(subConn)
1831 trace := scm.Trace()
1832 if trace == nil {
1833 return false, fmt.Errorf("trace for SubChannel should not be empty")
1834 }
1835 if len(trace.Events) == 0 {
1836 return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
1837 }
1838 for i := len(trace.Events) - 1; i >= 0; i-- {
1839 if strings.Contains(trace.Events[i].Desc, fmt.Sprintf("Subchannel picks a new address %q to connect", te.srvAddrs[2])) {
1840 break
1841 }
1842 if i == 0 {
1843 return false, fmt.Errorf("events do not contain expected address resolution of subchannel picked new address")
1844 }
1845 }
1846 return true, nil
1847 }); err != nil {
1848 t.Fatal(err)
1849 }
1850 }
1851
1852 func (s) TestCZSubChannelConnectivityState(t *testing.T) {
1853 e := tcpClearRREnv
1854 te := newTest(t, e)
1855 te.startServer(&testServer{security: e.security})
1856 r := manual.NewBuilderWithScheme("whatever")
1857 r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
1858 te.resolverScheme = r.Scheme()
1859 cc := te.clientConn(grpc.WithResolvers(r))
1860 defer te.tearDown()
1861 tc := testgrpc.NewTestServiceClient(cc)
1862
1863 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1864 defer cancel()
1865 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
1866 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
1867 }
1868 te.srv.Stop()
1869
1870 var subConn int64
1871 if err := verifyResultWithDelay(func() (bool, error) {
1872
1873
1874 if subConn == 0 {
1875 tcs, _ := channelz.GetTopChannels(0, 0)
1876 if len(tcs) != 1 {
1877 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1878 }
1879 subChans := tcs[0].SubChans()
1880 if len(subChans) != 1 {
1881 return false, fmt.Errorf("there should be 1 subchannel not %d", len(subChans))
1882 }
1883 for k := range subChans {
1884
1885 subConn = k
1886 t.Logf("SubChannel Id is %d", subConn)
1887 }
1888 }
1889 scm := channelz.GetSubChannel(subConn)
1890 if scm == nil {
1891 return false, fmt.Errorf("subChannel should still exist due to parent's trace reference")
1892 }
1893 trace := scm.Trace()
1894 if trace == nil {
1895 return false, fmt.Errorf("trace for SubChannel should not be empty")
1896 }
1897 if len(trace.Events) == 0 {
1898 return false, fmt.Errorf("there should be at least one trace event for subChannel not 0")
1899 }
1900 var ready, connecting, transient, shutdown int
1901 t.Log("SubChannel trace events seen so far...")
1902 for _, e := range trace.Events {
1903 t.Log(e.Desc)
1904 if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure)) {
1905 transient++
1906 }
1907 }
1908
1909
1910 if transient == 0 {
1911 return false, fmt.Errorf("transient failure has not happened on SubChannel yet")
1912 }
1913 transient = 0
1914 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
1915 t.Log("SubChannel trace events seen so far...")
1916 for _, e := range trace.Events {
1917 t.Log(e.Desc)
1918 if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready)) {
1919 ready++
1920 }
1921 if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Connecting)) {
1922 connecting++
1923 }
1924 if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.TransientFailure)) {
1925 transient++
1926 }
1927 if strings.Contains(e.Desc, fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Shutdown)) {
1928 shutdown++
1929 }
1930 }
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941 if ready != 1 || connecting < 1 || transient < 1 || shutdown != 1 {
1942 return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, shutdown = %d, want: 1, >=1, >=1, 1", ready, connecting, transient, shutdown)
1943 }
1944
1945 return true, nil
1946 }); err != nil {
1947 t.Fatal(err)
1948 }
1949 }
1950
1951 func (s) TestCZChannelConnectivityState(t *testing.T) {
1952 e := tcpClearRREnv
1953 te := newTest(t, e)
1954 te.startServer(&testServer{security: e.security})
1955 r := manual.NewBuilderWithScheme("whatever")
1956 r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
1957 te.resolverScheme = r.Scheme()
1958 cc := te.clientConn(grpc.WithResolvers(r))
1959 defer te.tearDown()
1960 tc := testgrpc.NewTestServiceClient(cc)
1961
1962 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1963 defer cancel()
1964 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
1965 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
1966 }
1967 te.srv.Stop()
1968
1969 if err := verifyResultWithDelay(func() (bool, error) {
1970 tcs, _ := channelz.GetTopChannels(0, 0)
1971 if len(tcs) != 1 {
1972 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
1973 }
1974
1975 var ready, connecting, transient int
1976 t.Log("Channel trace events seen so far...")
1977 for _, e := range tcs[0].Trace().Events {
1978 t.Log(e.Desc)
1979 if strings.Contains(e.Desc, fmt.Sprintf("Channel Connectivity change to %v", connectivity.Ready)) {
1980 ready++
1981 }
1982 if strings.Contains(e.Desc, fmt.Sprintf("Channel Connectivity change to %v", connectivity.Connecting)) {
1983 connecting++
1984 }
1985 if strings.Contains(e.Desc, fmt.Sprintf("Channel Connectivity change to %v", connectivity.TransientFailure)) {
1986 transient++
1987 }
1988 }
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999 if ready != 1 || connecting < 1 || transient < 1 {
2000 return false, fmt.Errorf("got: ready = %d, connecting = %d, transient = %d, want: 1, >=1, >=1", ready, connecting, transient)
2001 }
2002 return true, nil
2003 }); err != nil {
2004 t.Fatal(err)
2005 }
2006 }
2007
2008 func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) {
2009 e := tcpClearRREnv
2010 e.balancer = ""
2011 te := newTest(t, e)
2012 channelz.SetMaxTraceEntry(1)
2013 defer channelz.ResetMaxTraceEntryToDefault()
2014 r := manual.NewBuilderWithScheme("whatever")
2015 te.resolverScheme = r.Scheme()
2016 te.clientConn(grpc.WithResolvers(r))
2017 resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", ServerName: "grpclb.server"}}
2018 grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
2019 r.UpdateState(grpclbstate.Set(resolver.State{ServiceConfig: grpclbConfig}, &grpclbstate.State{BalancerAddresses: resolvedAddrs}))
2020 defer te.tearDown()
2021 var nestedConn int64
2022 if err := verifyResultWithDelay(func() (bool, error) {
2023 tcs, _ := channelz.GetTopChannels(0, 0)
2024 if len(tcs) != 1 {
2025 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
2026 }
2027 nestedChans := tcs[0].NestedChans()
2028 if len(nestedChans) != 1 {
2029 return false, fmt.Errorf("there should be one nested channel from grpclb, not %d", len(nestedChans))
2030 }
2031 for k := range nestedChans {
2032 nestedConn = k
2033 }
2034 return true, nil
2035 }); err != nil {
2036 t.Fatal(err)
2037 }
2038
2039 r.UpdateState(resolver.State{
2040 Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}},
2041 ServiceConfig: parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`),
2042 })
2043
2044
2045 if err := verifyResultWithDelay(func() (bool, error) {
2046 tcs, _ := channelz.GetTopChannels(0, 0)
2047 if len(tcs) != 1 {
2048 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
2049 }
2050
2051 if nestedChans := tcs[0].NestedChans(); len(nestedChans) != 0 {
2052 return false, fmt.Errorf("there should be 0 nested channel from grpclb, not %d", len(nestedChans))
2053 }
2054 return true, nil
2055 }); err != nil {
2056 t.Fatal(err)
2057 }
2058
2059
2060
2061 r.UpdateState(resolver.State{
2062 Addresses: []resolver.Address{{Addr: "127.0.0.1:0"}},
2063 ServiceConfig: parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`),
2064 })
2065
2066
2067 if err := verifyResultWithDelay(func() (bool, error) {
2068 cm := channelz.GetChannel(nestedConn)
2069 if cm != nil {
2070 return false, fmt.Errorf("nested channel should have been deleted since its parent's trace should not contain any reference to it anymore")
2071 }
2072 return true, nil
2073 }); err != nil {
2074 t.Fatal(err)
2075 }
2076 }
2077
2078 func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
2079 e := tcpClearRREnv
2080 te := newTest(t, e)
2081 channelz.SetMaxTraceEntry(1)
2082 defer channelz.ResetMaxTraceEntryToDefault()
2083 te.startServer(&testServer{security: e.security})
2084 r := manual.NewBuilderWithScheme("whatever")
2085 r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
2086 te.resolverScheme = r.Scheme()
2087 te.clientConn(grpc.WithResolvers(r))
2088 defer te.tearDown()
2089 var subConn int64
2090
2091
2092 if err := verifyResultWithDelay(func() (bool, error) {
2093 tcs, _ := channelz.GetTopChannels(0, 0)
2094 if len(tcs) != 1 {
2095 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
2096 }
2097 subChans := tcs[0].SubChans()
2098 if len(subChans) != 1 {
2099 return false, fmt.Errorf("there should be 1 subchannel not %d", len(subChans))
2100 }
2101 for k := range subChans {
2102 subConn = k
2103 }
2104 return true, nil
2105 }); err != nil {
2106 t.Fatal(err)
2107 }
2108
2109 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2110 defer cancel()
2111 testutils.AwaitState(ctx, t, te.cc, connectivity.Ready)
2112 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
2113 testutils.AwaitNotState(ctx, t, te.cc, connectivity.Ready)
2114
2115
2116 if err := verifyResultWithDelay(func() (bool, error) {
2117 cm := channelz.GetChannel(subConn)
2118 if cm != nil {
2119 return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
2120 }
2121 return true, nil
2122 }); err != nil {
2123 t.Fatal(err)
2124 }
2125 }
2126
2127 func (s) TestCZTraceTopChannelDeletionTraceClear(t *testing.T) {
2128 e := tcpClearRREnv
2129 te := newTest(t, e)
2130 te.startServer(&testServer{security: e.security})
2131 r := manual.NewBuilderWithScheme("whatever")
2132 r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
2133 te.resolverScheme = r.Scheme()
2134 te.clientConn(grpc.WithResolvers(r))
2135 var subConn int64
2136
2137
2138 if err := verifyResultWithDelay(func() (bool, error) {
2139 tcs, _ := channelz.GetTopChannels(0, 0)
2140 if len(tcs) != 1 {
2141 return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
2142 }
2143 subChans := tcs[0].SubChans()
2144 if len(subChans) != 1 {
2145 return false, fmt.Errorf("there should be 1 subchannel not %d", len(subChans))
2146 }
2147 for k := range subChans {
2148 subConn = k
2149 }
2150 return true, nil
2151 }); err != nil {
2152 t.Fatal(err)
2153 }
2154 te.tearDown()
2155
2156 if err := verifyResultWithDelay(func() (bool, error) {
2157 cm := channelz.GetChannel(subConn)
2158 if cm != nil {
2159 return false, fmt.Errorf("subchannel should have been deleted since its parent's trace should not contain any reference to it anymore")
2160 }
2161 return true, nil
2162 }); err != nil {
2163 t.Fatal(err)
2164 }
2165 }
2166
View as plain text