1
18
19 package grpc_test
20
21 import (
22 "context"
23 "io"
24 "runtime"
25 "sync"
26 "testing"
27 "time"
28
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/credentials/insecure"
32 "google.golang.org/grpc/internal/grpcsync"
33 "google.golang.org/grpc/internal/stubserver"
34 "google.golang.org/grpc/status"
35
36 testgrpc "google.golang.org/grpc/interop/grpc_testing"
37 testpb "google.golang.org/grpc/interop/grpc_testing"
38 )
39
40
41
42 func (s) TestServer_MaxHandlers(t *testing.T) {
43 started := make(chan struct{})
44 blockCalls := grpcsync.NewEvent()
45
46
47
48 ss := stubserver.StubServer{
49 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
50 started <- struct{}{}
51 <-blockCalls.Done()
52 return nil
53 },
54 }
55 if err := ss.Start([]grpc.ServerOption{grpc.MaxConcurrentStreams(1)}); err != nil {
56 t.Fatal("Error starting server:", err)
57 }
58 defer ss.Stop()
59
60 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
61 defer cancel()
62
63
64 ctx1, cancel1 := context.WithCancel(ctx)
65 _, err := ss.Client.FullDuplexCall(ctx1)
66 if err != nil {
67 t.Fatal("Error staring call:", err)
68 }
69
70
71 select {
72 case <-started:
73 case <-ctx.Done():
74 t.Fatalf("Timed out waiting for RPC to start on server.")
75 }
76
77
78 cancel1()
79
80 ctx2, cancel2 := context.WithCancel(ctx)
81 defer cancel2()
82 s, err := ss.Client.FullDuplexCall(ctx2)
83 if err != nil {
84 t.Fatal("Error staring call:", err)
85 }
86
87
88
89 select {
90 case <-started:
91 blockCalls.Fire()
92 t.Fatalf("RPC started unexpectedly.")
93 case <-time.After(100 * time.Millisecond):
94 blockCalls.Fire()
95 }
96
97 select {
98 case <-started:
99 case <-ctx.Done():
100 t.Fatalf("Timed out waiting for second RPC to start on server.")
101 }
102 if _, err := s.Recv(); err != io.EOF {
103 t.Fatal("Received unexpected RPC error:", err)
104 }
105 }
106
107
108
109
110
111 func (s) TestStreamWorkers_RPCsAndStop(t *testing.T) {
112 ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
113
114
115 defer ss.Stop()
116
117 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
118 defer cancel()
119 const numChannels = 20
120 const numRPCLoops = 20
121
122
123
124 ccs := make([]*grpc.ClientConn, numChannels)
125 for i := 0; i < numChannels; i++ {
126 var err error
127 ccs[i], err = grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
128 if err != nil {
129 t.Fatalf("[iteration: %d] grpc.NewClient(%s) failed: %v", i, ss.Address, err)
130 }
131 defer ccs[i].Close()
132 client := testgrpc.NewTestServiceClient(ccs[i])
133 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
134 t.Fatalf("EmptyCall() failed: %v", err)
135 }
136 }
137
138
139
140 var wg sync.WaitGroup
141 for i := 0; i < numChannels; i++ {
142 client := testgrpc.NewTestServiceClient(ccs[i])
143 for j := 0; j < numRPCLoops; j++ {
144 wg.Add(1)
145 go func(client testgrpc.TestServiceClient) {
146 defer wg.Done()
147 for {
148 _, err := client.EmptyCall(ctx, &testpb.Empty{})
149 if err == nil {
150 continue
151 }
152 if code := status.Code(err); code == codes.Unavailable {
153
154
155 return
156 }
157 t.Errorf("EmptyCall() failed: %v", err)
158 return
159 }
160 }(client)
161 }
162 }
163
164
165 ss.Stop()
166 wg.Wait()
167 }
168
169
170
171
172 func (s) TestStreamWorkers_GracefulStopAndStop(t *testing.T) {
173 ss := stubserver.StartTestService(t, nil, grpc.NumStreamWorkers(uint32(runtime.NumCPU())))
174 defer ss.Stop()
175
176 if err := ss.StartClient(grpc.WithTransportCredentials(insecure.NewCredentials())); err != nil {
177 t.Fatalf("Failed to create client to stub server: %v", err)
178 }
179 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
180 defer cancel()
181 client := testgrpc.NewTestServiceClient(ss.CC)
182 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
183 t.Fatalf("EmptyCall() failed: %v", err)
184 }
185
186 ss.S.GracefulStop()
187 }
188
189
190
191 func (s) TestServer_WaitForHandlers(t *testing.T) {
192 started := grpcsync.NewEvent()
193 blockCalls := grpcsync.NewEvent()
194
195
196
197 ss := stubserver.StubServer{
198 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
199 started.Fire()
200 <-blockCalls.Done()
201 return nil
202 },
203 }
204 if err := ss.Start([]grpc.ServerOption{grpc.WaitForHandlers(true)}); err != nil {
205 t.Fatal("Error starting server:", err)
206 }
207 defer ss.Stop()
208
209 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
210 defer cancel()
211
212
213 ctx1, cancel1 := context.WithCancel(ctx)
214 _, err := ss.Client.FullDuplexCall(ctx1)
215 if err != nil {
216 t.Fatal("Error staring call:", err)
217 }
218
219
220 select {
221 case <-started.Done():
222 case <-ctx.Done():
223 t.Fatalf("Timed out waiting for RPC to start on server.")
224 }
225
226
227 cancel1()
228
229
230
231
232 ss.CC.Close()
233
234
235
236 stopped := grpcsync.NewEvent()
237 go func() {
238 ss.S.Stop()
239 stopped.Fire()
240 }()
241
242
243 select {
244 case <-stopped.Done():
245 trace := make([]byte, 4096)
246 trace = trace[0:runtime.Stack(trace, true)]
247 blockCalls.Fire()
248 t.Fatalf("Server returned from Stop() illegally. Stack trace:\n%v", string(trace))
249 case <-time.After(100 * time.Millisecond):
250
251 blockCalls.Fire()
252 }
253
254 select {
255 case <-stopped.Done():
256 case <-ctx.Done():
257 t.Fatalf("Timed out waiting for second RPC to start on server.")
258 }
259 }
260
261
262
263
264 func (s) TestServer_GracefulStopWaits(t *testing.T) {
265 started := grpcsync.NewEvent()
266 blockCalls := grpcsync.NewEvent()
267
268
269
270 ss := stubserver.StubServer{
271 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
272 started.Fire()
273 <-blockCalls.Done()
274 return nil
275 },
276 }
277 if err := ss.Start(nil); err != nil {
278 t.Fatal("Error starting server:", err)
279 }
280 defer ss.Stop()
281
282 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
283 defer cancel()
284
285
286 ctx1, cancel1 := context.WithCancel(ctx)
287 _, err := ss.Client.FullDuplexCall(ctx1)
288 if err != nil {
289 t.Fatal("Error staring call:", err)
290 }
291
292
293 select {
294 case <-started.Done():
295 case <-ctx.Done():
296 t.Fatalf("Timed out waiting for RPC to start on server.")
297 }
298
299
300 cancel1()
301
302
303
304
305 ss.CC.Close()
306
307
308
309 stopped := grpcsync.NewEvent()
310 go func() {
311 ss.S.GracefulStop()
312 stopped.Fire()
313 }()
314
315
316 select {
317 case <-stopped.Done():
318 trace := make([]byte, 4096)
319 trace = trace[0:runtime.Stack(trace, true)]
320 blockCalls.Fire()
321 t.Fatalf("Server returned from Stop() illegally. Stack trace:\n%v", string(trace))
322 case <-time.After(100 * time.Millisecond):
323
324 blockCalls.Fire()
325 }
326
327 select {
328 case <-stopped.Done():
329 case <-ctx.Done():
330 t.Fatalf("Timed out waiting for second RPC to start on server.")
331 }
332 }
333
View as plain text