...

Source file src/google.golang.org/grpc/test/goaway_test.go

Documentation: google.golang.org/grpc/test

     1  /*
     2   *
     3   * Copyright 2019 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package test
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"io"
    25  	"net"
    26  	"strings"
    27  	"testing"
    28  	"time"
    29  
    30  	"golang.org/x/net/http2"
    31  	"google.golang.org/grpc"
    32  	"google.golang.org/grpc/codes"
    33  	"google.golang.org/grpc/connectivity"
    34  	"google.golang.org/grpc/credentials/insecure"
    35  	"google.golang.org/grpc/internal"
    36  	"google.golang.org/grpc/internal/grpcsync"
    37  	"google.golang.org/grpc/internal/grpctest"
    38  	"google.golang.org/grpc/internal/stubserver"
    39  	"google.golang.org/grpc/internal/testutils"
    40  	"google.golang.org/grpc/keepalive"
    41  	"google.golang.org/grpc/resolver"
    42  	"google.golang.org/grpc/resolver/manual"
    43  	"google.golang.org/grpc/status"
    44  
    45  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    46  	testpb "google.golang.org/grpc/interop/grpc_testing"
    47  )
    48  
    49  // TestGracefulClientOnGoAway attempts to ensure that when the server sends a
    50  // GOAWAY (in this test, by configuring max connection age on the server), a
    51  // client will never see an error.  This requires that the client is appraised
    52  // of the GOAWAY and updates its state accordingly before the transport stops
    53  // accepting new streams.  If a subconn is chosen by a picker and receives the
    54  // goaway before creating the stream, an error will occur, but upon transparent
    55  // retry, the clientconn will ensure a ready subconn is chosen.
    56  func (s) TestGracefulClientOnGoAway(t *testing.T) {
    57  	const maxConnAge = 100 * time.Millisecond
    58  	const testTime = maxConnAge * 10
    59  
    60  	ss := &stubserver.StubServer{
    61  		EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
    62  			return &testpb.Empty{}, nil
    63  		},
    64  	}
    65  
    66  	s := grpc.NewServer(grpc.KeepaliveParams(keepalive.ServerParameters{MaxConnectionAge: maxConnAge}))
    67  	defer s.Stop()
    68  	testgrpc.RegisterTestServiceServer(s, ss)
    69  
    70  	lis, err := net.Listen("tcp", "localhost:0")
    71  	if err != nil {
    72  		t.Fatalf("Failed to create listener: %v", err)
    73  	}
    74  	go s.Serve(lis)
    75  
    76  	cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
    77  	if err != nil {
    78  		t.Fatalf("Failed to dial server: %v", err)
    79  	}
    80  	defer cc.Close()
    81  	c := testgrpc.NewTestServiceClient(cc)
    82  
    83  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    84  	defer cancel()
    85  
    86  	endTime := time.Now().Add(testTime)
    87  	for time.Now().Before(endTime) {
    88  		if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil {
    89  			t.Fatalf("EmptyCall(_, _) = _, %v; want _, <nil>", err)
    90  		}
    91  	}
    92  }
    93  
    94  func (s) TestDetailedGoAwayErrorOnGracefulClosePropagatesToRPCError(t *testing.T) {
    95  	rpcDoneOnClient := make(chan struct{})
    96  	ss := &stubserver.StubServer{
    97  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
    98  			<-rpcDoneOnClient
    99  			return status.Error(codes.Internal, "arbitrary status")
   100  		},
   101  	}
   102  	sopts := []grpc.ServerOption{
   103  		grpc.KeepaliveParams(keepalive.ServerParameters{
   104  			MaxConnectionAge:      time.Millisecond * 100,
   105  			MaxConnectionAgeGrace: time.Nanosecond, // ~instantaneously, but non-zero to avoid default
   106  		}),
   107  	}
   108  	if err := ss.Start(sopts); err != nil {
   109  		t.Fatalf("Error starting endpoint server: %v", err)
   110  	}
   111  	defer ss.Stop()
   112  
   113  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   114  	defer cancel()
   115  	stream, err := ss.Client.FullDuplexCall(ctx)
   116  	if err != nil {
   117  		t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", ss.Client, err)
   118  	}
   119  	const expectedErrorMessageSubstring = "received prior goaway: code: NO_ERROR"
   120  	_, err = stream.Recv()
   121  	close(rpcDoneOnClient)
   122  	if err == nil || !strings.Contains(err.Error(), expectedErrorMessageSubstring) {
   123  		t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: %q", stream, err, expectedErrorMessageSubstring)
   124  	}
   125  }
   126  
   127  func (s) TestDetailedGoAwayErrorOnAbruptClosePropagatesToRPCError(t *testing.T) {
   128  	grpctest.TLogger.ExpectError("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\"")
   129  	// set the min keepalive time very low so that this test can take
   130  	// a reasonable amount of time
   131  	prev := internal.KeepaliveMinPingTime
   132  	internal.KeepaliveMinPingTime = time.Millisecond
   133  	defer func() { internal.KeepaliveMinPingTime = prev }()
   134  
   135  	rpcDoneOnClient := make(chan struct{})
   136  	ss := &stubserver.StubServer{
   137  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
   138  			<-rpcDoneOnClient
   139  			return status.Error(codes.Internal, "arbitrary status")
   140  		},
   141  	}
   142  	sopts := []grpc.ServerOption{
   143  		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
   144  			MinTime: time.Second * 1000, /* arbitrary, large value */
   145  		}),
   146  	}
   147  	dopts := []grpc.DialOption{
   148  		grpc.WithKeepaliveParams(keepalive.ClientParameters{
   149  			Time:                time.Millisecond,   /* should trigger "too many pings" error quickly */
   150  			Timeout:             time.Second * 1000, /* arbitrary, large value */
   151  			PermitWithoutStream: false,
   152  		}),
   153  	}
   154  	if err := ss.Start(sopts, dopts...); err != nil {
   155  		t.Fatalf("Error starting endpoint server: %v", err)
   156  	}
   157  	defer ss.Stop()
   158  
   159  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   160  	defer cancel()
   161  	stream, err := ss.Client.FullDuplexCall(ctx)
   162  	if err != nil {
   163  		t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", ss.Client, err)
   164  	}
   165  	const expectedErrorMessageSubstring = `received prior goaway: code: ENHANCE_YOUR_CALM, debug data: "too_many_pings"`
   166  	_, err = stream.Recv()
   167  	close(rpcDoneOnClient)
   168  	if err == nil || !strings.Contains(err.Error(), expectedErrorMessageSubstring) {
   169  		t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: |%v|", stream, err, expectedErrorMessageSubstring)
   170  	}
   171  }
   172  
   173  func (s) TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) {
   174  	for _, e := range listTestEnv() {
   175  		if e.name == "handler-tls" {
   176  			continue
   177  		}
   178  		testClientConnCloseAfterGoAwayWithActiveStream(t, e)
   179  	}
   180  }
   181  
   182  func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) {
   183  	te := newTest(t, e)
   184  	te.startServer(&testServer{security: e.security})
   185  	defer te.tearDown()
   186  	cc := te.clientConn()
   187  	tc := testgrpc.NewTestServiceClient(cc)
   188  
   189  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   190  	defer cancel()
   191  	if _, err := tc.FullDuplexCall(ctx); err != nil {
   192  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
   193  	}
   194  	done := make(chan struct{})
   195  	go func() {
   196  		te.srv.GracefulStop()
   197  		close(done)
   198  	}()
   199  	time.Sleep(50 * time.Millisecond)
   200  	cc.Close()
   201  	timeout := time.NewTimer(time.Second)
   202  	select {
   203  	case <-done:
   204  	case <-timeout.C:
   205  		t.Fatalf("Test timed-out.")
   206  	}
   207  }
   208  
   209  func (s) TestServerGoAway(t *testing.T) {
   210  	for _, e := range listTestEnv() {
   211  		if e.name == "handler-tls" {
   212  			continue
   213  		}
   214  		testServerGoAway(t, e)
   215  	}
   216  }
   217  
   218  func testServerGoAway(t *testing.T, e env) {
   219  	te := newTest(t, e)
   220  	te.userAgent = testAppUA
   221  	te.startServer(&testServer{security: e.security})
   222  	defer te.tearDown()
   223  
   224  	cc := te.clientConn()
   225  	tc := testgrpc.NewTestServiceClient(cc)
   226  	// Finish an RPC to make sure the connection is good.
   227  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   228  	defer cancel()
   229  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   230  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
   231  	}
   232  	ch := make(chan struct{})
   233  	go func() {
   234  		te.srv.GracefulStop()
   235  		close(ch)
   236  	}()
   237  	// Loop until the server side GoAway signal is propagated to the client.
   238  	for {
   239  		ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
   240  		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) != codes.DeadlineExceeded {
   241  			cancel()
   242  			break
   243  		}
   244  		cancel()
   245  	}
   246  	// A new RPC should fail.
   247  	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
   248  	defer cancel()
   249  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal {
   250  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal)
   251  	}
   252  	<-ch
   253  	awaitNewConnLogOutput()
   254  }
   255  
   256  func (s) TestServerGoAwayPendingRPC(t *testing.T) {
   257  	for _, e := range listTestEnv() {
   258  		if e.name == "handler-tls" {
   259  			continue
   260  		}
   261  		testServerGoAwayPendingRPC(t, e)
   262  	}
   263  }
   264  
   265  func testServerGoAwayPendingRPC(t *testing.T, e env) {
   266  	te := newTest(t, e)
   267  	te.userAgent = testAppUA
   268  	te.declareLogNoise(
   269  		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
   270  		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
   271  		"grpc: addrConn.resetTransport failed to create client transport: connection error",
   272  	)
   273  	te.startServer(&testServer{security: e.security})
   274  	defer te.tearDown()
   275  
   276  	cc := te.clientConn()
   277  	tc := testgrpc.NewTestServiceClient(cc)
   278  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   279  	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
   280  	if err != nil {
   281  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
   282  	}
   283  	// Finish an RPC to make sure the connection is good.
   284  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   285  		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
   286  	}
   287  	ch := make(chan struct{})
   288  	go func() {
   289  		te.srv.GracefulStop()
   290  		close(ch)
   291  	}()
   292  	// Loop until the server side GoAway signal is propagated to the client.
   293  	start := time.Now()
   294  	errored := false
   295  	for time.Since(start) < time.Second {
   296  		ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
   297  		_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
   298  		cancel()
   299  		if err != nil {
   300  			errored = true
   301  			break
   302  		}
   303  	}
   304  	if !errored {
   305  		t.Fatalf("GoAway never received by client")
   306  	}
   307  	respParam := []*testpb.ResponseParameters{{Size: 1}}
   308  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
   309  	if err != nil {
   310  		t.Fatal(err)
   311  	}
   312  	req := &testpb.StreamingOutputCallRequest{
   313  		ResponseType:       testpb.PayloadType_COMPRESSABLE,
   314  		ResponseParameters: respParam,
   315  		Payload:            payload,
   316  	}
   317  	// The existing RPC should be still good to proceed.
   318  	if err := stream.Send(req); err != nil {
   319  		t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
   320  	}
   321  	if _, err := stream.Recv(); err != nil {
   322  		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
   323  	}
   324  	// The RPC will run until canceled.
   325  	cancel()
   326  	<-ch
   327  	awaitNewConnLogOutput()
   328  }
   329  
   330  func (s) TestServerMultipleGoAwayPendingRPC(t *testing.T) {
   331  	for _, e := range listTestEnv() {
   332  		if e.name == "handler-tls" {
   333  			continue
   334  		}
   335  		testServerMultipleGoAwayPendingRPC(t, e)
   336  	}
   337  }
   338  
   339  func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
   340  	te := newTest(t, e)
   341  	te.userAgent = testAppUA
   342  	te.declareLogNoise(
   343  		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
   344  		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
   345  		"grpc: addrConn.resetTransport failed to create client transport: connection error",
   346  	)
   347  	te.startServer(&testServer{security: e.security})
   348  	defer te.tearDown()
   349  
   350  	cc := te.clientConn()
   351  	tc := testgrpc.NewTestServiceClient(cc)
   352  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   353  	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
   354  	if err != nil {
   355  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
   356  	}
   357  	// Finish an RPC to make sure the connection is good.
   358  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   359  		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
   360  	}
   361  	ch1 := make(chan struct{})
   362  	go func() {
   363  		te.srv.GracefulStop()
   364  		close(ch1)
   365  	}()
   366  	ch2 := make(chan struct{})
   367  	go func() {
   368  		te.srv.GracefulStop()
   369  		close(ch2)
   370  	}()
   371  	// Loop until the server side GoAway signal is propagated to the client.
   372  
   373  	for {
   374  		ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
   375  		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   376  			cancel()
   377  			break
   378  		}
   379  		cancel()
   380  	}
   381  	select {
   382  	case <-ch1:
   383  		t.Fatal("GracefulStop() terminated early")
   384  	case <-ch2:
   385  		t.Fatal("GracefulStop() terminated early")
   386  	default:
   387  	}
   388  	respParam := []*testpb.ResponseParameters{
   389  		{
   390  			Size: 1,
   391  		},
   392  	}
   393  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
   394  	if err != nil {
   395  		t.Fatal(err)
   396  	}
   397  	req := &testpb.StreamingOutputCallRequest{
   398  		ResponseType:       testpb.PayloadType_COMPRESSABLE,
   399  		ResponseParameters: respParam,
   400  		Payload:            payload,
   401  	}
   402  	// The existing RPC should be still good to proceed.
   403  	if err := stream.Send(req); err != nil {
   404  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
   405  	}
   406  	if _, err := stream.Recv(); err != nil {
   407  		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
   408  	}
   409  	if err := stream.CloseSend(); err != nil {
   410  		t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
   411  	}
   412  
   413  	<-ch1
   414  	<-ch2
   415  	cancel()
   416  	awaitNewConnLogOutput()
   417  }
   418  
   419  func (s) TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) {
   420  	for _, e := range listTestEnv() {
   421  		if e.name == "handler-tls" {
   422  			continue
   423  		}
   424  		testConcurrentClientConnCloseAndServerGoAway(t, e)
   425  	}
   426  }
   427  
   428  func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) {
   429  	te := newTest(t, e)
   430  	te.userAgent = testAppUA
   431  	te.declareLogNoise(
   432  		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
   433  		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
   434  		"grpc: addrConn.resetTransport failed to create client transport: connection error",
   435  	)
   436  	te.startServer(&testServer{security: e.security})
   437  	defer te.tearDown()
   438  
   439  	cc := te.clientConn()
   440  	tc := testgrpc.NewTestServiceClient(cc)
   441  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   442  	defer cancel()
   443  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   444  		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
   445  	}
   446  	ch := make(chan struct{})
   447  	// Close ClientConn and Server concurrently.
   448  	go func() {
   449  		te.srv.GracefulStop()
   450  		close(ch)
   451  	}()
   452  	go func() {
   453  		cc.Close()
   454  	}()
   455  	<-ch
   456  }
   457  
   458  func (s) TestConcurrentServerStopAndGoAway(t *testing.T) {
   459  	for _, e := range listTestEnv() {
   460  		if e.name == "handler-tls" {
   461  			continue
   462  		}
   463  		testConcurrentServerStopAndGoAway(t, e)
   464  	}
   465  }
   466  
