...

Source file src/google.golang.org/grpc/internal/transport/keepalive_test.go

Documentation: google.golang.org/grpc/internal/transport

     1  /*
     2   *
     3   * Copyright 2019 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // This file contains tests related to the following proposals:
    20  // https://github.com/grpc/proposal/blob/master/A8-client-side-keepalive.md
    21  // https://github.com/grpc/proposal/blob/master/A9-server-side-conn-mgt.md
    22  // https://github.com/grpc/proposal/blob/master/A18-tcp-user-timeout.md
    23  package transport
    24  
    25  import (
    26  	"context"
    27  	"crypto/tls"
    28  	"crypto/x509"
    29  	"fmt"
    30  	"io"
    31  	"net"
    32  	"os"
    33  	"strings"
    34  	"testing"
    35  	"time"
    36  
    37  	"golang.org/x/net/http2"
    38  	"google.golang.org/grpc/credentials"
    39  	"google.golang.org/grpc/internal/channelz"
    40  	"google.golang.org/grpc/internal/grpctest"
    41  	"google.golang.org/grpc/internal/syscall"
    42  	"google.golang.org/grpc/keepalive"
    43  	"google.golang.org/grpc/testdata"
    44  )
    45  
    46  const defaultTestTimeout = 10 * time.Second
    47  
    48  // TestMaxConnectionIdle tests that a server will send GoAway to an idle
    49  // client. An idle client is one who doesn't make any RPC calls for a duration
    50  // of MaxConnectionIdle time.
    51  func (s) TestMaxConnectionIdle(t *testing.T) {
    52  	serverConfig := &ServerConfig{
    53  		KeepaliveParams: keepalive.ServerParameters{
    54  			MaxConnectionIdle: 30 * time.Millisecond,
    55  		},
    56  	}
    57  	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
    58  	defer func() {
    59  		client.Close(fmt.Errorf("closed manually by test"))
    60  		server.stop()
    61  		cancel()
    62  	}()
    63  
    64  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    65  	defer cancel()
    66  	stream, err := client.NewStream(ctx, &CallHdr{})
    67  	if err != nil {
    68  		t.Fatalf("client.NewStream() failed: %v", err)
    69  	}
    70  	client.CloseStream(stream, io.EOF)
    71  
    72  	// Verify the server sends a GoAway to client after MaxConnectionIdle timeout
    73  	// kicks in.
    74  	select {
    75  	case <-ctx.Done():
    76  		t.Fatalf("context expired before receiving GoAway from the server.")
    77  	case <-client.GoAway():
    78  		reason, debugMsg := client.GetGoAwayReason()
    79  		if reason != GoAwayNoReason {
    80  			t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayNoReason)
    81  		}
    82  		if !strings.Contains(debugMsg, "max_idle") {
    83  			t.Fatalf("GoAwayDebugMessage is %v, want %v", debugMsg, "max_idle")
    84  		}
    85  	}
    86  }
    87  
    88  // TestMaxConnectionIdleBusyClient tests that a server will not send GoAway to
    89  // a busy client.
    90  func (s) TestMaxConnectionIdleBusyClient(t *testing.T) {
    91  	serverConfig := &ServerConfig{
    92  		KeepaliveParams: keepalive.ServerParameters{
    93  			MaxConnectionIdle: 100 * time.Millisecond,
    94  		},
    95  	}
    96  	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
    97  	defer func() {
    98  		client.Close(fmt.Errorf("closed manually by test"))
    99  		server.stop()
   100  		cancel()
   101  	}()
   102  
   103  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   104  	defer cancel()
   105  	_, err := client.NewStream(ctx, &CallHdr{})
   106  	if err != nil {
   107  		t.Fatalf("client.NewStream() failed: %v", err)
   108  	}
   109  
   110  	// Verify the server does not send a GoAway to client even after MaxConnectionIdle
   111  	// timeout kicks in.
   112  	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
   113  	defer cancel()
   114  	select {
   115  	case <-client.GoAway():
   116  		t.Fatalf("A busy client received a GoAway.")
   117  	case <-ctx.Done():
   118  	}
   119  }
   120  
   121  // TestMaxConnectionAge tests that a server will send GoAway after a duration
   122  // of MaxConnectionAge.
   123  func (s) TestMaxConnectionAge(t *testing.T) {
   124  	maxConnAge := 100 * time.Millisecond
   125  	serverConfig := &ServerConfig{
   126  		KeepaliveParams: keepalive.ServerParameters{
   127  			MaxConnectionAge:      maxConnAge,
   128  			MaxConnectionAgeGrace: 10 * time.Millisecond,
   129  		},
   130  	}
   131  	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
   132  	defer func() {
   133  		client.Close(fmt.Errorf("closed manually by test"))
   134  		server.stop()
   135  		cancel()
   136  	}()
   137  
   138  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   139  	defer cancel()
   140  	if _, err := client.NewStream(ctx, &CallHdr{}); err != nil {
   141  		t.Fatalf("client.NewStream() failed: %v", err)
   142  	}
   143  
   144  	// Verify the server sends a GoAway to client even after client remains idle
   145  	// for more than MaxConnectionIdle time.
   146  	select {
   147  	case <-client.GoAway():
   148  		reason, debugMsg := client.GetGoAwayReason()
   149  		if reason != GoAwayNoReason {
   150  			t.Fatalf("GoAwayReason is %v, want %v", reason, GoAwayNoReason)
   151  		}
   152  		if !strings.Contains(debugMsg, "max_age") {
   153  			t.Fatalf("GoAwayDebugMessage is %v, want %v", debugMsg, "max_age")
   154  		}
   155  	case <-ctx.Done():
   156  		t.Fatalf("timed out before getting a GoAway from the server.")
   157  	}
   158  }
   159  
   160  const (
   161  	defaultWriteBufSize = 32 * 1024
   162  	defaultReadBufSize  = 32 * 1024
   163  )
   164  
   165  // TestKeepaliveServerClosesUnresponsiveClient tests that a server closes
   166  // the connection with a client that doesn't respond to keepalive pings.
   167  //
   168  // This test creates a regular net.Conn connection to the server and sends the
   169  // clientPreface and the initial Settings frame, and then remains unresponsive.
   170  func (s) TestKeepaliveServerClosesUnresponsiveClient(t *testing.T) {
   171  	serverConfig := &ServerConfig{
   172  		KeepaliveParams: keepalive.ServerParameters{
   173  			Time:    100 * time.Millisecond,
   174  			Timeout: 10 * time.Millisecond,
   175  		},
   176  	}
   177  	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
   178  	defer func() {
   179  		client.Close(fmt.Errorf("closed manually by test"))
   180  		server.stop()
   181  		cancel()
   182  	}()
   183  
   184  	addr := server.addr()
   185  	conn, err := net.Dial("tcp", addr)
   186  	if err != nil {
   187  		t.Fatalf("net.Dial(tcp, %v) failed: %v", addr, err)
   188  	}
   189  	defer conn.Close()
   190  
   191  	if n, err := conn.Write(clientPreface); err != nil || n != len(clientPreface) {
   192  		t.Fatalf("conn.Write(clientPreface) failed: n=%v, err=%v", n, err)
   193  	}
   194  	framer := newFramer(conn, defaultWriteBufSize, defaultReadBufSize, false, 0)
   195  	if err := framer.fr.WriteSettings(http2.Setting{}); err != nil {
   196  		t.Fatal("framer.WriteSettings(http2.Setting{}) failed:", err)
   197  	}
   198  	framer.writer.Flush()
   199  
   200  	// We read from the net.Conn till we get an error, which is expected when
   201  	// the server closes the connection as part of the keepalive logic.
   202  	errCh := make(chan error, 1)
   203  	go func() {
   204  		b := make([]byte, 24)
   205  		for {
   206  			if _, err = conn.Read(b); err != nil {
   207  				errCh <- err
   208  				return
   209  			}
   210  		}
   211  	}()
   212  
   213  	// Server waits for KeepaliveParams.Time seconds before sending out a ping,
   214  	// and then waits for KeepaliveParams.Timeout for a ping ack.
   215  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   216  	defer cancel()
   217  	select {
   218  	case err := <-errCh:
   219  		if err != io.EOF {
   220  			t.Fatalf("client.Read(_) = _,%v, want io.EOF", err)
   221  
   222  		}
   223  	case <-ctx.Done():
   224  		t.Fatalf("Test timed out before server closed the connection.")
   225  	}
   226  }
   227  
   228  // TestKeepaliveServerWithResponsiveClient tests that a server doesn't close
   229  // the connection with a client that responds to keepalive pings.
   230  func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) {
   231  	serverConfig := &ServerConfig{
   232  		KeepaliveParams: keepalive.ServerParameters{
   233  			Time:    100 * time.Millisecond,
   234  			Timeout: 100 * time.Millisecond,
   235  		},
   236  	}
   237  	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
   238  	defer func() {
   239  		client.Close(fmt.Errorf("closed manually by test"))
   240  		server.stop()
   241  		cancel()
   242  	}()
   243  
   244  	// Give keepalive logic some time by sleeping.
   245  	time.Sleep(500 * time.Millisecond)
   246  
   247  	if err := checkForHealthyStream(client); err != nil {
   248  		t.Fatalf("Stream creation failed: %v", err)
   249  	}
   250  }
   251  
   252  func channelzSubChannel(t *testing.T) *channelz.SubChannel {
   253  	ch := channelz.RegisterChannel(nil, "test chan")
   254  	sc := channelz.RegisterSubChannel(ch, "test subchan")
   255  	t.Cleanup(func() {
   256  		channelz.RemoveEntry(sc.ID)
   257  		channelz.RemoveEntry(ch.ID)
   258  	})
   259  	return sc
   260  }
   261  
   262  // TestKeepaliveClientClosesUnresponsiveServer creates a server which does not
   263  // respond to keepalive pings, and makes sure that the client closes the
   264  // transport once the keepalive logic kicks in. Here, we set the
   265  // `PermitWithoutStream` parameter to true which ensures that the keepalive
   266  // logic is running even without any active streams.
   267  func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
   268  	connCh := make(chan net.Conn, 1)
   269  	copts := ConnectOptions{
   270  		ChannelzParent: channelzSubChannel(t),
   271  		KeepaliveParams: keepalive.ClientParameters{
   272  			Time:                10 * time.Millisecond,
   273  			Timeout:             10 * time.Millisecond,
   274  			PermitWithoutStream: true,
   275  		},
   276  	}
   277  	client, cancel := setUpWithNoPingServer(t, copts, connCh)
   278  	defer cancel()
   279  	defer client.Close(fmt.Errorf("closed manually by test"))
   280  
   281  	conn, ok := <-connCh
   282  	if !ok {
   283  		t.Fatalf("Server didn't return connection object")
   284  	}
   285  	defer conn.Close()
   286  
   287  	if err := pollForStreamCreationError(client); err != nil {
   288  		t.Fatal(err)
   289  	}
   290  }
   291  
   292  // TestKeepaliveClientOpenWithUnresponsiveServer creates a server which does
   293  // not respond to keepalive pings, and makes sure that the client does not
   294  // close the transport. Here, we do not set the `PermitWithoutStream` parameter
   295  // to true which ensures that the keepalive logic is turned off without any
   296  // active streams, and therefore the transport stays open.
   297  func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
   298  	connCh := make(chan net.Conn, 1)
   299  	copts := ConnectOptions{
   300  		ChannelzParent: channelzSubChannel(t),
   301  		KeepaliveParams: keepalive.ClientParameters{
   302  			Time:    10 * time.Millisecond,
   303  			Timeout: 10 * time.Millisecond,
   304  		},
   305  	}
   306  	client, cancel := setUpWithNoPingServer(t, copts, connCh)
   307  	defer cancel()
   308  	defer client.Close(fmt.Errorf("closed manually by test"))
   309  
   310  	conn, ok := <-connCh
   311  	if !ok {
   312  		t.Fatalf("Server didn't return connection object")
   313  	}
   314  	defer conn.Close()
   315  
   316  	// Give keepalive some time.
   317  	time.Sleep(500 * time.Millisecond)
   318  
   319  	if err := checkForHealthyStream(client); err != nil {
   320  		t.Fatalf("Stream creation failed: %v", err)
   321  	}
   322  }
   323  
   324  // TestKeepaliveClientClosesWithActiveStreams creates a server which does not
   325  // respond to keepalive pings, and makes sure that the client closes the
   326  // transport even when there is an active stream.
   327  func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
   328  	connCh := make(chan net.Conn, 1)
   329  	copts := ConnectOptions{
   330  		ChannelzParent: channelzSubChannel(t),
   331  		KeepaliveParams: keepalive.ClientParameters{
   332  			Time:    500 * time.Millisecond,
   333  			Timeout: 500 * time.Millisecond,
   334  		},
   335  	}
   336  	// TODO(i/6099): Setup a server which can ping and no-ping based on a flag to
   337  	// reduce the flakiness in this test.
   338  	client, cancel := setUpWithNoPingServer(t, copts, connCh)
   339  	defer cancel()
   340  	defer client.Close(fmt.Errorf("closed manually by test"))
   341  
   342  	conn, ok := <-connCh
   343  	if !ok {
   344  		t.Fatalf("Server didn't return connection object")
   345  	}
   346  	defer conn.Close()
   347  
   348  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   349  	defer cancel()
   350  	// Create a stream, but send no data on it.
   351  	if _, err := client.NewStream(ctx, &CallHdr{}); err != nil {
   352  		t.Fatalf("Stream creation failed: %v", err)
   353  	}
   354  
   355  	if err := pollForStreamCreationError(client); err != nil {
   356  		t.Fatal(err)
   357  	}
   358  }
   359  
   360  // TestKeepaliveClientStaysHealthyWithResponsiveServer creates a server which
   361  // responds to keepalive pings, and makes sure than a client transport stays
   362  // healthy without any active streams.
   363  func (s) TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
   364  	server, client, cancel := setUpWithOptions(t, 0,
   365  		&ServerConfig{
   366  			KeepalivePolicy: keepalive.EnforcementPolicy{
   367  				MinTime:             50 * time.Millisecond,
   368  				PermitWithoutStream: true,
   369  			},
   370  		},
   371  		normal,
   372  		ConnectOptions{
   373  			KeepaliveParams: keepalive.ClientParameters{
   374  				Time:                55 * time.Millisecond,
   375  				Timeout:             time.Second,
   376  				PermitWithoutStream: true,
   377  			}})
   378  	defer func() {
   379  		client.Close(fmt.Errorf("closed manually by test"))
   380  		server.stop()
   381  		cancel()
   382  	}()
   383  
   384  	// Give keepalive some time.
   385  	time.Sleep(500 * time.Millisecond)
   386  
   387  	if err := checkForHealthyStream(client); err != nil {
   388  		t.Fatalf("Stream creation failed: %v", err)
   389  	}
   390  }
   391  
   392  // TestKeepaliveClientFrequency creates a server which expects at most 1 client
   393  // ping for every 100 ms, while the client is configured to send a ping
   394  // every 50 ms. So, this configuration should end up with the client
   395  // transport being closed. But we had a bug wherein the client was sending one
   396  // ping every [Time+Timeout] instead of every [Time] period, and this test
   397  // explicitly makes sure the fix works and the client sends a ping every [Time]
   398  // period.
   399  func (s) TestKeepaliveClientFrequency(t *testing.T) {
   400  	grpctest.TLogger.ExpectError("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\"")
   401  
   402  	serverConfig := &ServerConfig{
   403  		KeepalivePolicy: keepalive.EnforcementPolicy{
   404  			MinTime:             100 * time.Millisecond,
   405  			PermitWithoutStream: true,
   406  		},
   407  	}
   408  	clientOptions := ConnectOptions{
   409  		KeepaliveParams: keepalive.ClientParameters{
   410  			Time:                50 * time.Millisecond,
   411  			Timeout:             time.Second,
   412  			PermitWithoutStream: true,
   413  		},
   414  	}
   415  	server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
   416  	defer func() {
   417  		client.Close(fmt.Errorf("closed manually by test"))
   418  		server.stop()
   419  		cancel()
   420  	}()
   421  
   422  	if err := waitForGoAwayTooManyPings(client); err != nil {
   423  		t.Fatal(err)
   424  	}
   425  }
   426  
   427  // TestKeepaliveServerEnforcementWithAbusiveClientNoRPC verifies that the
   428  // server closes a client transport when it sends too many keepalive pings
   429  // (when there are no active streams), based on the configured
   430  // EnforcementPolicy.
   431  func (s) TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
   432  	grpctest.TLogger.ExpectError("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\"")
   433  
   434  	serverConfig := &ServerConfig{
   435  		KeepalivePolicy: keepalive.EnforcementPolicy{
   436  			MinTime: time.Second,
   437  		},
   438  	}
   439  	clientOptions := ConnectOptions{
   440  		KeepaliveParams: keepalive.ClientParameters{
   441  			Time:                20 * time.Millisecond,
   442  			Timeout:             100 * time.Millisecond,
   443  			PermitWithoutStream: true,
   444  		},
   445  	}
   446  	server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
   447  	defer func() {
   448  		client.Close(fmt.Errorf("closed manually by test"))
   449  		server.stop()
   450  		cancel()
   451  	}()
   452  
   453  	if err := waitForGoAwayTooManyPings(client); err != nil {
   454  		t.Fatal(err)
   455  	}
   456  }
   457  
   458  // TestKeepaliveServerEnforcementWithAbusiveClientWithRPC verifies that the
   459  // server closes a client transport when it sends too many keepalive pings
   460  // (even when there is an active stream), based on the configured
   461  // EnforcementPolicy.
   462  func (s) TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
   463  	grpctest.TLogger.ExpectError("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\"")
   464  
   465  	serverConfig := &ServerConfig{
   466  		KeepalivePolicy: keepalive.EnforcementPolicy{
   467  			MinTime: time.Second,
   468  		},
   469  	}
   470  	clientOptions := ConnectOptions{
   471  		KeepaliveParams: keepalive.ClientParameters{
   472  			Time:    50 * time.Millisecond,
   473  			Timeout: 100 * time.Millisecond,
   474  		},
   475  	}
   476  	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
   477  	defer func() {
   478  		client.Close(fmt.Errorf("closed manually by test"))
   479  		server.stop()
   480  		cancel()
   481  	}()
   482  
   483  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   484  	defer cancel()
   485  	if _, err := client.NewStream(ctx, &CallHdr{}); err != nil {
   486  		t.Fatalf("Stream creation failed: %v", err)
   487  	}
   488  
   489  	if err := waitForGoAwayTooManyPings(client); err != nil {
   490  		t.Fatal(err)
   491  	}
   492  }
   493  
   494  // TestKeepaliveServerEnforcementWithObeyingClientNoRPC verifies that the
   495  // server does not close a client transport (with no active streams) which
   496  // sends keepalive pings in accordance to the configured keepalive
   497  // EnforcementPolicy.
   498  func (s) TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
   499  	serverConfig := &ServerConfig{
   500  		KeepalivePolicy: keepalive.EnforcementPolicy{
   501  			MinTime:             40 * time.Millisecond,
   502  			PermitWithoutStream: true,
   503  		},
   504  	}
   505  	clientOptions := ConnectOptions{
   506  		KeepaliveParams: keepalive.ClientParameters{
   507  			Time:                50 * time.Millisecond,
   508  			Timeout:             time.Second,
   509  			PermitWithoutStream: true,
   510  		},
   511  	}
   512  	server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
   513  	defer func() {
   514  		client.Close(fmt.Errorf("closed manually by test"))
   515  		server.stop()
   516  		cancel()
   517  	}()
   518  
   519  	// Sleep for client to send ~10 keepalive pings.
   520  	time.Sleep(500 * time.Millisecond)
   521  
   522  	// Verify that the server does not close the client transport.
   523  	if err := checkForHealthyStream(client); err != nil {
   524  		t.Fatalf("Stream creation failed: %v", err)
   525  	}
   526  }
   527  
   528  // TestKeepaliveServerEnforcementWithObeyingClientWithRPC verifies that the
   529  // server does not close a client transport (with active streams) which
   530  // sends keepalive pings in accordance to the configured keepalive
   531  // EnforcementPolicy.
   532  func (s) TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
   533  	serverConfig := &ServerConfig{
   534  		KeepalivePolicy: keepalive.EnforcementPolicy{
   535  			MinTime: 40 * time.Millisecond,
   536  		},
   537  	}
   538  	clientOptions := ConnectOptions{
   539  		KeepaliveParams: keepalive.ClientParameters{
   540  			Time: 50 * time.Millisecond,
   541  		},
   542  	}
   543  	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
   544  	defer func() {
   545  		client.Close(fmt.Errorf("closed manually by test"))
   546  		server.stop()
   547  		cancel()
   548  	}()
   549  
   550  	if err := checkForHealthyStream(client); err != nil {
   551  		t.Fatalf("Stream creation failed: %v", err)
   552  	}
   553  
   554  	// Give keepalive enough time.
   555  	time.Sleep(500 * time.Millisecond)
   556  
   557  	if err := checkForHealthyStream(client); err != nil {
   558  		t.Fatalf("Stream creation failed: %v", err)
   559  	}
   560  }
   561  
   562  // TestKeepaliveServerEnforcementWithDormantKeepaliveOnClient verifies that the
   563  // server does not closes a client transport, which has been configured to send
   564  // more pings than allowed by the server's EnforcementPolicy. This client
   565  // transport does not have any active streams and `PermitWithoutStream` is set
   566  // to false. This should ensure that the keepalive functionality on the client
   567  // side enters a dormant state.
   568  func (s) TestKeepaliveServerEnforcementWithDormantKeepaliveOnClient(t *testing.T) {
   569  	serverConfig := &ServerConfig{
   570  		KeepalivePolicy: keepalive.EnforcementPolicy{
   571  			MinTime: 100 * time.Millisecond,
   572  		},
   573  	}
   574  	clientOptions := ConnectOptions{
   575  		KeepaliveParams: keepalive.ClientParameters{
   576  			Time:    10 * time.Millisecond,
   577  			Timeout: 10 * time.Millisecond,
   578  		},
   579  	}
   580  	server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
   581  	defer func() {
   582  		client.Close(fmt.Errorf("closed manually by test"))
   583  		server.stop()
   584  		cancel()
   585  	}()
   586  
   587  	// No active streams on the client. Give keepalive enough time.
   588  	time.Sleep(500 * time.Millisecond)
   589  
   590  	if err := checkForHealthyStream(client); err != nil {
   591  		t.Fatalf("Stream creation failed: %v", err)
   592  	}
   593  }
   594  
   595  // TestTCPUserTimeout tests that the TCP_USER_TIMEOUT socket option is set to
   596  // the keepalive timeout, as detailed in proposal A18.
   597  func (s) TestTCPUserTimeout(t *testing.T) {
   598  	tests := []struct {
   599  		tls               bool
   600  		time              time.Duration
   601  		timeout           time.Duration
   602  		clientWantTimeout time.Duration
   603  		serverWantTimeout time.Duration
   604  	}{
   605  		{
   606  			false,
   607  			10 * time.Second,
   608  			10 * time.Second,
   609  			10 * 1000 * time.Millisecond,
   610  			10 * 1000 * time.Millisecond,
   611  		},
   612  		{
   613  			false,
   614  			0,
   615  			0,
   616  			0,
   617  			20 * 1000 * time.Millisecond,
   618  		},
   619  		{
   620  			false,
   621  			infinity,
   622  			infinity,
   623  			0,
   624  			0,
   625  		},
   626  		{
   627  			true,
   628  			10 * time.Second,
   629  			10 * time.Second,
   630  			10 * 1000 * time.Millisecond,
   631  			10 * 1000 * time.Millisecond,
   632  		},
   633  		{
   634  			true,
   635  			0,
   636  			0,
   637  			0,
   638  			20 * 1000 * time.Millisecond,
   639  		},
   640  		{
   641  			true,
   642  			infinity,
   643  			infinity,
   644  			0,
   645  			0,
   646  		},
   647  	}
   648  	for _, tt := range tests {
   649  		sopts := &ServerConfig{
   650  			KeepaliveParams: keepalive.ServerParameters{
   651  				Time:    tt.time,
   652  				Timeout: tt.timeout,
   653  			},
   654  		}
   655  
   656  		copts := ConnectOptions{
   657  			KeepaliveParams: keepalive.ClientParameters{
   658  				Time:    tt.time,
   659  				Timeout: tt.timeout,
   660  			},
   661  		}
   662  
   663  		if tt.tls {
   664  			copts.TransportCredentials = makeTLSCreds(t, "x509/client1_cert.pem", "x509/client1_key.pem", "x509/server_ca_cert.pem")
   665  			sopts.Credentials = makeTLSCreds(t, "x509/server1_cert.pem", "x509/server1_key.pem", "x509/client_ca_cert.pem")
   666  
   667  		}
   668  
   669  		server, client, cancel := setUpWithOptions(
   670  			t,
   671  			0,
   672  			sopts,
   673  			normal,
   674  			copts,
   675  		)
   676  		defer func() {
   677  			client.Close(fmt.Errorf("closed manually by test"))
   678  			server.stop()
   679  			cancel()
   680  		}()
   681  
   682  		var sc *http2Server
   683  		var srawConn net.Conn
   684  		// Wait until the server transport is setup.
   685  		for {
   686  			server.mu.Lock()
   687  			if len(server.conns) == 0 {
   688  				server.mu.Unlock()
   689  				time.Sleep(time.Millisecond)
   690  				continue
   691  			}
   692  			for k := range server.conns {
   693  				var ok bool
   694  				sc, ok = k.(*http2Server)
   695  				if !ok {
   696  					t.Fatalf("Failed to convert %v to *http2Server", k)
   697  				}
   698  				srawConn = server.conns[k]
   699  			}
   700  			server.mu.Unlock()
   701  			break
   702  		}
   703  
   704  		ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   705  		defer cancel()
   706  		stream, err := client.NewStream(ctx, &CallHdr{})
   707  		if err != nil {
   708  			t.Fatalf("client.NewStream() failed: %v", err)
   709  		}
   710  		client.CloseStream(stream, io.EOF)
   711  
   712  		// check client TCP user timeout only when non TLS
   713  		// TODO : find a way to get the underlying conn for client when TLS
   714  		if !tt.tls {
   715  			cltOpt, err := syscall.GetTCPUserTimeout(client.conn)
   716  			if err != nil {
   717  				t.Fatalf("syscall.GetTCPUserTimeout() failed: %v", err)
   718  			}
   719  			if cltOpt < 0 {
   720  				t.Skipf("skipping test on unsupported environment")
   721  			}
   722  			if gotTimeout := time.Duration(cltOpt) * time.Millisecond; gotTimeout != tt.clientWantTimeout {
   723  				t.Fatalf("syscall.GetTCPUserTimeout() = %d, want %d", gotTimeout, tt.clientWantTimeout)
   724  			}
   725  		}
   726  		scConn := sc.conn
   727  		if tt.tls {
   728  			if _, ok := sc.conn.(*net.TCPConn); ok {
   729  				t.Fatalf("sc.conn is should have wrapped conn with TLS")
   730  			}
   731  			scConn = srawConn
   732  		}
   733  		// verify the type of scConn (on which TCP user timeout will be got)
   734  		if _, ok := scConn.(*net.TCPConn); !ok {
   735  			t.Fatalf("server underlying conn is of type %T, want net.TCPConn", scConn)
   736  		}
   737  		srvOpt, err := syscall.GetTCPUserTimeout(scConn)
   738  		if err != nil {
   739  			t.Fatalf("syscall.GetTCPUserTimeout() failed: %v", err)
   740  		}
   741  		if gotTimeout := time.Duration(srvOpt) * time.Millisecond; gotTimeout != tt.serverWantTimeout {
   742  			t.Fatalf("syscall.GetTCPUserTimeout() = %d, want %d", gotTimeout, tt.serverWantTimeout)
   743  		}
   744  
   745  	}
   746  }
   747  
   748  func makeTLSCreds(t *testing.T, certPath, keyPath, rootsPath string) credentials.TransportCredentials {
   749  	cert, err := tls.LoadX509KeyPair(testdata.Path(certPath), testdata.Path(keyPath))
   750  	if err != nil {
   751  		t.Fatalf("tls.LoadX509KeyPair(%q, %q) failed: %v", certPath, keyPath, err)
   752  	}
   753  	b, err := os.ReadFile(testdata.Path(rootsPath))
   754  	if err != nil {
   755  		t.Fatalf("os.ReadFile(%q) failed: %v", rootsPath, err)
   756  	}
   757  	roots := x509.NewCertPool()
   758  	if !roots.AppendCertsFromPEM(b) {
   759  		t.Fatal("failed to append certificates")
   760  	}
   761  	return credentials.NewTLS(&tls.Config{
   762  		Certificates:       []tls.Certificate{cert},
   763  		RootCAs:            roots,
   764  		InsecureSkipVerify: true,
   765  	})
   766  }
   767  
   768  // checkForHealthyStream attempts to create a stream and return error if any.
   769  // The stream created is closed right after to avoid any leakages.
   770  func checkForHealthyStream(client *http2Client) error {
   771  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   772  	defer cancel()
   773  	stream, err := client.NewStream(ctx, &CallHdr{})
   774  	client.CloseStream(stream, err)
   775  	return err
   776  }
   777  
   778  func pollForStreamCreationError(client *http2Client) error {
   779  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   780  	defer cancel()
   781  	for {
   782  		if _, err := client.NewStream(ctx, &CallHdr{}); err != nil {
   783  			break
   784  		}
   785  		time.Sleep(50 * time.Millisecond)
   786  	}
   787  	if ctx.Err() != nil {
   788  		return fmt.Errorf("test timed out before stream creation returned an error")
   789  	}
   790  	return nil
   791  }
   792  
   793  // waitForGoAwayTooManyPings waits for client to receive a GoAwayTooManyPings
   794  // from server. It also asserts that stream creation fails after receiving a
   795  // GoAway.
   796  func waitForGoAwayTooManyPings(client *http2Client) error {
   797  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   798  	defer cancel()
   799  	select {
   800  	case <-client.GoAway():
   801  		if reason, _ := client.GetGoAwayReason(); reason != GoAwayTooManyPings {
   802  			return fmt.Errorf("goAwayReason is %v, want %v", reason, GoAwayTooManyPings)
   803  		}
   804  	case <-ctx.Done():
   805  		return fmt.Errorf("test timed out before getting GoAway with reason:GoAwayTooManyPings from server")
   806  	}
   807  
   808  	if _, err := client.NewStream(ctx, &CallHdr{}); err == nil {
   809  		return fmt.Errorf("stream creation succeeded after receiving a GoAway from the server")
   810  	}
   811  	return nil
   812  }
   813  

View as plain text