1
18
19 package grpc
20
21 import (
22 "context"
23 "errors"
24 "fmt"
25 "math"
26 "net"
27 "strings"
28 "sync"
29 "sync/atomic"
30 "testing"
31 "time"
32
33 "golang.org/x/net/http2"
34 "google.golang.org/grpc/backoff"
35 "google.golang.org/grpc/balancer"
36 "google.golang.org/grpc/connectivity"
37 "google.golang.org/grpc/credentials"
38 "google.golang.org/grpc/credentials/insecure"
39 internalbackoff "google.golang.org/grpc/internal/backoff"
40 "google.golang.org/grpc/internal/grpcsync"
41 "google.golang.org/grpc/internal/grpctest"
42 "google.golang.org/grpc/internal/transport"
43 "google.golang.org/grpc/keepalive"
44 "google.golang.org/grpc/resolver"
45 "google.golang.org/grpc/resolver/manual"
46 "google.golang.org/grpc/serviceconfig"
47 "google.golang.org/grpc/testdata"
48 )
49
50 const (
51 defaultTestTimeout = 10 * time.Second
52 stateRecordingBalancerName = "state_recording_balancer"
53 )
54
55 var testBalancerBuilder = newStateRecordingBalancerBuilder()
56
57 func init() {
58 balancer.Register(testBalancerBuilder)
59 }
60
61 func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
62 scpr := r.CC.ParseServiceConfig(s)
63 if scpr.Err != nil {
64 panic(fmt.Sprintf("Error parsing config %q: %v", s, scpr.Err))
65 }
66 return scpr
67 }
68
69 func (s) TestDialWithTimeout(t *testing.T) {
70 lis, err := net.Listen("tcp", "localhost:0")
71 if err != nil {
72 t.Fatalf("Error while listening. Err: %v", err)
73 }
74 defer lis.Close()
75 lisAddr := resolver.Address{Addr: lis.Addr().String()}
76 lisDone := make(chan struct{})
77 dialDone := make(chan struct{})
78
79 go func() {
80 defer close(lisDone)
81 conn, err := lis.Accept()
82 if err != nil {
83 t.Errorf("Error while accepting. Err: %v", err)
84 return
85 }
86 framer := http2.NewFramer(conn, conn)
87 if err := framer.WriteSettings(http2.Setting{}); err != nil {
88 t.Errorf("Error while writing settings. Err: %v", err)
89 return
90 }
91 <-dialDone
92 }()
93
94 r := manual.NewBuilderWithScheme("whatever")
95 r.InitialState(resolver.State{Addresses: []resolver.Address{lisAddr}})
96 client, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithTimeout(5*time.Second))
97 close(dialDone)
98 if err != nil {
99 t.Fatalf("Dial failed. Err: %v", err)
100 }
101 defer client.Close()
102 timeout := time.After(1 * time.Second)
103 select {
104 case <-timeout:
105 t.Fatal("timed out waiting for server to finish")
106 case <-lisDone:
107 }
108 }
109
110 func (s) TestDialWithMultipleBackendsNotSendingServerPreface(t *testing.T) {
111 lis1, err := net.Listen("tcp", "localhost:0")
112 if err != nil {
113 t.Fatalf("Error while listening. Err: %v", err)
114 }
115 defer lis1.Close()
116 lis1Addr := resolver.Address{Addr: lis1.Addr().String()}
117 lis1Done := make(chan struct{})
118
119 go func() {
120 defer close(lis1Done)
121 conn, err := lis1.Accept()
122 if err != nil {
123 t.Errorf("Error while accepting. Err: %v", err)
124 return
125 }
126 conn.Close()
127 }()
128
129 lis2, err := net.Listen("tcp", "localhost:0")
130 if err != nil {
131 t.Fatalf("Error while listening. Err: %v", err)
132 }
133 defer lis2.Close()
134 lis2Done := make(chan struct{})
135 lis2Addr := resolver.Address{Addr: lis2.Addr().String()}
136
137 go func() {
138 defer close(lis2Done)
139 _, err := lis2.Accept()
140 if err != nil {
141 t.Errorf("Error while accepting. Err: %v", err)
142 return
143 }
144 }()
145
146 r := manual.NewBuilderWithScheme("whatever")
147 r.InitialState(resolver.State{Addresses: []resolver.Address{lis1Addr, lis2Addr}})
148 client, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
149 if err != nil {
150 t.Fatalf("Dial failed. Err: %v", err)
151 }
152 defer client.Close()
153 timeout := time.After(5 * time.Second)
154 select {
155 case <-timeout:
156 t.Fatal("timed out waiting for server 1 to finish")
157 case <-lis1Done:
158 }
159 select {
160 case <-timeout:
161 t.Fatal("timed out waiting for server 2 to finish")
162 case <-lis2Done:
163 }
164 }
165
166 func (s) TestDialWaitsForServerSettings(t *testing.T) {
167 lis, err := net.Listen("tcp", "localhost:0")
168 if err != nil {
169 t.Fatalf("Error while listening. Err: %v", err)
170 }
171 defer lis.Close()
172 done := make(chan struct{})
173 sent := make(chan struct{})
174 dialDone := make(chan struct{})
175 go func() {
176 defer func() {
177 close(done)
178 }()
179 conn, err := lis.Accept()
180 if err != nil {
181 t.Errorf("Error while accepting. Err: %v", err)
182 return
183 }
184 defer conn.Close()
185
186
187 time.Sleep(100 * time.Millisecond)
188 framer := http2.NewFramer(conn, conn)
189 close(sent)
190 if err := framer.WriteSettings(http2.Setting{}); err != nil {
191 t.Errorf("Error while writing settings. Err: %v", err)
192 return
193 }
194 <-dialDone
195 }()
196 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
197 defer cancel()
198 client, err := DialContext(ctx, lis.Addr().String(), WithTransportCredentials(insecure.NewCredentials()), WithBlock())
199 close(dialDone)
200 if err != nil {
201 t.Fatalf("Error while dialing. Err: %v", err)
202 }
203 defer client.Close()
204 select {
205 case <-sent:
206 default:
207 t.Fatalf("Dial returned before server settings were sent")
208 }
209 <-done
210 }
211
212 func (s) TestDialWaitsForServerSettingsAndFails(t *testing.T) {
213 lis, err := net.Listen("tcp", "localhost:0")
214 if err != nil {
215 t.Fatalf("Error while listening. Err: %v", err)
216 }
217 done := make(chan struct{})
218 numConns := 0
219 go func() {
220 defer func() {
221 close(done)
222 }()
223 for {
224 conn, err := lis.Accept()
225 if err != nil {
226 break
227 }
228 numConns++
229 defer conn.Close()
230 }
231 }()
232 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
233 defer cancel()
234 client, err := DialContext(ctx,
235 lis.Addr().String(),
236 WithTransportCredentials(insecure.NewCredentials()),
237 WithReturnConnectionError(),
238 WithConnectParams(ConnectParams{
239 Backoff: backoff.Config{},
240 MinConnectTimeout: 250 * time.Millisecond,
241 }))
242 lis.Close()
243 if err == nil {
244 client.Close()
245 t.Fatalf("Unexpected success (err=nil) while dialing")
246 }
247 expectedMsg := "server preface"
248 if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) || !strings.Contains(err.Error(), expectedMsg) {
249 t.Fatalf("DialContext(_) = %v; want a message that includes both %q and %q", err, context.DeadlineExceeded.Error(), expectedMsg)
250 }
251 <-done
252 if numConns < 2 {
253 t.Fatalf("dial attempts: %v; want > 1", numConns)
254 }
255 }
256
257
258
259
260
261 func (s) TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) {
262 lis, err := net.Listen("tcp", "localhost:0")
263 if err != nil {
264 t.Fatalf("Error while listening. Err: %v", err)
265 }
266 var (
267 conn2 net.Conn
268 over uint32
269 )
270 defer func() {
271 lis.Close()
272
273
274 if conn2 != nil {
275 conn2.Close()
276 }
277 }()
278 done := make(chan struct{})
279 accepted := make(chan struct{})
280 go func() {
281 defer close(done)
282 conn1, err := lis.Accept()
283 if err != nil {
284 t.Errorf("Error while accepting. Err: %v", err)
285 return
286 }
287 defer conn1.Close()
288
289 conn2, err = lis.Accept()
290 if err != nil {
291 t.Errorf("Error while accepting. Err: %v", err)
292 return
293 }
294 close(accepted)
295 framer := http2.NewFramer(conn2, conn2)
296 if err = framer.WriteSettings(http2.Setting{}); err != nil {
297 t.Errorf("Error while writing settings. Err: %v", err)
298 return
299 }
300 b := make([]byte, 8)
301 for {
302 _, err = conn2.Read(b)
303 if err == nil {
304 continue
305 }
306 if atomic.LoadUint32(&over) == 1 {
307
308
309 return
310 }
311 t.Errorf("Unexpected error while reading. Err: %v, want timeout error", err)
312 break
313 }
314 }()
315 client, err := Dial(lis.Addr().String(), WithTransportCredentials(insecure.NewCredentials()), withMinConnectDeadline(func() time.Duration { return time.Millisecond * 500 }))
316 if err != nil {
317 t.Fatalf("Error while dialing. Err: %v", err)
318 }
319
320 go stayConnected(client)
321
322
323 timer := time.NewTimer(time.Second * 10)
324 select {
325 case <-accepted:
326 case <-timer.C:
327 t.Fatalf("Client didn't make another connection request in time.")
328 }
329
330 time.Sleep(time.Second)
331 atomic.StoreUint32(&over, 1)
332 client.Close()
333 <-done
334 }
335
336 func (s) TestBackoffWhenNoServerPrefaceReceived(t *testing.T) {
337 lis, err := net.Listen("tcp", "localhost:0")
338 if err != nil {
339 t.Fatalf("Unexpected error from net.Listen(%q, %q): %v", "tcp", "localhost:0", err)
340 }
341 defer lis.Close()
342 done := make(chan struct{})
343 go func() {
344 defer close(done)
345 conn, err := lis.Accept()
346 if err != nil {
347 t.Errorf("Error while accepting. Err: %v", err)
348 return
349 }
350 prevAt := time.Now()
351 conn.Close()
352 var prevDuration time.Duration
353
354 for i := 0; i < 3; i++ {
355 conn, err := lis.Accept()
356 if err != nil {
357 t.Errorf("Error while accepting. Err: %v", err)
358 return
359 }
360 meow := time.Now()
361 conn.Close()
362 dr := meow.Sub(prevAt)
363 if dr <= prevDuration {
364 t.Errorf("Client backoff did not increase with retries. Previous duration: %v, current duration: %v", prevDuration, dr)
365 return
366 }
367 prevDuration = dr
368 prevAt = meow
369 }
370 }()
371 bc := backoff.Config{
372 BaseDelay: 200 * time.Millisecond,
373 Multiplier: 2.0,
374 Jitter: 0,
375 MaxDelay: 120 * time.Second,
376 }
377 cp := ConnectParams{
378 Backoff: bc,
379 MinConnectTimeout: 1 * time.Second,
380 }
381 cc, err := Dial(lis.Addr().String(), WithTransportCredentials(insecure.NewCredentials()), WithConnectParams(cp))
382 if err != nil {
383 t.Fatalf("Unexpected error from Dial(%v) = %v", lis.Addr(), err)
384 }
385 defer cc.Close()
386 go stayConnected(cc)
387 <-done
388 }
389
390 func (s) TestWithTimeout(t *testing.T) {
391 conn, err := Dial("passthrough:///Non-Existent.Server:80",
392 WithTimeout(time.Millisecond),
393 WithBlock(),
394 WithTransportCredentials(insecure.NewCredentials()))
395 if err == nil {
396 conn.Close()
397 }
398 if err != context.DeadlineExceeded {
399 t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, context.DeadlineExceeded)
400 }
401 }
402
403 func (s) TestWithTransportCredentialsTLS(t *testing.T) {
404 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
405 defer cancel()
406 creds, err := credentials.NewClientTLSFromFile(testdata.Path("x509/server_ca_cert.pem"), "x.test.example.com")
407 if err != nil {
408 t.Fatalf("Failed to create credentials %v", err)
409 }
410 conn, err := DialContext(ctx, "passthrough:///Non-Existent.Server:80", WithTransportCredentials(creds), WithBlock())
411 if err == nil {
412 conn.Close()
413 }
414 if err != context.DeadlineExceeded {
415 t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, context.DeadlineExceeded)
416 }
417 }
418
419
420
421
422 func (s) TestDial_OneBackoffPerRetryGroup(t *testing.T) {
423 var attempts uint32
424 getMinConnectTimeout := func() time.Duration {
425 if atomic.AddUint32(&attempts, 1) == 1 {
426
427
428
429 return time.Hour
430 }
431 t.Error("only one attempt backoff calculation, but got more")
432 return 0
433 }
434
435 ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
436 defer cancel()
437
438 lis1, err := net.Listen("tcp", "localhost:0")
439 if err != nil {
440 t.Fatalf("Error while listening. Err: %v", err)
441 }
442 defer lis1.Close()
443
444 lis2, err := net.Listen("tcp", "localhost:0")
445 if err != nil {
446 t.Fatalf("Error while listening. Err: %v", err)
447 }
448 defer lis2.Close()
449
450 server1Done := make(chan struct{})
451 server2Done := make(chan struct{})
452
453
454 go func() {
455 conn, err := lis1.Accept()
456 if err != nil {
457 t.Error(err)
458 return
459 }
460
461 conn.Close()
462 close(server1Done)
463 }()
464
465 go func() {
466 conn, err := lis2.Accept()
467 if err != nil {
468 t.Error(err)
469 return
470 }
471 conn.Close()
472 close(server2Done)
473 }()
474
475 rb := manual.NewBuilderWithScheme("whatever")
476 rb.InitialState(resolver.State{Addresses: []resolver.Address{
477 {Addr: lis1.Addr().String()},
478 {Addr: lis2.Addr().String()},
479 }})
480 client, err := DialContext(ctx, "whatever:///this-gets-overwritten",
481 WithTransportCredentials(insecure.NewCredentials()),
482 WithResolvers(rb),
483 withMinConnectDeadline(getMinConnectTimeout))
484 if err != nil {
485 t.Fatal(err)
486 }
487 defer client.Close()
488
489 timeout := time.After(15 * time.Second)
490
491 select {
492 case <-timeout:
493 t.Fatal("timed out waiting for test to finish")
494 case <-server1Done:
495 }
496
497 select {
498 case <-timeout:
499 t.Fatal("timed out waiting for test to finish")
500 case <-server2Done:
501 }
502 }
503
504 func (s) TestDialContextCancel(t *testing.T) {
505 ctx, cancel := context.WithCancel(context.Background())
506 cancel()
507 if _, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithTransportCredentials(insecure.NewCredentials())); err != context.Canceled {
508 t.Fatalf("DialContext(%v, _) = _, %v, want _, %v", ctx, err, context.Canceled)
509 }
510 }
511
// failFastError is an error whose Temporary method always reports false.
type failFastError struct{}