// testConcurrentServerStopAndGoAway opens a streaming RPC, starts a graceful
// stop of the server in the background and, once a new RPC fails (evidence
// that the client has seen the GoAway), hard-stops the server with Stop. It
// then checks that Send on the pre-existing stream returns io.EOF within two
// seconds without ever returning any other error, and that Recv returns a
// non-nil, non-EOF error.
func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testgrpc.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}

	// Finish an RPC to make sure the connection is good.
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
	}

	// ch is closed once the background GracefulStop call has returned.
	ch := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
			cancel()
			break
		}
		cancel()
	}
	// Stop the server and close all the connections.
	te.srv.Stop()
	respParam := []*testpb.ResponseParameters{
		{
			Size: 1,
		},
	}
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            payload,
	}
	// Keep sending on the old stream, pausing 1ms between attempts, until Send
	// reports io.EOF; give up after two seconds.
	sendStart := time.Now()
	for {
		if err := stream.Send(req); err == io.EOF {
			// stream.Send should eventually send io.EOF
			break
		} else if err != nil {
			// Send should never return a transport-level error.
			t.Fatalf("stream.Send(%v) = %v; want <nil or io.EOF>", req, err)
		}
		if time.Since(sendStart) > 2*time.Second {
			t.Fatalf("stream.Send(_) did not return io.EOF after 2s")
		}
		time.Sleep(time.Millisecond)
	}
	// Recv must surface a real error for the stream: neither success nor a
	// plain io.EOF is acceptable here.
	if _, err := stream.Recv(); err == nil || err == io.EOF {
		t.Fatalf("%v.Recv() = _, %v, want _, <non-nil, non-EOF>", stream, err)
	}
	// Wait for the background GracefulStop to return before finishing.
	<-ch
	awaitNewConnLogOutput()
}
   542  
