1
18
19 package test
20
21 import (
22 "context"
23 "fmt"
24 "io"
25 "net"
26 "strings"
27 "testing"
28 "time"
29
30 "golang.org/x/net/http2"
31 "google.golang.org/grpc"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/connectivity"
34 "google.golang.org/grpc/credentials/insecure"
35 "google.golang.org/grpc/internal"
36 "google.golang.org/grpc/internal/grpcsync"
37 "google.golang.org/grpc/internal/grpctest"
38 "google.golang.org/grpc/internal/stubserver"
39 "google.golang.org/grpc/internal/testutils"
40 "google.golang.org/grpc/keepalive"
41 "google.golang.org/grpc/resolver"
42 "google.golang.org/grpc/resolver/manual"
43 "google.golang.org/grpc/status"
44
45 testgrpc "google.golang.org/grpc/interop/grpc_testing"
46 testpb "google.golang.org/grpc/interop/grpc_testing"
47 )
48
49
50
51
52
53
54
55
56 func (s) TestGracefulClientOnGoAway(t *testing.T) {
57 const maxConnAge = 100 * time.Millisecond
58 const testTime = maxConnAge * 10
59
60 ss := &stubserver.StubServer{
61 EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
62 return &testpb.Empty{}, nil
63 },
64 }
65
66 s := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: maxConnAge}))
67 defer s.Stop()
68 testgrpc.RegisterTestServiceServer(s, ss)
69
70 lis, err := net.Listen("tcp", "localhost:0")
71 if err != nil {
72 t.Fatalf("Failed to create listener: %v", err)
73 }
74 go s.Serve(lis)
75
76 cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
77 if err != nil {
78 t.Fatalf("Failed to dial server: %v", err)
79 }
80 defer cc.Close()
81 c := testgrpc.NewTestServiceClient(cc)
82
83 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
84 defer cancel()
85
86 endTime := time.Now().Add(testTime)
87 for time.Now().Before(endTime) {
88 if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil {
89 t.Fatalf("EmptyCall(_, _) = _, %v; want _, <nil>", err)
90 }
91 }
92 }
93
94 func (s) TestDetailedGoAwayErrorOnGracefulClosePropagatesToRPCError(t *testing.T) {
95 rpcDoneOnClient := make(chan struct{})
96 ss := &stubserver.StubServer{
97 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
98 <-rpcDoneOnClient
99 return status.Error(codes.Internal, "arbitrary status")
100 },
101 }
102 sopts := []grpc.ServerOption{
103 grpc.KeepaliveParams(keepalive.ServerParameters{
104 MaxConnectionAge: time.Millisecond * 100,
105 MaxConnectionAgeGrace: time.Nanosecond,
106 }),
107 }
108 if err := ss.Start(sopts); err != nil {
109 t.Fatalf("Error starting endpoint server: %v", err)
110 }
111 defer ss.Stop()
112
113 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
114 defer cancel()
115 stream, err := ss.Client.FullDuplexCall(ctx)
116 if err != nil {
117 t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", ss.Client, err)
118 }
119 const expectedErrorMessageSubstring = "received prior goaway: code: NO_ERROR"
120 _, err = stream.Recv()
121 close(rpcDoneOnClient)
122 if err == nil || !strings.Contains(err.Error(), expectedErrorMessageSubstring) {
123 t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: %q", stream, err, expectedErrorMessageSubstring)
124 }
125 }
126
127 func (s) TestDetailedGoAwayErrorOnAbruptClosePropagatesToRPCError(t *testing.T) {
128 grpctest.TLogger.ExpectError("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\"")
129
130
131 prev := internal.KeepaliveMinPingTime
132 internal.KeepaliveMinPingTime = time.Millisecond
133 defer func() { internal.KeepaliveMinPingTime = prev }()
134
135 rpcDoneOnClient := make(chan struct{})
136 ss := &stubserver.StubServer{
137 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
138 <-rpcDoneOnClient
139 return status.Error(codes.Internal, "arbitrary status")
140 },
141 }
142 sopts := []grpc.ServerOption{
143 grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
144 MinTime: time.Second * 1000,
145 }),
146 }
147 dopts := []grpc.DialOption{
148 grpc.WithKeepaliveParams(keepalive.ClientParameters{
149 Time: time.Millisecond,
150 Timeout: time.Second * 1000,
151 PermitWithoutStream: false,
152 }),
153 }
154 if err := ss.Start(sopts, dopts...); err != nil {
155 t.Fatalf("Error starting endpoint server: %v", err)
156 }
157 defer ss.Stop()
158
159 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
160 defer cancel()
161 stream, err := ss.Client.FullDuplexCall(ctx)
162 if err != nil {
163 t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", ss.Client, err)
164 }
165 const expectedErrorMessageSubstring = `received prior goaway: code: ENHANCE_YOUR_CALM, debug data: "too_many_pings"`
166 _, err = stream.Recv()
167 close(rpcDoneOnClient)
168 if err == nil || !strings.Contains(err.Error(), expectedErrorMessageSubstring) {
169 t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: |%v|", stream, err, expectedErrorMessageSubstring)
170 }
171 }
172
173 func (s) TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) {
174 for _, e := range listTestEnv() {
175 if e.name == "handler-tls" {
176 continue
177 }
178 testClientConnCloseAfterGoAwayWithActiveStream(t, e)
179 }
180 }
181
182 func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) {
183 te := newTest(t, e)
184 te.startServer(&testServer{security: e.security})
185 defer te.tearDown()
186 cc := te.clientConn()
187 tc := testgrpc.NewTestServiceClient(cc)
188
189 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
190 defer cancel()
191 if _, err := tc.FullDuplexCall(ctx); err != nil {
192 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
193 }
194 done := make(chan struct{})
195 go func() {
196 te.srv.GracefulStop()
197 close(done)
198 }()
199 time.Sleep(50 * time.Millisecond)
200 cc.Close()
201 timeout := time.NewTimer(time.Second)
202 select {
203 case <-done:
204 case <-timeout.C:
205 t.Fatalf("Test timed-out.")
206 }
207 }
208
209 func (s) TestServerGoAway(t *testing.T) {
210 for _, e := range listTestEnv() {
211 if e.name == "handler-tls" {
212 continue
213 }
214 testServerGoAway(t, e)
215 }
216 }
217
218 func testServerGoAway(t *testing.T, e env) {
219 te := newTest(t, e)
220 te.userAgent = testAppUA
221 te.startServer(&testServer{security: e.security})
222 defer te.tearDown()
223
224 cc := te.clientConn()
225 tc := testgrpc.NewTestServiceClient(cc)
226
227 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
228 defer cancel()
229 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
230 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
231 }
232 ch := make(chan struct{})
233 go func() {
234 te.srv.GracefulStop()
235 close(ch)
236 }()
237
238 for {
239 ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
240 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) != codes.DeadlineExceeded {
241 cancel()
242 break
243 }
244 cancel()
245 }
246
247 ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
248 defer cancel()
249 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal {
250 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal)
251 }
252 <-ch
253 awaitNewConnLogOutput()
254 }
255
256 func (s) TestServerGoAwayPendingRPC(t *testing.T) {
257 for _, e := range listTestEnv() {
258 if e.name == "handler-tls" {
259 continue
260 }
261 testServerGoAwayPendingRPC(t, e)
262 }
263 }
264
265 func testServerGoAwayPendingRPC(t *testing.T, e env) {
266 te := newTest(t, e)
267 te.userAgent = testAppUA
268 te.declareLogNoise(
269 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
270 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
271 "grpc: addrConn.resetTransport failed to create client transport: connection error",
272 )
273 te.startServer(&testServer{security: e.security})
274 defer te.tearDown()
275
276 cc := te.clientConn()
277 tc := testgrpc.NewTestServiceClient(cc)
278 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
279 stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
280 if err != nil {
281 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
282 }
283
284 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
285 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
286 }
287 ch := make(chan struct{})
288 go func() {
289 te.srv.GracefulStop()
290 close(ch)
291 }()
292
293 start := time.Now()
294 errored := false
295 for time.Since(start) < time.Second {
296 ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
297 _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
298 cancel()
299 if err != nil {
300 errored = true
301 break
302 }
303 }
304 if !errored {
305 t.Fatalf("GoAway never received by client")
306 }
307 respParam := []*testpb.ResponseParameters{{Size: 1}}
308 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
309 if err != nil {
310 t.Fatal(err)
311 }
312 req := &testpb.StreamingOutputCallRequest{
313 ResponseType: testpb.PayloadType_COMPRESSABLE,
314 ResponseParameters: respParam,
315 Payload: payload,
316 }
317
318 if err := stream.Send(req); err != nil {
319 t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
320 }
321 if _, err := stream.Recv(); err != nil {
322 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
323 }
324
325 cancel()
326 <-ch
327 awaitNewConnLogOutput()
328 }
329
330 func (s) TestServerMultipleGoAwayPendingRPC(t *testing.T) {
331 for _, e := range listTestEnv() {
332 if e.name == "handler-tls" {
333 continue
334 }
335 testServerMultipleGoAwayPendingRPC(t, e)
336 }
337 }
338
339 func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
340 te := newTest(t, e)
341 te.userAgent = testAppUA
342 te.declareLogNoise(
343 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
344 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
345 "grpc: addrConn.resetTransport failed to create client transport: connection error",
346 )
347 te.startServer(&testServer{security: e.security})
348 defer te.tearDown()
349
350 cc := te.clientConn()
351 tc := testgrpc.NewTestServiceClient(cc)
352 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
353 stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
354 if err != nil {
355 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
356 }
357
358 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
359 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
360 }
361 ch1 := make(chan struct{})
362 go func() {
363 te.srv.GracefulStop()
364 close(ch1)
365 }()
366 ch2 := make(chan struct{})
367 go func() {
368 te.srv.GracefulStop()
369 close(ch2)
370 }()
371
372
373 for {
374 ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
375 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
376 cancel()
377 break
378 }
379 cancel()
380 }
381 select {
382 case <-ch1:
383 t.Fatal("GracefulStop() terminated early")
384 case <-ch2:
385 t.Fatal("GracefulStop() terminated early")
386 default:
387 }
388 respParam := []*testpb.ResponseParameters{
389 {
390 Size: 1,
391 },
392 }
393 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
394 if err != nil {
395 t.Fatal(err)
396 }
397 req := &testpb.StreamingOutputCallRequest{
398 ResponseType: testpb.PayloadType_COMPRESSABLE,
399 ResponseParameters: respParam,
400 Payload: payload,
401 }
402
403 if err := stream.Send(req); err != nil {
404 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
405 }
406 if _, err := stream.Recv(); err != nil {
407 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
408 }
409 if err := stream.CloseSend(); err != nil {
410 t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
411 }
412
413 <-ch1
414 <-ch2
415 cancel()
416 awaitNewConnLogOutput()
417 }
418
419 func (s) TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) {
420 for _, e := range listTestEnv() {
421 if e.name == "handler-tls" {
422 continue
423 }
424 testConcurrentClientConnCloseAndServerGoAway(t, e)
425 }
426 }
427
428 func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) {
429 te := newTest(t, e)
430 te.userAgent = testAppUA
431 te.declareLogNoise(
432 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
433 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
434 "grpc: addrConn.resetTransport failed to create client transport: connection error",
435 )
436 te.startServer(&testServer{security: e.security})
437 defer te.tearDown()
438
439 cc := te.clientConn()
440 tc := testgrpc.NewTestServiceClient(cc)
441 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
442 defer cancel()
443 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
444 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
445 }
446 ch := make(chan struct{})
447
448 go func() {
449 te.srv.GracefulStop()
450 close(ch)
451 }()
452 go func() {
453 cc.Close()
454 }()
455 <-ch
456 }
457
458 func (s) TestConcurrentServerStopAndGoAway(t *testing.T) {
459 for _, e := range listTestEnv() {
460 if e.name == "handler-tls" {
461 continue
462 }
463 testConcurrentServerStopAndGoAway(t, e)
464 }
465 }
466
467 func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
468 te := newTest(t, e)
469 te.userAgent = testAppUA
470 te.declareLogNoise(
471 "transport: http2Client.notifyError got notified that the client transport was broken EOF",
472 "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
473 "grpc: addrConn.resetTransport failed to create client transport: connection error",
474 )
475 te.startServer(&testServer{security: e.security})
476 defer te.tearDown()
477
478 cc := te.clientConn()
479 tc := testgrpc.NewTestServiceClient(cc)
480 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
481 defer cancel()
482 stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
483 if err != nil {
484 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
485 }
486
487
488 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
489 t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
490 }
491
492 ch := make(chan struct{})
493 go func() {
494 te.srv.GracefulStop()
495 close(ch)
496 }()
497
498 for {
499 ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
500 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
501 cancel()
502 break
503 }
504 cancel()
505 }
506
507 te.srv.Stop()
508 respParam := []*testpb.ResponseParameters{
509 {
510 Size: 1,
511 },
512 }
513 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
514 if err != nil {
515 t.Fatal(err)
516 }
517 req := &testpb.StreamingOutputCallRequest{
518 ResponseType: testpb.PayloadType_COMPRESSABLE,
519 ResponseParameters: respParam,
520 Payload: payload,
521 }
522 sendStart := time.Now()
523 for {
524 if err := stream.Send(req); err == io.EOF {
525
526 break
527 } else if err != nil {
528
529 t.Fatalf("stream.Send(%v) = %v; want <nil or io.EOF>", req, err)
530 }
531 if time.Since(sendStart) > 2*time.Second {
532 t.Fatalf("stream.Send(_) did not return io.EOF after 2s")
533 }
534 time.Sleep(time.Millisecond)
535 }
536 if _, err := stream.Recv(); err == nil || err == io.EOF {
537 t.Fatalf("%v.Recv() = _, %v, want _, <non-nil, non-EOF>", stream, err)
538 }
539 <-ch
540 awaitNewConnLogOutput()
541 }
542
543
544
545
546 func (s) TestGoAwayThenClose(t *testing.T) {
547 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
548 defer cancel()
549
550 lis1, err := net.Listen("tcp", "localhost:0")
551 if err != nil {
552 t.Fatalf("Error while listening. Err: %v", err)
553 }
554 s1 := grpc.NewServer()
555 defer s1.Stop()
556 ts := &funcServer{
557 unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
558 return &testpb.SimpleResponse{}, nil
559 },
560 fullDuplexCall: func(stream testgrpc.TestService_FullDuplexCallServer) error {
561 if err := stream.Send(&testpb.StreamingOutputCallResponse{}); err != nil {
562 t.Errorf("unexpected error from send: %v", err)
563 return err
564 }
565
566 _, err := stream.Recv()
567 if err == nil {
568 t.Error("expected to never receive any message")
569 }
570 return err
571 },
572 }
573 testgrpc.RegisterTestServiceServer(s1, ts)
574 go s1.Serve(lis1)
575
576 conn2Established := grpcsync.NewEvent()
577 lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established)
578 if err != nil {
579 t.Fatalf("Error while listening. Err: %v", err)
580 }
581 s2 := grpc.NewServer()
582 defer s2.Stop()
583 testgrpc.RegisterTestServiceServer(s2, ts)
584
585 r := manual.NewBuilderWithScheme("whatever")
586 r.InitialState(resolver.State{Addresses: []resolver.Address{
587 {Addr: lis1.Addr().String()},
588 {Addr: lis2.Addr().String()},
589 }})
590 cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
591 if err != nil {
592 t.Fatalf("Error creating client: %v", err)
593 }
594 defer cc.Close()
595
596 client := testgrpc.NewTestServiceClient(cc)
597
598 t.Log("Waiting for the ClientConn to enter READY state.")
599 testutils.AwaitState(ctx, t, cc, connectivity.Ready)
600
601
602
603
604
605
606
607 t.Log("Creating first streaming RPC to server 1.")
608 stream, err := client.FullDuplexCall(ctx)
609 if err != nil {
610 t.Fatalf("FullDuplexCall(_) = _, %v; want _, nil", err)
611 }
612 if _, err = stream.Recv(); err != nil {
613 t.Fatalf("unexpected error from first recv: %v", err)
614 }
615
616 go s2.Serve(lis2)
617
618 t.Log("Gracefully stopping server 1.")
619 go s1.GracefulStop()
620
621 t.Log("Waiting for the ClientConn to enter IDLE state.")
622 testutils.AwaitState(ctx, t, cc, connectivity.Idle)
623
624 t.Log("Performing another RPC to create a connection to server 2.")
625 if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
626 t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
627 }
628
629 t.Log("Waiting for a connection to server 2.")
630 select {
631 case <-conn2Established.Done():
632 case <-ctx.Done():
633 t.Fatalf("timed out waiting for connection 2 to be established")
634 }
635
636
637 lis2.Close()
638
639 t.Log("Hard closing connection 1.")
640 s1.Stop()
641
642 t.Log("Waiting for the first stream to error.")
643 if _, err = stream.Recv(); err == nil {
644 t.Fatal("expected the stream to die, but got a successful Recv")
645 }
646
647 t.Log("Ensuring connection 2 is stable.")
648 for i := 0; i < 10; i++ {
649 if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
650 t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
651 }
652 }
653 }
654
655
656
657
658
659 func (s) TestGoAwayStreamIDSmallerThanCreatedStreams(t *testing.T) {
660 lis, err := net.Listen("tcp", "localhost:0")
661 if err != nil {
662 t.Fatalf("error listening: %v", err)
663 }
664
665 ctCh := testutils.NewChannel()
666 go func() {
667 conn, err := lis.Accept()
668 if err != nil {
669 t.Errorf("error in lis.Accept(): %v", err)
670 }
671 ct := newClientTester(t, conn)
672 ctCh.Send(ct)
673 }()
674
675 cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
676 if err != nil {
677 t.Fatalf("error dialing: %v", err)
678 }
679 defer cc.Close()
680
681 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
682 defer cancel()
683
684 val, err := ctCh.Receive(ctx)
685 if err != nil {
686 t.Fatalf("timeout waiting for client transport (should be given after http2 creation)")
687 }
688 ct := val.(*clientTester)
689
690 tc := testgrpc.NewTestServiceClient(cc)
691 someStreamsCreated := grpcsync.NewEvent()
692 goAwayWritten := grpcsync.NewEvent()
693 go func() {
694 for i := 0; i < 20; i++ {
695 if i == 10 {
696 <-goAwayWritten.Done()
697 }
698 tc.FullDuplexCall(ctx)
699 if i == 4 {
700 someStreamsCreated.Fire()
701 }
702 }
703 }()
704
705 <-someStreamsCreated.Done()
706 ct.writeGoAway(1, http2.ErrCodeNo, []byte{})
707 goAwayWritten.Fire()
708 }
709
710
711
712
713 func (s) TestTwoGoAwayPingFrames(t *testing.T) {
714 lis, err := net.Listen("tcp", "localhost:0")
715 if err != nil {
716 t.Fatalf("Failed to listen: %v", err)
717 }
718 defer lis.Close()
719 s := grpc.NewServer()
720 defer s.Stop()
721 go s.Serve(lis)
722
723 conn, err := net.DialTimeout("tcp", lis.Addr().String(), defaultTestTimeout)
724 if err != nil {
725 t.Fatalf("Failed to dial: %v", err)
726 }
727
728 st := newServerTesterFromConn(t, conn)
729 st.greet()
730 pingReceivedClientSide := testutils.NewChannel()
731 go func() {
732 for {
733 f, err := st.readFrame()
734 if err != nil {
735 return
736 }
737 switch f.(type) {
738 case *http2.GoAwayFrame:
739 case *http2.PingFrame:
740 pingReceivedClientSide.Send(nil)
741 default:
742 t.Errorf("server tester received unexpected frame type %T", f)
743 }
744 }
745 }()
746 gsDone := testutils.NewChannel()
747 go func() {
748 s.GracefulStop()
749 gsDone.Send(nil)
750 }()
751 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
752 defer cancel()
753 if _, err := pingReceivedClientSide.Receive(ctx); err != nil {
754 t.Fatalf("Error waiting for ping frame client side from graceful shutdown: %v", err)
755 }
756
757 st.writePing(true, [8]byte{1, 6, 1, 8, 0, 3, 3, 9})
758 st.writePing(true, [8]byte{1, 6, 1, 8, 0, 3, 3, 9})
759
760 conn.Close()
761 if _, err := gsDone.Receive(ctx); err != nil {
762 t.Fatalf("Error waiting for graceful shutdown of the server: %v", err)
763 }
764 }
765
766
767
768 func (s) TestClientSendsAGoAway(t *testing.T) {
769 lis, err := net.Listen("tcp", "localhost:0")
770 if err != nil {
771 t.Fatalf("error listening: %v", err)
772 }
773 ctCh := testutils.NewChannel()
774 go func() {
775 conn, err := lis.Accept()
776 if err != nil {
777 t.Errorf("error in lis.Accept(): %v", err)
778 }
779 ct := newClientTester(t, conn)
780 ctCh.Send(ct)
781 }()
782 defer lis.Close()
783
784 cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
785 if err != nil {
786 t.Fatalf("error dialing: %v", err)
787 }
788
789 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
790 defer cancel()
791
792 val, err := ctCh.Receive(ctx)
793 if err != nil {
794 t.Fatalf("timeout waiting for client transport (should be given after http2 creation)")
795 }
796 ct := val.(*clientTester)
797 goAwayReceived := make(chan struct{})
798 errCh := make(chan error)
799 go func() {
800 for {
801 f, err := ct.fr.ReadFrame()
802 if err != nil {
803 return
804 }
805 switch fr := f.(type) {
806 case *http2.GoAwayFrame:
807 fr = f.(*http2.GoAwayFrame)
808 if fr.ErrCode == http2.ErrCodeNo {
809 t.Logf("GoAway received from client")
810 close(goAwayReceived)
811 }
812 default:
813 t.Errorf("server tester received unexpected frame type %T", f)
814 errCh <- fmt.Errorf("server tester received unexpected frame type %T", f)
815 close(errCh)
816 }
817 }
818 }()
819 cc.Close()
820 defer ct.conn.Close()
821 select {
822 case <-goAwayReceived:
823 case err := <-errCh:
824 t.Errorf("Error receiving the goAway: %v", err)
825 case <-ctx.Done():
826 t.Errorf("Context timed out")
827 }
828 }
829
View as plain text