1
18
19 package test
20
21 import (
22 "bufio"
23 "bytes"
24 "context"
25 "crypto/tls"
26 "encoding/json"
27 "errors"
28 "flag"
29 "fmt"
30 "io"
31 "math"
32 "net"
33 "net/http"
34 "os"
35 "reflect"
36 "runtime"
37 "strings"
38 "sync"
39 "sync/atomic"
40 "syscall"
41 "testing"
42 "time"
43
44 "golang.org/x/net/http2"
45 "golang.org/x/net/http2/hpack"
46 "google.golang.org/grpc"
47 "google.golang.org/grpc/balancer"
48 "google.golang.org/grpc/balancer/roundrobin"
49 "google.golang.org/grpc/codes"
50 "google.golang.org/grpc/connectivity"
51 "google.golang.org/grpc/credentials"
52 "google.golang.org/grpc/credentials/insecure"
53 "google.golang.org/grpc/health"
54 "google.golang.org/grpc/internal"
55 "google.golang.org/grpc/internal/binarylog"
56 "google.golang.org/grpc/internal/channelz"
57 "google.golang.org/grpc/internal/grpcsync"
58 "google.golang.org/grpc/internal/grpctest"
59 "google.golang.org/grpc/internal/stubserver"
60 "google.golang.org/grpc/internal/testutils"
61 "google.golang.org/grpc/internal/transport"
62 "google.golang.org/grpc/metadata"
63 "google.golang.org/grpc/peer"
64 "google.golang.org/grpc/resolver"
65 "google.golang.org/grpc/resolver/manual"
66 "google.golang.org/grpc/serviceconfig"
67 "google.golang.org/grpc/stats"
68 "google.golang.org/grpc/status"
69 "google.golang.org/grpc/tap"
70 "google.golang.org/grpc/test/bufconn"
71 "google.golang.org/grpc/testdata"
72
73 spb "google.golang.org/genproto/googleapis/rpc/status"
74 healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
75 healthpb "google.golang.org/grpc/health/grpc_health_v1"
76 testgrpc "google.golang.org/grpc/interop/grpc_testing"
77 testpb "google.golang.org/grpc/interop/grpc_testing"
78 "google.golang.org/protobuf/proto"
79 "google.golang.org/protobuf/types/known/anypb"
80
81 _ "google.golang.org/grpc/encoding/gzip"
82 )
83
84 const defaultHealthService = "grpc.health.v1.Health"
85
86 func init() {
87 channelz.TurnOn()
88 balancer.Register(triggerRPCBlockPickerBalancerBuilder{})
89 }
90
91 type s struct {
92 grpctest.Tester
93 }
94
95 func Test(t *testing.T) {
96 grpctest.RunSubTests(t, s{})
97 }
98
99 var (
100
101 testMetadata = metadata.MD{
102 "key1": []string{"value1"},
103 "key2": []string{"value2"},
104 "key3-bin": []string{"binvalue1", string([]byte{1, 2, 3})},
105 }
106 testMetadata2 = metadata.MD{
107 "key1": []string{"value12"},
108 "key2": []string{"value22"},
109 }
110
111 testTrailerMetadata = metadata.MD{
112 "tkey1": []string{"trailerValue1"},
113 "tkey2": []string{"trailerValue2"},
114 "tkey3-bin": []string{"trailerbinvalue1", string([]byte{3, 2, 1})},
115 }
116 testTrailerMetadata2 = metadata.MD{
117 "tkey1": []string{"trailerValue12"},
118 "tkey2": []string{"trailerValue22"},
119 }
120
121 malformedHTTP2Metadata = metadata.MD{
122 "Key": []string{"foo"},
123 }
124 testAppUA = "myApp1/1.0 myApp2/0.9"
125 failAppUA = "fail-this-RPC"
126 detailedError = status.ErrorProto(&spb.Status{
127 Code: int32(codes.DataLoss),
128 Message: "error for testing: " + failAppUA,
129 Details: []*anypb.Any{{
130 TypeUrl: "url",
131 Value: []byte{6, 0, 0, 6, 1, 3},
132 }},
133 })
134 )
135
136 var raceMode bool
137
138 type testServer struct {
139 testgrpc.UnimplementedTestServiceServer
140
141 security string
142 earlyFail bool
143 setAndSendHeader bool
144 setHeaderOnly bool
145 multipleSetTrailer bool
146 unaryCallSleepTime time.Duration
147 }
148
149 func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
150 if md, ok := metadata.FromIncomingContext(ctx); ok {
151
152
153 if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) {
154 return nil, detailedError
155 }
156 var str []string
157 for _, entry := range md["user-agent"] {
158 str = append(str, "ua", entry)
159 }
160 grpc.SendHeader(ctx, metadata.Pairs(str...))
161 }
162 return new(testpb.Empty), nil
163 }
164
165 func newPayload(t testpb.PayloadType, size int32) (*testpb.Payload, error) {
166 if size < 0 {
167 return nil, fmt.Errorf("requested a response with invalid length %d", size)
168 }
169 body := make([]byte, size)
170 switch t {
171 case testpb.PayloadType_COMPRESSABLE:
172 default:
173 return nil, fmt.Errorf("unsupported payload type: %d", t)
174 }
175 return &testpb.Payload{
176 Type: t,
177 Body: body,
178 }, nil
179 }
180
181 func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
182 md, ok := metadata.FromIncomingContext(ctx)
183 if ok {
184 if _, exists := md[":authority"]; !exists {
185 return nil, status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md)
186 }
187 if s.setAndSendHeader {
188 if err := grpc.SetHeader(ctx, md); err != nil {
189 return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err)
190 }
191 if err := grpc.SendHeader(ctx, testMetadata2); err != nil {
192 return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", testMetadata2, err)
193 }
194 } else if s.setHeaderOnly {
195 if err := grpc.SetHeader(ctx, md); err != nil {
196 return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", md, err)
197 }
198 if err := grpc.SetHeader(ctx, testMetadata2); err != nil {
199 return nil, status.Errorf(status.Code(err), "grpc.SetHeader(_, %v) = %v, want <nil>", testMetadata2, err)
200 }
201 } else {
202 if err := grpc.SendHeader(ctx, md); err != nil {
203 return nil, status.Errorf(status.Code(err), "grpc.SendHeader(_, %v) = %v, want <nil>", md, err)
204 }
205 }
206 if err := grpc.SetTrailer(ctx, testTrailerMetadata); err != nil {
207 return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata, err)
208 }
209 if s.multipleSetTrailer {
210 if err := grpc.SetTrailer(ctx, testTrailerMetadata2); err != nil {
211 return nil, status.Errorf(status.Code(err), "grpc.SetTrailer(_, %v) = %v, want <nil>", testTrailerMetadata2, err)
212 }
213 }
214 }
215 pr, ok := peer.FromContext(ctx)
216 if !ok {
217 return nil, status.Error(codes.DataLoss, "failed to get peer from ctx")
218 }
219 if pr.Addr == net.Addr(nil) {
220 return nil, status.Error(codes.DataLoss, "failed to get peer address")
221 }
222 if s.security != "" {
223
224 var authType, serverName string
225 switch info := pr.AuthInfo.(type) {
226 case credentials.TLSInfo:
227 authType = info.AuthType()
228 serverName = info.State.ServerName
229 default:
230 return nil, status.Error(codes.Unauthenticated, "Unknown AuthInfo type")
231 }
232 if authType != s.security {
233 return nil, status.Errorf(codes.Unauthenticated, "Wrong auth type: got %q, want %q", authType, s.security)
234 }
235 if serverName != "x.test.example.com" {
236 return nil, status.Errorf(codes.Unauthenticated, "Unknown server name %q", serverName)
237 }
238 }
239
240 time.Sleep(s.unaryCallSleepTime)
241
242 payload, err := newPayload(in.GetResponseType(), in.GetResponseSize())
243 if err != nil {
244 return nil, err
245 }
246
247 return &testpb.SimpleResponse{
248 Payload: payload,
249 }, nil
250 }
251
252 func (s *testServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
253 if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
254 if _, exists := md[":authority"]; !exists {
255 return status.Errorf(codes.DataLoss, "expected an :authority metadata: %v", md)
256 }
257
258
259 if ua, ok := md["user-agent"]; !ok || strings.HasPrefix(ua[0], failAppUA) {
260 return status.Error(codes.DataLoss, "error for testing: "+failAppUA)
261 }
262 }
263 cs := args.GetResponseParameters()
264 for _, c := range cs {
265 if us := c.GetIntervalUs(); us > 0 {
266 time.Sleep(time.Duration(us) * time.Microsecond)
267 }
268
269 payload, err := newPayload(args.GetResponseType(), c.GetSize())
270 if err != nil {
271 return err
272 }
273
274 if err := stream.Send(&testpb.StreamingOutputCallResponse{
275 Payload: payload,
276 }); err != nil {
277 return err
278 }
279 }
280 return nil
281 }
282
283 func (s *testServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error {
284 var sum int
285 for {
286 in, err := stream.Recv()
287 if err == io.EOF {
288 return stream.SendAndClose(&testpb.StreamingInputCallResponse{
289 AggregatedPayloadSize: int32(sum),
290 })
291 }
292 if err != nil {
293 return err
294 }
295 p := in.GetPayload().GetBody()
296 sum += len(p)
297 if s.earlyFail {
298 return status.Error(codes.NotFound, "not found")
299 }
300 }
301 }
302
303 func (s *testServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
304 md, ok := metadata.FromIncomingContext(stream.Context())
305 if ok {
306 if s.setAndSendHeader {
307 if err := stream.SetHeader(md); err != nil {
308 return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err)
309 }
310 if err := stream.SendHeader(testMetadata2); err != nil {
311 return status.Errorf(status.Code(err), "%v.SendHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err)
312 }
313 } else if s.setHeaderOnly {
314 if err := stream.SetHeader(md); err != nil {
315 return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, md, err)
316 }
317 if err := stream.SetHeader(testMetadata2); err != nil {
318 return status.Errorf(status.Code(err), "%v.SetHeader(_, %v) = %v, want <nil>", stream, testMetadata2, err)
319 }
320 } else {
321 if err := stream.SendHeader(md); err != nil {
322 return status.Errorf(status.Code(err), "%v.SendHeader(%v) = %v, want %v", stream, md, err, nil)
323 }
324 }
325 stream.SetTrailer(testTrailerMetadata)
326 if s.multipleSetTrailer {
327 stream.SetTrailer(testTrailerMetadata2)
328 }
329 }
330 for {
331 in, err := stream.Recv()
332 if err == io.EOF {
333
334 return nil
335 }
336 if err != nil {
337
338 if status.Code(err) == codes.ResourceExhausted {
339 return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error())
340 }
341 return err
342 }
343 cs := in.GetResponseParameters()
344 for _, c := range cs {
345 if us := c.GetIntervalUs(); us > 0 {
346 time.Sleep(time.Duration(us) * time.Microsecond)
347 }
348
349 payload, err := newPayload(in.GetResponseType(), c.GetSize())
350 if err != nil {
351 return err
352 }
353
354 if err := stream.Send(&testpb.StreamingOutputCallResponse{
355 Payload: payload,
356 }); err != nil {
357
358 if status.Code(err) == codes.ResourceExhausted {
359 return status.Errorf(codes.Internal, "fake error for test testSvrWriteStatusEarlyWrite. true error: %s", err.Error())
360 }
361 return err
362 }
363 }
364 }
365 }
366
367 func (s *testServer) HalfDuplexCall(stream testgrpc.TestService_HalfDuplexCallServer) error {
368 var msgBuf []*testpb.StreamingOutputCallRequest
369 for {
370 in, err := stream.Recv()
371 if err == io.EOF {
372
373 break
374 }
375 if err != nil {
376 return err
377 }
378 msgBuf = append(msgBuf, in)
379 }
380 for _, m := range msgBuf {
381 cs := m.GetResponseParameters()
382 for _, c := range cs {
383 if us := c.GetIntervalUs(); us > 0 {
384 time.Sleep(time.Duration(us) * time.Microsecond)
385 }
386
387 payload, err := newPayload(m.GetResponseType(), c.GetSize())
388 if err != nil {
389 return err
390 }
391
392 if err := stream.Send(&testpb.StreamingOutputCallResponse{
393 Payload: payload,
394 }); err != nil {
395 return err
396 }
397 }
398 }
399 return nil
400 }
401
402 type env struct {
403 name string
404 network string
405 security string
406 httpHandler bool
407 balancer string
408 customDialer func(string, string, time.Duration) (net.Conn, error)
409 }
410
411 func (e env) runnable() bool {
412 if runtime.GOOS == "windows" && e.network == "unix" {
413 return false
414 }
415 return true
416 }
417
418 func (e env) dialer(addr string, timeout time.Duration) (net.Conn, error) {
419 if e.customDialer != nil {
420 return e.customDialer(e.network, addr, timeout)
421 }
422 return net.DialTimeout(e.network, addr, timeout)
423 }
424
425 var (
426 tcpClearEnv = env{name: "tcp-clear-v1-balancer", network: "tcp"}
427 tcpTLSEnv = env{name: "tcp-tls-v1-balancer", network: "tcp", security: "tls"}
428 tcpClearRREnv = env{name: "tcp-clear", network: "tcp", balancer: "round_robin"}
429 tcpTLSRREnv = env{name: "tcp-tls", network: "tcp", security: "tls", balancer: "round_robin"}
430 handlerEnv = env{name: "handler-tls", network: "tcp", security: "tls", httpHandler: true, balancer: "round_robin"}
431 noBalancerEnv = env{name: "no-balancer", network: "tcp", security: "tls"}
432 allEnv = []env{tcpClearEnv, tcpTLSEnv, tcpClearRREnv, tcpTLSRREnv, handlerEnv, noBalancerEnv}
433 )
434
435 var onlyEnv = flag.String("only_env", "", "If non-empty, one of 'tcp-clear', 'tcp-tls', 'unix-clear', 'unix-tls', or 'handler-tls' to only run the tests for that environment. Empty means all.")
436
437 func listTestEnv() (envs []env) {
438 if *onlyEnv != "" {
439 for _, e := range allEnv {
440 if e.name == *onlyEnv {
441 if !e.runnable() {
442 panic(fmt.Sprintf("--only_env environment %q does not run on %s", *onlyEnv, runtime.GOOS))
443 }
444 return []env{e}
445 }
446 }
447 panic(fmt.Sprintf("invalid --only_env value %q", *onlyEnv))
448 }
449 for _, e := range allEnv {
450 if e.runnable() {
451 envs = append(envs, e)
452 }
453 }
454 return envs
455 }
456
457
458
459
460 type test struct {
461
462 t *testing.T
463 e env
464 ctx context.Context
465 cancel context.CancelFunc
466
467
468
469
470
471
472 enableHealthServer bool
473
474
475
476
477 healthServer healthgrpc.HealthServer
478 maxStream uint32
479 tapHandle tap.ServerInHandle
480 maxServerMsgSize *int
481 maxServerReceiveMsgSize *int
482 maxServerSendMsgSize *int
483 maxServerHeaderListSize *uint32
484
485 serverCompression bool
486 unknownHandler grpc.StreamHandler
487 unaryServerInt grpc.UnaryServerInterceptor
488 streamServerInt grpc.StreamServerInterceptor
489 serverInitialWindowSize int32
490 serverInitialConnWindowSize int32
491 customServerOptions []grpc.ServerOption
492
493
494
495 maxClientMsgSize *int
496 maxClientReceiveMsgSize *int
497 maxClientSendMsgSize *int
498 maxClientHeaderListSize *uint32
499 userAgent string
500
501 clientCompression bool
502
503 clientUseCompression bool
504
505 clientNopCompression bool
506 unaryClientInt grpc.UnaryClientInterceptor
507 streamClientInt grpc.StreamClientInterceptor
508 clientInitialWindowSize int32
509 clientInitialConnWindowSize int32
510 perRPCCreds credentials.PerRPCCredentials
511 customDialOptions []grpc.DialOption
512 resolverScheme string
513
514
515
516 srv stopper
517 hSrv healthgrpc.HealthServer
518 srvAddr string
519
520
521 srvs []stopper
522 hSrvs []healthgrpc.HealthServer
523 srvAddrs []string
524
525 cc *grpc.ClientConn
526 restoreLogs func()
527 }
528
529 type stopper interface {
530 Stop()
531 GracefulStop()
532 }
533
534 func (te *test) tearDown() {
535 if te.cancel != nil {
536 te.cancel()
537 te.cancel = nil
538 }
539
540 if te.cc != nil {
541 te.cc.Close()
542 te.cc = nil
543 }
544
545 if te.restoreLogs != nil {
546 te.restoreLogs()
547 te.restoreLogs = nil
548 }
549
550 if te.srv != nil {
551 te.srv.Stop()
552 }
553 for _, s := range te.srvs {
554 s.Stop()
555 }
556 }
557
558
559
560
561 func newTest(t *testing.T, e env) *test {
562 te := &test{
563 t: t,
564 e: e,
565 maxStream: math.MaxUint32,
566 }
567 te.ctx, te.cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
568 return te
569 }
570
571 func (te *test) listenAndServe(ts testgrpc.TestServiceServer, listen func(network, address string) (net.Listener, error)) net.Listener {
572 te.t.Helper()
573 te.t.Logf("Running test in %s environment...", te.e.name)
574 sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)}
575 if te.maxServerMsgSize != nil {
576 sopts = append(sopts, grpc.MaxMsgSize(*te.maxServerMsgSize))
577 }
578 if te.maxServerReceiveMsgSize != nil {
579 sopts = append(sopts, grpc.MaxRecvMsgSize(*te.maxServerReceiveMsgSize))
580 }
581 if te.maxServerSendMsgSize != nil {
582 sopts = append(sopts, grpc.MaxSendMsgSize(*te.maxServerSendMsgSize))
583 }
584 if te.maxServerHeaderListSize != nil {
585 sopts = append(sopts, grpc.MaxHeaderListSize(*te.maxServerHeaderListSize))
586 }
587 if te.tapHandle != nil {
588 sopts = append(sopts, grpc.InTapHandle(te.tapHandle))
589 }
590 if te.serverCompression {
591 sopts = append(sopts,
592 grpc.RPCCompressor(grpc.NewGZIPCompressor()),
593 grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
594 )
595 }
596 if te.unaryServerInt != nil {
597 sopts = append(sopts, grpc.UnaryInterceptor(te.unaryServerInt))
598 }
599 if te.streamServerInt != nil {
600 sopts = append(sopts, grpc.StreamInterceptor(te.streamServerInt))
601 }
602 if te.unknownHandler != nil {
603 sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler))
604 }
605 if te.serverInitialWindowSize > 0 {
606 sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize))
607 }
608 if te.serverInitialConnWindowSize > 0 {
609 sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize))
610 }
611 la := "localhost:0"
612 switch te.e.network {
613 case "unix":
614 la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano())
615 syscall.Unlink(la)
616 }
617 lis, err := listen(te.e.network, la)
618 if err != nil {
619 te.t.Fatalf("Failed to listen: %v", err)
620 }
621 if te.e.security == "tls" {
622 creds, err := credentials.NewServerTLSFromFile(testdata.Path("x509/server1_cert.pem"), testdata.Path("x509/server1_key.pem"))
623 if err != nil {
624 te.t.Fatalf("Failed to generate credentials %v", err)
625 }
626 sopts = append(sopts, grpc.Creds(creds))
627 }
628 sopts = append(sopts, te.customServerOptions...)
629 s := grpc.NewServer(sopts...)
630 if ts != nil {
631 testgrpc.RegisterTestServiceServer(s, ts)
632 }
633
634
635
636 hs := te.healthServer
637 if te.enableHealthServer {
638 hs = health.NewServer()
639 }
640 if hs != nil {
641 healthgrpc.RegisterHealthServer(s, hs)
642 }
643
644 addr := la
645 switch te.e.network {
646 case "unix":
647 default:
648 _, port, err := net.SplitHostPort(lis.Addr().String())
649 if err != nil {
650 te.t.Fatalf("Failed to parse listener address: %v", err)
651 }
652 addr = "localhost:" + port
653 }
654
655 te.srv = s
656 te.hSrv = hs
657 te.srvAddr = addr
658
659 if te.e.httpHandler {
660 if te.e.security != "tls" {
661 te.t.Fatalf("unsupported environment settings")
662 }
663 cert, err := tls.LoadX509KeyPair(testdata.Path("x509/server1_cert.pem"), testdata.Path("x509/server1_key.pem"))
664 if err != nil {
665 te.t.Fatal("tls.LoadX509KeyPair(server1.pem, server1.key) failed: ", err)
666 }
667 hs := &http.Server{
668 Handler: s,
669 TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}},
670 }
671 if err := http2.ConfigureServer(hs, &http2.Server{MaxConcurrentStreams: te.maxStream}); err != nil {
672 te.t.Fatal("http2.ConfigureServer(_, _) failed: ", err)
673 }
674 te.srv = wrapHS{hs}
675 tlsListener := tls.NewListener(lis, hs.TLSConfig)
676 go hs.Serve(tlsListener)
677 return lis
678 }
679
680 go s.Serve(lis)
681 return lis
682 }
683
684 type wrapHS struct {
685 s *http.Server
686 }
687
688 func (w wrapHS) GracefulStop() {
689 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
690 defer cancel()
691 w.s.Shutdown(ctx)
692 }
693
694 func (w wrapHS) Stop() {
695 w.s.Close()
696 w.s.Handler.(*grpc.Server).Stop()
697 }
698
699 func (te *test) startServerWithConnControl(ts testgrpc.TestServiceServer) *listenerWrapper {
700 l := te.listenAndServe(ts, listenWithConnControl)
701 return l.(*listenerWrapper)
702 }
703
704
705
706 func (te *test) startServer(ts testgrpc.TestServiceServer) {
707 te.t.Helper()
708 te.listenAndServe(ts, net.Listen)
709 }
710
711
712 func (te *test) startServers(ts testgrpc.TestServiceServer, num int) {
713 for i := 0; i < num; i++ {
714 te.startServer(ts)
715 te.srvs = append(te.srvs, te.srv.(*grpc.Server))
716 te.hSrvs = append(te.hSrvs, te.hSrv)
717 te.srvAddrs = append(te.srvAddrs, te.srvAddr)
718 te.srv = nil
719 te.hSrv = nil
720 te.srvAddr = ""
721 }
722 }
723
724
725 func (te *test) setHealthServingStatus(service string, status healthpb.HealthCheckResponse_ServingStatus) {
726 hs, ok := te.hSrv.(*health.Server)
727 if !ok {
728 panic(fmt.Sprintf("SetServingStatus(%v, %v) called for health server of type %T", service, status, hs))
729 }
730 hs.SetServingStatus(service, status)
731 }
732
733 type nopCompressor struct {
734 grpc.Compressor
735 }
736
737
738 func newNopCompressor() grpc.Compressor {
739 return &nopCompressor{grpc.NewGZIPCompressor()}
740 }
741
742 func (c *nopCompressor) Type() string {
743 return "nop"
744 }
745
746 type nopDecompressor struct {
747 grpc.Decompressor
748 }
749
750
751 func newNopDecompressor() grpc.Decompressor {
752 return &nopDecompressor{grpc.NewGZIPDecompressor()}
753 }
754
755 func (d *nopDecompressor) Type() string {
756 return "nop"
757 }
758
759 func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string) {
760 opts = append(opts, grpc.WithDialer(te.e.dialer), grpc.WithUserAgent(te.userAgent))
761
762 if te.clientCompression {
763 opts = append(opts,
764 grpc.WithCompressor(grpc.NewGZIPCompressor()),
765 grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
766 )
767 }
768 if te.clientUseCompression {
769 opts = append(opts, grpc.WithDefaultCallOptions(grpc.UseCompressor("gzip")))
770 }
771 if te.clientNopCompression {
772 opts = append(opts,
773 grpc.WithCompressor(newNopCompressor()),
774 grpc.WithDecompressor(newNopDecompressor()),
775 )
776 }
777 if te.unaryClientInt != nil {
778 opts = append(opts, grpc.WithUnaryInterceptor(te.unaryClientInt))
779 }
780 if te.streamClientInt != nil {
781 opts = append(opts, grpc.WithStreamInterceptor(te.streamClientInt))
782 }
783 if te.maxClientMsgSize != nil {
784 opts = append(opts, grpc.WithMaxMsgSize(*te.maxClientMsgSize))
785 }
786 if te.maxClientReceiveMsgSize != nil {
787 opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(*te.maxClientReceiveMsgSize)))
788 }
789 if te.maxClientSendMsgSize != nil {
790 opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(*te.maxClientSendMsgSize)))
791 }
792 if te.maxClientHeaderListSize != nil {
793 opts = append(opts, grpc.WithMaxHeaderListSize(*te.maxClientHeaderListSize))
794 }
795 switch te.e.security {
796 case "tls":
797 creds, err := credentials.NewClientTLSFromFile(testdata.Path("x509/server_ca_cert.pem"), "x.test.example.com")
798 if err != nil {
799 te.t.Fatalf("Failed to load credentials: %v", err)
800 }
801 opts = append(opts, grpc.WithTransportCredentials(creds))
802 case "empty":
803
804 default:
805 opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
806 }
807
808 var scheme string
809 if te.resolverScheme == "" {
810 scheme = "passthrough:///"
811 } else {
812 scheme = te.resolverScheme + ":///"
813 }
814 if te.e.balancer != "" {
815 opts = append(opts, grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, te.e.balancer)))
816 }
817 if te.clientInitialWindowSize > 0 {
818 opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))
819 }
820 if te.clientInitialConnWindowSize > 0 {
821 opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize))
822 }
823 if te.perRPCCreds != nil {
824 opts = append(opts, grpc.WithPerRPCCredentials(te.perRPCCreds))
825 }
826 if te.srvAddr == "" {
827 te.srvAddr = "client.side.only.test"
828 }
829 opts = append(opts, te.customDialOptions...)
830 return opts, scheme
831 }
832
833 func (te *test) clientConnWithConnControl() (*grpc.ClientConn, *dialerWrapper) {
834 if te.cc != nil {
835 return te.cc, nil
836 }
837 opts, scheme := te.configDial()
838 dw := &dialerWrapper{}
839
840 opts = append(opts, grpc.WithDialer(dw.dialer))
841 var err error
842 te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...)
843 if err != nil {
844 te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err)
845 }
846 return te.cc, dw
847 }
848
849 func (te *test) clientConn(opts ...grpc.DialOption) *grpc.ClientConn {
850 if te.cc != nil {
851 return te.cc
852 }
853 var scheme string
854 opts, scheme = te.configDial(opts...)
855 var err error
856 te.cc, err = grpc.Dial(scheme+te.srvAddr, opts...)
857 if err != nil {
858 te.t.Fatalf("Dial(%q) = %v", scheme+te.srvAddr, err)
859 }
860 return te.cc
861 }
862
863 func (te *test) declareLogNoise(phrases ...string) {
864 te.restoreLogs = declareLogNoise(te.t, phrases...)
865 }
866
867 func (te *test) withServerTester(fn func(st *serverTester)) {
868 c, err := te.e.dialer(te.srvAddr, 10*time.Second)
869 if err != nil {
870 te.t.Fatal(err)
871 }
872 defer c.Close()
873 if te.e.security == "tls" {
874 c = tls.Client(c, &tls.Config{
875 InsecureSkipVerify: true,
876 NextProtos: []string{http2.NextProtoTLS},
877 })
878 }
879 st := newServerTesterFromConn(te.t, c)
880 st.greet()
881 fn(st)
882 }
883
884 type lazyConn struct {
885 net.Conn
886 beLazy int32
887 }
888
889
890 const possibleConnResetMsg = "connection reset by peer"
891 const possibleEOFMsg = "error reading from server: EOF"
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908 func isConnClosedErr(err error) bool {
909 errContainsConnResetMsg := strings.Contains(err.Error(), possibleConnResetMsg)
910 errContainsEOFMsg := strings.Contains(err.Error(), possibleEOFMsg)
911
912 return errContainsConnResetMsg || errContainsEOFMsg || err == io.EOF
913 }
914
915 func (l *lazyConn) Write(b []byte) (int, error) {
916 if atomic.LoadInt32(&(l.beLazy)) == 1 {
917 time.Sleep(time.Second)
918 }
919 return l.Conn.Write(b)
920 }
921
922 func (s) TestContextDeadlineNotIgnored(t *testing.T) {
923 e := noBalancerEnv
924 var lc *lazyConn
925 e.customDialer = func(network, addr string, timeout time.Duration) (net.Conn, error) {
926 conn, err := net.DialTimeout(network, addr, timeout)
927 if err != nil {
928 return nil, err
929 }
930 lc = &lazyConn{Conn: conn}
931 return lc, nil
932 }
933
934 te := newTest(t, e)
935 te.startServer(&testServer{security: e.security})
936 defer te.tearDown()
937
938 cc := te.clientConn()
939 tc := testgrpc.NewTestServiceClient(cc)
940 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
941 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
942 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
943 }
944 cancel()
945 atomic.StoreInt32(&(lc.beLazy), 1)
946 ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
947 defer cancel()
948 t1 := time.Now()
949 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
950 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, context.DeadlineExceeded", err)
951 }
952 if time.Since(t1) > 2*time.Second {
953 t.Fatalf("TestService/EmptyCall(_, _) ran over the deadline")
954 }
955 }
956
957 func (s) TestTimeoutOnDeadServer(t *testing.T) {
958 for _, e := range listTestEnv() {
959 testTimeoutOnDeadServer(t, e)
960 }
961 }
962
963 func testTimeoutOnDeadServer(t *testing.T, e env) {
964 te := newTest(t, e)
965 te.userAgent = testAppUA
966 te.startServer(&testServer{security: e.security})
967 defer te.tearDown()
968
969 cc := te.clientConn()
970 tc := testgrpc.NewTestServiceClient(cc)
971 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
972 defer cancel()
973 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
974 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
975 }
976
977
978 testutils.AwaitState(ctx, t, cc, connectivity.Ready)
979 te.srv.Stop()
980 testutils.AwaitNotState(ctx, t, cc, connectivity.Ready)
981 ctx, cancel = context.WithTimeout(ctx, defaultTestShortTimeout)
982 defer cancel()
983 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
984 t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %s", ctx, err, codes.DeadlineExceeded)
985 }
986 awaitNewConnLogOutput()
987 }
988
989 func (s) TestServerGracefulStopIdempotent(t *testing.T) {
990 for _, e := range listTestEnv() {
991 if e.name == "handler-tls" {
992 continue
993 }
994 testServerGracefulStopIdempotent(t, e)
995 }
996 }
997
998 func testServerGracefulStopIdempotent(t *testing.T, e env) {
999 te := newTest(t, e)
1000 te.userAgent = testAppUA
1001 te.startServer(&testServer{security: e.security})
1002 defer te.tearDown()
1003
1004 for i := 0; i < 3; i++ {
1005 te.srv.GracefulStop()
1006 }
1007 }
1008
1009 func (s) TestDetailedConnectionCloseErrorPropagatesToRPCError(t *testing.T) {
1010 rpcStartedOnServer := make(chan struct{})
1011 rpcDoneOnClient := make(chan struct{})
1012 defer close(rpcDoneOnClient)
1013 ss := &stubserver.StubServer{
1014 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
1015 close(rpcStartedOnServer)
1016 <-rpcDoneOnClient
1017 return status.Error(codes.Internal, "arbitrary status")
1018 },
1019 }
1020 if err := ss.Start(nil); err != nil {
1021 t.Fatalf("Error starting endpoint server: %v", err)
1022 }
1023 defer ss.Stop()
1024
1025 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1026 defer cancel()
1027
1028
1029
1030
1031
1032 stream, err := ss.Client.FullDuplexCall(ctx)
1033 if err != nil {
1034 t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", ss.Client, err)
1035 }
1036
1037
1038
1039
1040 <-rpcStartedOnServer
1041 ss.S.Stop()
1042
1043
1044
1045
1046 if _, err := stream.Recv(); err == io.EOF || !isConnClosedErr(err) {
1047 t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: %q OR %q", stream, err, possibleConnResetMsg, possibleEOFMsg)
1048 }
1049 }
1050
1051 func (s) TestFailFast(t *testing.T) {
1052 for _, e := range listTestEnv() {
1053 testFailFast(t, e)
1054 }
1055 }
1056
1057 func testFailFast(t *testing.T, e env) {
1058 te := newTest(t, e)
1059 te.userAgent = testAppUA
1060 te.startServer(&testServer{security: e.security})
1061 defer te.tearDown()
1062
1063 cc := te.clientConn()
1064 tc := testgrpc.NewTestServiceClient(cc)
1065 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1066 defer cancel()
1067 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
1068 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
1069 }
1070
1071 te.srv.Stop()
1072
1073 for {
1074 if err := ctx.Err(); err != nil {
1075 t.Fatalf("EmptyCall did not return UNAVAILABLE before timeout")
1076 }
1077 _, err := tc.EmptyCall(ctx, &testpb.Empty{})
1078 if status.Code(err) == codes.Unavailable {
1079 break
1080 }
1081 t.Logf("%v.EmptyCall(_, _) = _, %v", tc, err)
1082 time.Sleep(10 * time.Millisecond)
1083 }
1084
1085 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
1086 t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %s", err, codes.Unavailable)
1087 }
1088 if _, err := tc.StreamingInputCall(ctx); status.Code(err) != codes.Unavailable {
1089 t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %s", err, codes.Unavailable)
1090 }
1091
1092 awaitNewConnLogOutput()
1093 }
1094
1095 func testServiceConfigSetup(t *testing.T, e env) *test {
1096 te := newTest(t, e)
1097 te.userAgent = testAppUA
1098 te.declareLogNoise(
1099 "Failed to dial : context canceled; please retry.",
1100 )
1101 return te
1102 }
1103
1104 func newInt(b int) (a *int) {
1105 return &b
1106 }
1107
1108 func (s) TestGetMethodConfig(t *testing.T) {
1109 te := testServiceConfigSetup(t, tcpClearRREnv)
1110 defer te.tearDown()
1111 r := manual.NewBuilderWithScheme("whatever")
1112
1113 te.resolverScheme = r.Scheme()
1114 cc := te.clientConn(grpc.WithResolvers(r))
1115 addrs := []resolver.Address{{Addr: te.srvAddr}}
1116 r.UpdateState(resolver.State{
1117 Addresses: addrs,
1118 ServiceConfig: parseServiceConfig(t, r, `{
1119 "methodConfig": [
1120 {
1121 "name": [
1122 {
1123 "service": "grpc.testing.TestService",
1124 "method": "EmptyCall"
1125 }
1126 ],
1127 "waitForReady": true,
1128 "timeout": ".001s"
1129 },
1130 {
1131 "name": [
1132 {
1133 "service": "grpc.testing.TestService"
1134 }
1135 ],
1136 "waitForReady": false
1137 }
1138 ]
1139 }`)})
1140
1141 tc := testgrpc.NewTestServiceClient(cc)
1142
1143
1144 for {
1145 if cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil {
1146 break
1147 }
1148 time.Sleep(time.Millisecond)
1149 }
1150
1151 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1152 defer cancel()
1153
1154 var err error
1155 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
1156 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1157 }
1158
1159 r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parseServiceConfig(t, r, `{
1160 "methodConfig": [
1161 {
1162 "name": [
1163 {
1164 "service": "grpc.testing.TestService",
1165 "method": "UnaryCall"
1166 }
1167 ],
1168 "waitForReady": true,
1169 "timeout": ".001s"
1170 },
1171 {
1172 "name": [
1173 {
1174 "service": "grpc.testing.TestService"
1175 }
1176 ],
1177 "waitForReady": false
1178 }
1179 ]
1180 }`)})
1181
1182
1183 for {
1184 if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && !*mc.WaitForReady {
1185 break
1186 }
1187 time.Sleep(time.Millisecond)
1188 }
1189
1190 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
1191 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable)
1192 }
1193 }
1194
1195 func (s) TestServiceConfigWaitForReady(t *testing.T) {
1196 te := testServiceConfigSetup(t, tcpClearRREnv)
1197 defer te.tearDown()
1198 r := manual.NewBuilderWithScheme("whatever")
1199
1200
1201 te.resolverScheme = r.Scheme()
1202 cc := te.clientConn(grpc.WithResolvers(r))
1203 addrs := []resolver.Address{{Addr: te.srvAddr}}
1204 r.UpdateState(resolver.State{
1205 Addresses: addrs,
1206 ServiceConfig: parseServiceConfig(t, r, `{
1207 "methodConfig": [
1208 {
1209 "name": [
1210 {
1211 "service": "grpc.testing.TestService",
1212 "method": "EmptyCall"
1213 },
1214 {
1215 "service": "grpc.testing.TestService",
1216 "method": "FullDuplexCall"
1217 }
1218 ],
1219 "waitForReady": false,
1220 "timeout": ".001s"
1221 }
1222 ]
1223 }`)})
1224
1225 tc := testgrpc.NewTestServiceClient(cc)
1226
1227
1228 for {
1229 if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil {
1230 break
1231 }
1232 time.Sleep(time.Millisecond)
1233 }
1234 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1235 defer cancel()
1236
1237 var err error
1238 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
1239 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1240 }
1241 if _, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
1242 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1243 }
1244
1245
1246
1247 r.UpdateState(resolver.State{
1248 Addresses: addrs,
1249 ServiceConfig: parseServiceConfig(t, r, `{
1250 "methodConfig": [
1251 {
1252 "name": [
1253 {
1254 "service": "grpc.testing.TestService",
1255 "method": "EmptyCall"
1256 },
1257 {
1258 "service": "grpc.testing.TestService",
1259 "method": "FullDuplexCall"
1260 }
1261 ],
1262 "waitForReady": true,
1263 "timeout": ".001s"
1264 }
1265 ]
1266 }`)})
1267
1268
1269 for {
1270 if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && *mc.WaitForReady {
1271 break
1272 }
1273 time.Sleep(time.Millisecond)
1274 }
1275
1276 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
1277 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1278 }
1279 if _, err := tc.FullDuplexCall(ctx); status.Code(err) != codes.DeadlineExceeded {
1280 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1281 }
1282 }
1283
1284 func (s) TestServiceConfigTimeout(t *testing.T) {
1285 te := testServiceConfigSetup(t, tcpClearRREnv)
1286 defer te.tearDown()
1287 r := manual.NewBuilderWithScheme("whatever")
1288
1289
1290 te.resolverScheme = r.Scheme()
1291 cc := te.clientConn(grpc.WithResolvers(r))
1292 addrs := []resolver.Address{{Addr: te.srvAddr}}
1293 r.UpdateState(resolver.State{
1294 Addresses: addrs,
1295 ServiceConfig: parseServiceConfig(t, r, `{
1296 "methodConfig": [
1297 {
1298 "name": [
1299 {
1300 "service": "grpc.testing.TestService",
1301 "method": "EmptyCall"
1302 },
1303 {
1304 "service": "grpc.testing.TestService",
1305 "method": "FullDuplexCall"
1306 }
1307 ],
1308 "waitForReady": true,
1309 "timeout": "3600s"
1310 }
1311 ]
1312 }`)})
1313
1314 tc := testgrpc.NewTestServiceClient(cc)
1315
1316
1317 for {
1318 if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
1319 break
1320 }
1321 time.Sleep(time.Millisecond)
1322 }
1323
1324
1325 var err error
1326 ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
1327 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
1328 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1329 }
1330 cancel()
1331
1332 ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
1333 if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
1334 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1335 }
1336 cancel()
1337
1338
1339
1340 r.UpdateState(resolver.State{
1341 Addresses: addrs,
1342 ServiceConfig: parseServiceConfig(t, r, `{
1343 "methodConfig": [
1344 {
1345 "name": [
1346 {
1347 "service": "grpc.testing.TestService",
1348 "method": "EmptyCall"
1349 },
1350 {
1351 "service": "grpc.testing.TestService",
1352 "method": "FullDuplexCall"
1353 }
1354 ],
1355 "waitForReady": true,
1356 "timeout": ".000000001s"
1357 }
1358 ]
1359 }`)})
1360
1361
1362 for {
1363 if mc := cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall"); mc.Timeout != nil && *mc.Timeout == time.Nanosecond {
1364 break
1365 }
1366 time.Sleep(time.Millisecond)
1367 }
1368
1369 ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
1370 defer cancel()
1371 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
1372 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
1373 }
1374
1375 if _, err = tc.FullDuplexCall(ctx, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
1376 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded)
1377 }
1378 }
1379
1380 func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
1381 e := tcpClearRREnv
1382 r := manual.NewBuilderWithScheme("whatever")
1383
1384
1385 const smallSize = 1
1386 const largeSize = 1024
1387 const extraLargeSize = 2048
1388
1389 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
1390 if err != nil {
1391 t.Fatal(err)
1392 }
1393 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
1394 if err != nil {
1395 t.Fatal(err)
1396 }
1397 extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
1398 if err != nil {
1399 t.Fatal(err)
1400 }
1401
1402
1403 te1 := testServiceConfigSetup(t, e)
1404 defer te1.tearDown()
1405
1406 te1.resolverScheme = r.Scheme()
1407 te1.startServer(&testServer{security: e.security})
1408 cc1 := te1.clientConn(grpc.WithResolvers(r))
1409
1410 addrs := []resolver.Address{{Addr: te1.srvAddr}}
1411 sc := parseServiceConfig(t, r, `{
1412 "methodConfig": [
1413 {
1414 "name": [
1415 {
1416 "service": "grpc.testing.TestService",
1417 "method": "UnaryCall"
1418 },
1419 {
1420 "service": "grpc.testing.TestService",
1421 "method": "FullDuplexCall"
1422 }
1423 ],
1424 "maxRequestMessageBytes": 2048,
1425 "maxResponseMessageBytes": 2048
1426 }
1427 ]
1428 }`)
1429 r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: sc})
1430 tc := testgrpc.NewTestServiceClient(cc1)
1431
1432 req := &testpb.SimpleRequest{
1433 ResponseType: testpb.PayloadType_COMPRESSABLE,
1434 ResponseSize: int32(extraLargeSize),
1435 Payload: smallPayload,
1436 }
1437
1438 for {
1439 if cc1.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1440 break
1441 }
1442 time.Sleep(time.Millisecond)
1443 }
1444 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1445 defer cancel()
1446
1447 if _, err = tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted {
1448 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1449 }
1450
1451
1452 req.Payload = extraLargePayload
1453 req.ResponseSize = int32(smallSize)
1454 if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
1455 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1456 }
1457
1458
1459 respParam := []*testpb.ResponseParameters{
1460 {
1461 Size: int32(extraLargeSize),
1462 },
1463 }
1464 sreq := &testpb.StreamingOutputCallRequest{
1465 ResponseType: testpb.PayloadType_COMPRESSABLE,
1466 ResponseParameters: respParam,
1467 Payload: smallPayload,
1468 }
1469 stream, err := tc.FullDuplexCall(te1.ctx)
1470 if err != nil {
1471 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1472 }
1473 if err = stream.Send(sreq); err != nil {
1474 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1475 }
1476 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
1477 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1478 }
1479
1480
1481 respParam[0].Size = int32(smallSize)
1482 sreq.Payload = extraLargePayload
1483 stream, err = tc.FullDuplexCall(te1.ctx)
1484 if err != nil {
1485 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1486 }
1487 if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
1488 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1489 }
1490
1491
1492 te2 := testServiceConfigSetup(t, e)
1493 te2.resolverScheme = r.Scheme()
1494 te2.maxClientReceiveMsgSize = newInt(1024)
1495 te2.maxClientSendMsgSize = newInt(1024)
1496
1497 te2.startServer(&testServer{security: e.security})
1498 defer te2.tearDown()
1499 cc2 := te2.clientConn(grpc.WithResolvers(r))
1500 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te2.srvAddr}}, ServiceConfig: sc})
1501 tc = testgrpc.NewTestServiceClient(cc2)
1502
1503 for {
1504 if cc2.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1505 break
1506 }
1507 time.Sleep(time.Millisecond)
1508 }
1509
1510
1511 req.Payload = smallPayload
1512 req.ResponseSize = int32(largeSize)
1513
1514 if _, err = tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); err == nil || status.Code(err) != codes.ResourceExhausted {
1515 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1516 }
1517
1518
1519 req.Payload = largePayload
1520 req.ResponseSize = int32(smallSize)
1521 if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
1522 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1523 }
1524
1525
1526 stream, err = tc.FullDuplexCall(te2.ctx)
1527 respParam[0].Size = int32(largeSize)
1528 sreq.Payload = smallPayload
1529 if err != nil {
1530 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1531 }
1532 if err = stream.Send(sreq); err != nil {
1533 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1534 }
1535 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
1536 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1537 }
1538
1539
1540 respParam[0].Size = int32(smallSize)
1541 sreq.Payload = largePayload
1542 stream, err = tc.FullDuplexCall(te2.ctx)
1543 if err != nil {
1544 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1545 }
1546 if err = stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
1547 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1548 }
1549
1550
1551 te3 := testServiceConfigSetup(t, e)
1552 te3.resolverScheme = r.Scheme()
1553 te3.maxClientReceiveMsgSize = newInt(4096)
1554 te3.maxClientSendMsgSize = newInt(4096)
1555
1556 te3.startServer(&testServer{security: e.security})
1557 defer te3.tearDown()
1558
1559 cc3 := te3.clientConn(grpc.WithResolvers(r))
1560 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te3.srvAddr}}, ServiceConfig: sc})
1561 tc = testgrpc.NewTestServiceClient(cc3)
1562
1563 for {
1564 if cc3.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil {
1565 break
1566 }
1567 time.Sleep(time.Millisecond)
1568 }
1569
1570
1571 req.Payload = smallPayload
1572 req.ResponseSize = int32(largeSize)
1573
1574 if _, err = tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); err != nil {
1575 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
1576 }
1577
1578 req.ResponseSize = int32(extraLargeSize)
1579 if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
1580 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1581 }
1582
1583
1584 req.Payload = largePayload
1585 req.ResponseSize = int32(smallSize)
1586 if _, err := tc.UnaryCall(ctx, req); err != nil {
1587 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want <nil>", err)
1588 }
1589
1590 req.Payload = extraLargePayload
1591 if _, err = tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
1592 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1593 }
1594
1595
1596 stream, err = tc.FullDuplexCall(te3.ctx)
1597 if err != nil {
1598 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1599 }
1600 respParam[0].Size = int32(largeSize)
1601 sreq.Payload = smallPayload
1602
1603 if err = stream.Send(sreq); err != nil {
1604 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1605 }
1606 if _, err = stream.Recv(); err != nil {
1607 t.Fatalf("%v.Recv() = _, %v, want <nil>", stream, err)
1608 }
1609
1610 respParam[0].Size = int32(extraLargeSize)
1611
1612 if err = stream.Send(sreq); err != nil {
1613 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1614 }
1615 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
1616 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1617 }
1618
1619
1620 respParam[0].Size = int32(smallSize)
1621 sreq.Payload = largePayload
1622 stream, err = tc.FullDuplexCall(te3.ctx)
1623 if err != nil {
1624 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1625 }
1626 if err := stream.Send(sreq); err != nil {
1627 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1628 }
1629 sreq.Payload = extraLargePayload
1630 if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
1631 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1632 }
1633 }
1634
1635
1636
1637
1638 func (s) TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) {
1639 te := testServiceConfigSetup(t, tcpClearRREnv)
1640 te.startServer(&testServer{security: tcpClearRREnv.security})
1641 defer te.tearDown()
1642 r := manual.NewBuilderWithScheme("whatever")
1643
1644 te.resolverScheme = r.Scheme()
1645 cc := te.clientConn(grpc.WithResolvers(r))
1646 tc := testgrpc.NewTestServiceClient(cc)
1647
1648 r.UpdateState(resolver.State{
1649 Addresses: []resolver.Address{{Addr: te.srvAddr}},
1650 ServiceConfig: parseServiceConfig(t, r, `{
1651 "methodConfig": [
1652 {
1653 "name": [
1654 {
1655 "service": "grpc.testing.TestService",
1656 "method": "FullDuplexCall"
1657 }
1658 ],
1659 "waitForReady": true,
1660 "timeout": "10s"
1661 }
1662 ]
1663 }`)})
1664
1665 for {
1666 if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil {
1667 break
1668 }
1669 time.Sleep(time.Millisecond)
1670 }
1671
1672 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1673 defer cancel()
1674 stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
1675 if err != nil {
1676 t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want <nil>", err)
1677 }
1678
1679 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 0)
1680 if err != nil {
1681 t.Fatalf("failed to newPayload: %v", err)
1682 }
1683 req := &testpb.StreamingOutputCallRequest{
1684 ResponseType: testpb.PayloadType_COMPRESSABLE,
1685 ResponseParameters: []*testpb.ResponseParameters{{Size: 0}},
1686 Payload: payload,
1687 }
1688 if err := stream.Send(req); err != nil {
1689 t.Fatalf("stream.Send(%v) = %v, want <nil>", req, err)
1690 }
1691 stream.CloseSend()
1692 time.Sleep(time.Second)
1693
1694
1695 if _, err := stream.Recv(); err != nil {
1696 t.Fatalf("stream.Recv = _, %v, want _, <nil>", err)
1697 }
1698
1699 for {
1700 if _, err := stream.Recv(); err != nil {
1701 break
1702 }
1703 }
1704 }
1705
1706 func (s) TestPreloaderClientSend(t *testing.T) {
1707 for _, e := range listTestEnv() {
1708 testPreloaderClientSend(t, e)
1709 }
1710 }
1711
1712 func testPreloaderClientSend(t *testing.T, e env) {
1713 te := newTest(t, e)
1714 te.userAgent = testAppUA
1715 te.declareLogNoise(
1716 "Failed to dial : context canceled; please retry.",
1717 )
1718 te.startServer(&testServer{security: e.security})
1719
1720 defer te.tearDown()
1721 tc := testgrpc.NewTestServiceClient(te.clientConn())
1722
1723
1724
1725 stream, err := tc.FullDuplexCall(te.ctx, grpc.UseCompressor("gzip"))
1726 if err != nil {
1727 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1728 }
1729 var index int
1730 for index < len(reqSizes) {
1731 respParam := []*testpb.ResponseParameters{
1732 {
1733 Size: int32(respSizes[index]),
1734 },
1735 }
1736
1737 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
1738 if err != nil {
1739 t.Fatal(err)
1740 }
1741
1742 req := &testpb.StreamingOutputCallRequest{
1743 ResponseType: testpb.PayloadType_COMPRESSABLE,
1744 ResponseParameters: respParam,
1745 Payload: payload,
1746 }
1747 preparedMsg := &grpc.PreparedMsg{}
1748 err = preparedMsg.Encode(stream, req)
1749 if err != nil {
1750 t.Fatalf("PrepareMsg failed for size %d : %v", reqSizes[index], err)
1751 }
1752 if err := stream.SendMsg(preparedMsg); err != nil {
1753 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
1754 }
1755 reply, err := stream.Recv()
1756 if err != nil {
1757 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
1758 }
1759 pt := reply.GetPayload().GetType()
1760 if pt != testpb.PayloadType_COMPRESSABLE {
1761 t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
1762 }
1763 size := len(reply.GetPayload().GetBody())
1764 if size != int(respSizes[index]) {
1765 t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
1766 }
1767 index++
1768 }
1769 if err := stream.CloseSend(); err != nil {
1770 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
1771 }
1772 if _, err := stream.Recv(); err != io.EOF {
1773 t.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
1774 }
1775 }
1776
1777 func (s) TestPreloaderSenderSend(t *testing.T) {
1778 ss := &stubserver.StubServer{
1779 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
1780 for i := 0; i < 10; i++ {
1781 preparedMsg := &grpc.PreparedMsg{}
1782 err := preparedMsg.Encode(stream, &testpb.StreamingOutputCallResponse{
1783 Payload: &testpb.Payload{
1784 Body: []byte{'0' + uint8(i)},
1785 },
1786 })
1787 if err != nil {
1788 return err
1789 }
1790 stream.SendMsg(preparedMsg)
1791 }
1792 return nil
1793 },
1794 }
1795 if err := ss.Start(nil); err != nil {
1796 t.Fatalf("Error starting endpoint server: %v", err)
1797 }
1798 defer ss.Stop()
1799
1800 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1801 defer cancel()
1802
1803 stream, err := ss.Client.FullDuplexCall(ctx)
1804 if err != nil {
1805 t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v; want _, nil", err)
1806 }
1807
1808 var ngot int
1809 var buf bytes.Buffer
1810 for {
1811 reply, err := stream.Recv()
1812 if err == io.EOF {
1813 break
1814 }
1815 if err != nil {
1816 t.Fatal(err)
1817 }
1818 ngot++
1819 if buf.Len() > 0 {
1820 buf.WriteByte(',')
1821 }
1822 buf.Write(reply.GetPayload().GetBody())
1823 }
1824 if want := 10; ngot != want {
1825 t.Errorf("Got %d replies, want %d", ngot, want)
1826 }
1827 if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want {
1828 t.Errorf("Got replies %q; want %q", got, want)
1829 }
1830 }
1831
1832 func (s) TestMaxMsgSizeClientDefault(t *testing.T) {
1833 for _, e := range listTestEnv() {
1834 testMaxMsgSizeClientDefault(t, e)
1835 }
1836 }
1837
1838 func testMaxMsgSizeClientDefault(t *testing.T, e env) {
1839 te := newTest(t, e)
1840 te.userAgent = testAppUA
1841 te.declareLogNoise(
1842 "Failed to dial : context canceled; please retry.",
1843 )
1844 te.startServer(&testServer{security: e.security})
1845
1846 defer te.tearDown()
1847 tc := testgrpc.NewTestServiceClient(te.clientConn())
1848
1849 const smallSize = 1
1850 const largeSize = 4 * 1024 * 1024
1851 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
1852 if err != nil {
1853 t.Fatal(err)
1854 }
1855 req := &testpb.SimpleRequest{
1856 ResponseType: testpb.PayloadType_COMPRESSABLE,
1857 ResponseSize: int32(largeSize),
1858 Payload: smallPayload,
1859 }
1860
1861 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1862 defer cancel()
1863
1864 if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
1865 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1866 }
1867
1868 respParam := []*testpb.ResponseParameters{
1869 {
1870 Size: int32(largeSize),
1871 },
1872 }
1873 sreq := &testpb.StreamingOutputCallRequest{
1874 ResponseType: testpb.PayloadType_COMPRESSABLE,
1875 ResponseParameters: respParam,
1876 Payload: smallPayload,
1877 }
1878
1879
1880 stream, err := tc.FullDuplexCall(te.ctx)
1881 if err != nil {
1882 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1883 }
1884 if err := stream.Send(sreq); err != nil {
1885 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1886 }
1887 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
1888 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1889 }
1890 }
1891
1892 func (s) TestMaxMsgSizeClientAPI(t *testing.T) {
1893 for _, e := range listTestEnv() {
1894 testMaxMsgSizeClientAPI(t, e)
1895 }
1896 }
1897
1898 func testMaxMsgSizeClientAPI(t *testing.T, e env) {
1899 te := newTest(t, e)
1900 te.userAgent = testAppUA
1901
1902 te.maxServerSendMsgSize = newInt(5 * 1024 * 1024)
1903 te.maxClientReceiveMsgSize = newInt(1024)
1904 te.maxClientSendMsgSize = newInt(1024)
1905 te.declareLogNoise(
1906 "Failed to dial : context canceled; please retry.",
1907 )
1908 te.startServer(&testServer{security: e.security})
1909
1910 defer te.tearDown()
1911 tc := testgrpc.NewTestServiceClient(te.clientConn())
1912
1913 const smallSize = 1
1914 const largeSize = 1024
1915 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
1916 if err != nil {
1917 t.Fatal(err)
1918 }
1919
1920 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
1921 if err != nil {
1922 t.Fatal(err)
1923 }
1924 req := &testpb.SimpleRequest{
1925 ResponseType: testpb.PayloadType_COMPRESSABLE,
1926 ResponseSize: int32(largeSize),
1927 Payload: smallPayload,
1928 }
1929
1930 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1931 defer cancel()
1932
1933 if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
1934 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1935 }
1936
1937
1938 req.Payload = largePayload
1939 req.ResponseSize = int32(smallSize)
1940 if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
1941 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
1942 }
1943
1944 respParam := []*testpb.ResponseParameters{
1945 {
1946 Size: int32(largeSize),
1947 },
1948 }
1949 sreq := &testpb.StreamingOutputCallRequest{
1950 ResponseType: testpb.PayloadType_COMPRESSABLE,
1951 ResponseParameters: respParam,
1952 Payload: smallPayload,
1953 }
1954
1955
1956 stream, err := tc.FullDuplexCall(te.ctx)
1957 if err != nil {
1958 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1959 }
1960 if err := stream.Send(sreq); err != nil {
1961 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
1962 }
1963 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
1964 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
1965 }
1966
1967
1968 respParam[0].Size = int32(smallSize)
1969 sreq.Payload = largePayload
1970 stream, err = tc.FullDuplexCall(te.ctx)
1971 if err != nil {
1972 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
1973 }
1974 if err := stream.Send(sreq); err == nil || status.Code(err) != codes.ResourceExhausted {
1975 t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted)
1976 }
1977 }
1978
1979 func (s) TestMaxMsgSizeServerAPI(t *testing.T) {
1980 for _, e := range listTestEnv() {
1981 testMaxMsgSizeServerAPI(t, e)
1982 }
1983 }
1984
1985 func testMaxMsgSizeServerAPI(t *testing.T, e env) {
1986 te := newTest(t, e)
1987 te.userAgent = testAppUA
1988 te.maxServerReceiveMsgSize = newInt(1024)
1989 te.maxServerSendMsgSize = newInt(1024)
1990 te.declareLogNoise(
1991 "Failed to dial : context canceled; please retry.",
1992 )
1993 te.startServer(&testServer{security: e.security})
1994
1995 defer te.tearDown()
1996 tc := testgrpc.NewTestServiceClient(te.clientConn())
1997
1998 const smallSize = 1
1999 const largeSize = 1024
2000 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
2001 if err != nil {
2002 t.Fatal(err)
2003 }
2004
2005 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
2006 if err != nil {
2007 t.Fatal(err)
2008 }
2009 req := &testpb.SimpleRequest{
2010 ResponseType: testpb.PayloadType_COMPRESSABLE,
2011 ResponseSize: int32(largeSize),
2012 Payload: smallPayload,
2013 }
2014
2015 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2016 defer cancel()
2017
2018 if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
2019 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2020 }
2021
2022
2023 req.Payload = largePayload
2024 req.ResponseSize = int32(smallSize)
2025 if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
2026 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2027 }
2028
2029 respParam := []*testpb.ResponseParameters{
2030 {
2031 Size: int32(largeSize),
2032 },
2033 }
2034 sreq := &testpb.StreamingOutputCallRequest{
2035 ResponseType: testpb.PayloadType_COMPRESSABLE,
2036 ResponseParameters: respParam,
2037 Payload: smallPayload,
2038 }
2039
2040
2041 stream, err := tc.FullDuplexCall(te.ctx)
2042 if err != nil {
2043 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2044 }
2045 if err := stream.Send(sreq); err != nil {
2046 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2047 }
2048 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
2049 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2050 }
2051
2052
2053 respParam[0].Size = int32(smallSize)
2054 sreq.Payload = largePayload
2055 stream, err = tc.FullDuplexCall(te.ctx)
2056 if err != nil {
2057 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2058 }
2059 if err := stream.Send(sreq); err != nil {
2060 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2061 }
2062 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
2063 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2064 }
2065 }
2066
2067 func (s) TestTap(t *testing.T) {
2068 for _, e := range listTestEnv() {
2069 if e.name == "handler-tls" {
2070 continue
2071 }
2072 testTap(t, e)
2073 }
2074 }
2075
2076 type myTap struct {
2077 cnt int
2078 }
2079
2080 func (t *myTap) handle(ctx context.Context, info *tap.Info) (context.Context, error) {
2081 if info != nil {
2082 switch info.FullMethodName {
2083 case "/grpc.testing.TestService/EmptyCall":
2084 t.cnt++
2085
2086 if vals := info.Header.Get("return-error"); len(vals) > 0 && vals[0] == "true" {
2087 return nil, status.Errorf(codes.Unknown, "tap error")
2088 }
2089 case "/grpc.testing.TestService/UnaryCall":
2090 return nil, fmt.Errorf("tap error")
2091 case "/grpc.testing.TestService/FullDuplexCall":
2092 return nil, status.Errorf(codes.FailedPrecondition, "test custom error")
2093 }
2094 }
2095 return ctx, nil
2096 }
2097
2098 func testTap(t *testing.T, e env) {
2099 te := newTest(t, e)
2100 te.userAgent = testAppUA
2101 ttap := &myTap{}
2102 te.tapHandle = ttap.handle
2103 te.startServer(&testServer{security: e.security})
2104 defer te.tearDown()
2105
2106 cc := te.clientConn()
2107 tc := testgrpc.NewTestServiceClient(cc)
2108 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2109 defer cancel()
2110
2111 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
2112 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
2113 }
2114 if ttap.cnt != 1 {
2115 t.Fatalf("Get the count in ttap %d, want 1", ttap.cnt)
2116 }
2117
2118 if _, err := tc.EmptyCall(metadata.AppendToOutgoingContext(ctx, "return-error", "false"), &testpb.Empty{}); err != nil {
2119 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
2120 }
2121 if ttap.cnt != 2 {
2122 t.Fatalf("Get the count in ttap %d, want 2", ttap.cnt)
2123 }
2124
2125 if _, err := tc.EmptyCall(metadata.AppendToOutgoingContext(ctx, "return-error", "true"), &testpb.Empty{}); status.Code(err) != codes.Unknown {
2126 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unknown)
2127 }
2128 if ttap.cnt != 3 {
2129 t.Fatalf("Get the count in ttap %d, want 3", ttap.cnt)
2130 }
2131
2132 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 31)
2133 if err != nil {
2134 t.Fatal(err)
2135 }
2136
2137 req := &testpb.SimpleRequest{
2138 ResponseType: testpb.PayloadType_COMPRESSABLE,
2139 ResponseSize: 45,
2140 Payload: payload,
2141 }
2142 if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.PermissionDenied {
2143 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.PermissionDenied)
2144 }
2145 str, err := tc.FullDuplexCall(ctx)
2146 if err != nil {
2147 t.Fatalf("Unexpected error creating stream: %v", err)
2148 }
2149 if _, err := str.Recv(); status.Code(err) != codes.FailedPrecondition {
2150 t.Fatalf("FullDuplexCall Recv() = _, %v, want _, %s", err, codes.FailedPrecondition)
2151 }
2152 }
2153
2154 func (s) TestEmptyUnaryWithUserAgent(t *testing.T) {
2155 for _, e := range listTestEnv() {
2156 testEmptyUnaryWithUserAgent(t, e)
2157 }
2158 }
2159
2160 func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
2161 te := newTest(t, e)
2162 te.userAgent = testAppUA
2163 te.startServer(&testServer{security: e.security})
2164 defer te.tearDown()
2165
2166 cc := te.clientConn()
2167 tc := testgrpc.NewTestServiceClient(cc)
2168 var header metadata.MD
2169 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2170 defer cancel()
2171 reply, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Header(&header))
2172 if err != nil || !proto.Equal(&testpb.Empty{}, reply) {
2173 t.Fatalf("TestService/EmptyCall(_, _) = %v, %v, want %v, <nil>", reply, err, &testpb.Empty{})
2174 }
2175 if v, ok := header["ua"]; !ok || !strings.HasPrefix(v[0], testAppUA) {
2176 t.Fatalf("header[\"ua\"] = %q, %t, want string with prefix %q, true", v, ok, testAppUA)
2177 }
2178
2179 te.srv.Stop()
2180 }
2181
2182 func (s) TestFailedEmptyUnary(t *testing.T) {
2183 for _, e := range listTestEnv() {
2184 if e.name == "handler-tls" {
2185
2186
2187 continue
2188 }
2189 testFailedEmptyUnary(t, e)
2190 }
2191 }
2192
2193 func testFailedEmptyUnary(t *testing.T, e env) {
2194 te := newTest(t, e)
2195 te.userAgent = failAppUA
2196 te.startServer(&testServer{security: e.security})
2197 defer te.tearDown()
2198 tc := testgrpc.NewTestServiceClient(te.clientConn())
2199
2200 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2201 defer cancel()
2202 ctx = metadata.NewOutgoingContext(ctx, testMetadata)
2203 wantErr := detailedError
2204 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !testutils.StatusErrEqual(err, wantErr) {
2205 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr)
2206 }
2207 }
2208
2209 func (s) TestLargeUnary(t *testing.T) {
2210 for _, e := range listTestEnv() {
2211 testLargeUnary(t, e)
2212 }
2213 }
2214
2215 func testLargeUnary(t *testing.T, e env) {
2216 te := newTest(t, e)
2217 te.startServer(&testServer{security: e.security})
2218 defer te.tearDown()
2219 tc := testgrpc.NewTestServiceClient(te.clientConn())
2220
2221 const argSize = 271828
2222 const respSize = 314159
2223
2224 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2225 if err != nil {
2226 t.Fatal(err)
2227 }
2228
2229 req := &testpb.SimpleRequest{
2230 ResponseType: testpb.PayloadType_COMPRESSABLE,
2231 ResponseSize: respSize,
2232 Payload: payload,
2233 }
2234
2235 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2236 defer cancel()
2237 reply, err := tc.UnaryCall(ctx, req)
2238 if err != nil {
2239 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
2240 }
2241 pt := reply.GetPayload().GetType()
2242 ps := len(reply.GetPayload().GetBody())
2243 if pt != testpb.PayloadType_COMPRESSABLE || ps != respSize {
2244 t.Fatalf("Got the reply with type %d len %d; want %d, %d", pt, ps, testpb.PayloadType_COMPRESSABLE, respSize)
2245 }
2246 }
2247
2248
2249 func (s) TestExceedMsgLimit(t *testing.T) {
2250 for _, e := range listTestEnv() {
2251 testExceedMsgLimit(t, e)
2252 }
2253 }
2254
2255 func testExceedMsgLimit(t *testing.T, e env) {
2256 te := newTest(t, e)
2257 maxMsgSize := 1024
2258 te.maxServerMsgSize, te.maxClientMsgSize = newInt(maxMsgSize), newInt(maxMsgSize)
2259 te.startServer(&testServer{security: e.security})
2260 defer te.tearDown()
2261 tc := testgrpc.NewTestServiceClient(te.clientConn())
2262
2263 largeSize := int32(maxMsgSize + 1)
2264 const smallSize = 1
2265
2266 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
2267 if err != nil {
2268 t.Fatal(err)
2269 }
2270 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
2271 if err != nil {
2272 t.Fatal(err)
2273 }
2274
2275
2276 req := &testpb.SimpleRequest{
2277 ResponseType: testpb.PayloadType_COMPRESSABLE,
2278 ResponseSize: smallSize,
2279 Payload: largePayload,
2280 }
2281
2282 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2283 defer cancel()
2284 if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
2285 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2286 }
2287
2288 req.ResponseSize = largeSize
2289 req.Payload = smallPayload
2290 if _, err := tc.UnaryCall(ctx, req); err == nil || status.Code(err) != codes.ResourceExhausted {
2291 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2292 }
2293
2294
2295 stream, err := tc.FullDuplexCall(te.ctx)
2296 if err != nil {
2297 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2298 }
2299 respParam := []*testpb.ResponseParameters{
2300 {
2301 Size: 1,
2302 },
2303 }
2304
2305 sreq := &testpb.StreamingOutputCallRequest{
2306 ResponseType: testpb.PayloadType_COMPRESSABLE,
2307 ResponseParameters: respParam,
2308 Payload: largePayload,
2309 }
2310 if err := stream.Send(sreq); err != nil {
2311 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2312 }
2313 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
2314 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2315 }
2316
2317
2318 stream, err = tc.FullDuplexCall(te.ctx)
2319 if err != nil {
2320 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2321 }
2322 respParam[0].Size = largeSize
2323 sreq.Payload = smallPayload
2324 if err := stream.Send(sreq); err != nil {
2325 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
2326 }
2327 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
2328 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
2329 }
2330 }
2331
2332 func (s) TestPeerClientSide(t *testing.T) {
2333 for _, e := range listTestEnv() {
2334 testPeerClientSide(t, e)
2335 }
2336 }
2337
2338 func testPeerClientSide(t *testing.T, e env) {
2339 te := newTest(t, e)
2340 te.userAgent = testAppUA
2341 te.startServer(&testServer{security: e.security})
2342 defer te.tearDown()
2343 tc := testgrpc.NewTestServiceClient(te.clientConn())
2344 peer := new(peer.Peer)
2345 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2346 defer cancel()
2347 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
2348 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
2349 }
2350 pa := peer.Addr.String()
2351 if e.network == "unix" {
2352 if pa != te.srvAddr {
2353 t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
2354 }
2355 return
2356 }
2357 _, pp, err := net.SplitHostPort(pa)
2358 if err != nil {
2359 t.Fatalf("Failed to parse address from peer.")
2360 }
2361 _, sp, err := net.SplitHostPort(te.srvAddr)
2362 if err != nil {
2363 t.Fatalf("Failed to parse address of test server.")
2364 }
2365 if pp != sp {
2366 t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
2367 }
2368 }
2369
2370
2371
2372
2373 func (s) TestPeerNegative(t *testing.T) {
2374 for _, e := range listTestEnv() {
2375 testPeerNegative(t, e)
2376 }
2377 }
2378
2379 func testPeerNegative(t *testing.T, e env) {
2380 te := newTest(t, e)
2381 te.startServer(&testServer{security: e.security})
2382 defer te.tearDown()
2383
2384 cc := te.clientConn()
2385 tc := testgrpc.NewTestServiceClient(cc)
2386 peer := new(peer.Peer)
2387 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2388 cancel()
2389 tc.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer))
2390 }
2391
2392 func (s) TestPeerFailedRPC(t *testing.T) {
2393 for _, e := range listTestEnv() {
2394 testPeerFailedRPC(t, e)
2395 }
2396 }
2397
2398 func testPeerFailedRPC(t *testing.T, e env) {
2399 te := newTest(t, e)
2400 te.maxServerReceiveMsgSize = newInt(1 * 1024)
2401 te.startServer(&testServer{security: e.security})
2402
2403 defer te.tearDown()
2404 tc := testgrpc.NewTestServiceClient(te.clientConn())
2405
2406 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2407 defer cancel()
2408
2409 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
2410 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
2411 }
2412
2413
2414 const largeSize = 5 * 1024
2415 largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize)
2416 if err != nil {
2417 t.Fatal(err)
2418 }
2419 req := &testpb.SimpleRequest{
2420 ResponseType: testpb.PayloadType_COMPRESSABLE,
2421 Payload: largePayload,
2422 }
2423
2424 peer := new(peer.Peer)
2425 if _, err := tc.UnaryCall(ctx, req, grpc.Peer(peer)); err == nil || status.Code(err) != codes.ResourceExhausted {
2426 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted)
2427 } else {
2428 pa := peer.Addr.String()
2429 if e.network == "unix" {
2430 if pa != te.srvAddr {
2431 t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
2432 }
2433 return
2434 }
2435 _, pp, err := net.SplitHostPort(pa)
2436 if err != nil {
2437 t.Fatalf("Failed to parse address from peer.")
2438 }
2439 _, sp, err := net.SplitHostPort(te.srvAddr)
2440 if err != nil {
2441 t.Fatalf("Failed to parse address of test server.")
2442 }
2443 if pp != sp {
2444 t.Fatalf("peer.Addr = localhost:%v, want localhost:%v", pp, sp)
2445 }
2446 }
2447 }
2448
2449 func (s) TestMetadataUnaryRPC(t *testing.T) {
2450 for _, e := range listTestEnv() {
2451 testMetadataUnaryRPC(t, e)
2452 }
2453 }
2454
2455 func testMetadataUnaryRPC(t *testing.T, e env) {
2456 te := newTest(t, e)
2457 te.startServer(&testServer{security: e.security})
2458 defer te.tearDown()
2459 tc := testgrpc.NewTestServiceClient(te.clientConn())
2460
2461 const argSize = 2718
2462 const respSize = 314
2463
2464 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2465 if err != nil {
2466 t.Fatal(err)
2467 }
2468
2469 req := &testpb.SimpleRequest{
2470 ResponseType: testpb.PayloadType_COMPRESSABLE,
2471 ResponseSize: respSize,
2472 Payload: payload,
2473 }
2474 var header, trailer metadata.MD
2475 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2476 defer cancel()
2477 ctx = metadata.NewOutgoingContext(ctx, testMetadata)
2478 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.Trailer(&trailer)); err != nil {
2479 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
2480 }
2481
2482 if header != nil {
2483 delete(header, "trailer")
2484 delete(header, "date")
2485 delete(header, "user-agent")
2486 delete(header, "content-type")
2487 delete(header, "grpc-accept-encoding")
2488 }
2489 if !reflect.DeepEqual(header, testMetadata) {
2490 t.Fatalf("Received header metadata %v, want %v", header, testMetadata)
2491 }
2492 if !reflect.DeepEqual(trailer, testTrailerMetadata) {
2493 t.Fatalf("Received trailer metadata %v, want %v", trailer, testTrailerMetadata)
2494 }
2495 }
2496
2497 func (s) TestMetadataOrderUnaryRPC(t *testing.T) {
2498 for _, e := range listTestEnv() {
2499 testMetadataOrderUnaryRPC(t, e)
2500 }
2501 }
2502
2503 func testMetadataOrderUnaryRPC(t *testing.T, e env) {
2504 te := newTest(t, e)
2505 te.startServer(&testServer{security: e.security})
2506 defer te.tearDown()
2507 tc := testgrpc.NewTestServiceClient(te.clientConn())
2508
2509 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2510 defer cancel()
2511 ctx = metadata.NewOutgoingContext(ctx, testMetadata)
2512 ctx = metadata.AppendToOutgoingContext(ctx, "key1", "value2")
2513 ctx = metadata.AppendToOutgoingContext(ctx, "key1", "value3")
2514
2515
2516 newMetadata := metadata.Join(testMetadata, metadata.Pairs("key1", "value2", "key1", "value3"))
2517
2518 var header metadata.MD
2519 if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}, grpc.Header(&header)); err != nil {
2520 t.Fatal(err)
2521 }
2522
2523
2524 if header != nil {
2525 delete(header, "trailer")
2526 delete(header, "date")
2527 delete(header, "user-agent")
2528 delete(header, "content-type")
2529 delete(header, "grpc-accept-encoding")
2530 }
2531
2532 if !reflect.DeepEqual(header, newMetadata) {
2533 t.Fatalf("Received header metadata %v, want %v", header, newMetadata)
2534 }
2535 }
2536
2537 func (s) TestMultipleSetTrailerUnaryRPC(t *testing.T) {
2538 for _, e := range listTestEnv() {
2539 testMultipleSetTrailerUnaryRPC(t, e)
2540 }
2541 }
2542
2543 func testMultipleSetTrailerUnaryRPC(t *testing.T, e env) {
2544 te := newTest(t, e)
2545 te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
2546 defer te.tearDown()
2547 tc := testgrpc.NewTestServiceClient(te.clientConn())
2548
2549 const (
2550 argSize = 1
2551 respSize = 1
2552 )
2553 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2554 if err != nil {
2555 t.Fatal(err)
2556 }
2557
2558 req := &testpb.SimpleRequest{
2559 ResponseType: testpb.PayloadType_COMPRESSABLE,
2560 ResponseSize: respSize,
2561 Payload: payload,
2562 }
2563 var trailer metadata.MD
2564 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2565 defer cancel()
2566 ctx = metadata.NewOutgoingContext(ctx, testMetadata)
2567 if _, err := tc.UnaryCall(ctx, req, grpc.Trailer(&trailer), grpc.WaitForReady(true)); err != nil {
2568 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
2569 }
2570 expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
2571 if !reflect.DeepEqual(trailer, expectedTrailer) {
2572 t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer)
2573 }
2574 }
2575
2576 func (s) TestMultipleSetTrailerStreamingRPC(t *testing.T) {
2577 for _, e := range listTestEnv() {
2578 testMultipleSetTrailerStreamingRPC(t, e)
2579 }
2580 }
2581
2582 func testMultipleSetTrailerStreamingRPC(t *testing.T, e env) {
2583 te := newTest(t, e)
2584 te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
2585 defer te.tearDown()
2586 tc := testgrpc.NewTestServiceClient(te.clientConn())
2587
2588 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2589 defer cancel()
2590 ctx = metadata.NewOutgoingContext(ctx, testMetadata)
2591 stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
2592 if err != nil {
2593 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2594 }
2595 if err := stream.CloseSend(); err != nil {
2596 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
2597 }
2598 if _, err := stream.Recv(); err != io.EOF {
2599 t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
2600 }
2601
2602 trailer := stream.Trailer()
2603 expectedTrailer := metadata.Join(testTrailerMetadata, testTrailerMetadata2)
2604 if !reflect.DeepEqual(trailer, expectedTrailer) {
2605 t.Fatalf("Received trailer metadata %v, want %v", trailer, expectedTrailer)
2606 }
2607 }
2608
2609 func (s) TestSetAndSendHeaderUnaryRPC(t *testing.T) {
2610 for _, e := range listTestEnv() {
2611 if e.name == "handler-tls" {
2612 continue
2613 }
2614 testSetAndSendHeaderUnaryRPC(t, e)
2615 }
2616 }
2617
2618
2619 func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) {
2620 te := newTest(t, e)
2621 te.startServer(&testServer{security: e.security, setAndSendHeader: true})
2622 defer te.tearDown()
2623 tc := testgrpc.NewTestServiceClient(te.clientConn())
2624
2625 const (
2626 argSize = 1
2627 respSize = 1
2628 )
2629 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2630 if err != nil {
2631 t.Fatal(err)
2632 }
2633
2634 req := &testpb.SimpleRequest{
2635 ResponseType: testpb.PayloadType_COMPRESSABLE,
2636 ResponseSize: respSize,
2637 Payload: payload,
2638 }
2639 var header metadata.MD
2640 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2641 defer cancel()
2642 ctx = metadata.NewOutgoingContext(ctx, testMetadata)
2643 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil {
2644 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
2645 }
2646 delete(header, "user-agent")
2647 delete(header, "content-type")
2648 delete(header, "grpc-accept-encoding")
2649
2650 expectedHeader := metadata.Join(testMetadata, testMetadata2)
2651 if !reflect.DeepEqual(header, expectedHeader) {
2652 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
2653 }
2654 }
2655
2656 func (s) TestMultipleSetHeaderUnaryRPC(t *testing.T) {
2657 for _, e := range listTestEnv() {
2658 if e.name == "handler-tls" {
2659 continue
2660 }
2661 testMultipleSetHeaderUnaryRPC(t, e)
2662 }
2663 }
2664
2665
2666 func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) {
2667 te := newTest(t, e)
2668 te.startServer(&testServer{security: e.security, setHeaderOnly: true})
2669 defer te.tearDown()
2670 tc := testgrpc.NewTestServiceClient(te.clientConn())
2671
2672 const (
2673 argSize = 1
2674 respSize = 1
2675 )
2676 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2677 if err != nil {
2678 t.Fatal(err)
2679 }
2680
2681 req := &testpb.SimpleRequest{
2682 ResponseType: testpb.PayloadType_COMPRESSABLE,
2683 ResponseSize: respSize,
2684 Payload: payload,
2685 }
2686
2687 var header metadata.MD
2688 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2689 defer cancel()
2690 ctx = metadata.NewOutgoingContext(ctx, testMetadata)
2691 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err != nil {
2692 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
2693 }
2694 delete(header, "user-agent")
2695 delete(header, "content-type")
2696 delete(header, "grpc-accept-encoding")
2697 expectedHeader := metadata.Join(testMetadata, testMetadata2)
2698 if !reflect.DeepEqual(header, expectedHeader) {
2699 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
2700 }
2701 }
2702
2703 func (s) TestMultipleSetHeaderUnaryRPCError(t *testing.T) {
2704 for _, e := range listTestEnv() {
2705 if e.name == "handler-tls" {
2706 continue
2707 }
2708 testMultipleSetHeaderUnaryRPCError(t, e)
2709 }
2710 }
2711
2712
2713 func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) {
2714 te := newTest(t, e)
2715 te.startServer(&testServer{security: e.security, setHeaderOnly: true})
2716 defer te.tearDown()
2717 tc := testgrpc.NewTestServiceClient(te.clientConn())
2718
2719 const (
2720 argSize = 1
2721 respSize = -1
2722 )
2723 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2724 if err != nil {
2725 t.Fatal(err)
2726 }
2727
2728 req := &testpb.SimpleRequest{
2729 ResponseType: testpb.PayloadType_COMPRESSABLE,
2730 ResponseSize: respSize,
2731 Payload: payload,
2732 }
2733 var header metadata.MD
2734 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2735 defer cancel()
2736 ctx = metadata.NewOutgoingContext(ctx, testMetadata)
2737 if _, err := tc.UnaryCall(ctx, req, grpc.Header(&header), grpc.WaitForReady(true)); err == nil {
2738 t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <non-nil>", ctx, err)
2739 }
2740 delete(header, "user-agent")
2741 delete(header, "content-type")
2742 delete(header, "grpc-accept-encoding")
2743 expectedHeader := metadata.Join(testMetadata, testMetadata2)
2744 if !reflect.DeepEqual(header, expectedHeader) {
2745 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
2746 }
2747 }
2748
2749 func (s) TestSetAndSendHeaderStreamingRPC(t *testing.T) {
2750 for _, e := range listTestEnv() {
2751 if e.name == "handler-tls" {
2752 continue
2753 }
2754 testSetAndSendHeaderStreamingRPC(t, e)
2755 }
2756 }
2757
2758
2759 func testSetAndSendHeaderStreamingRPC(t *testing.T, e env) {
2760 te := newTest(t, e)
2761 te.startServer(&testServer{security: e.security, setAndSendHeader: true})
2762 defer te.tearDown()
2763 tc := testgrpc.NewTestServiceClient(te.clientConn())
2764
2765 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2766 defer cancel()
2767 ctx = metadata.NewOutgoingContext(ctx, testMetadata)
2768 stream, err := tc.FullDuplexCall(ctx)
2769 if err != nil {
2770 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2771 }
2772 if err := stream.CloseSend(); err != nil {
2773 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
2774 }
2775 if _, err := stream.Recv(); err != io.EOF {
2776 t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
2777 }
2778
2779 header, err := stream.Header()
2780 if err != nil {
2781 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
2782 }
2783 delete(header, "user-agent")
2784 delete(header, "content-type")
2785 delete(header, "grpc-accept-encoding")
2786 expectedHeader := metadata.Join(testMetadata, testMetadata2)
2787 if !reflect.DeepEqual(header, expectedHeader) {
2788 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
2789 }
2790 }
2791
2792 func (s) TestMultipleSetHeaderStreamingRPC(t *testing.T) {
2793 for _, e := range listTestEnv() {
2794 if e.name == "handler-tls" {
2795 continue
2796 }
2797 testMultipleSetHeaderStreamingRPC(t, e)
2798 }
2799 }
2800
2801
2802 func testMultipleSetHeaderStreamingRPC(t *testing.T, e env) {
2803 te := newTest(t, e)
2804 te.startServer(&testServer{security: e.security, setHeaderOnly: true})
2805 defer te.tearDown()
2806 tc := testgrpc.NewTestServiceClient(te.clientConn())
2807
2808 const (
2809 argSize = 1
2810 respSize = 1
2811 )
2812 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2813 defer cancel()
2814 ctx = metadata.NewOutgoingContext(ctx, testMetadata)
2815 stream, err := tc.FullDuplexCall(ctx)
2816 if err != nil {
2817 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2818 }
2819
2820 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2821 if err != nil {
2822 t.Fatal(err)
2823 }
2824
2825 req := &testpb.StreamingOutputCallRequest{
2826 ResponseType: testpb.PayloadType_COMPRESSABLE,
2827 ResponseParameters: []*testpb.ResponseParameters{
2828 {Size: respSize},
2829 },
2830 Payload: payload,
2831 }
2832 if err := stream.Send(req); err != nil {
2833 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
2834 }
2835 if _, err := stream.Recv(); err != nil {
2836 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
2837 }
2838 if err := stream.CloseSend(); err != nil {
2839 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
2840 }
2841 if _, err := stream.Recv(); err != io.EOF {
2842 t.Fatalf("%v failed to complele the FullDuplexCall: %v", stream, err)
2843 }
2844
2845 header, err := stream.Header()
2846 if err != nil {
2847 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
2848 }
2849 delete(header, "user-agent")
2850 delete(header, "content-type")
2851 delete(header, "grpc-accept-encoding")
2852 expectedHeader := metadata.Join(testMetadata, testMetadata2)
2853 if !reflect.DeepEqual(header, expectedHeader) {
2854 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
2855 }
2856
2857 }
2858
2859 func (s) TestMultipleSetHeaderStreamingRPCError(t *testing.T) {
2860 for _, e := range listTestEnv() {
2861 if e.name == "handler-tls" {
2862 continue
2863 }
2864 testMultipleSetHeaderStreamingRPCError(t, e)
2865 }
2866 }
2867
2868
2869 func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) {
2870 te := newTest(t, e)
2871 te.startServer(&testServer{security: e.security, setHeaderOnly: true})
2872 defer te.tearDown()
2873 tc := testgrpc.NewTestServiceClient(te.clientConn())
2874
2875 const (
2876 argSize = 1
2877 respSize = -1
2878 )
2879 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2880 defer cancel()
2881 ctx = metadata.NewOutgoingContext(ctx, testMetadata)
2882 stream, err := tc.FullDuplexCall(ctx)
2883 if err != nil {
2884 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
2885 }
2886
2887 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
2888 if err != nil {
2889 t.Fatal(err)
2890 }
2891
2892 req := &testpb.StreamingOutputCallRequest{
2893 ResponseType: testpb.PayloadType_COMPRESSABLE,
2894 ResponseParameters: []*testpb.ResponseParameters{
2895 {Size: respSize},
2896 },
2897 Payload: payload,
2898 }
2899 if err := stream.Send(req); err != nil {
2900 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
2901 }
2902 if _, err := stream.Recv(); err == nil {
2903 t.Fatalf("%v.Recv() = %v, want <non-nil>", stream, err)
2904 }
2905
2906 header, err := stream.Header()
2907 if err != nil {
2908 t.Fatalf("%v.Header() = _, %v, want _, <nil>", stream, err)
2909 }
2910 delete(header, "user-agent")
2911 delete(header, "content-type")
2912 delete(header, "grpc-accept-encoding")
2913 expectedHeader := metadata.Join(testMetadata, testMetadata2)
2914 if !reflect.DeepEqual(header, expectedHeader) {
2915 t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
2916 }
2917 if err := stream.CloseSend(); err != nil {
2918 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
2919 }
2920 }
2921
2922
2923
2924 func (s) TestMalformedHTTP2Metadata(t *testing.T) {
2925 for _, e := range listTestEnv() {
2926 if e.name == "handler-tls" {
2927
2928
2929 continue
2930 }
2931 testMalformedHTTP2Metadata(t, e)
2932 }
2933 }
2934
2935 func testMalformedHTTP2Metadata(t *testing.T, e env) {
2936 te := newTest(t, e)
2937 te.startServer(&testServer{security: e.security})
2938 defer te.tearDown()
2939 tc := testgrpc.NewTestServiceClient(te.clientConn())
2940
2941 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 2718)
2942 if err != nil {
2943 t.Fatal(err)
2944 }
2945
2946 req := &testpb.SimpleRequest{
2947 ResponseType: testpb.PayloadType_COMPRESSABLE,
2948 ResponseSize: 314,
2949 Payload: payload,
2950 }
2951 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
2952 defer cancel()
2953 ctx = metadata.NewOutgoingContext(ctx, malformedHTTP2Metadata)
2954 if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Internal {
2955 t.Fatalf("TestService.UnaryCall(%v, _) = _, %v; want _, %s", ctx, err, codes.Internal)
2956 }
2957 }
2958
2959
2960
2961 func (s) TestTransparentRetry(t *testing.T) {
2962 testCases := []struct {
2963 failFast bool
2964 errCode codes.Code
2965 }{{
2966
2967 }, {
2968
2969 }, {
2970
2971 errCode: codes.Unavailable,
2972 }, {
2973
2974 failFast: true,
2975 }, {
2976
2977 failFast: true,
2978 }, {
2979
2980 failFast: true,
2981 errCode: codes.Unavailable,
2982 }}
2983
2984 lis, err := net.Listen("tcp", "localhost:0")
2985 if err != nil {
2986 t.Fatalf("Failed to listen. Err: %v", err)
2987 }
2988 defer lis.Close()
2989 server := &httpServer{
2990 responses: []httpServerResponse{{
2991 trailers: [][]string{{
2992 ":status", "200",
2993 "content-type", "application/grpc",
2994 "grpc-status", "0",
2995 }},
2996 }},
2997 refuseStream: func(i uint32) bool {
2998 switch i {
2999 case 1, 5, 11, 15:
3000 return false
3001 }
3002 return true
3003 },
3004 }
3005 server.start(t, lis)
3006 cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
3007 if err != nil {
3008 t.Fatalf("failed to dial due to err: %v", err)
3009 }
3010 defer cc.Close()
3011
3012 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3013 defer cancel()
3014
3015 client := testgrpc.NewTestServiceClient(cc)
3016
3017 for i, tc := range testCases {
3018 stream, err := client.FullDuplexCall(ctx)
3019 if err != nil {
3020 t.Fatalf("error creating stream due to err: %v", err)
3021 }
3022 code := func(err error) codes.Code {
3023 if err == io.EOF {
3024 return codes.OK
3025 }
3026 return status.Code(err)
3027 }
3028 if _, err := stream.Recv(); code(err) != tc.errCode {
3029 t.Fatalf("%v: stream.Recv() = _, %v, want error code: %v", i, err, tc.errCode)
3030 }
3031
3032 }
3033 }
3034
3035 func (s) TestCancel(t *testing.T) {
3036 for _, e := range listTestEnv() {
3037 t.Run(e.name, func(t *testing.T) {
3038 testCancel(t, e)
3039 })
3040 }
3041 }
3042
3043 func testCancel(t *testing.T, e env) {
3044 te := newTest(t, e)
3045 te.declareLogNoise("grpc: the client connection is closing; please retry")
3046 te.startServer(&testServer{security: e.security, unaryCallSleepTime: time.Second})
3047 defer te.tearDown()
3048
3049 cc := te.clientConn()
3050 tc := testgrpc.NewTestServiceClient(cc)
3051
3052 const argSize = 2718
3053 const respSize = 314
3054
3055 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
3056 if err != nil {
3057 t.Fatal(err)
3058 }
3059
3060 req := &testpb.SimpleRequest{
3061 ResponseType: testpb.PayloadType_COMPRESSABLE,
3062 ResponseSize: respSize,
3063 Payload: payload,
3064 }
3065 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3066 time.AfterFunc(1*time.Millisecond, cancel)
3067 if r, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.Canceled {
3068 t.Fatalf("TestService/UnaryCall(_, _) = %v, %v; want _, error code: %s", r, err, codes.Canceled)
3069 }
3070 awaitNewConnLogOutput()
3071 }
3072
3073 func (s) TestCancelNoIO(t *testing.T) {
3074 for _, e := range listTestEnv() {
3075 testCancelNoIO(t, e)
3076 }
3077 }
3078
3079 func testCancelNoIO(t *testing.T, e env) {
3080 te := newTest(t, e)
3081 te.declareLogNoise("http2Client.notifyError got notified that the client transport was broken")
3082 te.maxStream = 1
3083 te.startServer(&testServer{security: e.security})
3084 defer te.tearDown()
3085
3086 cc := te.clientConn()
3087 tc := testgrpc.NewTestServiceClient(cc)
3088
3089
3090
3091
3092 ctx, cancelFirst := context.WithTimeout(context.Background(), defaultTestTimeout)
3093 _, err := tc.StreamingInputCall(ctx)
3094 if err != nil {
3095 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
3096 }
3097
3098
3099
3100
3101
3102
3103
3104 for {
3105 ctx, cancelSecond := context.WithTimeout(context.Background(), defaultTestShortTimeout)
3106 _, err := tc.StreamingInputCall(ctx)
3107 cancelSecond()
3108 if err == nil {
3109 continue
3110 }
3111 if status.Code(err) == codes.DeadlineExceeded {
3112 break
3113 }
3114 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
3115 }
3116
3117
3118
3119 time.Sleep(50 * time.Millisecond)
3120
3121 go func() {
3122 time.Sleep(50 * time.Millisecond)
3123 cancelFirst()
3124 }()
3125
3126
3127 ctx, cancelThird := context.WithTimeout(context.Background(), defaultTestShortTimeout)
3128 if _, err := tc.StreamingInputCall(ctx); err != nil {
3129 t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
3130 }
3131 cancelThird()
3132 }
3133
3134
3135
3136 var (
3137 reqSizes = []int{27182, 8, 1828, 45904}
3138 respSizes = []int{31415, 9, 2653, 58979}
3139 )
3140
3141 func (s) TestNoService(t *testing.T) {
3142 for _, e := range listTestEnv() {
3143 testNoService(t, e)
3144 }
3145 }
3146
3147 func testNoService(t *testing.T, e env) {
3148 te := newTest(t, e)
3149 te.startServer(nil)
3150 defer te.tearDown()
3151
3152 cc := te.clientConn()
3153 tc := testgrpc.NewTestServiceClient(cc)
3154
3155 stream, err := tc.FullDuplexCall(te.ctx, grpc.WaitForReady(true))
3156 if err != nil {
3157 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3158 }
3159 if _, err := stream.Recv(); status.Code(err) != codes.Unimplemented {
3160 t.Fatalf("stream.Recv() = _, %v, want _, error code %s", err, codes.Unimplemented)
3161 }
3162 }
3163
3164 func (s) TestPingPong(t *testing.T) {
3165 for _, e := range listTestEnv() {
3166 testPingPong(t, e)
3167 }
3168 }
3169
3170 func testPingPong(t *testing.T, e env) {
3171 te := newTest(t, e)
3172 te.startServer(&testServer{security: e.security})
3173 defer te.tearDown()
3174 tc := testgrpc.NewTestServiceClient(te.clientConn())
3175
3176 stream, err := tc.FullDuplexCall(te.ctx)
3177 if err != nil {
3178 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3179 }
3180 var index int
3181 for index < len(reqSizes) {
3182 respParam := []*testpb.ResponseParameters{
3183 {
3184 Size: int32(respSizes[index]),
3185 },
3186 }
3187
3188 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
3189 if err != nil {
3190 t.Fatal(err)
3191 }
3192
3193 req := &testpb.StreamingOutputCallRequest{
3194 ResponseType: testpb.PayloadType_COMPRESSABLE,
3195 ResponseParameters: respParam,
3196 Payload: payload,
3197 }
3198 if err := stream.Send(req); err != nil {
3199 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3200 }
3201 reply, err := stream.Recv()
3202 if err != nil {
3203 t.Fatalf("%v.Recv() = %v, want <nil>", stream, err)
3204 }
3205 pt := reply.GetPayload().GetType()
3206 if pt != testpb.PayloadType_COMPRESSABLE {
3207 t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
3208 }
3209 size := len(reply.GetPayload().GetBody())
3210 if size != int(respSizes[index]) {
3211 t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
3212 }
3213 index++
3214 }
3215 if err := stream.CloseSend(); err != nil {
3216 t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
3217 }
3218 if _, err := stream.Recv(); err != io.EOF {
3219 t.Fatalf("%v failed to complele the ping pong test: %v", stream, err)
3220 }
3221 }
3222
3223 func (s) TestMetadataStreamingRPC(t *testing.T) {
3224 for _, e := range listTestEnv() {
3225 testMetadataStreamingRPC(t, e)
3226 }
3227 }
3228
3229 func testMetadataStreamingRPC(t *testing.T, e env) {
3230 te := newTest(t, e)
3231 te.startServer(&testServer{security: e.security})
3232 defer te.tearDown()
3233 tc := testgrpc.NewTestServiceClient(te.clientConn())
3234
3235 ctx := metadata.NewOutgoingContext(te.ctx, testMetadata)
3236 stream, err := tc.FullDuplexCall(ctx)
3237 if err != nil {
3238 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3239 }
3240 go func() {
3241 headerMD, err := stream.Header()
3242 if e.security == "tls" {
3243 delete(headerMD, "transport_security_type")
3244 }
3245 delete(headerMD, "trailer")
3246 delete(headerMD, "user-agent")
3247 delete(headerMD, "content-type")
3248 delete(headerMD, "grpc-accept-encoding")
3249 if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
3250 t.Errorf("#1 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
3251 }
3252
3253 headerMD, err = stream.Header()
3254 delete(headerMD, "trailer")
3255 delete(headerMD, "user-agent")
3256 delete(headerMD, "content-type")
3257 delete(headerMD, "grpc-accept-encoding")
3258 if err != nil || !reflect.DeepEqual(testMetadata, headerMD) {
3259 t.Errorf("#2 %v.Header() = %v, %v, want %v, <nil>", stream, headerMD, err, testMetadata)
3260 }
3261 err = func() error {
3262 for index := 0; index < len(reqSizes); index++ {
3263 respParam := []*testpb.ResponseParameters{
3264 {
3265 Size: int32(respSizes[index]),
3266 },
3267 }
3268
3269 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(reqSizes[index]))
3270 if err != nil {
3271 return err
3272 }
3273
3274 req := &testpb.StreamingOutputCallRequest{
3275 ResponseType: testpb.PayloadType_COMPRESSABLE,
3276 ResponseParameters: respParam,
3277 Payload: payload,
3278 }
3279 if err := stream.Send(req); err != nil {
3280 return fmt.Errorf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3281 }
3282 }
3283 return nil
3284 }()
3285
3286 stream.CloseSend()
3287 if err != nil {
3288 t.Error(err)
3289 }
3290 }()
3291 for {
3292 if _, err := stream.Recv(); err != nil {
3293 break
3294 }
3295 }
3296 trailerMD := stream.Trailer()
3297 if !reflect.DeepEqual(testTrailerMetadata, trailerMD) {
3298 t.Fatalf("%v.Trailer() = %v, want %v", stream, trailerMD, testTrailerMetadata)
3299 }
3300 }
3301
3302 func (s) TestServerStreaming(t *testing.T) {
3303 for _, e := range listTestEnv() {
3304 testServerStreaming(t, e)
3305 }
3306 }
3307
3308 func testServerStreaming(t *testing.T, e env) {
3309 te := newTest(t, e)
3310 te.startServer(&testServer{security: e.security})
3311 defer te.tearDown()
3312 tc := testgrpc.NewTestServiceClient(te.clientConn())
3313
3314 respParam := make([]*testpb.ResponseParameters, len(respSizes))
3315 for i, s := range respSizes {
3316 respParam[i] = &testpb.ResponseParameters{
3317 Size: int32(s),
3318 }
3319 }
3320 req := &testpb.StreamingOutputCallRequest{
3321 ResponseType: testpb.PayloadType_COMPRESSABLE,
3322 ResponseParameters: respParam,
3323 }
3324
3325 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3326 defer cancel()
3327 stream, err := tc.StreamingOutputCall(ctx, req)
3328 if err != nil {
3329 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3330 }
3331 var rpcStatus error
3332 var respCnt int
3333 var index int
3334 for {
3335 reply, err := stream.Recv()
3336 if err != nil {
3337 rpcStatus = err
3338 break
3339 }
3340 pt := reply.GetPayload().GetType()
3341 if pt != testpb.PayloadType_COMPRESSABLE {
3342 t.Fatalf("Got the reply of type %d, want %d", pt, testpb.PayloadType_COMPRESSABLE)
3343 }
3344 size := len(reply.GetPayload().GetBody())
3345 if size != int(respSizes[index]) {
3346 t.Fatalf("Got reply body of length %d, want %d", size, respSizes[index])
3347 }
3348 index++
3349 respCnt++
3350 }
3351 if rpcStatus != io.EOF {
3352 t.Fatalf("Failed to finish the server streaming rpc: %v, want <EOF>", rpcStatus)
3353 }
3354 if respCnt != len(respSizes) {
3355 t.Fatalf("Got %d reply, want %d", len(respSizes), respCnt)
3356 }
3357 }
3358
3359 func (s) TestFailedServerStreaming(t *testing.T) {
3360 for _, e := range listTestEnv() {
3361 testFailedServerStreaming(t, e)
3362 }
3363 }
3364
3365 func testFailedServerStreaming(t *testing.T, e env) {
3366 te := newTest(t, e)
3367 te.userAgent = failAppUA
3368 te.startServer(&testServer{security: e.security})
3369 defer te.tearDown()
3370 tc := testgrpc.NewTestServiceClient(te.clientConn())
3371
3372 respParam := make([]*testpb.ResponseParameters, len(respSizes))
3373 for i, s := range respSizes {
3374 respParam[i] = &testpb.ResponseParameters{
3375 Size: int32(s),
3376 }
3377 }
3378 req := &testpb.StreamingOutputCallRequest{
3379 ResponseType: testpb.PayloadType_COMPRESSABLE,
3380 ResponseParameters: respParam,
3381 }
3382 ctx := metadata.NewOutgoingContext(te.ctx, testMetadata)
3383 stream, err := tc.StreamingOutputCall(ctx, req)
3384 if err != nil {
3385 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3386 }
3387 wantErr := status.Error(codes.DataLoss, "error for testing: "+failAppUA)
3388 if _, err := stream.Recv(); !equalError(err, wantErr) {
3389 t.Fatalf("%v.Recv() = _, %v, want _, %v", stream, err, wantErr)
3390 }
3391 }
3392
3393 func equalError(x, y error) bool {
3394 return x == y || (x != nil && y != nil && x.Error() == y.Error())
3395 }
3396
3397
3398
3399
3400
3401
3402
3403 type concurrentSendServer struct {
3404 testgrpc.TestServiceServer
3405 }
3406
3407 func (s concurrentSendServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
3408 for i := 0; i < 10; i++ {
3409 stream.Send(&testpb.StreamingOutputCallResponse{
3410 Payload: &testpb.Payload{
3411 Body: []byte{'0' + uint8(i)},
3412 },
3413 })
3414 }
3415 return nil
3416 }
3417
3418
3419 func (s) TestServerStreamingConcurrent(t *testing.T) {
3420 for _, e := range listTestEnv() {
3421 testServerStreamingConcurrent(t, e)
3422 }
3423 }
3424
3425 func testServerStreamingConcurrent(t *testing.T, e env) {
3426 te := newTest(t, e)
3427 te.startServer(concurrentSendServer{})
3428 defer te.tearDown()
3429
3430 cc := te.clientConn()
3431 tc := testgrpc.NewTestServiceClient(cc)
3432
3433 doStreamingCall := func() {
3434 req := &testpb.StreamingOutputCallRequest{}
3435 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3436 defer cancel()
3437 stream, err := tc.StreamingOutputCall(ctx, req)
3438 if err != nil {
3439 t.Errorf("%v.StreamingOutputCall(_) = _, %v, want <nil>", tc, err)
3440 return
3441 }
3442 var ngot int
3443 var buf bytes.Buffer
3444 for {
3445 reply, err := stream.Recv()
3446 if err == io.EOF {
3447 break
3448 }
3449 if err != nil {
3450 t.Fatal(err)
3451 }
3452 ngot++
3453 if buf.Len() > 0 {
3454 buf.WriteByte(',')
3455 }
3456 buf.Write(reply.GetPayload().GetBody())
3457 }
3458 if want := 10; ngot != want {
3459 t.Errorf("Got %d replies, want %d", ngot, want)
3460 }
3461 if got, want := buf.String(), "0,1,2,3,4,5,6,7,8,9"; got != want {
3462 t.Errorf("Got replies %q; want %q", got, want)
3463 }
3464 }
3465
3466 var wg sync.WaitGroup
3467 for i := 0; i < 20; i++ {
3468 wg.Add(1)
3469 go func() {
3470 defer wg.Done()
3471 doStreamingCall()
3472 }()
3473 }
3474 wg.Wait()
3475
3476 }
3477
3478 func generatePayloadSizes() [][]int {
3479 reqSizes := [][]int{
3480 {27182, 8, 1828, 45904},
3481 }
3482
3483 num8KPayloads := 1024
3484 eightKPayloads := []int{}
3485 for i := 0; i < num8KPayloads; i++ {
3486 eightKPayloads = append(eightKPayloads, (1 << 13))
3487 }
3488 reqSizes = append(reqSizes, eightKPayloads)
3489
3490 num2MPayloads := 8
3491 twoMPayloads := []int{}
3492 for i := 0; i < num2MPayloads; i++ {
3493 twoMPayloads = append(twoMPayloads, (1 << 21))
3494 }
3495 reqSizes = append(reqSizes, twoMPayloads)
3496
3497 return reqSizes
3498 }
3499
3500 func (s) TestClientStreaming(t *testing.T) {
3501 for _, s := range generatePayloadSizes() {
3502 for _, e := range listTestEnv() {
3503 testClientStreaming(t, e, s)
3504 }
3505 }
3506 }
3507
3508 func testClientStreaming(t *testing.T, e env, sizes []int) {
3509 te := newTest(t, e)
3510 te.startServer(&testServer{security: e.security})
3511 defer te.tearDown()
3512 tc := testgrpc.NewTestServiceClient(te.clientConn())
3513
3514 ctx, cancel := context.WithTimeout(te.ctx, defaultTestTimeout)
3515 defer cancel()
3516 stream, err := tc.StreamingInputCall(ctx)
3517 if err != nil {
3518 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err)
3519 }
3520
3521 var sum int
3522 for _, s := range sizes {
3523 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(s))
3524 if err != nil {
3525 t.Fatal(err)
3526 }
3527
3528 req := &testpb.StreamingInputCallRequest{
3529 Payload: payload,
3530 }
3531 if err := stream.Send(req); err != nil {
3532 t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
3533 }
3534 sum += s
3535 }
3536 reply, err := stream.CloseAndRecv()
3537 if err != nil {
3538 t.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
3539 }
3540 if reply.GetAggregatedPayloadSize() != int32(sum) {
3541 t.Fatalf("%v.CloseAndRecv().GetAggregatePayloadSize() = %v; want %v", stream, reply.GetAggregatedPayloadSize(), sum)
3542 }
3543 }
3544
3545 func (s) TestClientStreamingError(t *testing.T) {
3546 for _, e := range listTestEnv() {
3547 if e.name == "handler-tls" {
3548 continue
3549 }
3550 testClientStreamingError(t, e)
3551 }
3552 }
3553
3554 func testClientStreamingError(t *testing.T, e env) {
3555 te := newTest(t, e)
3556 te.startServer(&testServer{security: e.security, earlyFail: true})
3557 defer te.tearDown()
3558 tc := testgrpc.NewTestServiceClient(te.clientConn())
3559
3560 stream, err := tc.StreamingInputCall(te.ctx)
3561 if err != nil {
3562 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want <nil>", tc, err)
3563 }
3564 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1)
3565 if err != nil {
3566 t.Fatal(err)
3567 }
3568
3569 req := &testpb.StreamingInputCallRequest{
3570 Payload: payload,
3571 }
3572
3573 if err := stream.Send(req); err != nil {
3574 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
3575 }
3576 for {
3577 if err := stream.Send(req); err != io.EOF {
3578 continue
3579 }
3580 if _, err := stream.CloseAndRecv(); status.Code(err) != codes.NotFound {
3581 t.Fatalf("%v.CloseAndRecv() = %v, want error %s", stream, err, codes.NotFound)
3582 }
3583 break
3584 }
3585 }
3586
3587 func (s) TestExceedMaxStreamsLimit(t *testing.T) {
3588 for _, e := range listTestEnv() {
3589 testExceedMaxStreamsLimit(t, e)
3590 }
3591 }
3592
3593 func testExceedMaxStreamsLimit(t *testing.T, e env) {
3594 te := newTest(t, e)
3595 te.declareLogNoise(
3596 "http2Client.notifyError got notified that the client transport was broken",
3597 "Conn.resetTransport failed to create client transport",
3598 "grpc: the connection is closing",
3599 )
3600 te.maxStream = 1
3601 te.startServer(&testServer{security: e.security})
3602 defer te.tearDown()
3603
3604 cc := te.clientConn()
3605 tc := testgrpc.NewTestServiceClient(cc)
3606
3607 _, err := tc.StreamingInputCall(te.ctx)
3608 if err != nil {
3609 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
3610 }
3611
3612 for {
3613 ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
3614 defer cancel()
3615 _, err := tc.StreamingInputCall(ctx)
3616 if err == nil {
3617 time.Sleep(50 * time.Millisecond)
3618 continue
3619 }
3620 if status.Code(err) == codes.DeadlineExceeded {
3621 break
3622 }
3623 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
3624 }
3625 }
3626
3627 func (s) TestStreamsQuotaRecovery(t *testing.T) {
3628 for _, e := range listTestEnv() {
3629 testStreamsQuotaRecovery(t, e)
3630 }
3631 }
3632
3633 func testStreamsQuotaRecovery(t *testing.T, e env) {
3634 te := newTest(t, e)
3635 te.declareLogNoise(
3636 "http2Client.notifyError got notified that the client transport was broken",
3637 "Conn.resetTransport failed to create client transport",
3638 "grpc: the connection is closing",
3639 )
3640 te.maxStream = 1
3641 te.startServer(&testServer{security: e.security})
3642 defer te.tearDown()
3643
3644 cc := te.clientConn()
3645 tc := testgrpc.NewTestServiceClient(cc)
3646 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3647 defer cancel()
3648 if _, err := tc.StreamingInputCall(ctx); err != nil {
3649 t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, <nil>", err)
3650 }
3651
3652 for {
3653 ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
3654 _, err := tc.StreamingInputCall(ctx)
3655 cancel()
3656 if err == nil {
3657 time.Sleep(5 * time.Millisecond)
3658 continue
3659 }
3660 if status.Code(err) == codes.DeadlineExceeded {
3661 break
3662 }
3663 t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %s", err, codes.DeadlineExceeded)
3664 }
3665
3666 var wg sync.WaitGroup
3667 for i := 0; i < 10; i++ {
3668 wg.Add(1)
3669 go func() {
3670 defer wg.Done()
3671 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 314)
3672 if err != nil {
3673 t.Error(err)
3674 return
3675 }
3676 req := &testpb.SimpleRequest{
3677 ResponseType: testpb.PayloadType_COMPRESSABLE,
3678 ResponseSize: 1592,
3679 Payload: payload,
3680 }
3681
3682 ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
3683 defer cancel()
3684 if _, err := tc.UnaryCall(ctx, req, grpc.WaitForReady(true)); status.Code(err) != codes.DeadlineExceeded {
3685 t.Errorf("tc.UnaryCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
3686 }
3687 }()
3688 }
3689 wg.Wait()
3690
3691 cancel()
3692
3693 ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
3694 defer cancel()
3695 if _, err := tc.StreamingInputCall(ctx); err != nil {
3696 t.Fatalf("tc.StreamingInputCall(_) = _, %v, want _, %v", err, nil)
3697 }
3698 }
3699
3700 func (s) TestUnaryClientInterceptor(t *testing.T) {
3701 for _, e := range listTestEnv() {
3702 testUnaryClientInterceptor(t, e)
3703 }
3704 }
3705
3706 func failOkayRPC(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
3707 err := invoker(ctx, method, req, reply, cc, opts...)
3708 if err == nil {
3709 return status.Error(codes.NotFound, "")
3710 }
3711 return err
3712 }
3713
3714 func testUnaryClientInterceptor(t *testing.T, e env) {
3715 te := newTest(t, e)
3716 te.userAgent = testAppUA
3717 te.unaryClientInt = failOkayRPC
3718 te.startServer(&testServer{security: e.security})
3719 defer te.tearDown()
3720
3721 tc := testgrpc.NewTestServiceClient(te.clientConn())
3722 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3723 defer cancel()
3724 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.NotFound {
3725 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.NotFound)
3726 }
3727 }
3728
3729 func (s) TestStreamClientInterceptor(t *testing.T) {
3730 for _, e := range listTestEnv() {
3731 testStreamClientInterceptor(t, e)
3732 }
3733 }
3734
3735 func failOkayStream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
3736 s, err := streamer(ctx, desc, cc, method, opts...)
3737 if err == nil {
3738 return nil, status.Error(codes.NotFound, "")
3739 }
3740 return s, nil
3741 }
3742
3743 func testStreamClientInterceptor(t *testing.T, e env) {
3744 te := newTest(t, e)
3745 te.streamClientInt = failOkayStream
3746 te.startServer(&testServer{security: e.security})
3747 defer te.tearDown()
3748
3749 tc := testgrpc.NewTestServiceClient(te.clientConn())
3750 respParam := []*testpb.ResponseParameters{
3751 {
3752 Size: int32(1),
3753 },
3754 }
3755 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
3756 if err != nil {
3757 t.Fatal(err)
3758 }
3759 req := &testpb.StreamingOutputCallRequest{
3760 ResponseType: testpb.PayloadType_COMPRESSABLE,
3761 ResponseParameters: respParam,
3762 Payload: payload,
3763 }
3764 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3765 defer cancel()
3766 if _, err := tc.StreamingOutputCall(ctx, req); status.Code(err) != codes.NotFound {
3767 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, error code %s", tc, err, codes.NotFound)
3768 }
3769 }
3770
3771 func (s) TestUnaryServerInterceptor(t *testing.T) {
3772 for _, e := range listTestEnv() {
3773 testUnaryServerInterceptor(t, e)
3774 }
3775 }
3776
3777 func errInjector(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
3778 return nil, status.Error(codes.PermissionDenied, "")
3779 }
3780
3781 func testUnaryServerInterceptor(t *testing.T, e env) {
3782 te := newTest(t, e)
3783 te.unaryServerInt = errInjector
3784 te.startServer(&testServer{security: e.security})
3785 defer te.tearDown()
3786
3787 tc := testgrpc.NewTestServiceClient(te.clientConn())
3788 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3789 defer cancel()
3790 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.PermissionDenied {
3791 t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied)
3792 }
3793 }
3794
3795 func (s) TestStreamServerInterceptor(t *testing.T) {
3796 for _, e := range listTestEnv() {
3797
3798 if e.name == "handler-tls" {
3799 continue
3800 }
3801 testStreamServerInterceptor(t, e)
3802 }
3803 }
3804
3805 func fullDuplexOnly(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
3806 if info.FullMethod == "/grpc.testing.TestService/FullDuplexCall" {
3807 return handler(srv, ss)
3808 }
3809
3810 return status.Error(codes.PermissionDenied, "")
3811 }
3812
3813 func testStreamServerInterceptor(t *testing.T, e env) {
3814 te := newTest(t, e)
3815 te.streamServerInt = fullDuplexOnly
3816 te.startServer(&testServer{security: e.security})
3817 defer te.tearDown()
3818
3819 tc := testgrpc.NewTestServiceClient(te.clientConn())
3820 respParam := []*testpb.ResponseParameters{
3821 {
3822 Size: int32(1),
3823 },
3824 }
3825 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(1))
3826 if err != nil {
3827 t.Fatal(err)
3828 }
3829 req := &testpb.StreamingOutputCallRequest{
3830 ResponseType: testpb.PayloadType_COMPRESSABLE,
3831 ResponseParameters: respParam,
3832 Payload: payload,
3833 }
3834 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
3835 defer cancel()
3836 s1, err := tc.StreamingOutputCall(ctx, req)
3837 if err != nil {
3838 t.Fatalf("%v.StreamingOutputCall(_) = _, %v, want _, <nil>", tc, err)
3839 }
3840 if _, err := s1.Recv(); status.Code(err) != codes.PermissionDenied {
3841 t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, error code %s", tc, err, codes.PermissionDenied)
3842 }
3843 s2, err := tc.FullDuplexCall(ctx)
3844 if err != nil {
3845 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
3846 }
3847 if err := s2.Send(req); err != nil {
3848 t.Fatalf("%v.Send(_) = %v, want <nil>", s2, err)
3849 }
3850 if _, err := s2.Recv(); err != nil {
3851 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", s2, err)
3852 }
3853 }
3854
3855
3856
3857
3858
3859 type funcServer struct {
3860 testgrpc.TestServiceServer
3861 unaryCall func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
3862 streamingInputCall func(stream testgrpc.TestService_StreamingInputCallServer) error
3863 fullDuplexCall func(stream testgrpc.TestService_FullDuplexCallServer) error
3864 }
3865
3866 func (s *funcServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
3867 return s.unaryCall(ctx, in)
3868 }
3869
3870 func (s *funcServer) StreamingInputCall(stream testgrpc.TestService_StreamingInputCallServer) error {
3871 return s.streamingInputCall(stream)
3872 }
3873
3874 func (s *funcServer) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
3875 return s.fullDuplexCall(stream)
3876 }
3877
3878 func (s) TestClientRequestBodyErrorUnexpectedEOF(t *testing.T) {
3879 for _, e := range listTestEnv() {
3880 testClientRequestBodyErrorUnexpectedEOF(t, e)
3881 }
3882 }
3883
3884 func testClientRequestBodyErrorUnexpectedEOF(t *testing.T, e env) {
3885 te := newTest(t, e)
3886 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
3887 errUnexpectedCall := errors.New("unexpected call func server method")
3888 t.Error(errUnexpectedCall)
3889 return nil, errUnexpectedCall
3890 }}
3891 te.startServer(ts)
3892 defer te.tearDown()
3893 te.withServerTester(func(st *serverTester) {
3894 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall", false)
3895
3896 st.writeData(1, true, []byte{0, 0, 0, 0, 5})
3897 st.wantAnyFrame()
3898 })
3899 }
3900
3901 func (s) TestClientRequestBodyErrorCloseAfterLength(t *testing.T) {
3902 for _, e := range listTestEnv() {
3903 testClientRequestBodyErrorCloseAfterLength(t, e)
3904 }
3905 }
3906
3907
3908
3909
3910
3911
3912
3913
3914 func (s) TestClientInvalidStreamID(t *testing.T) {
3915 lis, err := net.Listen("tcp", "localhost:0")
3916 if err != nil {
3917 t.Fatalf("Failed to listen: %v", err)
3918 }
3919 defer lis.Close()
3920 s := grpc.NewServer()
3921 defer s.Stop()
3922 go s.Serve(lis)
3923
3924 conn, err := net.DialTimeout("tcp", lis.Addr().String(), defaultTestTimeout)
3925 if err != nil {
3926 t.Fatalf("Failed to dial: %v", err)
3927 }
3928 st := newServerTesterFromConn(t, conn)
3929 st.greet()
3930 st.writeHeadersGRPC(2, "/grpc.testing.TestService/StreamingInputCall", true)
3931 goAwayFrame := st.wantGoAway(http2.ErrCodeProtocol)
3932 want := "received an illegal stream id: 2."
3933 if got := string(goAwayFrame.DebugData()); !strings.Contains(got, want) {
3934 t.Fatalf(" Received: %v, Expected error message to contain: %v.", got, want)
3935 }
3936 }
3937
3938
3939
3940
3941 func (s) TestInvalidStreamIDSmallerThanPrevious(t *testing.T) {
3942 lis, err := net.Listen("tcp", "localhost:0")
3943 if err != nil {
3944 t.Fatalf("Failed to listen: %v", err)
3945 }
3946 defer lis.Close()
3947 s := grpc.NewServer()
3948 defer s.Stop()
3949 go s.Serve(lis)
3950
3951 conn, err := net.DialTimeout("tcp", lis.Addr().String(), defaultTestTimeout)
3952 if err != nil {
3953 t.Fatalf("Failed to dial: %v", err)
3954 }
3955 st := newServerTesterFromConn(t, conn)
3956 st.greet()
3957 st.writeHeadersGRPC(3, "/grpc.testing.TestService/StreamingInputCall", true)
3958 st.wantAnyFrame()
3959 st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall", true)
3960 goAwayFrame := st.wantGoAway(http2.ErrCodeProtocol)
3961 want := "received an illegal stream id: 1"
3962 if got := string(goAwayFrame.DebugData()); !strings.Contains(got, want) {
3963 t.Fatalf(" Received: %v, Expected error message to contain: %v.", got, want)
3964 }
3965 }
3966
3967 func testClientRequestBodyErrorCloseAfterLength(t *testing.T, e env) {
3968 te := newTest(t, e)
3969 te.declareLogNoise("Server.processUnaryRPC failed to write status")
3970 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
3971 errUnexpectedCall := errors.New("unexpected call func server method")
3972 t.Error(errUnexpectedCall)
3973 return nil, errUnexpectedCall
3974 }}
3975 te.startServer(ts)
3976 defer te.tearDown()
3977 te.withServerTester(func(st *serverTester) {
3978 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall", false)
3979
3980 st.writeData(1, false, []byte{0, 0, 0, 0, 5})
3981 st.cc.Close()
3982 })
3983 }
3984
3985 func (s) TestClientRequestBodyErrorCancel(t *testing.T) {
3986 for _, e := range listTestEnv() {
3987 testClientRequestBodyErrorCancel(t, e)
3988 }
3989 }
3990
3991 func testClientRequestBodyErrorCancel(t *testing.T, e env) {
3992 te := newTest(t, e)
3993 gotCall := make(chan bool, 1)
3994 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
3995 gotCall <- true
3996 return new(testpb.SimpleResponse), nil
3997 }}
3998 te.startServer(ts)
3999 defer te.tearDown()
4000 te.withServerTester(func(st *serverTester) {
4001 st.writeHeadersGRPC(1, "/grpc.testing.TestService/UnaryCall", false)
4002
4003 st.writeRSTStream(1, http2.ErrCodeCancel)
4004 st.writeData(1, false, []byte{0, 0, 0, 0, 5})
4005
4006
4007 select {
4008 case <-gotCall:
4009 t.Fatal("unexpected call")
4010 default:
4011 }
4012
4013
4014 st.writeHeadersGRPC(3, "/grpc.testing.TestService/UnaryCall", false)
4015 st.writeData(3, true, []byte{0, 0, 0, 0, 0})
4016 <-gotCall
4017 st.wantAnyFrame()
4018 })
4019 }
4020
4021 func (s) TestClientRequestBodyErrorCancelStreamingInput(t *testing.T) {
4022 for _, e := range listTestEnv() {
4023 testClientRequestBodyErrorCancelStreamingInput(t, e)
4024 }
4025 }
4026
4027 func testClientRequestBodyErrorCancelStreamingInput(t *testing.T, e env) {
4028 te := newTest(t, e)
4029 recvErr := make(chan error, 1)
4030 ts := &funcServer{streamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error {
4031 _, err := stream.Recv()
4032 recvErr <- err
4033 return nil
4034 }}
4035 te.startServer(ts)
4036 defer te.tearDown()
4037 te.withServerTester(func(st *serverTester) {
4038 st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall", false)
4039
4040 st.writeData(1, false, []byte{0, 0, 0, 0, 5})
4041 st.writeRSTStream(1, http2.ErrCodeCancel)
4042
4043 var got error
4044 select {
4045 case got = <-recvErr:
4046 case <-time.After(3 * time.Second):
4047 t.Fatal("timeout waiting for error")
4048 }
4049 if grpc.Code(got) != codes.Canceled {
4050 t.Errorf("error = %#v; want error code %s", got, codes.Canceled)
4051 }
4052 })
4053 }
4054
4055 func (s) TestClientInitialHeaderEndStream(t *testing.T) {
4056 for _, e := range listTestEnv() {
4057 if e.httpHandler {
4058 continue
4059 }
4060 testClientInitialHeaderEndStream(t, e)
4061 }
4062 }
4063
4064 func testClientInitialHeaderEndStream(t *testing.T, e env) {
4065
4066
4067 frameCheckingDone := make(chan struct{})
4068
4069
4070 handlerDone := make(chan struct{})
4071 te := newTest(t, e)
4072 ts := &funcServer{streamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error {
4073 defer close(handlerDone)
4074
4075
4076 <-frameCheckingDone
4077 data, err := stream.Recv()
4078 if err == nil {
4079 t.Errorf("unexpected data received in func server method: '%v'", data)
4080 } else if status.Code(err) != codes.Canceled {
4081 t.Errorf("expected canceled error, instead received '%v'", err)
4082 }
4083 return nil
4084 }}
4085 te.startServer(ts)
4086 defer te.tearDown()
4087 te.withServerTester(func(st *serverTester) {
4088
4089 st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall", true)
4090 st.writeData(1, false, []byte{0, 0, 0, 0, 0})
4091 st.wantAnyFrame()
4092 st.wantAnyFrame()
4093 st.wantRSTStream(http2.ErrCodeStreamClosed)
4094 close(frameCheckingDone)
4095 <-handlerDone
4096 })
4097 }
4098
4099 func (s) TestClientSendDataAfterCloseSend(t *testing.T) {
4100 for _, e := range listTestEnv() {
4101 if e.httpHandler {
4102 continue
4103 }
4104 testClientSendDataAfterCloseSend(t, e)
4105 }
4106 }
4107
4108 func testClientSendDataAfterCloseSend(t *testing.T, e env) {
4109
4110
4111 frameCheckingDone := make(chan struct{})
4112
4113
4114 handlerDone := make(chan struct{})
4115 te := newTest(t, e)
4116 ts := &funcServer{streamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error {
4117 defer close(handlerDone)
4118
4119
4120 <-frameCheckingDone
4121 for {
4122 _, err := stream.Recv()
4123 if err == io.EOF {
4124 break
4125 }
4126 if err != nil {
4127 if status.Code(err) != codes.Canceled {
4128 t.Errorf("expected canceled error, instead received '%v'", err)
4129 }
4130 break
4131 }
4132 }
4133 if err := stream.SendMsg(nil); err == nil {
4134 t.Error("expected error sending message on stream after stream closed due to illegal data")
4135 } else if status.Code(err) != codes.Canceled {
4136 t.Errorf("expected cancel error, instead received '%v'", err)
4137 }
4138 return nil
4139 }}
4140 te.startServer(ts)
4141 defer te.tearDown()
4142 te.withServerTester(func(st *serverTester) {
4143 st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall", false)
4144
4145 st.writeData(1, true, []byte{0, 0, 0, 0, 0})
4146 st.writeData(1, false, []byte{0, 0, 0, 0, 0})
4147 st.wantAnyFrame()
4148 st.wantAnyFrame()
4149 st.wantRSTStream(http2.ErrCodeStreamClosed)
4150 close(frameCheckingDone)
4151 <-handlerDone
4152 })
4153 }
4154
4155 func (s) TestClientResourceExhaustedCancelFullDuplex(t *testing.T) {
4156 for _, e := range listTestEnv() {
4157 if e.httpHandler {
4158
4159 continue
4160 }
4161 testClientResourceExhaustedCancelFullDuplex(t, e)
4162 }
4163 }
4164
4165 func testClientResourceExhaustedCancelFullDuplex(t *testing.T, e env) {
4166 te := newTest(t, e)
4167 recvErr := make(chan error, 1)
4168 ts := &funcServer{fullDuplexCall: func(stream testgrpc.TestService_FullDuplexCallServer) error {
4169 defer close(recvErr)
4170 _, err := stream.Recv()
4171 if err != nil {
4172 return status.Errorf(codes.Internal, "stream.Recv() got error: %v, want <nil>", err)
4173 }
4174
4175 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 10)
4176 if err != nil {
4177 return err
4178 }
4179 resp := &testpb.StreamingOutputCallResponse{
4180 Payload: payload,
4181 }
4182 ce := make(chan error, 1)
4183 go func() {
4184 var err error
4185 for {
4186 if err = stream.Send(resp); err != nil {
4187 break
4188 }
4189 }
4190 ce <- err
4191 }()
4192 select {
4193 case err = <-ce:
4194 case <-time.After(10 * time.Second):
4195 err = errors.New("10s timeout reached")
4196 }
4197 recvErr <- err
4198 return err
4199 }}
4200 te.startServer(ts)
4201 defer te.tearDown()
4202
4203
4204 te.maxClientReceiveMsgSize = newInt(10)
4205 cc := te.clientConn()
4206 tc := testgrpc.NewTestServiceClient(cc)
4207
4208 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
4209 defer cancel()
4210 stream, err := tc.FullDuplexCall(ctx)
4211 if err != nil {
4212 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4213 }
4214 req := &testpb.StreamingOutputCallRequest{}
4215 if err := stream.Send(req); err != nil {
4216 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
4217 }
4218 if _, err := stream.Recv(); status.Code(err) != codes.ResourceExhausted {
4219 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
4220 }
4221 err = <-recvErr
4222 if status.Code(err) != codes.Canceled {
4223 t.Fatalf("server got error %v, want error code: %s", err, codes.Canceled)
4224 }
4225 }
4226
4227 type clientFailCreds struct{}
4228
4229 func (c *clientFailCreds) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4230 return rawConn, nil, nil
4231 }
4232 func (c *clientFailCreds) ClientHandshake(ctx context.Context, authority string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
4233 return nil, nil, fmt.Errorf("client handshake fails with fatal error")
4234 }
4235 func (c *clientFailCreds) Info() credentials.ProtocolInfo {
4236 return credentials.ProtocolInfo{}
4237 }
4238 func (c *clientFailCreds) Clone() credentials.TransportCredentials {
4239 return c
4240 }
4241 func (c *clientFailCreds) OverrideServerName(s string) error {
4242 return nil
4243 }
4244
4245
4246
4247 func (s) TestFailfastRPCFailOnFatalHandshakeError(t *testing.T) {
4248 lis, err := net.Listen("tcp", "localhost:0")
4249 if err != nil {
4250 t.Fatalf("Failed to listen: %v", err)
4251 }
4252 defer lis.Close()
4253
4254 cc, err := grpc.Dial("passthrough:///"+lis.Addr().String(), grpc.WithTransportCredentials(&clientFailCreds{}))
4255 if err != nil {
4256 t.Fatalf("grpc.Dial(_) = %v", err)
4257 }
4258 defer cc.Close()
4259
4260 tc := testgrpc.NewTestServiceClient(cc)
4261
4262 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
4263 defer cancel()
4264 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(false)); status.Code(err) != codes.Unavailable {
4265 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want <Unavailable>", err)
4266 }
4267 }
4268
4269 func (s) TestFlowControlLogicalRace(t *testing.T) {
4270
4271
4272
4273 const (
4274 itemCount = 100
4275 itemSize = 1 << 10
4276 recvCount = 2
4277 maxFailures = 3
4278 )
4279
4280 requestCount := 3000
4281 if raceMode {
4282 requestCount = 1000
4283 }
4284
4285 lis, err := net.Listen("tcp", "localhost:0")
4286 if err != nil {
4287 t.Fatalf("Failed to listen: %v", err)
4288 }
4289 defer lis.Close()
4290
4291 s := grpc.NewServer()
4292 testgrpc.RegisterTestServiceServer(s, &flowControlLogicalRaceServer{
4293 itemCount: itemCount,
4294 itemSize: itemSize,
4295 })
4296 defer s.Stop()
4297
4298 go s.Serve(lis)
4299
4300 cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
4301 if err != nil {
4302 t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
4303 }
4304 defer cc.Close()
4305 cl := testgrpc.NewTestServiceClient(cc)
4306
4307 failures := 0
4308 for i := 0; i < requestCount; i++ {
4309 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
4310 output, err := cl.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
4311 if err != nil {
4312 t.Fatalf("StreamingOutputCall; err = %q", err)
4313 }
4314
4315 for j := 0; j < recvCount; j++ {
4316 if _, err := output.Recv(); err != nil {
4317 if err == io.EOF || status.Code(err) == codes.DeadlineExceeded {
4318 t.Errorf("got %d responses to request %d", j, i)
4319 failures++
4320 break
4321 }
4322 t.Fatalf("Recv; err = %q", err)
4323 }
4324 }
4325 cancel()
4326
4327 if failures >= maxFailures {
4328
4329
4330 t.Fatalf("Too many failures received; aborting")
4331 }
4332 }
4333 }
4334
4335 type flowControlLogicalRaceServer struct {
4336 testgrpc.TestServiceServer
4337
4338 itemSize int
4339 itemCount int
4340 }
4341
4342 func (s *flowControlLogicalRaceServer) StreamingOutputCall(req *testpb.StreamingOutputCallRequest, srv testgrpc.TestService_StreamingOutputCallServer) error {
4343 for i := 0; i < s.itemCount; i++ {
4344 err := srv.Send(&testpb.StreamingOutputCallResponse{
4345 Payload: &testpb.Payload{
4346
4347
4348
4349
4350
4351
4352
4353 Body: bytes.Repeat([]byte("a"), s.itemSize),
4354 },
4355 })
4356 if err != nil {
4357 return err
4358 }
4359 }
4360 return nil
4361 }
4362
4363 type lockingWriter struct {
4364 mu sync.Mutex
4365 w io.Writer
4366 }
4367
4368 func (lw *lockingWriter) Write(p []byte) (n int, err error) {
4369 lw.mu.Lock()
4370 defer lw.mu.Unlock()
4371 return lw.w.Write(p)
4372 }
4373
4374 func (lw *lockingWriter) setWriter(w io.Writer) {
4375 lw.mu.Lock()
4376 defer lw.mu.Unlock()
4377 lw.w = w
4378 }
4379
4380 var testLogOutput = &lockingWriter{w: os.Stderr}
4381
4382
4383
4384
4385
4386
4387 func awaitNewConnLogOutput() {
4388 awaitLogOutput(50*time.Millisecond, "grpc: the client connection is closing; please retry")
4389 }
4390
4391 func awaitLogOutput(maxWait time.Duration, phrase string) {
4392 pb := []byte(phrase)
4393
4394 timer := time.NewTimer(maxWait)
4395 defer timer.Stop()
4396 wakeup := make(chan bool, 1)
4397 for {
4398 if logOutputHasContents(pb, wakeup) {
4399 return
4400 }
4401 select {
4402 case <-timer.C:
4403
4404 return
4405 case <-wakeup:
4406 }
4407 }
4408 }
4409
4410 func logOutputHasContents(v []byte, wakeup chan<- bool) bool {
4411 testLogOutput.mu.Lock()
4412 defer testLogOutput.mu.Unlock()
4413 fw, ok := testLogOutput.w.(*filterWriter)
4414 if !ok {
4415 return false
4416 }
4417 fw.mu.Lock()
4418 defer fw.mu.Unlock()
4419 if bytes.Contains(fw.buf.Bytes(), v) {
4420 return true
4421 }
4422 fw.wakeup = wakeup
4423 return false
4424 }
4425
4426 var verboseLogs = flag.Bool("verbose_logs", false, "show all log output, without filtering")
4427
4428 func noop() {}
4429
4430
4431
4432
4433
4434 func declareLogNoise(t *testing.T, phrases ...string) (restore func()) {
4435 if *verboseLogs {
4436 return noop
4437 }
4438 fw := &filterWriter{dst: os.Stderr, filter: phrases}
4439 testLogOutput.setWriter(fw)
4440 return func() {
4441 if t.Failed() {
4442 fw.mu.Lock()
4443 defer fw.mu.Unlock()
4444 if fw.buf.Len() > 0 {
4445 t.Logf("Complete log output:\n%s", fw.buf.Bytes())
4446 }
4447 }
4448 testLogOutput.setWriter(os.Stderr)
4449 }
4450 }
4451
4452 type filterWriter struct {
4453 dst io.Writer
4454 filter []string
4455
4456 mu sync.Mutex
4457 buf bytes.Buffer
4458 wakeup chan<- bool
4459 }
4460
4461 func (fw *filterWriter) Write(p []byte) (n int, err error) {
4462 fw.mu.Lock()
4463 fw.buf.Write(p)
4464 if fw.wakeup != nil {
4465 select {
4466 case fw.wakeup <- true:
4467 default:
4468 }
4469 }
4470 fw.mu.Unlock()
4471
4472 ps := string(p)
4473 for _, f := range fw.filter {
4474 if strings.Contains(ps, f) {
4475 return len(p), nil
4476 }
4477 }
4478 return fw.dst.Write(p)
4479 }
4480
4481 func (s) TestGRPCMethod(t *testing.T) {
4482 var method string
4483 var ok bool
4484
4485 ss := &stubserver.StubServer{
4486 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
4487 method, ok = grpc.Method(ctx)
4488 return &testpb.Empty{}, nil
4489 },
4490 }
4491 if err := ss.Start(nil); err != nil {
4492 t.Fatalf("Error starting endpoint server: %v", err)
4493 }
4494 defer ss.Stop()
4495
4496 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
4497 defer cancel()
4498
4499 if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
4500 t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v; want _, nil", err)
4501 }
4502
4503 if want := "/grpc.testing.TestService/EmptyCall"; !ok || method != want {
4504 t.Fatalf("grpc.Method(_) = %q, %v; want %q, true", method, ok, want)
4505 }
4506 }
4507
4508 func (s) TestUnaryProxyDoesNotForwardMetadata(t *testing.T) {
4509 const mdkey = "somedata"
4510
4511
4512 endpoint := &stubserver.StubServer{
4513 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
4514 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil {
4515 return nil, status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
4516 }
4517 return &testpb.Empty{}, nil
4518 },
4519 }
4520 if err := endpoint.Start(nil); err != nil {
4521 t.Fatalf("Error starting endpoint server: %v", err)
4522 }
4523 defer endpoint.Stop()
4524
4525
4526
4527 proxy := &stubserver.StubServer{
4528 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
4529 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil {
4530 return nil, status.Errorf(codes.Internal, "proxy: md=%v; want contains(%q)", md, mdkey)
4531 }
4532 return endpoint.Client.EmptyCall(ctx, in)
4533 },
4534 }
4535 if err := proxy.Start(nil); err != nil {
4536 t.Fatalf("Error starting proxy server: %v", err)
4537 }
4538 defer proxy.Stop()
4539
4540 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
4541 defer cancel()
4542 md := metadata.Pairs(mdkey, "val")
4543 ctx = metadata.NewOutgoingContext(ctx, md)
4544
4545
4546 _, err := endpoint.Client.EmptyCall(ctx, &testpb.Empty{})
4547 if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal {
4548 t.Fatalf("endpoint.Client.EmptyCall(_, _) = _, %v; want _, <status with Code()=Internal>", err)
4549 }
4550
4551 if _, err := proxy.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
4552 t.Fatal(err.Error())
4553 }
4554 }
4555
4556 func (s) TestStreamingProxyDoesNotForwardMetadata(t *testing.T) {
4557 const mdkey = "somedata"
4558
4559
4560
4561
4562 doFDC := func(ctx context.Context, client testgrpc.TestServiceClient) error {
4563 stream, err := client.FullDuplexCall(ctx)
4564 if err != nil {
4565 t.Fatalf("Unwanted error: %v", err)
4566 }
4567 if _, err := stream.Recv(); err != io.EOF {
4568 return err
4569 }
4570 return nil
4571 }
4572
4573
4574 endpoint := &stubserver.StubServer{
4575 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
4576 ctx := stream.Context()
4577 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] != nil {
4578 return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
4579 }
4580 return nil
4581 },
4582 }
4583 if err := endpoint.Start(nil); err != nil {
4584 t.Fatalf("Error starting endpoint server: %v", err)
4585 }
4586 defer endpoint.Stop()
4587
4588
4589
4590 proxy := &stubserver.StubServer{
4591 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
4592 ctx := stream.Context()
4593 if md, ok := metadata.FromIncomingContext(ctx); !ok || md[mdkey] == nil {
4594 return status.Errorf(codes.Internal, "endpoint: md=%v; want !contains(%q)", md, mdkey)
4595 }
4596 return doFDC(ctx, endpoint.Client)
4597 },
4598 }
4599 if err := proxy.Start(nil); err != nil {
4600 t.Fatalf("Error starting proxy server: %v", err)
4601 }
4602 defer proxy.Stop()
4603
4604 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
4605 defer cancel()
4606 md := metadata.Pairs(mdkey, "val")
4607 ctx = metadata.NewOutgoingContext(ctx, md)
4608
4609
4610 err := doFDC(ctx, endpoint.Client)
4611 if s, ok := status.FromError(err); !ok || s.Code() != codes.Internal {
4612 t.Fatalf("stream.Recv() = _, %v; want _, <status with Code()=Internal>", err)
4613 }
4614
4615 if err := doFDC(ctx, proxy.Client); err != nil {
4616 t.Fatalf("doFDC(_, proxy.Client) = %v; want nil", err)
4617 }
4618 }
4619
4620 func (s) TestStatsTagsAndTrace(t *testing.T) {
4621
4622 tags := []byte{1, 5, 2, 4, 3}
4623 trace := []byte{5, 2, 1, 3, 4}
4624
4625
4626
4627 endpoint := &stubserver.StubServer{
4628 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
4629 md, _ := metadata.FromIncomingContext(ctx)
4630 if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, tags) {
4631 return nil, status.Errorf(codes.Internal, "stats.Tags(%v)=%v; want %v", ctx, tg, tags)
4632 }
4633 if !reflect.DeepEqual(md["grpc-tags-bin"], []string{string(tags)}) {
4634 return nil, status.Errorf(codes.Internal, "md['grpc-tags-bin']=%v; want %v", md["grpc-tags-bin"], tags)
4635 }
4636 if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, trace) {
4637 return nil, status.Errorf(codes.Internal, "stats.Trace(%v)=%v; want %v", ctx, tr, trace)
4638 }
4639 if !reflect.DeepEqual(md["grpc-trace-bin"], []string{string(trace)}) {
4640 return nil, status.Errorf(codes.Internal, "md['grpc-trace-bin']=%v; want %v", md["grpc-trace-bin"], trace)
4641 }
4642 return &testpb.Empty{}, nil
4643 },
4644 }
4645 if err := endpoint.Start(nil); err != nil {
4646 t.Fatalf("Error starting endpoint server: %v", err)
4647 }
4648 defer endpoint.Stop()
4649
4650 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
4651 defer cancel()
4652
4653 testCases := []struct {
4654 ctx context.Context
4655 want codes.Code
4656 }{
4657 {ctx: ctx, want: codes.Internal},
4658 {ctx: stats.SetTags(ctx, tags), want: codes.Internal},
4659 {ctx: stats.SetTrace(ctx, trace), want: codes.Internal},
4660 {ctx: stats.SetTags(stats.SetTrace(ctx, tags), tags), want: codes.Internal},
4661 {ctx: stats.SetTags(stats.SetTrace(ctx, trace), tags), want: codes.OK},
4662 }
4663
4664 for _, tc := range testCases {
4665 _, err := endpoint.Client.EmptyCall(tc.ctx, &testpb.Empty{})
4666 if tc.want == codes.OK && err != nil {
4667 t.Fatalf("endpoint.Client.EmptyCall(%v, _) = _, %v; want _, nil", tc.ctx, err)
4668 }
4669 if s, ok := status.FromError(err); !ok || s.Code() != tc.want {
4670 t.Fatalf("endpoint.Client.EmptyCall(%v, _) = _, %v; want _, <status with Code()=%v>", tc.ctx, err, tc.want)
4671 }
4672 }
4673 }
4674
4675 func (s) TestTapTimeout(t *testing.T) {
4676 sopts := []grpc.ServerOption{
4677 grpc.InTapHandle(func(ctx context.Context, _ *tap.Info) (context.Context, error) {
4678 c, cancel := context.WithCancel(ctx)
4679
4680
4681
4682 time.AfterFunc(10*time.Millisecond, cancel)
4683 return c, nil
4684 }),
4685 }
4686
4687 ss := &stubserver.StubServer{
4688 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
4689 <-ctx.Done()
4690 return nil, status.Errorf(codes.Canceled, ctx.Err().Error())
4691 },
4692 }
4693 if err := ss.Start(sopts); err != nil {
4694 t.Fatalf("Error starting endpoint server: %v", err)
4695 }
4696 defer ss.Stop()
4697
4698
4699 for i := 0; i < 10; i++ {
4700
4701 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
4702 res, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
4703 cancel()
4704 if s, ok := status.FromError(err); !ok || s.Code() != codes.Canceled {
4705 t.Fatalf("ss.Client.EmptyCall(ctx, _) = %v, %v; want nil, <status with Code()=Canceled>", res, err)
4706 }
4707 }
4708
4709 }
4710
4711 func (s) TestClientWriteFailsAfterServerClosesStream(t *testing.T) {
4712 ss := &stubserver.StubServer{
4713 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
4714 return status.Errorf(codes.Internal, "")
4715 },
4716 }
4717 sopts := []grpc.ServerOption{}
4718 if err := ss.Start(sopts); err != nil {
4719 t.Fatalf("Error starting endpoint server: %v", err)
4720 }
4721 defer ss.Stop()
4722 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
4723 defer cancel()
4724 stream, err := ss.Client.FullDuplexCall(ctx)
4725 if err != nil {
4726 t.Fatalf("Error while creating stream: %v", err)
4727 }
4728 for {
4729 if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err == nil {
4730 time.Sleep(5 * time.Millisecond)
4731 } else if err == io.EOF {
4732 break
4733 } else {
4734 t.Fatalf("stream.Send(_) = %v, want io.EOF", err)
4735 }
4736 }
4737 }
4738
4739 type windowSizeConfig struct {
4740 serverStream int32
4741 serverConn int32
4742 clientStream int32
4743 clientConn int32
4744 }
4745
4746 func max(a, b int32) int32 {
4747 if a > b {
4748 return a
4749 }
4750 return b
4751 }
4752
4753 func (s) TestConfigurableWindowSizeWithLargeWindow(t *testing.T) {
4754 wc := windowSizeConfig{
4755 serverStream: 8 * 1024 * 1024,
4756 serverConn: 12 * 1024 * 1024,
4757 clientStream: 6 * 1024 * 1024,
4758 clientConn: 8 * 1024 * 1024,
4759 }
4760 for _, e := range listTestEnv() {
4761 testConfigurableWindowSize(t, e, wc)
4762 }
4763 }
4764
4765 func (s) TestConfigurableWindowSizeWithSmallWindow(t *testing.T) {
4766 wc := windowSizeConfig{
4767 serverStream: 1,
4768 serverConn: 1,
4769 clientStream: 1,
4770 clientConn: 1,
4771 }
4772 for _, e := range listTestEnv() {
4773 testConfigurableWindowSize(t, e, wc)
4774 }
4775 }
4776
4777 func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) {
4778 te := newTest(t, e)
4779 te.serverInitialWindowSize = wc.serverStream
4780 te.serverInitialConnWindowSize = wc.serverConn
4781 te.clientInitialWindowSize = wc.clientStream
4782 te.clientInitialConnWindowSize = wc.clientConn
4783
4784 te.startServer(&testServer{security: e.security})
4785 defer te.tearDown()
4786
4787 cc := te.clientConn()
4788 tc := testgrpc.NewTestServiceClient(cc)
4789 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
4790 defer cancel()
4791 stream, err := tc.FullDuplexCall(ctx)
4792 if err != nil {
4793 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4794 }
4795 numOfIter := 11
4796
4797 messageSize := max(max(wc.serverStream, wc.serverConn), max(wc.clientStream, wc.clientConn)) / int32(numOfIter-1)
4798 messageSize = max(messageSize, 64*1024)
4799 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, messageSize)
4800 if err != nil {
4801 t.Fatal(err)
4802 }
4803 respParams := []*testpb.ResponseParameters{
4804 {
4805 Size: messageSize,
4806 },
4807 }
4808 req := &testpb.StreamingOutputCallRequest{
4809 ResponseType: testpb.PayloadType_COMPRESSABLE,
4810 ResponseParameters: respParams,
4811 Payload: payload,
4812 }
4813 for i := 0; i < numOfIter; i++ {
4814 if err := stream.Send(req); err != nil {
4815 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
4816 }
4817 if _, err := stream.Recv(); err != nil {
4818 t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
4819 }
4820 }
4821 if err := stream.CloseSend(); err != nil {
4822 t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
4823 }
4824 }
4825
4826 func (s) TestWaitForReadyConnection(t *testing.T) {
4827 for _, e := range listTestEnv() {
4828 testWaitForReadyConnection(t, e)
4829 }
4830
4831 }
4832
4833 func testWaitForReadyConnection(t *testing.T, e env) {
4834 te := newTest(t, e)
4835 te.userAgent = testAppUA
4836 te.startServer(&testServer{security: e.security})
4837 defer te.tearDown()
4838
4839 cc := te.clientConn()
4840 tc := testgrpc.NewTestServiceClient(cc)
4841 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
4842 defer cancel()
4843 testutils.AwaitState(ctx, t, cc, connectivity.Ready)
4844
4845 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil {
4846 t.Fatalf("TestService/EmptyCall(_,_) = _, %v, want _, nil", err)
4847 }
4848 }
4849
4850 func (s) TestSvrWriteStatusEarlyWrite(t *testing.T) {
4851 for _, e := range listTestEnv() {
4852 testSvrWriteStatusEarlyWrite(t, e)
4853 }
4854 }
4855
4856 func testSvrWriteStatusEarlyWrite(t *testing.T, e env) {
4857 te := newTest(t, e)
4858 const smallSize = 1024
4859 const largeSize = 2048
4860 const extraLargeSize = 4096
4861 te.maxServerReceiveMsgSize = newInt(largeSize)
4862 te.maxServerSendMsgSize = newInt(largeSize)
4863 smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize)
4864 if err != nil {
4865 t.Fatal(err)
4866 }
4867 extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize)
4868 if err != nil {
4869 t.Fatal(err)
4870 }
4871 te.startServer(&testServer{security: e.security})
4872 defer te.tearDown()
4873 tc := testgrpc.NewTestServiceClient(te.clientConn())
4874 respParam := []*testpb.ResponseParameters{
4875 {
4876 Size: int32(smallSize),
4877 },
4878 }
4879 sreq := &testpb.StreamingOutputCallRequest{
4880 ResponseType: testpb.PayloadType_COMPRESSABLE,
4881 ResponseParameters: respParam,
4882 Payload: extraLargePayload,
4883 }
4884
4885 stream, err := tc.FullDuplexCall(te.ctx)
4886 if err != nil {
4887 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4888 }
4889 if err = stream.Send(sreq); err != nil {
4890 t.Fatalf("%v.Send() = _, %v, want <nil>", stream, err)
4891 }
4892 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
4893 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
4894 }
4895
4896 sreq.Payload = smallPayload
4897 respParam[0].Size = int32(extraLargeSize)
4898
4899 stream, err = tc.FullDuplexCall(te.ctx)
4900 if err != nil {
4901 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
4902 }
4903 if err = stream.Send(sreq); err != nil {
4904 t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
4905 }
4906 if _, err = stream.Recv(); err == nil || status.Code(err) != codes.ResourceExhausted {
4907 t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted)
4908 }
4909 }
4910
4911
4912
4913
4914 func (s) TestMalformedStreamMethod(t *testing.T) {
4915 const testMethod = "a-method-name-without-any-slashes"
4916 te := newTest(t, tcpClearRREnv)
4917 te.startServer(nil)
4918 defer te.tearDown()
4919
4920 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
4921 defer cancel()
4922 err := te.clientConn().Invoke(ctx, testMethod, nil, nil)
4923 if gotCode := status.Code(err); gotCode != codes.Unimplemented {
4924 t.Fatalf("Invoke with method %q, got code %s, want %s", testMethod, gotCode, codes.Unimplemented)
4925 }
4926 }
4927
4928 func (s) TestMethodFromServerStream(t *testing.T) {
4929 const testMethod = "/package.service/method"
4930 e := tcpClearRREnv
4931 te := newTest(t, e)
4932 var method string
4933 var ok bool
4934 te.unknownHandler = func(srv any, stream grpc.ServerStream) error {
4935 method, ok = grpc.MethodFromServerStream(stream)
4936 return nil
4937 }
4938
4939 te.startServer(nil)
4940 defer te.tearDown()
4941 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
4942 defer cancel()
4943 _ = te.clientConn().Invoke(ctx, testMethod, nil, nil)
4944 if !ok || method != testMethod {
4945 t.Fatalf("Invoke with method %q, got %q, %v, want %q, true", testMethod, method, ok, testMethod)
4946 }
4947 }
4948
4949 func (s) TestInterceptorCanAccessCallOptions(t *testing.T) {
4950 e := tcpClearRREnv
4951 te := newTest(t, e)
4952 te.startServer(&testServer{security: e.security})
4953 defer te.tearDown()
4954
4955 type observedOptions struct {
4956 headers []*metadata.MD
4957 trailers []*metadata.MD
4958 peer []*peer.Peer
4959 creds []credentials.PerRPCCredentials
4960 failFast []bool
4961 maxRecvSize []int
4962 maxSendSize []int
4963 compressor []string
4964 subtype []string
4965 }
4966 var observedOpts observedOptions
4967 populateOpts := func(opts []grpc.CallOption) {
4968 for _, o := range opts {
4969 switch o := o.(type) {
4970 case grpc.HeaderCallOption:
4971 observedOpts.headers = append(observedOpts.headers, o.HeaderAddr)
4972 case grpc.TrailerCallOption:
4973 observedOpts.trailers = append(observedOpts.trailers, o.TrailerAddr)
4974 case grpc.PeerCallOption:
4975 observedOpts.peer = append(observedOpts.peer, o.PeerAddr)
4976 case grpc.PerRPCCredsCallOption:
4977 observedOpts.creds = append(observedOpts.creds, o.Creds)
4978 case grpc.FailFastCallOption:
4979 observedOpts.failFast = append(observedOpts.failFast, o.FailFast)
4980 case grpc.MaxRecvMsgSizeCallOption:
4981 observedOpts.maxRecvSize = append(observedOpts.maxRecvSize, o.MaxRecvMsgSize)
4982 case grpc.MaxSendMsgSizeCallOption:
4983 observedOpts.maxSendSize = append(observedOpts.maxSendSize, o.MaxSendMsgSize)
4984 case grpc.CompressorCallOption:
4985 observedOpts.compressor = append(observedOpts.compressor, o.CompressorType)
4986 case grpc.ContentSubtypeCallOption:
4987 observedOpts.subtype = append(observedOpts.subtype, o.ContentSubtype)
4988 }
4989 }
4990 }
4991
4992 te.unaryClientInt = func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
4993 populateOpts(opts)
4994 return nil
4995 }
4996 te.streamClientInt = func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
4997 populateOpts(opts)
4998 return nil, nil
4999 }
5000
5001 defaults := []grpc.CallOption{
5002 grpc.WaitForReady(true),
5003 grpc.MaxCallRecvMsgSize(1010),
5004 }
5005 tc := testgrpc.NewTestServiceClient(te.clientConn(grpc.WithDefaultCallOptions(defaults...)))
5006
5007 var headers metadata.MD
5008 var trailers metadata.MD
5009 var pr peer.Peer
5010 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5011 defer cancel()
5012 tc.UnaryCall(ctx, &testpb.SimpleRequest{},
5013 grpc.MaxCallRecvMsgSize(100),
5014 grpc.MaxCallSendMsgSize(200),
5015 grpc.PerRPCCredentials(testPerRPCCredentials{}),
5016 grpc.Header(&headers),
5017 grpc.Trailer(&trailers),
5018 grpc.Peer(&pr))
5019 expected := observedOptions{
5020 failFast: []bool{false},
5021 maxRecvSize: []int{1010, 100},
5022 maxSendSize: []int{200},
5023 creds: []credentials.PerRPCCredentials{testPerRPCCredentials{}},
5024 headers: []*metadata.MD{&headers},
5025 trailers: []*metadata.MD{&trailers},
5026 peer: []*peer.Peer{&pr},
5027 }
5028
5029 if !reflect.DeepEqual(expected, observedOpts) {
5030 t.Errorf("unary call did not observe expected options: expected %#v, got %#v", expected, observedOpts)
5031 }
5032
5033 observedOpts = observedOptions{}
5034
5035 tc.StreamingInputCall(ctx,
5036 grpc.WaitForReady(false),
5037 grpc.MaxCallSendMsgSize(2020),
5038 grpc.UseCompressor("comp-type"),
5039 grpc.CallContentSubtype("json"))
5040 expected = observedOptions{
5041 failFast: []bool{false, true},
5042 maxRecvSize: []int{1010},
5043 maxSendSize: []int{2020},
5044 compressor: []string{"comp-type"},
5045 subtype: []string{"json"},
5046 }
5047
5048 if !reflect.DeepEqual(expected, observedOpts) {
5049 t.Errorf("streaming call did not observe expected options: expected %#v, got %#v", expected, observedOpts)
5050 }
5051 }
5052
5053 func (s) TestServeExitsWhenListenerClosed(t *testing.T) {
5054 ss := &stubserver.StubServer{
5055 EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
5056 return &testpb.Empty{}, nil
5057 },
5058 }
5059
5060 s := grpc.NewServer()
5061 defer s.Stop()
5062 testgrpc.RegisterTestServiceServer(s, ss)
5063
5064 lis, err := net.Listen("tcp", "localhost:0")
5065 if err != nil {
5066 t.Fatalf("Failed to create listener: %v", err)
5067 }
5068
5069 done := make(chan struct{})
5070 go func() {
5071 s.Serve(lis)
5072 close(done)
5073 }()
5074
5075 cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
5076 if err != nil {
5077 t.Fatalf("Failed to dial server: %v", err)
5078 }
5079 defer cc.Close()
5080 c := testgrpc.NewTestServiceClient(cc)
5081 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5082 defer cancel()
5083 if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil {
5084 t.Fatalf("Failed to send test RPC to server: %v", err)
5085 }
5086
5087 if err := lis.Close(); err != nil {
5088 t.Fatalf("Failed to close listener: %v", err)
5089 }
5090 const timeout = 5 * time.Second
5091 timer := time.NewTimer(timeout)
5092 select {
5093 case <-done:
5094 return
5095 case <-timer.C:
5096 t.Fatalf("Serve did not return after %v", timeout)
5097 }
5098 }
5099
5100
5101 func (s) TestStatusInvalidUTF8Message(t *testing.T) {
5102 var (
5103 origMsg = string([]byte{0xff, 0xfe, 0xfd})
5104 wantMsg = "���"
5105 )
5106
5107 ss := &stubserver.StubServer{
5108 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
5109 return nil, status.Errorf(codes.Internal, origMsg)
5110 },
5111 }
5112 if err := ss.Start(nil); err != nil {
5113 t.Fatalf("Error starting endpoint server: %v", err)
5114 }
5115 defer ss.Stop()
5116
5117 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5118 defer cancel()
5119
5120 if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); status.Convert(err).Message() != wantMsg {
5121 t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, status.Convert(err).Message(), wantMsg)
5122 }
5123 }
5124
5125
5126
5127
5128 func (s) TestStatusInvalidUTF8Details(t *testing.T) {
5129 grpctest.TLogger.ExpectError("Failed to marshal rpc status")
5130
5131 var (
5132 origMsg = string([]byte{0xff, 0xfe, 0xfd})
5133 wantMsg = "���"
5134 )
5135
5136 ss := &stubserver.StubServer{
5137 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
5138 st := status.New(codes.Internal, origMsg)
5139 st, err := st.WithDetails(&testpb.Empty{})
5140 if err != nil {
5141 return nil, err
5142 }
5143 return nil, st.Err()
5144 },
5145 }
5146 if err := ss.Start(nil); err != nil {
5147 t.Fatalf("Error starting endpoint server: %v", err)
5148 }
5149 defer ss.Stop()
5150
5151 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5152 defer cancel()
5153
5154 _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{})
5155 st := status.Convert(err)
5156 if st.Message() != wantMsg {
5157 t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v (msg %q); want _, err with msg %q", err, st.Message(), wantMsg)
5158 }
5159 if len(st.Details()) != 0 {
5160
5161 t.Fatalf("RPC status contain details: %v, want no details", st.Details())
5162 }
5163 }
5164
5165 func (s) TestRPCTimeout(t *testing.T) {
5166 for _, e := range listTestEnv() {
5167 testRPCTimeout(t, e)
5168 }
5169 }
5170
5171 func testRPCTimeout(t *testing.T, e env) {
5172 te := newTest(t, e)
5173 te.startServer(&testServer{security: e.security, unaryCallSleepTime: 500 * time.Millisecond})
5174 defer te.tearDown()
5175
5176 cc := te.clientConn()
5177 tc := testgrpc.NewTestServiceClient(cc)
5178
5179 const argSize = 2718
5180 const respSize = 314
5181
5182 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
5183 if err != nil {
5184 t.Fatal(err)
5185 }
5186
5187 req := &testpb.SimpleRequest{
5188 ResponseType: testpb.PayloadType_COMPRESSABLE,
5189 ResponseSize: respSize,
5190 Payload: payload,
5191 }
5192 for i := -1; i <= 10; i++ {
5193 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond)
5194 if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.DeadlineExceeded {
5195 t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded)
5196 }
5197 cancel()
5198 }
5199 }
5200
5201 func (s) TestDisabledIOBuffers(t *testing.T) {
5202 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(60000))
5203 if err != nil {
5204 t.Fatalf("Failed to create payload: %v", err)
5205 }
5206 req := &testpb.StreamingOutputCallRequest{
5207 Payload: payload,
5208 }
5209 resp := &testpb.StreamingOutputCallResponse{
5210 Payload: payload,
5211 }
5212
5213 ss := &stubserver.StubServer{
5214 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
5215 for {
5216 in, err := stream.Recv()
5217 if err == io.EOF {
5218 return nil
5219 }
5220 if err != nil {
5221 t.Errorf("stream.Recv() = _, %v, want _, <nil>", err)
5222 return err
5223 }
5224 if !reflect.DeepEqual(in.Payload.Body, payload.Body) {
5225 t.Errorf("Received message(len: %v) on server not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body))
5226 return err
5227 }
5228 if err := stream.Send(resp); err != nil {
5229 t.Errorf("stream.Send(_)= %v, want <nil>", err)
5230 return err
5231 }
5232
5233 }
5234 },
5235 }
5236
5237 s := grpc.NewServer(grpc.WriteBufferSize(0), grpc.ReadBufferSize(0))
5238 testgrpc.RegisterTestServiceServer(s, ss)
5239
5240 lis, err := net.Listen("tcp", "localhost:0")
5241 if err != nil {
5242 t.Fatalf("Failed to create listener: %v", err)
5243 }
5244
5245 go func() {
5246 s.Serve(lis)
5247 }()
5248 defer s.Stop()
5249 dctx, dcancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5250 defer dcancel()
5251 cc, err := grpc.DialContext(dctx, lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithWriteBufferSize(0), grpc.WithReadBufferSize(0))
5252 if err != nil {
5253 t.Fatalf("Failed to dial server")
5254 }
5255 defer cc.Close()
5256 c := testgrpc.NewTestServiceClient(cc)
5257 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5258 defer cancel()
5259 stream, err := c.FullDuplexCall(ctx, grpc.WaitForReady(true))
5260 if err != nil {
5261 t.Fatalf("Failed to send test RPC to server")
5262 }
5263 for i := 0; i < 10; i++ {
5264 if err := stream.Send(req); err != nil {
5265 t.Fatalf("stream.Send(_) = %v, want <nil>", err)
5266 }
5267 in, err := stream.Recv()
5268 if err != nil {
5269 t.Fatalf("stream.Recv() = _, %v, want _, <nil>", err)
5270 }
5271 if !reflect.DeepEqual(in.Payload.Body, payload.Body) {
5272 t.Fatalf("Received message(len: %v) on client not what was expected(len: %v).", len(in.Payload.Body), len(payload.Body))
5273 }
5274 }
5275 stream.CloseSend()
5276 if _, err := stream.Recv(); err != io.EOF {
5277 t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err)
5278 }
5279 }
5280
5281 func (s) TestServerMaxHeaderListSizeClientUserViolation(t *testing.T) {
5282 for _, e := range listTestEnv() {
5283 if e.httpHandler {
5284 continue
5285 }
5286 testServerMaxHeaderListSizeClientUserViolation(t, e)
5287 }
5288 }
5289
5290 func testServerMaxHeaderListSizeClientUserViolation(t *testing.T, e env) {
5291 te := newTest(t, e)
5292 te.maxServerHeaderListSize = new(uint32)
5293 *te.maxServerHeaderListSize = 216
5294 te.startServer(&testServer{security: e.security})
5295 defer te.tearDown()
5296
5297 cc := te.clientConn()
5298 tc := testgrpc.NewTestServiceClient(cc)
5299 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5300 defer cancel()
5301 metadata.AppendToOutgoingContext(ctx, "oversize", string(make([]byte, 216)))
5302 var err error
5303 if err = verifyResultWithDelay(func() (bool, error) {
5304 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) == codes.Internal {
5305 return true, nil
5306 }
5307 return false, fmt.Errorf("tc.EmptyCall() = _, err: %v, want _, error code: %v", err, codes.Internal)
5308 }); err != nil {
5309 t.Fatal(err)
5310 }
5311 }
5312
5313 func (s) TestClientMaxHeaderListSizeServerUserViolation(t *testing.T) {
5314 for _, e := range listTestEnv() {
5315 if e.httpHandler {
5316 continue
5317 }
5318 testClientMaxHeaderListSizeServerUserViolation(t, e)
5319 }
5320 }
5321
5322 func testClientMaxHeaderListSizeServerUserViolation(t *testing.T, e env) {
5323 te := newTest(t, e)
5324 te.maxClientHeaderListSize = new(uint32)
5325 *te.maxClientHeaderListSize = 1
5326 te.startServer(&testServer{security: e.security})
5327 defer te.tearDown()
5328
5329 cc := te.clientConn()
5330 tc := testgrpc.NewTestServiceClient(cc)
5331 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5332 defer cancel()
5333 var err error
5334 if err = verifyResultWithDelay(func() (bool, error) {
5335 if _, err = tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) == codes.Internal {
5336 return true, nil
5337 }
5338 return false, fmt.Errorf("tc.EmptyCall() = _, err: %v, want _, error code: %v", err, codes.Internal)
5339 }); err != nil {
5340 t.Fatal(err)
5341 }
5342 }
5343
5344 func (s) TestServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T) {
5345 for _, e := range listTestEnv() {
5346 if e.httpHandler || e.security == "tls" {
5347 continue
5348 }
5349 testServerMaxHeaderListSizeClientIntentionalViolation(t, e)
5350 }
5351 }
5352
5353 func testServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T, e env) {
5354 te := newTest(t, e)
5355 te.maxServerHeaderListSize = new(uint32)
5356 *te.maxServerHeaderListSize = 512
5357 te.startServer(&testServer{security: e.security})
5358 defer te.tearDown()
5359
5360 cc, dw := te.clientConnWithConnControl()
5361 tc := &testServiceClientWrapper{TestServiceClient: testgrpc.NewTestServiceClient(cc)}
5362 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5363 defer cancel()
5364 stream, err := tc.FullDuplexCall(ctx)
5365 if err != nil {
5366 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
5367 }
5368 rcw := dw.getRawConnWrapper()
5369 val := make([]string, 512)
5370 for i := range val {
5371 val[i] = "a"
5372 }
5373
5374 time.Sleep(100 * time.Millisecond)
5375 rcw.writeHeaders(http2.HeadersFrameParam{
5376 StreamID: tc.getCurrentStreamID(),
5377 BlockFragment: rcw.encodeHeader("oversize", strings.Join(val, "")),
5378 EndStream: false,
5379 EndHeaders: true,
5380 })
5381 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Internal {
5382 t.Fatalf("stream.Recv() = _, %v, want _, error code: %v", err, codes.Internal)
5383 }
5384 }
5385
5386 func (s) TestClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T) {
5387 for _, e := range listTestEnv() {
5388 if e.httpHandler || e.security == "tls" {
5389 continue
5390 }
5391 testClientMaxHeaderListSizeServerIntentionalViolation(t, e)
5392 }
5393 }
5394
5395 func testClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T, e env) {
5396 te := newTest(t, e)
5397 te.maxClientHeaderListSize = new(uint32)
5398 *te.maxClientHeaderListSize = 200
5399 lw := te.startServerWithConnControl(&testServer{security: e.security, setHeaderOnly: true})
5400 defer te.tearDown()
5401 cc, _ := te.clientConnWithConnControl()
5402 tc := &testServiceClientWrapper{TestServiceClient: testgrpc.NewTestServiceClient(cc)}
5403 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5404 defer cancel()
5405 stream, err := tc.FullDuplexCall(ctx)
5406 if err != nil {
5407 t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
5408 }
5409 var i int
5410 var rcw *rawConnWrapper
5411 for i = 0; i < 100; i++ {
5412 rcw = lw.getLastConn()
5413 if rcw != nil {
5414 break
5415 }
5416 time.Sleep(10 * time.Millisecond)
5417 continue
5418 }
5419 if i == 100 {
5420 t.Fatalf("failed to create server transport after 1s")
5421 }
5422
5423 val := make([]string, 200)
5424 for i := range val {
5425 val[i] = "a"
5426 }
5427
5428 time.Sleep(100 * time.Millisecond)
5429 rcw.writeHeaders(http2.HeadersFrameParam{
5430 StreamID: tc.getCurrentStreamID(),
5431 BlockFragment: rcw.encodeRawHeader("oversize", strings.Join(val, "")),
5432 EndStream: false,
5433 EndHeaders: true,
5434 })
5435 if _, err := stream.Recv(); err == nil || status.Code(err) != codes.Internal {
5436 t.Fatalf("stream.Recv() = _, %v, want _, error code: %v", err, codes.Internal)
5437 }
5438 }
5439
5440 func (s) TestNetPipeConn(t *testing.T) {
5441
5442
5443 pl := testutils.NewPipeListener()
5444 s := grpc.NewServer()
5445 defer s.Stop()
5446 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
5447 return &testpb.SimpleResponse{}, nil
5448 }}
5449 testgrpc.RegisterTestServiceServer(s, ts)
5450 go s.Serve(pl)
5451 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5452 defer cancel()
5453 cc, err := grpc.DialContext(ctx, "", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDialer(pl.Dialer()))
5454 if err != nil {
5455 t.Fatalf("Error creating client: %v", err)
5456 }
5457 defer cc.Close()
5458 client := testgrpc.NewTestServiceClient(cc)
5459 if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
5460 t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
5461 }
5462 }
5463
5464 func (s) TestLargeTimeout(t *testing.T) {
5465 for _, e := range listTestEnv() {
5466 testLargeTimeout(t, e)
5467 }
5468 }
5469
5470 func testLargeTimeout(t *testing.T, e env) {
5471 te := newTest(t, e)
5472 te.declareLogNoise("Server.processUnaryRPC failed to write status")
5473
5474 ts := &funcServer{}
5475 te.startServer(ts)
5476 defer te.tearDown()
5477 tc := testgrpc.NewTestServiceClient(te.clientConn())
5478
5479 timeouts := []time.Duration{
5480 time.Duration(math.MaxInt64),
5481
5482 2562047 * time.Hour,
5483 }
5484
5485 for i, maxTimeout := range timeouts {
5486 ts.unaryCall = func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
5487 deadline, ok := ctx.Deadline()
5488 timeout := time.Until(deadline)
5489 minTimeout := maxTimeout - 5*time.Second
5490 if !ok || timeout < minTimeout || timeout > maxTimeout {
5491 t.Errorf("ctx.Deadline() = (now+%v), %v; want [%v, %v], true", timeout, ok, minTimeout, maxTimeout)
5492 return nil, status.Error(codes.OutOfRange, "deadline error")
5493 }
5494 return &testpb.SimpleResponse{}, nil
5495 }
5496
5497 ctx, cancel := context.WithTimeout(context.Background(), maxTimeout)
5498 defer cancel()
5499
5500 if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
5501 t.Errorf("case %v: UnaryCall(_) = _, %v; want _, nil", i, err)
5502 }
5503 }
5504 }
5505
5506 func listenWithNotifyingListener(network, address string, event *grpcsync.Event) (net.Listener, error) {
5507 lis, err := net.Listen(network, address)
5508 if err != nil {
5509 return nil, err
5510 }
5511 return notifyingListener{connEstablished: event, Listener: lis}, nil
5512 }
5513
5514 type notifyingListener struct {
5515 connEstablished *grpcsync.Event
5516 net.Listener
5517 }
5518
5519 func (lis notifyingListener) Accept() (net.Conn, error) {
5520 defer lis.connEstablished.Fire()
5521 return lis.Listener.Accept()
5522 }
5523
5524 func (s) TestRPCWaitsForResolver(t *testing.T) {
5525 te := testServiceConfigSetup(t, tcpClearRREnv)
5526 te.startServer(&testServer{security: tcpClearRREnv.security})
5527 defer te.tearDown()
5528 r := manual.NewBuilderWithScheme("whatever")
5529
5530 te.resolverScheme = r.Scheme()
5531 cc := te.clientConn(grpc.WithResolvers(r))
5532 tc := testgrpc.NewTestServiceClient(cc)
5533
5534 ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
5535 defer cancel()
5536
5537 if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
5538 t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
5539 }
5540
5541 ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
5542 defer cancel()
5543 go func() {
5544 time.Sleep(time.Second)
5545 r.UpdateState(resolver.State{
5546 Addresses: []resolver.Address{{Addr: te.srvAddr}},
5547 ServiceConfig: parseServiceConfig(t, r, `{
5548 "methodConfig": [
5549 {
5550 "name": [
5551 {
5552 "service": "grpc.testing.TestService",
5553 "method": "UnaryCall"
5554 }
5555 ],
5556 "maxRequestMessageBytes": 0
5557 }
5558 ]
5559 }`)})
5560 }()
5561
5562
5563
5564 payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1)
5565 if err != nil {
5566 t.Fatal(err)
5567 }
5568 if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{Payload: payload}); status.Code(err) != codes.ResourceExhausted {
5569 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, nil", err)
5570 }
5571 if got := ctx.Err(); got != nil {
5572 t.Fatalf("ctx.Err() = %v; want nil (deadline should be set short by service config)", got)
5573 }
5574 if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
5575 t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, nil", err)
5576 }
5577 }
5578
5579 type httpServerResponse struct {
5580 headers [][]string
5581 payload []byte
5582 trailers [][]string
5583 }
5584
5585 type httpServer struct {
5586
5587
5588 waitForEndStream bool
5589 refuseStream func(uint32) bool
5590 responses []httpServerResponse
5591 }
5592
5593 func (s *httpServer) writeHeader(framer *http2.Framer, sid uint32, headerFields []string, endStream bool) error {
5594 if len(headerFields)%2 == 1 {
5595 panic("odd number of kv args")
5596 }
5597
5598 var buf bytes.Buffer
5599 henc := hpack.NewEncoder(&buf)
5600 for len(headerFields) > 0 {
5601 k, v := headerFields[0], headerFields[1]
5602 headerFields = headerFields[2:]
5603 henc.WriteField(hpack.HeaderField{Name: k, Value: v})
5604 }
5605
5606 return framer.WriteHeaders(http2.HeadersFrameParam{
5607 StreamID: sid,
5608 BlockFragment: buf.Bytes(),
5609 EndStream: endStream,
5610 EndHeaders: true,
5611 })
5612 }
5613
5614 func (s *httpServer) writePayload(framer *http2.Framer, sid uint32, payload []byte) error {
5615 return framer.WriteData(sid, false, payload)
5616 }
5617
5618 func (s *httpServer) start(t *testing.T, lis net.Listener) {
5619
5620 go func() {
5621 conn, err := lis.Accept()
5622 if err != nil {
5623 t.Errorf("Error accepting connection: %v", err)
5624 return
5625 }
5626 defer conn.Close()
5627
5628 if _, err = io.ReadFull(conn, make([]byte, len(http2.ClientPreface))); err != nil {
5629 t.Errorf("Error at server-side while reading preface from client. Err: %v", err)
5630 return
5631 }
5632 reader := bufio.NewReader(conn)
5633 writer := bufio.NewWriter(conn)
5634 framer := http2.NewFramer(writer, reader)
5635 if err = framer.WriteSettingsAck(); err != nil {
5636 t.Errorf("Error at server-side while sending Settings ack. Err: %v", err)
5637 return
5638 }
5639 writer.Flush()
5640 var sid uint32
5641
5642 for requestNum := 0; ; requestNum = (requestNum + 1) % len(s.responses) {
5643
5644 for {
5645 frame, err := framer.ReadFrame()
5646 if err != nil {
5647 if !isConnClosedErr(err) {
5648 t.Errorf("Error at server-side while reading frame. got: %q, want: rpc error containing substring %q OR %q", err, possibleConnResetMsg, possibleEOFMsg)
5649 }
5650 return
5651 }
5652 sid = 0
5653 switch fr := frame.(type) {
5654 case *http2.HeadersFrame:
5655
5656
5657 if !s.waitForEndStream || fr.StreamEnded() {
5658 sid = fr.Header().StreamID
5659 }
5660
5661 case *http2.DataFrame:
5662
5663
5664
5665
5666 if s.waitForEndStream && fr.StreamEnded() {
5667 sid = fr.Header().StreamID
5668 }
5669 }
5670 if sid != 0 {
5671 if s.refuseStream == nil || !s.refuseStream(sid) {
5672 break
5673 }
5674 framer.WriteRSTStream(sid, http2.ErrCodeRefusedStream)
5675 writer.Flush()
5676 }
5677 }
5678
5679 response := s.responses[requestNum]
5680 for _, header := range response.headers {
5681 if err = s.writeHeader(framer, sid, header, false); err != nil {
5682 t.Errorf("Error at server-side while writing headers. Err: %v", err)
5683 return
5684 }
5685 writer.Flush()
5686 }
5687 if response.payload != nil {
5688 if err = s.writePayload(framer, sid, response.payload); err != nil {
5689 t.Errorf("Error at server-side while writing payload. Err: %v", err)
5690 return
5691 }
5692 writer.Flush()
5693 }
5694 for i, trailer := range response.trailers {
5695 if err = s.writeHeader(framer, sid, trailer, i == len(response.trailers)-1); err != nil {
5696 t.Errorf("Error at server-side while writing trailers. Err: %v", err)
5697 return
5698 }
5699 writer.Flush()
5700 }
5701 }
5702 }()
5703 }
5704
5705 func (s) TestClientCancellationPropagatesUnary(t *testing.T) {
5706 wg := &sync.WaitGroup{}
5707 called, done := make(chan struct{}), make(chan struct{})
5708 ss := &stubserver.StubServer{
5709 EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
5710 close(called)
5711 <-ctx.Done()
5712 err := ctx.Err()
5713 if err != context.Canceled {
5714 t.Errorf("ctx.Err() = %v; want context.Canceled", err)
5715 }
5716 close(done)
5717 return nil, err
5718 },
5719 }
5720 if err := ss.Start(nil); err != nil {
5721 t.Fatalf("Error starting endpoint server: %v", err)
5722 }
5723 defer ss.Stop()
5724
5725 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5726
5727 wg.Add(1)
5728 go func() {
5729 if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Canceled {
5730 t.Errorf("ss.Client.EmptyCall() = _, %v; want _, Code()=codes.Canceled", err)
5731 }
5732 wg.Done()
5733 }()
5734
5735 select {
5736 case <-called:
5737 case <-time.After(5 * time.Second):
5738 t.Fatalf("failed to perform EmptyCall after 10s")
5739 }
5740 cancel()
5741 select {
5742 case <-done:
5743 case <-time.After(5 * time.Second):
5744 t.Fatalf("server failed to close done chan due to cancellation propagation")
5745 }
5746 wg.Wait()
5747 }
5748
5749
5750
5751 func (s) TestCanceledRPCCallOptionRace(t *testing.T) {
5752 ss := &stubserver.StubServer{
5753 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
5754 err := stream.Send(&testpb.StreamingOutputCallResponse{})
5755 if err != nil {
5756 return err
5757 }
5758 <-stream.Context().Done()
5759 return nil
5760 },
5761 }
5762 if err := ss.Start(nil); err != nil {
5763 t.Fatalf("Error starting endpoint server: %v", err)
5764 }
5765 defer ss.Stop()
5766
5767 const count = 1000
5768 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5769 defer cancel()
5770
5771 var wg sync.WaitGroup
5772 wg.Add(count)
5773 for i := 0; i < count; i++ {
5774 go func() {
5775 defer wg.Done()
5776 var p peer.Peer
5777 ctx, cancel := context.WithCancel(ctx)
5778 defer cancel()
5779 stream, err := ss.Client.FullDuplexCall(ctx, grpc.Peer(&p))
5780 if err != nil {
5781 t.Errorf("_.FullDuplexCall(_) = _, %v", err)
5782 return
5783 }
5784 if err := stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
5785 t.Errorf("_ has error %v while sending", err)
5786 return
5787 }
5788 if _, err := stream.Recv(); err != nil {
5789 t.Errorf("%v.Recv() = %v", stream, err)
5790 return
5791 }
5792 cancel()
5793 if _, err := stream.Recv(); status.Code(err) != codes.Canceled {
5794 t.Errorf("%v compleled with error %v, want %s", stream, err, codes.Canceled)
5795 return
5796 }
5797
5798
5799 if p.Addr == nil {
5800 t.Errorf("peer.Addr is nil, want non-nil")
5801 return
5802 }
5803 }()
5804 }
5805 wg.Wait()
5806 }
5807
5808 func (s) TestClientSettingsFloodCloseConn(t *testing.T) {
5809
5810
5811
5812
5813 s := grpc.NewServer(grpc.WriteBufferSize(20))
5814 l := bufconn.Listen(20)
5815 go s.Serve(l)
5816
5817
5818 conn, err := l.Dial()
5819 if err != nil {
5820 t.Fatalf("Error dialing bufconn: %v", err)
5821 }
5822
5823 n, err := conn.Write([]byte(http2.ClientPreface))
5824 if err != nil || n != len(http2.ClientPreface) {
5825 t.Fatalf("Error writing client preface: %v, %v", n, err)
5826 }
5827
5828 fr := http2.NewFramer(conn, conn)
5829 f, err := fr.ReadFrame()
5830 if err != nil {
5831 t.Fatalf("Error reading initial settings frame: %v", err)
5832 }
5833 if _, ok := f.(*http2.SettingsFrame); ok {
5834 if err := fr.WriteSettingsAck(); err != nil {
5835 t.Fatalf("Error writing settings ack: %v", err)
5836 }
5837 } else {
5838 t.Fatalf("Error reading initial settings frame: type=%T", f)
5839 }
5840
5841
5842 if err = fr.WriteSettings(); err != nil {
5843 t.Fatalf("Error writing settings frame: %v", err)
5844 }
5845 if f, err = fr.ReadFrame(); err != nil {
5846 t.Fatalf("Error reading frame: %v", err)
5847 }
5848 if sf, ok := f.(*http2.SettingsFrame); !ok || !sf.IsAck() {
5849 t.Fatalf("Unexpected frame: %v", f)
5850 }
5851
5852
5853
5854 for {
5855 conn.SetWriteDeadline(time.Now().Add(50 * time.Millisecond))
5856 if err := fr.WriteSettings(); err != nil {
5857 if to, ok := err.(interface{ Timeout() bool }); !ok || !to.Timeout() {
5858 t.Fatalf("Received unexpected write error: %v", err)
5859 }
5860 break
5861 }
5862 }
5863 conn.Close()
5864
5865
5866
5867
5868
5869
5870
5871 timer := time.AfterFunc(5*time.Second, func() {
5872 t.Errorf("Timeout waiting for GracefulStop to return")
5873 s.Stop()
5874 })
5875 s.GracefulStop()
5876 timer.Stop()
5877 }
5878
5879 func unaryInterceptorVerifyConn(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
5880 conn := transport.GetConnection(ctx)
5881 if conn == nil {
5882 return nil, status.Error(codes.NotFound, "connection was not in context")
5883 }
5884 return nil, status.Error(codes.OK, "")
5885 }
5886
5887
5888
5889 func (s) TestUnaryServerInterceptorGetsConnection(t *testing.T) {
5890 ss := &stubserver.StubServer{}
5891 if err := ss.Start([]grpc.ServerOption{grpc.UnaryInterceptor(unaryInterceptorVerifyConn)}); err != nil {
5892 t.Fatalf("Error starting endpoint server: %v", err)
5893 }
5894 defer ss.Stop()
5895
5896 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5897 defer cancel()
5898
5899 if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.OK {
5900 t.Fatalf("ss.Client.EmptyCall(_, _) = _, %v, want _, error code %s", err, codes.OK)
5901 }
5902 }
5903
5904 func streamingInterceptorVerifyConn(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
5905 conn := transport.GetConnection(ss.Context())
5906 if conn == nil {
5907 return status.Error(codes.NotFound, "connection was not in context")
5908 }
5909 return status.Error(codes.OK, "")
5910 }
5911
5912
5913
5914 func (s) TestStreamingServerInterceptorGetsConnection(t *testing.T) {
5915 ss := &stubserver.StubServer{}
5916 if err := ss.Start([]grpc.ServerOption{grpc.StreamInterceptor(streamingInterceptorVerifyConn)}); err != nil {
5917 t.Fatalf("Error starting endpoint server: %v", err)
5918 }
5919 defer ss.Stop()
5920
5921 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
5922 defer cancel()
5923
5924 s, err := ss.Client.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
5925 if err != nil {
5926 t.Fatalf("ss.Client.StreamingOutputCall(_) = _, %v, want _, <nil>", err)
5927 }
5928 if _, err := s.Recv(); err != io.EOF {
5929 t.Fatalf("ss.Client.StreamingInputCall(_) = _, %v, want _, %v", err, io.EOF)
5930 }
5931 }
5932
5933
5934
5935
5936 func unaryInterceptorVerifyAuthority(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
5937 md, ok := metadata.FromIncomingContext(ctx)
5938 if !ok {
5939 return nil, status.Error(codes.NotFound, "metadata was not in context")
5940 }
5941 authority := md.Get(":authority")
5942 if len(authority) > 1 {
5943 return nil, status.Error(codes.NotFound, ":authority value had more than one value")
5944 }
5945
5946
5947 host := md.Get("host")
5948 if len(host) != 0 {
5949 return nil, status.Error(codes.NotFound, "host header should not be present in metadata")
5950 }
5951
5952
5953 if len(authority) == 0 {
5954
5955 return nil, status.Error(codes.NotFound, "")
5956 }
5957 return nil, status.Error(codes.NotFound, authority[0])
5958 }
5959
5960
5961
5962 func (s) TestAuthorityHeader(t *testing.T) {
5963 tests := []struct {
5964 name string
5965 headers []string
5966 wantAuthority string
5967 }{
5968
5969 {
5970 name: "Missing :authority",
5971
5972
5973 headers: []string{
5974 ":method", "POST",
5975 ":path", "/grpc.testing.TestService/UnaryCall",
5976 "content-type", "application/grpc",
5977 "te", "trailers",
5978 "host", "localhost",
5979 },
5980 wantAuthority: "localhost",
5981 },
5982 {
5983 name: "Missing :authority and host",
5984
5985
5986 headers: []string{
5987 ":method", "POST",
5988 ":path", "/grpc.testing.TestService/UnaryCall",
5989 "content-type", "application/grpc",
5990 "te", "trailers",
5991 },
5992 wantAuthority: "",
5993 },
5994
5995 {
5996 name: ":authority and host present",
5997
5998
5999 headers: []string{
6000 ":method", "POST",
6001 ":path", "/grpc.testing.TestService/UnaryCall",
6002 ":authority", "localhost",
6003 "content-type", "application/grpc",
6004 "host", "localhost2",
6005 },
6006 wantAuthority: "localhost",
6007 },
6008 }
6009 for _, test := range tests {
6010 t.Run(test.name, func(t *testing.T) {
6011 te := newTest(t, tcpClearRREnv)
6012 ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
6013 return &testpb.SimpleResponse{}, nil
6014 }}
6015 te.unaryServerInt = unaryInterceptorVerifyAuthority
6016 te.startServer(ts)
6017 defer te.tearDown()
6018 success := testutils.NewChannel()
6019 te.withServerTester(func(st *serverTester) {
6020 st.writeHeaders(http2.HeadersFrameParam{
6021 StreamID: 1,
6022 BlockFragment: st.encodeHeader(test.headers...),
6023 EndStream: false,
6024 EndHeaders: true,
6025 })
6026 st.writeData(1, true, []byte{0, 0, 0, 0, 0})
6027
6028 for {
6029 frame := st.wantAnyFrame()
6030 f, ok := frame.(*http2.MetaHeadersFrame)
6031 if !ok {
6032 continue
6033 }
6034 for _, header := range f.Fields {
6035 if header.Name == "grpc-message" {
6036 success.Send(header.Value)
6037 return
6038 }
6039 }
6040 }
6041 })
6042
6043 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
6044 defer cancel()
6045 gotAuthority, err := success.Receive(ctx)
6046 if err != nil {
6047 t.Fatalf("Error receiving from channel: %v", err)
6048 }
6049 if gotAuthority != test.wantAuthority {
6050 t.Fatalf("gotAuthority: %v, wantAuthority %v", gotAuthority, test.wantAuthority)
6051 }
6052 })
6053 }
6054 }
6055
6056
6057
6058 type wrapCloseListener struct {
6059 net.Listener
6060 connsOpen int32
6061 }
6062
6063
6064
6065 type wrapCloseConn struct {
6066 net.Conn
6067 lis *wrapCloseListener
6068 closeOnce sync.Once
6069 }
6070
6071 func (w *wrapCloseListener) Accept() (net.Conn, error) {
6072 conn, err := w.Listener.Accept()
6073 if err != nil {
6074 return nil, err
6075 }
6076 atomic.AddInt32(&w.connsOpen, 1)
6077 return &wrapCloseConn{Conn: conn, lis: w}, nil
6078 }
6079
6080 func (w *wrapCloseConn) Close() error {
6081 defer w.closeOnce.Do(func() { atomic.AddInt32(&w.lis.connsOpen, -1) })
6082 return w.Conn.Close()
6083 }
6084
6085
6086
6087 func (s) TestServerClosesConn(t *testing.T) {
6088 lis := bufconn.Listen(20)
6089 wrapLis := &wrapCloseListener{Listener: lis}
6090
6091 s := grpc.NewServer()
6092 go s.Serve(wrapLis)
6093 defer s.Stop()
6094
6095 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
6096 defer cancel()
6097
6098 for i := 0; i < 10; i++ {
6099 conn, err := lis.DialContext(ctx)
6100 if err != nil {
6101 t.Fatalf("Dial = _, %v; want _, nil", err)
6102 }
6103 conn.Close()
6104 }
6105 for ctx.Err() == nil {
6106 if atomic.LoadInt32(&wrapLis.connsOpen) == 0 {
6107 return
6108 }
6109 time.Sleep(50 * time.Millisecond)
6110 }
6111 t.Fatalf("timed out waiting for conns to be closed by server; still open: %v", atomic.LoadInt32(&wrapLis.connsOpen))
6112 }
6113
6114
6115
6116 func (s) TestNilStatsHandler(t *testing.T) {
6117 grpctest.TLogger.ExpectErrorN("ignoring nil parameter", 2)
6118 ss := &stubserver.StubServer{
6119 UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
6120 return &testpb.SimpleResponse{}, nil
6121 },
6122 }
6123 if err := ss.Start([]grpc.ServerOption{grpc.StatsHandler(nil)}, grpc.WithStatsHandler(nil)); err != nil {
6124 t.Fatalf("Error starting endpoint server: %v", err)
6125 }
6126 defer ss.Stop()
6127
6128 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
6129 defer cancel()
6130 if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
6131 t.Fatalf("Unexpected error from UnaryCall: %v", err)
6132 }
6133 }
6134
6135
6136
6137
6138
6139 func (s) TestUnexpectedEOF(t *testing.T) {
6140 ss := &stubserver.StubServer{
6141 UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
6142 return &testpb.SimpleResponse{
6143 Payload: &testpb.Payload{
6144 Body: bytes.Repeat([]byte("a"), int(in.ResponseSize)),
6145 },
6146 }, nil
6147 },
6148 }
6149 if err := ss.Start([]grpc.ServerOption{}); err != nil {
6150 t.Fatalf("Error starting endpoint server: %v", err)
6151 }
6152 defer ss.Stop()
6153
6154 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
6155 defer cancel()
6156 for i := 0; i < 10; i++ {
6157
6158
6159 _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 4194304})
6160 if code := status.Code(err); code != codes.ResourceExhausted {
6161 t.Fatalf("UnaryCall RPC returned error: %v, want status code %v", err, codes.ResourceExhausted)
6162 }
6163
6164
6165 if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{ResponseSize: 275075}); err != nil {
6166 t.Fatalf("UnaryCall RPC failed: %v", err)
6167 }
6168 }
6169 }
6170
6171
6172
6173
6174
6175
6176
6177 func (s) TestRecvWhileReturningStatus(t *testing.T) {
6178 ss := &stubserver.StubServer{
6179 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
6180
6181
6182 go stream.Recv()
6183 return nil
6184 },
6185 }
6186 if err := ss.Start(nil); err != nil {
6187 t.Fatalf("Error starting endpoint server: %v", err)
6188 }
6189 defer ss.Stop()
6190 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
6191 defer cancel()
6192 for i := 0; i < 100; i++ {
6193 stream, err := ss.Client.FullDuplexCall(ctx)
6194 if err != nil {
6195 t.Fatalf("Error while creating stream: %v", err)
6196 }
6197 if _, err := stream.Recv(); err != io.EOF {
6198 t.Fatalf("stream.Recv() = %v, want io.EOF", err)
6199 }
6200 }
6201 }
6202
6203 type mockBinaryLogger struct {
6204 mml *mockMethodLogger
6205 }
6206
6207 func newMockBinaryLogger() *mockBinaryLogger {
6208 return &mockBinaryLogger{
6209 mml: &mockMethodLogger{},
6210 }
6211 }
6212
6213 func (mbl *mockBinaryLogger) GetMethodLogger(string) binarylog.MethodLogger {
6214 return mbl.mml
6215 }
6216
6217 type mockMethodLogger struct {
6218 events uint64
6219 }
6220
6221 func (mml *mockMethodLogger) Log(context.Context, binarylog.LogEntryConfig) {
6222 atomic.AddUint64(&mml.events, 1)
6223 }
6224
6225
6226
6227
6228
6229
6230 func (s) TestGlobalBinaryLoggingOptions(t *testing.T) {
6231 csbl := newMockBinaryLogger()
6232 ssbl := newMockBinaryLogger()
6233
6234 internal.AddGlobalDialOptions.(func(opt ...grpc.DialOption))(internal.WithBinaryLogger.(func(bl binarylog.Logger) grpc.DialOption)(csbl))
6235 internal.AddGlobalServerOptions.(func(opt ...grpc.ServerOption))(internal.BinaryLogger.(func(bl binarylog.Logger) grpc.ServerOption)(ssbl))
6236 defer func() {
6237 internal.ClearGlobalDialOptions()
6238 internal.ClearGlobalServerOptions()
6239 }()
6240 ss := &stubserver.StubServer{
6241 UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
6242 return &testpb.SimpleResponse{}, nil
6243 },
6244 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
6245 _, err := stream.Recv()
6246 if err == io.EOF {
6247 return nil
6248 }
6249 return status.Errorf(codes.Unknown, "expected client to call CloseSend")
6250 },
6251 }
6252
6253
6254
6255 if err := ss.Start(nil); err != nil {
6256 t.Fatalf("Error starting endpoint server: %v", err)
6257 }
6258 defer ss.Stop()
6259
6260 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
6261 defer cancel()
6262
6263 if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
6264 t.Fatalf("Unexpected error from UnaryCall: %v", err)
6265 }
6266 if csbl.mml.events != 5 {
6267 t.Fatalf("want 5 client side binary logging events, got %v", csbl.mml.events)
6268 }
6269 if ssbl.mml.events != 5 {
6270 t.Fatalf("want 5 server side binary logging events, got %v", ssbl.mml.events)
6271 }
6272
6273
6274 stream, err := ss.Client.FullDuplexCall(ctx)
6275 if err != nil {
6276 t.Fatalf("ss.Client.FullDuplexCall failed: %f", err)
6277 }
6278
6279 stream.CloseSend()
6280 if _, err = stream.Recv(); err != io.EOF {
6281 t.Fatalf("unexpected error: %v, expected an EOF error", err)
6282 }
6283
6284 if csbl.mml.events != 8 {
6285 t.Fatalf("want 8 client side binary logging events, got %v", csbl.mml.events)
6286 }
6287 if ssbl.mml.events != 8 {
6288 t.Fatalf("want 8 server side binary logging events, got %v", ssbl.mml.events)
6289 }
6290 }
6291
6292 type statsHandlerRecordEvents struct {
6293 mu sync.Mutex
6294 s []stats.RPCStats
6295 }
6296
6297 func (*statsHandlerRecordEvents) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
6298 return ctx
6299 }
6300 func (h *statsHandlerRecordEvents) HandleRPC(_ context.Context, s stats.RPCStats) {
6301 h.mu.Lock()
6302 defer h.mu.Unlock()
6303 h.s = append(h.s, s)
6304 }
6305 func (*statsHandlerRecordEvents) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
6306 return ctx
6307 }
6308 func (*statsHandlerRecordEvents) HandleConn(context.Context, stats.ConnStats) {}
6309
6310 type triggerRPCBlockPicker struct {
6311 pickDone func()
6312 }
6313
6314 func (bp *triggerRPCBlockPicker) Pick(pi balancer.PickInfo) (balancer.PickResult, error) {
6315 bp.pickDone()
6316 return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
6317 }
6318
6319 const name = "triggerRPCBlockBalancer"
6320
6321 type triggerRPCBlockPickerBalancerBuilder struct{}
6322
6323 func (triggerRPCBlockPickerBalancerBuilder) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
6324 b := &triggerRPCBlockBalancer{
6325 blockingPickerDone: grpcsync.NewEvent(),
6326 ClientConn: cc,
6327 }
6328
6329
6330 builder := balancer.Get(roundrobin.Name)
6331 rr := builder.Build(b, bOpts)
6332 if rr == nil {
6333 panic("round robin builder returned nil")
6334 }
6335 b.Balancer = rr
6336 return b
6337 }
6338
6339 func (triggerRPCBlockPickerBalancerBuilder) ParseConfig(json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
6340 return &bpbConfig{}, nil
6341 }
6342
6343 func (triggerRPCBlockPickerBalancerBuilder) Name() string {
6344 return name
6345 }
6346
6347 type bpbConfig struct {
6348 serviceconfig.LoadBalancingConfig
6349 }
6350
6351
6352
6353
6354
6355
6356
6357 type triggerRPCBlockBalancer struct {
6358 stateMu sync.Mutex
6359 childState balancer.State
6360
6361 blockingPickerDone *grpcsync.Event
6362
6363 balancer.ClientConn
6364
6365 balancer.Balancer
6366 }
6367
6368 func (bpb *triggerRPCBlockBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
6369 err := bpb.Balancer.UpdateClientConnState(s)
6370 bpb.ClientConn.UpdateState(balancer.State{
6371 ConnectivityState: connectivity.Connecting,
6372 Picker: &triggerRPCBlockPicker{
6373 pickDone: func() {
6374 bpb.stateMu.Lock()
6375 defer bpb.stateMu.Unlock()
6376 bpb.blockingPickerDone.Fire()
6377 if bpb.childState.ConnectivityState == connectivity.Ready {
6378 bpb.ClientConn.UpdateState(bpb.childState)
6379 }
6380 },
6381 },
6382 })
6383 return err
6384 }
6385
6386 func (bpb *triggerRPCBlockBalancer) UpdateState(state balancer.State) {
6387 bpb.stateMu.Lock()
6388 defer bpb.stateMu.Unlock()
6389 bpb.childState = state
6390 if bpb.blockingPickerDone.HasFired() {
6391 if state.ConnectivityState == connectivity.Ready {
6392 bpb.ClientConn.UpdateState(state)
6393 }
6394 }
6395 }
6396
6397
6398
6399
6400 func (s) TestRPCBlockingOnPickerStatsCall(t *testing.T) {
6401 sh := &statsHandlerRecordEvents{}
6402 ss := &stubserver.StubServer{
6403 UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
6404 return &testpb.SimpleResponse{}, nil
6405 },
6406 }
6407
6408 if err := ss.StartServer(); err != nil {
6409 t.Fatalf("Error starting endpoint server: %v", err)
6410 }
6411 defer ss.Stop()
6412
6413 lbCfgJSON := `{
6414 "loadBalancingConfig": [
6415 {
6416 "triggerRPCBlockBalancer": {}
6417 }
6418 ]
6419 }`
6420
6421 sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lbCfgJSON)
6422 mr := manual.NewBuilderWithScheme("pickerupdatedbalancer")
6423 defer mr.Close()
6424 mr.InitialState(resolver.State{
6425 Addresses: []resolver.Address{
6426 {Addr: ss.Address},
6427 },
6428 ServiceConfig: sc,
6429 })
6430
6431 cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithStatsHandler(sh), grpc.WithTransportCredentials(insecure.NewCredentials()))
6432 if err != nil {
6433 t.Fatalf("grpc.Dial() failed: %v", err)
6434 }
6435 defer cc.Close()
6436 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
6437 defer cancel()
6438 testServiceClient := testgrpc.NewTestServiceClient(cc)
6439 if _, err := testServiceClient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
6440 t.Fatalf("Unexpected error from UnaryCall: %v", err)
6441 }
6442
6443 var pickerUpdatedCount uint
6444 for _, stat := range sh.s {
6445 if _, ok := stat.(*stats.PickerUpdated); ok {
6446 pickerUpdatedCount++
6447 }
6448 }
6449 if pickerUpdatedCount != 1 {
6450 t.Fatalf("sh.pickerUpdated count: %v, want: %v", pickerUpdatedCount, 2)
6451 }
6452 }
6453
View as plain text