1
18
19 package test
20
21 import (
22 "context"
23 "fmt"
24 "net"
25 "sync"
26 "testing"
27 "time"
28
29 "golang.org/x/net/http2"
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/codes"
32 "google.golang.org/grpc/credentials/insecure"
33 "google.golang.org/grpc/internal/stubserver"
34 "google.golang.org/grpc/status"
35
36 testgrpc "google.golang.org/grpc/interop/grpc_testing"
37 testpb "google.golang.org/grpc/interop/grpc_testing"
38 )
39
40 type delayListener struct {
41 net.Listener
42 closeCalled chan struct{}
43 acceptCalled chan struct{}
44 allowCloseCh chan struct{}
45 dialed bool
46 }
47
48 func (d *delayListener) Accept() (net.Conn, error) {
49 select {
50 case <-d.acceptCalled:
51
52 <-d.closeCalled
53 <-d.allowCloseCh
54 return nil, fmt.Errorf("listener is closed")
55 default:
56 close(d.acceptCalled)
57 conn, err := d.Listener.Accept()
58 if err != nil {
59 return nil, err
60 }
61
62
63
64 d.allowClose()
65 return conn, nil
66 }
67 }
68
69 func (d *delayListener) allowClose() {
70 close(d.allowCloseCh)
71 }
72 func (d *delayListener) Close() error {
73 close(d.closeCalled)
74 go func() {
75 <-d.allowCloseCh
76 d.Listener.Close()
77 }()
78 return nil
79 }
80
81 func (d *delayListener) Dial(ctx context.Context) (net.Conn, error) {
82 if d.dialed {
83
84
85
86 return nil, fmt.Errorf("no more conns")
87 }
88 d.dialed = true
89 return (&net.Dialer{}).DialContext(ctx, "tcp", d.Listener.Addr().String())
90 }
91
92
93
94
95
96
97
98
99
100
101
102 func (s) TestGracefulStop(t *testing.T) {
103 lis, err := net.Listen("tcp", "localhost:0")
104 if err != nil {
105 t.Fatalf("Error listenening: %v", err)
106 }
107 dlis := &delayListener{
108 Listener: lis,
109 acceptCalled: make(chan struct{}),
110 closeCalled: make(chan struct{}),
111 allowCloseCh: make(chan struct{}),
112 }
113 d := func(ctx context.Context, _ string) (net.Conn, error) { return dlis.Dial(ctx) }
114
115 ss := &stubserver.StubServer{
116 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
117 _, err := stream.Recv()
118 if err != nil {
119 return err
120 }
121 return stream.Send(&testpb.StreamingOutputCallResponse{})
122 },
123 }
124 s := grpc.NewServer()
125 testgrpc.RegisterTestServiceServer(s, ss)
126
127
128 wg := sync.WaitGroup{}
129 wg.Add(1)
130 go func() {
131 s.Serve(dlis)
132 wg.Done()
133 }()
134
135
136
137 <-dlis.acceptCalled
138 wg.Add(1)
139 go func() {
140 s.GracefulStop()
141 wg.Done()
142 }()
143
144
145
146
147 <-dlis.closeCalled
148
149
150
151 ctx, dialCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
152 defer dialCancel()
153 cc, err := grpc.DialContext(ctx, "", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(d))
154 if err != nil {
155 t.Fatalf("grpc.DialContext(_, %q, _) = %v", lis.Addr().String(), err)
156 }
157 client := testgrpc.NewTestServiceClient(cc)
158 defer cc.Close()
159
160
161
162
163 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
164 if _, err = client.FullDuplexCall(ctx); err == nil || status.Code(err) != codes.Unavailable {
165 t.Fatalf("FullDuplexCall= _, %v; want _, <status code Unavailable>", err)
166 }
167 cancel()
168 wg.Wait()
169 }
170
171
172
173
174 func (s) TestGracefulStopClosesConnAfterLastStream(t *testing.T) {
175
176 handlerCalled := make(chan struct{})
177 gracefulStopCalled := make(chan struct{})
178
179 ts := &funcServer{streamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error {
180 close(handlerCalled)
181 <-gracefulStopCalled
182 return nil
183 }}
184
185 te := newTest(t, tcpClearEnv)
186 te.startServer(ts)
187 defer te.tearDown()
188
189 te.withServerTester(func(st *serverTester) {
190 st.writeHeadersGRPC(1, "/grpc.testing.TestService/StreamingInputCall", false)
191
192 <-handlerCalled
193
194
195 gracefulStopDone := make(chan struct{})
196 go func() {
197 te.srv.GracefulStop()
198 close(gracefulStopDone)
199 }()
200 st.wantGoAway(http2.ErrCodeNo)
201 pf := st.wantPing()
202 st.writePing(true, pf.Data)
203 st.wantGoAway(http2.ErrCodeNo)
204
205 close(gracefulStopCalled)
206
207 fr := st.wantAnyFrame()
208 hdr, ok := fr.(*http2.MetaHeadersFrame)
209 if !ok {
210 t.Fatalf("Received unexpected frame of type (%T) from server: %v; want HEADERS", fr, fr)
211 }
212 if !hdr.StreamEnded() {
213 t.Fatalf("Received unexpected HEADERS frame from server: %v; want END_STREAM set", fr)
214 }
215
216 st.wantRSTStream(http2.ErrCodeNo)
217
218 <-gracefulStopDone
219 })
220 }
221
222
223
224 func (s) TestGracefulStopBlocksUntilGRPCConnectionsTerminate(t *testing.T) {
225 unblockGRPCCall := make(chan struct{})
226 grpcCallExecuting := make(chan struct{})
227 ss := &stubserver.StubServer{
228 UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
229 close(grpcCallExecuting)
230 <-unblockGRPCCall
231 return &testpb.SimpleResponse{}, nil
232 },
233 }
234
235 err := ss.Start(nil)
236 if err != nil {
237 t.Fatalf("StubServer.start failed: %s", err)
238 }
239 t.Cleanup(ss.Stop)
240
241 grpcClientCallReturned := make(chan struct{})
242 go func() {
243 clt := ss.Client
244 _, err := clt.UnaryCall(context.Background(), &testpb.SimpleRequest{})
245 if err != nil {
246 t.Errorf("rpc failed with error: %s", err)
247 }
248 close(grpcClientCallReturned)
249 }()
250
251 gracefulStopReturned := make(chan struct{})
252 <-grpcCallExecuting
253 go func() {
254 ss.S.GracefulStop()
255 close(gracefulStopReturned)
256 }()
257
258 select {
259 case <-gracefulStopReturned:
260 t.Error("GracefulStop returned before rpc method call ended")
261 case <-time.After(defaultTestShortTimeout):
262 }
263
264 unblockGRPCCall <- struct{}{}
265 <-grpcClientCallReturned
266 <-gracefulStopReturned
267 }
268
269
270
271
272
273 func (s) TestStopAbortsBlockingGRPCCall(t *testing.T) {
274 unblockGRPCCall := make(chan struct{})
275 grpcCallExecuting := make(chan struct{})
276 ss := &stubserver.StubServer{
277 UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
278 close(grpcCallExecuting)
279 <-unblockGRPCCall
280 return &testpb.SimpleResponse{}, nil
281 },
282 }
283
284 err := ss.Start(nil)
285 if err != nil {
286 t.Fatalf("StubServer.start failed: %s", err)
287 }
288 t.Cleanup(ss.Stop)
289
290 grpcClientCallReturned := make(chan struct{})
291 go func() {
292 clt := ss.Client
293 _, err := clt.UnaryCall(context.Background(), &testpb.SimpleRequest{})
294 if err == nil || !isConnClosedErr(err) {
295 t.Errorf("expected rpc to fail with connection closed error, got: %v", err)
296 }
297 close(grpcClientCallReturned)
298 }()
299
300 <-grpcCallExecuting
301 ss.S.Stop()
302
303 unblockGRPCCall <- struct{}{}
304 <-grpcClientCallReturned
305 }
306
View as plain text