1
18
19 package test
20
21 import (
22 "context"
23 "errors"
24 "fmt"
25 "strings"
26 "testing"
27 "time"
28
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/backoff"
31 "google.golang.org/grpc/codes"
32 "google.golang.org/grpc/connectivity"
33 "google.golang.org/grpc/credentials/insecure"
34 "google.golang.org/grpc/internal"
35 "google.golang.org/grpc/internal/channelz"
36 "google.golang.org/grpc/internal/grpcrand"
37 "google.golang.org/grpc/internal/stubserver"
38 "google.golang.org/grpc/internal/testutils"
39 "google.golang.org/grpc/internal/testutils/pickfirst"
40 "google.golang.org/grpc/resolver"
41 "google.golang.org/grpc/resolver/manual"
42 "google.golang.org/grpc/serviceconfig"
43 "google.golang.org/grpc/status"
44
45 testgrpc "google.golang.org/grpc/interop/grpc_testing"
46 testpb "google.golang.org/grpc/interop/grpc_testing"
47 )
48
// pickFirstServiceConfig is a service config JSON document that selects the
// pick_first LB policy with an empty policy configuration. The setup helpers
// and tests in this file pass it as the channel's default service config.
const pickFirstServiceConfig = `{"loadBalancingConfig": [{"pick_first":{}}]}`
50
51
52
53
54 func setupPickFirst(t *testing.T, backendCount int, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer) {
55 t.Helper()
56
57 r := manual.NewBuilderWithScheme("whatever")
58
59 backends := make([]*stubserver.StubServer, backendCount)
60 addrs := make([]resolver.Address, backendCount)
61 for i := 0; i < backendCount; i++ {
62 backend := &stubserver.StubServer{
63 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
64 return &testpb.Empty{}, nil
65 },
66 }
67 if err := backend.StartServer(); err != nil {
68 t.Fatalf("Failed to start backend: %v", err)
69 }
70 t.Logf("Started TestService backend at: %q", backend.Address)
71 t.Cleanup(func() { backend.Stop() })
72
73 backends[i] = backend
74 addrs[i] = resolver.Address{Addr: backend.Address}
75 }
76
77 dopts := []grpc.DialOption{
78 grpc.WithTransportCredentials(insecure.NewCredentials()),
79 grpc.WithResolvers(r),
80 grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
81 }
82 dopts = append(dopts, opts...)
83 cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
84 if err != nil {
85 t.Fatalf("grpc.NewClient() failed: %v", err)
86 }
87 t.Cleanup(func() { cc.Close() })
88
89
90
91 sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
92 defer sCancel()
93 client := testgrpc.NewTestServiceClient(cc)
94 if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
95 t.Fatalf("EmptyCall() = %s, want %s", status.Code(err), codes.DeadlineExceeded)
96 }
97 return cc, r, backends
98 }
99
100
101
102 func stubBackendsToResolverAddrs(backends []*stubserver.StubServer) []resolver.Address {
103 addrs := make([]resolver.Address, len(backends))
104 for i, backend := range backends {
105 addrs[i] = resolver.Address{Addr: backend.Address}
106 }
107 return addrs
108 }
109
110
111
112 func (s) TestPickFirst_OneBackend(t *testing.T) {
113 cc, r, backends := setupPickFirst(t, 1)
114
115 addrs := stubBackendsToResolverAddrs(backends)
116 r.UpdateState(resolver.State{Addresses: addrs})
117
118 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
119 defer cancel()
120 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
121 t.Fatal(err)
122 }
123 }
124
125
126
127 func (s) TestPickFirst_MultipleBackends(t *testing.T) {
128 cc, r, backends := setupPickFirst(t, 2)
129
130 addrs := stubBackendsToResolverAddrs(backends)
131 r.UpdateState(resolver.State{Addresses: addrs})
132
133 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
134 defer cancel()
135 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
136 t.Fatal(err)
137 }
138 }
139
140
141
142
143 func (s) TestPickFirst_OneServerDown(t *testing.T) {
144 cc, r, backends := setupPickFirst(t, 2)
145
146 addrs := stubBackendsToResolverAddrs(backends)
147 r.UpdateState(resolver.State{Addresses: addrs})
148
149 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
150 defer cancel()
151 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
152 t.Fatal(err)
153 }
154
155
156
157 backends[0].Stop()
158 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
159 t.Fatal(err)
160 }
161 }
162
163
164
165
166 func (s) TestPickFirst_AllServersDown(t *testing.T) {
167 cc, r, backends := setupPickFirst(t, 2)
168
169 addrs := stubBackendsToResolverAddrs(backends)
170 r.UpdateState(resolver.State{Addresses: addrs})
171
172 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
173 defer cancel()
174 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
175 t.Fatal(err)
176 }
177
178 for _, b := range backends {
179 b.Stop()
180 }
181
182 client := testgrpc.NewTestServiceClient(cc)
183 for {
184 if ctx.Err() != nil {
185 t.Fatalf("channel failed to move to Unavailable after all backends were stopped: %v", ctx.Err())
186 }
187 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) == codes.Unavailable {
188 return
189 }
190 time.Sleep(defaultTestShortTimeout)
191 }
192 }
193
194
195
196
197 func (s) TestPickFirst_AddressesRemoved(t *testing.T) {
198 cc, r, backends := setupPickFirst(t, 3)
199
200 addrs := stubBackendsToResolverAddrs(backends)
201 r.UpdateState(resolver.State{Addresses: addrs})
202
203 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
204 defer cancel()
205 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
206 t.Fatal(err)
207 }
208
209
210
211 r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[1], addrs[2]}})
212 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
213 t.Fatal(err)
214 }
215
216
217
218 r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[1], addrs[2], addrs[0]}})
219 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
220 t.Fatal(err)
221 }
222
223
224
225 r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[2], addrs[0]}})
226 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[2]); err != nil {
227 t.Fatal(err)
228 }
229
230
231
232 r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0]}})
233 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
234 t.Fatal(err)
235 }
236 }
237
238
239
240
241
242
243 func (s) TestPickFirst_NewAddressWhileBlocking(t *testing.T) {
244 cc, r, backends := setupPickFirst(t, 2)
245 addrs := stubBackendsToResolverAddrs(backends)
246 r.UpdateState(resolver.State{Addresses: addrs})
247
248 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
249 defer cancel()
250 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
251 t.Fatal(err)
252 }
253
254
255
256 r.UpdateState(resolver.State{})
257 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
258
259 doneCh := make(chan struct{})
260 client := testgrpc.NewTestServiceClient(cc)
261 go func() {
262
263
264
265 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
266 t.Errorf("EmptyCall() = %v, want <nil>", err)
267 }
268 close(doneCh)
269 }()
270
271
272
273
274 for {
275 if err := ctx.Err(); err != nil {
276 t.Fatal(err)
277 }
278 tcs, _ := channelz.GetTopChannels(0, 0)
279 if len(tcs) != 1 {
280 t.Fatalf("there should only be one top channel, not %d", len(tcs))
281 }
282 started := tcs[0].ChannelMetrics.CallsStarted.Load()
283 completed := tcs[0].ChannelMetrics.CallsSucceeded.Load() + tcs[0].ChannelMetrics.CallsFailed.Load()
284 if (started - completed) == 1 {
285 break
286 }
287 time.Sleep(defaultTestShortTimeout)
288 }
289
290
291
292 r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}})
293
294 select {
295 case <-ctx.Done():
296 t.Fatal("Timeout when waiting for blocked RPC to complete")
297 case <-doneCh:
298 }
299 }
300
301
302
303
304
305
306 func (s) TestPickFirst_StickyTransientFailure(t *testing.T) {
307
308
309 lis, err := testutils.LocalTCPListener()
310 if err != nil {
311 t.Fatalf("Failed to create listener: %v", err)
312 }
313 t.Cleanup(func() { lis.Close() })
314
315 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
316 defer cancel()
317 connCh := make(chan struct{}, 1)
318 go func() {
319 for {
320 conn, err := lis.Accept()
321 if err != nil {
322 return
323 }
324 select {
325 case connCh <- struct{}{}:
326 conn.Close()
327 case <-ctx.Done():
328 return
329 }
330 }
331 }()
332
333
334
335 dopts := []grpc.DialOption{
336 grpc.WithTransportCredentials(insecure.NewCredentials()),
337 grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
338 grpc.WithConnectParams(grpc.ConnectParams{
339 Backoff: backoff.Config{
340 BaseDelay: defaultTestShortTimeout,
341 Multiplier: float64(0),
342 Jitter: float64(0),
343 MaxDelay: defaultTestShortTimeout,
344 },
345 }),
346 }
347 cc, err := grpc.Dial(lis.Addr().String(), dopts...)
348 if err != nil {
349 t.Fatalf("Failed to dial server at %q: %v", lis.Addr(), err)
350 }
351 t.Cleanup(func() { cc.Close() })
352
353 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
354
355
356
357
358 go func() {
359 if cc.WaitForStateChange(ctx, connectivity.TransientFailure) {
360 if state := cc.GetState(); state != connectivity.Shutdown {
361 t.Errorf("Unexpected state change from TransientFailure to %s", cc.GetState())
362 }
363 }
364 }()
365
366
367 for i := 0; i < 10; i++ {
368 select {
369 case <-connCh:
370 case <-time.After(2 * defaultTestShortTimeout):
371 t.Error("Timeout when waiting for pick_first to reconnect")
372 }
373 }
374 }
375
376
377 func (s) TestPickFirst_ShuffleAddressList(t *testing.T) {
378 const serviceConfig = `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": true }}]}`
379
380
381 origShuf := grpcrand.Shuffle
382 defer func() { grpcrand.Shuffle = origShuf }()
383 grpcrand.Shuffle = func(n int, f func(int, int)) {
384 if n != 2 {
385 t.Errorf("Shuffle called with n=%v; want 2", n)
386 return
387 }
388 f(0, 1)
389 }
390
391
392 cc, r, backends := setupPickFirst(t, 2)
393 addrs := stubBackendsToResolverAddrs(backends)
394
395 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
396 defer cancel()
397
398
399
400 r.UpdateState(resolver.State{Endpoints: []resolver.Endpoint{
401 {Addresses: []resolver.Address{addrs[0]}},
402 {Addresses: []resolver.Address{addrs[1]}},
403 }})
404 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
405 t.Fatal(err)
406 }
407
408
409
410 shufState := resolver.State{
411 ServiceConfig: parseServiceConfig(t, r, serviceConfig),
412 Endpoints: []resolver.Endpoint{
413 {Addresses: []resolver.Address{addrs[0]}},
414 {Addresses: []resolver.Address{addrs[1]}},
415 },
416 }
417 r.UpdateState(shufState)
418 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
419 t.Fatal(err)
420 }
421
422
423
424 r.UpdateState(resolver.State{})
425 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
426
427
428
429 r.UpdateState(shufState)
430 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
431 t.Fatal(err)
432 }
433 }
434
435
436 func (s) TestPickFirst_ParseConfig_Success(t *testing.T) {
437
438 origShuf := grpcrand.Shuffle
439 defer func() { grpcrand.Shuffle = origShuf }()
440 grpcrand.Shuffle = func(n int, f func(int, int)) {
441 if n != 2 {
442 t.Errorf("Shuffle called with n=%v; want 2", n)
443 return
444 }
445 f(0, 1)
446 }
447
448 tests := []struct {
449 name string
450 serviceConfig string
451 wantFirstAddr bool
452 }{
453 {
454 name: "empty pickfirst config",
455 serviceConfig: `{"loadBalancingConfig": [{"pick_first":{}}]}`,
456 wantFirstAddr: true,
457 },
458 {
459 name: "empty good pickfirst config",
460 serviceConfig: `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": true }}]}`,
461 wantFirstAddr: false,
462 },
463 }
464
465 for _, test := range tests {
466 t.Run(test.name, func(t *testing.T) {
467
468 cc, r, backends := setupPickFirst(t, 2)
469 addrs := stubBackendsToResolverAddrs(backends)
470
471 r.UpdateState(resolver.State{
472 ServiceConfig: parseServiceConfig(t, r, test.serviceConfig),
473 Addresses: addrs,
474 })
475
476
477
478
479
480 wantAddr := addrs[0]
481 if !test.wantFirstAddr {
482 wantAddr = addrs[1]
483 }
484
485 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
486 defer cancel()
487 if err := pickfirst.CheckRPCsToBackend(ctx, cc, wantAddr); err != nil {
488 t.Fatal(err)
489 }
490 })
491 }
492 }
493
494
495 func (s) TestPickFirst_ParseConfig_Failure(t *testing.T) {
496
497
498
499 const sc = `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": 666 }}]}`
500 scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(sc)
501 if scpr.Err == nil {
502 t.Fatalf("ParseConfig() succeeded and returned %+v, when expected to fail", scpr)
503 }
504 }
505
506
507
508 func setupPickFirstWithListenerWrapper(t *testing.T, backendCount int, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer, []*testutils.ListenerWrapper) {
509 t.Helper()
510
511 backends := make([]*stubserver.StubServer, backendCount)
512 addrs := make([]resolver.Address, backendCount)
513 listeners := make([]*testutils.ListenerWrapper, backendCount)
514 for i := 0; i < backendCount; i++ {
515 lis := testutils.NewListenerWrapper(t, nil)
516 backend := &stubserver.StubServer{
517 Listener: lis,
518 EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
519 return &testpb.Empty{}, nil
520 },
521 }
522 if err := backend.StartServer(); err != nil {
523 t.Fatalf("Failed to start backend: %v", err)
524 }
525 t.Logf("Started TestService backend at: %q", backend.Address)
526 t.Cleanup(func() { backend.Stop() })
527
528 backends[i] = backend
529 addrs[i] = resolver.Address{Addr: backend.Address}
530 listeners[i] = lis
531 }
532
533 r := manual.NewBuilderWithScheme("whatever")
534 dopts := []grpc.DialOption{
535 grpc.WithTransportCredentials(insecure.NewCredentials()),
536 grpc.WithResolvers(r),
537 grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
538 }
539 dopts = append(dopts, opts...)
540 cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
541 if err != nil {
542 t.Fatalf("grpc.NewClient() failed: %v", err)
543 }
544 t.Cleanup(func() { cc.Close() })
545
546
547
548 sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
549 defer sCancel()
550 client := testgrpc.NewTestServiceClient(cc)
551 if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
552 t.Fatalf("EmptyCall() = %s, want %s", status.Code(err), codes.DeadlineExceeded)
553 }
554 return cc, r, backends, listeners
555 }
556
557
558
559
560
561
562 func (s) TestPickFirst_AddressUpdateWithAttributes(t *testing.T) {
563 cc, r, backends, listeners := setupPickFirstWithListenerWrapper(t, 2)
564
565
566
567 addrs := stubBackendsToResolverAddrs(backends)
568 for i := range addrs {
569 addrs[i].Attributes = addrs[i].Attributes.WithValue("test-attribute-1", fmt.Sprintf("%d", i))
570 }
571 r.UpdateState(resolver.State{Addresses: addrs})
572
573
574 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
575 defer cancel()
576 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
577 t.Fatal(err)
578 }
579
580
581
582 val, err := listeners[0].NewConnCh.Receive(ctx)
583 if err != nil {
584 t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
585 }
586 conn := val.(*testutils.ConnWrapper)
587
588
589
590
591 for i := range addrs {
592 addrs[i].Attributes = addrs[i].Attributes.WithValue("test-attribute-2", fmt.Sprintf("%d", i))
593 }
594 r.UpdateState(resolver.State{Addresses: addrs})
595 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
596 t.Fatal(err)
597 }
598
599
600
601
602
603 if _, err := conn.CloseCh.Receive(ctx); err != nil {
604 t.Fatalf("Timeout when expecting existing connection to be closed: %v", err)
605 }
606 val, err = listeners[0].NewConnCh.Receive(ctx)
607 if err != nil {
608 t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
609 }
610 conn = val.(*testutils.ConnWrapper)
611
612
613
614
615 for i := range addrs {
616 addrs[i].Attributes = addrs[i].Attributes.WithValue("test-attribute-3", fmt.Sprintf("%d", i))
617 }
618 addrs[0], addrs[1] = addrs[1], addrs[0]
619 r.UpdateState(resolver.State{Addresses: addrs})
620 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
621 t.Fatal(err)
622 }
623
624
625
626 if _, err := conn.CloseCh.Receive(ctx); err != nil {
627 t.Fatalf("Timeout when expecting existing connection to be closed: %v", err)
628 }
629 _, err = listeners[1].NewConnCh.Receive(ctx)
630 if err != nil {
631 t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
632 }
633 }
634
635
636
637
638
639
640 func (s) TestPickFirst_AddressUpdateWithBalancerAttributes(t *testing.T) {
641 cc, r, backends, listeners := setupPickFirstWithListenerWrapper(t, 2)
642
643
644
645 addrs := stubBackendsToResolverAddrs(backends)
646 for i := range addrs {
647 addrs[i].BalancerAttributes = addrs[i].BalancerAttributes.WithValue("test-attribute-1", fmt.Sprintf("%d", i))
648 }
649 r.UpdateState(resolver.State{Addresses: addrs})
650
651
652 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
653 defer cancel()
654 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
655 t.Fatal(err)
656 }
657
658
659
660 val, err := listeners[0].NewConnCh.Receive(ctx)
661 if err != nil {
662 t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
663 }
664 conn := val.(*testutils.ConnWrapper)
665
666
667
668
669 for i := range addrs {
670 addrs[i].BalancerAttributes = addrs[i].BalancerAttributes.WithValue("test-attribute-2", fmt.Sprintf("%d", i))
671 }
672 r.UpdateState(resolver.State{Addresses: addrs})
673
674
675
676 for i := range listeners {
677 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
678 defer sCancel()
679 if _, err := listeners[i].NewConnCh.Receive(sCtx); err != context.DeadlineExceeded {
680 t.Fatalf("Unexpected error when expecting no new connection: %v", err)
681 }
682 }
683 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
684 defer sCancel()
685 if _, err := conn.CloseCh.Receive(sCtx); err != context.DeadlineExceeded {
686 t.Fatalf("Unexpected error when expecting existing connection to stay active: %v", err)
687 }
688 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
689 t.Fatal(err)
690 }
691
692
693
694
695 for i := range addrs {
696 addrs[i].BalancerAttributes = addrs[i].BalancerAttributes.WithValue("test-attribute-3", fmt.Sprintf("%d", i))
697 }
698 addrs[0], addrs[1] = addrs[1], addrs[0]
699 r.UpdateState(resolver.State{Addresses: addrs})
700
701
702
703 for i := range listeners {
704 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
705 defer sCancel()
706 if _, err := listeners[i].NewConnCh.Receive(sCtx); err != context.DeadlineExceeded {
707 t.Fatalf("Unexpected error when expecting no new connection: %v", err)
708 }
709 }
710 sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
711 defer sCancel()
712 if _, err := conn.CloseCh.Receive(sCtx); err != context.DeadlineExceeded {
713 t.Fatalf("Unexpected error when expecting existing connection to stay active: %v", err)
714 }
715 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
716 t.Fatal(err)
717 }
718 }
719
720
721
722
723
724 func (s) TestPickFirst_ResolverError_NoPreviousUpdate(t *testing.T) {
725 cc, r, _ := setupPickFirst(t, 0)
726
727 nrErr := errors.New("error from name resolver")
728 r.ReportError(nrErr)
729
730 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
731 defer cancel()
732 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
733
734 client := testgrpc.NewTestServiceClient(cc)
735 _, err := client.EmptyCall(ctx, &testpb.Empty{})
736 if err == nil {
737 t.Fatalf("EmptyCall() succeeded when expected to fail with error: %v", nrErr)
738 }
739 if !strings.Contains(err.Error(), nrErr.Error()) {
740 t.Fatalf("EmptyCall() failed with error: %v, want error: %v", err, nrErr)
741 }
742 }
743
744
745
746
747
748 func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Ready(t *testing.T) {
749 cc, r, backends := setupPickFirst(t, 1)
750
751 addrs := stubBackendsToResolverAddrs(backends)
752 r.UpdateState(resolver.State{Addresses: addrs})
753
754 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
755 defer cancel()
756 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
757 t.Fatal(err)
758 }
759
760 nrErr := errors.New("error from name resolver")
761 r.ReportError(nrErr)
762
763
764 client := testgrpc.NewTestServiceClient(cc)
765 for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
766 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
767 t.Fatalf("EmptyCall() failed: %v", err)
768 }
769 }
770 }
771
772
773
774
775
776
777 func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Connecting(t *testing.T) {
778 lis, err := testutils.LocalTCPListener()
779 if err != nil {
780 t.Fatalf("net.Listen() failed: %v", err)
781 }
782
783
784
785
786 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
787 defer cancel()
788 waitForConnecting := make(chan struct{})
789 go func() {
790 conn, err := lis.Accept()
791 if err != nil {
792 t.Errorf("Unexpected error when accepting a connection: %v", err)
793 }
794 defer conn.Close()
795
796 select {
797 case <-waitForConnecting:
798 case <-ctx.Done():
799 t.Error("Timeout when waiting for channel to move to CONNECTING state")
800 }
801 }()
802
803 r := manual.NewBuilderWithScheme("whatever")
804 dopts := []grpc.DialOption{
805 grpc.WithTransportCredentials(insecure.NewCredentials()),
806 grpc.WithResolvers(r),
807 grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
808 }
809 cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
810 if err != nil {
811 t.Fatalf("grpc.Dial() failed: %v", err)
812 }
813 t.Cleanup(func() { cc.Close() })
814
815 addrs := []resolver.Address{{Addr: lis.Addr().String()}}
816 r.UpdateState(resolver.State{Addresses: addrs})
817 testutils.AwaitState(ctx, t, cc, connectivity.Connecting)
818
819 nrErr := errors.New("error from name resolver")
820 r.ReportError(nrErr)
821
822
823
824 client := testgrpc.NewTestServiceClient(cc)
825 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
826 defer sCancel()
827 if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) {
828 t.Fatalf("EmptyCall() failed with error: %v, want error: %v", err, context.DeadlineExceeded)
829 }
830
831
832
833 close(waitForConnecting)
834 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
835 checkForConnectionError(ctx, t, cc)
836 }
837
838
839
840
841
842 func (s) TestPickFirst_ResolverError_WithPreviousUpdate_TransientFailure(t *testing.T) {
843 lis, err := testutils.LocalTCPListener()
844 if err != nil {
845 t.Fatalf("net.Listen() failed: %v", err)
846 }
847
848
849
850 go func() {
851 conn, err := lis.Accept()
852 if err != nil {
853 t.Errorf("Unexpected error when accepting a connection: %v", err)
854 }
855 conn.Close()
856 }()
857
858 r := manual.NewBuilderWithScheme("whatever")
859 dopts := []grpc.DialOption{
860 grpc.WithTransportCredentials(insecure.NewCredentials()),
861 grpc.WithResolvers(r),
862 grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
863 }
864 cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
865 if err != nil {
866 t.Fatalf("grpc.Dial() failed: %v", err)
867 }
868 t.Cleanup(func() { cc.Close() })
869
870 addrs := []resolver.Address{{Addr: lis.Addr().String()}}
871 r.UpdateState(resolver.State{Addresses: addrs})
872 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
873 defer cancel()
874 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
875 checkForConnectionError(ctx, t, cc)
876
877
878
879
880 nrErr := errors.New("error from name resolver")
881 r.ReportError(nrErr)
882 client := testgrpc.NewTestServiceClient(cc)
883 for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
884 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); strings.Contains(err.Error(), nrErr.Error()) {
885 break
886 }
887 }
888 if ctx.Err() != nil {
889 t.Fatal("Timeout when waiting for RPCs to fail with error returned by the name resolver")
890 }
891 }
892
893 func checkForConnectionError(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
894 t.Helper()
895
896
897
898
899
900
901 client := testgrpc.NewTestServiceClient(cc)
902 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
903 t.Fatalf("EmptyCall() failed with error: %v, want code %v", err, codes.Unavailable)
904 }
905 }
906
907
908
909
910
911 func (s) TestPickFirst_ResolverError_ZeroAddresses_WithPreviousUpdate(t *testing.T) {
912 cc, r, backends := setupPickFirst(t, 1)
913
914 addrs := stubBackendsToResolverAddrs(backends)
915 r.UpdateState(resolver.State{Addresses: addrs})
916
917 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
918 defer cancel()
919 if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
920 t.Fatal(err)
921 }
922
923 r.UpdateState(resolver.State{})
924 wantErr := "produced zero addresses"
925 client := testgrpc.NewTestServiceClient(cc)
926 for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
927 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); strings.Contains(err.Error(), wantErr) {
928 break
929 }
930 }
931 if ctx.Err() != nil {
932 t.Fatal("Timeout when waiting for RPCs to fail with error returned by the name resolver")
933 }
934 }
935
View as plain text