1
18
19 package test
20
21 import (
22 "context"
23 "io"
24 "testing"
25 "time"
26
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/internal/stubserver"
30 "google.golang.org/grpc/status"
31
32 testgrpc "google.golang.org/grpc/interop/grpc_testing"
33 testpb "google.golang.org/grpc/interop/grpc_testing"
34 )
35
36 func (s) TestStreamCleanup(t *testing.T) {
37 const initialWindowSize uint = 70 * 1024
38 const bodySize = 2 * initialWindowSize
39 const callRecvMsgSize uint = 1
40
41 ss := &stubserver.StubServer{
42 UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
43 return &testpb.SimpleResponse{Payload: &testpb.Payload{
44 Body: make([]byte, bodySize),
45 }}, nil
46 },
47 EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
48 return &testpb.Empty{}, nil
49 },
50 }
51 if err := ss.Start(nil, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(callRecvMsgSize))), grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil {
52 t.Fatalf("Error starting endpoint server: %v", err)
53 }
54 defer ss.Stop()
55
56 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
57 defer cancel()
58 if _, err := ss.Client.UnaryCall(ctx, &testpb.SimpleRequest{}); status.Code(err) != codes.ResourceExhausted {
59 t.Fatalf("should fail with ResourceExhausted, message's body size: %v, maximum message size the client can receive: %v", bodySize, callRecvMsgSize)
60 }
61 if _, err := ss.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
62 t.Fatalf("should succeed, err: %v", err)
63 }
64 }
65
66 func (s) TestStreamCleanupAfterSendStatus(t *testing.T) {
67 const initialWindowSize uint = 70 * 1024
68 const bodySize = 2 * initialWindowSize
69
70 serverReturnedStatus := make(chan struct{})
71
72 ss := &stubserver.StubServer{
73 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
74 defer func() {
75 close(serverReturnedStatus)
76 }()
77 return stream.Send(&testpb.StreamingOutputCallResponse{
78 Payload: &testpb.Payload{
79 Body: make([]byte, bodySize),
80 },
81 })
82 },
83 }
84 if err := ss.Start(nil, grpc.WithInitialWindowSize(int32(initialWindowSize))); err != nil {
85 t.Fatalf("Error starting endpoint server: %v", err)
86 }
87 defer ss.Stop()
88
89
90
91
92
93
94 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
95 defer cancel()
96 stream, err := ss.Client.FullDuplexCall(ctx)
97 if err != nil {
98 t.Fatalf("FullDuplexCall= _, %v; want _, <nil>", err)
99 }
100
101
102
103
104
105
106
107
108
109 <-serverReturnedStatus
110
111
112
113
114
115
116
117
118 gracefulStopDone := make(chan struct{})
119 go func() {
120 defer close(gracefulStopDone)
121 ss.S.GracefulStop()
122 }()
123
124
125 if _, err := stream.Recv(); err != nil {
126 t.Fatalf("stream.Recv() = _, %v, want _, <nil>", err)
127 }
128 if _, err := stream.Recv(); err != io.EOF {
129 t.Fatalf("stream.Recv() = _, %v, want _, io.EOF", err)
130 }
131
132 timer := time.NewTimer(time.Second)
133 select {
134 case <-gracefulStopDone:
135 timer.Stop()
136 case <-timer.C:
137 t.Fatalf("s.GracefulStop() didn't finish within 1 second after the last RPC")
138 }
139 }
140
View as plain text