// TestGoAwayThenClose covers the following scenario: proxies typically send
// GO_AWAY followed by connection closure a minute or so later. This test
// ensures that the connection is re-created after GO_AWAY and not affected by
// the subsequent (old) connection closure.
//
// Two servers are used. The client first connects to server 1 and opens a
// long-lived stream. Server 1 is then gracefully stopped, a new RPC is made
// (which is expected to reach server 2), and finally server 1 is hard-stopped.
// RPCs must keep succeeding afterwards.
func (s) TestGoAwayThenClose(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	lis1, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("Error while listening. Err: %v", err)
	}
	s1 := grpc.NewServer()
	defer s1.Stop()
	// ts is the service implementation registered on both servers.
	ts := &funcServer{
		unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
			return &testpb.SimpleResponse{}, nil
		},
		fullDuplexCall: func(stream testgrpc.TestService_FullDuplexCallServer) error {
			// Send one message so the client can confirm that the stream is
			// established.
			if err := stream.Send(&testpb.StreamingOutputCallResponse{}); err != nil {
				t.Errorf("unexpected error from send: %v", err)
				return err
			}
			// Wait forever.
			_, err := stream.Recv()
			if err == nil {
				t.Error("expected to never receive any message")
			}
			return err
		},
	}
	testgrpc.RegisterTestServiceServer(s1, ts)
	go s1.Serve(lis1)

	// conn2Established is handed to the notifying listener for server 2 and is
	// waited on below as the signal that a connection to server 2 exists.
	conn2Established := grpcsync.NewEvent()
	lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established)
	if err != nil {
		t.Fatalf("Error while listening. Err: %v", err)
	}
	s2 := grpc.NewServer()
	defer s2.Stop()
	testgrpc.RegisterTestServiceServer(s2, ts)

	// The resolver reports both addresses. Server 2 does not start serving
	// until after the first stream has been established (see below).
	r := manual.NewBuilderWithScheme("whatever")
	r.InitialState(resolver.State{Addresses: []resolver.Address{
		{Addr: lis1.Addr().String()},
		{Addr: lis2.Addr().String()},
	}})
	cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("Error creating client: %v", err)
	}
	defer cc.Close()

	client := testgrpc.NewTestServiceClient(cc)

	t.Log("Waiting for the ClientConn to enter READY state.")
	testutils.AwaitState(ctx, t, cc, connectivity.Ready)

	// We make a streaming RPC and do a one-message round trip to make sure
	// it's created on connection 1.
	//
	// We use a long-lived RPC because it will cause GracefulStop to send
	// GO_AWAY, but the connection won't get closed until the server stops and
	// the client receives the error.
	t.Log("Creating first streaming RPC to server 1.")
	stream, err := client.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("FullDuplexCall(_) = _, %v; want _, nil", err)
	}
	if _, err = stream.Recv(); err != nil {
		t.Fatalf("unexpected error from first recv: %v", err)
	}

	go s2.Serve(lis2)

	// GracefulStop is run in its own goroutine and is not waited for; the test
	// carries on while the stream above is still open.
	t.Log("Gracefully stopping server 1.")
	go s1.GracefulStop()

	t.Log("Waiting for the ClientConn to enter IDLE state.")
	testutils.AwaitState(ctx, t, cc, connectivity.Idle)

	t.Log("Performing another RPC to create a connection to server 2.")
	if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
		t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
	}

	t.Log("Waiting for a connection to server 2.")
	select {
	case <-conn2Established.Done():
	case <-ctx.Done():
		t.Fatalf("timed out waiting for connection 2 to be established")
	}

	// Close the listener for server2 to prevent it from allowing new connections.
	lis2.Close()

	t.Log("Hard closing connection 1.")
	s1.Stop()

	t.Log("Waiting for the first stream to error.")
	if _, err = stream.Recv(); err == nil {
		t.Fatal("expected the stream to die, but got a successful Recv")
	}

	// With lis2 closed, these RPCs can only succeed over a connection to
	// server 2 that already exists.
	t.Log("Ensuring connection 2 is stable.")
	for i := 0; i < 10; i++ {
		if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
			t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
		}
	}
}
   654  
   655  // TestGoAwayStreamIDSmallerThanCreatedStreams tests the scenario where a server
   656  // sends a goaway with a stream id that is smaller than some created streams on
   657  // the client, while the client is simultaneously creating new streams. This
   658  // should not induce a deadlock.
   659  func (s) TestGoAwayStreamIDSmallerThanCreatedStreams(t *testing.T) {
   660  	lis, err := net.Listen("tcp", "localhost:0")
   661  	if err != nil {
   662  		t.Fatalf("error listening: %v", err)
   663  	}
   664  
   665  	ctCh := testutils.NewChannel()
   666  	go func() {
   667  		conn, err := lis.Accept()
   668  		if err != nil {
   669  			t.Errorf("error in lis.Accept(): %v", err)
   670  		}
   671  		ct := newClientTester(t, conn)
   672  		ctCh.Send(ct)
   673  	}()
   674  
   675  	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
   676  	if err != nil {
   677  		t.Fatalf("error dialing: %v", err)
   678  	}
   679  	defer cc.Close()
   680  
   681  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   682  	defer cancel()
   683  
   684  	val, err := ctCh.Receive(ctx)
   685  	if err != nil {
   686  		t.Fatalf("timeout waiting for client transport (should be given after http2 creation)")
   687  	}
   688  	ct := val.(*clientTester)
   689  
   690  	tc := testgrpc.NewTestServiceClient(cc)
   691  	someStreamsCreated := grpcsync.NewEvent()
   692  	goAwayWritten := grpcsync.NewEvent()
   693  	go func() {
   694  		for i := 0; i < 20; i++ {
   695  			if i == 10 {
   696  				<-goAwayWritten.Done()
   697  			}
   698  			tc.FullDuplexCall(ctx)
   699  			if i == 4 {
   700  				someStreamsCreated.Fire()
   701  			}
   702  		}
   703  	}()
   704  
   705  	<-someStreamsCreated.Done()
   706  	ct.writeGoAway(1, http2.ErrCodeNo, []byte{})
   707  	goAwayWritten.Fire()
   708  }
   709  
   710  // TestTwoGoAwayPingFrames tests the scenario where you get two go away ping
   711  // frames from the client during graceful shutdown. This should not crash the
   712  // server.
   713  func (s) TestTwoGoAwayPingFrames(t *testing.T) {
   714  	lis, err := net.Listen("tcp", "localhost:0")
   715  	if err != nil {
   716  		t.Fatalf("Failed to listen: %v", err)
   717  	}
   718  	defer lis.Close()
   719  	s := grpc.NewServer()
   720  	defer s.Stop()
   721  	go s.Serve(lis)
   722  
   723  	conn, err := net.DialTimeout("tcp", lis.Addr().String(), defaultTestTimeout)
   724  	if err != nil {
   725  		t.Fatalf("Failed to dial: %v", err)
   726  	}
   727  
   728  	st := newServerTesterFromConn(t, conn)
   729  	st.greet()
   730  	pingReceivedClientSide := testutils.NewChannel()
   731  	go func() {
   732  		for {
   733  			f, err := st.readFrame()
   734  			if err != nil {
   735  				return
   736  			}
   737  			switch f.(type) {
   738  			case *http2.GoAwayFrame:
   739  			case *http2.PingFrame:
   740  				pingReceivedClientSide.Send(nil)
   741  			default:
   742  				t.Errorf("server tester received unexpected frame type %T", f)
   743  			}
   744  		}
   745  	}()
   746  	gsDone := testutils.NewChannel()
   747  	go func() {
   748  		s.GracefulStop()
   749  		gsDone.Send(nil)
   750  	}()
   751  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   752  	defer cancel()
   753  	if _, err := pingReceivedClientSide.Receive(ctx); err != nil {
   754  		t.Fatalf("Error waiting for ping frame client side from graceful shutdown: %v", err)
   755  	}
   756  	// Write two goaway pings here.
   757  	st.writePing(true, [8]byte{1, 6, 1, 8, 0, 3, 3, 9})
   758  	st.writePing(true, [8]byte{1, 6, 1, 8, 0, 3, 3, 9})
   759  	// Close the conn to finish up the Graceful Shutdown process.
   760  	conn.Close()
   761  	if _, err := gsDone.Receive(ctx); err != nil {
   762  		t.Fatalf("Error waiting for graceful shutdown of the server: %v", err)
   763  	}
   764  }
   765  
   766  // TestClientSendsAGoAway tests the scenario where you get a go away ping
   767  // frames from the client during graceful shutdown.
   768  func (s) TestClientSendsAGoAway(t *testing.T) {
   769  	lis, err := net.Listen("tcp", "localhost:0")
   770  	if err != nil {
   771  		t.Fatalf("error listening: %v", err)
   772  	}
   773  	ctCh := testutils.NewChannel()
   774  	go func() {
   775  		conn, err := lis.Accept()
   776  		if err != nil {
   777  			t.Errorf("error in lis.Accept(): %v", err)
   778  		}
   779  		ct := newClientTester(t, conn)
   780  		ctCh.Send(ct)
   781  	}()
   782  	defer lis.Close()
   783  
   784  	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
   785  	if err != nil {
   786  		t.Fatalf("error dialing: %v", err)
   787  	}
   788  
   789  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   790  	defer cancel()
   791  
   792  	val, err := ctCh.Receive(ctx)
   793  	if err != nil {
   794  		t.Fatalf("timeout waiting for client transport (should be given after http2 creation)")
   795  	}
   796  	ct := val.(*clientTester)
   797  	goAwayReceived := make(chan struct{})
   798  	errCh := make(chan error)
   799  	go func() {
   800  		for {
   801  			f, err := ct.fr.ReadFrame()
   802  			if err != nil {
   803  				return
   804  			}
   805  			switch fr := f.(type) {
   806  			case *http2.GoAwayFrame:
   807  				fr = f.(*http2.GoAwayFrame)
   808  				if fr.ErrCode == http2.ErrCodeNo {
   809  					t.Logf("GoAway received from client")
   810  					close(goAwayReceived)
   811  				}
   812  			default:
   813  				t.Errorf("server tester received unexpected frame type %T", f)
   814  				errCh <- fmt.Errorf("server tester received unexpected frame type %T", f)
   815  				close(errCh)
   816  			}
   817  		}
   818  	}()
   819  	cc.Close()
   820  	defer ct.conn.Close()
   821  	select {
   822  	case <-goAwayReceived:
   823  	case err := <-errCh:
   824  		t.Errorf("Error receiving the goAway: %v", err)
   825  	case <-ctx.Done():
   826  		t.Errorf("Context timed out")
   827  	}
   828  }
   829  

View as plain text