...

Source file src/google.golang.org/grpc/clientconn_test.go

Documentation: google.golang.org/grpc

     1  /*
     2   *
     3   * Copyright 2014 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package grpc
    20  
    21  import (
    22  	"context"
    23  	"errors"
    24  	"fmt"
    25  	"math"
    26  	"net"
    27  	"strings"
    28  	"sync"
    29  	"sync/atomic"
    30  	"testing"
    31  	"time"
    32  
    33  	"golang.org/x/net/http2"
    34  	"google.golang.org/grpc/backoff"
    35  	"google.golang.org/grpc/balancer"
    36  	"google.golang.org/grpc/connectivity"
    37  	"google.golang.org/grpc/credentials"
    38  	"google.golang.org/grpc/credentials/insecure"
    39  	internalbackoff "google.golang.org/grpc/internal/backoff"
    40  	"google.golang.org/grpc/internal/grpcsync"
    41  	"google.golang.org/grpc/internal/grpctest"
    42  	"google.golang.org/grpc/internal/transport"
    43  	"google.golang.org/grpc/keepalive"
    44  	"google.golang.org/grpc/resolver"
    45  	"google.golang.org/grpc/resolver/manual"
    46  	"google.golang.org/grpc/serviceconfig"
    47  	"google.golang.org/grpc/testdata"
    48  )
    49  
    50  const (
    51  	defaultTestTimeout         = 10 * time.Second
    52  	stateRecordingBalancerName = "state_recording_balancer"
    53  )
    54  
    55  var testBalancerBuilder = newStateRecordingBalancerBuilder()
    56  
    57  func init() {
    58  	balancer.Register(testBalancerBuilder)
    59  }
    60  
    61  func parseCfg(r *manual.Resolver, s string) *serviceconfig.ParseResult {
    62  	scpr := r.CC.ParseServiceConfig(s)
    63  	if scpr.Err != nil {
    64  		panic(fmt.Sprintf("Error parsing config %q: %v", s, scpr.Err))
    65  	}
    66  	return scpr
    67  }
    68  
    69  func (s) TestDialWithTimeout(t *testing.T) {
    70  	lis, err := net.Listen("tcp", "localhost:0")
    71  	if err != nil {
    72  		t.Fatalf("Error while listening. Err: %v", err)
    73  	}
    74  	defer lis.Close()
    75  	lisAddr := resolver.Address{Addr: lis.Addr().String()}
    76  	lisDone := make(chan struct{})
    77  	dialDone := make(chan struct{})
    78  	// 1st listener accepts the connection and then does nothing
    79  	go func() {
    80  		defer close(lisDone)
    81  		conn, err := lis.Accept()
    82  		if err != nil {
    83  			t.Errorf("Error while accepting. Err: %v", err)
    84  			return
    85  		}
    86  		framer := http2.NewFramer(conn, conn)
    87  		if err := framer.WriteSettings(http2.Setting{}); err != nil {
    88  			t.Errorf("Error while writing settings. Err: %v", err)
    89  			return
    90  		}
    91  		<-dialDone // Close conn only after dial returns.
    92  	}()
    93  
    94  	r := manual.NewBuilderWithScheme("whatever")
    95  	r.InitialState(resolver.State{Addresses: []resolver.Address{lisAddr}})
    96  	client, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithTimeout(5*time.Second))
    97  	close(dialDone)
    98  	if err != nil {
    99  		t.Fatalf("Dial failed. Err: %v", err)
   100  	}
   101  	defer client.Close()
   102  	timeout := time.After(1 * time.Second)
   103  	select {
   104  	case <-timeout:
   105  		t.Fatal("timed out waiting for server to finish")
   106  	case <-lisDone:
   107  	}
   108  }
   109  
   110  func (s) TestDialWithMultipleBackendsNotSendingServerPreface(t *testing.T) {
   111  	lis1, err := net.Listen("tcp", "localhost:0")
   112  	if err != nil {
   113  		t.Fatalf("Error while listening. Err: %v", err)
   114  	}
   115  	defer lis1.Close()
   116  	lis1Addr := resolver.Address{Addr: lis1.Addr().String()}
   117  	lis1Done := make(chan struct{})
   118  	// 1st listener accepts the connection and immediately closes it.
   119  	go func() {
   120  		defer close(lis1Done)
   121  		conn, err := lis1.Accept()
   122  		if err != nil {
   123  			t.Errorf("Error while accepting. Err: %v", err)
   124  			return
   125  		}
   126  		conn.Close()
   127  	}()
   128  
   129  	lis2, err := net.Listen("tcp", "localhost:0")
   130  	if err != nil {
   131  		t.Fatalf("Error while listening. Err: %v", err)
   132  	}
   133  	defer lis2.Close()
   134  	lis2Done := make(chan struct{})
   135  	lis2Addr := resolver.Address{Addr: lis2.Addr().String()}
   136  	// 2nd listener should get a connection attempt since the first one failed.
   137  	go func() {
   138  		defer close(lis2Done)
   139  		_, err := lis2.Accept() // Closing the client will clean up this conn.
   140  		if err != nil {
   141  			t.Errorf("Error while accepting. Err: %v", err)
   142  			return
   143  		}
   144  	}()
   145  
   146  	r := manual.NewBuilderWithScheme("whatever")
   147  	r.InitialState(resolver.State{Addresses: []resolver.Address{lis1Addr, lis2Addr}})
   148  	client, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
   149  	if err != nil {
   150  		t.Fatalf("Dial failed. Err: %v", err)
   151  	}
   152  	defer client.Close()
   153  	timeout := time.After(5 * time.Second)
   154  	select {
   155  	case <-timeout:
   156  		t.Fatal("timed out waiting for server 1 to finish")
   157  	case <-lis1Done:
   158  	}
   159  	select {
   160  	case <-timeout:
   161  		t.Fatal("timed out waiting for server 2 to finish")
   162  	case <-lis2Done:
   163  	}
   164  }
   165  
   166  func (s) TestDialWaitsForServerSettings(t *testing.T) {
   167  	lis, err := net.Listen("tcp", "localhost:0")
   168  	if err != nil {
   169  		t.Fatalf("Error while listening. Err: %v", err)
   170  	}
   171  	defer lis.Close()
   172  	done := make(chan struct{})
   173  	sent := make(chan struct{})
   174  	dialDone := make(chan struct{})
   175  	go func() { // Launch the server.
   176  		defer func() {
   177  			close(done)
   178  		}()
   179  		conn, err := lis.Accept()
   180  		if err != nil {
   181  			t.Errorf("Error while accepting. Err: %v", err)
   182  			return
   183  		}
   184  		defer conn.Close()
   185  		// Sleep for a little bit to make sure that Dial on client
   186  		// side blocks until settings are received.
   187  		time.Sleep(100 * time.Millisecond)
   188  		framer := http2.NewFramer(conn, conn)
   189  		close(sent)
   190  		if err := framer.WriteSettings(http2.Setting{}); err != nil {
   191  			t.Errorf("Error while writing settings. Err: %v", err)
   192  			return
   193  		}
   194  		<-dialDone // Close conn only after dial returns.
   195  	}()
   196  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
   197  	defer cancel()
   198  	client, err := DialContext(ctx, lis.Addr().String(), WithTransportCredentials(insecure.NewCredentials()), WithBlock())
   199  	close(dialDone)
   200  	if err != nil {
   201  		t.Fatalf("Error while dialing. Err: %v", err)
   202  	}
   203  	defer client.Close()
   204  	select {
   205  	case <-sent:
   206  	default:
   207  		t.Fatalf("Dial returned before server settings were sent")
   208  	}
   209  	<-done
   210  }
   211  
   212  func (s) TestDialWaitsForServerSettingsAndFails(t *testing.T) {
   213  	lis, err := net.Listen("tcp", "localhost:0")
   214  	if err != nil {
   215  		t.Fatalf("Error while listening. Err: %v", err)
   216  	}
   217  	done := make(chan struct{})
   218  	numConns := 0
   219  	go func() { // Launch the server.
   220  		defer func() {
   221  			close(done)
   222  		}()
   223  		for {
   224  			conn, err := lis.Accept()
   225  			if err != nil {
   226  				break
   227  			}
   228  			numConns++
   229  			defer conn.Close()
   230  		}
   231  	}()
   232  	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
   233  	defer cancel()
   234  	client, err := DialContext(ctx,
   235  		lis.Addr().String(),
   236  		WithTransportCredentials(insecure.NewCredentials()),
   237  		WithReturnConnectionError(),
   238  		WithConnectParams(ConnectParams{
   239  			Backoff:           backoff.Config{},
   240  			MinConnectTimeout: 250 * time.Millisecond,
   241  		}))
   242  	lis.Close()
   243  	if err == nil {
   244  		client.Close()
   245  		t.Fatalf("Unexpected success (err=nil) while dialing")
   246  	}
   247  	expectedMsg := "server preface"
   248  	if !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) || !strings.Contains(err.Error(), expectedMsg) {
   249  		t.Fatalf("DialContext(_) = %v; want a message that includes both %q and %q", err, context.DeadlineExceeded.Error(), expectedMsg)
   250  	}
   251  	<-done
   252  	if numConns < 2 {
   253  		t.Fatalf("dial attempts: %v; want > 1", numConns)
   254  	}
   255  }
   256  
   257  // 1. Client connects to a server that doesn't send preface.
   258  // 2. After minConnectTimeout(500 ms here), client disconnects and retries.
   259  // 3. The new server sends its preface.
   260  // 4. Client doesn't kill the connection this time.
   261  func (s) TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) {
   262  	lis, err := net.Listen("tcp", "localhost:0")
   263  	if err != nil {
   264  		t.Fatalf("Error while listening. Err: %v", err)
   265  	}
   266  	var (
   267  		conn2 net.Conn
   268  		over  uint32
   269  	)
   270  	defer func() {
   271  		lis.Close()
   272  		// conn2 shouldn't be closed until the client has
   273  		// observed a successful test.
   274  		if conn2 != nil {
   275  			conn2.Close()
   276  		}
   277  	}()
   278  	done := make(chan struct{})
   279  	accepted := make(chan struct{})
   280  	go func() { // Launch the server.
   281  		defer close(done)
   282  		conn1, err := lis.Accept()
   283  		if err != nil {
   284  			t.Errorf("Error while accepting. Err: %v", err)
   285  			return
   286  		}
   287  		defer conn1.Close()
   288  		// Don't send server settings and the client should close the connection and try again.
   289  		conn2, err = lis.Accept() // Accept a reconnection request from client.
   290  		if err != nil {
   291  			t.Errorf("Error while accepting. Err: %v", err)
   292  			return
   293  		}
   294  		close(accepted)
   295  		framer := http2.NewFramer(conn2, conn2)
   296  		if err = framer.WriteSettings(http2.Setting{}); err != nil {
   297  			t.Errorf("Error while writing settings. Err: %v", err)
   298  			return
   299  		}
   300  		b := make([]byte, 8)
   301  		for {
   302  			_, err = conn2.Read(b)
   303  			if err == nil {
   304  				continue
   305  			}
   306  			if atomic.LoadUint32(&over) == 1 {
   307  				// The connection stayed alive for the timer.
   308  				// Success.
   309  				return
   310  			}
   311  			t.Errorf("Unexpected error while reading. Err: %v, want timeout error", err)
   312  			break
   313  		}
   314  	}()
   315  	client, err := Dial(lis.Addr().String(), WithTransportCredentials(insecure.NewCredentials()), withMinConnectDeadline(func() time.Duration { return time.Millisecond * 500 }))
   316  	if err != nil {
   317  		t.Fatalf("Error while dialing. Err: %v", err)
   318  	}
   319  
   320  	go stayConnected(client)
   321  
   322  	// wait for connection to be accepted on the server.
   323  	timer := time.NewTimer(time.Second * 10)
   324  	select {
   325  	case <-accepted:
   326  	case <-timer.C:
   327  		t.Fatalf("Client didn't make another connection request in time.")
   328  	}
   329  	// Make sure the connection stays alive for sometime.
   330  	time.Sleep(time.Second)
   331  	atomic.StoreUint32(&over, 1)
   332  	client.Close()
   333  	<-done
   334  }
   335  
   336  func (s) TestBackoffWhenNoServerPrefaceReceived(t *testing.T) {
   337  	lis, err := net.Listen("tcp", "localhost:0")
   338  	if err != nil {
   339  		t.Fatalf("Unexpected error from net.Listen(%q, %q): %v", "tcp", "localhost:0", err)
   340  	}
   341  	defer lis.Close()
   342  	done := make(chan struct{})
   343  	go func() { // Launch the server.
   344  		defer close(done)
   345  		conn, err := lis.Accept() // Accept the connection only to close it immediately.
   346  		if err != nil {
   347  			t.Errorf("Error while accepting. Err: %v", err)
   348  			return
   349  		}
   350  		prevAt := time.Now()
   351  		conn.Close()
   352  		var prevDuration time.Duration
   353  		// Make sure the retry attempts are backed off properly.
   354  		for i := 0; i < 3; i++ {
   355  			conn, err := lis.Accept()
   356  			if err != nil {
   357  				t.Errorf("Error while accepting. Err: %v", err)
   358  				return
   359  			}
   360  			meow := time.Now()
   361  			conn.Close()
   362  			dr := meow.Sub(prevAt)
   363  			if dr <= prevDuration {
   364  				t.Errorf("Client backoff did not increase with retries. Previous duration: %v, current duration: %v", prevDuration, dr)
   365  				return
   366  			}
   367  			prevDuration = dr
   368  			prevAt = meow
   369  		}
   370  	}()
   371  	bc := backoff.Config{
   372  		BaseDelay:  200 * time.Millisecond,
   373  		Multiplier: 2.0,
   374  		Jitter:     0,
   375  		MaxDelay:   120 * time.Second,
   376  	}
   377  	cp := ConnectParams{
   378  		Backoff:           bc,
   379  		MinConnectTimeout: 1 * time.Second,
   380  	}
   381  	cc, err := Dial(lis.Addr().String(), WithTransportCredentials(insecure.NewCredentials()), WithConnectParams(cp))
   382  	if err != nil {
   383  		t.Fatalf("Unexpected error from Dial(%v) = %v", lis.Addr(), err)
   384  	}
   385  	defer cc.Close()
   386  	go stayConnected(cc)
   387  	<-done
   388  }
   389  
   390  func (s) TestWithTimeout(t *testing.T) {
   391  	conn, err := Dial("passthrough:///Non-Existent.Server:80",
   392  		WithTimeout(time.Millisecond),
   393  		WithBlock(),
   394  		WithTransportCredentials(insecure.NewCredentials()))
   395  	if err == nil {
   396  		conn.Close()
   397  	}
   398  	if err != context.DeadlineExceeded {
   399  		t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, context.DeadlineExceeded)
   400  	}
   401  }
   402  
   403  func (s) TestWithTransportCredentialsTLS(t *testing.T) {
   404  	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
   405  	defer cancel()
   406  	creds, err := credentials.NewClientTLSFromFile(testdata.Path("x509/server_ca_cert.pem"), "x.test.example.com")
   407  	if err != nil {
   408  		t.Fatalf("Failed to create credentials %v", err)
   409  	}
   410  	conn, err := DialContext(ctx, "passthrough:///Non-Existent.Server:80", WithTransportCredentials(creds), WithBlock())
   411  	if err == nil {
   412  		conn.Close()
   413  	}
   414  	if err != context.DeadlineExceeded {
   415  		t.Fatalf("Dial(_, _) = %v, %v, want %v", conn, err, context.DeadlineExceeded)
   416  	}
   417  }
   418  
   419  // When creating a transport configured with n addresses, only calculate the
   420  // backoff once per "round" of attempts instead of once per address (n times
   421  // per "round" of attempts).
   422  func (s) TestDial_OneBackoffPerRetryGroup(t *testing.T) {
   423  	var attempts uint32
   424  	getMinConnectTimeout := func() time.Duration {
   425  		if atomic.AddUint32(&attempts, 1) == 1 {
   426  			// Once all addresses are exhausted, hang around and wait for the
   427  			// client.Close to happen rather than re-starting a new round of
   428  			// attempts.
   429  			return time.Hour
   430  		}
   431  		t.Error("only one attempt backoff calculation, but got more")
   432  		return 0
   433  	}
   434  
   435  	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
   436  	defer cancel()
   437  
   438  	lis1, err := net.Listen("tcp", "localhost:0")
   439  	if err != nil {
   440  		t.Fatalf("Error while listening. Err: %v", err)
   441  	}
   442  	defer lis1.Close()
   443  
   444  	lis2, err := net.Listen("tcp", "localhost:0")
   445  	if err != nil {
   446  		t.Fatalf("Error while listening. Err: %v", err)
   447  	}
   448  	defer lis2.Close()
   449  
   450  	server1Done := make(chan struct{})
   451  	server2Done := make(chan struct{})
   452  
   453  	// Launch server 1.
   454  	go func() {
   455  		conn, err := lis1.Accept()
   456  		if err != nil {
   457  			t.Error(err)
   458  			return
   459  		}
   460  
   461  		conn.Close()
   462  		close(server1Done)
   463  	}()
   464  	// Launch server 2.
   465  	go func() {
   466  		conn, err := lis2.Accept()
   467  		if err != nil {
   468  			t.Error(err)
   469  			return
   470  		}
   471  		conn.Close()
   472  		close(server2Done)
   473  	}()
   474  
   475  	rb := manual.NewBuilderWithScheme("whatever")
   476  	rb.InitialState(resolver.State{Addresses: []resolver.Address{
   477  		{Addr: lis1.Addr().String()},
   478  		{Addr: lis2.Addr().String()},
   479  	}})
   480  	client, err := DialContext(ctx, "whatever:///this-gets-overwritten",
   481  		WithTransportCredentials(insecure.NewCredentials()),
   482  		WithResolvers(rb),
   483  		withMinConnectDeadline(getMinConnectTimeout))
   484  	if err != nil {
   485  		t.Fatal(err)
   486  	}
   487  	defer client.Close()
   488  
   489  	timeout := time.After(15 * time.Second)
   490  
   491  	select {
   492  	case <-timeout:
   493  		t.Fatal("timed out waiting for test to finish")
   494  	case <-server1Done:
   495  	}
   496  
   497  	select {
   498  	case <-timeout:
   499  		t.Fatal("timed out waiting for test to finish")
   500  	case <-server2Done:
   501  	}
   502  }
   503  
   504  func (s) TestDialContextCancel(t *testing.T) {
   505  	ctx, cancel := context.WithCancel(context.Background())
   506  	cancel()
   507  	if _, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithTransportCredentials(insecure.NewCredentials())); err != context.Canceled {
   508  		t.Fatalf("DialContext(%v, _) = _, %v, want _, %v", ctx, err, context.Canceled)
   509  	}
   510  }
   511  
   512  type failFastError struct{}
   513  
   514  func (failFastError) Error() string   { return "failfast" }
   515  func (failFastError) Temporary() bool { return false }
   516  
   517  func (s) TestDialContextFailFast(t *testing.T) {
   518  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
   519  	defer cancel()
   520  	failErr := failFastError{}
   521  	dialer := func(string, time.Duration) (net.Conn, error) {
   522  		return nil, failErr
   523  	}
   524  
   525  	_, err := DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithTransportCredentials(insecure.NewCredentials()), WithDialer(dialer), FailOnNonTempDialError(true))
   526  	if terr, ok := err.(transport.ConnectionError); !ok || terr.Origin() != failErr {
   527  		t.Fatalf("DialContext() = _, %v, want _, %v", err, failErr)
   528  	}
   529  }
   530  
   531  // securePerRPCCredentials always requires transport security.
   532  type securePerRPCCredentials struct {
   533  	credentials.PerRPCCredentials
   534  }
   535  
   536  func (c securePerRPCCredentials) RequireTransportSecurity() bool {
   537  	return true
   538  }
   539  
   540  type fakeBundleCreds struct {
   541  	credentials.Bundle
   542  	transportCreds credentials.TransportCredentials
   543  }
   544  
   545  func (b *fakeBundleCreds) TransportCredentials() credentials.TransportCredentials {
   546  	return b.transportCreds
   547  }
   548  
   549  func (s) TestCredentialsMisuse(t *testing.T) {
   550  	// Use of no transport creds and no creds bundle must fail.
   551  	if _, err := Dial("passthrough:///Non-Existent.Server:80"); err != errNoTransportSecurity {
   552  		t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errNoTransportSecurity)
   553  	}
   554  
   555  	// Use of both transport creds and creds bundle must fail.
   556  	creds, err := credentials.NewClientTLSFromFile(testdata.Path("x509/server_ca_cert.pem"), "x.test.example.com")
   557  	if err != nil {
   558  		t.Fatalf("Failed to create authenticator %v", err)
   559  	}
   560  	dopts := []DialOption{
   561  		WithTransportCredentials(creds),
   562  		WithCredentialsBundle(&fakeBundleCreds{transportCreds: creds}),
   563  	}
   564  	if _, err := Dial("passthrough:///Non-Existent.Server:80", dopts...); err != errTransportCredsAndBundle {
   565  		t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errTransportCredsAndBundle)
   566  	}
   567  
   568  	// Use of perRPC creds requiring transport security over an insecure
   569  	// transport must fail.
   570  	if _, err := Dial("passthrough:///Non-Existent.Server:80", WithPerRPCCredentials(securePerRPCCredentials{}), WithTransportCredentials(insecure.NewCredentials())); err != errTransportCredentialsMissing {
   571  		t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errTransportCredentialsMissing)
   572  	}
   573  
   574  	// Use of a creds bundle with nil transport credentials must fail.
   575  	if _, err := Dial("passthrough:///Non-Existent.Server:80", WithCredentialsBundle(&fakeBundleCreds{})); err != errNoTransportCredsInBundle {
   576  		t.Fatalf("Dial(_, _) = _, %v, want _, %v", err, errTransportCredsAndBundle)
   577  	}
   578  }
   579  
   580  func (s) TestWithBackoffConfigDefault(t *testing.T) {
   581  	testBackoffConfigSet(t, internalbackoff.DefaultExponential)
   582  }
   583  
   584  func (s) TestWithBackoffConfig(t *testing.T) {
   585  	b := BackoffConfig{MaxDelay: DefaultBackoffConfig.MaxDelay / 2}
   586  	bc := backoff.DefaultConfig
   587  	bc.MaxDelay = b.MaxDelay
   588  	wantBackoff := internalbackoff.Exponential{Config: bc}
   589  	testBackoffConfigSet(t, wantBackoff, WithBackoffConfig(b))
   590  }
   591  
   592  func (s) TestWithBackoffMaxDelay(t *testing.T) {
   593  	md := DefaultBackoffConfig.MaxDelay / 2
   594  	bc := backoff.DefaultConfig
   595  	bc.MaxDelay = md
   596  	wantBackoff := internalbackoff.Exponential{Config: bc}
   597  	testBackoffConfigSet(t, wantBackoff, WithBackoffMaxDelay(md))
   598  }
   599  
   600  func (s) TestWithConnectParams(t *testing.T) {
   601  	bd := 2 * time.Second
   602  	mltpr := 2.0
   603  	jitter := 0.0
   604  	bc := backoff.Config{BaseDelay: bd, Multiplier: mltpr, Jitter: jitter}
   605  
   606  	crt := ConnectParams{Backoff: bc}
   607  	// MaxDelay is not set in the ConnectParams. So it should not be set on
   608  	// internalbackoff.Exponential as well.
   609  	wantBackoff := internalbackoff.Exponential{Config: bc}
   610  	testBackoffConfigSet(t, wantBackoff, WithConnectParams(crt))
   611  }
   612  
   613  func testBackoffConfigSet(t *testing.T, wantBackoff internalbackoff.Exponential, opts ...DialOption) {
   614  	opts = append(opts, WithTransportCredentials(insecure.NewCredentials()))
   615  	conn, err := Dial("passthrough:///foo:80", opts...)
   616  	if err != nil {
   617  		t.Fatalf("unexpected error dialing connection: %v", err)
   618  	}
   619  	defer conn.Close()
   620  
   621  	if conn.dopts.bs == nil {
   622  		t.Fatalf("backoff config not set")
   623  	}
   624  
   625  	gotBackoff, ok := conn.dopts.bs.(internalbackoff.Exponential)
   626  	if !ok {
   627  		t.Fatalf("unexpected type of backoff config: %#v", conn.dopts.bs)
   628  	}
   629  
   630  	if gotBackoff != wantBackoff {
   631  		t.Fatalf("unexpected backoff config on connection: %v, want %v", gotBackoff, wantBackoff)
   632  	}
   633  }
   634  
   635  func (s) TestConnectParamsWithMinConnectTimeout(t *testing.T) {
   636  	// Default value specified for minConnectTimeout in the spec is 20 seconds.
   637  	mct := 1 * time.Minute
   638  	conn, err := Dial("passthrough:///foo:80", WithTransportCredentials(insecure.NewCredentials()), WithConnectParams(ConnectParams{MinConnectTimeout: mct}))
   639  	if err != nil {
   640  		t.Fatalf("unexpected error dialing connection: %v", err)
   641  	}
   642  	defer conn.Close()
   643  
   644  	if got := conn.dopts.minConnectTimeout(); got != mct {
   645  		t.Errorf("unexpect minConnectTimeout on the connection: %v, want %v", got, mct)
   646  	}
   647  }
   648  
   649  func (s) TestResolverServiceConfigBeforeAddressNotPanic(t *testing.T) {
   650  	r := manual.NewBuilderWithScheme("whatever")
   651  
   652  	cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
   653  	if err != nil {
   654  		t.Fatalf("failed to dial: %v", err)
   655  	}
   656  	defer cc.Close()
   657  
   658  	// SwitchBalancer before NewAddress. There was no balancer created, this
   659  	// makes sure we don't call close on nil balancerWrapper.
   660  	r.UpdateState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)}) // This should not panic.
   661  
   662  	time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn.
   663  }
   664  
   665  func (s) TestResolverServiceConfigWhileClosingNotPanic(t *testing.T) {
   666  	for i := 0; i < 10; i++ { // Run this multiple times to make sure it doesn't panic.
   667  		r := manual.NewBuilderWithScheme(fmt.Sprintf("whatever-%d", i))
   668  
   669  		cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
   670  		if err != nil {
   671  			t.Fatalf("failed to dial: %v", err)
   672  		}
   673  		// Send a new service config while closing the ClientConn.
   674  		go cc.Close()
   675  		go r.UpdateState(resolver.State{ServiceConfig: parseCfg(r, `{"loadBalancingPolicy": "round_robin"}`)}) // This should not panic.
   676  	}
   677  }
   678  
   679  func (s) TestResolverEmptyUpdateNotPanic(t *testing.T) {
   680  	r := manual.NewBuilderWithScheme("whatever")
   681  
   682  	cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
   683  	if err != nil {
   684  		t.Fatalf("failed to dial: %v", err)
   685  	}
   686  	defer cc.Close()
   687  
   688  	// This make sure we don't create addrConn with empty address list.
   689  	r.UpdateState(resolver.State{}) // This should not panic.
   690  
   691  	time.Sleep(time.Second) // Sleep to make sure the service config is handled by ClientConn.
   692  }
   693  
   694  func (s) TestClientUpdatesParamsAfterGoAway(t *testing.T) {
   695  	grpctest.TLogger.ExpectError("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\"")
   696  
   697  	lis, err := net.Listen("tcp", "localhost:0")
   698  	if err != nil {
   699  		t.Fatalf("Failed to listen. Err: %v", err)
   700  	}
   701  	defer lis.Close()
   702  	connected := grpcsync.NewEvent()
   703  	defer connected.Fire()
   704  	go func() {
   705  		conn, err := lis.Accept()
   706  		if err != nil {
   707  			t.Errorf("error accepting connection: %v", err)
   708  			return
   709  		}
   710  		defer conn.Close()
   711  		f := http2.NewFramer(conn, conn)
   712  		// Start a goroutine to read from the conn to prevent the client from
   713  		// blocking after it writes its preface.
   714  		go func() {
   715  			for {
   716  				if _, err := f.ReadFrame(); err != nil {
   717  					return
   718  				}
   719  			}
   720  		}()
   721  		if err := f.WriteSettings(http2.Setting{}); err != nil {
   722  			t.Errorf("error writing settings: %v", err)
   723  			return
   724  		}
   725  		<-connected.Done()
   726  		if err := f.WriteGoAway(0, http2.ErrCodeEnhanceYourCalm, []byte("too_many_pings")); err != nil {
   727  			t.Errorf("error writing GOAWAY: %v", err)
   728  			return
   729  		}
   730  	}()
   731  	addr := lis.Addr().String()
   732  	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
   733  	defer cancel()
   734  	cc, err := DialContext(ctx, addr, WithBlock(), WithTransportCredentials(insecure.NewCredentials()), WithKeepaliveParams(keepalive.ClientParameters{
   735  		Time:                10 * time.Second,
   736  		Timeout:             100 * time.Millisecond,
   737  		PermitWithoutStream: true,
   738  	}))
   739  	if err != nil {
   740  		t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
   741  	}
   742  	defer cc.Close()
   743  	connected.Fire()
   744  	for {
   745  		time.Sleep(10 * time.Millisecond)
   746  		cc.mu.RLock()
   747  		v := cc.mkp.Time
   748  		cc.mu.RUnlock()
   749  		if v == 20*time.Second {
   750  			// Success
   751  			return
   752  		}
   753  		if ctx.Err() != nil {
   754  			// Timeout
   755  			t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 20s", v)
   756  		}
   757  	}
   758  }
   759  
   760  func (s) TestDisableServiceConfigOption(t *testing.T) {
   761  	r := manual.NewBuilderWithScheme("whatever")
   762  	addr := r.Scheme() + ":///non.existent"
   763  	cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithDisableServiceConfig())
   764  	if err != nil {
   765  		t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
   766  	}
   767  	defer cc.Close()
   768  	r.UpdateState(resolver.State{ServiceConfig: parseCfg(r, `{
   769      "methodConfig": [
   770          {
   771              "name": [
   772                  {
   773                      "service": "foo",
   774                      "method": "Bar"
   775                  }
   776              ],
   777              "waitForReady": true
   778          }
   779      ]
   780  }`)})
   781  	time.Sleep(1 * time.Second)
   782  	m := cc.GetMethodConfig("/foo/Bar")
   783  	if m.WaitForReady != nil {
   784  		t.Fatalf("want: method (\"/foo/bar/\") config to be empty, got: %+v", m)
   785  	}
   786  }
   787  
   788  func (s) TestMethodConfigDefaultService(t *testing.T) {
   789  	addr := "nonexist:///non.existent"
   790  	cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithDefaultServiceConfig(`{
   791    "methodConfig": [{
   792      "name": [
   793        {
   794          "service": ""
   795        }
   796      ],
   797      "waitForReady": true
   798    }]
   799  }`))
   800  	if err != nil {
   801  		t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
   802  	}
   803  	defer cc.Close()
   804  
   805  	m := cc.GetMethodConfig("/foo/Bar")
   806  	if m.WaitForReady == nil {
   807  		t.Fatalf("want: method (%q) config to fallback to the default service", "/foo/Bar")
   808  	}
   809  }
   810  
   811  func (s) TestClientConnCanonicalTarget(t *testing.T) {
   812  	tests := []struct {
   813  		name                string
   814  		addr                string
   815  		canonicalTargetWant string
   816  	}{
   817  		{
   818  			name:                "normal-case",
   819  			addr:                "dns://a.server.com/google.com",
   820  			canonicalTargetWant: "dns://a.server.com/google.com",
   821  		},
   822  		{
   823  			name:                "canonical-target-not-specified",
   824  			addr:                "no.scheme",
   825  			canonicalTargetWant: "passthrough:///no.scheme",
   826  		},
   827  		{
   828  			name:                "canonical-target-nonexistent",
   829  			addr:                "nonexist:///non.existent",
   830  			canonicalTargetWant: "passthrough:///nonexist:///non.existent",
   831  		},
   832  		{
   833  			name:                "canonical-target-add-colon-slash",
   834  			addr:                "dns:hostname:port",
   835  			canonicalTargetWant: "dns:///hostname:port",
   836  		},
   837  	}
   838  	for _, test := range tests {
   839  		t.Run(test.name, func(t *testing.T) {
   840  			cc, err := Dial(test.addr, WithTransportCredentials(insecure.NewCredentials()))
   841  			if err != nil {
   842  				t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", test.addr, err)
   843  			}
   844  			defer cc.Close()
   845  			if cc.Target() != test.addr {
   846  				t.Fatalf("Target() = %s, want %s", cc.Target(), test.addr)
   847  			}
   848  			if cc.CanonicalTarget() != test.canonicalTargetWant {
   849  				t.Fatalf("CanonicalTarget() = %s, want %s", cc.CanonicalTarget(), test.canonicalTargetWant)
   850  			}
   851  		})
   852  	}
   853  }
   854  
   855  type backoffForever struct{}
   856  
   857  func (b backoffForever) Backoff(int) time.Duration { return time.Duration(math.MaxInt64) }
   858  
   859  func (s) TestResetConnectBackoff(t *testing.T) {
   860  	dials := make(chan struct{})
   861  	defer func() { // If we fail, let the http2client break out of dialing.
   862  		select {
   863  		case <-dials:
   864  		default:
   865  		}
   866  	}()
   867  	dialer := func(string, time.Duration) (net.Conn, error) {
   868  		dials <- struct{}{}
   869  		return nil, errors.New("failed to fake dial")
   870  	}
   871  	cc, err := Dial("any", WithTransportCredentials(insecure.NewCredentials()), WithDialer(dialer), withBackoff(backoffForever{}))
   872  	if err != nil {
   873  		t.Fatalf("Dial() = _, %v; want _, nil", err)
   874  	}
   875  	defer cc.Close()
   876  	go stayConnected(cc)
   877  	select {
   878  	case <-dials:
   879  	case <-time.NewTimer(10 * time.Second).C:
   880  		t.Fatal("Failed to call dial within 10s")
   881  	}
   882  
   883  	select {
   884  	case <-dials:
   885  		t.Fatal("Dial called unexpectedly before resetting backoff")
   886  	case <-time.NewTimer(100 * time.Millisecond).C:
   887  	}
   888  
   889  	cc.ResetConnectBackoff()
   890  
   891  	select {
   892  	case <-dials:
   893  	case <-time.NewTimer(10 * time.Second).C:
   894  		t.Fatal("Failed to call dial within 10s after resetting backoff")
   895  	}
   896  }
   897  
   898  func (s) TestBackoffCancel(t *testing.T) {
   899  	dialStrCh := make(chan string)
   900  	cc, err := Dial("any", WithTransportCredentials(insecure.NewCredentials()), WithDialer(func(t string, _ time.Duration) (net.Conn, error) {
   901  		dialStrCh <- t
   902  		return nil, fmt.Errorf("test dialer, always error")
   903  	}))
   904  	if err != nil {
   905  		t.Fatalf("Failed to create ClientConn: %v", err)
   906  	}
   907  	defer cc.Close()
   908  
   909  	select {
   910  	case <-time.After(defaultTestTimeout):
   911  		t.Fatal("Timeout when waiting for custom dialer to be invoked during Dial")
   912  	case <-dialStrCh:
   913  	}
   914  }
   915  
   916  // TestUpdateAddresses_NoopIfCalledWithSameAddresses tests that UpdateAddresses
   917  // should be noop if UpdateAddresses is called with the same list of addresses,
   918  // even when the SubConn is in Connecting and doesn't have a current address.
   919  func (s) TestUpdateAddresses_NoopIfCalledWithSameAddresses(t *testing.T) {
   920  	lis1, err := net.Listen("tcp", "localhost:0")
   921  	if err != nil {
   922  		t.Fatalf("Error while listening. Err: %v", err)
   923  	}
   924  	defer lis1.Close()
   925  
   926  	lis2, err := net.Listen("tcp", "localhost:0")
   927  	if err != nil {
   928  		t.Fatalf("Error while listening. Err: %v", err)
   929  	}
   930  	defer lis2.Close()
   931  
   932  	lis3, err := net.Listen("tcp", "localhost:0")
   933  	if err != nil {
   934  		t.Fatalf("Error while listening. Err: %v", err)
   935  	}
   936  	defer lis3.Close()
   937  
   938  	closeServer2 := make(chan struct{})
   939  	exitCh := make(chan struct{})
   940  	server1ContactedFirstTime := make(chan struct{})
   941  	server1ContactedSecondTime := make(chan struct{})
   942  	server2ContactedFirstTime := make(chan struct{})
   943  	server2ContactedSecondTime := make(chan struct{})
   944  	server3Contacted := make(chan struct{})
   945  
   946  	defer close(exitCh)
   947  
   948  	// Launch server 1.
   949  	go func() {
   950  		// First, let's allow the initial connection to go READY. We need to do
   951  		// this because tryUpdateAddrs only works after there's some non-nil
   952  		// address on the ac, and curAddress is only set after READY.
   953  		conn1, err := lis1.Accept()
   954  		if err != nil {
   955  			t.Error(err)
   956  			return
   957  		}
   958  		go keepReading(conn1)
   959  
   960  		framer := http2.NewFramer(conn1, conn1)
   961  		if err := framer.WriteSettings(http2.Setting{}); err != nil {
   962  			t.Errorf("Error while writing settings frame. %v", err)
   963  			return
   964  		}
   965  
   966  		// nextStateNotifier() is updated after balancerBuilder.Build(), which is
   967  		// called by grpc.Dial. It's safe to do it here because lis1.Accept blocks
   968  		// until balancer is built to process the addresses.
   969  		stateNotifications := testBalancerBuilder.nextStateNotifier()
   970  		// Wait for the transport to become ready.
   971  		for {
   972  			select {
   973  			case st := <-stateNotifications:
   974  				if st == connectivity.Ready {
   975  					goto ready
   976  				}
   977  			case <-exitCh:
   978  				return
   979  			}
   980  		}
   981  
   982  	ready:
   983  		// Once it's ready, curAddress has been set. So let's close this
   984  		// connection prompting the first reconnect cycle.
   985  		conn1.Close()
   986  
   987  		// Accept and immediately close, causing it to go to server2.
   988  		conn2, err := lis1.Accept()
   989  		if err != nil {
   990  			t.Error(err)
   991  			return
   992  		}
   993  		close(server1ContactedFirstTime)
   994  		conn2.Close()
   995  
   996  		// Hopefully it picks this server after tryUpdateAddrs.
   997  		lis1.Accept()
   998  		close(server1ContactedSecondTime)
   999  	}()
  1000  	// Launch server 2.
  1001  	go func() {
  1002  		// Accept and then hang waiting for the test call tryUpdateAddrs and
  1003  		// then signal to this server to close. After this server closes, it
  1004  		// should start from the top instead of trying server2 or continuing
  1005  		// to server3.
  1006  		conn, err := lis2.Accept()
  1007  		if err != nil {
  1008  			t.Error(err)
  1009  			return
  1010  		}
  1011  
  1012  		close(server2ContactedFirstTime)
  1013  		<-closeServer2
  1014  		conn.Close()
  1015  
  1016  		// After tryUpdateAddrs, it should NOT try server2.
  1017  		lis2.Accept()
  1018  		close(server2ContactedSecondTime)
  1019  	}()
  1020  	// Launch server 3.
  1021  	go func() {
  1022  		// After tryUpdateAddrs, it should NOT try server3. (or any other time)
  1023  		lis3.Accept()
  1024  		close(server3Contacted)
  1025  	}()
  1026  
  1027  	addrsList := []resolver.Address{
  1028  		{Addr: lis1.Addr().String()},
  1029  		{Addr: lis2.Addr().String()},
  1030  		{Addr: lis3.Addr().String()},
  1031  	}
  1032  	rb := manual.NewBuilderWithScheme("whatever")
  1033  	rb.InitialState(resolver.State{Addresses: addrsList})
  1034  
  1035  	client, err := Dial("whatever:///this-gets-overwritten",
  1036  		WithTransportCredentials(insecure.NewCredentials()),
  1037  		WithResolvers(rb),
  1038  		WithConnectParams(ConnectParams{
  1039  			Backoff:           backoff.Config{},
  1040  			MinConnectTimeout: time.Hour,
  1041  		}),
  1042  		WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, stateRecordingBalancerName)))
  1043  	if err != nil {
  1044  		t.Fatal(err)
  1045  	}
  1046  	defer client.Close()
  1047  	go stayConnected(client)
  1048  
  1049  	timeout := time.After(5 * time.Second)
  1050  
  1051  	// Wait for server1 to be contacted (which will immediately fail), then
  1052  	// server2 (which will hang waiting for our signal).
  1053  	select {
  1054  	case <-server1ContactedFirstTime:
  1055  	case <-timeout:
  1056  		t.Fatal("timed out waiting for server1 to be contacted")
  1057  	}
  1058  	select {
  1059  	case <-server2ContactedFirstTime:
  1060  	case <-timeout:
  1061  		t.Fatal("timed out waiting for server2 to be contacted")
  1062  	}
  1063  
  1064  	// Grab the addrConn and call tryUpdateAddrs.
  1065  	var ac *addrConn
  1066  	client.mu.Lock()
  1067  	for clientAC := range client.conns {
  1068  		ac = clientAC
  1069  		break
  1070  	}
  1071  	client.mu.Unlock()
  1072  
  1073  	// Call UpdateAddresses with the same list of addresses, it should be a noop
  1074  	// (even when the SubConn is Connecting, and doesn't have a curAddr).
  1075  	ac.acbw.UpdateAddresses(addrsList)
  1076  
  1077  	// We've called tryUpdateAddrs - now let's make server2 close the
  1078  	// connection and check that it continues to server3.
  1079  	close(closeServer2)
  1080  
  1081  	select {
  1082  	case <-server1ContactedSecondTime:
  1083  		t.Fatal("server1 was contacted a second time, but it should have continued to server 3")
  1084  	case <-server2ContactedSecondTime:
  1085  		t.Fatal("server2 was contacted a second time, but it should have continued to server 3")
  1086  	case <-server3Contacted:
  1087  	case <-timeout:
  1088  		t.Fatal("timed out waiting for any server to be contacted after tryUpdateAddrs")
  1089  	}
  1090  }
  1091  
  1092  func (s) TestDefaultServiceConfig(t *testing.T) {
  1093  	const defaultSC = `
  1094  {
  1095      "methodConfig": [
  1096          {
  1097              "name": [
  1098                  {
  1099                      "service": "foo",
  1100                      "method": "bar"
  1101                  }
  1102              ],
  1103              "waitForReady": true
  1104          }
  1105      ]
  1106  }`
  1107  	tests := []struct {
  1108  		name  string
  1109  		testF func(t *testing.T, r *manual.Resolver, addr, sc string)
  1110  		sc    string
  1111  	}{
  1112  		{
  1113  			name:  "invalid-service-config",
  1114  			testF: testInvalidDefaultServiceConfig,
  1115  			sc:    "",
  1116  		},
  1117  		{
  1118  			name:  "resolver-service-config-disabled",
  1119  			testF: testDefaultServiceConfigWhenResolverServiceConfigDisabled,
  1120  			sc:    defaultSC,
  1121  		},
  1122  		{
  1123  			name:  "resolver-does-not-return-service-config",
  1124  			testF: testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig,
  1125  			sc:    defaultSC,
  1126  		},
  1127  		{
  1128  			name:  "resolver-returns-invalid-service-config",
  1129  			testF: testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig,
  1130  			sc:    defaultSC,
  1131  		},
  1132  	}
  1133  
  1134  	for _, test := range tests {
  1135  		t.Run(test.name, func(t *testing.T) {
  1136  			r := manual.NewBuilderWithScheme(test.name)
  1137  			addr := r.Scheme() + ":///non.existent"
  1138  			test.testF(t, r, addr, test.sc)
  1139  		})
  1140  	}
  1141  }
  1142  
  1143  func verifyWaitForReadyEqualsTrue(cc *ClientConn) bool {
  1144  	var i int
  1145  	for i = 0; i < 10; i++ {
  1146  		mc := cc.GetMethodConfig("/foo/bar")
  1147  		if mc.WaitForReady != nil && *mc.WaitForReady == true {
  1148  			break
  1149  		}
  1150  		time.Sleep(100 * time.Millisecond)
  1151  	}
  1152  	return i != 10
  1153  }
  1154  
  1155  func testInvalidDefaultServiceConfig(t *testing.T, r *manual.Resolver, addr, sc string) {
  1156  	_, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithDefaultServiceConfig(sc))
  1157  	if !strings.Contains(err.Error(), invalidDefaultServiceConfigErrPrefix) {
  1158  		t.Fatalf("Dial got err: %v, want err contains: %v", err, invalidDefaultServiceConfigErrPrefix)
  1159  	}
  1160  }
  1161  
  1162  func testDefaultServiceConfigWhenResolverServiceConfigDisabled(t *testing.T, r *manual.Resolver, addr string, js string) {
  1163  	cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithDisableServiceConfig(), WithResolvers(r), WithDefaultServiceConfig(js))
  1164  	if err != nil {
  1165  		t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
  1166  	}
  1167  	defer cc.Close()
  1168  	// Resolver service config gets ignored since resolver service config is disabled.
  1169  	r.UpdateState(resolver.State{
  1170  		Addresses:     []resolver.Address{{Addr: addr}},
  1171  		ServiceConfig: parseCfg(r, "{}"),
  1172  	})
  1173  	if !verifyWaitForReadyEqualsTrue(cc) {
  1174  		t.Fatal("default service config failed to be applied after 1s")
  1175  	}
  1176  }
  1177  
  1178  func testDefaultServiceConfigWhenResolverDoesNotReturnServiceConfig(t *testing.T, r *manual.Resolver, addr string, js string) {
  1179  	cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithDefaultServiceConfig(js))
  1180  	if err != nil {
  1181  		t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
  1182  	}
  1183  	defer cc.Close()
  1184  	r.UpdateState(resolver.State{
  1185  		Addresses: []resolver.Address{{Addr: addr}},
  1186  	})
  1187  	if !verifyWaitForReadyEqualsTrue(cc) {
  1188  		t.Fatal("default service config failed to be applied after 1s")
  1189  	}
  1190  }
  1191  
  1192  func testDefaultServiceConfigWhenResolverReturnInvalidServiceConfig(t *testing.T, r *manual.Resolver, addr string, js string) {
  1193  	cc, err := Dial(addr, WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r), WithDefaultServiceConfig(js))
  1194  	if err != nil {
  1195  		t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
  1196  	}
  1197  	defer cc.Close()
  1198  	r.UpdateState(resolver.State{
  1199  		Addresses: []resolver.Address{{Addr: addr}},
  1200  	})
  1201  	if !verifyWaitForReadyEqualsTrue(cc) {
  1202  		t.Fatal("default service config failed to be applied after 1s")
  1203  	}
  1204  }
  1205  
  1206  type stateRecordingBalancer struct {
  1207  	balancer.Balancer
  1208  }
  1209  
  1210  func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
  1211  	panic(fmt.Sprintf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, s))
  1212  }
  1213  
  1214  func (b *stateRecordingBalancer) Close() {
  1215  	b.Balancer.Close()
  1216  }
  1217  
  1218  type stateRecordingBalancerBuilder struct {
  1219  	mu       sync.Mutex
  1220  	notifier chan connectivity.State // The notifier used in the last Balancer.
  1221  }
  1222  
  1223  func newStateRecordingBalancerBuilder() *stateRecordingBalancerBuilder {
  1224  	return &stateRecordingBalancerBuilder{}
  1225  }
  1226  
  1227  func (b *stateRecordingBalancerBuilder) Name() string {
  1228  	return stateRecordingBalancerName
  1229  }
  1230  
  1231  func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
  1232  	stateNotifications := make(chan connectivity.State, 10)
  1233  	b.mu.Lock()
  1234  	b.notifier = stateNotifications
  1235  	b.mu.Unlock()
  1236  	return &stateRecordingBalancer{
  1237  		Balancer: balancer.Get("pick_first").Build(&stateRecordingCCWrapper{cc, stateNotifications}, opts),
  1238  	}
  1239  }
  1240  
  1241  func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.State {
  1242  	b.mu.Lock()
  1243  	defer b.mu.Unlock()
  1244  	ret := b.notifier
  1245  	b.notifier = nil
  1246  	return ret
  1247  }
  1248  
  1249  type stateRecordingCCWrapper struct {
  1250  	balancer.ClientConn
  1251  	notifier chan<- connectivity.State
  1252  }
  1253  
  1254  func (ccw *stateRecordingCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
  1255  	oldListener := opts.StateListener
  1256  	opts.StateListener = func(s balancer.SubConnState) {
  1257  		ccw.notifier <- s.ConnectivityState
  1258  		oldListener(s)
  1259  	}
  1260  	return ccw.ClientConn.NewSubConn(addrs, opts)
  1261  }
  1262  
  1263  // Keep reading until something causes the connection to die (EOF, server
  1264  // closed, etc). Useful as a tool for mindlessly keeping the connection
  1265  // healthy, since the client will error if things like client prefaces are not
  1266  // accepted in a timely fashion.
  1267  func keepReading(conn net.Conn) {
  1268  	buf := make([]byte, 1024)
  1269  	for _, err := conn.Read(buf); err == nil; _, err = conn.Read(buf) {
  1270  	}
  1271  }
  1272  
  1273  // stayConnected makes cc stay connected by repeatedly calling cc.Connect()
  1274  // until the state becomes Shutdown or until 10 seconds elapses.
  1275  func stayConnected(cc *ClientConn) {
  1276  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1277  	defer cancel()
  1278  
  1279  	for {
  1280  		state := cc.GetState()
  1281  		switch state {
  1282  		case connectivity.Idle:
  1283  			cc.Connect()
  1284  		case connectivity.Shutdown:
  1285  			return
  1286  		}
  1287  		if !cc.WaitForStateChange(ctx, state) {
  1288  			return
  1289  		}
  1290  	}
  1291  }
  1292  
  1293  func (s) TestURLAuthorityEscape(t *testing.T) {
  1294  	tests := []struct {
  1295  		name      string
  1296  		authority string
  1297  		want      string
  1298  	}{
  1299  		{
  1300  			name:      "ipv6_authority",
  1301  			authority: "[::1]",
  1302  			want:      "[::1]",
  1303  		},
  1304  		{
  1305  			name:      "with_user_and_host",
  1306  			authority: "userinfo@host:10001",
  1307  			want:      "userinfo@host:10001",
  1308  		},
  1309  		{
  1310  			name:      "with_multiple_slashes",
  1311  			authority: "projects/123/network/abc/service",
  1312  			want:      "projects%2F123%2Fnetwork%2Fabc%2Fservice",
  1313  		},
  1314  		{
  1315  			name:      "all_possible_allowed_chars",
  1316  			authority: "abc123-._~!$&'()*+,;=@:[]",
  1317  			want:      "abc123-._~!$&'()*+,;=@:[]",
  1318  		},
  1319  	}
  1320  
  1321  	for _, test := range tests {
  1322  		t.Run(test.name, func(t *testing.T) {
  1323  			if got, want := encodeAuthority(test.authority), test.want; got != want {
  1324  				t.Errorf("encodeAuthority(%s) = %s, want %s", test.authority, got, test.want)
  1325  			}
  1326  		})
  1327  	}
  1328  }
  1329  

View as plain text