...

Source file src/google.golang.org/grpc/test/stream_cleanup_test.go

Documentation: google.golang.org/grpc/test

     1  /*
     2   *
     3   * Copyright 2019 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package test
    20  
    21  import (
    22  	"context"
    23  	"io"
    24  	"testing"
    25  	"time"
    26  
    27  	"google.golang.org/grpc"
    28  	"google.golang.org/grpc/codes"
    29  	"google.golang.org/grpc/internal/stubserver"
    30  	"google.golang.org/grpc/status"
    31  
    32  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    33  	testpb "google.golang.org/grpc/interop/grpc_testing"
    34  )
    35  
    36  func (s) TestStreamCleanup(t *testing.T) {
    37  	const initialWindowSize uint = 70 * 1024 // Must be higher than default 64K, ignored otherwise
    38  	const bodySize = 2 * initialWindowSize   // Something that is not going to fit in a single window
    39  	const callRecvMsgSize uint = 1           // The maximum message size the client can receive
    40  
    41  	ss := &stubserver.StubServer{
    42  		UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
    43  			return &testpb.SimpleResponse{Payload: &testpb.Payload{
    44  				Body: make([]byte, bodySize),
    45  			}}, nil
    46  		},
    47  		EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
    48  			return &testpb.Empty{}, nil
    49  		},
    50  	}
    51  	if err := ss.Start(nil, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil {
    52  		t.Fatalf("Error starting endpoint server: %v", err)
    53  	}
    54  	defer ss.Stop()
    55  
    56  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    57  	defer cancel()
    58  	if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{}); status.Code(err) != codes.ResourceExhausted {
    59  		t.Fatalf("should fail with ResourceExhausted, message's body size: %v, maximum message size the client can receive: %v", bodySize, callRecvMsgSize)
    60  	}
    61  	if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
    62  		t.Fatalf("should succeed, err: %v", err)
    63  	}
    64  }
    65  
    66  func (s) TestStreamCleanupAfterSendStatus(t *testing.T) {
    67  	const initialWindowSize uint = 70 * 1024 // Must be higher than default 64K, ignored otherwise
    68  	const bodySize = 2 * initialWindowSize   // Something that is not going to fit in a single window
    69  
    70  	serverReturnedStatus := make(chan struct{})
    71  
    72  	ss := &stubserver.StubServer{
    73  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
    74  			defer func() {
    75  				close(serverReturnedStatus)
    76  			}()
    77  			return stream.Send(&testpb.StreamingOutputCallResponse{
    78  				Payload: &testpb.Payload{
    79  					Body: make([]byte, bodySize),
    80  				},
    81  			})
    82  		},
    83  	}
    84  	if err := ss.Start(nil, grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil {
    85  		t.Fatalf("Error starting endpoint server: %v", err)
    86  	}
    87  	defer ss.Stop()
    88  
    89  	// This test makes sure we don't delete stream from server transport's
    90  	// activeStreams list too aggressively.
    91  
    92  	// 1. Make a long living stream RPC. So server's activeStream list is not
    93  	// empty.
    94  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    95  	defer cancel()
    96  	stream, err := ss.Client.FullDuplexCall(ctx)
    97  	if err != nil {
    98  		t.Fatalf("FullDuplexCall= _, %v; want _, <nil>", err)
    99  	}
   100  
   101  	// 2. Wait for service handler to return status.
   102  	//
   103  	// This will trigger a stream cleanup code, which will eventually remove
   104  	// this stream from activeStream.
   105  	//
   106  	// But the stream removal won't happen because it's supposed to be done
   107  	// after the status is sent by loopyWriter, and the status send is blocked
   108  	// by flow control.
   109  	<-serverReturnedStatus
   110  
   111  	// 3. GracefulStop (besides sending goaway) checks the number of
   112  	// activeStreams.
   113  	//
   114  	// It will close the connection if there's no active streams. This won't
   115  	// happen because of the pending stream. But if there's a bug in stream
   116  	// cleanup that causes stream to be removed too aggressively, the connection
   117  	// will be closd and the stream will be broken.
   118  	gracefulStopDone := make(chan struct{})
   119  	go func() {
   120  		defer close(gracefulStopDone)
   121  		ss.S.GracefulStop()
   122  	}()
   123  
   124  	// 4. Make sure the stream is not broken.
   125  	if _, err := stream.Recv(); err != nil {
   126  		t.Fatalf("stream.Recv() = _, %v, want _, <nil>", err)
   127  	}
   128  	if _, err := stream.Recv(); err != io.EOF {
   129  		t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err)
   130  	}
   131  
   132  	timer := time.NewTimer(time.Second)
   133  	select {
   134  	case <-gracefulStopDone:
   135  		timer.Stop()
   136  	case <-timer.C:
   137  		t.Fatalf("s.GracefulStop() didn't finish within 1 second after the last RPC")
   138  	}
   139  }
   140  

View as plain text