...

Source file src/google.golang.org/grpc/test/end2end_test.go

Documentation: google.golang.org/grpc/test

     1  /*
     2   *
     3   * Copyright 2014 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package test
    20  
    21  import (
    22  	"bufio"
    23  	"bytes"
    24  	"context"
    25  	"crypto/tls"
    26  	"encoding/json"
    27  	"errors"
    28  	"flag"
    29  	"fmt"
    30  	"io"
    31  	"math"
    32  	"net"
    33  	"net/http"
    34  	"os"
    35  	"reflect"
    36  	"runtime"
    37  	"strings"
    38  	"sync"
    39  	"sync/atomic"
    40  	"syscall"
    41  	"testing"
    42  	"time"
    43  
    44  	"golang.org/x/net/http2"
    45  	"golang.org/x/net/http2/hpack"
    46  	"google.golang.org/grpc"
    47  	"google.golang.org/grpc/balancer"
    48  	"google.golang.org/grpc/balancer/roundrobin"
    49  	"google.golang.org/grpc/codes"
    50  	"google.golang.org/grpc/connectivity"
    51  	"google.golang.org/grpc/credentials"
    52  	"google.golang.org/grpc/credentials/insecure"
    53  	"google.golang.org/grpc/health"
    54  	"google.golang.org/grpc/internal"
    55  	"google.golang.org/grpc/internal/binarylog"
    56  	"google.golang.org/grpc/internal/channelz"
    57  	"google.golang.org/grpc/internal/grpcsync"
    58  	"google.golang.org/grpc/internal/grpctest"
    59  	"google.golang.org/grpc/internal/stubserver"
    60  	"google.golang.org/grpc/internal/testutils"
    61  	"google.golang.org/grpc/internal/transport"
    62  	"google.golang.org/grpc/metadata"
    63  	"google.golang.org/grpc/peer"
    64  	"google.golang.org/grpc/resolver"
    65  	"google.golang.org/grpc/resolver/manual"
    66  	"google.golang.org/grpc/serviceconfig"
    67  	"google.golang.org/grpc/stats"
    68  	"google.golang.org/grpc/status"
    69  	"google.golang.org/grpc/tap"
    70  	"google.golang.org/grpc/test/bufconn"
    71  	"google.golang.org/grpc/testdata"
    72  
    73  	spb "google.golang.org/genproto/googleapis/rpc/status"
    74  	healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
    75  	healthpb "google.golang.org/grpc/health/grpc_health_v1"
    76  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    77  	testpb "google.golang.org/grpc/interop/grpc_testing"
    78  	"google.golang.org/protobuf/proto"
    79  	"google.golang.org/protobuf/types/known/anypb"
    80  
    81  	_ "google.golang.org/grpc/encoding/gzip"
    82  )
    83  
    84  const defaultHealthService = "grpc.health.v1.Health"
    85  
    86  func init() {
    87  	channelz.TurnOn()
    88  	balancer.Register(triggerRPCBlockPickerBalancerBuilder{})
    89  }
    90  
    91  type s struct {
    92  	grpctest.Tester
    93  }
    94  
    95  func Test(t *testing.T) {
    96  	grpctest.RunSubTests(t, s{})
    97  }
    98  
    99  var (
   100  	// For headers:
   101  	testMetadata = metadata.MD{
   102  		"key1":     []string{"value1"},
   103  		"key2":     []string{"value2"},
   104  		"key3-bin": []string{"binvalue1", string([]byte{1, 2, 3})},
   105  	}
   106  	testMetadata2 = metadata.MD{
   107  		"key1": []string{"value12"},
   108  		"key2": []string{"value22"},
   109  	}
   110  	// For trailers:
   111  	testTrailerMetadata = metadata.MD{
   112  		"tkey1":     []string{"trailerValue1"},
   113  		"tkey2":     []string{"trailerValue2"},
   114  		"tkey3-bin": []string{"trailerbinvalue1", string([]byte{3, 2, 1})},
   115  	}
   116  	testTrailerMetadata2 = metadata.MD{
   117  		"tkey1": []string{"trailerValue12"},
   118  		"tkey2": []string{"trailerValue22"},
   119  	}
   120  	// capital "Key" is illegal in HTTP/2.
   121  	malformedHTTP2Metadata = metadata.MD{
   122  		"Key": []string{"foo"},
   123  	}
   124  	testAppUA     = "myApp1/1.0 myApp2/0.9"
   125  	failAppUA     = "fail-this-RPC"
   126  	detailedError = status.ErrorProto(&spb.Status{
   127  		Code:    int32(codes.DataLoss),
   128  		Message: "error for testing: " + failAppUA,
   129  		Details: []*anypb.Any{{
   130  			TypeUrl: "url",
   131  			Value:   []byte{6, 0, 0, 6, 1, 3},
   132  		}},
   133  	})
   134  )
   135  
   136  var raceMode bool // set by race.go in race mode
   137  
   138  type testServer struct {
   139  	testgrpc.UnimplementedTestServiceServer
   140  
   141  	security           string // indicate the authentication protocol used by this server.
   142  	earlyFail          bool   // whether to error out the execution of a service handler prematurely.
   143  	setAndSendHeader   bool   // whether to call setHeader and sendHeader.
   144  	setHeaderOnly      bool   // whether to only call setHeader, not sendHeader.
   145  	multipleSetTrailer bool   // whether to call setTrailer multiple times.
   146  	unaryCallSleepTime time.Duration
   147  }
   148  
   149  func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
   150  	if md, ok := metadata.FromIncomingContext(ctx); ok {
   151  		// For testing purpose, returns an error if user-agent is failAppUA.
   152  		// To test that client gets the correct error.
   153  		if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) {
   154  			return nil, detailedError
   155  		}
   156  		var str []string
   157  		for _, entry := range md["user-agent"] {
   158  			str = append(str, "ua", entry)
   159  		}
   160  		grpc.SendHeader(ctx, metadata.Pairs(str...))
   161  	}
   162  	return new(testpb.Empty), nil
   163  }
   164  
   165  func newPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) {
   166  	if size < 0 {
   167  		return nil, fmt.Errorf("requested a response with invalid length %d", size)
   168  	}
   169  	body := make([]byte, size)
   170  	switch t {
   171  	case testpb.PayloadType_COMPRESSABLE:
   172  	default:
   173  		return nil, fmt.Errorf("unsupported payload type: %d", t)
   174  	}
   175  	return &testpb.Payload{
   176  		Type: t,
   177  		Body: body,
   178  	}, nil
   179  }
   180  
   181  func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
   182  	md, ok := metadata.FromIncomingContext(ctx)
   183  	if ok {
   184  		if _, exists := md[":authority"]; !exists {
   185  			return nil, status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md)
   186  		}
   187  		if s.setAndSendHeader {
   188  			if err := grpc.SetHeader(ctx, md); err != nil {
   189  				return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err)
   190  			}
   191  			if err := grpc.SendHeader(ctx, testMetadata2); err != nil {
   192  				return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", testMetadata2, err)
   193  			}
   194  		} else if s.setHeaderOnly {
   195  			if err := grpc.SetHeader(ctx, md); err != nil {
   196  				return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err)
   197  			}
   198  			if err := grpc.SetHeader(ctx, testMetadata2); err != nil {
   199  				return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", testMetadata2, err)
   200  			}
   201  		} else {
   202  			if err := grpc.SendHeader(ctx, md); err != nil {
   203  				return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", md, err)
   204  			}
   205  		}
   206  		if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil {
   207  			return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err)
   208  		}
   209  		if s.multipleSetTrailer {
   210  			if err := grpc.SetTrailer(ctx, testTrailerMetadata2); err != nil {
   211  				return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata2, err)
   212  			}
   213  		}
   214  	}
   215  	pr, ok := peer.FromContext(ctx)
   216  	if !ok {
   217  		return nil, status.Error(codes.DataLoss, "failed to get peer from ctx")
   218  	}
   219  	if pr.Addr == net.Addr(nil) {
   220  		return nil, status.Error(codes.DataLoss, "failed to get peer address")
   221  	}
   222  	if s.security != "" {
   223  		// Check Auth info
   224  		var authType, serverName string
   225  		switch info := pr.AuthInfo.(type) {
   226  		case credentials.TLSInfo:
   227  			authType = info.AuthType()
   228  			serverName = info.State.ServerName
   229  		default:
   230  			return nil, status.Error(codes.Unauthenticated, "Unknown AuthInfo type")
   231  		}
   232  		if authType != s.security {
   233  			return nil, status.Errorf(codes.Unauthenticated, "Wrong auth type: got %q, want %q", authType, s.security)
   234  		}
   235  		if serverName != "x.test.example.com" {
   236  			return nil, status.Errorf(codes.Unauthenticated, "Unknown server name %q", serverName)
   237  		}
   238  	}
   239  	// Simulate some service delay.
   240  	time.Sleep(s.unaryCallSleepTime)
   241  
   242  	payload, err := newPayload(in.GetResponseType(), in.GetResponseSize())
   243  	if err != nil {
   244  		return nil, err
   245  	}
   246  
   247  	return &testpb.SimpleResponse{
   248  		Payload: payload,
   249  	}, nil
   250  }
   251  
   252  func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
   253  	if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
   254  		if _, exists := md[":authority"]; !exists {
   255  			return status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md)
   256  		}
   257  		// For testing purpose, returns an error if user-agent is failAppUA.
   258  		// To test that client gets the correct error.
   259  		if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) {
   260  			return status.Error(codes.DataLoss, "error for testing: "+failAppUA)
   261  		}
   262  	}
   263  	cs := args.GetResponseParameters()
   264  	for _, c := range cs {
   265  		if us := c.GetIntervalUs(); us > 0 {
   266  			time.Sleep(time.Duration(us) * time.Microsecond)
   267  		}
   268  
   269  		payload, err := newPayload(args.GetResponseType(), c.GetSize())
   270  		if err != nil {
   271  			return err
   272  		}
   273  
   274  		if err := stream.Send(&testpb.StreamingOutputCallResponse{
   275  			Payload: payload,
   276  		}); err != nil {
   277  			return err
   278  		}
   279  	}
   280  	return nil
   281  }
   282  
   283  func (s *testServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error {
   284  	var sum int
   285  	for {
   286  		in, err := stream.Recv()
   287  		if err == io.EOF {
   288  			return stream.SendAndClose(&testpb.StreamingInputCallResponse{
   289  				AggregatedPayloadSize: int32(sum),
   290  			})
   291  		}
   292  		if err != nil {
   293  			return err
   294  		}
   295  		p := in.GetPayload().GetBody()
   296  		sum += len(p)
   297  		if s.earlyFail {
   298  			return status.Error(codes.NotFound, "not found")
   299  		}
   300  	}
   301  }
   302  
   303  func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
   304  	md, ok := metadata.FromIncomingContext(stream.Context())
   305  	if ok {
   306  		if s.setAndSendHeader {
   307  			if err := stream.SetHeader(md); err != nil {
   308  				return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err)
   309  			}
   310  			if err := stream.SendHeader(testMetadata2); err != nil {
   311  				return status.Errorf(status.Code(err), "%v.SendHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err)
   312  			}
   313  		} else if s.setHeaderOnly {
   314  			if err := stream.SetHeader(md); err != nil {
   315  				return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err)
   316  			}
   317  			if err := stream.SetHeader(testMetadata2); err != nil {
   318  				return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err)
   319  			}
   320  		} else {
   321  			if err := stream.SendHeader(md); err != nil {
   322  				return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
   323  			}
   324  		}
   325  		stream.SetTrailer(testTrailerMetadata)
   326  		if s.multipleSetTrailer {
   327  			stream.SetTrailer(testTrailerMetadata2)
   328  		}
   329  	}
   330  	for {
   331  		in, err := stream.Recv()
   332  		if err == io.EOF {
   333  			// read done.
   334  			return nil
   335  		}
   336  		if err != nil {
   337  			// to facilitate testSvrWriteStatusEarlyWrite
   338  			if status.Code(err) == codes.ResourceExhausted {
   339  				return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error())
   340  			}
   341  			return err
   342  		}
   343  		cs := in.GetResponseParameters()
   344  		for _, c := range cs {
   345  			if us := c.GetIntervalUs(); us > 0 {
   346  				time.Sleep(time.Duration(us) * time.Microsecond)
   347  			}
   348  
   349  			payload, err := newPayload(in.GetResponseType(), c.GetSize())
   350  			if err != nil {
   351  				return err
   352  			}
   353  
   354  			if err := stream.Send(&testpb.StreamingOutputCallResponse{
   355  				Payload: payload,
   356  			}); err != nil {
   357  				// to facilitate testSvrWriteStatusEarlyWrite
   358  				if status.Code(err) == codes.ResourceExhausted {
   359  					return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error())
   360  				}
   361  				return err
   362  			}
   363  		}
   364  	}
   365  }
   366  
   367  func (s *testServer) HalfDuplexCall(stream testgrpc.TestService_HalfDuplexCallServer) error {
   368  	var msgBuf []*testpb.StreamingOutputCallRequest
   369  	for {
   370  		in, err := stream.Recv()
   371  		if err == io.EOF {
   372  			// read done.
   373  			break
   374  		}
   375  		if err != nil {
   376  			return err
   377  		}
   378  		msgBuf = append(msgBuf, in)
   379  	}
   380  	for _, m := range msgBuf {
   381  		cs := m.GetResponseParameters()
   382  		for _, c := range cs {
   383  			if us := c.GetIntervalUs(); us > 0 {
   384  				time.Sleep(time.Duration(us) * time.Microsecond)
   385  			}
   386  
   387  			payload, err := newPayload(m.GetResponseType(), c.GetSize())
   388  			if err != nil {
   389  				return err
   390  			}
   391  
   392  			if err := stream.Send(&testpb.StreamingOutputCallResponse{
   393  				Payload: payload,
   394  			}); err != nil {
   395  				return err
   396  			}
   397  		}
   398  	}
   399  	return nil
   400  }
   401  
   402  type env struct {
   403  	name         string
   404  	network      string // The type of network such as tcp, unix, etc.
   405  	security     string // The security protocol such as TLS, SSH, etc.
   406  	httpHandler  bool   // whether to use the http.Handler ServerTransport; requires TLS
   407  	balancer     string // One of "round_robin", "pick_first", or "".
   408  	customDialer func(string, string, time.Duration) (net.Conn, error)
   409  }
   410  
   411  func (e env) runnable() bool {
   412  	if runtime.GOOS == "windows" && e.network == "unix" {
   413  		return false
   414  	}
   415  	return true
   416  }
   417  
   418  func (e env) dialer(addr string, timeout time.Duration) (net.Conn, error) {
   419  	if e.customDialer != nil {
   420  		return e.customDialer(e.network, addr, timeout)
   421  	}
   422  	return net.DialTimeout(e.network, addr, timeout)
   423  }
   424  
   425  var (
   426  	tcpClearEnv   = env{name: "tcp-clear-v1-balancer", network: "tcp"}
   427  	tcpTLSEnv     = env{name: "tcp-tls-v1-balancer", network: "tcp", security: "tls"}
   428  	tcpClearRREnv = env{name: "tcp-clear", network: "tcp", balancer: "round_robin"}
   429  	tcpTLSRREnv   = env{name: "tcp-tls", network: "tcp", security: "tls", balancer: "round_robin"}
   430  	handlerEnv    = env{name: "handler-tls", network: "tcp", security: "tls", httpHandler: true, balancer: "round_robin"}
   431  	noBalancerEnv = env{name: "no-balancer", network: "tcp", security: "tls"}
   432  	allEnv        = []env{tcpClearEnv, tcpTLSEnv, tcpClearRREnv, tcpTLSRREnv, handlerEnv, noBalancerEnv}
   433  )
   434  
   435  var onlyEnv = flag.String("only_env", "", "If non-empty, one of 'tcp-clear', 'tcp-tls', 'unix-clear', 'unix-tls', or 'handler-tls' to only run the tests for that environment. Empty means all.")
   436  
   437  func listTestEnv() (envs []env) {
   438  	if *onlyEnv != "" {
   439  		for _, e := range allEnv {
   440  			if e.name == *onlyEnv {
   441  				if !e.runnable() {
   442  					panic(fmt.Sprintf("--only_env environment %q does not run on %s", *onlyEnv, runtime.GOOS))
   443  				}
   444  				return []env{e}
   445  			}
   446  		}
   447  		panic(fmt.Sprintf("invalid --only_env value %q", *onlyEnv))
   448  	}
   449  	for _, e := range allEnv {
   450  		if e.runnable() {
   451  			envs = append(envs, e)
   452  		}
   453  	}
   454  	return envs
   455  }
   456  
   457  // test is an end-to-end test. It should be created with the newTest
   458  // func, modified as needed, and then started with its startServer method.
   459  // It should be cleaned up with the tearDown method.
   460  type test struct {
   461  	// The following are setup in newTest().
   462  	t      *testing.T
   463  	e      env
   464  	ctx    context.Context // valid for life of test, before tearDown
   465  	cancel context.CancelFunc
   466  
   467  	// The following knobs are for the server-side, and should be set after
   468  	// calling newTest() and before calling startServer().
   469  
   470  	// whether or not to expose the server's health via the default health
   471  	// service implementation.
   472  	enableHealthServer bool
   473  	// In almost all cases, one should set the 'enableHealthServer' flag above to
   474  	// expose the server's health using the default health service
   475  	// implementation. This should only be used when a non-default health service
   476  	// implementation is required.
   477  	healthServer            healthgrpc.HealthServer
   478  	maxStream               uint32
   479  	tapHandle               tap.ServerInHandle
   480  	maxServerMsgSize        *int
   481  	maxServerReceiveMsgSize *int
   482  	maxServerSendMsgSize    *int
   483  	maxServerHeaderListSize *uint32
   484  	// Used to test the deprecated API WithCompressor and WithDecompressor.
   485  	serverCompression           bool
   486  	unknownHandler              grpc.StreamHandler
   487  	unaryServerInt              grpc.UnaryServerInterceptor
   488  	streamServerInt             grpc.StreamServerInterceptor
   489  	serverInitialWindowSize     int32
   490  	serverInitialConnWindowSize int32
   491  	customServerOptions         []grpc.ServerOption
   492  
   493  	// The following knobs are for the client-side, and should be set after
   494  	// calling newTest() and before calling clientConn().
   495  	maxClientMsgSize        *int
   496  	maxClientReceiveMsgSize *int
   497  	maxClientSendMsgSize    *int
   498  	maxClientHeaderListSize *uint32
   499  	userAgent               string
   500  	// Used to test the deprecated API WithCompressor and WithDecompressor.
   501  	clientCompression bool
   502  	// Used to test the new compressor registration API UseCompressor.
   503  	clientUseCompression bool
   504  	// clientNopCompression is set to create a compressor whose type is not supported.
   505  	clientNopCompression        bool
   506  	unaryClientInt              grpc.UnaryClientInterceptor
   507  	streamClientInt             grpc.StreamClientInterceptor
   508  	clientInitialWindowSize     int32
   509  	clientInitialConnWindowSize int32
   510  	perRPCCreds                 credentials.PerRPCCredentials
   511  	customDialOptions           []grpc.DialOption
   512  	resolverScheme              string
   513  
   514  	// These are are set once startServer is called. The common case is to have
   515  	// only one testServer.
   516  	srv     stopper
   517  	hSrv    healthgrpc.HealthServer
   518  	srvAddr string
   519  
   520  	// These are are set once startServers is called.
   521  	srvs     []stopper
   522  	hSrvs    []healthgrpc.HealthServer
   523  	srvAddrs []string
   524  
   525  	cc          *grpc.ClientConn // nil until requested via clientConn
   526  	restoreLogs func()           // nil unless declareLogNoise is used
   527  }
   528  
   529  type stopper interface {
   530  	Stop()
   531  	GracefulStop()
   532  }
   533  
   534  func (te *test) tearDown() {
   535  	if te.cancel != nil {
   536  		te.cancel()
   537  		te.cancel = nil
   538  	}
   539  
   540  	if te.cc != nil {
   541  		te.cc.Close()
   542  		te.cc = nil
   543  	}
   544  
   545  	if te.restoreLogs != nil {
   546  		te.restoreLogs()
   547  		te.restoreLogs = nil
   548  	}
   549  
   550  	if te.srv != nil {
   551  		te.srv.Stop()
   552  	}
   553  	for _, s := range te.srvs {
   554  		s.Stop()
   555  	}
   556  }
   557  
   558  // newTest returns a new test using the provided testing.T and
   559  // environment.  It is returned with default values. Tests should
   560  // modify it before calling its startServer and clientConn methods.
   561  func newTest(t *testing.T, e env) *test {
   562  	te := &test{
   563  		t:         t,
   564  		e:         e,
   565  		maxStream: math.MaxUint32,
   566  	}
   567  	te.ctx, te.cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
   568  	return te
   569  }
   570  
   571  func (te *test) listenAndServe(ts testgrpc.TestServiceServer, listen func(network, address string) (net.Listener, error)) net.Listener {
   572  	te.t.Helper()
   573  	te.t.Logf("Running test in %s environment...", te.e.name)
   574  	sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)}
   575  	if te.maxServerMsgSize != nil {
   576  		sopts = append(sopts, grpc.MaxMsgSize(*te.maxServerMsgSize))
   577  	}
   578  	if te.maxServerReceiveMsgSize != nil {
   579  		sopts = append(sopts, grpc.MaxRecvMsgSize(*te.maxServerReceiveMsgSize))
   580  	}
   581  	if te.maxServerSendMsgSize != nil {
   582  		sopts = append(sopts, grpc.MaxSendMsgSize(*te.maxServerSendMsgSize))
   583  	}
   584  	if te.maxServerHeaderListSize != nil {
   585  		sopts = append(sopts, grpc.MaxHeaderListSize(*te.maxServerHeaderListSize))
   586  	}
   587  	if te.tapHandle != nil {
   588  		sopts = append(sopts, grpc.InTapHandle(te.tapHandle))
   589  	}
   590  	if te.serverCompression {
   591  		sopts = append(sopts,
   592  			grpc.RPCCompressor(grpc.NewGZIPCompressor()),
   593  			grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
   594  		)
   595  	}
   596  	if te.unaryServerInt != nil {
   597  		sopts = append(sopts, grpc.UnaryInterceptor(te.unaryServerInt))
   598  	}
   599  	if te.streamServerInt != nil {
   600  		sopts = append(sopts, grpc.StreamInterceptor(te.streamServerInt))
   601  	}
   602  	if te.unknownHandler != nil {
   603  		sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler))
   604  	}
   605  	if te.serverInitialWindowSize > 0 {
   606  		sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize))
   607  	}
   608  	if te.serverInitialConnWindowSize > 0 {
   609  		sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize))
   610  	}
   611  	la := "localhost:0"
   612  	switch te.e.network {
   613  	case "unix":
   614  		la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano())
   615  		syscall.Unlink(la)
   616  	}
   617  	lis, err := listen(te.e.network, la)
   618  	if err != nil {
   619  		te.t.Fatalf("Failed to listen: %v", err)
   620  	}
   621  	if te.e.security == "tls" {
   622  		creds, err := credentials.NewServerTLSFromFile(testdata.Path("x509/server1_cert.pem"), testdata.Path("x509/server1_key.pem"))
   623  		if err != nil {
   624  			te.t.Fatalf("Failed to generate credentials %v", err)
   625  		}
   626  		sopts = append(sopts, grpc.Creds(creds))
   627  	}
   628  	sopts = append(sopts, te.customServerOptions...)
   629  	s := grpc.NewServer(sopts...)
   630  	if ts != nil {
   631  		testgrpc.RegisterTestServiceServer(s, ts)
   632  	}
   633  
   634  	// Create a new default health server if enableHealthServer is set, or use
   635  	// the provided one.
   636  	hs := te.healthServer
   637  	if te.enableHealthServer {
   638  		hs = health.NewServer()
   639  	}
   640  	if hs != nil {
   641  		healthgrpc.RegisterHealthServer(s, hs)
   642  	}
   643  
   644  	addr := la
   645  	switch te.e.network {
   646  	case "unix":
   647  	default:
   648  		_, port, err := net.SplitHostPort(lis.Addr().String())
   649  		if err != nil {
   650  			te.t.Fatalf("Failed to parse listener address: %v", err)
   651  		}
   652  		addr = "localhost:" + port
   653  	}
   654  
   655  	te.srv = s
   656  	te.hSrv = hs
   657  	te.srvAddr = addr
   658  
   659  	if te.e.httpHandler {
   660  		if te.e.security != "tls" {
   661  			te.t.Fatalf("unsupported environment settings")
   662  		}
   663  		cert, err := tls.LoadX509KeyPair(testdata.Path("x509/server1_cert.pem"), testdata.Path("x509/server1_key.pem"))
   664  		if err != nil {
   665  			te.t.Fatal("tls.LoadX509KeyPair(server1.pem, server1.key) failed: ", err)
   666  		}
   667  		hs := &http.Server{
   668  			Handler:   s,
   669  			TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}},
   670  		}
   671  		if err := http2.ConfigureServer(hs, &http2.Server{MaxConcurrentStreams: te.maxStream}); err != nil {
   672  			te.t.Fatal("http2.ConfigureServer(_, _) failed: ", err)
   673  		}
   674  		te.srv = wrapHS{hs}
   675  		tlsListener := tls.NewListener(lis, hs.TLSConfig)
   676  		go hs.Serve(tlsListener)
   677  		return lis
   678  	}
   679  
   680  	go s.Serve(lis)
   681  	return lis
   682  }
   683  
   684  type wrapHS struct {
   685  	s *http.Server
   686  }
   687  
   688  func (w wrapHS) GracefulStop() {
   689  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   690  	defer cancel()
   691  	w.s.Shutdown(ctx)
   692  }
   693  
   694  func (w wrapHS) Stop() {
   695  	w.s.Close()
   696  	w.s.Handler.(*grpc.Server).Stop()
   697  }
   698  
   699  func (te *test) startServerWithConnControl(ts testgrpc.TestServiceServer) *listenerWrapper {
   700  	l := te.listenAndServe(ts, listenWithConnControl)
   701  	return l.(*listenerWrapper)
   702  }
   703  
   704  // startServer starts a gRPC server exposing the provided TestService
   705  // implementation. Callers should defer a call to te.tearDown to clean up
   706  func (te *test) startServer(ts testgrpc.TestServiceServer) {
   707  	te.t.Helper()
   708  	te.listenAndServe(ts, net.Listen)
   709  }
   710  
   711  // startServers starts 'num' gRPC servers exposing the provided TestService.
   712  func (te *test) startServers(ts testgrpc.TestServiceServer, num int) {
   713  	for i := 0; i < num; i++ {
   714  		te.startServer(ts)
   715  		te.srvs = append(te.srvs, te.srv.(*grpc.Server))
   716  		te.hSrvs = append(te.hSrvs, te.hSrv)
   717  		te.srvAddrs = append(te.srvAddrs, te.srvAddr)
   718  		te.srv = nil
   719  		te.hSrv = nil
   720  		te.srvAddr = ""
   721  	}
   722  }
   723  
   724  // setHealthServingStatus is a helper function to set the health status.
   725  func (te *test) setHealthServingStatus(service string, status healthpb.HealthCheckResponse_ServingStatus) {
   726  	hs, ok := te.hSrv.(*health.Server)
   727  	if !ok {
   728  		panic(fmt.Sprintf("SetServingStatus(%v, %v) called for health server of type %T", service, status, hs))
   729  	}
   730  	hs.SetServingStatus(service, status)
   731  }
   732  
   733  type nopCompressor struct {
   734  	grpc.Compressor
   735  }
   736  
   737  // newNopCompressor creates a compressor to test the case that type is not supported.
   738  func newNopCompressor() grpc.Compressor {
   739  	return &nopCompressor{grpc.NewGZIPCompressor()}
   740  }
   741  
   742  func (c *nopCompressor) Type() string {
   743  	return "nop"
   744  }
   745  
   746  type nopDecompressor struct {
   747  	grpc.Decompressor
   748  }
   749  
   750  // newNopDecompressor creates a decompressor to test the case that type is not supported.
   751  func newNopDecompressor() grpc.Decompressor {
   752  	return &nopDecompressor{grpc.NewGZIPDecompressor()}
   753  }
   754  
   755  func (d *nopDecompressor) Type() string {
   756  	return "nop"
   757  }
   758  
   759  func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) {
   760  	opts = append(opts, grpc.WithDialer(te.e.dialer), grpc.WithUserAgent(te.userAgent))
   761  
   762  	if te.clientCompression {
   763  		opts = append(opts,
   764  			grpc.WithCompressor(grpc.NewGZIPCompressor()),
   765  			grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
   766  		)
   767  	}
   768  	if te.clientUseCompression {
   769  		opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
   770  	}
   771  	if te.clientNopCompression {
   772  		opts = append(opts,
   773  			grpc.WithCompressor(newNopCompressor()),
   774  			grpc.WithDecompressor(newNopDecompressor()),
   775  		)
   776  	}
   777  	if te.unaryClientInt != nil {
   778  		opts = append(opts, grpc.WithUnaryInterceptor(te.unaryClientInt))
   779  	}
   780  	if te.streamClientInt != nil {
   781  		opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt))
   782  	}
   783  	if te.maxClientMsgSize != nil {
   784  		opts = append(opts, grpc.WithMaxMsgSize(*te.maxClientMsgSize))
   785  	}
   786  	if te.maxClientReceiveMsgSize != nil {
   787  		opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*te.maxClientReceiveMsgSize)))
   788  	}
   789  	if te.maxClientSendMsgSize != nil {
   790  		opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(*te.maxClientSendMsgSize)))
   791  	}
   792  	if te.maxClientHeaderListSize != nil {
   793  		opts = append(opts, grpc.WithMaxHeaderListSize(*te.maxClientHeaderListSize))
   794  	}
   795  	switch te.e.security {
   796  	case "tls":
   797  		creds, err := credentials.NewClientTLSFromFile(testdata.Path("x509/server_ca_cert.pem"), "x.test.example.com")
   798  		if err != nil {
   799  			te.t.Fatalf("Failed to load credentials: %v", err)
   800  		}
   801  		opts = append(opts, grpc.WithTransportCredentials(creds))
   802  	case "empty":
   803  		// Don't add any transport creds option.
   804  	default:
   805  		opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
   806  	}
   807  	// TODO(bar) switch balancer case "pick_first".
   808  	var scheme string
   809  	if te.resolverScheme == "" {
   810  		scheme = "passthrough:///"
   811  	} else {
   812  		scheme = te.resolverScheme + ":///"
   813  	}
   814  	if te.e.balancer != "" {
   815  		opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, te.e.balancer)))
   816  	}
   817  	if te.clientInitialWindowSize > 0 {
   818  		opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))
   819  	}
   820  	if te.clientInitialConnWindowSize > 0 {
   821  		opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize))
   822  	}
   823  	if te.perRPCCreds != nil {
   824  		opts = append(opts, grpc.WithPerRPCCredentials(te.perRPCCreds))
   825  	}
   826  	if te.srvAddr == "" {
   827  		te.srvAddr = "client.side.only.test"
   828  	}
   829  	opts = append(opts, te.customDialOptions...)
   830  	return opts, scheme
   831  }
   832  
   833  func (te *test) clientConnWithConnControl() (*grpc.ClientConn, *dialerWrapper) {
   834  	if te.cc != nil {
   835  		return te.cc, nil
   836  	}
   837  	opts, scheme := te.configDial()
   838  	dw := &dialerWrapper{}
   839  	// overwrite the dialer before
   840  	opts = append(opts, grpc.WithDialer(dw.dialer))
   841  	var err error
   842  	te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...)
   843  	if err != nil {
   844  		te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err)
   845  	}
   846  	return te.cc, dw
   847  }
   848  
   849  func (te *test) clientConn(opts ...grpc.DialOption) *grpc.ClientConn {
   850  	if te.cc != nil {
   851  		return te.cc
   852  	}
   853  	var scheme string
   854  	opts, scheme = te.configDial(opts...)
   855  	var err error
   856  	te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...)
   857  	if err != nil {
   858  		te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err)
   859  	}
   860  	return te.cc
   861  }
   862  
   863  func (te *test) declareLogNoise(phrases ...string) {
   864  	te.restoreLogs = declareLogNoise(te.t, phrases...)
   865  }
   866  
   867  func (te *test) withServerTester(fn func(st *serverTester)) {
   868  	c, err := te.e.dialer(te.srvAddr, 10*time.Second)
   869  	if err != nil {
   870  		te.t.Fatal(err)
   871  	}
   872  	defer c.Close()
   873  	if te.e.security == "tls" {
   874  		c = tls.Client(c, &tls.Config{
   875  			InsecureSkipVerify: true,
   876  			NextProtos:         []string{http2.NextProtoTLS},
   877  		})
   878  	}
   879  	st := newServerTesterFromConn(te.t, c)
   880  	st.greet()
   881  	fn(st)
   882  }
   883  
   884  type lazyConn struct {
   885  	net.Conn
   886  	beLazy int32
   887  }
   888  
   889  // possible conn closed errors.
   890  const possibleConnResetMsg = "connection reset by peer"
   891  const possibleEOFMsg = "error reading from server: EOF"
   892  
   893  // isConnClosedErr checks the error msg for possible conn closed messages. There
   894  // is a raceyness in the timing of when TCP packets are sent from client to
   895  // server, and when we tell the server to stop, so we need to check for both of
   896  // these possible error messages:
   897  //  1. If the call to ss.S.Stop() causes the server's sockets to close while
   898  //     there's still in-fight data from the client on the TCP connection, then
   899  //     the kernel can send an RST back to the client (also see
   900  //     https://stackoverflow.com/questions/33053507/econnreset-in-send-linux-c).
   901  //     Note that while this condition is expected to be rare due to the
   902  //     test httpServer start synchronization, in theory it should be possible,
   903  //     e.g. if the client sends a BDP ping at the right time.
   904  //  2. If, for example, the call to ss.S.Stop() happens after the RPC headers
   905  //     have been received at the server, then the TCP connection can shutdown
   906  //     gracefully when the server's socket closes.
   907  //  3. If there is an actual io.EOF received because the client stopped the stream.
   908  func isConnClosedErr(err error) bool {
   909  	errContainsConnResetMsg := strings.Contains(err.Error(), possibleConnResetMsg)
   910  	errContainsEOFMsg := strings.Contains(err.Error(), possibleEOFMsg)
   911  
   912  	return errContainsConnResetMsg || errContainsEOFMsg || err == io.EOF
   913  }
   914  
   915  func (l *lazyConn) Write(b []byte) (int, error) {
   916  	if atomic.LoadInt32(&(l.beLazy)) == 1 {
   917  		time.Sleep(time.Second)
   918  	}
   919  	return l.Conn.Write(b)
   920  }
   921  
   922  func (s) TestContextDeadlineNotIgnored(t *testing.T) {
   923  	e := noBalancerEnv
   924  	var lc *lazyConn
   925  	e.customDialer = func(network, addr string, timeout time.Duration) (net.Conn, error) {
   926  		conn, err := net.DialTimeout(network, addr, timeout)
   927  		if err != nil {
   928  			return nil, err
   929  		}
   930  		lc = &lazyConn{Conn: conn}
   931  		return lc, nil
   932  	}
   933  
   934  	te := newTest(t, e)
   935  	te.startServer(&testServer{security: e.security})
   936  	defer te.tearDown()
   937  
   938  	cc := te.clientConn()
   939  	tc := testgrpc.NewTestServiceClient(cc)
   940  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   941  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   942  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
   943  	}
   944  	cancel()
   945  	atomic.StoreInt32(&(lc.beLazy), 1)
   946  	ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
   947  	defer cancel()
   948  	t1 := time.Now()
   949  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
   950  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, context.DeadlineExceeded", err)
   951  	}
   952  	if time.Since(t1) > 2*time.Second {
   953  		t.Fatalf("TestService/EmptyCall(_, _) ran over the deadline")
   954  	}
   955  }
   956  
   957  func (s) TestTimeoutOnDeadServer(t *testing.T) {
   958  	for _, e := range listTestEnv() {
   959  		testTimeoutOnDeadServer(t, e)
   960  	}
   961  }
   962  
   963  func testTimeoutOnDeadServer(t *testing.T, e env) {
   964  	te := newTest(t, e)
   965  	te.userAgent = testAppUA
   966  	te.startServer(&testServer{security: e.security})
   967  	defer te.tearDown()
   968  
   969  	cc := te.clientConn()
   970  	tc := testgrpc.NewTestServiceClient(cc)
   971  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   972  	defer cancel()
   973  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   974  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
   975  	}
   976  	// Wait for the client to report READY, stop the server, then wait for the
   977  	// client to notice the connection is gone.
   978  	testutils.AwaitState(ctx, t, cc, connectivity.Ready)
   979  	te.srv.Stop()
   980  	testutils.AwaitNotState(ctx, t, cc, connectivity.Ready)
   981  	ctx, cancel = context.WithTimeout(ctx, defaultTestShortTimeout)
   982  	defer cancel()
   983  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
   984  		t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %s", ctx, err, codes.DeadlineExceeded)
   985  	}
   986  	awaitNewConnLogOutput()
   987  }
   988  
   989  func (s) TestServerGracefulStopIdempotent(t *testing.T) {
   990  	for _, e := range listTestEnv() {
   991  		if e.name == "handler-tls" {
   992  			continue
   993  		}
   994  		testServerGracefulStopIdempotent(t, e)
   995  	}
   996  }
   997  
   998  func testServerGracefulStopIdempotent(t *testing.T, e env) {
   999  	te := newTest(t, e)
  1000  	te.userAgent = testAppUA
  1001  	te.startServer(&testServer{security: e.security})
  1002  	defer te.tearDown()
  1003  
  1004  	for i := 0; i < 3; i++ {
  1005  		te.srv.GracefulStop()
  1006  	}
  1007  }
  1008  
  1009  func (s) TestDetailedConnectionCloseErrorPropagatesToRPCError(t *testing.T) {
  1010  	rpcStartedOnServer := make(chan struct{})
  1011  	rpcDoneOnClient := make(chan struct{})
  1012  	defer close(rpcDoneOnClient)
  1013  	ss := &stubserver.StubServer{
  1014  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
  1015  			close(rpcStartedOnServer)
  1016  			<-rpcDoneOnClient
  1017  			return status.Error(codes.Internal, "arbitrary status")
  1018  		},
  1019  	}
  1020  	if err := ss.Start(nil); err != nil {
  1021  		t.Fatalf("Error starting endpoint server: %v", err)
  1022  	}
  1023  	defer ss.Stop()
  1024  
  1025  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1026  	defer cancel()
  1027  
  1028  	// Start an RPC. Then, while the RPC is still being accepted or handled at
  1029  	// the server, abruptly stop the server, killing the connection. The RPC
  1030  	// error message should include details about the specific connection error
  1031  	// that was encountered.
  1032  	stream, err := ss.Client.FullDuplexCall(ctx)
  1033  	if err != nil {
  1034  		t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", ss.Client, err)
  1035  	}
  1036  	// Block until the RPC has been started on the server. This ensures that the
  1037  	// ClientConn will find a healthy connection for the RPC to go out on
  1038  	// initially, and that the TCP connection will shut down strictly after the
  1039  	// RPC has been started on it.
  1040  	<-rpcStartedOnServer
  1041  	ss.S.Stop()
  1042  	// The precise behavior of this test is subject to raceyness around the
  1043  	// timing of when TCP packets are sent from client to server, and when we
  1044  	// tell the server to stop, so we need to account for both possible error
  1045  	// messages.
  1046  	if _, err := stream.Recv(); err == io.EOF || !isConnClosedErr(err) {
  1047  		t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: %q OR %q", stream, err, possibleConnResetMsg, possibleEOFMsg)
  1048  	}
  1049  }
  1050  
  1051  func (s) TestFailFast(t *testing.T) {
  1052  	for _, e := range listTestEnv() {
  1053  		testFailFast(t, e)
  1054  	}
  1055  }
  1056  
  1057  func testFailFast(t *testing.T, e env) {
  1058  	te := newTest(t, e)
  1059  	te.userAgent = testAppUA
  1060  	te.startServer(&testServer{security: e.security})
  1061  	defer te.tearDown()
  1062  
  1063  	cc := te.clientConn()
  1064  	tc := testgrpc.NewTestServiceClient(cc)
  1065  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1066  	defer cancel()
  1067  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  1068  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  1069  	}
  1070  	// Stop the server and tear down all the existing connections.
  1071  	te.srv.Stop()
  1072  	// Loop until the server teardown is propagated to the client.
  1073  	for {
  1074  		if err := ctx.Err(); err != nil {
  1075  			t.Fatalf("EmptyCall did not return UNAVAILABLE before timeout")
  1076  		}
  1077  		_, err := tc.EmptyCall(ctx, &testpb.Empty{})
  1078  		if status.Code(err) == codes.Unavailable {
  1079  			break
  1080  		}
  1081  		t.Logf("%v.EmptyCall(_, _) = _, %v", tc, err)
  1082  		time.Sleep(10 * time.Millisecond)
  1083  	}
  1084  	// The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable.
  1085  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
  1086  		t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %s", err, codes.Unavailable)
  1087  	}
  1088  	if _, err := tc.StreamingInputCall(ctx); status.Code(err) != codes.Unavailable {
  1089  		t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %s", err, codes.Unavailable)
  1090  	}
  1091  
  1092  	awaitNewConnLogOutput()
  1093  }
  1094  
  1095  func testServiceConfigSetup(t *testing.T, e env) *test {
  1096  	te := newTest(t, e)
  1097  	te.userAgent = testAppUA
  1098  	te.declareLogNoise(
  1099  		"Failed to dial : context canceled; please retry.",
  1100  	)
  1101  	return te
  1102  }
  1103  
  1104  func newInt(b int) (a *int) {
  1105  	return &b
  1106  }
  1107  
  1108  func (s) TestGetMethodConfig(t *testing.T) {
  1109  	te := testServiceConfigSetup(t, tcpClearRREnv)
  1110  	defer te.tearDown()
  1111  	r := manual.NewBuilderWithScheme("whatever")
  1112  
  1113  	te.resolverScheme = r.Scheme()
  1114  	cc := te.clientConn(grpc.WithResolvers(r))
  1115  	addrs := []resolver.Address{{Addr: te.srvAddr}}
  1116  	r.UpdateState(resolver.State{
  1117  		Addresses: addrs,
  1118  		ServiceConfig: parseServiceConfig(t, r, `{
  1119      "methodConfig": [
  1120          {
  1121              "name": [
  1122                  {
  1123                      "service": "grpc.testing.TestService",
  1124                      "method": "EmptyCall"
  1125                  }
  1126              ],
  1127              "waitForReady": true,
  1128              "timeout": ".001s"
  1129          },
  1130          {
  1131              "name": [
  1132                  {
  1133                      "service": "grpc.testing.TestService"
  1134                  }
  1135              ],
  1136              "waitForReady": false
  1137          }
  1138      ]
  1139  }`)})
  1140  
  1141  	tc := testgrpc.NewTestServiceClient(cc)
  1142  
  1143  	// Make sure service config has been processed by grpc.
  1144  	for {
  1145  		if cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil {
  1146  			break
  1147  		}
  1148  		time.Sleep(time.Millisecond)
  1149  	}
  1150  
  1151  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1152  	defer cancel()
  1153  	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
  1154  	var err error
  1155  	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
  1156  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
  1157  	}
  1158  
  1159  	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parseServiceConfig(t, r, `{
  1160      "methodConfig": [
  1161          {
  1162              "name": [
  1163                  {
  1164                      "service": "grpc.testing.TestService",
  1165                      "method": "UnaryCall"
  1166                  }
  1167              ],
  1168              "waitForReady": true,
  1169              "timeout": ".001s"
  1170          },
  1171          {
  1172              "name": [
  1173                  {
  1174                      "service": "grpc.testing.TestService"
  1175                  }
  1176              ],
  1177              "waitForReady": false
  1178          }
  1179      ]
  1180  }`)})
  1181  
  1182  	// Make sure service config has been processed by grpc.
  1183  	for {
  1184  		if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && !*mc.WaitForReady {
  1185  			break
  1186  		}
  1187  		time.Sleep(time.Millisecond)
  1188  	}
  1189  	// The following RPCs are expected to become fail-fast.
  1190  	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
  1191  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
  1192  	}
  1193  }
  1194  
  1195  func (s) TestServiceConfigWaitForReady(t *testing.T) {
  1196  	te := testServiceConfigSetup(t, tcpClearRREnv)
  1197  	defer te.tearDown()
  1198  	r := manual.NewBuilderWithScheme("whatever")
  1199  
  1200  	// Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds.
  1201  	te.resolverScheme = r.Scheme()
  1202  	cc := te.clientConn(grpc.WithResolvers(r))
  1203  	addrs := []resolver.Address{{Addr: te.srvAddr}}
  1204  	r.UpdateState(resolver.State{
  1205  		Addresses: addrs,
  1206  		ServiceConfig: parseServiceConfig(t, r, `{
  1207      "methodConfig": [
  1208          {
  1209              "name": [
  1210                  {
  1211                      "service": "grpc.testing.TestService",
  1212                      "method": "EmptyCall"
  1213                  },
  1214                  {
  1215                      "service": "grpc.testing.TestService",
  1216                      "method": "FullDuplexCall"
  1217                  }
  1218              ],
  1219              "waitForReady": false,
  1220              "timeout": ".001s"
  1221          }
  1222      ]
  1223  }`)})
  1224  
  1225  	tc := testgrpc.NewTestServiceClient(cc)
  1226  
  1227  	// Make sure service config has been processed by grpc.
  1228  	for {
  1229  		if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil {
  1230  			break
  1231  		}
  1232  		time.Sleep(time.Millisecond)
  1233  	}
  1234  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1235  	defer cancel()
  1236  	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
  1237  	var err error
  1238  	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
  1239  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
  1240  	}
  1241  	if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
  1242  		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
  1243  	}
  1244  
  1245  	// Generate a service config update.
  1246  	// Case2:Client API set failfast to be false, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds.
  1247  	r.UpdateState(resolver.State{
  1248  		Addresses: addrs,
  1249  		ServiceConfig: parseServiceConfig(t, r, `{
  1250      "methodConfig": [
  1251          {
  1252              "name": [
  1253                  {
  1254                      "service": "grpc.testing.TestService",
  1255                      "method": "EmptyCall"
  1256                  },
  1257                  {
  1258                      "service": "grpc.testing.TestService",
  1259                      "method": "FullDuplexCall"
  1260                  }
  1261              ],
  1262              "waitForReady": true,
  1263              "timeout": ".001s"
  1264          }
  1265      ]
  1266  }`)})
  1267  
  1268  	// Wait for the new service config to take effect.
  1269  	for {
  1270  		if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && *mc.WaitForReady {
  1271  			break
  1272  		}
  1273  		time.Sleep(time.Millisecond)
  1274  	}
  1275  	// The following RPCs are expected to become non-fail-fast ones with 1ms deadline.
  1276  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
  1277  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
  1278  	}
  1279  	if _, err := tc.FullDuplexCall(ctx); status.Code(err) != codes.DeadlineExceeded {
  1280  		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
  1281  	}
  1282  }
  1283  
  1284  func (s) TestServiceConfigTimeout(t *testing.T) {
  1285  	te := testServiceConfigSetup(t, tcpClearRREnv)
  1286  	defer te.tearDown()
  1287  	r := manual.NewBuilderWithScheme("whatever")
  1288  
  1289  	// Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
  1290  	te.resolverScheme = r.Scheme()
  1291  	cc := te.clientConn(grpc.WithResolvers(r))
  1292  	addrs := []resolver.Address{{Addr: te.srvAddr}}
  1293  	r.UpdateState(resolver.State{
  1294  		Addresses: addrs,
  1295  		ServiceConfig: parseServiceConfig(t, r, `{
  1296      "methodConfig": [
  1297          {
  1298              "name": [
  1299                  {
  1300                      "service": "grpc.testing.TestService",
  1301                      "method": "EmptyCall"
  1302                  },
  1303                  {
  1304                      "service": "grpc.testing.TestService",
  1305                      "method": "FullDuplexCall"
  1306                  }
  1307              ],
  1308              "waitForReady": true,
  1309              "timeout": "3600s"
  1310          }
  1311      ]
  1312  }`)})
  1313  
  1314  	tc := testgrpc.NewTestServiceClient(cc)
  1315  
  1316  	// Make sure service config has been processed by grpc.
  1317  	for {
  1318  		if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
  1319  			break
  1320  		}
  1321  		time.Sleep(time.Millisecond)
  1322  	}
  1323  
  1324  	// The following RPCs are expected to become non-fail-fast ones with 1ns deadline.
  1325  	var err error
  1326  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
  1327  	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
  1328  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
  1329  	}
  1330  	cancel()
  1331  
  1332  	ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
  1333  	if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
  1334  		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
  1335  	}
  1336  	cancel()
  1337  
  1338  	// Generate a service config update.
  1339  	// Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds.
  1340  	r.UpdateState(resolver.State{
  1341  		Addresses: addrs,
  1342  		ServiceConfig: parseServiceConfig(t, r, `{
  1343      "methodConfig": [
  1344          {
  1345              "name": [
  1346                  {
  1347                      "service": "grpc.testing.TestService",
  1348                      "method": "EmptyCall"
  1349                  },
  1350                  {
  1351                      "service": "grpc.testing.TestService",
  1352                      "method": "FullDuplexCall"
  1353                  }
  1354              ],
  1355              "waitForReady": true,
  1356              "timeout": ".000000001s"
  1357          }
  1358      ]
  1359  }`)})
  1360  
  1361  	// Wait for the new service config to take effect.
  1362  	for {
  1363  		if mc := cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall"); mc.Timeout != nil && *mc.Timeout == time.Nanosecond {
  1364  			break
  1365  		}
  1366  		time.Sleep(time.Millisecond)
  1367  	}
  1368  
  1369  	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
  1370  	defer cancel()
  1371  	if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
  1372  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
  1373  	}
  1374  
  1375  	if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
  1376  		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
  1377  	}
  1378  }
  1379  
  1380  func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
  1381  	e := tcpClearRREnv
  1382  	r := manual.NewBuilderWithScheme("whatever")
  1383  
  1384  	// Setting up values and objects shared across all test cases.
  1385  	const smallSize = 1
  1386  	const largeSize = 1024
  1387  	const extraLargeSize = 2048
  1388  
  1389  	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
  1390  	if err != nil {
  1391  		t.Fatal(err)
  1392  	}
  1393  	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
  1394  	if err != nil {
  1395  		t.Fatal(err)
  1396  	}
  1397  	extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
  1398  	if err != nil {
  1399  		t.Fatal(err)
  1400  	}
  1401  
  1402  	// Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
  1403  	te1 := testServiceConfigSetup(t, e)
  1404  	defer te1.tearDown()
  1405  
  1406  	te1.resolverScheme = r.Scheme()
  1407  	te1.startServer(&testServer{security: e.security})
  1408  	cc1 := te1.clientConn(grpc.WithResolvers(r))
  1409  
  1410  	addrs := []resolver.Address{{Addr: te1.srvAddr}}
  1411  	sc := parseServiceConfig(t, r, `{
  1412      "methodConfig": [
  1413          {
  1414              "name": [
  1415                  {
  1416                      "service": "grpc.testing.TestService",
  1417                      "method": "UnaryCall"
  1418                  },
  1419                  {
  1420                      "service": "grpc.testing.TestService",
  1421                      "method": "FullDuplexCall"
  1422                  }
  1423              ],
  1424              "maxRequestMessageBytes": 2048,
  1425              "maxResponseMessageBytes": 2048
  1426          }
  1427      ]
  1428  }`)
  1429  	r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
  1430  	tc := testgrpc.NewTestServiceClient(cc1)
  1431  
  1432  	req := &testpb.SimpleRequest{
  1433  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  1434  		ResponseSize: int32(extraLargeSize),
  1435  		Payload:      smallPayload,
  1436  	}
  1437  
  1438  	for {
  1439  		if cc1.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
  1440  			break
  1441  		}
  1442  		time.Sleep(time.Millisecond)
  1443  	}
  1444  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1445  	defer cancel()
  1446  	// Test for unary RPC recv.
  1447  	if _, err = tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted {
  1448  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  1449  	}
  1450  
  1451  	// Test for unary RPC send.
  1452  	req.Payload = extraLargePayload
  1453  	req.ResponseSize = int32(smallSize)
  1454  	if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
  1455  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  1456  	}
  1457  
  1458  	// Test for streaming RPC recv.
  1459  	respParam := []*testpb.ResponseParameters{
  1460  		{
  1461  			Size: int32(extraLargeSize),
  1462  		},
  1463  	}
  1464  	sreq := &testpb.StreamingOutputCallRequest{
  1465  		ResponseType:       testpb.PayloadType_COMPRESSABLE,
  1466  		ResponseParameters: respParam,
  1467  		Payload:            smallPayload,
  1468  	}
  1469  	stream, err := tc.FullDuplexCall(te1.ctx)
  1470  	if err != nil {
  1471  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  1472  	}
  1473  	if err = stream.Send(sreq); err != nil {
  1474  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
  1475  	}
  1476  	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
  1477  		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
  1478  	}
  1479  
  1480  	// Test for streaming RPC send.
  1481  	respParam[0].Size = int32(smallSize)
  1482  	sreq.Payload = extraLargePayload
  1483  	stream, err = tc.FullDuplexCall(te1.ctx)
  1484  	if err != nil {
  1485  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  1486  	}
  1487  	if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
  1488  		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
  1489  	}
  1490  
  1491  	// Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
  1492  	te2 := testServiceConfigSetup(t, e)
  1493  	te2.resolverScheme = r.Scheme()
  1494  	te2.maxClientReceiveMsgSize = newInt(1024)
  1495  	te2.maxClientSendMsgSize = newInt(1024)
  1496  
  1497  	te2.startServer(&testServer{security: e.security})
  1498  	defer te2.tearDown()
  1499  	cc2 := te2.clientConn(grpc.WithResolvers(r))
  1500  	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te2.srvAddr}}, ServiceConfig: sc})
  1501  	tc = testgrpc.NewTestServiceClient(cc2)
  1502  
  1503  	for {
  1504  		if cc2.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
  1505  			break
  1506  		}
  1507  		time.Sleep(time.Millisecond)
  1508  	}
  1509  
  1510  	// Test for unary RPC recv.
  1511  	req.Payload = smallPayload
  1512  	req.ResponseSize = int32(largeSize)
  1513  
  1514  	if _, err = tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted {
  1515  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  1516  	}
  1517  
  1518  	// Test for unary RPC send.
  1519  	req.Payload = largePayload
  1520  	req.ResponseSize = int32(smallSize)
  1521  	if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
  1522  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  1523  	}
  1524  
  1525  	// Test for streaming RPC recv.
  1526  	stream, err = tc.FullDuplexCall(te2.ctx)
  1527  	respParam[0].Size = int32(largeSize)
  1528  	sreq.Payload = smallPayload
  1529  	if err != nil {
  1530  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  1531  	}
  1532  	if err = stream.Send(sreq); err != nil {
  1533  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
  1534  	}
  1535  	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
  1536  		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
  1537  	}
  1538  
  1539  	// Test for streaming RPC send.
  1540  	respParam[0].Size = int32(smallSize)
  1541  	sreq.Payload = largePayload
  1542  	stream, err = tc.FullDuplexCall(te2.ctx)
  1543  	if err != nil {
  1544  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  1545  	}
  1546  	if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
  1547  		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
  1548  	}
  1549  
  1550  	// Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
  1551  	te3 := testServiceConfigSetup(t, e)
  1552  	te3.resolverScheme = r.Scheme()
  1553  	te3.maxClientReceiveMsgSize = newInt(4096)
  1554  	te3.maxClientSendMsgSize = newInt(4096)
  1555  
  1556  	te3.startServer(&testServer{security: e.security})
  1557  	defer te3.tearDown()
  1558  
  1559  	cc3 := te3.clientConn(grpc.WithResolvers(r))
  1560  	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te3.srvAddr}}, ServiceConfig: sc})
  1561  	tc = testgrpc.NewTestServiceClient(cc3)
  1562  
  1563  	for {
  1564  		if cc3.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
  1565  			break
  1566  		}
  1567  		time.Sleep(time.Millisecond)
  1568  	}
  1569  
  1570  	// Test for unary RPC recv.
  1571  	req.Payload = smallPayload
  1572  	req.ResponseSize = int32(largeSize)
  1573  
  1574  	if _, err = tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); err != nil {
  1575  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
  1576  	}
  1577  
  1578  	req.ResponseSize = int32(extraLargeSize)
  1579  	if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
  1580  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  1581  	}
  1582  
  1583  	// Test for unary RPC send.
  1584  	req.Payload = largePayload
  1585  	req.ResponseSize = int32(smallSize)
  1586  	if _, err := tc.UnaryCall(ctx, req); err != nil {
  1587  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
  1588  	}
  1589  
  1590  	req.Payload = extraLargePayload
  1591  	if _, err = tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
  1592  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  1593  	}
  1594  
  1595  	// Test for streaming RPC recv.
  1596  	stream, err = tc.FullDuplexCall(te3.ctx)
  1597  	if err != nil {
  1598  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  1599  	}
  1600  	respParam[0].Size = int32(largeSize)
  1601  	sreq.Payload = smallPayload
  1602  
  1603  	if err = stream.Send(sreq); err != nil {
  1604  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
  1605  	}
  1606  	if _, err = stream.Recv(); err != nil {
  1607  		t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err)
  1608  	}
  1609  
  1610  	respParam[0].Size = int32(extraLargeSize)
  1611  
  1612  	if err = stream.Send(sreq); err != nil {
  1613  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
  1614  	}
  1615  	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
  1616  		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
  1617  	}
  1618  
  1619  	// Test for streaming RPC send.
  1620  	respParam[0].Size = int32(smallSize)
  1621  	sreq.Payload = largePayload
  1622  	stream, err = tc.FullDuplexCall(te3.ctx)
  1623  	if err != nil {
  1624  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  1625  	}
  1626  	if err := stream.Send(sreq); err != nil {
  1627  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
  1628  	}
  1629  	sreq.Payload = extraLargePayload
  1630  	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
  1631  		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
  1632  	}
  1633  }
  1634  
  1635  // Reading from a streaming RPC may fail with context canceled if timeout was
  1636  // set by service config (https://github.com/grpc/grpc-go/issues/1818). This
  1637  // test makes sure read from streaming RPC doesn't fail in this case.
  1638  func (s) TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) {
  1639  	te := testServiceConfigSetup(t, tcpClearRREnv)
  1640  	te.startServer(&testServer{security: tcpClearRREnv.security})
  1641  	defer te.tearDown()
  1642  	r := manual.NewBuilderWithScheme("whatever")
  1643  
  1644  	te.resolverScheme = r.Scheme()
  1645  	cc := te.clientConn(grpc.WithResolvers(r))
  1646  	tc := testgrpc.NewTestServiceClient(cc)
  1647  
  1648  	r.UpdateState(resolver.State{
  1649  		Addresses: []resolver.Address{{Addr: te.srvAddr}},
  1650  		ServiceConfig: parseServiceConfig(t, r, `{
  1651  	    "methodConfig": [
  1652  	        {
  1653  	            "name": [
  1654  	                {
  1655  	                    "service": "grpc.testing.TestService",
  1656  	                    "method": "FullDuplexCall"
  1657  	                }
  1658  	            ],
  1659  	            "waitForReady": true,
  1660  	            "timeout": "10s"
  1661  	        }
  1662  	    ]
  1663  	}`)})
  1664  	// Make sure service config has been processed by grpc.
  1665  	for {
  1666  		if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
  1667  			break
  1668  		}
  1669  		time.Sleep(time.Millisecond)
  1670  	}
  1671  
  1672  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1673  	defer cancel()
  1674  	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
  1675  	if err != nil {
  1676  		t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
  1677  	}
  1678  
  1679  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 0)
  1680  	if err != nil {
  1681  		t.Fatalf("failed to newPayload: %v", err)
  1682  	}
  1683  	req := &testpb.StreamingOutputCallRequest{
  1684  		ResponseType:       testpb.PayloadType_COMPRESSABLE,
  1685  		ResponseParameters: []*testpb.ResponseParameters{{Size: 0}},
  1686  		Payload:            payload,
  1687  	}
  1688  	if err := stream.Send(req); err != nil {
  1689  		t.Fatalf("stream.Send(%v) = %v, want <nil>", req, err)
  1690  	}
  1691  	stream.CloseSend()
  1692  	time.Sleep(time.Second)
  1693  	// Sleep 1 second before recv to make sure the final status is received
  1694  	// before the recv.
  1695  	if _, err := stream.Recv(); err != nil {
  1696  		t.Fatalf("stream.Recv = _, %v, want _, <nil>", err)
  1697  	}
  1698  	// Keep reading to drain the stream.
  1699  	for {
  1700  		if _, err := stream.Recv(); err != nil {
  1701  			break
  1702  		}
  1703  	}
  1704  }
  1705  
  1706  func (s) TestPreloaderClientSend(t *testing.T) {
  1707  	for _, e := range listTestEnv() {
  1708  		testPreloaderClientSend(t, e)
  1709  	}
  1710  }
  1711  
  1712  func testPreloaderClientSend(t *testing.T, e env) {
  1713  	te := newTest(t, e)
  1714  	te.userAgent = testAppUA
  1715  	te.declareLogNoise(
  1716  		"Failed to dial : context canceled; please retry.",
  1717  	)
  1718  	te.startServer(&testServer{security: e.security})
  1719  
  1720  	defer te.tearDown()
  1721  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  1722  
  1723  	// Test for streaming RPC recv.
  1724  	// Set context for send with proper RPC Information
  1725  	stream, err := tc.FullDuplexCall(te.ctx, grpc.UseCompressor("gzip"))
  1726  	if err != nil {
  1727  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  1728  	}
  1729  	var index int
  1730  	for index < len(reqSizes) {
  1731  		respParam := []*testpb.ResponseParameters{
  1732  			{
  1733  				Size: int32(respSizes[index]),
  1734  			},
  1735  		}
  1736  
  1737  		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
  1738  		if err != nil {
  1739  			t.Fatal(err)
  1740  		}
  1741  
  1742  		req := &testpb.StreamingOutputCallRequest{
  1743  			ResponseType:       testpb.PayloadType_COMPRESSABLE,
  1744  			ResponseParameters: respParam,
  1745  			Payload:            payload,
  1746  		}
  1747  		preparedMsg := &grpc.PreparedMsg{}
  1748  		err = preparedMsg.Encode(stream, req)
  1749  		if err != nil {
  1750  			t.Fatalf("PrepareMsg failed for size %d : %v", reqSizes[index], err)
  1751  		}
  1752  		if err := stream.SendMsg(preparedMsg); err != nil {
  1753  			t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
  1754  		}
  1755  		reply, err := stream.Recv()
  1756  		if err != nil {
  1757  			t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
  1758  		}
  1759  		pt := reply.GetPayload().GetType()
  1760  		if pt != testpb.PayloadType_COMPRESSABLE {
  1761  			t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
  1762  		}
  1763  		size := len(reply.GetPayload().GetBody())
  1764  		if size != int(respSizes[index]) {
  1765  			t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
  1766  		}
  1767  		index++
  1768  	}
  1769  	if err := stream.CloseSend(); err != nil {
  1770  		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
  1771  	}
  1772  	if _, err := stream.Recv(); err != io.EOF {
  1773  		t.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
  1774  	}
  1775  }
  1776  
  1777  func (s) TestPreloaderSenderSend(t *testing.T) {
  1778  	ss := &stubserver.StubServer{
  1779  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
  1780  			for i := 0; i < 10; i++ {
  1781  				preparedMsg := &grpc.PreparedMsg{}
  1782  				err := preparedMsg.Encode(stream, &testpb.StreamingOutputCallResponse{
  1783  					Payload: &testpb.Payload{
  1784  						Body: []byte{'0' + uint8(i)},
  1785  					},
  1786  				})
  1787  				if err != nil {
  1788  					return err
  1789  				}
  1790  				stream.SendMsg(preparedMsg)
  1791  			}
  1792  			return nil
  1793  		},
  1794  	}
  1795  	if err := ss.Start(nil); err != nil {
  1796  		t.Fatalf("Error starting endpoint server: %v", err)
  1797  	}
  1798  	defer ss.Stop()
  1799  
  1800  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1801  	defer cancel()
  1802  
  1803  	stream, err := ss.Client.FullDuplexCall(ctx)
  1804  	if err != nil {
  1805  		t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v; want _, nil", err)
  1806  	}
  1807  
  1808  	var ngot int
  1809  	var buf bytes.Buffer
  1810  	for {
  1811  		reply, err := stream.Recv()
  1812  		if err == io.EOF {
  1813  			break
  1814  		}
  1815  		if err != nil {
  1816  			t.Fatal(err)
  1817  		}
  1818  		ngot++
  1819  		if buf.Len() > 0 {
  1820  			buf.WriteByte(',')
  1821  		}
  1822  		buf.Write(reply.GetPayload().GetBody())
  1823  	}
  1824  	if want := 10; ngot != want {
  1825  		t.Errorf("Got %d replies, want %d", ngot, want)
  1826  	}
  1827  	if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want {
  1828  		t.Errorf("Got replies %q; want %q", got, want)
  1829  	}
  1830  }
  1831  
  1832  func (s) TestMaxMsgSizeClientDefault(t *testing.T) {
  1833  	for _, e := range listTestEnv() {
  1834  		testMaxMsgSizeClientDefault(t, e)
  1835  	}
  1836  }
  1837  
  1838  func testMaxMsgSizeClientDefault(t *testing.T, e env) {
  1839  	te := newTest(t, e)
  1840  	te.userAgent = testAppUA
  1841  	te.declareLogNoise(
  1842  		"Failed to dial : context canceled; please retry.",
  1843  	)
  1844  	te.startServer(&testServer{security: e.security})
  1845  
  1846  	defer te.tearDown()
  1847  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  1848  
  1849  	const smallSize = 1
  1850  	const largeSize = 4 * 1024 * 1024
  1851  	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
  1852  	if err != nil {
  1853  		t.Fatal(err)
  1854  	}
  1855  	req := &testpb.SimpleRequest{
  1856  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  1857  		ResponseSize: int32(largeSize),
  1858  		Payload:      smallPayload,
  1859  	}
  1860  
  1861  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1862  	defer cancel()
  1863  	// Test for unary RPC recv.
  1864  	if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
  1865  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  1866  	}
  1867  
  1868  	respParam := []*testpb.ResponseParameters{
  1869  		{
  1870  			Size: int32(largeSize),
  1871  		},
  1872  	}
  1873  	sreq := &testpb.StreamingOutputCallRequest{
  1874  		ResponseType:       testpb.PayloadType_COMPRESSABLE,
  1875  		ResponseParameters: respParam,
  1876  		Payload:            smallPayload,
  1877  	}
  1878  
  1879  	// Test for streaming RPC recv.
  1880  	stream, err := tc.FullDuplexCall(te.ctx)
  1881  	if err != nil {
  1882  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  1883  	}
  1884  	if err := stream.Send(sreq); err != nil {
  1885  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
  1886  	}
  1887  	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
  1888  		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
  1889  	}
  1890  }
  1891  
  1892  func (s) TestMaxMsgSizeClientAPI(t *testing.T) {
  1893  	for _, e := range listTestEnv() {
  1894  		testMaxMsgSizeClientAPI(t, e)
  1895  	}
  1896  }
  1897  
  1898  func testMaxMsgSizeClientAPI(t *testing.T, e env) {
  1899  	te := newTest(t, e)
  1900  	te.userAgent = testAppUA
  1901  	// To avoid error on server side.
  1902  	te.maxServerSendMsgSize = newInt(5 * 1024 * 1024)
  1903  	te.maxClientReceiveMsgSize = newInt(1024)
  1904  	te.maxClientSendMsgSize = newInt(1024)
  1905  	te.declareLogNoise(
  1906  		"Failed to dial : context canceled; please retry.",
  1907  	)
  1908  	te.startServer(&testServer{security: e.security})
  1909  
  1910  	defer te.tearDown()
  1911  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  1912  
  1913  	const smallSize = 1
  1914  	const largeSize = 1024
  1915  	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
  1916  	if err != nil {
  1917  		t.Fatal(err)
  1918  	}
  1919  
  1920  	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
  1921  	if err != nil {
  1922  		t.Fatal(err)
  1923  	}
  1924  	req := &testpb.SimpleRequest{
  1925  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  1926  		ResponseSize: int32(largeSize),
  1927  		Payload:      smallPayload,
  1928  	}
  1929  
  1930  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1931  	defer cancel()
  1932  	// Test for unary RPC recv.
  1933  	if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
  1934  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  1935  	}
  1936  
  1937  	// Test for unary RPC send.
  1938  	req.Payload = largePayload
  1939  	req.ResponseSize = int32(smallSize)
  1940  	if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
  1941  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  1942  	}
  1943  
  1944  	respParam := []*testpb.ResponseParameters{
  1945  		{
  1946  			Size: int32(largeSize),
  1947  		},
  1948  	}
  1949  	sreq := &testpb.StreamingOutputCallRequest{
  1950  		ResponseType:       testpb.PayloadType_COMPRESSABLE,
  1951  		ResponseParameters: respParam,
  1952  		Payload:            smallPayload,
  1953  	}
  1954  
  1955  	// Test for streaming RPC recv.
  1956  	stream, err := tc.FullDuplexCall(te.ctx)
  1957  	if err != nil {
  1958  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  1959  	}
  1960  	if err := stream.Send(sreq); err != nil {
  1961  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
  1962  	}
  1963  	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
  1964  		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
  1965  	}
  1966  
  1967  	// Test for streaming RPC send.
  1968  	respParam[0].Size = int32(smallSize)
  1969  	sreq.Payload = largePayload
  1970  	stream, err = tc.FullDuplexCall(te.ctx)
  1971  	if err != nil {
  1972  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  1973  	}
  1974  	if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
  1975  		t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
  1976  	}
  1977  }
  1978  
  1979  func (s) TestMaxMsgSizeServerAPI(t *testing.T) {
  1980  	for _, e := range listTestEnv() {
  1981  		testMaxMsgSizeServerAPI(t, e)
  1982  	}
  1983  }
  1984  
  1985  func testMaxMsgSizeServerAPI(t *testing.T, e env) {
  1986  	te := newTest(t, e)
  1987  	te.userAgent = testAppUA
  1988  	te.maxServerReceiveMsgSize = newInt(1024)
  1989  	te.maxServerSendMsgSize = newInt(1024)
  1990  	te.declareLogNoise(
  1991  		"Failed to dial : context canceled; please retry.",
  1992  	)
  1993  	te.startServer(&testServer{security: e.security})
  1994  
  1995  	defer te.tearDown()
  1996  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  1997  
  1998  	const smallSize = 1
  1999  	const largeSize = 1024
  2000  	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
  2001  	if err != nil {
  2002  		t.Fatal(err)
  2003  	}
  2004  
  2005  	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
  2006  	if err != nil {
  2007  		t.Fatal(err)
  2008  	}
  2009  	req := &testpb.SimpleRequest{
  2010  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  2011  		ResponseSize: int32(largeSize),
  2012  		Payload:      smallPayload,
  2013  	}
  2014  
  2015  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2016  	defer cancel()
  2017  	// Test for unary RPC send.
  2018  	if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
  2019  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  2020  	}
  2021  
  2022  	// Test for unary RPC recv.
  2023  	req.Payload = largePayload
  2024  	req.ResponseSize = int32(smallSize)
  2025  	if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
  2026  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  2027  	}
  2028  
  2029  	respParam := []*testpb.ResponseParameters{
  2030  		{
  2031  			Size: int32(largeSize),
  2032  		},
  2033  	}
  2034  	sreq := &testpb.StreamingOutputCallRequest{
  2035  		ResponseType:       testpb.PayloadType_COMPRESSABLE,
  2036  		ResponseParameters: respParam,
  2037  		Payload:            smallPayload,
  2038  	}
  2039  
  2040  	// Test for streaming RPC send.
  2041  	stream, err := tc.FullDuplexCall(te.ctx)
  2042  	if err != nil {
  2043  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  2044  	}
  2045  	if err := stream.Send(sreq); err != nil {
  2046  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
  2047  	}
  2048  	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
  2049  		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
  2050  	}
  2051  
  2052  	// Test for streaming RPC recv.
  2053  	respParam[0].Size = int32(smallSize)
  2054  	sreq.Payload = largePayload
  2055  	stream, err = tc.FullDuplexCall(te.ctx)
  2056  	if err != nil {
  2057  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  2058  	}
  2059  	if err := stream.Send(sreq); err != nil {
  2060  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
  2061  	}
  2062  	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
  2063  		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
  2064  	}
  2065  }
  2066  
  2067  func (s) TestTap(t *testing.T) {
  2068  	for _, e := range listTestEnv() {
  2069  		if e.name == "handler-tls" {
  2070  			continue
  2071  		}
  2072  		testTap(t, e)
  2073  	}
  2074  }
  2075  
  2076  type myTap struct {
  2077  	cnt int
  2078  }
  2079  
  2080  func (t *myTap) handle(ctx context.Context, info *tap.Info) (context.Context, error) {
  2081  	if info != nil {
  2082  		switch info.FullMethodName {
  2083  		case "/grpc.testing.TestService/EmptyCall":
  2084  			t.cnt++
  2085  
  2086  			if vals := info.Header.Get("return-error"); len(vals) > 0 && vals[0] == "true" {
  2087  				return nil, status.Errorf(codes.Unknown, "tap error")
  2088  			}
  2089  		case "/grpc.testing.TestService/UnaryCall":
  2090  			return nil, fmt.Errorf("tap error")
  2091  		case "/grpc.testing.TestService/FullDuplexCall":
  2092  			return nil, status.Errorf(codes.FailedPrecondition, "test custom error")
  2093  		}
  2094  	}
  2095  	return ctx, nil
  2096  }
  2097  
  2098  func testTap(t *testing.T, e env) {
  2099  	te := newTest(t, e)
  2100  	te.userAgent = testAppUA
  2101  	ttap := &myTap{}
  2102  	te.tapHandle = ttap.handle
  2103  	te.startServer(&testServer{security: e.security})
  2104  	defer te.tearDown()
  2105  
  2106  	cc := te.clientConn()
  2107  	tc := testgrpc.NewTestServiceClient(cc)
  2108  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2109  	defer cancel()
  2110  
  2111  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  2112  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  2113  	}
  2114  	if ttap.cnt != 1 {
  2115  		t.Fatalf("Get the count in ttap %d, want 1", ttap.cnt)
  2116  	}
  2117  
  2118  	if _, err := tc.EmptyCall(metadata.AppendToOutgoingContext(ctx, "return-error", "false"), &testpb.Empty{}); err != nil {
  2119  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  2120  	}
  2121  	if ttap.cnt != 2 {
  2122  		t.Fatalf("Get the count in ttap %d, want 2", ttap.cnt)
  2123  	}
  2124  
  2125  	if _, err := tc.EmptyCall(metadata.AppendToOutgoingContext(ctx, "return-error", "true"), &testpb.Empty{}); status.Code(err) != codes.Unknown {
  2126  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unknown)
  2127  	}
  2128  	if ttap.cnt != 3 {
  2129  		t.Fatalf("Get the count in ttap %d, want 3", ttap.cnt)
  2130  	}
  2131  
  2132  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 31)
  2133  	if err != nil {
  2134  		t.Fatal(err)
  2135  	}
  2136  
  2137  	req := &testpb.SimpleRequest{
  2138  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  2139  		ResponseSize: 45,
  2140  		Payload:      payload,
  2141  	}
  2142  	if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.PermissionDenied {
  2143  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.PermissionDenied)
  2144  	}
  2145  	str, err := tc.FullDuplexCall(ctx)
  2146  	if err != nil {
  2147  		t.Fatalf("Unexpected error creating stream: %v", err)
  2148  	}
  2149  	if _, err := str.Recv(); status.Code(err) != codes.FailedPrecondition {
  2150  		t.Fatalf("FullDuplexCall Recv() = _, %v, want _, %s", err, codes.FailedPrecondition)
  2151  	}
  2152  }
  2153  
  2154  func (s) TestEmptyUnaryWithUserAgent(t *testing.T) {
  2155  	for _, e := range listTestEnv() {
  2156  		testEmptyUnaryWithUserAgent(t, e)
  2157  	}
  2158  }
  2159  
  2160  func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
  2161  	te := newTest(t, e)
  2162  	te.userAgent = testAppUA
  2163  	te.startServer(&testServer{security: e.security})
  2164  	defer te.tearDown()
  2165  
  2166  	cc := te.clientConn()
  2167  	tc := testgrpc.NewTestServiceClient(cc)
  2168  	var header metadata.MD
  2169  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2170  	defer cancel()
  2171  	reply, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Header(&header))
  2172  	if err != nil || !proto.Equal(&testpb.Empty{}, reply) {
  2173  		t.Fatalf("TestService/EmptyCall(_, _) = %v, %v, want %v, <nil>", reply, err, &testpb.Empty{})
  2174  	}
  2175  	if v, ok := header["ua"]; !ok || !strings.HasPrefix(v[0], testAppUA) {
  2176  		t.Fatalf("header[\"ua\"] = %q, %t, want string with prefix %q, true", v, ok, testAppUA)
  2177  	}
  2178  
  2179  	te.srv.Stop()
  2180  }
  2181  
  2182  func (s) TestFailedEmptyUnary(t *testing.T) {
  2183  	for _, e := range listTestEnv() {
  2184  		if e.name == "handler-tls" {
  2185  			// This test covers status details, but
  2186  			// Grpc-Status-Details-Bin is not support in handler_server.
  2187  			continue
  2188  		}
  2189  		testFailedEmptyUnary(t, e)
  2190  	}
  2191  }
  2192  
  2193  func testFailedEmptyUnary(t *testing.T, e env) {
  2194  	te := newTest(t, e)
  2195  	te.userAgent = failAppUA
  2196  	te.startServer(&testServer{security: e.security})
  2197  	defer te.tearDown()
  2198  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  2199  
  2200  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2201  	defer cancel()
  2202  	ctx = metadata.NewOutgoingContext(ctx, testMetadata)
  2203  	wantErr := detailedError
  2204  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !testutils.StatusErrEqual(err, wantErr) {
  2205  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr)
  2206  	}
  2207  }
  2208  
  2209  func (s) TestLargeUnary(t *testing.T) {
  2210  	for _, e := range listTestEnv() {
  2211  		testLargeUnary(t, e)
  2212  	}
  2213  }
  2214  
  2215  func testLargeUnary(t *testing.T, e env) {
  2216  	te := newTest(t, e)
  2217  	te.startServer(&testServer{security: e.security})
  2218  	defer te.tearDown()
  2219  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  2220  
  2221  	const argSize = 271828
  2222  	const respSize = 314159
  2223  
  2224  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
  2225  	if err != nil {
  2226  		t.Fatal(err)
  2227  	}
  2228  
  2229  	req := &testpb.SimpleRequest{
  2230  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  2231  		ResponseSize: respSize,
  2232  		Payload:      payload,
  2233  	}
  2234  
  2235  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2236  	defer cancel()
  2237  	reply, err := tc.UnaryCall(ctx, req)
  2238  	if err != nil {
  2239  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
  2240  	}
  2241  	pt := reply.GetPayload().GetType()
  2242  	ps := len(reply.GetPayload().GetBody())
  2243  	if pt != testpb.PayloadType_COMPRESSABLE || ps != respSize {
  2244  		t.Fatalf("Got the reply with type %d len %d; want %d, %d", pt, ps, testpb.PayloadType_COMPRESSABLE, respSize)
  2245  	}
  2246  }
  2247  
  2248  // Test backward-compatibility API for setting msg size limit.
  2249  func (s) TestExceedMsgLimit(t *testing.T) {
  2250  	for _, e := range listTestEnv() {
  2251  		testExceedMsgLimit(t, e)
  2252  	}
  2253  }
  2254  
  2255  func testExceedMsgLimit(t *testing.T, e env) {
  2256  	te := newTest(t, e)
  2257  	maxMsgSize := 1024
  2258  	te.maxServerMsgSize, te.maxClientMsgSize = newInt(maxMsgSize), newInt(maxMsgSize)
  2259  	te.startServer(&testServer{security: e.security})
  2260  	defer te.tearDown()
  2261  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  2262  
  2263  	largeSize := int32(maxMsgSize + 1)
  2264  	const smallSize = 1
  2265  
  2266  	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
  2267  	if err != nil {
  2268  		t.Fatal(err)
  2269  	}
  2270  	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
  2271  	if err != nil {
  2272  		t.Fatal(err)
  2273  	}
  2274  
  2275  	// Make sure the server cannot receive a unary RPC of largeSize.
  2276  	req := &testpb.SimpleRequest{
  2277  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  2278  		ResponseSize: smallSize,
  2279  		Payload:      largePayload,
  2280  	}
  2281  
  2282  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2283  	defer cancel()
  2284  	if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
  2285  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  2286  	}
  2287  	// Make sure the client cannot receive a unary RPC of largeSize.
  2288  	req.ResponseSize = largeSize
  2289  	req.Payload = smallPayload
  2290  	if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
  2291  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  2292  	}
  2293  
  2294  	// Make sure the server cannot receive a streaming RPC of largeSize.
  2295  	stream, err := tc.FullDuplexCall(te.ctx)
  2296  	if err != nil {
  2297  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  2298  	}
  2299  	respParam := []*testpb.ResponseParameters{
  2300  		{
  2301  			Size: 1,
  2302  		},
  2303  	}
  2304  
  2305  	sreq := &testpb.StreamingOutputCallRequest{
  2306  		ResponseType:       testpb.PayloadType_COMPRESSABLE,
  2307  		ResponseParameters: respParam,
  2308  		Payload:            largePayload,
  2309  	}
  2310  	if err := stream.Send(sreq); err != nil {
  2311  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
  2312  	}
  2313  	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
  2314  		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
  2315  	}
  2316  
  2317  	// Test on client side for streaming RPC.
  2318  	stream, err = tc.FullDuplexCall(te.ctx)
  2319  	if err != nil {
  2320  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  2321  	}
  2322  	respParam[0].Size = largeSize
  2323  	sreq.Payload = smallPayload
  2324  	if err := stream.Send(sreq); err != nil {
  2325  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
  2326  	}
  2327  	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
  2328  		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
  2329  	}
  2330  }
  2331  
  2332  func (s) TestPeerClientSide(t *testing.T) {
  2333  	for _, e := range listTestEnv() {
  2334  		testPeerClientSide(t, e)
  2335  	}
  2336  }
  2337  
  2338  func testPeerClientSide(t *testing.T, e env) {
  2339  	te := newTest(t, e)
  2340  	te.userAgent = testAppUA
  2341  	te.startServer(&testServer{security: e.security})
  2342  	defer te.tearDown()
  2343  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  2344  	peer := new(peer.Peer)
  2345  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2346  	defer cancel()
  2347  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
  2348  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  2349  	}
  2350  	pa := peer.Addr.String()
  2351  	if e.network == "unix" {
  2352  		if pa != te.srvAddr {
  2353  			t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
  2354  		}
  2355  		return
  2356  	}
  2357  	_, pp, err := net.SplitHostPort(pa)
  2358  	if err != nil {
  2359  		t.Fatalf("Failed to parse address from peer.")
  2360  	}
  2361  	_, sp, err := net.SplitHostPort(te.srvAddr)
  2362  	if err != nil {
  2363  		t.Fatalf("Failed to parse address of test server.")
  2364  	}
  2365  	if pp != sp {
  2366  		t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
  2367  	}
  2368  }
  2369  
  2370  // TestPeerNegative tests that if call fails setting peer
  2371  // doesn't cause a segmentation fault.
  2372  // issue#1141 https://github.com/grpc/grpc-go/issues/1141
  2373  func (s) TestPeerNegative(t *testing.T) {
  2374  	for _, e := range listTestEnv() {
  2375  		testPeerNegative(t, e)
  2376  	}
  2377  }
  2378  
  2379  func testPeerNegative(t *testing.T, e env) {
  2380  	te := newTest(t, e)
  2381  	te.startServer(&testServer{security: e.security})
  2382  	defer te.tearDown()
  2383  
  2384  	cc := te.clientConn()
  2385  	tc := testgrpc.NewTestServiceClient(cc)
  2386  	peer := new(peer.Peer)
  2387  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2388  	cancel()
  2389  	tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer))
  2390  }
  2391  
  2392  func (s) TestPeerFailedRPC(t *testing.T) {
  2393  	for _, e := range listTestEnv() {
  2394  		testPeerFailedRPC(t, e)
  2395  	}
  2396  }
  2397  
  2398  func testPeerFailedRPC(t *testing.T, e env) {
  2399  	te := newTest(t, e)
  2400  	te.maxServerReceiveMsgSize = newInt(1 * 1024)
  2401  	te.startServer(&testServer{security: e.security})
  2402  
  2403  	defer te.tearDown()
  2404  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  2405  
  2406  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2407  	defer cancel()
  2408  	// first make a successful request to the server
  2409  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  2410  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
  2411  	}
  2412  
  2413  	// make a second request that will be rejected by the server
  2414  	const largeSize = 5 * 1024
  2415  	largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
  2416  	if err != nil {
  2417  		t.Fatal(err)
  2418  	}
  2419  	req := &testpb.SimpleRequest{
  2420  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  2421  		Payload:      largePayload,
  2422  	}
  2423  
  2424  	peer := new(peer.Peer)
  2425  	if _, err := tc.UnaryCall(ctx, req, grpc.Peer(peer)); err == nil || status.Code(err) != codes.ResourceExhausted {
  2426  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
  2427  	} else {
  2428  		pa := peer.Addr.String()
  2429  		if e.network == "unix" {
  2430  			if pa != te.srvAddr {
  2431  				t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
  2432  			}
  2433  			return
  2434  		}
  2435  		_, pp, err := net.SplitHostPort(pa)
  2436  		if err != nil {
  2437  			t.Fatalf("Failed to parse address from peer.")
  2438  		}
  2439  		_, sp, err := net.SplitHostPort(te.srvAddr)
  2440  		if err != nil {
  2441  			t.Fatalf("Failed to parse address of test server.")
  2442  		}
  2443  		if pp != sp {
  2444  			t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
  2445  		}
  2446  	}
  2447  }
  2448  
  2449  func (s) TestMetadataUnaryRPC(t *testing.T) {
  2450  	for _, e := range listTestEnv() {
  2451  		testMetadataUnaryRPC(t, e)
  2452  	}
  2453  }
  2454  
  2455  func testMetadataUnaryRPC(t *testing.T, e env) {
  2456  	te := newTest(t, e)
  2457  	te.startServer(&testServer{security: e.security})
  2458  	defer te.tearDown()
  2459  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  2460  
  2461  	const argSize = 2718
  2462  	const respSize = 314
  2463  
  2464  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
  2465  	if err != nil {
  2466  		t.Fatal(err)
  2467  	}
  2468  
  2469  	req := &testpb.SimpleRequest{
  2470  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  2471  		ResponseSize: respSize,
  2472  		Payload:      payload,
  2473  	}
  2474  	var header, trailer metadata.MD
  2475  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2476  	defer cancel()
  2477  	ctx = metadata.NewOutgoingContext(ctx, testMetadata)
  2478  	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.Trailer(&trailer)); err != nil {
  2479  		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
  2480  	}
  2481  	// Ignore optional response headers that Servers may set:
  2482  	if header != nil {
  2483  		delete(header, "trailer") // RFC 2616 says server SHOULD (but optional) declare trailers
  2484  		delete(header, "date")    // the Date header is also optional
  2485  		delete(header, "user-agent")
  2486  		delete(header, "content-type")
  2487  		delete(header, "grpc-accept-encoding")
  2488  	}
  2489  	if !reflect.DeepEqual(header, testMetadata) {
  2490  		t.Fatalf("Received header metadata %v, want %v", header, testMetadata)
  2491  	}
  2492  	if !reflect.DeepEqual(trailer, testTrailerMetadata) {
  2493  		t.Fatalf("Received trailer metadata %v, want %v", trailer, testTrailerMetadata)
  2494  	}
  2495  }
  2496  
  2497  func (s) TestMetadataOrderUnaryRPC(t *testing.T) {
  2498  	for _, e := range listTestEnv() {
  2499  		testMetadataOrderUnaryRPC(t, e)
  2500  	}
  2501  }
  2502  
  2503  func testMetadataOrderUnaryRPC(t *testing.T, e env) {
  2504  	te := newTest(t, e)
  2505  	te.startServer(&testServer{security: e.security})
  2506  	defer te.tearDown()
  2507  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  2508  
  2509  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2510  	defer cancel()
  2511  	ctx = metadata.NewOutgoingContext(ctx, testMetadata)
  2512  	ctx = metadata.AppendToOutgoingContext(ctx, "key1", "value2")
  2513  	ctx = metadata.AppendToOutgoingContext(ctx, "key1", "value3")
  2514  
  2515  	// using Join to built expected metadata instead of FromOutgoingContext
  2516  	newMetadata := metadata.Join(testMetadata, metadata.Pairs("key1", "value2", "key1", "value3"))
  2517  
  2518  	var header metadata.MD
  2519  	if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}, grpc.Header(&header)); err != nil {
  2520  		t.Fatal(err)
  2521  	}
  2522  
  2523  	// Ignore optional response headers that Servers may set:
  2524  	if header != nil {
  2525  		delete(header, "trailer") // RFC 2616 says server SHOULD (but optional) declare trailers
  2526  		delete(header, "date")    // the Date header is also optional
  2527  		delete(header, "user-agent")
  2528  		delete(header, "content-type")
  2529  		delete(header, "grpc-accept-encoding")
  2530  	}
  2531  
  2532  	if !reflect.DeepEqual(header, newMetadata) {
  2533  		t.Fatalf("Received header metadata %v, want %v", header, newMetadata)
  2534  	}
  2535  }
  2536  
  2537  func (s) TestMultipleSetTrailerUnaryRPC(t *testing.T) {
  2538  	for _, e := range listTestEnv() {
  2539  		testMultipleSetTrailerUnaryRPC(t, e)
  2540  	}
  2541  }
  2542  
  2543  func testMultipleSetTrailerUnaryRPC(t *testing.T, e env) {
  2544  	te := newTest(t, e)
  2545  	te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
  2546  	defer te.tearDown()
  2547  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  2548  
  2549  	const (
  2550  		argSize  = 1
  2551  		respSize = 1
  2552  	)
  2553  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
  2554  	if err != nil {
  2555  		t.Fatal(err)
  2556  	}
  2557  
  2558  	req := &testpb.SimpleRequest{
  2559  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  2560  		ResponseSize: respSize,
  2561  		Payload:      payload,
  2562  	}
  2563  	var trailer metadata.MD
  2564  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2565  	defer cancel()
  2566  	ctx = metadata.NewOutgoingContext(ctx, testMetadata)
  2567  	if _, err := tc.UnaryCall(ctx, req, grpc.Trailer(&trailer), grpc.WaitForReady(true)); err != nil {
  2568  		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
  2569  	}
  2570  	expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
  2571  	if !reflect.DeepEqual(trailer, expectedTrailer) {
  2572  		t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer)
  2573  	}
  2574  }
  2575  
  2576  func (s) TestMultipleSetTrailerStreamingRPC(t *testing.T) {
  2577  	for _, e := range listTestEnv() {
  2578  		testMultipleSetTrailerStreamingRPC(t, e)
  2579  	}
  2580  }
  2581  
  2582  func testMultipleSetTrailerStreamingRPC(t *testing.T, e env) {
  2583  	te := newTest(t, e)
  2584  	te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
  2585  	defer te.tearDown()
  2586  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  2587  
  2588  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2589  	defer cancel()
  2590  	ctx = metadata.NewOutgoingContext(ctx, testMetadata)
  2591  	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
  2592  	if err != nil {
  2593  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  2594  	}
  2595  	if err := stream.CloseSend(); err != nil {
  2596  		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
  2597  	}
  2598  	if _, err := stream.Recv(); err != io.EOF {
  2599  		t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
  2600  	}
  2601  
  2602  	trailer := stream.Trailer()
  2603  	expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
  2604  	if !reflect.DeepEqual(trailer, expectedTrailer) {
  2605  		t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer)
  2606  	}
  2607  }
  2608  
  2609  func (s) TestSetAndSendHeaderUnaryRPC(t *testing.T) {
  2610  	for _, e := range listTestEnv() {
  2611  		if e.name == "handler-tls" {
  2612  			continue
  2613  		}
  2614  		testSetAndSendHeaderUnaryRPC(t, e)
  2615  	}
  2616  }
  2617  
  2618  // To test header metadata is sent on SendHeader().
  2619  func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) {
  2620  	te := newTest(t, e)
  2621  	te.startServer(&testServer{security: e.security, setAndSendHeader: true})
  2622  	defer te.tearDown()
  2623  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  2624  
  2625  	const (
  2626  		argSize  = 1
  2627  		respSize = 1
  2628  	)
  2629  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
  2630  	if err != nil {
  2631  		t.Fatal(err)
  2632  	}
  2633  
  2634  	req := &testpb.SimpleRequest{
  2635  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  2636  		ResponseSize: respSize,
  2637  		Payload:      payload,
  2638  	}
  2639  	var header metadata.MD
  2640  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2641  	defer cancel()
  2642  	ctx = metadata.NewOutgoingContext(ctx, testMetadata)
  2643  	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil {
  2644  		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
  2645  	}
  2646  	delete(header, "user-agent")
  2647  	delete(header, "content-type")
  2648  	delete(header, "grpc-accept-encoding")
  2649  
  2650  	expectedHeader := metadata.Join(testMetadata, testMetadata2)
  2651  	if !reflect.DeepEqual(header, expectedHeader) {
  2652  		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
  2653  	}
  2654  }
  2655  
  2656  func (s) TestMultipleSetHeaderUnaryRPC(t *testing.T) {
  2657  	for _, e := range listTestEnv() {
  2658  		if e.name == "handler-tls" {
  2659  			continue
  2660  		}
  2661  		testMultipleSetHeaderUnaryRPC(t, e)
  2662  	}
  2663  }
  2664  
  2665  // To test header metadata is sent when sending response.
  2666  func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) {
  2667  	te := newTest(t, e)
  2668  	te.startServer(&testServer{security: e.security, setHeaderOnly: true})
  2669  	defer te.tearDown()
  2670  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  2671  
  2672  	const (
  2673  		argSize  = 1
  2674  		respSize = 1
  2675  	)
  2676  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
  2677  	if err != nil {
  2678  		t.Fatal(err)
  2679  	}
  2680  
  2681  	req := &testpb.SimpleRequest{
  2682  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  2683  		ResponseSize: respSize,
  2684  		Payload:      payload,
  2685  	}
  2686  
  2687  	var header metadata.MD
  2688  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2689  	defer cancel()
  2690  	ctx = metadata.NewOutgoingContext(ctx, testMetadata)
  2691  	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil {
  2692  		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
  2693  	}
  2694  	delete(header, "user-agent")
  2695  	delete(header, "content-type")
  2696  	delete(header, "grpc-accept-encoding")
  2697  	expectedHeader := metadata.Join(testMetadata, testMetadata2)
  2698  	if !reflect.DeepEqual(header, expectedHeader) {
  2699  		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
  2700  	}
  2701  }
  2702  
  2703  func (s) TestMultipleSetHeaderUnaryRPCError(t *testing.T) {
  2704  	for _, e := range listTestEnv() {
  2705  		if e.name == "handler-tls" {
  2706  			continue
  2707  		}
  2708  		testMultipleSetHeaderUnaryRPCError(t, e)
  2709  	}
  2710  }
  2711  
  2712  // To test header metadata is sent when sending status.
  2713  func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) {
  2714  	te := newTest(t, e)
  2715  	te.startServer(&testServer{security: e.security, setHeaderOnly: true})
  2716  	defer te.tearDown()
  2717  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  2718  
  2719  	const (
  2720  		argSize  = 1
  2721  		respSize = -1 // Invalid respSize to make RPC fail.
  2722  	)
  2723  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
  2724  	if err != nil {
  2725  		t.Fatal(err)
  2726  	}
  2727  
  2728  	req := &testpb.SimpleRequest{
  2729  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  2730  		ResponseSize: respSize,
  2731  		Payload:      payload,
  2732  	}
  2733  	var header metadata.MD
  2734  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2735  	defer cancel()
  2736  	ctx = metadata.NewOutgoingContext(ctx, testMetadata)
  2737  	if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err == nil {
  2738  		t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <non-nil>", ctx, err)
  2739  	}
  2740  	delete(header, "user-agent")
  2741  	delete(header, "content-type")
  2742  	delete(header, "grpc-accept-encoding")
  2743  	expectedHeader := metadata.Join(testMetadata, testMetadata2)
  2744  	if !reflect.DeepEqual(header, expectedHeader) {
  2745  		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
  2746  	}
  2747  }
  2748  
  2749  func (s) TestSetAndSendHeaderStreamingRPC(t *testing.T) {
  2750  	for _, e := range listTestEnv() {
  2751  		if e.name == "handler-tls" {
  2752  			continue
  2753  		}
  2754  		testSetAndSendHeaderStreamingRPC(t, e)
  2755  	}
  2756  }
  2757  
  2758  // To test header metadata is sent on SendHeader().
  2759  func testSetAndSendHeaderStreamingRPC(t *testing.T, e env) {
  2760  	te := newTest(t, e)
  2761  	te.startServer(&testServer{security: e.security, setAndSendHeader: true})
  2762  	defer te.tearDown()
  2763  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  2764  
  2765  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2766  	defer cancel()
  2767  	ctx = metadata.NewOutgoingContext(ctx, testMetadata)
  2768  	stream, err := tc.FullDuplexCall(ctx)
  2769  	if err != nil {
  2770  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  2771  	}
  2772  	if err := stream.CloseSend(); err != nil {
  2773  		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
  2774  	}
  2775  	if _, err := stream.Recv(); err != io.EOF {
  2776  		t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
  2777  	}
  2778  
  2779  	header, err := stream.Header()
  2780  	if err != nil {
  2781  		t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
  2782  	}
  2783  	delete(header, "user-agent")
  2784  	delete(header, "content-type")
  2785  	delete(header, "grpc-accept-encoding")
  2786  	expectedHeader := metadata.Join(testMetadata, testMetadata2)
  2787  	if !reflect.DeepEqual(header, expectedHeader) {
  2788  		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
  2789  	}
  2790  }
  2791  
  2792  func (s) TestMultipleSetHeaderStreamingRPC(t *testing.T) {
  2793  	for _, e := range listTestEnv() {
  2794  		if e.name == "handler-tls" {
  2795  			continue
  2796  		}
  2797  		testMultipleSetHeaderStreamingRPC(t, e)
  2798  	}
  2799  }
  2800  
  2801  // To test header metadata is sent when sending response.
  2802  func testMultipleSetHeaderStreamingRPC(t *testing.T, e env) {
  2803  	te := newTest(t, e)
  2804  	te.startServer(&testServer{security: e.security, setHeaderOnly: true})
  2805  	defer te.tearDown()
  2806  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  2807  
  2808  	const (
  2809  		argSize  = 1
  2810  		respSize = 1
  2811  	)
  2812  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2813  	defer cancel()
  2814  	ctx = metadata.NewOutgoingContext(ctx, testMetadata)
  2815  	stream, err := tc.FullDuplexCall(ctx)
  2816  	if err != nil {
  2817  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  2818  	}
  2819  
  2820  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
  2821  	if err != nil {
  2822  		t.Fatal(err)
  2823  	}
  2824  
  2825  	req := &testpb.StreamingOutputCallRequest{
  2826  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  2827  		ResponseParameters: []*testpb.ResponseParameters{
  2828  			{Size: respSize},
  2829  		},
  2830  		Payload: payload,
  2831  	}
  2832  	if err := stream.Send(req); err != nil {
  2833  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
  2834  	}
  2835  	if _, err := stream.Recv(); err != nil {
  2836  		t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
  2837  	}
  2838  	if err := stream.CloseSend(); err != nil {
  2839  		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
  2840  	}
  2841  	if _, err := stream.Recv(); err != io.EOF {
  2842  		t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
  2843  	}
  2844  
  2845  	header, err := stream.Header()
  2846  	if err != nil {
  2847  		t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
  2848  	}
  2849  	delete(header, "user-agent")
  2850  	delete(header, "content-type")
  2851  	delete(header, "grpc-accept-encoding")
  2852  	expectedHeader := metadata.Join(testMetadata, testMetadata2)
  2853  	if !reflect.DeepEqual(header, expectedHeader) {
  2854  		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
  2855  	}
  2856  
  2857  }
  2858  
  2859  func (s) TestMultipleSetHeaderStreamingRPCError(t *testing.T) {
  2860  	for _, e := range listTestEnv() {
  2861  		if e.name == "handler-tls" {
  2862  			continue
  2863  		}
  2864  		testMultipleSetHeaderStreamingRPCError(t, e)
  2865  	}
  2866  }
  2867  
  2868  // To test header metadata is sent when sending status.
  2869  func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) {
  2870  	te := newTest(t, e)
  2871  	te.startServer(&testServer{security: e.security, setHeaderOnly: true})
  2872  	defer te.tearDown()
  2873  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  2874  
  2875  	const (
  2876  		argSize  = 1
  2877  		respSize = -1
  2878  	)
  2879  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2880  	defer cancel()
  2881  	ctx = metadata.NewOutgoingContext(ctx, testMetadata)
  2882  	stream, err := tc.FullDuplexCall(ctx)
  2883  	if err != nil {
  2884  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  2885  	}
  2886  
  2887  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
  2888  	if err != nil {
  2889  		t.Fatal(err)
  2890  	}
  2891  
  2892  	req := &testpb.StreamingOutputCallRequest{
  2893  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  2894  		ResponseParameters: []*testpb.ResponseParameters{
  2895  			{Size: respSize},
  2896  		},
  2897  		Payload: payload,
  2898  	}
  2899  	if err := stream.Send(req); err != nil {
  2900  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
  2901  	}
  2902  	if _, err := stream.Recv(); err == nil {
  2903  		t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err)
  2904  	}
  2905  
  2906  	header, err := stream.Header()
  2907  	if err != nil {
  2908  		t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
  2909  	}
  2910  	delete(header, "user-agent")
  2911  	delete(header, "content-type")
  2912  	delete(header, "grpc-accept-encoding")
  2913  	expectedHeader := metadata.Join(testMetadata, testMetadata2)
  2914  	if !reflect.DeepEqual(header, expectedHeader) {
  2915  		t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
  2916  	}
  2917  	if err := stream.CloseSend(); err != nil {
  2918  		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
  2919  	}
  2920  }
  2921  
  2922  // TestMalformedHTTP2Metadata verfies the returned error when the client
  2923  // sends an illegal metadata.
  2924  func (s) TestMalformedHTTP2Metadata(t *testing.T) {
  2925  	for _, e := range listTestEnv() {
  2926  		if e.name == "handler-tls" {
  2927  			// Failed with "server stops accepting new RPCs".
  2928  			// Server stops accepting new RPCs when the client sends an illegal http2 header.
  2929  			continue
  2930  		}
  2931  		testMalformedHTTP2Metadata(t, e)
  2932  	}
  2933  }
  2934  
  2935  func testMalformedHTTP2Metadata(t *testing.T, e env) {
  2936  	te := newTest(t, e)
  2937  	te.startServer(&testServer{security: e.security})
  2938  	defer te.tearDown()
  2939  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  2940  
  2941  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 2718)
  2942  	if err != nil {
  2943  		t.Fatal(err)
  2944  	}
  2945  
  2946  	req := &testpb.SimpleRequest{
  2947  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  2948  		ResponseSize: 314,
  2949  		Payload:      payload,
  2950  	}
  2951  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  2952  	defer cancel()
  2953  	ctx = metadata.NewOutgoingContext(ctx, malformedHTTP2Metadata)
  2954  	if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Internal {
  2955  		t.Fatalf("TestService.UnaryCall(%v, _) = _, %v; want _, %s", ctx, err, codes.Internal)
  2956  	}
  2957  }
  2958  
  2959  // Tests that the client transparently retries correctly when receiving a
  2960  // RST_STREAM with code REFUSED_STREAM.
  2961  func (s) TestTransparentRetry(t *testing.T) {
  2962  	testCases := []struct {
  2963  		failFast bool
  2964  		errCode  codes.Code
  2965  	}{{
  2966  		// success attempt: 1, (stream ID 1)
  2967  	}, {
  2968  		// success attempt: 2, (stream IDs 3, 5)
  2969  	}, {
  2970  		// no success attempt (stream IDs 7, 9)
  2971  		errCode: codes.Unavailable,
  2972  	}, {
  2973  		// success attempt: 1 (stream ID 11),
  2974  		failFast: true,
  2975  	}, {
  2976  		// success attempt: 2 (stream IDs 13, 15),
  2977  		failFast: true,
  2978  	}, {
  2979  		// no success attempt (stream IDs 17, 19)
  2980  		failFast: true,
  2981  		errCode:  codes.Unavailable,
  2982  	}}
  2983  
  2984  	lis, err := net.Listen("tcp", "localhost:0")
  2985  	if err != nil {
  2986  		t.Fatalf("Failed to listen. Err: %v", err)
  2987  	}
  2988  	defer lis.Close()
  2989  	server := &httpServer{
  2990  		responses: []httpServerResponse{{
  2991  			trailers: [][]string{{
  2992  				":status", "200",
  2993  				"content-type", "application/grpc",
  2994  				"grpc-status", "0",
  2995  			}},
  2996  		}},
  2997  		refuseStream: func(i uint32) bool {
  2998  			switch i {
  2999  			case 1, 5, 11, 15: // these stream IDs succeed
  3000  				return false
  3001  			}
  3002  			return true // these are refused
  3003  		},
  3004  	}
  3005  	server.start(t, lis)
  3006  	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
  3007  	if err != nil {
  3008  		t.Fatalf("failed to dial due to err: %v", err)
  3009  	}
  3010  	defer cc.Close()
  3011  
  3012  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  3013  	defer cancel()
  3014  
  3015  	client := testgrpc.NewTestServiceClient(cc)
  3016  
  3017  	for i, tc := range testCases {
  3018  		stream, err := client.FullDuplexCall(ctx)
  3019  		if err != nil {
  3020  			t.Fatalf("error creating stream due to err: %v", err)
  3021  		}
  3022  		code := func(err error) codes.Code {
  3023  			if err == io.EOF {
  3024  				return codes.OK
  3025  			}
  3026  			return status.Code(err)
  3027  		}
  3028  		if _, err := stream.Recv(); code(err) != tc.errCode {
  3029  			t.Fatalf("%v: stream.Recv() = _, %v, want error code: %v", i, err, tc.errCode)
  3030  		}
  3031  
  3032  	}
  3033  }
  3034  
  3035  func (s) TestCancel(t *testing.T) {
  3036  	for _, e := range listTestEnv() {
  3037  		t.Run(e.name, func(t *testing.T) {
  3038  			testCancel(t, e)
  3039  		})
  3040  	}
  3041  }
  3042  
  3043  func testCancel(t *testing.T, e env) {
  3044  	te := newTest(t, e)
  3045  	te.declareLogNoise("grpc: the client connection is closing; please retry")
  3046  	te.startServer(&testServer{security: e.security, unaryCallSleepTime: time.Second})
  3047  	defer te.tearDown()
  3048  
  3049  	cc := te.clientConn()
  3050  	tc := testgrpc.NewTestServiceClient(cc)
  3051  
  3052  	const argSize = 2718
  3053  	const respSize = 314
  3054  
  3055  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
  3056  	if err != nil {
  3057  		t.Fatal(err)
  3058  	}
  3059  
  3060  	req := &testpb.SimpleRequest{
  3061  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  3062  		ResponseSize: respSize,
  3063  		Payload:      payload,
  3064  	}
  3065  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  3066  	time.AfterFunc(1*time.Millisecond, cancel)
  3067  	if r, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Canceled {
  3068  		t.Fatalf("TestService/UnaryCall(_, _) = %v, %v; want _, error code: %s", r, err, codes.Canceled)
  3069  	}
  3070  	awaitNewConnLogOutput()
  3071  }
  3072  
  3073  func (s) TestCancelNoIO(t *testing.T) {
  3074  	for _, e := range listTestEnv() {
  3075  		testCancelNoIO(t, e)
  3076  	}
  3077  }
  3078  
  3079  func testCancelNoIO(t *testing.T, e env) {
  3080  	te := newTest(t, e)
  3081  	te.declareLogNoise("http2Client.notifyError got notified that the client transport was broken")
  3082  	te.maxStream = 1 // Only allows 1 live stream per server transport.
  3083  	te.startServer(&testServer{security: e.security})
  3084  	defer te.tearDown()
  3085  
  3086  	cc := te.clientConn()
  3087  	tc := testgrpc.NewTestServiceClient(cc)
  3088  
  3089  	// Start one blocked RPC for which we'll never send streaming
  3090  	// input. This will consume the 1 maximum concurrent streams,
  3091  	// causing future RPCs to hang.
  3092  	ctx, cancelFirst := context.WithTimeout(context.Background(), defaultTestTimeout)
  3093  	_, err := tc.StreamingInputCall(ctx)
  3094  	if err != nil {
  3095  		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
  3096  	}
  3097  
  3098  	// Loop until the ClientConn receives the initial settings
  3099  	// frame from the server, notifying it about the maximum
  3100  	// concurrent streams. We know when it's received it because
  3101  	// an RPC will fail with codes.DeadlineExceeded instead of
  3102  	// succeeding.
  3103  	// TODO(bradfitz): add internal test hook for this (Issue 534)
  3104  	for {
  3105  		ctx, cancelSecond := context.WithTimeout(context.Background(), defaultTestShortTimeout)
  3106  		_, err := tc.StreamingInputCall(ctx)
  3107  		cancelSecond()
  3108  		if err == nil {
  3109  			continue
  3110  		}
  3111  		if status.Code(err) == codes.DeadlineExceeded {
  3112  			break
  3113  		}
  3114  		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
  3115  	}
  3116  	// If there are any RPCs in flight before the client receives
  3117  	// the max streams setting, let them be expired.
  3118  	// TODO(bradfitz): add internal test hook for this (Issue 534)
  3119  	time.Sleep(50 * time.Millisecond)
  3120  
  3121  	go func() {
  3122  		time.Sleep(50 * time.Millisecond)
  3123  		cancelFirst()
  3124  	}()
  3125  
  3126  	// This should be blocked until the 1st is canceled, then succeed.
  3127  	ctx, cancelThird := context.WithTimeout(context.Background(), defaultTestShortTimeout)
  3128  	if _, err := tc.StreamingInputCall(ctx); err != nil {
  3129  		t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
  3130  	}
  3131  	cancelThird()
  3132  }
  3133  
  3134  // The following tests the gRPC streaming RPC implementations.
  3135  // TODO(zhaoq): Have better coverage on error cases.
  3136  var (
  3137  	reqSizes  = []int{27182, 8, 1828, 45904}
  3138  	respSizes = []int{31415, 9, 2653, 58979}
  3139  )
  3140  
  3141  func (s) TestNoService(t *testing.T) {
  3142  	for _, e := range listTestEnv() {
  3143  		testNoService(t, e)
  3144  	}
  3145  }
  3146  
  3147  func testNoService(t *testing.T, e env) {
  3148  	te := newTest(t, e)
  3149  	te.startServer(nil)
  3150  	defer te.tearDown()
  3151  
  3152  	cc := te.clientConn()
  3153  	tc := testgrpc.NewTestServiceClient(cc)
  3154  
  3155  	stream, err := tc.FullDuplexCall(te.ctx, grpc.WaitForReady(true))
  3156  	if err != nil {
  3157  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  3158  	}
  3159  	if _, err := stream.Recv(); status.Code(err) != codes.Unimplemented {
  3160  		t.Fatalf("stream.Recv() = _, %v, want _, error code %s", err, codes.Unimplemented)
  3161  	}
  3162  }
  3163  
  3164  func (s) TestPingPong(t *testing.T) {
  3165  	for _, e := range listTestEnv() {
  3166  		testPingPong(t, e)
  3167  	}
  3168  }
  3169  
  3170  func testPingPong(t *testing.T, e env) {
  3171  	te := newTest(t, e)
  3172  	te.startServer(&testServer{security: e.security})
  3173  	defer te.tearDown()
  3174  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  3175  
  3176  	stream, err := tc.FullDuplexCall(te.ctx)
  3177  	if err != nil {
  3178  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  3179  	}
  3180  	var index int
  3181  	for index < len(reqSizes) {
  3182  		respParam := []*testpb.ResponseParameters{
  3183  			{
  3184  				Size: int32(respSizes[index]),
  3185  			},
  3186  		}
  3187  
  3188  		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
  3189  		if err != nil {
  3190  			t.Fatal(err)
  3191  		}
  3192  
  3193  		req := &testpb.StreamingOutputCallRequest{
  3194  			ResponseType:       testpb.PayloadType_COMPRESSABLE,
  3195  			ResponseParameters: respParam,
  3196  			Payload:            payload,
  3197  		}
  3198  		if err := stream.Send(req); err != nil {
  3199  			t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
  3200  		}
  3201  		reply, err := stream.Recv()
  3202  		if err != nil {
  3203  			t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
  3204  		}
  3205  		pt := reply.GetPayload().GetType()
  3206  		if pt != testpb.PayloadType_COMPRESSABLE {
  3207  			t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
  3208  		}
  3209  		size := len(reply.GetPayload().GetBody())
  3210  		if size != int(respSizes[index]) {
  3211  			t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
  3212  		}
  3213  		index++
  3214  	}
  3215  	if err := stream.CloseSend(); err != nil {
  3216  		t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
  3217  	}
  3218  	if _, err := stream.Recv(); err != io.EOF {
  3219  		t.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
  3220  	}
  3221  }
  3222  
  3223  func (s) TestMetadataStreamingRPC(t *testing.T) {
  3224  	for _, e := range listTestEnv() {
  3225  		testMetadataStreamingRPC(t, e)
  3226  	}
  3227  }
  3228  
  3229  func testMetadataStreamingRPC(t *testing.T, e env) {
  3230  	te := newTest(t, e)
  3231  	te.startServer(&testServer{security: e.security})
  3232  	defer te.tearDown()
  3233  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  3234  
  3235  	ctx := metadata.NewOutgoingContext(te.ctx, testMetadata)
  3236  	stream, err := tc.FullDuplexCall(ctx)
  3237  	if err != nil {
  3238  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  3239  	}
  3240  	go func() {
  3241  		headerMD, err := stream.Header()
  3242  		if e.security == "tls" {
  3243  			delete(headerMD, "transport_security_type")
  3244  		}
  3245  		delete(headerMD, "trailer") // ignore if present
  3246  		delete(headerMD, "user-agent")
  3247  		delete(headerMD, "content-type")
  3248  		delete(headerMD, "grpc-accept-encoding")
  3249  		if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
  3250  			t.Errorf("#1 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
  3251  		}
  3252  		// test the cached value.
  3253  		headerMD, err = stream.Header()
  3254  		delete(headerMD, "trailer") // ignore if present
  3255  		delete(headerMD, "user-agent")
  3256  		delete(headerMD, "content-type")
  3257  		delete(headerMD, "grpc-accept-encoding")
  3258  		if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
  3259  			t.Errorf("#2 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
  3260  		}
  3261  		err = func() error {
  3262  			for index := 0; index < len(reqSizes); index++ {
  3263  				respParam := []*testpb.ResponseParameters{
  3264  					{
  3265  						Size: int32(respSizes[index]),
  3266  					},
  3267  				}
  3268  
  3269  				payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
  3270  				if err != nil {
  3271  					return err
  3272  				}
  3273  
  3274  				req := &testpb.StreamingOutputCallRequest{
  3275  					ResponseType:       testpb.PayloadType_COMPRESSABLE,
  3276  					ResponseParameters: respParam,
  3277  					Payload:            payload,
  3278  				}
  3279  				if err := stream.Send(req); err != nil {
  3280  					return fmt.Errorf("%v.Send(%v) = %v, want <nil>", stream, req, err)
  3281  				}
  3282  			}
  3283  			return nil
  3284  		}()
  3285  		// Tell the server we're done sending args.
  3286  		stream.CloseSend()
  3287  		if err != nil {
  3288  			t.Error(err)
  3289  		}
  3290  	}()
  3291  	for {
  3292  		if _, err := stream.Recv(); err != nil {
  3293  			break
  3294  		}
  3295  	}
  3296  	trailerMD := stream.Trailer()
  3297  	if !reflect.DeepEqual(testTrailerMetadata, trailerMD) {
  3298  		t.Fatalf("%v.Trailer() = %v, want %v", stream, trailerMD, testTrailerMetadata)
  3299  	}
  3300  }
  3301  
  3302  func (s) TestServerStreaming(t *testing.T) {
  3303  	for _, e := range listTestEnv() {
  3304  		testServerStreaming(t, e)
  3305  	}
  3306  }
  3307  
  3308  func testServerStreaming(t *testing.T, e env) {
  3309  	te := newTest(t, e)
  3310  	te.startServer(&testServer{security: e.security})
  3311  	defer te.tearDown()
  3312  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  3313  
  3314  	respParam := make([]*testpb.ResponseParameters, len(respSizes))
  3315  	for i, s := range respSizes {
  3316  		respParam[i] = &testpb.ResponseParameters{
  3317  			Size: int32(s),
  3318  		}
  3319  	}
  3320  	req := &testpb.StreamingOutputCallRequest{
  3321  		ResponseType:       testpb.PayloadType_COMPRESSABLE,
  3322  		ResponseParameters: respParam,
  3323  	}
  3324  
  3325  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  3326  	defer cancel()
  3327  	stream, err := tc.StreamingOutputCall(ctx, req)
  3328  	if err != nil {
  3329  		t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
  3330  	}
  3331  	var rpcStatus error
  3332  	var respCnt int
  3333  	var index int
  3334  	for {
  3335  		reply, err := stream.Recv()
  3336  		if err != nil {
  3337  			rpcStatus = err
  3338  			break
  3339  		}
  3340  		pt := reply.GetPayload().GetType()
  3341  		if pt != testpb.PayloadType_COMPRESSABLE {
  3342  			t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
  3343  		}
  3344  		size := len(reply.GetPayload().GetBody())
  3345  		if size != int(respSizes[index]) {
  3346  			t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
  3347  		}
  3348  		index++
  3349  		respCnt++
  3350  	}
  3351  	if rpcStatus != io.EOF {
  3352  		t.Fatalf("Failed to finish the server streaming rpc: %v, want <EOF>", rpcStatus)
  3353  	}
  3354  	if respCnt != len(respSizes) {
  3355  		t.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
  3356  	}
  3357  }
  3358  
  3359  func (s) TestFailedServerStreaming(t *testing.T) {
  3360  	for _, e := range listTestEnv() {
  3361  		testFailedServerStreaming(t, e)
  3362  	}
  3363  }
  3364  
  3365  func testFailedServerStreaming(t *testing.T, e env) {
  3366  	te := newTest(t, e)
  3367  	te.userAgent = failAppUA
  3368  	te.startServer(&testServer{security: e.security})
  3369  	defer te.tearDown()
  3370  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  3371  
  3372  	respParam := make([]*testpb.ResponseParameters, len(respSizes))
  3373  	for i, s := range respSizes {
  3374  		respParam[i] = &testpb.ResponseParameters{
  3375  			Size: int32(s),
  3376  		}
  3377  	}
  3378  	req := &testpb.StreamingOutputCallRequest{
  3379  		ResponseType:       testpb.PayloadType_COMPRESSABLE,
  3380  		ResponseParameters: respParam,
  3381  	}
  3382  	ctx := metadata.NewOutgoingContext(te.ctx, testMetadata)
  3383  	stream, err := tc.StreamingOutputCall(ctx, req)
  3384  	if err != nil {
  3385  		t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
  3386  	}
  3387  	wantErr := status.Error(codes.DataLoss, "error for testing: "+failAppUA)
  3388  	if _, err := stream.Recv(); !equalError(err, wantErr) {
  3389  		t.Fatalf("%v.Recv() = _, %v, want _, %v", stream, err, wantErr)
  3390  	}
  3391  }
  3392  
  3393  func equalError(x, y error) bool {
  3394  	return x == y || (x != nil && y != nil && x.Error() == y.Error())
  3395  }
  3396  
  3397  // concurrentSendServer is a TestServiceServer whose
  3398  // StreamingOutputCall makes ten serial Send calls, sending payloads
  3399  // "0".."9", inclusive.  TestServerStreamingConcurrent verifies they
  3400  // were received in the correct order, and that there were no races.
  3401  //
  3402  // All other TestServiceServer methods crash if called.
  3403  type concurrentSendServer struct {
  3404  	testgrpc.TestServiceServer
  3405  }
  3406  
  3407  func (s concurrentSendServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
  3408  	for i := 0; i < 10; i++ {
  3409  		stream.Send(&testpb.StreamingOutputCallResponse{
  3410  			Payload: &testpb.Payload{
  3411  				Body: []byte{'0' + uint8(i)},
  3412  			},
  3413  		})
  3414  	}
  3415  	return nil
  3416  }
  3417  
  3418  // Tests doing a bunch of concurrent streaming output calls.
  3419  func (s) TestServerStreamingConcurrent(t *testing.T) {
  3420  	for _, e := range listTestEnv() {
  3421  		testServerStreamingConcurrent(t, e)
  3422  	}
  3423  }
  3424  
  3425  func testServerStreamingConcurrent(t *testing.T, e env) {
  3426  	te := newTest(t, e)
  3427  	te.startServer(concurrentSendServer{})
  3428  	defer te.tearDown()
  3429  
  3430  	cc := te.clientConn()
  3431  	tc := testgrpc.NewTestServiceClient(cc)
  3432  
  3433  	doStreamingCall := func() {
  3434  		req := &testpb.StreamingOutputCallRequest{}
  3435  		ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  3436  		defer cancel()
  3437  		stream, err := tc.StreamingOutputCall(ctx, req)
  3438  		if err != nil {
  3439  			t.Errorf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
  3440  			return
  3441  		}
  3442  		var ngot int
  3443  		var buf bytes.Buffer
  3444  		for {
  3445  			reply, err := stream.Recv()
  3446  			if err == io.EOF {
  3447  				break
  3448  			}
  3449  			if err != nil {
  3450  				t.Fatal(err)
  3451  			}
  3452  			ngot++
  3453  			if buf.Len() > 0 {
  3454  				buf.WriteByte(',')
  3455  			}
  3456  			buf.Write(reply.GetPayload().GetBody())
  3457  		}
  3458  		if want := 10; ngot != want {
  3459  			t.Errorf("Got %d replies, want %d", ngot, want)
  3460  		}
  3461  		if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want {
  3462  			t.Errorf("Got replies %q; want %q", got, want)
  3463  		}
  3464  	}
  3465  
  3466  	var wg sync.WaitGroup
  3467  	for i := 0; i < 20; i++ {
  3468  		wg.Add(1)
  3469  		go func() {
  3470  			defer wg.Done()
  3471  			doStreamingCall()
  3472  		}()
  3473  	}
  3474  	wg.Wait()
  3475  
  3476  }
  3477  
  3478  func generatePayloadSizes() [][]int {
  3479  	reqSizes := [][]int{
  3480  		{27182, 8, 1828, 45904},
  3481  	}
  3482  
  3483  	num8KPayloads := 1024
  3484  	eightKPayloads := []int{}
  3485  	for i := 0; i < num8KPayloads; i++ {
  3486  		eightKPayloads = append(eightKPayloads, (1 << 13))
  3487  	}
  3488  	reqSizes = append(reqSizes, eightKPayloads)
  3489  
  3490  	num2MPayloads := 8
  3491  	twoMPayloads := []int{}
  3492  	for i := 0; i < num2MPayloads; i++ {
  3493  		twoMPayloads = append(twoMPayloads, (1 << 21))
  3494  	}
  3495  	reqSizes = append(reqSizes, twoMPayloads)
  3496  
  3497  	return reqSizes
  3498  }
  3499  
  3500  func (s) TestClientStreaming(t *testing.T) {
  3501  	for _, s := range generatePayloadSizes() {
  3502  		for _, e := range listTestEnv() {
  3503  			testClientStreaming(t, e, s)
  3504  		}
  3505  	}
  3506  }
  3507  
  3508  func testClientStreaming(t *testing.T, e env, sizes []int) {
  3509  	te := newTest(t, e)
  3510  	te.startServer(&testServer{security: e.security})
  3511  	defer te.tearDown()
  3512  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  3513  
  3514  	ctx, cancel := context.WithTimeout(te.ctx, defaultTestTimeout)
  3515  	defer cancel()
  3516  	stream, err := tc.StreamingInputCall(ctx)
  3517  	if err != nil {
  3518  		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err)
  3519  	}
  3520  
  3521  	var sum int
  3522  	for _, s := range sizes {
  3523  		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(s))
  3524  		if err != nil {
  3525  			t.Fatal(err)
  3526  		}
  3527  
  3528  		req := &testpb.StreamingInputCallRequest{
  3529  			Payload: payload,
  3530  		}
  3531  		if err := stream.Send(req); err != nil {
  3532  			t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
  3533  		}
  3534  		sum += s
  3535  	}
  3536  	reply, err := stream.CloseAndRecv()
  3537  	if err != nil {
  3538  		t.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
  3539  	}
  3540  	if reply.GetAggregatedPayloadSize() != int32(sum) {
  3541  		t.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum)
  3542  	}
  3543  }
  3544  
  3545  func (s) TestClientStreamingError(t *testing.T) {
  3546  	for _, e := range listTestEnv() {
  3547  		if e.name == "handler-tls" {
  3548  			continue
  3549  		}
  3550  		testClientStreamingError(t, e)
  3551  	}
  3552  }
  3553  
  3554  func testClientStreamingError(t *testing.T, e env) {
  3555  	te := newTest(t, e)
  3556  	te.startServer(&testServer{security: e.security, earlyFail: true})
  3557  	defer te.tearDown()
  3558  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  3559  
  3560  	stream, err := tc.StreamingInputCall(te.ctx)
  3561  	if err != nil {
  3562  		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err)
  3563  	}
  3564  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1)
  3565  	if err != nil {
  3566  		t.Fatal(err)
  3567  	}
  3568  
  3569  	req := &testpb.StreamingInputCallRequest{
  3570  		Payload: payload,
  3571  	}
  3572  	// The 1st request should go through.
  3573  	if err := stream.Send(req); err != nil {
  3574  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
  3575  	}
  3576  	for {
  3577  		if err := stream.Send(req); err != io.EOF {
  3578  			continue
  3579  		}
  3580  		if _, err := stream.CloseAndRecv(); status.Code(err) != codes.NotFound {
  3581  			t.Fatalf("%v.CloseAndRecv() = %v, want error %s", stream, err, codes.NotFound)
  3582  		}
  3583  		break
  3584  	}
  3585  }
  3586  
  3587  func (s) TestExceedMaxStreamsLimit(t *testing.T) {
  3588  	for _, e := range listTestEnv() {
  3589  		testExceedMaxStreamsLimit(t, e)
  3590  	}
  3591  }
  3592  
  3593  func testExceedMaxStreamsLimit(t *testing.T, e env) {
  3594  	te := newTest(t, e)
  3595  	te.declareLogNoise(
  3596  		"http2Client.notifyError got notified that the client transport was broken",
  3597  		"Conn.resetTransport failed to create client transport",
  3598  		"grpc: the connection is closing",
  3599  	)
  3600  	te.maxStream = 1 // Only allows 1 live stream per server transport.
  3601  	te.startServer(&testServer{security: e.security})
  3602  	defer te.tearDown()
  3603  
  3604  	cc := te.clientConn()
  3605  	tc := testgrpc.NewTestServiceClient(cc)
  3606  
  3607  	_, err := tc.StreamingInputCall(te.ctx)
  3608  	if err != nil {
  3609  		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
  3610  	}
  3611  	// Loop until receiving the new max stream setting from the server.
  3612  	for {
  3613  		ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
  3614  		defer cancel()
  3615  		_, err := tc.StreamingInputCall(ctx)
  3616  		if err == nil {
  3617  			time.Sleep(50 * time.Millisecond)
  3618  			continue
  3619  		}
  3620  		if status.Code(err) == codes.DeadlineExceeded {
  3621  			break
  3622  		}
  3623  		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
  3624  	}
  3625  }
  3626  
  3627  func (s) TestStreamsQuotaRecovery(t *testing.T) {
  3628  	for _, e := range listTestEnv() {
  3629  		testStreamsQuotaRecovery(t, e)
  3630  	}
  3631  }
  3632  
  3633  func testStreamsQuotaRecovery(t *testing.T, e env) {
  3634  	te := newTest(t, e)
  3635  	te.declareLogNoise(
  3636  		"http2Client.notifyError got notified that the client transport was broken",
  3637  		"Conn.resetTransport failed to create client transport",
  3638  		"grpc: the connection is closing",
  3639  	)
  3640  	te.maxStream = 1 // Allows 1 live stream.
  3641  	te.startServer(&testServer{security: e.security})
  3642  	defer te.tearDown()
  3643  
  3644  	cc := te.clientConn()
  3645  	tc := testgrpc.NewTestServiceClient(cc)
  3646  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  3647  	defer cancel()
  3648  	if _, err := tc.StreamingInputCall(ctx); err != nil {
  3649  		t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, <nil>", err)
  3650  	}
  3651  	// Loop until the new max stream setting is effective.
  3652  	for {
  3653  		ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
  3654  		_, err := tc.StreamingInputCall(ctx)
  3655  		cancel()
  3656  		if err == nil {
  3657  			time.Sleep(5 * time.Millisecond)
  3658  			continue
  3659  		}
  3660  		if status.Code(err) == codes.DeadlineExceeded {
  3661  			break
  3662  		}
  3663  		t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %s", err, codes.DeadlineExceeded)
  3664  	}
  3665  
  3666  	var wg sync.WaitGroup
  3667  	for i := 0; i < 10; i++ {
  3668  		wg.Add(1)
  3669  		go func() {
  3670  			defer wg.Done()
  3671  			payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 314)
  3672  			if err != nil {
  3673  				t.Error(err)
  3674  				return
  3675  			}
  3676  			req := &testpb.SimpleRequest{
  3677  				ResponseType: testpb.PayloadType_COMPRESSABLE,
  3678  				ResponseSize: 1592,
  3679  				Payload:      payload,
  3680  			}
  3681  			// No rpc should go through due to the max streams limit.
  3682  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
  3683  			defer cancel()
  3684  			if _, err := tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
  3685  				t.Errorf("tc.UnaryCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
  3686  			}
  3687  		}()
  3688  	}
  3689  	wg.Wait()
  3690  
  3691  	cancel()
  3692  	// A new stream should be allowed after canceling the first one.
  3693  	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
  3694  	defer cancel()
  3695  	if _, err := tc.StreamingInputCall(ctx); err != nil {
  3696  		t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %v", err, nil)
  3697  	}
  3698  }
  3699  
  3700  func (s) TestUnaryClientInterceptor(t *testing.T) {
  3701  	for _, e := range listTestEnv() {
  3702  		testUnaryClientInterceptor(t, e)
  3703  	}
  3704  }
  3705  
  3706  func failOkayRPC(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  3707  	err := invoker(ctx, method, req, reply, cc, opts...)
  3708  	if err == nil {
  3709  		return status.Error(codes.NotFound, "")
  3710  	}
  3711  	return err
  3712  }
  3713  
  3714  func testUnaryClientInterceptor(t *testing.T, e env) {
  3715  	te := newTest(t, e)
  3716  	te.userAgent = testAppUA
  3717  	te.unaryClientInt = failOkayRPC
  3718  	te.startServer(&testServer{security: e.security})
  3719  	defer te.tearDown()
  3720  
  3721  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  3722  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  3723  	defer cancel()
  3724  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.NotFound {
  3725  		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.NotFound)
  3726  	}
  3727  }
  3728  
  3729  func (s) TestStreamClientInterceptor(t *testing.T) {
  3730  	for _, e := range listTestEnv() {
  3731  		testStreamClientInterceptor(t, e)
  3732  	}
  3733  }
  3734  
  3735  func failOkayStream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  3736  	s, err := streamer(ctx, desc, cc, method, opts...)
  3737  	if err == nil {
  3738  		return nil, status.Error(codes.NotFound, "")
  3739  	}
  3740  	return s, nil
  3741  }
  3742  
  3743  func testStreamClientInterceptor(t *testing.T, e env) {
  3744  	te := newTest(t, e)
  3745  	te.streamClientInt = failOkayStream
  3746  	te.startServer(&testServer{security: e.security})
  3747  	defer te.tearDown()
  3748  
  3749  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  3750  	respParam := []*testpb.ResponseParameters{
  3751  		{
  3752  			Size: int32(1),
  3753  		},
  3754  	}
  3755  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
  3756  	if err != nil {
  3757  		t.Fatal(err)
  3758  	}
  3759  	req := &testpb.StreamingOutputCallRequest{
  3760  		ResponseType:       testpb.PayloadType_COMPRESSABLE,
  3761  		ResponseParameters: respParam,
  3762  		Payload:            payload,
  3763  	}
  3764  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  3765  	defer cancel()
  3766  	if _, err := tc.StreamingOutputCall(ctx, req); status.Code(err) != codes.NotFound {
  3767  		t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, error code %s", tc, err, codes.NotFound)
  3768  	}
  3769  }
  3770  
  3771  func (s) TestUnaryServerInterceptor(t *testing.T) {
  3772  	for _, e := range listTestEnv() {
  3773  		testUnaryServerInterceptor(t, e)
  3774  	}
  3775  }
  3776  
  3777  func errInjector(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
  3778  	return nil, status.Error(codes.PermissionDenied, "")
  3779  }
  3780  
  3781  func testUnaryServerInterceptor(t *testing.T, e env) {
  3782  	te := newTest(t, e)
  3783  	te.unaryServerInt = errInjector
  3784  	te.startServer(&testServer{security: e.security})
  3785  	defer te.tearDown()
  3786  
  3787  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  3788  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  3789  	defer cancel()
  3790  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.PermissionDenied {
  3791  		t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied)
  3792  	}
  3793  }
  3794  
  3795  func (s) TestStreamServerInterceptor(t *testing.T) {
  3796  	for _, e := range listTestEnv() {
  3797  		// TODO(bradfitz): Temporarily skip this env due to #619.
  3798  		if e.name == "handler-tls" {
  3799  			continue
  3800  		}
  3801  		testStreamServerInterceptor(t, e)
  3802  	}
  3803  }
  3804  
  3805  func fullDuplexOnly(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  3806  	if info.FullMethod == "/grpc.testing.TestService/FullDuplexCall" {
  3807  		return handler(srv, ss)
  3808  	}
  3809  	// Reject the other methods.
  3810  	return status.Error(codes.PermissionDenied, "")
  3811  }
  3812  
  3813  func testStreamServerInterceptor(t *testing.T, e env) {
  3814  	te := newTest(t, e)
  3815  	te.streamServerInt = fullDuplexOnly
  3816  	te.startServer(&testServer{security: e.security})
  3817  	defer te.tearDown()
  3818  
  3819  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  3820  	respParam := []*testpb.ResponseParameters{
  3821  		{
  3822  			Size: int32(1),
  3823  		},
  3824  	}
  3825  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
  3826  	if err != nil {
  3827  		t.Fatal(err)
  3828  	}
  3829  	req := &testpb.StreamingOutputCallRequest{
  3830  		ResponseType:       testpb.PayloadType_COMPRESSABLE,
  3831  		ResponseParameters: respParam,
  3832  		Payload:            payload,
  3833  	}
  3834  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  3835  	defer cancel()
  3836  	s1, err := tc.StreamingOutputCall(ctx, req)
  3837  	if err != nil {
  3838  		t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, <nil>", tc, err)
  3839  	}
  3840  	if _, err := s1.Recv(); status.Code(err) != codes.PermissionDenied {
  3841  		t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied)
  3842  	}
  3843  	s2, err := tc.FullDuplexCall(ctx)
  3844  	if err != nil {
  3845  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  3846  	}
  3847  	if err := s2.Send(req); err != nil {
  3848  		t.Fatalf("%v.Send(_) = %v, want <nil>", s2, err)
  3849  	}
  3850  	if _, err := s2.Recv(); err != nil {
  3851  		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", s2, err)
  3852  	}
  3853  }
  3854  
  3855  // funcServer implements methods of TestServiceServer using funcs,
  3856  // similar to an http.HandlerFunc.
  3857  // Any unimplemented method will crash. Tests implement the method(s)
  3858  // they need.
  3859  type funcServer struct {
  3860  	testgrpc.TestServiceServer
  3861  	unaryCall          func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
  3862  	streamingInputCall func(stream testgrpc.TestService_StreamingInputCallServer) error
  3863  	fullDuplexCall     func(stream testgrpc.TestService_FullDuplexCallServer) error
  3864  }
  3865  
  3866  func (s *funcServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  3867  	return s.unaryCall(ctx, in)
  3868  }
  3869  
  3870  func (s *funcServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error {
  3871  	return s.streamingInputCall(stream)
  3872  }
  3873  
  3874  func (s *funcServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
  3875  	return s.fullDuplexCall(stream)
  3876  }
  3877  
  3878  func (s) TestClientRequestBodyErrorUnexpectedEOF(t *testing.T) {
  3879  	for _, e := range listTestEnv() {
  3880  		testClientRequestBodyErrorUnexpectedEOF(t, e)
  3881  	}
  3882  }
  3883  
  3884  func testClientRequestBodyErrorUnexpectedEOF(t *testing.T, e env) {
  3885  	te := newTest(t, e)
  3886  	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  3887  		errUnexpectedCall := errors.New("unexpected call func server method")
  3888  		t.Error(errUnexpectedCall)
  3889  		return nil, errUnexpectedCall
  3890  	}}
  3891  	te.startServer(ts)
  3892  	defer te.tearDown()
  3893  	te.withServerTester(func(st *serverTester) {
  3894  		st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall", false)
  3895  		// Say we have 5 bytes coming, but set END_STREAM flag:
  3896  		st.writeData(1, true, []byte{0, 0, 0, 0, 5})
  3897  		st.wantAnyFrame() // wait for server to crash (it used to crash)
  3898  	})
  3899  }
  3900  
  3901  func (s) TestClientRequestBodyErrorCloseAfterLength(t *testing.T) {
  3902  	for _, e := range listTestEnv() {
  3903  		testClientRequestBodyErrorCloseAfterLength(t, e)
  3904  	}
  3905  }
  3906  
  3907  // Tests gRPC server's behavior when a gRPC client sends a frame with an invalid
  3908  // streamID. Per [HTTP/2 spec]: Streams initiated by a client MUST use
  3909  // odd-numbered stream identifiers. This test sets up a test server and sends a
  3910  // header frame with stream ID of 2. The test asserts that a subsequent read on
  3911  // the transport sends a GoAwayFrame with error code: PROTOCOL_ERROR.
  3912  //
  3913  // [HTTP/2 spec]: https://httpwg.org/specs/rfc7540.html#StreamIdentifiers
  3914  func (s) TestClientInvalidStreamID(t *testing.T) {
  3915  	lis, err := net.Listen("tcp", "localhost:0")
  3916  	if err != nil {
  3917  		t.Fatalf("Failed to listen: %v", err)
  3918  	}
  3919  	defer lis.Close()
  3920  	s := grpc.NewServer()
  3921  	defer s.Stop()
  3922  	go s.Serve(lis)
  3923  
  3924  	conn, err := net.DialTimeout("tcp", lis.Addr().String(), defaultTestTimeout)
  3925  	if err != nil {
  3926  		t.Fatalf("Failed to dial: %v", err)
  3927  	}
  3928  	st := newServerTesterFromConn(t, conn)
  3929  	st.greet()
  3930  	st.writeHeadersGRPC(2, "/grpc.testing.TestService/StreamingInputCall", true)
  3931  	goAwayFrame := st.wantGoAway(http2.ErrCodeProtocol)
  3932  	want := "received an illegal stream id: 2."
  3933  	if got := string(goAwayFrame.DebugData()); !strings.Contains(got, want) {
  3934  		t.Fatalf(" Received: %v, Expected error message to contain: %v.", got, want)
  3935  	}
  3936  }
  3937  
  3938  // TestInvalidStreamIDSmallerThanPrevious tests the server sends a GOAWAY frame
  3939  // with error code: PROTOCOL_ERROR when the streamID of the current frame is
  3940  // lower than the previous frames.
  3941  func (s) TestInvalidStreamIDSmallerThanPrevious(t *testing.T) {
  3942  	lis, err := net.Listen("tcp", "localhost:0")
  3943  	if err != nil {
  3944  		t.Fatalf("Failed to listen: %v", err)
  3945  	}
  3946  	defer lis.Close()
  3947  	s := grpc.NewServer()
  3948  	defer s.Stop()
  3949  	go s.Serve(lis)
  3950  
  3951  	conn, err := net.DialTimeout("tcp", lis.Addr().String(), defaultTestTimeout)
  3952  	if err != nil {
  3953  		t.Fatalf("Failed to dial: %v", err)
  3954  	}
  3955  	st := newServerTesterFromConn(t, conn)
  3956  	st.greet()
  3957  	st.writeHeadersGRPC(3, "/grpc.testing.TestService/StreamingInputCall", true)
  3958  	st.wantAnyFrame()
  3959  	st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall", true)
  3960  	goAwayFrame := st.wantGoAway(http2.ErrCodeProtocol)
  3961  	want := "received an illegal stream id: 1"
  3962  	if got := string(goAwayFrame.DebugData()); !strings.Contains(got, want) {
  3963  		t.Fatalf(" Received: %v, Expected error message to contain: %v.", got, want)
  3964  	}
  3965  }
  3966  
  3967  func testClientRequestBodyErrorCloseAfterLength(t *testing.T, e env) {
  3968  	te := newTest(t, e)
  3969  	te.declareLogNoise("Server.processUnaryRPC failed to write status")
  3970  	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  3971  		errUnexpectedCall := errors.New("unexpected call func server method")
  3972  		t.Error(errUnexpectedCall)
  3973  		return nil, errUnexpectedCall
  3974  	}}
  3975  	te.startServer(ts)
  3976  	defer te.tearDown()
  3977  	te.withServerTester(func(st *serverTester) {
  3978  		st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall", false)
  3979  		// say we're sending 5 bytes, but then close the connection instead.
  3980  		st.writeData(1, false, []byte{0, 0, 0, 0, 5})
  3981  		st.cc.Close()
  3982  	})
  3983  }
  3984  
  3985  func (s) TestClientRequestBodyErrorCancel(t *testing.T) {
  3986  	for _, e := range listTestEnv() {
  3987  		testClientRequestBodyErrorCancel(t, e)
  3988  	}
  3989  }
  3990  
  3991  func testClientRequestBodyErrorCancel(t *testing.T, e env) {
  3992  	te := newTest(t, e)
  3993  	gotCall := make(chan bool, 1)
  3994  	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  3995  		gotCall <- true
  3996  		return new(testpb.SimpleResponse), nil
  3997  	}}
  3998  	te.startServer(ts)
  3999  	defer te.tearDown()
  4000  	te.withServerTester(func(st *serverTester) {
  4001  		st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall", false)
  4002  		// Say we have 5 bytes coming, but cancel it instead.
  4003  		st.writeRSTStream(1, http2.ErrCodeCancel)
  4004  		st.writeData(1, false, []byte{0, 0, 0, 0, 5})
  4005  
  4006  		// Verify we didn't a call yet.
  4007  		select {
  4008  		case <-gotCall:
  4009  			t.Fatal("unexpected call")
  4010  		default:
  4011  		}
  4012  
  4013  		// And now send an uncanceled (but still invalid), just to get a response.
  4014  		st.writeHeadersGRPC(3, "/grpc.testing.TestService/UnaryCall", false)
  4015  		st.writeData(3, true, []byte{0, 0, 0, 0, 0})
  4016  		<-gotCall
  4017  		st.wantAnyFrame()
  4018  	})
  4019  }
  4020  
  4021  func (s) TestClientRequestBodyErrorCancelStreamingInput(t *testing.T) {
  4022  	for _, e := range listTestEnv() {
  4023  		testClientRequestBodyErrorCancelStreamingInput(t, e)
  4024  	}
  4025  }
  4026  
  4027  func testClientRequestBodyErrorCancelStreamingInput(t *testing.T, e env) {
  4028  	te := newTest(t, e)
  4029  	recvErr := make(chan error, 1)
  4030  	ts := &funcServer{streamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error {
  4031  		_, err := stream.Recv()
  4032  		recvErr <- err
  4033  		return nil
  4034  	}}
  4035  	te.startServer(ts)
  4036  	defer te.tearDown()
  4037  	te.withServerTester(func(st *serverTester) {
  4038  		st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall", false)
  4039  		// Say we have 5 bytes coming, but cancel it instead.
  4040  		st.writeData(1, false, []byte{0, 0, 0, 0, 5})
  4041  		st.writeRSTStream(1, http2.ErrCodeCancel)
  4042  
  4043  		var got error
  4044  		select {
  4045  		case got = <-recvErr:
  4046  		case <-time.After(3 * time.Second):
  4047  			t.Fatal("timeout waiting for error")
  4048  		}
  4049  		if grpc.Code(got) != codes.Canceled {
  4050  			t.Errorf("error = %#v; want error code %s", got, codes.Canceled)
  4051  		}
  4052  	})
  4053  }
  4054  
  4055  func (s) TestClientInitialHeaderEndStream(t *testing.T) {
  4056  	for _, e := range listTestEnv() {
  4057  		if e.httpHandler {
  4058  			continue
  4059  		}
  4060  		testClientInitialHeaderEndStream(t, e)
  4061  	}
  4062  }
  4063  
  4064  func testClientInitialHeaderEndStream(t *testing.T, e env) {
  4065  	// To ensure RST_STREAM is sent for illegal data write and not normal stream
  4066  	// close.
  4067  	frameCheckingDone := make(chan struct{})
  4068  	// To ensure goroutine for test does not end before RPC handler performs error
  4069  	// checking.
  4070  	handlerDone := make(chan struct{})
  4071  	te := newTest(t, e)
  4072  	ts := &funcServer{streamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error {
  4073  		defer close(handlerDone)
  4074  		// Block on serverTester receiving RST_STREAM. This ensures server has closed
  4075  		// stream before stream.Recv().
  4076  		<-frameCheckingDone
  4077  		data, err := stream.Recv()
  4078  		if err == nil {
  4079  			t.Errorf("unexpected data received in func server method: '%v'", data)
  4080  		} else if status.Code(err) != codes.Canceled {
  4081  			t.Errorf("expected canceled error, instead received '%v'", err)
  4082  		}
  4083  		return nil
  4084  	}}
  4085  	te.startServer(ts)
  4086  	defer te.tearDown()
  4087  	te.withServerTester(func(st *serverTester) {
  4088  		// Send a headers with END_STREAM flag, but then write data.
  4089  		st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall", true)
  4090  		st.writeData(1, false, []byte{0, 0, 0, 0, 0})
  4091  		st.wantAnyFrame()
  4092  		st.wantAnyFrame()
  4093  		st.wantRSTStream(http2.ErrCodeStreamClosed)
  4094  		close(frameCheckingDone)
  4095  		<-handlerDone
  4096  	})
  4097  }
  4098  
  4099  func (s) TestClientSendDataAfterCloseSend(t *testing.T) {
  4100  	for _, e := range listTestEnv() {
  4101  		if e.httpHandler {
  4102  			continue
  4103  		}
  4104  		testClientSendDataAfterCloseSend(t, e)
  4105  	}
  4106  }
  4107  
  4108  func testClientSendDataAfterCloseSend(t *testing.T, e env) {
  4109  	// To ensure RST_STREAM is sent for illegal data write prior to execution of RPC
  4110  	// handler.
  4111  	frameCheckingDone := make(chan struct{})
  4112  	// To ensure goroutine for test does not end before RPC handler performs error
  4113  	// checking.
  4114  	handlerDone := make(chan struct{})
  4115  	te := newTest(t, e)
  4116  	ts := &funcServer{streamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error {
  4117  		defer close(handlerDone)
  4118  		// Block on serverTester receiving RST_STREAM. This ensures server has closed
  4119  		// stream before stream.Recv().
  4120  		<-frameCheckingDone
  4121  		for {
  4122  			_, err := stream.Recv()
  4123  			if err == io.EOF {
  4124  				break
  4125  			}
  4126  			if err != nil {
  4127  				if status.Code(err) != codes.Canceled {
  4128  					t.Errorf("expected canceled error, instead received '%v'", err)
  4129  				}
  4130  				break
  4131  			}
  4132  		}
  4133  		if err := stream.SendMsg(nil); err == nil {
  4134  			t.Error("expected error sending message on stream after stream closed due to illegal data")
  4135  		} else if status.Code(err) != codes.Canceled {
  4136  			t.Errorf("expected cancel error, instead received '%v'", err)
  4137  		}
  4138  		return nil
  4139  	}}
  4140  	te.startServer(ts)
  4141  	defer te.tearDown()
  4142  	te.withServerTester(func(st *serverTester) {
  4143  		st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall", false)
  4144  		// Send data with END_STREAM flag, but then write more data.
  4145  		st.writeData(1, true, []byte{0, 0, 0, 0, 0})
  4146  		st.writeData(1, false, []byte{0, 0, 0, 0, 0})
  4147  		st.wantAnyFrame()
  4148  		st.wantAnyFrame()
  4149  		st.wantRSTStream(http2.ErrCodeStreamClosed)
  4150  		close(frameCheckingDone)
  4151  		<-handlerDone
  4152  	})
  4153  }
  4154  
  4155  func (s) TestClientResourceExhaustedCancelFullDuplex(t *testing.T) {
  4156  	for _, e := range listTestEnv() {
  4157  		if e.httpHandler {
  4158  			// httpHandler write won't be blocked on flow control window.
  4159  			continue
  4160  		}
  4161  		testClientResourceExhaustedCancelFullDuplex(t, e)
  4162  	}
  4163  }
  4164  
  4165  func testClientResourceExhaustedCancelFullDuplex(t *testing.T, e env) {
  4166  	te := newTest(t, e)
  4167  	recvErr := make(chan error, 1)
  4168  	ts := &funcServer{fullDuplexCall: func(stream testgrpc.TestService_FullDuplexCallServer) error {
  4169  		defer close(recvErr)
  4170  		_, err := stream.Recv()
  4171  		if err != nil {
  4172  			return status.Errorf(codes.Internal, "stream.Recv() got error: %v, want <nil>", err)
  4173  		}
  4174  		// create a payload that's larger than the default flow control window.
  4175  		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10)
  4176  		if err != nil {
  4177  			return err
  4178  		}
  4179  		resp := &testpb.StreamingOutputCallResponse{
  4180  			Payload: payload,
  4181  		}
  4182  		ce := make(chan error, 1)
  4183  		go func() {
  4184  			var err error
  4185  			for {
  4186  				if err = stream.Send(resp); err != nil {
  4187  					break
  4188  				}
  4189  			}
  4190  			ce <- err
  4191  		}()
  4192  		select {
  4193  		case err = <-ce:
  4194  		case <-time.After(10 * time.Second):
  4195  			err = errors.New("10s timeout reached")
  4196  		}
  4197  		recvErr <- err
  4198  		return err
  4199  	}}
  4200  	te.startServer(ts)
  4201  	defer te.tearDown()
  4202  	// set a low limit on receive message size to error with Resource Exhausted on
  4203  	// client side when server send a large message.
  4204  	te.maxClientReceiveMsgSize = newInt(10)
  4205  	cc := te.clientConn()
  4206  	tc := testgrpc.NewTestServiceClient(cc)
  4207  
  4208  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  4209  	defer cancel()
  4210  	stream, err := tc.FullDuplexCall(ctx)
  4211  	if err != nil {
  4212  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  4213  	}
  4214  	req := &testpb.StreamingOutputCallRequest{}
  4215  	if err := stream.Send(req); err != nil {
  4216  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
  4217  	}
  4218  	if _, err := stream.Recv(); status.Code(err) != codes.ResourceExhausted {
  4219  		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
  4220  	}
  4221  	err = <-recvErr
  4222  	if status.Code(err) != codes.Canceled {
  4223  		t.Fatalf("server got error %v, want error code: %s", err, codes.Canceled)
  4224  	}
  4225  }
  4226  
  4227  type clientFailCreds struct{}
  4228  
  4229  func (c *clientFailCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  4230  	return rawConn, nil, nil
  4231  }
  4232  func (c *clientFailCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
  4233  	return nil, nil, fmt.Errorf("client handshake fails with fatal error")
  4234  }
  4235  func (c *clientFailCreds) Info() credentials.ProtocolInfo {
  4236  	return credentials.ProtocolInfo{}
  4237  }
  4238  func (c *clientFailCreds) Clone() credentials.TransportCredentials {
  4239  	return c
  4240  }
  4241  func (c *clientFailCreds) OverrideServerName(s string) error {
  4242  	return nil
  4243  }
  4244  
  4245  // This test makes sure that failfast RPCs fail if client handshake fails with
  4246  // fatal errors.
  4247  func (s) TestFailfastRPCFailOnFatalHandshakeError(t *testing.T) {
  4248  	lis, err := net.Listen("tcp", "localhost:0")
  4249  	if err != nil {
  4250  		t.Fatalf("Failed to listen: %v", err)
  4251  	}
  4252  	defer lis.Close()
  4253  
  4254  	cc, err := grpc.Dial("passthrough:///"+lis.Addr().String(), grpc.WithTransportCredentials(&clientFailCreds{}))
  4255  	if err != nil {
  4256  		t.Fatalf("grpc.Dial(_) = %v", err)
  4257  	}
  4258  	defer cc.Close()
  4259  
  4260  	tc := testgrpc.NewTestServiceClient(cc)
  4261  	// This unary call should fail, but not timeout.
  4262  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  4263  	defer cancel()
  4264  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(false)); status.Code(err) != codes.Unavailable {
  4265  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <Unavailable>", err)
  4266  	}
  4267  }
  4268  
  4269  func (s) TestFlowControlLogicalRace(t *testing.T) {
  4270  	// Test for a regression of https://github.com/grpc/grpc-go/issues/632,
  4271  	// and other flow control bugs.
  4272  
  4273  	const (
  4274  		itemCount   = 100
  4275  		itemSize    = 1 << 10
  4276  		recvCount   = 2
  4277  		maxFailures = 3
  4278  	)
  4279  
  4280  	requestCount := 3000
  4281  	if raceMode {
  4282  		requestCount = 1000
  4283  	}
  4284  
  4285  	lis, err := net.Listen("tcp", "localhost:0")
  4286  	if err != nil {
  4287  		t.Fatalf("Failed to listen: %v", err)
  4288  	}
  4289  	defer lis.Close()
  4290  
  4291  	s := grpc.NewServer()
  4292  	testgrpc.RegisterTestServiceServer(s, &flowControlLogicalRaceServer{
  4293  		itemCount: itemCount,
  4294  		itemSize:  itemSize,
  4295  	})
  4296  	defer s.Stop()
  4297  
  4298  	go s.Serve(lis)
  4299  
  4300  	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
  4301  	if err != nil {
  4302  		t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
  4303  	}
  4304  	defer cc.Close()
  4305  	cl := testgrpc.NewTestServiceClient(cc)
  4306  
  4307  	failures := 0
  4308  	for i := 0; i < requestCount; i++ {
  4309  		ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  4310  		output, err := cl.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
  4311  		if err != nil {
  4312  			t.Fatalf("StreamingOutputCall; err = %q", err)
  4313  		}
  4314  
  4315  		for j := 0; j < recvCount; j++ {
  4316  			if _, err := output.Recv(); err != nil {
  4317  				if err == io.EOF || status.Code(err) == codes.DeadlineExceeded {
  4318  					t.Errorf("got %d responses to request %d", j, i)
  4319  					failures++
  4320  					break
  4321  				}
  4322  				t.Fatalf("Recv; err = %q", err)
  4323  			}
  4324  		}
  4325  		cancel()
  4326  
  4327  		if failures >= maxFailures {
  4328  			// Continue past the first failure to see if the connection is
  4329  			// entirely broken, or if only a single RPC was affected
  4330  			t.Fatalf("Too many failures received; aborting")
  4331  		}
  4332  	}
  4333  }
  4334  
  4335  type flowControlLogicalRaceServer struct {
  4336  	testgrpc.TestServiceServer
  4337  
  4338  	itemSize  int
  4339  	itemCount int
  4340  }
  4341  
  4342  func (s *flowControlLogicalRaceServer) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, srv testgrpc.TestService_StreamingOutputCallServer) error {
  4343  	for i := 0; i < s.itemCount; i++ {
  4344  		err := srv.Send(&testpb.StreamingOutputCallResponse{
  4345  			Payload: &testpb.Payload{
  4346  				// Sending a large stream of data which the client reject
  4347  				// helps to trigger some types of flow control bugs.
  4348  				//
  4349  				// Reallocating memory here is inefficient, but the stress it
  4350  				// puts on the GC leads to more frequent flow control
  4351  				// failures. The GC likely causes more variety in the
  4352  				// goroutine scheduling orders.
  4353  				Body: bytes.Repeat([]byte("a"), s.itemSize),
  4354  			},
  4355  		})
  4356  		if err != nil {
  4357  			return err
  4358  		}
  4359  	}
  4360  	return nil
  4361  }
  4362  
  4363  type lockingWriter struct {
  4364  	mu sync.Mutex
  4365  	w  io.Writer
  4366  }
  4367  
  4368  func (lw *lockingWriter) Write(p []byte) (n int, err error) {
  4369  	lw.mu.Lock()
  4370  	defer lw.mu.Unlock()
  4371  	return lw.w.Write(p)
  4372  }
  4373  
  4374  func (lw *lockingWriter) setWriter(w io.Writer) {
  4375  	lw.mu.Lock()
  4376  	defer lw.mu.Unlock()
  4377  	lw.w = w
  4378  }
  4379  
  4380  var testLogOutput = &lockingWriter{w: os.Stderr}
  4381  
  4382  // awaitNewConnLogOutput waits for any of grpc.NewConn's goroutines to
  4383  // terminate, if they're still running. It spams logs with this
  4384  // message.  We wait for it so our log filter is still
  4385  // active. Otherwise the "defer restore()" at the top of various test
  4386  // functions restores our log filter and then the goroutine spams.
  4387  func awaitNewConnLogOutput() {
  4388  	awaitLogOutput(50*time.Millisecond, "grpc: the client connection is closing; please retry")
  4389  }
  4390  
  4391  func awaitLogOutput(maxWait time.Duration, phrase string) {
  4392  	pb := []byte(phrase)
  4393  
  4394  	timer := time.NewTimer(maxWait)
  4395  	defer timer.Stop()
  4396  	wakeup := make(chan bool, 1)
  4397  	for {
  4398  		if logOutputHasContents(pb, wakeup) {
  4399  			return
  4400  		}
  4401  		select {
  4402  		case <-timer.C:
  4403  			// Too slow. Oh well.
  4404  			return
  4405  		case <-wakeup:
  4406  		}
  4407  	}
  4408  }
  4409  
  4410  func logOutputHasContents(v []byte, wakeup chan<- bool) bool {
  4411  	testLogOutput.mu.Lock()
  4412  	defer testLogOutput.mu.Unlock()
  4413  	fw, ok := testLogOutput.w.(*filterWriter)
  4414  	if !ok {
  4415  		return false
  4416  	}
  4417  	fw.mu.Lock()
  4418  	defer fw.mu.Unlock()
  4419  	if bytes.Contains(fw.buf.Bytes(), v) {
  4420  		return true
  4421  	}
  4422  	fw.wakeup = wakeup
  4423  	return false
  4424  }
  4425  
  4426  var verboseLogs = flag.Bool("verbose_logs", false, "show all log output, without filtering")
  4427  
  4428  func noop() {}
  4429  
  4430  // declareLogNoise declares that t is expected to emit the following noisy
  4431  // phrases, even on success. Those phrases will be filtered from log output and
  4432  // only be shown if *verbose_logs or t ends up failing. The returned restore
  4433  // function should be called with defer to be run before the test ends.
  4434  func declareLogNoise(t *testing.T, phrases ...string) (restore func()) {
  4435  	if *verboseLogs {
  4436  		return noop
  4437  	}
  4438  	fw := &filterWriter{dst: os.Stderr, filter: phrases}
  4439  	testLogOutput.setWriter(fw)
  4440  	return func() {
  4441  		if t.Failed() {
  4442  			fw.mu.Lock()
  4443  			defer fw.mu.Unlock()
  4444  			if fw.buf.Len() > 0 {
  4445  				t.Logf("Complete log output:\n%s", fw.buf.Bytes())
  4446  			}
  4447  		}
  4448  		testLogOutput.setWriter(os.Stderr)
  4449  	}
  4450  }
  4451  
  4452  type filterWriter struct {
  4453  	dst    io.Writer
  4454  	filter []string
  4455  
  4456  	mu     sync.Mutex
  4457  	buf    bytes.Buffer
  4458  	wakeup chan<- bool // if non-nil, gets true on write
  4459  }
  4460  
  4461  func (fw *filterWriter) Write(p []byte) (n int, err error) {
  4462  	fw.mu.Lock()
  4463  	fw.buf.Write(p)
  4464  	if fw.wakeup != nil {
  4465  		select {
  4466  		case fw.wakeup <- true:
  4467  		default:
  4468  		}
  4469  	}
  4470  	fw.mu.Unlock()
  4471  
  4472  	ps := string(p)
  4473  	for _, f := range fw.filter {
  4474  		if strings.Contains(ps, f) {
  4475  			return len(p), nil
  4476  		}
  4477  	}
  4478  	return fw.dst.Write(p)
  4479  }
  4480  
  4481  func (s) TestGRPCMethod(t *testing.T) {
  4482  	var method string
  4483  	var ok bool
  4484  
  4485  	ss := &stubserver.StubServer{
  4486  		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  4487  			method, ok = grpc.Method(ctx)
  4488  			return &testpb.Empty{}, nil
  4489  		},
  4490  	}
  4491  	if err := ss.Start(nil); err != nil {
  4492  		t.Fatalf("Error starting endpoint server: %v", err)
  4493  	}
  4494  	defer ss.Stop()
  4495  
  4496  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  4497  	defer cancel()
  4498  
  4499  	if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  4500  		t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v; want _, nil", err)
  4501  	}
  4502  
  4503  	if want := "/grpc.testing.TestService/EmptyCall"; !ok || method != want {
  4504  		t.Fatalf("grpc.Method(_) = %q, %v; want %q, true", method, ok, want)
  4505  	}
  4506  }
  4507  
  4508  func (s) TestUnaryProxyDoesNotForwardMetadata(t *testing.T) {
  4509  	const mdkey = "somedata"
  4510  
  4511  	// endpoint ensures mdkey is NOT in metadata and returns an error if it is.
  4512  	endpoint := &stubserver.StubServer{
  4513  		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  4514  			if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil {
  4515  				return nil, status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
  4516  			}
  4517  			return &testpb.Empty{}, nil
  4518  		},
  4519  	}
  4520  	if err := endpoint.Start(nil); err != nil {
  4521  		t.Fatalf("Error starting endpoint server: %v", err)
  4522  	}
  4523  	defer endpoint.Stop()
  4524  
  4525  	// proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint
  4526  	// without explicitly copying the metadata.
  4527  	proxy := &stubserver.StubServer{
  4528  		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  4529  			if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil {
  4530  				return nil, status.Errorf(codes.Internal, "proxy: md=%v; want contains(%q)", md, mdkey)
  4531  			}
  4532  			return endpoint.Client.EmptyCall(ctx, in)
  4533  		},
  4534  	}
  4535  	if err := proxy.Start(nil); err != nil {
  4536  		t.Fatalf("Error starting proxy server: %v", err)
  4537  	}
  4538  	defer proxy.Stop()
  4539  
  4540  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  4541  	defer cancel()
  4542  	md := metadata.Pairs(mdkey, "val")
  4543  	ctx = metadata.NewOutgoingContext(ctx, md)
  4544  
  4545  	// Sanity check that endpoint properly errors when it sees mdkey.
  4546  	_, err := endpoint.Client.EmptyCall(ctx, &testpb.Empty{})
  4547  	if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal {
  4548  		t.Fatalf("endpoint.Client.EmptyCall(_, _) = _, %v; want _, <status with Code()=Internal>", err)
  4549  	}
  4550  
  4551  	if _, err := proxy.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  4552  		t.Fatal(err.Error())
  4553  	}
  4554  }
  4555  
  4556  func (s) TestStreamingProxyDoesNotForwardMetadata(t *testing.T) {
  4557  	const mdkey = "somedata"
  4558  
  4559  	// doFDC performs a FullDuplexCall with client and returns the error from the
  4560  	// first stream.Recv call, or nil if that error is io.EOF.  Calls t.Fatal if
  4561  	// the stream cannot be established.
  4562  	doFDC := func(ctx context.Context, client testgrpc.TestServiceClient) error {
  4563  		stream, err := client.FullDuplexCall(ctx)
  4564  		if err != nil {
  4565  			t.Fatalf("Unwanted error: %v", err)
  4566  		}
  4567  		if _, err := stream.Recv(); err != io.EOF {
  4568  			return err
  4569  		}
  4570  		return nil
  4571  	}
  4572  
  4573  	// endpoint ensures mdkey is NOT in metadata and returns an error if it is.
  4574  	endpoint := &stubserver.StubServer{
  4575  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
  4576  			ctx := stream.Context()
  4577  			if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil {
  4578  				return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
  4579  			}
  4580  			return nil
  4581  		},
  4582  	}
  4583  	if err := endpoint.Start(nil); err != nil {
  4584  		t.Fatalf("Error starting endpoint server: %v", err)
  4585  	}
  4586  	defer endpoint.Stop()
  4587  
  4588  	// proxy ensures mdkey IS in metadata, then forwards the RPC to endpoint
  4589  	// without explicitly copying the metadata.
  4590  	proxy := &stubserver.StubServer{
  4591  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
  4592  			ctx := stream.Context()
  4593  			if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil {
  4594  				return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
  4595  			}
  4596  			return doFDC(ctx, endpoint.Client)
  4597  		},
  4598  	}
  4599  	if err := proxy.Start(nil); err != nil {
  4600  		t.Fatalf("Error starting proxy server: %v", err)
  4601  	}
  4602  	defer proxy.Stop()
  4603  
  4604  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  4605  	defer cancel()
  4606  	md := metadata.Pairs(mdkey, "val")
  4607  	ctx = metadata.NewOutgoingContext(ctx, md)
  4608  
  4609  	// Sanity check that endpoint properly errors when it sees mdkey in ctx.
  4610  	err := doFDC(ctx, endpoint.Client)
  4611  	if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal {
  4612  		t.Fatalf("stream.Recv() = _, %v; want _, <status with Code()=Internal>", err)
  4613  	}
  4614  
  4615  	if err := doFDC(ctx, proxy.Client); err != nil {
  4616  		t.Fatalf("doFDC(_, proxy.Client) = %v; want nil", err)
  4617  	}
  4618  }
  4619  
  4620  func (s) TestStatsTagsAndTrace(t *testing.T) {
  4621  	// Data added to context by client (typically in a stats handler).
  4622  	tags := []byte{1, 5, 2, 4, 3}
  4623  	trace := []byte{5, 2, 1, 3, 4}
  4624  
  4625  	// endpoint ensures Tags() and Trace() in context match those that were added
  4626  	// by the client and returns an error if not.
  4627  	endpoint := &stubserver.StubServer{
  4628  		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  4629  			md, _ := metadata.FromIncomingContext(ctx)
  4630  			if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, tags) {
  4631  				return nil, status.Errorf(codes.Internal, "stats.Tags(%v)=%v; want %v", ctx, tg, tags)
  4632  			}
  4633  			if !reflect.DeepEqual(md["grpc-tags-bin"], []string{string(tags)}) {
  4634  				return nil, status.Errorf(codes.Internal, "md['grpc-tags-bin']=%v; want %v", md["grpc-tags-bin"], tags)
  4635  			}
  4636  			if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, trace) {
  4637  				return nil, status.Errorf(codes.Internal, "stats.Trace(%v)=%v; want %v", ctx, tr, trace)
  4638  			}
  4639  			if !reflect.DeepEqual(md["grpc-trace-bin"], []string{string(trace)}) {
  4640  				return nil, status.Errorf(codes.Internal, "md['grpc-trace-bin']=%v; want %v", md["grpc-trace-bin"], trace)
  4641  			}
  4642  			return &testpb.Empty{}, nil
  4643  		},
  4644  	}
  4645  	if err := endpoint.Start(nil); err != nil {
  4646  		t.Fatalf("Error starting endpoint server: %v", err)
  4647  	}
  4648  	defer endpoint.Stop()
  4649  
  4650  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  4651  	defer cancel()
  4652  
  4653  	testCases := []struct {
  4654  		ctx  context.Context
  4655  		want codes.Code
  4656  	}{
  4657  		{ctx: ctx, want: codes.Internal},
  4658  		{ctx: stats.SetTags(ctx, tags), want: codes.Internal},
  4659  		{ctx: stats.SetTrace(ctx, trace), want: codes.Internal},
  4660  		{ctx: stats.SetTags(stats.SetTrace(ctx, tags), tags), want: codes.Internal},
  4661  		{ctx: stats.SetTags(stats.SetTrace(ctx, trace), tags), want: codes.OK},
  4662  	}
  4663  
  4664  	for _, tc := range testCases {
  4665  		_, err := endpoint.Client.EmptyCall(tc.ctx, &testpb.Empty{})
  4666  		if tc.want == codes.OK && err != nil {
  4667  			t.Fatalf("endpoint.Client.EmptyCall(%v, _) = _, %v; want _, nil", tc.ctx, err)
  4668  		}
  4669  		if s, ok := status.FromError(err); !ok || s.Code() != tc.want {
  4670  			t.Fatalf("endpoint.Client.EmptyCall(%v, _) = _, %v; want _, <status with Code()=%v>", tc.ctx, err, tc.want)
  4671  		}
  4672  	}
  4673  }
  4674  
  4675  func (s) TestTapTimeout(t *testing.T) {
  4676  	sopts := []grpc.ServerOption{
  4677  		grpc.InTapHandle(func(ctx context.Context, _ *tap.Info) (context.Context, error) {
  4678  			c, cancel := context.WithCancel(ctx)
  4679  			// Call cancel instead of setting a deadline so we can detect which error
  4680  			// occurred -- this cancellation (desired) or the client's deadline
  4681  			// expired (indicating this cancellation did not affect the RPC).
  4682  			time.AfterFunc(10*time.Millisecond, cancel)
  4683  			return c, nil
  4684  		}),
  4685  	}
  4686  
  4687  	ss := &stubserver.StubServer{
  4688  		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  4689  			<-ctx.Done()
  4690  			return nil, status.Errorf(codes.Canceled, ctx.Err().Error())
  4691  		},
  4692  	}
  4693  	if err := ss.Start(sopts); err != nil {
  4694  		t.Fatalf("Error starting endpoint server: %v", err)
  4695  	}
  4696  	defer ss.Stop()
  4697  
  4698  	// This was known to be flaky; test several times.
  4699  	for i := 0; i < 10; i++ {
  4700  		// Set our own deadline in case the server hangs.
  4701  		ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  4702  		res, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
  4703  		cancel()
  4704  		if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled {
  4705  			t.Fatalf("ss.Client.EmptyCall(ctx, _) = %v, %v; want nil, <status with Code()=Canceled>", res, err)
  4706  		}
  4707  	}
  4708  
  4709  }
  4710  
  4711  func (s) TestClientWriteFailsAfterServerClosesStream(t *testing.T) {
  4712  	ss := &stubserver.StubServer{
  4713  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
  4714  			return status.Errorf(codes.Internal, "")
  4715  		},
  4716  	}
  4717  	sopts := []grpc.ServerOption{}
  4718  	if err := ss.Start(sopts); err != nil {
  4719  		t.Fatalf("Error starting endpoint server: %v", err)
  4720  	}
  4721  	defer ss.Stop()
  4722  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  4723  	defer cancel()
  4724  	stream, err := ss.Client.FullDuplexCall(ctx)
  4725  	if err != nil {
  4726  		t.Fatalf("Error while creating stream: %v", err)
  4727  	}
  4728  	for {
  4729  		if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err == nil {
  4730  			time.Sleep(5 * time.Millisecond)
  4731  		} else if err == io.EOF {
  4732  			break // Success.
  4733  		} else {
  4734  			t.Fatalf("stream.Send(_) = %v, want io.EOF", err)
  4735  		}
  4736  	}
  4737  }
  4738  
  4739  type windowSizeConfig struct {
  4740  	serverStream int32
  4741  	serverConn   int32
  4742  	clientStream int32
  4743  	clientConn   int32
  4744  }
  4745  
  4746  func max(a, b int32) int32 {
  4747  	if a > b {
  4748  		return a
  4749  	}
  4750  	return b
  4751  }
  4752  
  4753  func (s) TestConfigurableWindowSizeWithLargeWindow(t *testing.T) {
  4754  	wc := windowSizeConfig{
  4755  		serverStream: 8 * 1024 * 1024,
  4756  		serverConn:   12 * 1024 * 1024,
  4757  		clientStream: 6 * 1024 * 1024,
  4758  		clientConn:   8 * 1024 * 1024,
  4759  	}
  4760  	for _, e := range listTestEnv() {
  4761  		testConfigurableWindowSize(t, e, wc)
  4762  	}
  4763  }
  4764  
  4765  func (s) TestConfigurableWindowSizeWithSmallWindow(t *testing.T) {
  4766  	wc := windowSizeConfig{
  4767  		serverStream: 1,
  4768  		serverConn:   1,
  4769  		clientStream: 1,
  4770  		clientConn:   1,
  4771  	}
  4772  	for _, e := range listTestEnv() {
  4773  		testConfigurableWindowSize(t, e, wc)
  4774  	}
  4775  }
  4776  
  4777  func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) {
  4778  	te := newTest(t, e)
  4779  	te.serverInitialWindowSize = wc.serverStream
  4780  	te.serverInitialConnWindowSize = wc.serverConn
  4781  	te.clientInitialWindowSize = wc.clientStream
  4782  	te.clientInitialConnWindowSize = wc.clientConn
  4783  
  4784  	te.startServer(&testServer{security: e.security})
  4785  	defer te.tearDown()
  4786  
  4787  	cc := te.clientConn()
  4788  	tc := testgrpc.NewTestServiceClient(cc)
  4789  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  4790  	defer cancel()
  4791  	stream, err := tc.FullDuplexCall(ctx)
  4792  	if err != nil {
  4793  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  4794  	}
  4795  	numOfIter := 11
  4796  	// Set message size to exhaust largest of window sizes.
  4797  	messageSize := max(max(wc.serverStream, wc.serverConn), max(wc.clientStream, wc.clientConn)) / int32(numOfIter-1)
  4798  	messageSize = max(messageSize, 64*1024)
  4799  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, messageSize)
  4800  	if err != nil {
  4801  		t.Fatal(err)
  4802  	}
  4803  	respParams := []*testpb.ResponseParameters{
  4804  		{
  4805  			Size: messageSize,
  4806  		},
  4807  	}
  4808  	req := &testpb.StreamingOutputCallRequest{
  4809  		ResponseType:       testpb.PayloadType_COMPRESSABLE,
  4810  		ResponseParameters: respParams,
  4811  		Payload:            payload,
  4812  	}
  4813  	for i := 0; i < numOfIter; i++ {
  4814  		if err := stream.Send(req); err != nil {
  4815  			t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
  4816  		}
  4817  		if _, err := stream.Recv(); err != nil {
  4818  			t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
  4819  		}
  4820  	}
  4821  	if err := stream.CloseSend(); err != nil {
  4822  		t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
  4823  	}
  4824  }
  4825  
  4826  func (s) TestWaitForReadyConnection(t *testing.T) {
  4827  	for _, e := range listTestEnv() {
  4828  		testWaitForReadyConnection(t, e)
  4829  	}
  4830  
  4831  }
  4832  
  4833  func testWaitForReadyConnection(t *testing.T, e env) {
  4834  	te := newTest(t, e)
  4835  	te.userAgent = testAppUA
  4836  	te.startServer(&testServer{security: e.security})
  4837  	defer te.tearDown()
  4838  
  4839  	cc := te.clientConn() // Non-blocking dial.
  4840  	tc := testgrpc.NewTestServiceClient(cc)
  4841  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  4842  	defer cancel()
  4843  	testutils.AwaitState(ctx, t, cc, connectivity.Ready)
  4844  	// Make a fail-fast RPC.
  4845  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  4846  		t.Fatalf("TestService/EmptyCall(_,_) = _, %v, want _, nil", err)
  4847  	}
  4848  }
  4849  
  4850  func (s) TestSvrWriteStatusEarlyWrite(t *testing.T) {
  4851  	for _, e := range listTestEnv() {
  4852  		testSvrWriteStatusEarlyWrite(t, e)
  4853  	}
  4854  }
  4855  
  4856  func testSvrWriteStatusEarlyWrite(t *testing.T, e env) {
  4857  	te := newTest(t, e)
  4858  	const smallSize = 1024
  4859  	const largeSize = 2048
  4860  	const extraLargeSize = 4096
  4861  	te.maxServerReceiveMsgSize = newInt(largeSize)
  4862  	te.maxServerSendMsgSize = newInt(largeSize)
  4863  	smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
  4864  	if err != nil {
  4865  		t.Fatal(err)
  4866  	}
  4867  	extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
  4868  	if err != nil {
  4869  		t.Fatal(err)
  4870  	}
  4871  	te.startServer(&testServer{security: e.security})
  4872  	defer te.tearDown()
  4873  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  4874  	respParam := []*testpb.ResponseParameters{
  4875  		{
  4876  			Size: int32(smallSize),
  4877  		},
  4878  	}
  4879  	sreq := &testpb.StreamingOutputCallRequest{
  4880  		ResponseType:       testpb.PayloadType_COMPRESSABLE,
  4881  		ResponseParameters: respParam,
  4882  		Payload:            extraLargePayload,
  4883  	}
  4884  	// Test recv case: server receives a message larger than maxServerReceiveMsgSize.
  4885  	stream, err := tc.FullDuplexCall(te.ctx)
  4886  	if err != nil {
  4887  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  4888  	}
  4889  	if err = stream.Send(sreq); err != nil {
  4890  		t.Fatalf("%v.Send() = _, %v, want <nil>", stream, err)
  4891  	}
  4892  	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
  4893  		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
  4894  	}
  4895  	// Test send case: server sends a message larger than maxServerSendMsgSize.
  4896  	sreq.Payload = smallPayload
  4897  	respParam[0].Size = int32(extraLargeSize)
  4898  
  4899  	stream, err = tc.FullDuplexCall(te.ctx)
  4900  	if err != nil {
  4901  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
  4902  	}
  4903  	if err = stream.Send(sreq); err != nil {
  4904  		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
  4905  	}
  4906  	if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
  4907  		t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
  4908  	}
  4909  }
  4910  
  4911  // TestMalformedStreamMethod starts a test server and sends an RPC with a
  4912  // malformed method name. The server should respond with an UNIMPLEMENTED status
  4913  // code in this case.
  4914  func (s) TestMalformedStreamMethod(t *testing.T) {
  4915  	const testMethod = "a-method-name-without-any-slashes"
  4916  	te := newTest(t, tcpClearRREnv)
  4917  	te.startServer(nil)
  4918  	defer te.tearDown()
  4919  
  4920  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  4921  	defer cancel()
  4922  	err := te.clientConn().Invoke(ctx, testMethod, nil, nil)
  4923  	if gotCode := status.Code(err); gotCode != codes.Unimplemented {
  4924  		t.Fatalf("Invoke with method %q, got code %s, want %s", testMethod, gotCode, codes.Unimplemented)
  4925  	}
  4926  }
  4927  
  4928  func (s) TestMethodFromServerStream(t *testing.T) {
  4929  	const testMethod = "/package.service/method"
  4930  	e := tcpClearRREnv
  4931  	te := newTest(t, e)
  4932  	var method string
  4933  	var ok bool
  4934  	te.unknownHandler = func(srv any, stream grpc.ServerStream) error {
  4935  		method, ok = grpc.MethodFromServerStream(stream)
  4936  		return nil
  4937  	}
  4938  
  4939  	te.startServer(nil)
  4940  	defer te.tearDown()
  4941  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  4942  	defer cancel()
  4943  	_ = te.clientConn().Invoke(ctx, testMethod, nil, nil)
  4944  	if !ok || method != testMethod {
  4945  		t.Fatalf("Invoke with method %q, got %q, %v, want %q, true", testMethod, method, ok, testMethod)
  4946  	}
  4947  }
  4948  
  4949  func (s) TestInterceptorCanAccessCallOptions(t *testing.T) {
  4950  	e := tcpClearRREnv
  4951  	te := newTest(t, e)
  4952  	te.startServer(&testServer{security: e.security})
  4953  	defer te.tearDown()
  4954  
  4955  	type observedOptions struct {
  4956  		headers     []*metadata.MD
  4957  		trailers    []*metadata.MD
  4958  		peer        []*peer.Peer
  4959  		creds       []credentials.PerRPCCredentials
  4960  		failFast    []bool
  4961  		maxRecvSize []int
  4962  		maxSendSize []int
  4963  		compressor  []string
  4964  		subtype     []string
  4965  	}
  4966  	var observedOpts observedOptions
  4967  	populateOpts := func(opts []grpc.CallOption) {
  4968  		for _, o := range opts {
  4969  			switch o := o.(type) {
  4970  			case grpc.HeaderCallOption:
  4971  				observedOpts.headers = append(observedOpts.headers, o.HeaderAddr)
  4972  			case grpc.TrailerCallOption:
  4973  				observedOpts.trailers = append(observedOpts.trailers, o.TrailerAddr)
  4974  			case grpc.PeerCallOption:
  4975  				observedOpts.peer = append(observedOpts.peer, o.PeerAddr)
  4976  			case grpc.PerRPCCredsCallOption:
  4977  				observedOpts.creds = append(observedOpts.creds, o.Creds)
  4978  			case grpc.FailFastCallOption:
  4979  				observedOpts.failFast = append(observedOpts.failFast, o.FailFast)
  4980  			case grpc.MaxRecvMsgSizeCallOption:
  4981  				observedOpts.maxRecvSize = append(observedOpts.maxRecvSize, o.MaxRecvMsgSize)
  4982  			case grpc.MaxSendMsgSizeCallOption:
  4983  				observedOpts.maxSendSize = append(observedOpts.maxSendSize, o.MaxSendMsgSize)
  4984  			case grpc.CompressorCallOption:
  4985  				observedOpts.compressor = append(observedOpts.compressor, o.CompressorType)
  4986  			case grpc.ContentSubtypeCallOption:
  4987  				observedOpts.subtype = append(observedOpts.subtype, o.ContentSubtype)
  4988  			}
  4989  		}
  4990  	}
  4991  
  4992  	te.unaryClientInt = func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  4993  		populateOpts(opts)
  4994  		return nil
  4995  	}
  4996  	te.streamClientInt = func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  4997  		populateOpts(opts)
  4998  		return nil, nil
  4999  	}
  5000  
  5001  	defaults := []grpc.CallOption{
  5002  		grpc.WaitForReady(true),
  5003  		grpc.MaxCallRecvMsgSize(1010),
  5004  	}
  5005  	tc := testgrpc.NewTestServiceClient(te.clientConn(grpc.WithDefaultCallOptions(defaults...)))
  5006  
  5007  	var headers metadata.MD
  5008  	var trailers metadata.MD
  5009  	var pr peer.Peer
  5010  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  5011  	defer cancel()
  5012  	tc.UnaryCall(ctx, &testpb.SimpleRequest{},
  5013  		grpc.MaxCallRecvMsgSize(100),
  5014  		grpc.MaxCallSendMsgSize(200),
  5015  		grpc.PerRPCCredentials(testPerRPCCredentials{}),
  5016  		grpc.Header(&headers),
  5017  		grpc.Trailer(&trailers),
  5018  		grpc.Peer(&pr))
  5019  	expected := observedOptions{
  5020  		failFast:    []bool{false},
  5021  		maxRecvSize: []int{1010, 100},
  5022  		maxSendSize: []int{200},
  5023  		creds:       []credentials.PerRPCCredentials{testPerRPCCredentials{}},
  5024  		headers:     []*metadata.MD{&headers},
  5025  		trailers:    []*metadata.MD{&trailers},
  5026  		peer:        []*peer.Peer{&pr},
  5027  	}
  5028  
  5029  	if !reflect.DeepEqual(expected, observedOpts) {
  5030  		t.Errorf("unary call did not observe expected options: expected %#v, got %#v", expected, observedOpts)
  5031  	}
  5032  
  5033  	observedOpts = observedOptions{} // reset
  5034  
  5035  	tc.StreamingInputCall(ctx,
  5036  		grpc.WaitForReady(false),
  5037  		grpc.MaxCallSendMsgSize(2020),
  5038  		grpc.UseCompressor("comp-type"),
  5039  		grpc.CallContentSubtype("json"))
  5040  	expected = observedOptions{
  5041  		failFast:    []bool{false, true},
  5042  		maxRecvSize: []int{1010},
  5043  		maxSendSize: []int{2020},
  5044  		compressor:  []string{"comp-type"},
  5045  		subtype:     []string{"json"},
  5046  	}
  5047  
  5048  	if !reflect.DeepEqual(expected, observedOpts) {
  5049  		t.Errorf("streaming call did not observe expected options: expected %#v, got %#v", expected, observedOpts)
  5050  	}
  5051  }
  5052  
  5053  func (s) TestServeExitsWhenListenerClosed(t *testing.T) {
  5054  	ss := &stubserver.StubServer{
  5055  		EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
  5056  			return &testpb.Empty{}, nil
  5057  		},
  5058  	}
  5059  
  5060  	s := grpc.NewServer()
  5061  	defer s.Stop()
  5062  	testgrpc.RegisterTestServiceServer(s, ss)
  5063  
  5064  	lis, err := net.Listen("tcp", "localhost:0")
  5065  	if err != nil {
  5066  		t.Fatalf("Failed to create listener: %v", err)
  5067  	}
  5068  
  5069  	done := make(chan struct{})
  5070  	go func() {
  5071  		s.Serve(lis)
  5072  		close(done)
  5073  	}()
  5074  
  5075  	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
  5076  	if err != nil {
  5077  		t.Fatalf("Failed to dial server: %v", err)
  5078  	}
  5079  	defer cc.Close()
  5080  	c := testgrpc.NewTestServiceClient(cc)
  5081  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  5082  	defer cancel()
  5083  	if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil {
  5084  		t.Fatalf("Failed to send test RPC to server: %v", err)
  5085  	}
  5086  
  5087  	if err := lis.Close(); err != nil {
  5088  		t.Fatalf("Failed to close listener: %v", err)
  5089  	}
  5090  	const timeout = 5 * time.Second
  5091  	timer := time.NewTimer(timeout)
  5092  	select {
  5093  	case <-done:
  5094  		return
  5095  	case <-timer.C:
  5096  		t.Fatalf("Serve did not return after %v", timeout)
  5097  	}
  5098  }
  5099  
  5100  // Service handler returns status with invalid utf8 message.
  5101  func (s) TestStatusInvalidUTF8Message(t *testing.T) {
  5102  	var (
  5103  		origMsg = string([]byte{0xff, 0xfe, 0xfd})
  5104  		wantMsg = "���"
  5105  	)
  5106  
  5107  	ss := &stubserver.StubServer{
  5108  		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  5109  			return nil, status.Errorf(codes.Internal, origMsg)
  5110  		},
  5111  	}
  5112  	if err := ss.Start(nil); err != nil {
  5113  		t.Fatalf("Error starting endpoint server: %v", err)
  5114  	}
  5115  	defer ss.Stop()
  5116  
  5117  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  5118  	defer cancel()
  5119  
  5120  	if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); status.Convert(err).Message() != wantMsg {
  5121  		t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, status.Convert(err).Message(), wantMsg)
  5122  	}
  5123  }
  5124  
  5125  // Service handler returns status with details and invalid utf8 message. Proto
  5126  // will fail to marshal the status because of the invalid utf8 message. Details
  5127  // will be dropped when sending.
  5128  func (s) TestStatusInvalidUTF8Details(t *testing.T) {
  5129  	grpctest.TLogger.ExpectError("Failed to marshal rpc status")
  5130  
  5131  	var (
  5132  		origMsg = string([]byte{0xff, 0xfe, 0xfd})
  5133  		wantMsg = "���"
  5134  	)
  5135  
  5136  	ss := &stubserver.StubServer{
  5137  		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
  5138  			st := status.New(codes.Internal, origMsg)
  5139  			st, err := st.WithDetails(&testpb.Empty{})
  5140  			if err != nil {
  5141  				return nil, err
  5142  			}
  5143  			return nil, st.Err()
  5144  		},
  5145  	}
  5146  	if err := ss.Start(nil); err != nil {
  5147  		t.Fatalf("Error starting endpoint server: %v", err)
  5148  	}
  5149  	defer ss.Stop()
  5150  
  5151  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  5152  	defer cancel()
  5153  
  5154  	_, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
  5155  	st := status.Convert(err)
  5156  	if st.Message() != wantMsg {
  5157  		t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, st.Message(), wantMsg)
  5158  	}
  5159  	if len(st.Details()) != 0 {
  5160  		// Details should be dropped on the server side.
  5161  		t.Fatalf("RPC status contain details: %v, want no details", st.Details())
  5162  	}
  5163  }
  5164  
  5165  func (s) TestRPCTimeout(t *testing.T) {
  5166  	for _, e := range listTestEnv() {
  5167  		testRPCTimeout(t, e)
  5168  	}
  5169  }
  5170  
  5171  func testRPCTimeout(t *testing.T, e env) {
  5172  	te := newTest(t, e)
  5173  	te.startServer(&testServer{security: e.security, unaryCallSleepTime: 500 * time.Millisecond})
  5174  	defer te.tearDown()
  5175  
  5176  	cc := te.clientConn()
  5177  	tc := testgrpc.NewTestServiceClient(cc)
  5178  
  5179  	const argSize = 2718
  5180  	const respSize = 314
  5181  
  5182  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
  5183  	if err != nil {
  5184  		t.Fatal(err)
  5185  	}
  5186  
  5187  	req := &testpb.SimpleRequest{
  5188  		ResponseType: testpb.PayloadType_COMPRESSABLE,
  5189  		ResponseSize: respSize,
  5190  		Payload:      payload,
  5191  	}
  5192  	for i := -1; i <= 10; i++ {
  5193  		ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond)
  5194  		if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.DeadlineExceeded {
  5195  			t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded)
  5196  		}
  5197  		cancel()
  5198  	}
  5199  }
  5200  
  5201  func (s) TestDisabledIOBuffers(t *testing.T) {
  5202  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(60000))
  5203  	if err != nil {
  5204  		t.Fatalf("Failed to create payload: %v", err)
  5205  	}
  5206  	req := &testpb.StreamingOutputCallRequest{
  5207  		Payload: payload,
  5208  	}
  5209  	resp := &testpb.StreamingOutputCallResponse{
  5210  		Payload: payload,
  5211  	}
  5212  
  5213  	ss := &stubserver.StubServer{
  5214  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
  5215  			for {
  5216  				in, err := stream.Recv()
  5217  				if err == io.EOF {
  5218  					return nil
  5219  				}
  5220  				if err != nil {
  5221  					t.Errorf("stream.Recv() = _, %v, want _, <nil>", err)
  5222  					return err
  5223  				}
  5224  				if !reflect.DeepEqual(in.Payload.Body, payload.Body) {
  5225  					t.Errorf("Received message(len: %v) on server not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body))
  5226  					return err
  5227  				}
  5228  				if err := stream.Send(resp); err != nil {
  5229  					t.Errorf("stream.Send(_)= %v, want <nil>", err)
  5230  					return err
  5231  				}
  5232  
  5233  			}
  5234  		},
  5235  	}
  5236  
  5237  	s := grpc.NewServer(grpc.WriteBufferSize(0), grpc.ReadBufferSize(0))
  5238  	testgrpc.RegisterTestServiceServer(s, ss)
  5239  
  5240  	lis, err := net.Listen("tcp", "localhost:0")
  5241  	if err != nil {
  5242  		t.Fatalf("Failed to create listener: %v", err)
  5243  	}
  5244  
  5245  	go func() {
  5246  		s.Serve(lis)
  5247  	}()
  5248  	defer s.Stop()
  5249  	dctx, dcancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  5250  	defer dcancel()
  5251  	cc, err := grpc.DialContext(dctx, lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithWriteBufferSize(0), grpc.WithReadBufferSize(0))
  5252  	if err != nil {
  5253  		t.Fatalf("Failed to dial server")
  5254  	}
  5255  	defer cc.Close()
  5256  	c := testgrpc.NewTestServiceClient(cc)
  5257  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  5258  	defer cancel()
  5259  	stream, err := c.FullDuplexCall(ctx, grpc.WaitForReady(true))
  5260  	if err != nil {
  5261  		t.Fatalf("Failed to send test RPC to server")
  5262  	}
  5263  	for i := 0; i < 10; i++ {
  5264  		if err := stream.Send(req); err != nil {
  5265  			t.Fatalf("stream.Send(_) = %v, want <nil>", err)
  5266  		}
  5267  		in, err := stream.Recv()
  5268  		if err != nil {
  5269  			t.Fatalf("stream.Recv() = _, %v, want _, <nil>", err)
  5270  		}
  5271  		if !reflect.DeepEqual(in.Payload.Body, payload.Body) {
  5272  			t.Fatalf("Received message(len: %v) on client not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body))
  5273  		}
  5274  	}
  5275  	stream.CloseSend()
  5276  	if _, err := stream.Recv(); err != io.EOF {
  5277  		t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err)
  5278  	}
  5279  }
  5280  
  5281  func (s) TestServerMaxHeaderListSizeClientUserViolation(t *testing.T) {
  5282  	for _, e := range listTestEnv() {
  5283  		if e.httpHandler {
  5284  			continue
  5285  		}
  5286  		testServerMaxHeaderListSizeClientUserViolation(t, e)
  5287  	}
  5288  }
  5289  
  5290  func testServerMaxHeaderListSizeClientUserViolation(t *testing.T, e env) {
  5291  	te := newTest(t, e)
  5292  	te.maxServerHeaderListSize = new(uint32)
  5293  	*te.maxServerHeaderListSize = 216
  5294  	te.startServer(&testServer{security: e.security})
  5295  	defer te.tearDown()
  5296  
  5297  	cc := te.clientConn()
  5298  	tc := testgrpc.NewTestServiceClient(cc)
  5299  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  5300  	defer cancel()
  5301  	metadata.AppendToOutgoingContext(ctx, "oversize", string(make([]byte, 216)))
  5302  	var err error
  5303  	if err = verifyResultWithDelay(func() (bool, error) {
  5304  		if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) == codes.Internal {
  5305  			return true, nil
  5306  		}
  5307  		return false, fmt.Errorf("tc.EmptyCall() = _, err: %v, want _, error code: %v", err, codes.Internal)
  5308  	}); err != nil {
  5309  		t.Fatal(err)
  5310  	}
  5311  }
  5312  
  5313  func (s) TestClientMaxHeaderListSizeServerUserViolation(t *testing.T) {
  5314  	for _, e := range listTestEnv() {
  5315  		if e.httpHandler {
  5316  			continue
  5317  		}
  5318  		testClientMaxHeaderListSizeServerUserViolation(t, e)
  5319  	}
  5320  }
  5321  
  5322  func testClientMaxHeaderListSizeServerUserViolation(t *testing.T, e env) {
  5323  	te := newTest(t, e)
  5324  	te.maxClientHeaderListSize = new(uint32)
  5325  	*te.maxClientHeaderListSize = 1 // any header server sends will violate
  5326  	te.startServer(&testServer{security: e.security})
  5327  	defer te.tearDown()
  5328  
  5329  	cc := te.clientConn()
  5330  	tc := testgrpc.NewTestServiceClient(cc)
  5331  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  5332  	defer cancel()
  5333  	var err error
  5334  	if err = verifyResultWithDelay(func() (bool, error) {
  5335  		if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) == codes.Internal {
  5336  			return true, nil
  5337  		}
  5338  		return false, fmt.Errorf("tc.EmptyCall() = _, err: %v, want _, error code: %v", err, codes.Internal)
  5339  	}); err != nil {
  5340  		t.Fatal(err)
  5341  	}
  5342  }
  5343  
  5344  func (s) TestServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T) {
  5345  	for _, e := range listTestEnv() {
  5346  		if e.httpHandler || e.security == "tls" {
  5347  			continue
  5348  		}
  5349  		testServerMaxHeaderListSizeClientIntentionalViolation(t, e)
  5350  	}
  5351  }
  5352  
  5353  func testServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T, e env) {
  5354  	te := newTest(t, e)
  5355  	te.maxServerHeaderListSize = new(uint32)
  5356  	*te.maxServerHeaderListSize = 512
  5357  	te.startServer(&testServer{security: e.security})
  5358  	defer te.tearDown()
  5359  
  5360  	cc, dw := te.clientConnWithConnControl()
  5361  	tc := &testServiceClientWrapper{TestServiceClient: testgrpc.NewTestServiceClient(cc)}
  5362  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  5363  	defer cancel()
  5364  	stream, err := tc.FullDuplexCall(ctx)
  5365  	if err != nil {
  5366  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
  5367  	}
  5368  	rcw := dw.getRawConnWrapper()
  5369  	val := make([]string, 512)
  5370  	for i := range val {
  5371  		val[i] = "a"
  5372  	}
  5373  	// allow for client to send the initial header
  5374  	time.Sleep(100 * time.Millisecond)
  5375  	rcw.writeHeaders(http2.HeadersFrameParam{
  5376  		StreamID:      tc.getCurrentStreamID(),
  5377  		BlockFragment: rcw.encodeHeader("oversize", strings.Join(val, "")),
  5378  		EndStream:     false,
  5379  		EndHeaders:    true,
  5380  	})
  5381  	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Internal {
  5382  		t.Fatalf("stream.Recv() = _, %v, want _, error code: %v", err, codes.Internal)
  5383  	}
  5384  }
  5385  
  5386  func (s) TestClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T) {
  5387  	for _, e := range listTestEnv() {
  5388  		if e.httpHandler || e.security == "tls" {
  5389  			continue
  5390  		}
  5391  		testClientMaxHeaderListSizeServerIntentionalViolation(t, e)
  5392  	}
  5393  }
  5394  
  5395  func testClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T, e env) {
  5396  	te := newTest(t, e)
  5397  	te.maxClientHeaderListSize = new(uint32)
  5398  	*te.maxClientHeaderListSize = 200
  5399  	lw := te.startServerWithConnControl(&testServer{security: e.security, setHeaderOnly: true})
  5400  	defer te.tearDown()
  5401  	cc, _ := te.clientConnWithConnControl()
  5402  	tc := &testServiceClientWrapper{TestServiceClient: testgrpc.NewTestServiceClient(cc)}
  5403  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  5404  	defer cancel()
  5405  	stream, err := tc.FullDuplexCall(ctx)
  5406  	if err != nil {
  5407  		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
  5408  	}
  5409  	var i int
  5410  	var rcw *rawConnWrapper
  5411  	for i = 0; i < 100; i++ {
  5412  		rcw = lw.getLastConn()
  5413  		if rcw != nil {
  5414  			break
  5415  		}
  5416  		time.Sleep(10 * time.Millisecond)
  5417  		continue
  5418  	}
  5419  	if i == 100 {
  5420  		t.Fatalf("failed to create server transport after 1s")
  5421  	}
  5422  
  5423  	val := make([]string, 200)
  5424  	for i := range val {
  5425  		val[i] = "a"
  5426  	}
  5427  	// allow for client to send the initial header.
  5428  	time.Sleep(100 * time.Millisecond)
  5429  	rcw.writeHeaders(http2.HeadersFrameParam{
  5430  		StreamID:      tc.getCurrentStreamID(),
  5431  		BlockFragment: rcw.encodeRawHeader("oversize", strings.Join(val, "")),
  5432  		EndStream:     false,
  5433  		EndHeaders:    true,
  5434  	})
  5435  	if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Internal {
  5436  		t.Fatalf("stream.Recv() = _, %v, want _, error code: %v", err, codes.Internal)
  5437  	}
  5438  }
  5439  
  5440  func (s) TestNetPipeConn(t *testing.T) {
  5441  	// This test will block indefinitely if grpc writes both client and server
  5442  	// prefaces without either reading from the Conn.
  5443  	pl := testutils.NewPipeListener()
  5444  	s := grpc.NewServer()
  5445  	defer s.Stop()
  5446  	ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  5447  		return &testpb.SimpleResponse{}, nil
  5448  	}}
  5449  	testgrpc.RegisterTestServiceServer(s, ts)
  5450  	go s.Serve(pl)
  5451  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  5452  	defer cancel()
  5453  	cc, err := grpc.DialContext(ctx, "", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDialer(pl.Dialer()))
  5454  	if err != nil {
  5455  		t.Fatalf("Error creating client: %v", err)
  5456  	}
  5457  	defer cc.Close()
  5458  	client := testgrpc.NewTestServiceClient(cc)
  5459  	if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
  5460  		t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
  5461  	}
  5462  }
  5463  
  5464  func (s) TestLargeTimeout(t *testing.T) {
  5465  	for _, e := range listTestEnv() {
  5466  		testLargeTimeout(t, e)
  5467  	}
  5468  }
  5469  
  5470  func testLargeTimeout(t *testing.T, e env) {
  5471  	te := newTest(t, e)
  5472  	te.declareLogNoise("Server.processUnaryRPC failed to write status")
  5473  
  5474  	ts := &funcServer{}
  5475  	te.startServer(ts)
  5476  	defer te.tearDown()
  5477  	tc := testgrpc.NewTestServiceClient(te.clientConn())
  5478  
  5479  	timeouts := []time.Duration{
  5480  		time.Duration(math.MaxInt64), // will be (correctly) converted to
  5481  		// 2562048 hours, which overflows upon converting back to an int64
  5482  		2562047 * time.Hour, // the largest timeout that does not overflow
  5483  	}
  5484  
  5485  	for i, maxTimeout := range timeouts {
  5486  		ts.unaryCall = func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  5487  			deadline, ok := ctx.Deadline()
  5488  			timeout := time.Until(deadline)
  5489  			minTimeout := maxTimeout - 5*time.Second
  5490  			if !ok || timeout < minTimeout || timeout > maxTimeout {
  5491  				t.Errorf("ctx.Deadline() = (now+%v), %v; want [%v, %v], true", timeout, ok, minTimeout, maxTimeout)
  5492  				return nil, status.Error(codes.OutOfRange, "deadline error")
  5493  			}
  5494  			return &testpb.SimpleResponse{}, nil
  5495  		}
  5496  
  5497  		ctx, cancel := context.WithTimeout(context.Background(), maxTimeout)
  5498  		defer cancel()
  5499  
  5500  		if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
  5501  			t.Errorf("case %v: UnaryCall(_) = _, %v; want _, nil", i, err)
  5502  		}
  5503  	}
  5504  }
  5505  
  5506  func listenWithNotifyingListener(network, address string, event *grpcsync.Event) (net.Listener, error) {
  5507  	lis, err := net.Listen(network, address)
  5508  	if err != nil {
  5509  		return nil, err
  5510  	}
  5511  	return notifyingListener{connEstablished: event, Listener: lis}, nil
  5512  }
  5513  
  5514  type notifyingListener struct {
  5515  	connEstablished *grpcsync.Event
  5516  	net.Listener
  5517  }
  5518  
  5519  func (lis notifyingListener) Accept() (net.Conn, error) {
  5520  	defer lis.connEstablished.Fire()
  5521  	return lis.Listener.Accept()
  5522  }
  5523  
  5524  func (s) TestRPCWaitsForResolver(t *testing.T) {
  5525  	te := testServiceConfigSetup(t, tcpClearRREnv)
  5526  	te.startServer(&testServer{security: tcpClearRREnv.security})
  5527  	defer te.tearDown()
  5528  	r := manual.NewBuilderWithScheme("whatever")
  5529  
  5530  	te.resolverScheme = r.Scheme()
  5531  	cc := te.clientConn(grpc.WithResolvers(r))
  5532  	tc := testgrpc.NewTestServiceClient(cc)
  5533  
  5534  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
  5535  	defer cancel()
  5536  	// With no resolved addresses yet, this will timeout.
  5537  	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
  5538  		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
  5539  	}
  5540  
  5541  	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
  5542  	defer cancel()
  5543  	go func() {
  5544  		time.Sleep(time.Second)
  5545  		r.UpdateState(resolver.State{
  5546  			Addresses: []resolver.Address{{Addr: te.srvAddr}},
  5547  			ServiceConfig: parseServiceConfig(t, r, `{
  5548  		    "methodConfig": [
  5549  		        {
  5550  		            "name": [
  5551  		                {
  5552  		                    "service": "grpc.testing.TestService",
  5553  		                    "method": "UnaryCall"
  5554  		                }
  5555  		            ],
  5556                      "maxRequestMessageBytes": 0
  5557  		        }
  5558  		    ]
  5559  		}`)})
  5560  	}()
  5561  	// We wait a second before providing a service config and resolving
  5562  	// addresses.  So this will wait for that and then honor the
  5563  	// maxRequestMessageBytes it contains.
  5564  	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1)
  5565  	if err != nil {
  5566  		t.Fatal(err)
  5567  	}
  5568  	if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{Payload: payload}); status.Code(err) != codes.ResourceExhausted {
  5569  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, nil", err)
  5570  	}
  5571  	if got := ctx.Err(); got != nil {
  5572  		t.Fatalf("ctx.Err() = %v; want nil (deadline should be set short by service config)", got)
  5573  	}
  5574  	if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
  5575  		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, nil", err)
  5576  	}
  5577  }
  5578  
  5579  type httpServerResponse struct {
  5580  	headers  [][]string
  5581  	payload  []byte
  5582  	trailers [][]string
  5583  }
  5584  
  5585  type httpServer struct {
  5586  	// If waitForEndStream is set, wait for the client to send a frame with end
  5587  	// stream in it before sending a response/refused stream.
  5588  	waitForEndStream bool
  5589  	refuseStream     func(uint32) bool
  5590  	responses        []httpServerResponse
  5591  }
  5592  
  5593  func (s *httpServer) writeHeader(framer *http2.Framer, sid uint32, headerFields []string, endStream bool) error {
  5594  	if len(headerFields)%2 == 1 {
  5595  		panic("odd number of kv args")
  5596  	}
  5597  
  5598  	var buf bytes.Buffer
  5599  	henc := hpack.NewEncoder(&buf)
  5600  	for len(headerFields) > 0 {
  5601  		k, v := headerFields[0], headerFields[1]
  5602  		headerFields = headerFields[2:]
  5603  		henc.WriteField(hpack.HeaderField{Name: k, Value: v})
  5604  	}
  5605  
  5606  	return framer.WriteHeaders(http2.HeadersFrameParam{
  5607  		StreamID:      sid,
  5608  		BlockFragment: buf.Bytes(),
  5609  		EndStream:     endStream,
  5610  		EndHeaders:    true,
  5611  	})
  5612  }
  5613  
  5614  func (s *httpServer) writePayload(framer *http2.Framer, sid uint32, payload []byte) error {
  5615  	return framer.WriteData(sid, false, payload)
  5616  }
  5617  
  5618  func (s *httpServer) start(t *testing.T, lis net.Listener) {
  5619  	// Launch an HTTP server to send back header.
  5620  	go func() {
  5621  		conn, err := lis.Accept()
  5622  		if err != nil {
  5623  			t.Errorf("Error accepting connection: %v", err)
  5624  			return
  5625  		}
  5626  		defer conn.Close()
  5627  		// Read preface sent by client.
  5628  		if _, err = io.ReadFull(conn, make([]byte, len(http2.ClientPreface))); err != nil {
  5629  			t.Errorf("Error at server-side while reading preface from client. Err: %v", err)
  5630  			return
  5631  		}
  5632  		reader := bufio.NewReader(conn)
  5633  		writer := bufio.NewWriter(conn)
  5634  		framer := http2.NewFramer(writer, reader)
  5635  		if err = framer.WriteSettingsAck(); err != nil {
  5636  			t.Errorf("Error at server-side while sending Settings ack. Err: %v", err)
  5637  			return
  5638  		}
  5639  		writer.Flush() // necessary since client is expecting preface before declaring connection fully setup.
  5640  		var sid uint32
  5641  		// Loop until framer returns possible conn closed errors.
  5642  		for requestNum := 0; ; requestNum = (requestNum + 1) % len(s.responses) {
  5643  			// Read frames until a header is received.
  5644  			for {
  5645  				frame, err := framer.ReadFrame()
  5646  				if err != nil {
  5647  					if !isConnClosedErr(err) {
  5648  						t.Errorf("Error at server-side while reading frame. got: %q, want: rpc error containing substring %q OR %q", err, possibleConnResetMsg, possibleEOFMsg)
  5649  					}
  5650  					return
  5651  				}
  5652  				sid = 0
  5653  				switch fr := frame.(type) {
  5654  				case *http2.HeadersFrame:
  5655  					// Respond after this if we are not waiting for an end
  5656  					// stream or if this frame ends it.
  5657  					if !s.waitForEndStream || fr.StreamEnded() {
  5658  						sid = fr.Header().StreamID
  5659  					}
  5660  
  5661  				case *http2.DataFrame:
  5662  					// Respond after this if we were waiting for an end stream
  5663  					// and this frame ends it.  (If we were not waiting for an
  5664  					// end stream, this stream was already responded to when
  5665  					// the headers were received.)
  5666  					if s.waitForEndStream && fr.StreamEnded() {
  5667  						sid = fr.Header().StreamID
  5668  					}
  5669  				}
  5670  				if sid != 0 {
  5671  					if s.refuseStream == nil || !s.refuseStream(sid) {
  5672  						break
  5673  					}
  5674  					framer.WriteRSTStream(sid, http2.ErrCodeRefusedStream)
  5675  					writer.Flush()
  5676  				}
  5677  			}
  5678  
  5679  			response := s.responses[requestNum]
  5680  			for _, header := range response.headers {
  5681  				if err = s.writeHeader(framer, sid, header, false); err != nil {
  5682  					t.Errorf("Error at server-side while writing headers. Err: %v", err)
  5683  					return
  5684  				}
  5685  				writer.Flush()
  5686  			}
  5687  			if response.payload != nil {
  5688  				if err = s.writePayload(framer, sid, response.payload); err != nil {
  5689  					t.Errorf("Error at server-side while writing payload. Err: %v", err)
  5690  					return
  5691  				}
  5692  				writer.Flush()
  5693  			}
  5694  			for i, trailer := range response.trailers {
  5695  				if err = s.writeHeader(framer, sid, trailer, i == len(response.trailers)-1); err != nil {
  5696  					t.Errorf("Error at server-side while writing trailers. Err: %v", err)
  5697  					return
  5698  				}
  5699  				writer.Flush()
  5700  			}
  5701  		}
  5702  	}()
  5703  }
  5704  
  5705  func (s) TestClientCancellationPropagatesUnary(t *testing.T) {
  5706  	wg := &sync.WaitGroup{}
  5707  	called, done := make(chan struct{}), make(chan struct{})
  5708  	ss := &stubserver.StubServer{
  5709  		EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
  5710  			close(called)
  5711  			<-ctx.Done()
  5712  			err := ctx.Err()
  5713  			if err != context.Canceled {
  5714  				t.Errorf("ctx.Err() = %v; want context.Canceled", err)
  5715  			}
  5716  			close(done)
  5717  			return nil, err
  5718  		},
  5719  	}
  5720  	if err := ss.Start(nil); err != nil {
  5721  		t.Fatalf("Error starting endpoint server: %v", err)
  5722  	}
  5723  	defer ss.Stop()
  5724  
  5725  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  5726  
  5727  	wg.Add(1)
  5728  	go func() {
  5729  		if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Canceled {
  5730  			t.Errorf("ss.Client.EmptyCall() = _, %v; want _, Code()=codes.Canceled", err)
  5731  		}
  5732  		wg.Done()
  5733  	}()
  5734  
  5735  	select {
  5736  	case <-called:
  5737  	case <-time.After(5 * time.Second):
  5738  		t.Fatalf("failed to perform EmptyCall after 10s")
  5739  	}
  5740  	cancel()
  5741  	select {
  5742  	case <-done:
  5743  	case <-time.After(5 * time.Second):
  5744  		t.Fatalf("server failed to close done chan due to cancellation propagation")
  5745  	}
  5746  	wg.Wait()
  5747  }
  5748  
  5749  // When an RPC is canceled, it's possible that the last Recv() returns before
  5750  // all call options' after are executed.
  5751  func (s) TestCanceledRPCCallOptionRace(t *testing.T) {
  5752  	ss := &stubserver.StubServer{
  5753  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
  5754  			err := stream.Send(&testpb.StreamingOutputCallResponse{})
  5755  			if err != nil {
  5756  				return err
  5757  			}
  5758  			<-stream.Context().Done()
  5759  			return nil
  5760  		},
  5761  	}
  5762  	if err := ss.Start(nil); err != nil {
  5763  		t.Fatalf("Error starting endpoint server: %v", err)
  5764  	}
  5765  	defer ss.Stop()
  5766  
  5767  	const count = 1000
  5768  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  5769  	defer cancel()
  5770  
  5771  	var wg sync.WaitGroup
  5772  	wg.Add(count)
  5773  	for i := 0; i < count; i++ {
  5774  		go func() {
  5775  			defer wg.Done()
  5776  			var p peer.Peer
  5777  			ctx, cancel := context.WithCancel(ctx)
  5778  			defer cancel()
  5779  			stream, err := ss.Client.FullDuplexCall(ctx, grpc.Peer(&p))
  5780  			if err != nil {
  5781  				t.Errorf("_.FullDuplexCall(_) = _, %v", err)
  5782  				return
  5783  			}
  5784  			if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
  5785  				t.Errorf("_ has error %v while sending", err)
  5786  				return
  5787  			}
  5788  			if _, err := stream.Recv(); err != nil {
  5789  				t.Errorf("%v.Recv() = %v", stream, err)
  5790  				return
  5791  			}
  5792  			cancel()
  5793  			if _, err := stream.Recv(); status.Code(err) != codes.Canceled {
  5794  				t.Errorf("%v compleled with error %v, want %s", stream, err, codes.Canceled)
  5795  				return
  5796  			}
  5797  			// If recv returns before call options are executed, peer.Addr is not set,
  5798  			// fail the test.
  5799  			if p.Addr == nil {
  5800  				t.Errorf("peer.Addr is nil, want non-nil")
  5801  				return
  5802  			}
  5803  		}()
  5804  	}
  5805  	wg.Wait()
  5806  }
  5807  
  5808  func (s) TestClientSettingsFloodCloseConn(t *testing.T) {
  5809  	// Tests that the server properly closes its transport if the client floods
  5810  	// settings frames and then closes the connection.
  5811  
  5812  	// Minimize buffer sizes to stimulate failure condition more quickly.
  5813  	s := grpc.NewServer(grpc.WriteBufferSize(20))
  5814  	l := bufconn.Listen(20)
  5815  	go s.Serve(l)
  5816  
  5817  	// Dial our server and handshake.
  5818  	conn, err := l.Dial()
  5819  	if err != nil {
  5820  		t.Fatalf("Error dialing bufconn: %v", err)
  5821  	}
  5822  
  5823  	n, err := conn.Write([]byte(http2.ClientPreface))
  5824  	if err != nil || n != len(http2.ClientPreface) {
  5825  		t.Fatalf("Error writing client preface: %v, %v", n, err)
  5826  	}
  5827  
  5828  	fr := http2.NewFramer(conn, conn)
  5829  	f, err := fr.ReadFrame()
  5830  	if err != nil {
  5831  		t.Fatalf("Error reading initial settings frame: %v", err)
  5832  	}
  5833  	if _, ok := f.(*http2.SettingsFrame); ok {
  5834  		if err := fr.WriteSettingsAck(); err != nil {
  5835  			t.Fatalf("Error writing settings ack: %v", err)
  5836  		}
  5837  	} else {
  5838  		t.Fatalf("Error reading initial settings frame: type=%T", f)
  5839  	}
  5840  
  5841  	// Confirm settings can be written, and that an ack is read.
  5842  	if err = fr.WriteSettings(); err != nil {
  5843  		t.Fatalf("Error writing settings frame: %v", err)
  5844  	}
  5845  	if f, err = fr.ReadFrame(); err != nil {
  5846  		t.Fatalf("Error reading frame: %v", err)
  5847  	}
  5848  	if sf, ok := f.(*http2.SettingsFrame); !ok || !sf.IsAck() {
  5849  		t.Fatalf("Unexpected frame: %v", f)
  5850  	}
  5851  
  5852  	// Flood settings frames until a timeout occurs, indiciating the server has
  5853  	// stopped reading from the connection, then close the conn.
  5854  	for {
  5855  		conn.SetWriteDeadline(time.Now().Add(50 * time.Millisecond))
  5856  		if err := fr.WriteSettings(); err != nil {
  5857  			if to, ok := err.(interface{ Timeout() bool }); !ok || !to.Timeout() {
  5858  				t.Fatalf("Received unexpected write error: %v", err)
  5859  			}
  5860  			break
  5861  		}
  5862  	}
  5863  	conn.Close()
  5864  
  5865  	// If the server does not handle this situation correctly, it will never
  5866  	// close the transport.  This is because its loopyWriter.run() will have
  5867  	// exited, and thus not handle the goAway the draining process initiates.
  5868  	// Also, we would see a goroutine leak in this case, as the reader would be
  5869  	// blocked on the controlBuf's throttle() method indefinitely.
  5870  
  5871  	timer := time.AfterFunc(5*time.Second, func() {
  5872  		t.Errorf("Timeout waiting for GracefulStop to return")
  5873  		s.Stop()
  5874  	})
  5875  	s.GracefulStop()
  5876  	timer.Stop()
  5877  }
  5878  
  5879  func unaryInterceptorVerifyConn(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
  5880  	conn := transport.GetConnection(ctx)
  5881  	if conn == nil {
  5882  		return nil, status.Error(codes.NotFound, "connection was not in context")
  5883  	}
  5884  	return nil, status.Error(codes.OK, "")
  5885  }
  5886  
  5887  // TestUnaryServerInterceptorGetsConnection tests whether the accepted conn on
  5888  // the server gets to any unary interceptors on the server side.
  5889  func (s) TestUnaryServerInterceptorGetsConnection(t *testing.T) {
  5890  	ss := &stubserver.StubServer{}
  5891  	if err := ss.Start([]grpc.ServerOption{grpc.UnaryInterceptor(unaryInterceptorVerifyConn)}); err != nil {
  5892  		t.Fatalf("Error starting endpoint server: %v", err)
  5893  	}
  5894  	defer ss.Stop()
  5895  
  5896  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  5897  	defer cancel()
  5898  
  5899  	if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.OK {
  5900  		t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v, want _, error code %s", err, codes.OK)
  5901  	}
  5902  }
  5903  
  5904  func streamingInterceptorVerifyConn(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
  5905  	conn := transport.GetConnection(ss.Context())
  5906  	if conn == nil {
  5907  		return status.Error(codes.NotFound, "connection was not in context")
  5908  	}
  5909  	return status.Error(codes.OK, "")
  5910  }
  5911  
  5912  // TestStreamingServerInterceptorGetsConnection tests whether the accepted conn on
  5913  // the server gets to any streaming interceptors on the server side.
  5914  func (s) TestStreamingServerInterceptorGetsConnection(t *testing.T) {
  5915  	ss := &stubserver.StubServer{}
  5916  	if err := ss.Start([]grpc.ServerOption{grpc.StreamInterceptor(streamingInterceptorVerifyConn)}); err != nil {
  5917  		t.Fatalf("Error starting endpoint server: %v", err)
  5918  	}
  5919  	defer ss.Stop()
  5920  
  5921  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  5922  	defer cancel()
  5923  
  5924  	s, err := ss.Client.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
  5925  	if err != nil {
  5926  		t.Fatalf("ss.Client.StreamingOutputCall(_) = _, %v, want _, <nil>", err)
  5927  	}
  5928  	if _, err := s.Recv(); err != io.EOF {
  5929  		t.Fatalf("ss.Client.StreamingInputCall(_) = _, %v, want _, %v", err, io.EOF)
  5930  	}
  5931  }
  5932  
  5933  // unaryInterceptorVerifyAuthority verifies there is an unambiguous :authority
  5934  // once the request gets to an interceptor. An unambiguous :authority is defined
  5935  // as at most a single :authority header, and no host header according to A41.
  5936  func unaryInterceptorVerifyAuthority(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
  5937  	md, ok := metadata.FromIncomingContext(ctx)
  5938  	if !ok {
  5939  		return nil, status.Error(codes.NotFound, "metadata was not in context")
  5940  	}
  5941  	authority := md.Get(":authority")
  5942  	if len(authority) > 1 { // Should be an unambiguous authority by the time it gets to interceptor.
  5943  		return nil, status.Error(codes.NotFound, ":authority value had more than one value")
  5944  	}
  5945  	// Host header shouldn't be present by the time it gets to the interceptor
  5946  	// level (should either be renamed to :authority or explicitly deleted).
  5947  	host := md.Get("host")
  5948  	if len(host) != 0 {
  5949  		return nil, status.Error(codes.NotFound, "host header should not be present in metadata")
  5950  	}
  5951  	// Pass back the authority for verification on client - NotFound so
  5952  	// grpc-message will be available to read for verification.
  5953  	if len(authority) == 0 {
  5954  		// Represent no :authority header present with an empty string.
  5955  		return nil, status.Error(codes.NotFound, "")
  5956  	}
  5957  	return nil, status.Error(codes.NotFound, authority[0])
  5958  }
  5959  
  5960  // TestAuthorityHeader tests that the eventual :authority that reaches the grpc
  5961  // layer is unambiguous due to logic added in A41.
  5962  func (s) TestAuthorityHeader(t *testing.T) {
  5963  	tests := []struct {
  5964  		name          string
  5965  		headers       []string
  5966  		wantAuthority string
  5967  	}{
  5968  		// "If :authority is missing, Host must be renamed to :authority." - A41
  5969  		{
  5970  			name: "Missing :authority",
  5971  			// Codepath triggered by incoming headers with no authority but with
  5972  			// a host.
  5973  			headers: []string{
  5974  				":method", "POST",
  5975  				":path", "/grpc.testing.TestService/UnaryCall",
  5976  				"content-type", "application/grpc",
  5977  				"te", "trailers",
  5978  				"host", "localhost",
  5979  			},
  5980  			wantAuthority: "localhost",
  5981  		},
  5982  		{
  5983  			name: "Missing :authority and host",
  5984  			// Codepath triggered by incoming headers with no :authority and no
  5985  			// host.
  5986  			headers: []string{
  5987  				":method", "POST",
  5988  				":path", "/grpc.testing.TestService/UnaryCall",
  5989  				"content-type", "application/grpc",
  5990  				"te", "trailers",
  5991  			},
  5992  			wantAuthority: "",
  5993  		},
  5994  		// "If :authority is present, Host must be discarded." - A41
  5995  		{
  5996  			name: ":authority and host present",
  5997  			// Codepath triggered by incoming headers with both an authority
  5998  			// header and a host header.
  5999  			headers: []string{
  6000  				":method", "POST",
  6001  				":path", "/grpc.testing.TestService/UnaryCall",
  6002  				":authority", "localhost",
  6003  				"content-type", "application/grpc",
  6004  				"host", "localhost2",
  6005  			},
  6006  			wantAuthority: "localhost",
  6007  		},
  6008  	}
  6009  	for _, test := range tests {
  6010  		t.Run(test.name, func(t *testing.T) {
  6011  			te := newTest(t, tcpClearRREnv)
  6012  			ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  6013  				return &testpb.SimpleResponse{}, nil
  6014  			}}
  6015  			te.unaryServerInt = unaryInterceptorVerifyAuthority
  6016  			te.startServer(ts)
  6017  			defer te.tearDown()
  6018  			success := testutils.NewChannel()
  6019  			te.withServerTester(func(st *serverTester) {
  6020  				st.writeHeaders(http2.HeadersFrameParam{
  6021  					StreamID:      1,
  6022  					BlockFragment: st.encodeHeader(test.headers...),
  6023  					EndStream:     false,
  6024  					EndHeaders:    true,
  6025  				})
  6026  				st.writeData(1, true, []byte{0, 0, 0, 0, 0})
  6027  
  6028  				for {
  6029  					frame := st.wantAnyFrame()
  6030  					f, ok := frame.(*http2.MetaHeadersFrame)
  6031  					if !ok {
  6032  						continue
  6033  					}
  6034  					for _, header := range f.Fields {
  6035  						if header.Name == "grpc-message" {
  6036  							success.Send(header.Value)
  6037  							return
  6038  						}
  6039  					}
  6040  				}
  6041  			})
  6042  
  6043  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  6044  			defer cancel()
  6045  			gotAuthority, err := success.Receive(ctx)
  6046  			if err != nil {
  6047  				t.Fatalf("Error receiving from channel: %v", err)
  6048  			}
  6049  			if gotAuthority != test.wantAuthority {
  6050  				t.Fatalf("gotAuthority: %v, wantAuthority %v", gotAuthority, test.wantAuthority)
  6051  			}
  6052  		})
  6053  	}
  6054  }
  6055  
  6056  // wrapCloseListener tracks Accepts/Closes and maintains a counter of the
  6057  // number of open connections.
  6058  type wrapCloseListener struct {
  6059  	net.Listener
  6060  	connsOpen int32
  6061  }
  6062  
  6063  // wrapCloseListener is returned by wrapCloseListener.Accept and decrements its
  6064  // connsOpen when Close is called.
  6065  type wrapCloseConn struct {
  6066  	net.Conn
  6067  	lis       *wrapCloseListener
  6068  	closeOnce sync.Once
  6069  }
  6070  
  6071  func (w *wrapCloseListener) Accept() (net.Conn, error) {
  6072  	conn, err := w.Listener.Accept()
  6073  	if err != nil {
  6074  		return nil, err
  6075  	}
  6076  	atomic.AddInt32(&w.connsOpen, 1)
  6077  	return &wrapCloseConn{Conn: conn, lis: w}, nil
  6078  }
  6079  
  6080  func (w *wrapCloseConn) Close() error {
  6081  	defer w.closeOnce.Do(func() { atomic.AddInt32(&w.lis.connsOpen, -1) })
  6082  	return w.Conn.Close()
  6083  }
  6084  
  6085  // TestServerClosesConn ensures conn.Close is always closed even if the client
  6086  // doesn't complete the HTTP/2 handshake.
  6087  func (s) TestServerClosesConn(t *testing.T) {
  6088  	lis := bufconn.Listen(20)
  6089  	wrapLis := &wrapCloseListener{Listener: lis}
  6090  
  6091  	s := grpc.NewServer()
  6092  	go s.Serve(wrapLis)
  6093  	defer s.Stop()
  6094  
  6095  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  6096  	defer cancel()
  6097  
  6098  	for i := 0; i < 10; i++ {
  6099  		conn, err := lis.DialContext(ctx)
  6100  		if err != nil {
  6101  			t.Fatalf("Dial = _, %v; want _, nil", err)
  6102  		}
  6103  		conn.Close()
  6104  	}
  6105  	for ctx.Err() == nil {
  6106  		if atomic.LoadInt32(&wrapLis.connsOpen) == 0 {
  6107  			return
  6108  		}
  6109  		time.Sleep(50 * time.Millisecond)
  6110  	}
  6111  	t.Fatalf("timed out waiting for conns to be closed by server; still open: %v", atomic.LoadInt32(&wrapLis.connsOpen))
  6112  }
  6113  
  6114  // TestNilStatsHandler ensures we do not panic as a result of a nil stats
  6115  // handler.
  6116  func (s) TestNilStatsHandler(t *testing.T) {
  6117  	grpctest.TLogger.ExpectErrorN("ignoring nil parameter", 2)
  6118  	ss := &stubserver.StubServer{
  6119  		UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  6120  			return &testpb.SimpleResponse{}, nil
  6121  		},
  6122  	}
  6123  	if err := ss.Start([]grpc.ServerOption{grpc.StatsHandler(nil)}, grpc.WithStatsHandler(nil)); err != nil {
  6124  		t.Fatalf("Error starting endpoint server: %v", err)
  6125  	}
  6126  	defer ss.Stop()
  6127  
  6128  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  6129  	defer cancel()
  6130  	if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
  6131  		t.Fatalf("Unexpected error from UnaryCall: %v", err)
  6132  	}
  6133  }
  6134  
  6135  // TestUnexpectedEOF tests a scenario where a client invokes two unary RPC
  6136  // calls. The first call receives a payload which exceeds max grpc receive
  6137  // message length, and the second gets a large response. This second RPC should
  6138  // not fail with unexpected.EOF.
  6139  func (s) TestUnexpectedEOF(t *testing.T) {
  6140  	ss := &stubserver.StubServer{
  6141  		UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  6142  			return &testpb.SimpleResponse{
  6143  				Payload: &testpb.Payload{
  6144  					Body: bytes.Repeat([]byte("a"), int(in.ResponseSize)),
  6145  				},
  6146  			}, nil
  6147  		},
  6148  	}
  6149  	if err := ss.Start([]grpc.ServerOption{}); err != nil {
  6150  		t.Fatalf("Error starting endpoint server: %v", err)
  6151  	}
  6152  	defer ss.Stop()
  6153  
  6154  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  6155  	defer cancel()
  6156  	for i := 0; i < 10; i++ {
  6157  		// exceeds grpc.DefaultMaxRecvMessageSize, this should error with
  6158  		// RESOURCE_EXHAUSTED error.
  6159  		_, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 4194304})
  6160  		if code := status.Code(err); code != codes.ResourceExhausted {
  6161  			t.Fatalf("UnaryCall RPC returned error: %v, want status code %v", err, codes.ResourceExhausted)
  6162  		}
  6163  		// Larger response that doesn't exceed DefaultMaxRecvMessageSize, this
  6164  		// should work normally.
  6165  		if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 275075}); err != nil {
  6166  			t.Fatalf("UnaryCall RPC failed: %v", err)
  6167  		}
  6168  	}
  6169  }
  6170  
  6171  // TestRecvWhileReturningStatus performs a Recv in a service handler while the
  6172  // handler returns its status.  A race condition could result in the server
  6173  // sending the first headers frame without the HTTP :status header.  This can
  6174  // happen when the failed Recv (due to the handler returning) and the handler's
  6175  // status both attempt to write the status, which would be the first headers
  6176  // frame sent, simultaneously.
  6177  func (s) TestRecvWhileReturningStatus(t *testing.T) {
  6178  	ss := &stubserver.StubServer{
  6179  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
  6180  			// The client never sends, so this Recv blocks until the server
  6181  			// returns and causes stream operations to return errors.
  6182  			go stream.Recv()
  6183  			return nil
  6184  		},
  6185  	}
  6186  	if err := ss.Start(nil); err != nil {
  6187  		t.Fatalf("Error starting endpoint server: %v", err)
  6188  	}
  6189  	defer ss.Stop()
  6190  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  6191  	defer cancel()
  6192  	for i := 0; i < 100; i++ {
  6193  		stream, err := ss.Client.FullDuplexCall(ctx)
  6194  		if err != nil {
  6195  			t.Fatalf("Error while creating stream: %v", err)
  6196  		}
  6197  		if _, err := stream.Recv(); err != io.EOF {
  6198  			t.Fatalf("stream.Recv() = %v, want io.EOF", err)
  6199  		}
  6200  	}
  6201  }
  6202  
  6203  type mockBinaryLogger struct {
  6204  	mml *mockMethodLogger
  6205  }
  6206  
  6207  func newMockBinaryLogger() *mockBinaryLogger {
  6208  	return &mockBinaryLogger{
  6209  		mml: &mockMethodLogger{},
  6210  	}
  6211  }
  6212  
  6213  func (mbl *mockBinaryLogger) GetMethodLogger(string) binarylog.MethodLogger {
  6214  	return mbl.mml
  6215  }
  6216  
  6217  type mockMethodLogger struct {
  6218  	events uint64
  6219  }
  6220  
  6221  func (mml *mockMethodLogger) Log(context.Context, binarylog.LogEntryConfig) {
  6222  	atomic.AddUint64(&mml.events, 1)
  6223  }
  6224  
  6225  // TestGlobalBinaryLoggingOptions tests the binary logging options for client
  6226  // and server side. The test configures a binary logger to be plumbed into every
  6227  // created ClientConn and server. It then makes a unary RPC call, and a
  6228  // streaming RPC call. A certain amount of logging calls should happen as a
  6229  // result of the stream operations on each of these calls.
  6230  func (s) TestGlobalBinaryLoggingOptions(t *testing.T) {
  6231  	csbl := newMockBinaryLogger()
  6232  	ssbl := newMockBinaryLogger()
  6233  
  6234  	internal.AddGlobalDialOptions.(func(opt ...grpc.DialOption))(internal.WithBinaryLogger.(func(bl binarylog.Logger) grpc.DialOption)(csbl))
  6235  	internal.AddGlobalServerOptions.(func(opt ...grpc.ServerOption))(internal.BinaryLogger.(func(bl binarylog.Logger) grpc.ServerOption)(ssbl))
  6236  	defer func() {
  6237  		internal.ClearGlobalDialOptions()
  6238  		internal.ClearGlobalServerOptions()
  6239  	}()
  6240  	ss := &stubserver.StubServer{
  6241  		UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  6242  			return &testpb.SimpleResponse{}, nil
  6243  		},
  6244  		FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
  6245  			_, err := stream.Recv()
  6246  			if err == io.EOF {
  6247  				return nil
  6248  			}
  6249  			return status.Errorf(codes.Unknown, "expected client to call CloseSend")
  6250  		},
  6251  	}
  6252  
  6253  	// No client or server options specified, because should pick up configured
  6254  	// global options.
  6255  	if err := ss.Start(nil); err != nil {
  6256  		t.Fatalf("Error starting endpoint server: %v", err)
  6257  	}
  6258  	defer ss.Stop()
  6259  
  6260  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  6261  	defer cancel()
  6262  	// Make a Unary RPC. This should cause Log calls on the MethodLogger.
  6263  	if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
  6264  		t.Fatalf("Unexpected error from UnaryCall: %v", err)
  6265  	}
  6266  	if csbl.mml.events != 5 {
  6267  		t.Fatalf("want 5 client side binary logging events, got %v", csbl.mml.events)
  6268  	}
  6269  	if ssbl.mml.events != 5 {
  6270  		t.Fatalf("want 5 server side binary logging events, got %v", ssbl.mml.events)
  6271  	}
  6272  
  6273  	// Make a streaming RPC. This should cause Log calls on the MethodLogger.
  6274  	stream, err := ss.Client.FullDuplexCall(ctx)
  6275  	if err != nil {
  6276  		t.Fatalf("ss.Client.FullDuplexCall failed: %f", err)
  6277  	}
  6278  
  6279  	stream.CloseSend()
  6280  	if _, err = stream.Recv(); err != io.EOF {
  6281  		t.Fatalf("unexpected error: %v, expected an EOF error", err)
  6282  	}
  6283  
  6284  	if csbl.mml.events != 8 {
  6285  		t.Fatalf("want 8 client side binary logging events, got %v", csbl.mml.events)
  6286  	}
  6287  	if ssbl.mml.events != 8 {
  6288  		t.Fatalf("want 8 server side binary logging events, got %v", ssbl.mml.events)
  6289  	}
  6290  }
  6291  
  6292  type statsHandlerRecordEvents struct {
  6293  	mu sync.Mutex
  6294  	s  []stats.RPCStats
  6295  }
  6296  
  6297  func (*statsHandlerRecordEvents) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
  6298  	return ctx
  6299  }
  6300  func (h *statsHandlerRecordEvents) HandleRPC(_ context.Context, s stats.RPCStats) {
  6301  	h.mu.Lock()
  6302  	defer h.mu.Unlock()
  6303  	h.s = append(h.s, s)
  6304  }
  6305  func (*statsHandlerRecordEvents) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
  6306  	return ctx
  6307  }
  6308  func (*statsHandlerRecordEvents) HandleConn(context.Context, stats.ConnStats) {}
  6309  
  6310  type triggerRPCBlockPicker struct {
  6311  	pickDone func()
  6312  }
  6313  
  6314  func (bp *triggerRPCBlockPicker) Pick(pi balancer.PickInfo) (balancer.PickResult, error) {
  6315  	bp.pickDone()
  6316  	return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
  6317  }
  6318  
  6319  const name = "triggerRPCBlockBalancer"
  6320  
  6321  type triggerRPCBlockPickerBalancerBuilder struct{}
  6322  
  6323  func (triggerRPCBlockPickerBalancerBuilder) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
  6324  	b := &triggerRPCBlockBalancer{
  6325  		blockingPickerDone: grpcsync.NewEvent(),
  6326  		ClientConn:         cc,
  6327  	}
  6328  	// round_robin child to complete balancer tree with a usable leaf policy and
  6329  	// have RPCs actually work.
  6330  	builder := balancer.Get(roundrobin.Name)
  6331  	rr := builder.Build(b, bOpts)
  6332  	if rr == nil {
  6333  		panic("round robin builder returned nil")
  6334  	}
  6335  	b.Balancer = rr
  6336  	return b
  6337  }
  6338  
  6339  func (triggerRPCBlockPickerBalancerBuilder) ParseConfig(json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
  6340  	return &bpbConfig{}, nil
  6341  }
  6342  
  6343  func (triggerRPCBlockPickerBalancerBuilder) Name() string {
  6344  	return name
  6345  }
  6346  
  6347  type bpbConfig struct {
  6348  	serviceconfig.LoadBalancingConfig
  6349  }
  6350  
  6351  // triggerRPCBlockBalancer uses a child RR balancer, but blocks all UpdateState
  6352  // calls until the first Pick call. That first Pick returns
  6353  // ErrNoSubConnAvailable to make the RPC block and trigger the appropriate stats
  6354  // handler callout. After the first Pick call, it will forward at least one
  6355  // READY picker update from the child, causing RPCs to proceed as normal using a
  6356  // round robin balancer's picker if it updates with a READY picker.
  6357  type triggerRPCBlockBalancer struct {
  6358  	stateMu    sync.Mutex
  6359  	childState balancer.State
  6360  
  6361  	blockingPickerDone *grpcsync.Event
  6362  	// embed a ClientConn to wrap only UpdateState() operation
  6363  	balancer.ClientConn
  6364  	// embed a Balancer to wrap only UpdateClientConnState() operation
  6365  	balancer.Balancer
  6366  }
  6367  
  6368  func (bpb *triggerRPCBlockBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
  6369  	err := bpb.Balancer.UpdateClientConnState(s)
  6370  	bpb.ClientConn.UpdateState(balancer.State{
  6371  		ConnectivityState: connectivity.Connecting,
  6372  		Picker: &triggerRPCBlockPicker{
  6373  			pickDone: func() {
  6374  				bpb.stateMu.Lock()
  6375  				defer bpb.stateMu.Unlock()
  6376  				bpb.blockingPickerDone.Fire()
  6377  				if bpb.childState.ConnectivityState == connectivity.Ready {
  6378  					bpb.ClientConn.UpdateState(bpb.childState)
  6379  				}
  6380  			},
  6381  		},
  6382  	})
  6383  	return err
  6384  }
  6385  
  6386  func (bpb *triggerRPCBlockBalancer) UpdateState(state balancer.State) {
  6387  	bpb.stateMu.Lock()
  6388  	defer bpb.stateMu.Unlock()
  6389  	bpb.childState = state
  6390  	if bpb.blockingPickerDone.HasFired() { // guard first one to get a picker sending ErrNoSubConnAvailable first
  6391  		if state.ConnectivityState == connectivity.Ready {
  6392  			bpb.ClientConn.UpdateState(state) // after the first rr picker update, only forward once READY for deterministic picker counts
  6393  		}
  6394  	}
  6395  }
  6396  
  6397  // TestRPCBlockingOnPickerStatsCall tests the emission of a stats handler call
  6398  // that represents the RPC had to block waiting for a new picker due to
  6399  // ErrNoSubConnAvailable being returned from the first picker call.
  6400  func (s) TestRPCBlockingOnPickerStatsCall(t *testing.T) {
  6401  	sh := &statsHandlerRecordEvents{}
  6402  	ss := &stubserver.StubServer{
  6403  		UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
  6404  			return &testpb.SimpleResponse{}, nil
  6405  		},
  6406  	}
  6407  
  6408  	if err := ss.StartServer(); err != nil {
  6409  		t.Fatalf("Error starting endpoint server: %v", err)
  6410  	}
  6411  	defer ss.Stop()
  6412  
  6413  	lbCfgJSON := `{
  6414    		"loadBalancingConfig": [
  6415      		{
  6416        			"triggerRPCBlockBalancer": {}
  6417      		}
  6418  		]
  6419  	}`
  6420  
  6421  	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lbCfgJSON)
  6422  	mr := manual.NewBuilderWithScheme("pickerupdatedbalancer")
  6423  	defer mr.Close()
  6424  	mr.InitialState(resolver.State{
  6425  		Addresses: []resolver.Address{
  6426  			{Addr: ss.Address},
  6427  		},
  6428  		ServiceConfig: sc,
  6429  	})
  6430  
  6431  	cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithStatsHandler(sh), grpc.WithTransportCredentials(insecure.NewCredentials()))
  6432  	if err != nil {
  6433  		t.Fatalf("grpc.Dial() failed: %v", err)
  6434  	}
  6435  	defer cc.Close()
  6436  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  6437  	defer cancel()
  6438  	testServiceClient := testgrpc.NewTestServiceClient(cc)
  6439  	if _, err := testServiceClient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
  6440  		t.Fatalf("Unexpected error from UnaryCall: %v", err)
  6441  	}
  6442  
  6443  	var pickerUpdatedCount uint
  6444  	for _, stat := range sh.s {
  6445  		if _, ok := stat.(*stats.PickerUpdated); ok {
  6446  			pickerUpdatedCount++
  6447  		}
  6448  	}
  6449  	if pickerUpdatedCount != 1 {
  6450  		t.Fatalf("sh.pickerUpdated count: %v, want: %v", pickerUpdatedCount, 2)
  6451  	}
  6452  }
  6453  

View as plain text