// Error returns the fixed message "failfast".
func (failFastError) Error() string {
	const msg = "failfast"
	return msg
}

// Temporary always reports false.
func (failFastError) Temporary() bool {
	return false
}
516
517 func (s) TestDialContextFailFast(t *testing.T) {
518 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
519 defer cancel()
520 failErr := failFastError{}
521 dialer := func(string, time.Duration) (net.Conn, error) {
522 return nil, failErr
523 }
524
525 _, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithTransportCredentials(insecure.NewCredentials()), WithDialer(dialer), FailOnNonTempDialError(true))
526 if terr, ok := err.(transport.ConnectionError); !ok || terr.Origin() != failErr {
527 t.Fatalf("DialContext() = _, %v, want _, %v", err, failErr)
528 }
529 }
530
531
532 type securePerRPCCredentials struct {
533 credentials.PerRPCCredentials
534 }
535
536 func (c securePerRPCCredentials) RequireTransportSecurity() bool {
537 return true
538 }
539
540 type fakeBundleCreds struct {
541 credentials.Bundle
542 transportCreds credentials.TransportCredentials
543 }
544
545 func (b *fakeBundleCreds) TransportCredentials() credentials.TransportCredentials {
546 return b.transportCreds
547 }
548
549 func (s) TestCredentialsMisuse(t *testing.T) {
550
551 if _, err := Dial("passthrough:///Non-Existent.Server:80"); err != errNoTransportSecurity {
552 t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errNoTransportSecurity)
553 }
554
555
556 creds, err := credentials.NewClientTLSFromFile(testdata.Path("x509/server_ca_cert.pem"), "x.test.example.com")
557 if err != nil {
558 t.Fatalf("Failed to create authenticator %v", err)
559 }
560 dopts := []DialOption{
561 WithTransportCredentials(creds),
562 WithCredentialsBundle(&fakeBundleCreds{transportCreds: creds}),
563 }
564 if _, err := Dial("passthrough:///Non-Existent.Server:80", dopts...); err != errTransportCredsAndBundle {
565 t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errTransportCredsAndBundle)
566 }
567
568
569
570 if _, err := Dial("passthrough:///Non-Existent.Server:80", WithPerRPCCredentials(securePerRPCCredentials{}), WithTransportCredentials(insecure.NewCredentials())); err != errTransportCredentialsMissing {
571 t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errTransportCredentialsMissing)
572 }
573
574
575 if _, err := Dial("passthrough:///Non-Existent.Server:80", WithCredentialsBundle(&fakeBundleCreds{})); err != errNoTransportCredsInBundle {
576 t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errTransportCredsAndBundle)
577 }
578 }
579
580 func (s) TestWithBackoffConfigDefault(t *testing.T) {
581 testBackoffConfigSet(t, internalbackoff.DefaultExponential)
582 }
583
584 func (s) TestWithBackoffConfig(t *testing.T) {
585 b := BackoffConfig{MaxDelay: DefaultBackoffConfig.MaxDelay / 2}
586 bc := backoff.DefaultConfig
587 bc.MaxDelay = b.MaxDelay
588 wantBackoff := internalbackoff.Exponential{Config: bc}
589 testBackoffConfigSet(t, wantBackoff, WithBackoffConfig(b))
590 }
591
592 func (s) TestWithBackoffMaxDelay(t *testing.T) {
593 md := DefaultBackoffConfig.MaxDelay / 2
594 bc := backoff.DefaultConfig
595 bc.MaxDelay = md
596 wantBackoff := internalbackoff.Exponential{Config: bc}
597 testBackoffConfigSet(t, wantBackoff, WithBackoffMaxDelay(md))
598 }
599
600 func (s) TestWithConnectParams(t *testing.T) {
601 bd := 2 * time.Second
602 mltpr := 2.0
603 jitter := 0.0
604 bc := backoff.Config{BaseDelay: bd, Multiplier: mltpr, Jitter: jitter}
605
606 crt := ConnectParams{Backoff: bc}
607
608
609 wantBackoff := internalbackoff.Exponential{Config: bc}
610 testBackoffConfigSet(t, wantBackoff, WithConnectParams(crt))
611 }
612
613 func testBackoffConfigSet(t *testing.T, wantBackoff internalbackoff.Exponential, opts ...DialOption) {
614 opts = append(opts, WithTransportCredentials(insecure.NewCredentials()))
615 conn, err := Dial("passthrough:///foo:80", opts...)
616 if err != nil {
617 t.Fatalf("unexpected error dialing connection: %v", err)
618 }
619 defer conn.Close()
620
621 if conn.dopts.bs == nil {
622 t.Fatalf("backoff config not set")
623 }
624
625 gotBackoff, ok := conn.dopts.bs.(internalbackoff.Exponential)
626 if !ok {
627 t.Fatalf("unexpected type of backoff config: %#v", conn.dopts.bs)
628 }
629
630 if gotBackoff != wantBackoff {
631 t.Fatalf("unexpected backoff config on connection: %v, want %v", gotBackoff, wantBackoff)
632 }
633 }
634
635 func (s) TestConnectParamsWithMinConnectTimeout(t *testing.T) {
636
637 mct := 1 * time.Minute
638 conn, err := Dial("passthrough:///foo:80", WithTransportCredentials(insecure.NewCredentials()), WithConnectParams(ConnectParams{MinConnectTimeout: mct}))
639 if err != nil {
640 t.Fatalf("unexpected error dialing connection: %v", err)
641 }
642 defer conn.Close()
643
644 if got := conn.dopts.minConnectTimeout(); got != mct {
645 t.Errorf("unexpect minConnectTimeout on the connection: %v, want %v", got, mct)
646 }
647 }
648
649 func (s) TestResolverServiceConfigBeforeAddressNotPanic(t *testing.T) {
650 r := manual.NewBuilderWithScheme("whatever")
651
652 cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
653 if err != nil {
654 t.Fatalf("failed to dial: %v", err)
655 }
656 defer cc.Close()
657
658
659
660 r.UpdateState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})
661
662 time.Sleep(time.Second)
663 }
664
665 func (s) TestResolverServiceConfigWhileClosingNotPanic(t *testing.T) {
666 for i := 0; i < 10; i++ {
667 r := manual.NewBuilderWithScheme(fmt.Sprintf("whatever-%d", i))
668
669 cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
670 if err != nil {
671 t.Fatalf("failed to dial: %v", err)
672 }
673
674 go cc.Close()
675 go r.UpdateState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)})
676 }
677 }
678
679 func (s) TestResolverEmptyUpdateNotPanic(t *testing.T) {
680 r := manual.NewBuilderWithScheme("whatever")
681
682 cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
683 if err != nil {
684 t.Fatalf("failed to dial: %v", err)
685 }
686 defer cc.Close()
687
688
689 r.UpdateState(resolver.State{})
690
691 time.Sleep(time.Second)
692 }
693
694 func (s) TestClientUpdatesParamsAfterGoAway(t *testing.T) {
695 grpctest.TLogger.ExpectError("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\"")
696
697 lis, err := net.Listen("tcp", "localhost:0")
698 if err != nil {
699 t.Fatalf("Failed to listen. Err: %v", err)
700 }
701 defer lis.Close()
702 connected := grpcsync.NewEvent()
703 defer connected.Fire()
704 go func() {
705 conn, err := lis.Accept()
706 if err != nil {
707 t.Errorf("error accepting connection: %v", err)
708 return
709 }
710 defer conn.Close()
711 f := http2.NewFramer(conn, conn)
712
713
714 go func() {
715 for {
716 if _, err := f.ReadFrame(); err != nil {
717 return
718 }
719 }
720 }()
721 if err := f.WriteSettings(http2.Setting{}); err != nil {
722 t.Errorf("error writing settings: %v", err)
723 return
724 }
725 <-connected.Done()
726 if err := f.WriteGoAway(0, http2.ErrCodeEnhanceYourCalm, []byte("too_many_pings")); err != nil {
727 t.Errorf("error writing GOAWAY: %v", err)
728 return
729 }
730 }()
731 addr := lis.Addr().String()
732 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
733 defer cancel()
734 cc, err := DialContext(ctx, addr, WithBlock(), WithTransportCredentials(insecure.NewCredentials()), WithKeepaliveParams(keepalive.ClientParameters{
735 Time: 10 * time.Second,
736 Timeout: 100 * time.Millisecond,
737 PermitWithoutStream: true,
738 }))
739 if err != nil {
740 t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
741 }
742 defer cc.Close()
743 connected.Fire()
744 for {
745 time.Sleep(10 * time.Millisecond)
746 cc.mu.RLock()
747 v := cc.mkp.Time
748 cc.mu.RUnlock()
749 if v == 20*time.Second {
750
751 return
752 }
753 if ctx.Err() != nil {
754
755 t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 20s", v)
756 }
757 }
758 }
759
760 func (s) TestDisableServiceConfigOption(t *testing.T) {
761 r := manual.NewBuilderWithScheme("whatever")
762 addr := r.Scheme() + ":///non.existent"
763 cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithDisableServiceConfig())
764 if err != nil {
765 t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
766 }
767 defer cc.Close()
768 r.UpdateState(resolver.State{ServiceConfig: parseCfg(r, `{
769 "methodConfig": [
770 {
771 "name": [
772 {
773 "service": "foo",
774 "method": "Bar"
775 }
776 ],
777 "waitForReady": true
778 }
779 ]
780 }`)})
781 time.Sleep(1 * time.Second)
782 m := cc.GetMethodConfig("/foo/Bar")
783 if m.WaitForReady != nil {
784 t.Fatalf("want: method (\"/foo/bar/\") config to be empty, got: %+v", m)
785 }
786 }
787
788 func (s) TestMethodConfigDefaultService(t *testing.T) {
789 addr := "nonexist:///non.existent"
790 cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithDefaultServiceConfig(`{
791 "methodConfig": [{
792 "name": [
793 {
794 "service": ""
795 }
796 ],
797 "waitForReady": true
798 }]
799 }`))
800 if err != nil {
801 t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
802 }
803 defer cc.Close()
804
805 m := cc.GetMethodConfig("/foo/Bar")
806 if m.WaitForReady == nil {
807 t.Fatalf("want: method (%q) config to fallback to the default service", "/foo/Bar")
808 }
809 }
810
811 func (s) TestClientConnCanonicalTarget(t *testing.T) {
812 tests := []struct {
813 name string
814 addr string
815 canonicalTargetWant string
816 }{
817 {
818 name: "normal-case",
819 addr: "dns://a.server.com/google.com",
820 canonicalTargetWant: "dns://a.server.com/google.com",
821 },
822 {
823 name: "canonical-target-not-specified",
824 addr: "no.scheme",
825 canonicalTargetWant: "passthrough:///no.scheme",
826 },
827 {
828 name: "canonical-target-nonexistent",
829 addr: "nonexist:///non.existent",
830 canonicalTargetWant: "passthrough:///nonexist:///non.existent",
831 },
832 {
833 name: "canonical-target-add-colon-slash",
834 addr: "dns:hostname:port",
835 canonicalTargetWant: "dns:///hostname:port",
836 },
837 }
838 for _, test := range tests {
839 t.Run(test.name, func(t *testing.T) {
840 cc, err := Dial(test.addr, WithTransportCredentials(insecure.NewCredentials()))
841 if err != nil {
842 t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", test.addr, err)
843 }
844 defer cc.Close()
845 if cc.Target() != test.addr {
846 t.Fatalf("Target() = %s, want %s", cc.Target(), test.addr)
847 }
848 if cc.CanonicalTarget() != test.canonicalTargetWant {
849 t.Fatalf("CanonicalTarget() = %s, want %s", cc.CanonicalTarget(), test.canonicalTargetWant)
850 }
851 })
852 }
853 }
854
// backoffForever is a backoff strategy that always returns the largest
// representable duration.
type backoffForever struct{}

