1
18
19 package idle_test
20
21 import (
22 "context"
23 "fmt"
24 "io"
25 "strings"
26 "sync"
27 "testing"
28 "time"
29
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/balancer"
32 "google.golang.org/grpc/balancer/roundrobin"
33 "google.golang.org/grpc/codes"
34 "google.golang.org/grpc/connectivity"
35 "google.golang.org/grpc/credentials/insecure"
36 "google.golang.org/grpc/internal"
37 "google.golang.org/grpc/internal/balancer/stub"
38 "google.golang.org/grpc/internal/channelz"
39 "google.golang.org/grpc/internal/grpctest"
40 "google.golang.org/grpc/internal/stubserver"
41 "google.golang.org/grpc/internal/testutils"
42 "google.golang.org/grpc/resolver"
43 "google.golang.org/grpc/resolver/manual"
44 "google.golang.org/grpc/status"
45
46 testgrpc "google.golang.org/grpc/interop/grpc_testing"
47 testpb "google.golang.org/grpc/interop/grpc_testing"
48 )
49
50 func init() {
51 channelz.TurnOn()
52 }
53
54 type s struct {
55 grpctest.Tester
56 }
57
58 func Test(t *testing.T) {
59 grpctest.RunSubTests(t, s{})
60 }
61
62 const (
63 defaultTestTimeout = 10 * time.Second
64 defaultTestShortTimeout = 100 * time.Millisecond
65 defaultTestShortIdleTimeout = 500 * time.Millisecond
66 )
67
68
69
70
71 func channelzTraceEventFound(ctx context.Context, wantDesc string) error {
72 for ctx.Err() == nil {
73 tcs, _ := channelz.GetTopChannels(0, 0)
74 if l := len(tcs); l != 1 {
75 return fmt.Errorf("when looking for channelz trace event with description %q, found %d top-level channels, want 1", wantDesc, l)
76 }
77 trace := tcs[0].Trace()
78 if trace == nil {
79 return fmt.Errorf("when looking for channelz trace event with description %q, no trace events found for top-level channel", wantDesc)
80 }
81
82 for _, e := range trace.Events {
83 if strings.Contains(e.Desc, wantDesc) {
84 return nil
85 }
86 }
87 }
88 return fmt.Errorf("when looking for channelz trace event with description %q, %w", wantDesc, ctx.Err())
89 }
90
91
92
93
94
95
96 func registerWrappedRoundRobinPolicy(t *testing.T) chan struct{} {
97 rrBuilder := balancer.Get(roundrobin.Name)
98 closeCh := make(chan struct{}, 1)
99 stub.Register(roundrobin.Name, stub.BalancerFuncs{
100 Init: func(bd *stub.BalancerData) {
101 bd.Data = rrBuilder.Build(bd.ClientConn, bd.BuildOptions)
102 },
103 UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
104 bal := bd.Data.(balancer.Balancer)
105 return bal.UpdateClientConnState(ccs)
106 },
107 Close: func(bd *stub.BalancerData) {
108 select {
109 case closeCh <- struct{}{}:
110 default:
111 }
112 bal := bd.Data.(balancer.Balancer)
113 bal.Close()
114 },
115 })
116 t.Cleanup(func() { balancer.Register(rrBuilder) })
117
118 return closeCh
119 }
120
121
122
123 func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
124 closeCh := registerWrappedRoundRobinPolicy(t)
125
126
127 r := manual.NewBuilderWithScheme("whatever")
128 dopts := []grpc.DialOption{
129 grpc.WithTransportCredentials(insecure.NewCredentials()),
130 grpc.WithResolvers(r),
131 grpc.WithIdleTimeout(0),
132 grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
133 }
134 cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
135 if err != nil {
136 t.Fatalf("grpc.Dial() failed: %v", err)
137 }
138 defer cc.Close()
139
140
141 backend := stubserver.StartTestService(t, nil)
142 defer backend.Stop()
143 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
144
145
146 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
147 defer cancel()
148 testutils.AwaitState(ctx, t, cc, connectivity.Ready)
149
150
151 sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
152 defer sCancel()
153 testutils.AwaitNoStateChange(sCtx, t, cc, connectivity.Ready)
154
155
156
157 sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortIdleTimeout)
158 defer sCancel()
159 select {
160 case <-sCtx.Done():
161 case <-closeCh:
162 t.Fatal("LB policy closed when expected not to")
163 }
164 }
165
166
167
168
169 func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
170 closeCh := registerWrappedRoundRobinPolicy(t)
171
172
173 r := manual.NewBuilderWithScheme("whatever")
174 dopts := []grpc.DialOption{
175 grpc.WithTransportCredentials(insecure.NewCredentials()),
176 grpc.WithResolvers(r),
177 grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
178 grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
179 }
180 cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
181 if err != nil {
182 t.Fatalf("grpc.Dial() failed: %v", err)
183 }
184 defer cc.Close()
185
186
187 lis := testutils.NewListenerWrapper(t, nil)
188 backend := stubserver.StartTestService(t, &stubserver.StubServer{Listener: lis})
189 defer backend.Stop()
190 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
191
192
193 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
194 defer cancel()
195 testutils.AwaitState(ctx, t, cc, connectivity.Ready)
196
197
198 v, err := lis.NewConnCh.Receive(ctx)
199 if err != nil {
200 t.Fatalf("Failed to retrieve conn from test listener: %v", err)
201 }
202 conn := v.(*testutils.ConnWrapper)
203
204
205 testutils.AwaitState(ctx, t, cc, connectivity.Idle)
206
207
208 if err := channelzTraceEventFound(ctx, "entering idle mode"); err != nil {
209 t.Fatal(err)
210 }
211
212
213 if _, err := conn.CloseCh.Receive(ctx); err != nil {
214 t.Fatalf("Failed when waiting for connection to be closed after channel entered IDLE: %v", err)
215 }
216
217
218 select {
219 case <-ctx.Done():
220 t.Fatal("Timeout waiting for LB policy to be closed after the channel enters IDLE")
221 case <-closeCh:
222 }
223 }
224
225
226
227 func (s) TestChannelIdleness_Enabled_OngoingCall(t *testing.T) {
228 tests := []struct {
229 name string
230 makeRPC func(ctx context.Context, client testgrpc.TestServiceClient) error
231 }{
232 {
233 name: "unary",
234 makeRPC: func(ctx context.Context, client testgrpc.TestServiceClient) error {
235 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
236 return fmt.Errorf("EmptyCall RPC failed: %v", err)
237 }
238 return nil
239 },
240 },
241 {
242 name: "streaming",
243 makeRPC: func(ctx context.Context, client testgrpc.TestServiceClient) error {
244 stream, err := client.FullDuplexCall(ctx)
245 if err != nil {
246 t.Fatalf("FullDuplexCall RPC failed: %v", err)
247 }
248 if _, err := stream.Recv(); err != nil && err != io.EOF {
249 t.Fatalf("stream.Recv() failed: %v", err)
250 }
251 return nil
252 },
253 },
254 }
255
256 for _, test := range tests {
257 t.Run(test.name, func(t *testing.T) {
258 closeCh := registerWrappedRoundRobinPolicy(t)
259
260
261 r := manual.NewBuilderWithScheme("whatever")
262 dopts := []grpc.DialOption{
263 grpc.WithTransportCredentials(insecure.NewCredentials()),
264 grpc.WithResolvers(r),
265 grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
266 grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
267 }
268 cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
269 if err != nil {
270 t.Fatalf("grpc.Dial() failed: %v", err)
271 }
272 defer cc.Close()
273
274
275
276 blockCh := make(chan struct{})
277 backend := &stubserver.StubServer{
278 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
279 <-blockCh
280 return &testpb.Empty{}, nil
281 },
282 FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error {
283 <-blockCh
284 return nil
285 },
286 }
287 if err := backend.StartServer(); err != nil {
288 t.Fatalf("Failed to start backend: %v", err)
289 }
290 defer backend.Stop()
291
292
293
294 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
295
296
297 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
298 defer cancel()
299 testutils.AwaitState(ctx, t, cc, connectivity.Ready)
300
301
302
303 errCh := make(chan error, 1)
304 go func() {
305 defer close(blockCh)
306
307
308 sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
309 defer sCancel()
310 if cc.WaitForStateChange(sCtx, connectivity.Ready) {
311 errCh <- fmt.Errorf("state changed from %q to %q when no state change was expected", connectivity.Ready, cc.GetState())
312 return
313 }
314
315
316
317 sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortIdleTimeout)
318 defer sCancel()
319 select {
320 case <-sCtx.Done():
321 case <-closeCh:
322 errCh <- fmt.Errorf("LB policy closed when expected not to")
323 }
324 errCh <- nil
325 }()
326
327 if err := test.makeRPC(ctx, testgrpc.NewTestServiceClient(cc)); err != nil {
328 t.Fatalf("%s rpc failed: %v", test.name, err)
329 }
330
331 select {
332 case err := <-errCh:
333 if err != nil {
334 t.Fatal(err)
335 }
336 case <-ctx.Done():
337 t.Fatalf("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE")
338 }
339 })
340 }
341 }
342
343
344
345
346 func (s) TestChannelIdleness_Enabled_ActiveSinceLastCheck(t *testing.T) {
347 closeCh := registerWrappedRoundRobinPolicy(t)
348
349
350 r := manual.NewBuilderWithScheme("whatever")
351 dopts := []grpc.DialOption{
352 grpc.WithTransportCredentials(insecure.NewCredentials()),
353 grpc.WithResolvers(r),
354 grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
355 grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
356 }
357 cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
358 if err != nil {
359 t.Fatalf("grpc.Dial() failed: %v", err)
360 }
361 defer cc.Close()
362
363
364 backend := stubserver.StartTestService(t, nil)
365 defer backend.Stop()
366 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
367
368
369 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
370 defer cancel()
371 testutils.AwaitState(ctx, t, cc, connectivity.Ready)
372
373
374
375
376 sCtx, sCancel := context.WithTimeout(ctx, 3*defaultTestShortIdleTimeout)
377 defer sCancel()
378 go func() {
379 for ; sCtx.Err() == nil; <-time.After(defaultTestShortIdleTimeout / 4) {
380 client := testgrpc.NewTestServiceClient(cc)
381 if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); err != nil {
382
383
384
385 if status.Code(err) != codes.DeadlineExceeded {
386 t.Errorf("EmptyCall RPC failed: %v", err)
387 return
388 }
389 }
390 }
391 }()
392
393
394 testutils.AwaitNoStateChange(sCtx, t, cc, connectivity.Ready)
395
396
397
398 select {
399 case <-sCtx.Done():
400 case <-closeCh:
401 t.Fatal("LB policy closed when expected not to")
402 }
403 }
404
405
406
407
408 func (s) TestChannelIdleness_Enabled_ExitIdleOnRPC(t *testing.T) {
409 closeCh := registerWrappedRoundRobinPolicy(t)
410
411
412
413
414 r := manual.NewBuilderWithScheme("whatever")
415 backend := stubserver.StartTestService(t, nil)
416 defer backend.Stop()
417 r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
418
419
420 dopts := []grpc.DialOption{
421 grpc.WithTransportCredentials(insecure.NewCredentials()),
422 grpc.WithResolvers(r),
423 grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
424 grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
425 }
426 cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
427 if err != nil {
428 t.Fatalf("grpc.Dial() failed: %v", err)
429 }
430 defer cc.Close()
431
432
433 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
434 defer cancel()
435 testutils.AwaitState(ctx, t, cc, connectivity.Ready)
436
437
438 testutils.AwaitState(ctx, t, cc, connectivity.Idle)
439
440
441 if err := channelzTraceEventFound(ctx, "entering idle mode"); err != nil {
442 t.Fatal(err)
443 }
444
445
446 select {
447 case <-ctx.Done():
448 t.Fatal("Timeout waiting for LB policy to be closed after the channel enters IDLE")
449 case <-closeCh:
450 }
451
452
453
454 client := testgrpc.NewTestServiceClient(cc)
455 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
456 t.Fatalf("EmptyCall RPC failed: %v", err)
457 }
458 testutils.AwaitState(ctx, t, cc, connectivity.Ready)
459 if err := channelzTraceEventFound(ctx, "exiting idle mode"); err != nil {
460 t.Fatal(err)
461 }
462 }
463
464
465
466
467
468
469
470
471
472
473
474
475 func (s) TestChannelIdleness_Enabled_IdleTimeoutRacesWithRPCs(t *testing.T) {
476
477
478
479 r := manual.NewBuilderWithScheme("whatever")
480 backend := stubserver.StartTestService(t, nil)
481 defer backend.Stop()
482 r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
483
484
485 dopts := []grpc.DialOption{
486 grpc.WithTransportCredentials(insecure.NewCredentials()),
487 grpc.WithResolvers(r),
488 grpc.WithIdleTimeout(defaultTestShortTimeout),
489 grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
490 }
491 cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
492 if err != nil {
493 t.Fatalf("grpc.NewClient() failed: %v", err)
494 }
495 defer cc.Close()
496
497
498 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
499 defer cancel()
500 client := testgrpc.NewTestServiceClient(cc)
501 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
502 t.Errorf("EmptyCall RPC failed: %v", err)
503 }
504
505
506
507
508 for i := 0; i < 20; i++ {
509 <-time.After(defaultTestShortTimeout)
510 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
511 t.Fatalf("EmptyCall RPC failed: %v", err)
512 }
513 t.Logf("Iteration %d succeeded", i)
514 }
515 }
516
517
518 func (s) TestChannelIdleness_Connect(t *testing.T) {
519
520
521
522 r := manual.NewBuilderWithScheme("whatever")
523 backend := stubserver.StartTestService(t, nil)
524 defer backend.Stop()
525 r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
526
527
528 dopts := []grpc.DialOption{
529 grpc.WithTransportCredentials(insecure.NewCredentials()),
530 grpc.WithResolvers(r),
531 grpc.WithIdleTimeout(defaultTestShortIdleTimeout),
532 grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`),
533 }
534 cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
535 if err != nil {
536 t.Fatalf("grpc.NewClient() failed: %v", err)
537 }
538 defer cc.Close()
539
540
541 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
542 defer cancel()
543
544 testutils.AwaitState(ctx, t, cc, connectivity.Idle)
545
546
547 cc.Connect()
548
549
550 testutils.AwaitState(ctx, t, cc, connectivity.Ready)
551 }
552
553
554 func runFunc(ctx context.Context, f func()) {
555 for {
556 select {
557 case <-ctx.Done():
558 return
559 case <-time.After(10 * time.Millisecond):
560 f()
561 }
562 }
563 }
564
565
566
567 func (s) TestChannelIdleness_RaceBetweenEnterAndExitIdleMode(t *testing.T) {
568
569
570
571 r := manual.NewBuilderWithScheme("whatever")
572 backend := stubserver.StartTestService(t, nil)
573 defer backend.Stop()
574 r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
575
576
577
578 dopts := []grpc.DialOption{
579 grpc.WithTransportCredentials(insecure.NewCredentials()),
580 grpc.WithResolvers(r),
581 grpc.WithIdleTimeout(30 * time.Minute),
582 grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"pick_first":{}}]}`),
583 }
584 cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
585 if err != nil {
586 t.Fatalf("grpc.NewClient() failed: %v", err)
587 }
588 defer cc.Close()
589
590 enterIdle := internal.EnterIdleModeForTesting.(func(*grpc.ClientConn))
591 enterIdleFunc := func() { enterIdle(cc) }
592 exitIdle := internal.ExitIdleModeForTesting.(func(*grpc.ClientConn) error)
593 exitIdleFunc := func() {
594 if err := exitIdle(cc); err != nil {
595 t.Errorf("Failed to exit idle mode: %v", err)
596 }
597 }
598
599
600 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
601 defer cancel()
602 var wg sync.WaitGroup
603 wg.Add(4)
604 go func() {
605 runFunc(ctx, enterIdleFunc)
606 wg.Done()
607 }()
608 go func() {
609 runFunc(ctx, enterIdleFunc)
610 wg.Done()
611 }()
612 go func() {
613 runFunc(ctx, exitIdleFunc)
614 wg.Done()
615 }()
616 go func() {
617 runFunc(ctx, exitIdleFunc)
618 wg.Done()
619 }()
620 wg.Wait()
621 }
622
View as plain text