1
18
19
20
21
22
23 package transport
24
25 import (
26 "context"
27 "crypto/tls"
28 "crypto/x509"
29 "fmt"
30 "io"
31 "net"
32 "os"
33 "strings"
34 "testing"
35 "time"
36
37 "golang.org/x/net/http2"
38 "google.golang.org/grpc/credentials"
39 "google.golang.org/grpc/internal/channelz"
40 "google.golang.org/grpc/internal/grpctest"
41 "google.golang.org/grpc/internal/syscall"
42 "google.golang.org/grpc/keepalive"
43 "google.golang.org/grpc/testdata"
44 )
45
46 const defaultTestTimeout = 10 * time.Second
47
48
49
50
51 func (s) TestMaxConnectionIdle(t *testing.T) {
52 serverConfig := &ServerConfig{
53 KeepaliveParams: keepalive.ServerParameters{
54 MaxConnectionIdle: 30 * time.Millisecond,
55 },
56 }
57 server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
58 defer func() {
59 client.Close(fmt.Errorf("closed manually by test"))
60 server.stop()
61 cancel()
62 }()
63
64 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
65 defer cancel()
66 stream, err := client.NewStream(ctx, &CallHdr{})
67 if err != nil {
68 t.Fatalf("client.NewStream() failed: %v", err)
69 }
70 client.CloseStream(stream, io.EOF)
71
72
73
74 select {
75 case <-ctx.Done():
76 t.Fatalf("context expired before receiving GoAway from the server.")
77 case <-client.GoAway():
78 reason, debugMsg := client.GetGoAwayReason()
79 if reason != GoAwayNoReason {
80 t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayNoReason)
81 }
82 if !strings.Contains(debugMsg, "max_idle") {
83 t.Fatalf("GoAwayDebugMessage is %v, want %v", debugMsg, "max_idle")
84 }
85 }
86 }
87
88
89
90 func (s) TestMaxConnectionIdleBusyClient(t *testing.T) {
91 serverConfig := &ServerConfig{
92 KeepaliveParams: keepalive.ServerParameters{
93 MaxConnectionIdle: 100 * time.Millisecond,
94 },
95 }
96 server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
97 defer func() {
98 client.Close(fmt.Errorf("closed manually by test"))
99 server.stop()
100 cancel()
101 }()
102
103 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
104 defer cancel()
105 _, err := client.NewStream(ctx, &CallHdr{})
106 if err != nil {
107 t.Fatalf("client.NewStream() failed: %v", err)
108 }
109
110
111
112 ctx, cancel = context.WithTimeout(context.Background(), time.Second)
113 defer cancel()
114 select {
115 case <-client.GoAway():
116 t.Fatalf("A busy client received a GoAway.")
117 case <-ctx.Done():
118 }
119 }
120
121
122
123 func (s) TestMaxConnectionAge(t *testing.T) {
124 maxConnAge := 100 * time.Millisecond
125 serverConfig := &ServerConfig{
126 KeepaliveParams: keepalive.ServerParameters{
127 MaxConnectionAge: maxConnAge,
128 MaxConnectionAgeGrace: 10 * time.Millisecond,
129 },
130 }
131 server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
132 defer func() {
133 client.Close(fmt.Errorf("closed manually by test"))
134 server.stop()
135 cancel()
136 }()
137
138 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
139 defer cancel()
140 if _, err := client.NewStream(ctx, &CallHdr{}); err != nil {
141 t.Fatalf("client.NewStream() failed: %v", err)
142 }
143
144
145
146 select {
147 case <-client.GoAway():
148 reason, debugMsg := client.GetGoAwayReason()
149 if reason != GoAwayNoReason {
150 t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayNoReason)
151 }
152 if !strings.Contains(debugMsg, "max_age") {
153 t.Fatalf("GoAwayDebugMessage is %v, want %v", debugMsg, "max_age")
154 }
155 case <-ctx.Done():
156 t.Fatalf("timed out before getting a GoAway from the server.")
157 }
158 }
159
160 const (
161 defaultWriteBufSize = 32 * 1024
162 defaultReadBufSize = 32 * 1024
163 )
164
165
166
167
168
169
170 func (s) TestKeepaliveServerClosesUnresponsiveClient(t *testing.T) {
171 serverConfig := &ServerConfig{
172 KeepaliveParams: keepalive.ServerParameters{
173 Time: 100 * time.Millisecond,
174 Timeout: 10 * time.Millisecond,
175 },
176 }
177 server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
178 defer func() {
179 client.Close(fmt.Errorf("closed manually by test"))
180 server.stop()
181 cancel()
182 }()
183
184 addr := server.addr()
185 conn, err := net.Dial("tcp", addr)
186 if err != nil {
187 t.Fatalf("net.Dial(tcp, %v) failed: %v", addr, err)
188 }
189 defer conn.Close()
190
191 if n, err := conn.Write(clientPreface); err != nil || n != len(clientPreface) {
192 t.Fatalf("conn.Write(clientPreface) failed: n=%v, err=%v", n, err)
193 }
194 framer := newFramer(conn, defaultWriteBufSize, defaultReadBufSize, false, 0)
195 if err := framer.fr.WriteSettings(http2.Setting{}); err != nil {
196 t.Fatal("framer.WriteSettings(http2.Setting{}) failed:", err)
197 }
198 framer.writer.Flush()
199
200
201
202 errCh := make(chan error, 1)
203 go func() {
204 b := make([]byte, 24)
205 for {
206 if _, err = conn.Read(b); err != nil {
207 errCh <- err
208 return
209 }
210 }
211 }()
212
213
214
215 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
216 defer cancel()
217 select {
218 case err := <-errCh:
219 if err != io.EOF {
220 t.Fatalf("client.Read(_) = _,%v, want io.EOF", err)
221
222 }
223 case <-ctx.Done():
224 t.Fatalf("Test timed out before server closed the connection.")
225 }
226 }
227
228
229
230 func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) {
231 serverConfig := &ServerConfig{
232 KeepaliveParams: keepalive.ServerParameters{
233 Time: 100 * time.Millisecond,
234 Timeout: 100 * time.Millisecond,
235 },
236 }
237 server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
238 defer func() {
239 client.Close(fmt.Errorf("closed manually by test"))
240 server.stop()
241 cancel()
242 }()
243
244
245 time.Sleep(500 * time.Millisecond)
246
247 if err := checkForHealthyStream(client); err != nil {
248 t.Fatalf("Stream creation failed: %v", err)
249 }
250 }
251
252 func channelzSubChannel(t *testing.T) *channelz.SubChannel {
253 ch := channelz.RegisterChannel(nil, "test chan")
254 sc := channelz.RegisterSubChannel(ch, "test subchan")
255 t.Cleanup(func() {
256 channelz.RemoveEntry(sc.ID)
257 channelz.RemoveEntry(ch.ID)
258 })
259 return sc
260 }
261
262
263
264
265
266
267 func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
268 connCh := make(chan net.Conn, 1)
269 copts := ConnectOptions{
270 ChannelzParent: channelzSubChannel(t),
271 KeepaliveParams: keepalive.ClientParameters{
272 Time: 10 * time.Millisecond,
273 Timeout: 10 * time.Millisecond,
274 PermitWithoutStream: true,
275 },
276 }
277 client, cancel := setUpWithNoPingServer(t, copts, connCh)
278 defer cancel()
279 defer client.Close(fmt.Errorf("closed manually by test"))
280
281 conn, ok := <-connCh
282 if !ok {
283 t.Fatalf("Server didn't return connection object")
284 }
285 defer conn.Close()
286
287 if err := pollForStreamCreationError(client); err != nil {
288 t.Fatal(err)
289 }
290 }
291
292
293
294
295
296
297 func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
298 connCh := make(chan net.Conn, 1)
299 copts := ConnectOptions{
300 ChannelzParent: channelzSubChannel(t),
301 KeepaliveParams: keepalive.ClientParameters{
302 Time: 10 * time.Millisecond,
303 Timeout: 10 * time.Millisecond,
304 },
305 }
306 client, cancel := setUpWithNoPingServer(t, copts, connCh)
307 defer cancel()
308 defer client.Close(fmt.Errorf("closed manually by test"))
309
310 conn, ok := <-connCh
311 if !ok {
312 t.Fatalf("Server didn't return connection object")
313 }
314 defer conn.Close()
315
316
317 time.Sleep(500 * time.Millisecond)
318
319 if err := checkForHealthyStream(client); err != nil {
320 t.Fatalf("Stream creation failed: %v", err)
321 }
322 }
323
324
325
326
327 func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
328 connCh := make(chan net.Conn, 1)
329 copts := ConnectOptions{
330 ChannelzParent: channelzSubChannel(t),
331 KeepaliveParams: keepalive.ClientParameters{
332 Time: 500 * time.Millisecond,
333 Timeout: 500 * time.Millisecond,
334 },
335 }
336
337
338 client, cancel := setUpWithNoPingServer(t, copts, connCh)
339 defer cancel()
340 defer client.Close(fmt.Errorf("closed manually by test"))
341
342 conn, ok := <-connCh
343 if !ok {
344 t.Fatalf("Server didn't return connection object")
345 }
346 defer conn.Close()
347
348 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
349 defer cancel()
350
351 if _, err := client.NewStream(ctx, &CallHdr{}); err != nil {
352 t.Fatalf("Stream creation failed: %v", err)
353 }
354
355 if err := pollForStreamCreationError(client); err != nil {
356 t.Fatal(err)
357 }
358 }
359
360
361
362
363 func (s) TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
364 server, client, cancel := setUpWithOptions(t, 0,
365 &ServerConfig{
366 KeepalivePolicy: keepalive.EnforcementPolicy{
367 MinTime: 50 * time.Millisecond,
368 PermitWithoutStream: true,
369 },
370 },
371 normal,
372 ConnectOptions{
373 KeepaliveParams: keepalive.ClientParameters{
374 Time: 55 * time.Millisecond,
375 Timeout: time.Second,
376 PermitWithoutStream: true,
377 }})
378 defer func() {
379 client.Close(fmt.Errorf("closed manually by test"))
380 server.stop()
381 cancel()
382 }()
383
384
385 time.Sleep(500 * time.Millisecond)
386
387 if err := checkForHealthyStream(client); err != nil {
388 t.Fatalf("Stream creation failed: %v", err)
389 }
390 }
391
392
393
394
395
396
397
398
399 func (s) TestKeepaliveClientFrequency(t *testing.T) {
400 grpctest.TLogger.ExpectError("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\"")
401
402 serverConfig := &ServerConfig{
403 KeepalivePolicy: keepalive.EnforcementPolicy{
404 MinTime: 100 * time.Millisecond,
405 PermitWithoutStream: true,
406 },
407 }
408 clientOptions := ConnectOptions{
409 KeepaliveParams: keepalive.ClientParameters{
410 Time: 50 * time.Millisecond,
411 Timeout: time.Second,
412 PermitWithoutStream: true,
413 },
414 }
415 server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
416 defer func() {
417 client.Close(fmt.Errorf("closed manually by test"))
418 server.stop()
419 cancel()
420 }()
421
422 if err := waitForGoAwayTooManyPings(client); err != nil {
423 t.Fatal(err)
424 }
425 }
426
427
428
429
430
431 func (s) TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
432 grpctest.TLogger.ExpectError("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\"")
433
434 serverConfig := &ServerConfig{
435 KeepalivePolicy: keepalive.EnforcementPolicy{
436 MinTime: time.Second,
437 },
438 }
439 clientOptions := ConnectOptions{
440 KeepaliveParams: keepalive.ClientParameters{
441 Time: 20 * time.Millisecond,
442 Timeout: 100 * time.Millisecond,
443 PermitWithoutStream: true,
444 },
445 }
446 server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
447 defer func() {
448 client.Close(fmt.Errorf("closed manually by test"))
449 server.stop()
450 cancel()
451 }()
452
453 if err := waitForGoAwayTooManyPings(client); err != nil {
454 t.Fatal(err)
455 }
456 }
457
458
459
460
461
462 func (s) TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
463 grpctest.TLogger.ExpectError("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\"")
464
465 serverConfig := &ServerConfig{
466 KeepalivePolicy: keepalive.EnforcementPolicy{
467 MinTime: time.Second,
468 },
469 }
470 clientOptions := ConnectOptions{
471 KeepaliveParams: keepalive.ClientParameters{
472 Time: 50 * time.Millisecond,
473 Timeout: 100 * time.Millisecond,
474 },
475 }
476 server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
477 defer func() {
478 client.Close(fmt.Errorf("closed manually by test"))
479 server.stop()
480 cancel()
481 }()
482
483 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
484 defer cancel()
485 if _, err := client.NewStream(ctx, &CallHdr{}); err != nil {
486 t.Fatalf("Stream creation failed: %v", err)
487 }
488
489 if err := waitForGoAwayTooManyPings(client); err != nil {
490 t.Fatal(err)
491 }
492 }
493
494
495
496
497
498 func (s) TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
499 serverConfig := &ServerConfig{
500 KeepalivePolicy: keepalive.EnforcementPolicy{
501 MinTime: 40 * time.Millisecond,
502 PermitWithoutStream: true,
503 },
504 }
505 clientOptions := ConnectOptions{
506 KeepaliveParams: keepalive.ClientParameters{
507 Time: 50 * time.Millisecond,
508 Timeout: time.Second,
509 PermitWithoutStream: true,
510 },
511 }
512 server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
513 defer func() {
514 client.Close(fmt.Errorf("closed manually by test"))
515 server.stop()
516 cancel()
517 }()
518
519
520 time.Sleep(500 * time.Millisecond)
521
522
523 if err := checkForHealthyStream(client); err != nil {
524 t.Fatalf("Stream creation failed: %v", err)
525 }
526 }
527
528
529
530
531
532 func (s) TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
533 serverConfig := &ServerConfig{
534 KeepalivePolicy: keepalive.EnforcementPolicy{
535 MinTime: 40 * time.Millisecond,
536 },
537 }
538 clientOptions := ConnectOptions{
539 KeepaliveParams: keepalive.ClientParameters{
540 Time: 50 * time.Millisecond,
541 },
542 }
543 server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
544 defer func() {
545 client.Close(fmt.Errorf("closed manually by test"))
546 server.stop()
547 cancel()
548 }()
549
550 if err := checkForHealthyStream(client); err != nil {
551 t.Fatalf("Stream creation failed: %v", err)
552 }
553
554
555 time.Sleep(500 * time.Millisecond)
556
557 if err := checkForHealthyStream(client); err != nil {
558 t.Fatalf("Stream creation failed: %v", err)
559 }
560 }
561
562
563
564
565
566
567
568 func (s) TestKeepaliveServerEnforcementWithDormantKeepaliveOnClient(t *testing.T) {
569 serverConfig := &ServerConfig{
570 KeepalivePolicy: keepalive.EnforcementPolicy{
571 MinTime: 100 * time.Millisecond,
572 },
573 }
574 clientOptions := ConnectOptions{
575 KeepaliveParams: keepalive.ClientParameters{
576 Time: 10 * time.Millisecond,
577 Timeout: 10 * time.Millisecond,
578 },
579 }
580 server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
581 defer func() {
582 client.Close(fmt.Errorf("closed manually by test"))
583 server.stop()
584 cancel()
585 }()
586
587
588 time.Sleep(500 * time.Millisecond)
589
590 if err := checkForHealthyStream(client); err != nil {
591 t.Fatalf("Stream creation failed: %v", err)
592 }
593 }
594
595
596
597 func (s) TestTCPUserTimeout(t *testing.T) {
598 tests := []struct {
599 tls bool
600 time time.Duration
601 timeout time.Duration
602 clientWantTimeout time.Duration
603 serverWantTimeout time.Duration
604 }{
605 {
606 false,
607 10 * time.Second,
608 10 * time.Second,
609 10 * 1000 * time.Millisecond,
610 10 * 1000 * time.Millisecond,
611 },
612 {
613 false,
614 0,
615 0,
616 0,
617 20 * 1000 * time.Millisecond,
618 },
619 {
620 false,
621 infinity,
622 infinity,
623 0,
624 0,
625 },
626 {
627 true,
628 10 * time.Second,
629 10 * time.Second,
630 10 * 1000 * time.Millisecond,
631 10 * 1000 * time.Millisecond,
632 },
633 {
634 true,
635 0,
636 0,
637 0,
638 20 * 1000 * time.Millisecond,
639 },
640 {
641 true,
642 infinity,
643 infinity,
644 0,
645 0,
646 },
647 }
648 for _, tt := range tests {
649 sopts := &ServerConfig{
650 KeepaliveParams: keepalive.ServerParameters{
651 Time: tt.time,
652 Timeout: tt.timeout,
653 },
654 }
655
656 copts := ConnectOptions{
657 KeepaliveParams: keepalive.ClientParameters{
658 Time: tt.time,
659 Timeout: tt.timeout,
660 },
661 }
662
663 if tt.tls {
664 copts.TransportCredentials = makeTLSCreds(t, "x509/client1_cert.pem", "x509/client1_key.pem", "x509/server_ca_cert.pem")
665 sopts.Credentials = makeTLSCreds(t, "x509/server1_cert.pem", "x509/server1_key.pem", "x509/client_ca_cert.pem")
666
667 }
668
669 server, client, cancel := setUpWithOptions(
670 t,
671 0,
672 sopts,
673 normal,
674 copts,
675 )
676 defer func() {
677 client.Close(fmt.Errorf("closed manually by test"))
678 server.stop()
679 cancel()
680 }()
681
682 var sc *http2Server
683 var srawConn net.Conn
684
685 for {
686 server.mu.Lock()
687 if len(server.conns) == 0 {
688 server.mu.Unlock()
689 time.Sleep(time.Millisecond)
690 continue
691 }
692 for k := range server.conns {
693 var ok bool
694 sc, ok = k.(*http2Server)
695 if !ok {
696 t.Fatalf("Failed to convert %v to *http2Server", k)
697 }
698 srawConn = server.conns[k]
699 }
700 server.mu.Unlock()
701 break
702 }
703
704 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
705 defer cancel()
706 stream, err := client.NewStream(ctx, &CallHdr{})
707 if err != nil {
708 t.Fatalf("client.NewStream() failed: %v", err)
709 }
710 client.CloseStream(stream, io.EOF)
711
712
713
714 if !tt.tls {
715 cltOpt, err := syscall.GetTCPUserTimeout(client.conn)
716 if err != nil {
717 t.Fatalf("syscall.GetTCPUserTimeout() failed: %v", err)
718 }
719 if cltOpt < 0 {
720 t.Skipf("skipping test on unsupported environment")
721 }
722 if gotTimeout := time.Duration(cltOpt) * time.Millisecond; gotTimeout != tt.clientWantTimeout {
723 t.Fatalf("syscall.GetTCPUserTimeout() = %d, want %d", gotTimeout, tt.clientWantTimeout)
724 }
725 }
726 scConn := sc.conn
727 if tt.tls {
728 if _, ok := sc.conn.(*net.TCPConn); ok {
729 t.Fatalf("sc.conn is should have wrapped conn with TLS")
730 }
731 scConn = srawConn
732 }
733
734 if _, ok := scConn.(*net.TCPConn); !ok {
735 t.Fatalf("server underlying conn is of type %T, want net.TCPConn", scConn)
736 }
737 srvOpt, err := syscall.GetTCPUserTimeout(scConn)
738 if err != nil {
739 t.Fatalf("syscall.GetTCPUserTimeout() failed: %v", err)
740 }
741 if gotTimeout := time.Duration(srvOpt) * time.Millisecond; gotTimeout != tt.serverWantTimeout {
742 t.Fatalf("syscall.GetTCPUserTimeout() = %d, want %d", gotTimeout, tt.serverWantTimeout)
743 }
744
745 }
746 }
747
748 func makeTLSCreds(t *testing.T, certPath, keyPath, rootsPath string) credentials.TransportCredentials {
749 cert, err := tls.LoadX509KeyPair(testdata.Path(certPath), testdata.Path(keyPath))
750 if err != nil {
751 t.Fatalf("tls.LoadX509KeyPair(%q, %q) failed: %v", certPath, keyPath, err)
752 }
753 b, err := os.ReadFile(testdata.Path(rootsPath))
754 if err != nil {
755 t.Fatalf("os.ReadFile(%q) failed: %v", rootsPath, err)
756 }
757 roots := x509.NewCertPool()
758 if !roots.AppendCertsFromPEM(b) {
759 t.Fatal("failed to append certificates")
760 }
761 return credentials.NewTLS(&tls.Config{
762 Certificates: []tls.Certificate{cert},
763 RootCAs: roots,
764 InsecureSkipVerify: true,
765 })
766 }
767
768
769
770 func checkForHealthyStream(client *http2Client) error {
771 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
772 defer cancel()
773 stream, err := client.NewStream(ctx, &CallHdr{})
774 client.CloseStream(stream, err)
775 return err
776 }
777
778 func pollForStreamCreationError(client *http2Client) error {
779 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
780 defer cancel()
781 for {
782 if _, err := client.NewStream(ctx, &CallHdr{}); err != nil {
783 break
784 }
785 time.Sleep(50 * time.Millisecond)
786 }
787 if ctx.Err() != nil {
788 return fmt.Errorf("test timed out before stream creation returned an error")
789 }
790 return nil
791 }
792
793
794
795
796 func waitForGoAwayTooManyPings(client *http2Client) error {
797 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
798 defer cancel()
799 select {
800 case <-client.GoAway():
801 if reason, _ := client.GetGoAwayReason(); reason != GoAwayTooManyPings {
802 return fmt.Errorf("goAwayReason is %v, want %v", reason, GoAwayTooManyPings)
803 }
804 case <-ctx.Done():
805 return fmt.Errorf("test timed out before getting GoAway with reason:GoAwayTooManyPings from server")
806 }
807
808 if _, err := client.NewStream(ctx, &CallHdr{}); err == nil {
809 return fmt.Errorf("stream creation succeeded after receiving a GoAway from the server")
810 }
811 return nil
812 }
813
View as plain text