// Backoff ignores its argument and returns time.Duration(math.MaxInt64).
func (backoffForever) Backoff(int) time.Duration {
	const forever = time.Duration(math.MaxInt64)
	return forever
}
858
859 func (s) TestResetConnectBackoff(t *testing.T) {
860 dials := make(chan struct{})
861 defer func() {
862 select {
863 case <-dials:
864 default:
865 }
866 }()
867 dialer := func(string, time.Duration) (net.Conn, error) {
868 dials <- struct{}{}
869 return nil, errors.New("failed to fake dial")
870 }
871 cc, err := Dial("any", WithTransportCredentials(insecure.NewCredentials()), WithDialer(dialer), withBackoff(backoffForever{}))
872 if err != nil {
873 t.Fatalf("Dial() = _, %v; want _, nil", err)
874 }
875 defer cc.Close()
876 go stayConnected(cc)
877 select {
878 case <-dials:
879 case <-time.NewTimer(10 * time.Second).C:
880 t.Fatal("Failed to call dial within 10s")
881 }
882
883 select {
884 case <-dials:
885 t.Fatal("Dial called unexpectedly before resetting backoff")
886 case <-time.NewTimer(100 * time.Millisecond).C:
887 }
888
889 cc.ResetConnectBackoff()
890
891 select {
892 case <-dials:
893 case <-time.NewTimer(10 * time.Second).C:
894 t.Fatal("Failed to call dial within 10s after resetting backoff")
895 }
896 }
897
898 func (s) TestBackoffCancel(t *testing.T) {
899 dialStrCh := make(chan string)
900 cc, err := Dial("any", WithTransportCredentials(insecure.NewCredentials()), WithDialer(func(t string, _ time.Duration) (net.Conn, error) {
901 dialStrCh <- t
902 return nil, fmt.Errorf("test dialer, always error")
903 }))
904 if err != nil {
905 t.Fatalf("Failed to create ClientConn: %v", err)
906 }
907 defer cc.Close()
908
909 select {
910 case <-time.After(defaultTestTimeout):
911 t.Fatal("Timeout when waiting for custom dialer to be invoked during Dial")
912 case <-dialStrCh:
913 }
914 }
915
916
917
918
919 func (s) TestUpdateAddresses_NoopIfCalledWithSameAddresses(t *testing.T) {
920 lis1, err := net.Listen("tcp", "localhost:0")
921 if err != nil {
922 t.Fatalf("Error while listening. Err: %v", err)
923 }
924 defer lis1.Close()
925
926 lis2, err := net.Listen("tcp", "localhost:0")
927 if err != nil {
928 t.Fatalf("Error while listening. Err: %v", err)
929 }
930 defer lis2.Close()
931
932 lis3, err := net.Listen("tcp", "localhost:0")
933 if err != nil {
934 t.Fatalf("Error while listening. Err: %v", err)
935 }
936 defer lis3.Close()
937
938 closeServer2 := make(chan struct{})
939 exitCh := make(chan struct{})
940 server1ContactedFirstTime := make(chan struct{})
941 server1ContactedSecondTime := make(chan struct{})
942 server2ContactedFirstTime := make(chan struct{})
943 server2ContactedSecondTime := make(chan struct{})
944 server3Contacted := make(chan struct{})
945
946 defer close(exitCh)
947
948
949 go func() {
950
951
952
953 conn1, err := lis1.Accept()
954 if err != nil {
955 t.Error(err)
956 return
957 }
958 go keepReading(conn1)
959
960 framer := http2.NewFramer(conn1, conn1)
961 if err := framer.WriteSettings(http2.Setting{}); err != nil {
962 t.Errorf("Error while writing settings frame. %v", err)
963 return
964 }
965
966
967
968
969 stateNotifications := testBalancerBuilder.nextStateNotifier()
970
971 for {
972 select {
973 case st := <-stateNotifications:
974 if st == connectivity.Ready {
975 goto ready
976 }
977 case <-exitCh:
978 return
979 }
980 }
981
982 ready:
983
984
985 conn1.Close()
986
987
988 conn2, err := lis1.Accept()
989 if err != nil {
990 t.Error(err)
991 return
992 }
993 close(server1ContactedFirstTime)
994 conn2.Close()
995
996
997 lis1.Accept()
998 close(server1ContactedSecondTime)
999 }()
1000
1001 go func() {
1002
1003
1004
1005
1006 conn, err := lis2.Accept()
1007 if err != nil {
1008 t.Error(err)
1009 return
1010 }
1011
1012 close(server2ContactedFirstTime)
1013 <-closeServer2
1014 conn.Close()
1015
1016
1017 lis2.Accept()
1018 close(server2ContactedSecondTime)
1019 }()
1020
1021 go func() {
1022
1023 lis3.Accept()
1024 close(server3Contacted)
1025 }()
1026
1027 addrsList := []resolver.Address{
1028 {Addr: lis1.Addr().String()},
1029 {Addr: lis2.Addr().String()},
1030 {Addr: lis3.Addr().String()},
1031 }
1032 rb := manual.NewBuilderWithScheme("whatever")
1033 rb.InitialState(resolver.State{Addresses: addrsList})
1034
1035 client, err := Dial("whatever:///this-gets-overwritten",
1036 WithTransportCredentials(insecure.NewCredentials()),
1037 WithResolvers(rb),
1038 WithConnectParams(ConnectParams{
1039 Backoff: backoff.Config{},
1040 MinConnectTimeout: time.Hour,
1041 }),
1042 WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)))
1043 if err != nil {
1044 t.Fatal(err)
1045 }
1046 defer client.Close()
1047 go stayConnected(client)
1048
1049 timeout := time.After(5 * time.Second)
1050
1051
1052
1053 select {
1054 case <-server1ContactedFirstTime:
1055 case <-timeout:
1056 t.Fatal("timed out waiting for server1 to be contacted")
1057 }
1058 select {
1059 case <-server2ContactedFirstTime:
1060 case <-timeout:
1061 t.Fatal("timed out waiting for server2 to be contacted")
1062 }
1063
1064
1065 var ac *addrConn
1066 client.mu.Lock()
1067 for clientAC := range client.conns {
1068 ac = clientAC
1069 break
1070 }
1071 client.mu.Unlock()
1072
1073
1074
1075 ac.acbw.UpdateAddresses(addrsList)
1076
1077
1078
1079 close(closeServer2)
1080
1081 select {
1082 case <-server1ContactedSecondTime:
1083 t.Fatal("server1 was contacted a second time, but it should have continued to server 3")
1084 case <-server2ContactedSecondTime:
1085 t.Fatal("server2 was contacted a second time, but it should have continued to server 3")
1086 case <-server3Contacted:
1087 case <-timeout:
1088 t.Fatal("timed out waiting for any server to be contacted after tryUpdateAddrs")
1089 }
1090 }
1091
1092 func (s) TestDefaultServiceConfig(t *testing.T) {
1093 const defaultSC = `
1094 {
1095 "methodConfig": [
1096 {
1097 "name": [
1098 {
1099 "service": "foo",
1100 "method": "bar"
1101 }
1102 ],
1103 "waitForReady": true
1104 }
1105 ]
1106 }`
1107 tests := []struct {
1108 name string
1109 testF func(t *testing.T, r *manual.Resolver, addr, sc string)
1110 sc string
1111 }{
1112 {
1113 name: "invalid-service-config",
1114 testF: testInvalidDefaultServiceConfig,
1115 sc: "",
1116 },
1117 {
1118 name: "resolver-service-config-disabled",
1119 testF: testDefaultServiceConfigWhenResolverServiceConfigDisabled,
1120 sc: defaultSC,
1121 },
1122 {
1123 name: "resolver-does-not-return-service-config",
1124 testF: testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig,
1125 sc: defaultSC,
1126 },
1127 {
1128 name: "resolver-returns-invalid-service-config",
1129 testF: testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig,
1130 sc: defaultSC,
1131 },
1132 }
1133
1134 for _, test := range tests {
1135 t.Run(test.name, func(t *testing.T) {
1136 r := manual.NewBuilderWithScheme(test.name)
1137 addr := r.Scheme() + ":///non.existent"
1138 test.testF(t, r, addr, test.sc)
1139 })
1140 }
1141 }
1142
1143 func verifyWaitForReadyEqualsTrue(cc *ClientConn) bool {
1144 var i int
1145 for i = 0; i < 10; i++ {
1146 mc := cc.GetMethodConfig("/foo/bar")
1147 if mc.WaitForReady != nil && *mc.WaitForReady == true {
1148 break
1149 }
1150 time.Sleep(100 * time.Millisecond)
1151 }
1152 return i != 10
1153 }
1154
1155 func testInvalidDefaultServiceConfig(t *testing.T, r *manual.Resolver, addr, sc string) {
1156 _, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithDefaultServiceConfig(sc))
1157 if !strings.Contains(err.Error(), invalidDefaultServiceConfigErrPrefix) {
1158 t.Fatalf("Dial got err: %v, want err contains: %v", err, invalidDefaultServiceConfigErrPrefix)
1159 }
1160 }
1161
1162 func testDefaultServiceConfigWhenResolverServiceConfigDisabled(t *testing.T, r *manual.Resolver, addr string, js string) {
1163 cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithDisableServiceConfig(), WithResolvers(r), WithDefaultServiceConfig(js))
1164 if err != nil {
1165 t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
1166 }
1167 defer cc.Close()
1168
1169 r.UpdateState(resolver.State{
1170 Addresses: []resolver.Address{{Addr: addr}},
1171 ServiceConfig: parseCfg(r, "{}"),
1172 })
1173 if !verifyWaitForReadyEqualsTrue(cc) {
1174 t.Fatal("default service config failed to be applied after 1s")
1175 }
1176 }
1177
1178 func testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig(t *testing.T, r *manual.Resolver, addr string, js string) {
1179 cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithDefaultServiceConfig(js))
1180 if err != nil {
1181 t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
1182 }
1183 defer cc.Close()
1184 r.UpdateState(resolver.State{
1185 Addresses: []resolver.Address{{Addr: addr}},
1186 })
1187 if !verifyWaitForReadyEqualsTrue(cc) {
1188 t.Fatal("default service config failed to be applied after 1s")
1189 }
1190 }
1191
1192 func testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t *testing.T, r *manual.Resolver, addr string, js string) {
1193 cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithDefaultServiceConfig(js))
1194 if err != nil {
1195 t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
1196 }
1197 defer cc.Close()
1198 r.UpdateState(resolver.State{
1199 Addresses: []resolver.Address{{Addr: addr}},
1200 })
1201 if !verifyWaitForReadyEqualsTrue(cc) {
1202 t.Fatal("default service config failed to be applied after 1s")
1203 }
1204 }
1205
1206 type stateRecordingBalancer struct {
1207 balancer.Balancer
1208 }
1209
1210 func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
1211 panic(fmt.Sprintf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, s))
1212 }
1213
1214 func (b *stateRecordingBalancer) Close() {
1215 b.Balancer.Close()
1216 }
1217
1218 type stateRecordingBalancerBuilder struct {
1219 mu sync.Mutex
1220 notifier chan connectivity.State
1221 }
1222
1223 func newStateRecordingBalancerBuilder() *stateRecordingBalancerBuilder {
1224 return &stateRecordingBalancerBuilder{}
1225 }
1226
1227 func (b *stateRecordingBalancerBuilder) Name() string {
1228 return stateRecordingBalancerName
1229 }
1230
1231 func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
1232 stateNotifications := make(chan connectivity.State, 10)
1233 b.mu.Lock()
1234 b.notifier = stateNotifications
1235 b.mu.Unlock()
1236 return &stateRecordingBalancer{
1237 Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts),
1238 }
1239 }
1240
1241 func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.State {
1242 b.mu.Lock()
1243 defer b.mu.Unlock()
1244 ret := b.notifier
1245 b.notifier = nil
1246 return ret
1247 }
1248
1249 type stateRecordingCCWrapper struct {
1250 balancer.ClientConn
1251 notifier chan<- connectivity.State
1252 }
1253
1254 func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
1255 oldListener := opts.StateListener
1256 opts.StateListener = func(s balancer.SubConnState) {
1257 ccw.notifier <- s.ConnectivityState
1258 oldListener(s)
1259 }
1260 return ccw.ClientConn.NewSubConn(addrs, opts)
1261 }
1262
1263
1264
1265
1266
// keepReading repeatedly reads from conn into a 1024-byte scratch buffer,
// discarding whatever arrives, and returns as soon as a Read call reports an
// error.
func keepReading(conn net.Conn) {
	scratch := make([]byte, 1024)
	for {
		if _, err := conn.Read(scratch); err != nil {
			return
		}
	}
}
1272
1273
1274
1275 func stayConnected(cc *ClientConn) {
1276 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1277 defer cancel()
1278
1279 for {
1280 state := cc.GetState()
1281 switch state {
1282 case connectivity.Idle:
1283 cc.Connect()
1284 case connectivity.Shutdown:
1285 return
1286 }
1287 if !cc.WaitForStateChange(ctx, state) {
1288 return
1289 }
1290 }
1291 }
1292
1293 func (s) TestURLAuthorityEscape(t *testing.T) {
1294 tests := []struct {
1295 name string
1296 authority string
1297 want string
1298 }{
1299 {
1300 name: "ipv6_authority",
1301 authority: "[::1]",
1302 want: "[::1]",
1303 },
1304 {
1305 name: "with_user_and_host",
1306 authority: "userinfo@host:10001",
1307 want: "userinfo@host:10001",
1308 },
1309 {
1310 name: "with_multiple_slashes",
1311 authority: "projects/123/network/abc/service",
1312 want: "projects%2F123%2Fnetwork%2Fabc%2Fservice",
1313 },
1314 {
1315 name: "all_possible_allowed_chars",
1316 authority: "abc123-._~!$&'()*+,;=@:[]",
1317 want: "abc123-._~!$&'()*+,;=@:[]",
1318 },
1319 }
1320
1321 for _, test := range tests {
1322 t.Run(test.name, func(t *testing.T) {
1323 if got, want := encodeAuthority(test.authority), test.want; got != want {
1324 t.Errorf("encodeAuthority(%s) = %s, want %s", test.authority, got, test.want)
1325 }
1326 })
1327 }
1328 }
1329
View as plain text