...

Source file src/google.golang.org/grpc/test/pickfirst_test.go

Documentation: google.golang.org/grpc/test

     1  /*
     2   *
     3   * Copyright 2022 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package test
    20  
    21  import (
    22  	"context"
    23  	"errors"
    24  	"fmt"
    25  	"strings"
    26  	"testing"
    27  	"time"
    28  
    29  	"google.golang.org/grpc"
    30  	"google.golang.org/grpc/backoff"
    31  	"google.golang.org/grpc/codes"
    32  	"google.golang.org/grpc/connectivity"
    33  	"google.golang.org/grpc/credentials/insecure"
    34  	"google.golang.org/grpc/internal"
    35  	"google.golang.org/grpc/internal/channelz"
    36  	"google.golang.org/grpc/internal/grpcrand"
    37  	"google.golang.org/grpc/internal/stubserver"
    38  	"google.golang.org/grpc/internal/testutils"
    39  	"google.golang.org/grpc/internal/testutils/pickfirst"
    40  	"google.golang.org/grpc/resolver"
    41  	"google.golang.org/grpc/resolver/manual"
    42  	"google.golang.org/grpc/serviceconfig"
    43  	"google.golang.org/grpc/status"
    44  
    45  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    46  	testpb "google.golang.org/grpc/interop/grpc_testing"
    47  )
    48  
// pickFirstServiceConfig is a service config JSON that selects the pick_first
// LB policy with an empty policy configuration. The setup helpers in this file
// pass it to grpc.WithDefaultServiceConfig.
const pickFirstServiceConfig = `{"loadBalancingConfig": [{"pick_first":{}}]}`
    50  
    51  // setupPickFirst performs steps required for pick_first tests. It starts a
    52  // bunch of backends exporting the TestService, creates a ClientConn to them
    53  // with service config specifying the use of the pick_first LB policy.
    54  func setupPickFirst(t *testing.T, backendCount int, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer) {
    55  	t.Helper()
    56  
    57  	r := manual.NewBuilderWithScheme("whatever")
    58  
    59  	backends := make([]*stubserver.StubServer, backendCount)
    60  	addrs := make([]resolver.Address, backendCount)
    61  	for i := 0; i < backendCount; i++ {
    62  		backend := &stubserver.StubServer{
    63  			EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
    64  				return &testpb.Empty{}, nil
    65  			},
    66  		}
    67  		if err := backend.StartServer(); err != nil {
    68  			t.Fatalf("Failed to start backend: %v", err)
    69  		}
    70  		t.Logf("Started TestService backend at: %q", backend.Address)
    71  		t.Cleanup(func() { backend.Stop() })
    72  
    73  		backends[i] = backend
    74  		addrs[i] = resolver.Address{Addr: backend.Address}
    75  	}
    76  
    77  	dopts := []grpc.DialOption{
    78  		grpc.WithTransportCredentials(insecure.NewCredentials()),
    79  		grpc.WithResolvers(r),
    80  		grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
    81  	}
    82  	dopts = append(dopts, opts...)
    83  	cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
    84  	if err != nil {
    85  		t.Fatalf("grpc.NewClient() failed: %v", err)
    86  	}
    87  	t.Cleanup(func() { cc.Close() })
    88  
    89  	// At this point, the resolver has not returned any addresses to the channel.
    90  	// This RPC must block until the context expires.
    91  	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
    92  	defer sCancel()
    93  	client := testgrpc.NewTestServiceClient(cc)
    94  	if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
    95  		t.Fatalf("EmptyCall() = %s, want %s", status.Code(err), codes.DeadlineExceeded)
    96  	}
    97  	return cc, r, backends
    98  }
    99  
   100  // stubBackendsToResolverAddrs converts from a set of stub server backends to
   101  // resolver addresses. Useful when pushing addresses to the manual resolver.
   102  func stubBackendsToResolverAddrs(backends []*stubserver.StubServer) []resolver.Address {
   103  	addrs := make([]resolver.Address, len(backends))
   104  	for i, backend := range backends {
   105  		addrs[i] = resolver.Address{Addr: backend.Address}
   106  	}
   107  	return addrs
   108  }
   109  
   110  // TestPickFirst_OneBackend tests the most basic scenario for pick_first. It
   111  // brings up a single backend and verifies that all RPCs get routed to it.
   112  func (s) TestPickFirst_OneBackend(t *testing.T) {
   113  	cc, r, backends := setupPickFirst(t, 1)
   114  
   115  	addrs := stubBackendsToResolverAddrs(backends)
   116  	r.UpdateState(resolver.State{Addresses: addrs})
   117  
   118  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   119  	defer cancel()
   120  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   121  		t.Fatal(err)
   122  	}
   123  }
   124  
   125  // TestPickFirst_MultipleBackends tests the scenario with multiple backends and
   126  // verifies that all RPCs get routed to the first one.
   127  func (s) TestPickFirst_MultipleBackends(t *testing.T) {
   128  	cc, r, backends := setupPickFirst(t, 2)
   129  
   130  	addrs := stubBackendsToResolverAddrs(backends)
   131  	r.UpdateState(resolver.State{Addresses: addrs})
   132  
   133  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   134  	defer cancel()
   135  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   136  		t.Fatal(err)
   137  	}
   138  }
   139  
   140  // TestPickFirst_OneServerDown tests the scenario where we have multiple
   141  // backends and pick_first is working as expected. Verifies that RPCs get routed
   142  // to the next backend in the list when the first one goes down.
   143  func (s) TestPickFirst_OneServerDown(t *testing.T) {
   144  	cc, r, backends := setupPickFirst(t, 2)
   145  
   146  	addrs := stubBackendsToResolverAddrs(backends)
   147  	r.UpdateState(resolver.State{Addresses: addrs})
   148  
   149  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   150  	defer cancel()
   151  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   152  		t.Fatal(err)
   153  	}
   154  
   155  	// Stop the backend which is currently being used. RPCs should get routed to
   156  	// the next backend in the list.
   157  	backends[0].Stop()
   158  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
   159  		t.Fatal(err)
   160  	}
   161  }
   162  
   163  // TestPickFirst_AllServersDown tests the scenario where we have multiple
   164  // backends and pick_first is working as expected. When all backends go down,
   165  // the test verifies that RPCs fail with appropriate status code.
   166  func (s) TestPickFirst_AllServersDown(t *testing.T) {
   167  	cc, r, backends := setupPickFirst(t, 2)
   168  
   169  	addrs := stubBackendsToResolverAddrs(backends)
   170  	r.UpdateState(resolver.State{Addresses: addrs})
   171  
   172  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   173  	defer cancel()
   174  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   175  		t.Fatal(err)
   176  	}
   177  
   178  	for _, b := range backends {
   179  		b.Stop()
   180  	}
   181  
   182  	client := testgrpc.NewTestServiceClient(cc)
   183  	for {
   184  		if ctx.Err() != nil {
   185  			t.Fatalf("channel failed to move to Unavailable after all backends were stopped: %v", ctx.Err())
   186  		}
   187  		if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) == codes.Unavailable {
   188  			return
   189  		}
   190  		time.Sleep(defaultTestShortTimeout)
   191  	}
   192  }
   193  
   194  // TestPickFirst_AddressesRemoved tests the scenario where we have multiple
   195  // backends and pick_first is working as expected. It then verifies that when
   196  // addresses are removed by the name resolver, RPCs get routed appropriately.
   197  func (s) TestPickFirst_AddressesRemoved(t *testing.T) {
   198  	cc, r, backends := setupPickFirst(t, 3)
   199  
   200  	addrs := stubBackendsToResolverAddrs(backends)
   201  	r.UpdateState(resolver.State{Addresses: addrs})
   202  
   203  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   204  	defer cancel()
   205  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   206  		t.Fatal(err)
   207  	}
   208  
   209  	// Remove the first backend from the list of addresses originally pushed.
   210  	// RPCs should get routed to the first backend in the new list.
   211  	r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[1], addrs[2]}})
   212  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
   213  		t.Fatal(err)
   214  	}
   215  
   216  	// Append the backend that we just removed to the end of the list.
   217  	// Nothing should change.
   218  	r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[1], addrs[2], addrs[0]}})
   219  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
   220  		t.Fatal(err)
   221  	}
   222  
   223  	// Remove the first backend from the existing list of addresses.
   224  	// RPCs should get routed to the first backend in the new list.
   225  	r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[2], addrs[0]}})
   226  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[2]); err != nil {
   227  		t.Fatal(err)
   228  	}
   229  
   230  	// Remove the first backend from the existing list of addresses.
   231  	// RPCs should get routed to the first backend in the new list.
   232  	r.UpdateState(resolver.State{Addresses: []resolver.Address{addrs[0]}})
   233  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   234  		t.Fatal(err)
   235  	}
   236  }
   237  
   238  // TestPickFirst_NewAddressWhileBlocking tests the case where pick_first is
   239  // configured on a channel, things are working as expected and then a resolver
   240  // updates removes all addresses. An RPC attempted at this point in time will be
   241  // blocked because there are no valid backends. This test verifies that when new
   242  // backends are added, the RPC is able to complete.
   243  func (s) TestPickFirst_NewAddressWhileBlocking(t *testing.T) {
   244  	cc, r, backends := setupPickFirst(t, 2)
   245  	addrs := stubBackendsToResolverAddrs(backends)
   246  	r.UpdateState(resolver.State{Addresses: addrs})
   247  
   248  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   249  	defer cancel()
   250  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   251  		t.Fatal(err)
   252  	}
   253  
   254  	// Send a resolver update with no addresses. This should push the channel into
   255  	// TransientFailure.
   256  	r.UpdateState(resolver.State{})
   257  	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
   258  
   259  	doneCh := make(chan struct{})
   260  	client := testgrpc.NewTestServiceClient(cc)
   261  	go func() {
   262  		// The channel is currently in TransientFailure and this RPC will block
   263  		// until the channel becomes Ready, which will only happen when we push a
   264  		// resolver update with a valid backend address.
   265  		if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   266  			t.Errorf("EmptyCall() = %v, want <nil>", err)
   267  		}
   268  		close(doneCh)
   269  	}()
   270  
   271  	// Make sure that there is one pending RPC on the ClientConn before attempting
   272  	// to push new addresses through the name resolver. If we don't do this, the
   273  	// resolver update can happen before the above goroutine gets to make the RPC.
   274  	for {
   275  		if err := ctx.Err(); err != nil {
   276  			t.Fatal(err)
   277  		}
   278  		tcs, _ := channelz.GetTopChannels(0, 0)
   279  		if len(tcs) != 1 {
   280  			t.Fatalf("there should only be one top channel, not %d", len(tcs))
   281  		}
   282  		started := tcs[0].ChannelMetrics.CallsStarted.Load()
   283  		completed := tcs[0].ChannelMetrics.CallsSucceeded.Load() + tcs[0].ChannelMetrics.CallsFailed.Load()
   284  		if (started - completed) == 1 {
   285  			break
   286  		}
   287  		time.Sleep(defaultTestShortTimeout)
   288  	}
   289  
   290  	// Send a resolver update with a valid backend to push the channel to Ready
   291  	// and unblock the above RPC.
   292  	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}})
   293  
   294  	select {
   295  	case <-ctx.Done():
   296  		t.Fatal("Timeout when waiting for blocked RPC to complete")
   297  	case <-doneCh:
   298  	}
   299  }
   300  
   301  // TestPickFirst_StickyTransientFailure tests the case where pick_first is
   302  // configured on a channel, and the backend is configured to close incoming
   303  // connections as soon as they are accepted. The test verifies that the channel
   304  // enters TransientFailure and stays there. The test also verifies that the
   305  // pick_first LB policy is constantly trying to reconnect to the backend.
   306  func (s) TestPickFirst_StickyTransientFailure(t *testing.T) {
   307  	// Spin up a local server which closes the connection as soon as it receives
   308  	// one. It also sends a signal on a channel whenver it received a connection.
   309  	lis, err := testutils.LocalTCPListener()
   310  	if err != nil {
   311  		t.Fatalf("Failed to create listener: %v", err)
   312  	}
   313  	t.Cleanup(func() { lis.Close() })
   314  
   315  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   316  	defer cancel()
   317  	connCh := make(chan struct{}, 1)
   318  	go func() {
   319  		for {
   320  			conn, err := lis.Accept()
   321  			if err != nil {
   322  				return
   323  			}
   324  			select {
   325  			case connCh <- struct{}{}:
   326  				conn.Close()
   327  			case <-ctx.Done():
   328  				return
   329  			}
   330  		}
   331  	}()
   332  
   333  	// Dial the above server with a ConnectParams that does a constant backoff
   334  	// of defaultTestShortTimeout duration.
   335  	dopts := []grpc.DialOption{
   336  		grpc.WithTransportCredentials(insecure.NewCredentials()),
   337  		grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
   338  		grpc.WithConnectParams(grpc.ConnectParams{
   339  			Backoff: backoff.Config{
   340  				BaseDelay:  defaultTestShortTimeout,
   341  				Multiplier: float64(0),
   342  				Jitter:     float64(0),
   343  				MaxDelay:   defaultTestShortTimeout,
   344  			},
   345  		}),
   346  	}
   347  	cc, err := grpc.Dial(lis.Addr().String(), dopts...)
   348  	if err != nil {
   349  		t.Fatalf("Failed to dial server at %q: %v", lis.Addr(), err)
   350  	}
   351  	t.Cleanup(func() { cc.Close() })
   352  
   353  	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
   354  
   355  	// Spawn a goroutine to ensure that the channel stays in TransientFailure.
   356  	// The call to cc.WaitForStateChange will return false when the main
   357  	// goroutine exits and the context is cancelled.
   358  	go func() {
   359  		if cc.WaitForStateChange(ctx, connectivity.TransientFailure) {
   360  			if state := cc.GetState(); state != connectivity.Shutdown {
   361  				t.Errorf("Unexpected state change from TransientFailure to %s", cc.GetState())
   362  			}
   363  		}
   364  	}()
   365  
   366  	// Ensures that the pick_first LB policy is constantly trying to reconnect.
   367  	for i := 0; i < 10; i++ {
   368  		select {
   369  		case <-connCh:
   370  		case <-time.After(2 * defaultTestShortTimeout):
   371  			t.Error("Timeout when waiting for pick_first to reconnect")
   372  		}
   373  	}
   374  }
   375  
   376  // Tests the PF LB policy with shuffling enabled.
   377  func (s) TestPickFirst_ShuffleAddressList(t *testing.T) {
   378  	const serviceConfig = `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": true }}]}`
   379  
   380  	// Install a shuffler that always reverses two entries.
   381  	origShuf := grpcrand.Shuffle
   382  	defer func() { grpcrand.Shuffle = origShuf }()
   383  	grpcrand.Shuffle = func(n int, f func(int, int)) {
   384  		if n != 2 {
   385  			t.Errorf("Shuffle called with n=%v; want 2", n)
   386  			return
   387  		}
   388  		f(0, 1) // reverse the two addresses
   389  	}
   390  
   391  	// Set up our backends.
   392  	cc, r, backends := setupPickFirst(t, 2)
   393  	addrs := stubBackendsToResolverAddrs(backends)
   394  
   395  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   396  	defer cancel()
   397  
   398  	// Push an update with both addresses and shuffling disabled.  We should
   399  	// connect to backend 0.
   400  	r.UpdateState(resolver.State{Endpoints: []resolver.Endpoint{
   401  		{Addresses: []resolver.Address{addrs[0]}},
   402  		{Addresses: []resolver.Address{addrs[1]}},
   403  	}})
   404  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   405  		t.Fatal(err)
   406  	}
   407  
   408  	// Send a config with shuffling enabled.  This will reverse the addresses,
   409  	// but the channel should still be connected to backend 0.
   410  	shufState := resolver.State{
   411  		ServiceConfig: parseServiceConfig(t, r, serviceConfig),
   412  		Endpoints: []resolver.Endpoint{
   413  			{Addresses: []resolver.Address{addrs[0]}},
   414  			{Addresses: []resolver.Address{addrs[1]}},
   415  		},
   416  	}
   417  	r.UpdateState(shufState)
   418  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   419  		t.Fatal(err)
   420  	}
   421  
   422  	// Send a resolver update with no addresses. This should push the channel
   423  	// into TransientFailure.
   424  	r.UpdateState(resolver.State{})
   425  	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
   426  
   427  	// Send the same config as last time with shuffling enabled.  Since we are
   428  	// not connected to backend 0, we should connect to backend 1.
   429  	r.UpdateState(shufState)
   430  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
   431  		t.Fatal(err)
   432  	}
   433  }
   434  
   435  // Test config parsing with the env var turned on and off for various scenarios.
   436  func (s) TestPickFirst_ParseConfig_Success(t *testing.T) {
   437  	// Install a shuffler that always reverses two entries.
   438  	origShuf := grpcrand.Shuffle
   439  	defer func() { grpcrand.Shuffle = origShuf }()
   440  	grpcrand.Shuffle = func(n int, f func(int, int)) {
   441  		if n != 2 {
   442  			t.Errorf("Shuffle called with n=%v; want 2", n)
   443  			return
   444  		}
   445  		f(0, 1) // reverse the two addresses
   446  	}
   447  
   448  	tests := []struct {
   449  		name          string
   450  		serviceConfig string
   451  		wantFirstAddr bool
   452  	}{
   453  		{
   454  			name:          "empty pickfirst config",
   455  			serviceConfig: `{"loadBalancingConfig": [{"pick_first":{}}]}`,
   456  			wantFirstAddr: true,
   457  		},
   458  		{
   459  			name:          "empty good pickfirst config",
   460  			serviceConfig: `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": true }}]}`,
   461  			wantFirstAddr: false,
   462  		},
   463  	}
   464  
   465  	for _, test := range tests {
   466  		t.Run(test.name, func(t *testing.T) {
   467  			// Set up our backends.
   468  			cc, r, backends := setupPickFirst(t, 2)
   469  			addrs := stubBackendsToResolverAddrs(backends)
   470  
   471  			r.UpdateState(resolver.State{
   472  				ServiceConfig: parseServiceConfig(t, r, test.serviceConfig),
   473  				Addresses:     addrs,
   474  			})
   475  
   476  			// Some tests expect address shuffling to happen, and indicate that
   477  			// by setting wantFirstAddr to false (since our shuffling function
   478  			// defined at the top of this test, simply reverses the list of
   479  			// addresses provided to it).
   480  			wantAddr := addrs[0]
   481  			if !test.wantFirstAddr {
   482  				wantAddr = addrs[1]
   483  			}
   484  
   485  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   486  			defer cancel()
   487  			if err := pickfirst.CheckRPCsToBackend(ctx, cc, wantAddr); err != nil {
   488  				t.Fatal(err)
   489  			}
   490  		})
   491  	}
   492  }
   493  
   494  // Test config parsing for a bad service config.
   495  func (s) TestPickFirst_ParseConfig_Failure(t *testing.T) {
   496  	// Service config should fail with the below config. Name resolvers are
   497  	// expected to perform this parsing before they push the parsed service
   498  	// config to the channel.
   499  	const sc = `{"loadBalancingConfig": [{"pick_first":{ "shuffleAddressList": 666 }}]}`
   500  	scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(sc)
   501  	if scpr.Err == nil {
   502  		t.Fatalf("ParseConfig() succeeded and returned %+v, when expected to fail", scpr)
   503  	}
   504  }
   505  
   506  // setupPickFirstWithListenerWrapper is very similar to setupPickFirst, but uses
   507  // a wrapped listener that the test can use to track accepted connections.
   508  func setupPickFirstWithListenerWrapper(t *testing.T, backendCount int, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer, []*testutils.ListenerWrapper) {
   509  	t.Helper()
   510  
   511  	backends := make([]*stubserver.StubServer, backendCount)
   512  	addrs := make([]resolver.Address, backendCount)
   513  	listeners := make([]*testutils.ListenerWrapper, backendCount)
   514  	for i := 0; i < backendCount; i++ {
   515  		lis := testutils.NewListenerWrapper(t, nil)
   516  		backend := &stubserver.StubServer{
   517  			Listener: lis,
   518  			EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
   519  				return &testpb.Empty{}, nil
   520  			},
   521  		}
   522  		if err := backend.StartServer(); err != nil {
   523  			t.Fatalf("Failed to start backend: %v", err)
   524  		}
   525  		t.Logf("Started TestService backend at: %q", backend.Address)
   526  		t.Cleanup(func() { backend.Stop() })
   527  
   528  		backends[i] = backend
   529  		addrs[i] = resolver.Address{Addr: backend.Address}
   530  		listeners[i] = lis
   531  	}
   532  
   533  	r := manual.NewBuilderWithScheme("whatever")
   534  	dopts := []grpc.DialOption{
   535  		grpc.WithTransportCredentials(insecure.NewCredentials()),
   536  		grpc.WithResolvers(r),
   537  		grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
   538  	}
   539  	dopts = append(dopts, opts...)
   540  	cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
   541  	if err != nil {
   542  		t.Fatalf("grpc.NewClient() failed: %v", err)
   543  	}
   544  	t.Cleanup(func() { cc.Close() })
   545  
   546  	// At this point, the resolver has not returned any addresses to the channel.
   547  	// This RPC must block until the context expires.
   548  	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
   549  	defer sCancel()
   550  	client := testgrpc.NewTestServiceClient(cc)
   551  	if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
   552  		t.Fatalf("EmptyCall() = %s, want %s", status.Code(err), codes.DeadlineExceeded)
   553  	}
   554  	return cc, r, backends, listeners
   555  }
   556  
   557  // TestPickFirst_AddressUpdateWithAttributes tests the case where an address
   558  // update received by the pick_first LB policy differs in attributes. Addresses
   559  // which differ in attributes are considered different from the perspective of
   560  // subconn creation and connection establishment and the test verifies that new
   561  // connections are created when attributes change.
   562  func (s) TestPickFirst_AddressUpdateWithAttributes(t *testing.T) {
   563  	cc, r, backends, listeners := setupPickFirstWithListenerWrapper(t, 2)
   564  
   565  	// Add a set of attributes to the addresses before pushing them to the
   566  	// pick_first LB policy through the manual resolver.
   567  	addrs := stubBackendsToResolverAddrs(backends)
   568  	for i := range addrs {
   569  		addrs[i].Attributes = addrs[i].Attributes.WithValue("test-attribute-1", fmt.Sprintf("%d", i))
   570  	}
   571  	r.UpdateState(resolver.State{Addresses: addrs})
   572  
   573  	// Ensure that RPCs succeed to the first backend in the list.
   574  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   575  	defer cancel()
   576  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   577  		t.Fatal(err)
   578  	}
   579  
   580  	// Grab the wrapped connection from the listener wrapper. This will be used
   581  	// to verify the connection is closed.
   582  	val, err := listeners[0].NewConnCh.Receive(ctx)
   583  	if err != nil {
   584  		t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
   585  	}
   586  	conn := val.(*testutils.ConnWrapper)
   587  
   588  	// Add another set of attributes to the addresses, and push them to the
   589  	// pick_first LB policy through the manual resolver. Leave the order of the
   590  	// addresses unchanged.
   591  	for i := range addrs {
   592  		addrs[i].Attributes = addrs[i].Attributes.WithValue("test-attribute-2", fmt.Sprintf("%d", i))
   593  	}
   594  	r.UpdateState(resolver.State{Addresses: addrs})
   595  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   596  		t.Fatal(err)
   597  	}
   598  
   599  	// A change in the address attributes results in the new address being
   600  	// considered different to the current address. This will result in the old
   601  	// connection being closed and a new connection to the same backend (since
   602  	// address order is not modified).
   603  	if _, err := conn.CloseCh.Receive(ctx); err != nil {
   604  		t.Fatalf("Timeout when expecting existing connection to be closed: %v", err)
   605  	}
   606  	val, err = listeners[0].NewConnCh.Receive(ctx)
   607  	if err != nil {
   608  		t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
   609  	}
   610  	conn = val.(*testutils.ConnWrapper)
   611  
   612  	// Add another set of attributes to the addresses, and push them to the
   613  	// pick_first LB policy through the manual resolver.  Reverse of the order
   614  	// of addresses.
   615  	for i := range addrs {
   616  		addrs[i].Attributes = addrs[i].Attributes.WithValue("test-attribute-3", fmt.Sprintf("%d", i))
   617  	}
   618  	addrs[0], addrs[1] = addrs[1], addrs[0]
   619  	r.UpdateState(resolver.State{Addresses: addrs})
   620  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   621  		t.Fatal(err)
   622  	}
   623  
   624  	// Ensure that the old connection is closed and a new connection is
   625  	// established to the first address in the new list.
   626  	if _, err := conn.CloseCh.Receive(ctx); err != nil {
   627  		t.Fatalf("Timeout when expecting existing connection to be closed: %v", err)
   628  	}
   629  	_, err = listeners[1].NewConnCh.Receive(ctx)
   630  	if err != nil {
   631  		t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
   632  	}
   633  }
   634  
   635  // TestPickFirst_AddressUpdateWithBalancerAttributes tests the case where an
   636  // address update received by the pick_first LB policy differs in balancer
   637  // attributes, which are meant only for consumption by LB policies. In this
   638  // case, the test verifies that new connections are not created when the address
   639  // update only changes the balancer attributes.
   640  func (s) TestPickFirst_AddressUpdateWithBalancerAttributes(t *testing.T) {
   641  	cc, r, backends, listeners := setupPickFirstWithListenerWrapper(t, 2)
   642  
   643  	// Add a set of balancer attributes to the addresses before pushing them to
   644  	// the pick_first LB policy through the manual resolver.
   645  	addrs := stubBackendsToResolverAddrs(backends)
   646  	for i := range addrs {
   647  		addrs[i].BalancerAttributes = addrs[i].BalancerAttributes.WithValue("test-attribute-1", fmt.Sprintf("%d", i))
   648  	}
   649  	r.UpdateState(resolver.State{Addresses: addrs})
   650  
   651  	// Ensure that RPCs succeed to the expected backend.
   652  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   653  	defer cancel()
   654  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   655  		t.Fatal(err)
   656  	}
   657  
   658  	// Grab the wrapped connection from the listener wrapper. This will be used
   659  	// to verify the connection is not closed.
   660  	val, err := listeners[0].NewConnCh.Receive(ctx)
   661  	if err != nil {
   662  		t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
   663  	}
   664  	conn := val.(*testutils.ConnWrapper)
   665  
   666  	// Add a set of balancer attributes to the addresses before pushing them to
   667  	// the pick_first LB policy through the manual resolver. Leave the order of
   668  	// the addresses unchanged.
   669  	for i := range addrs {
   670  		addrs[i].BalancerAttributes = addrs[i].BalancerAttributes.WithValue("test-attribute-2", fmt.Sprintf("%d", i))
   671  	}
   672  	r.UpdateState(resolver.State{Addresses: addrs})
   673  
   674  	// Ensure that no new connection is established, and ensure that the old
   675  	// connection is not closed.
   676  	for i := range listeners {
   677  		sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   678  		defer sCancel()
   679  		if _, err := listeners[i].NewConnCh.Receive(sCtx); err != context.DeadlineExceeded {
   680  			t.Fatalf("Unexpected error when expecting no new connection: %v", err)
   681  		}
   682  	}
   683  	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   684  	defer sCancel()
   685  	if _, err := conn.CloseCh.Receive(sCtx); err != context.DeadlineExceeded {
   686  		t.Fatalf("Unexpected error when expecting existing connection to stay active: %v", err)
   687  	}
   688  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   689  		t.Fatal(err)
   690  	}
   691  
   692  	// Add a set of balancer attributes to the addresses before pushing them to
   693  	// the pick_first LB policy through the manual resolver. Reverse of the
   694  	// order of addresses.
   695  	for i := range addrs {
   696  		addrs[i].BalancerAttributes = addrs[i].BalancerAttributes.WithValue("test-attribute-3", fmt.Sprintf("%d", i))
   697  	}
   698  	addrs[0], addrs[1] = addrs[1], addrs[0]
   699  	r.UpdateState(resolver.State{Addresses: addrs})
   700  
   701  	// Ensure that no new connection is established, and ensure that the old
   702  	// connection is not closed.
   703  	for i := range listeners {
   704  		sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   705  		defer sCancel()
   706  		if _, err := listeners[i].NewConnCh.Receive(sCtx); err != context.DeadlineExceeded {
   707  			t.Fatalf("Unexpected error when expecting no new connection: %v", err)
   708  		}
   709  	}
   710  	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
   711  	defer sCancel()
   712  	if _, err := conn.CloseCh.Receive(sCtx); err != context.DeadlineExceeded {
   713  		t.Fatalf("Unexpected error when expecting existing connection to stay active: %v", err)
   714  	}
   715  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
   716  		t.Fatal(err)
   717  	}
   718  }
   719  
   720  // Tests the case where the pick_first LB policy receives an error from the name
   721  // resolver without previously receiving a good update. Verifies that the
   722  // channel moves to TRANSIENT_FAILURE and that error received from the name
   723  // resolver is propagated to the caller of an RPC.
   724  func (s) TestPickFirst_ResolverError_NoPreviousUpdate(t *testing.T) {
   725  	cc, r, _ := setupPickFirst(t, 0)
   726  
   727  	nrErr := errors.New("error from name resolver")
   728  	r.ReportError(nrErr)
   729  
   730  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   731  	defer cancel()
   732  	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
   733  
   734  	client := testgrpc.NewTestServiceClient(cc)
   735  	_, err := client.EmptyCall(ctx, &testpb.Empty{})
   736  	if err == nil {
   737  		t.Fatalf("EmptyCall() succeeded when expected to fail with error: %v", nrErr)
   738  	}
   739  	if !strings.Contains(err.Error(), nrErr.Error()) {
   740  		t.Fatalf("EmptyCall() failed with error: %v, want error: %v", err, nrErr)
   741  	}
   742  }
   743  
   744  // Tests the case where the pick_first LB policy receives an error from the name
   745  // resolver after receiving a good update (and the channel is currently READY).
   746  // The test verifies that the channel continues to use the previously received
   747  // good update.
   748  func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Ready(t *testing.T) {
   749  	cc, r, backends := setupPickFirst(t, 1)
   750  
   751  	addrs := stubBackendsToResolverAddrs(backends)
   752  	r.UpdateState(resolver.State{Addresses: addrs})
   753  
   754  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   755  	defer cancel()
   756  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   757  		t.Fatal(err)
   758  	}
   759  
   760  	nrErr := errors.New("error from name resolver")
   761  	r.ReportError(nrErr)
   762  
   763  	// Ensure that RPCs continue to succeed for the next second.
   764  	client := testgrpc.NewTestServiceClient(cc)
   765  	for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
   766  		if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   767  			t.Fatalf("EmptyCall() failed: %v", err)
   768  		}
   769  	}
   770  }
   771  
   772  // Tests the case where the pick_first LB policy receives an error from the name
   773  // resolver after receiving a good update (and the channel is currently in
   774  // CONNECTING state). The test verifies that the channel continues to use the
   775  // previously received good update, and that RPCs don't fail with the error
   776  // received from the name resolver.
   777  func (s) TestPickFirst_ResolverError_WithPreviousUpdate_Connecting(t *testing.T) {
   778  	lis, err := testutils.LocalTCPListener()
   779  	if err != nil {
   780  		t.Fatalf("net.Listen() failed: %v", err)
   781  	}
   782  
   783  	// Listen on a local port and act like a server that blocks until the
   784  	// channel reaches CONNECTING and closes the connection without sending a
   785  	// server preface.
   786  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   787  	defer cancel()
   788  	waitForConnecting := make(chan struct{})
   789  	go func() {
   790  		conn, err := lis.Accept()
   791  		if err != nil {
   792  			t.Errorf("Unexpected error when accepting a connection: %v", err)
   793  		}
   794  		defer conn.Close()
   795  
   796  		select {
   797  		case <-waitForConnecting:
   798  		case <-ctx.Done():
   799  			t.Error("Timeout when waiting for channel to move to CONNECTING state")
   800  		}
   801  	}()
   802  
   803  	r := manual.NewBuilderWithScheme("whatever")
   804  	dopts := []grpc.DialOption{
   805  		grpc.WithTransportCredentials(insecure.NewCredentials()),
   806  		grpc.WithResolvers(r),
   807  		grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
   808  	}
   809  	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
   810  	if err != nil {
   811  		t.Fatalf("grpc.Dial() failed: %v", err)
   812  	}
   813  	t.Cleanup(func() { cc.Close() })
   814  
   815  	addrs := []resolver.Address{{Addr: lis.Addr().String()}}
   816  	r.UpdateState(resolver.State{Addresses: addrs})
   817  	testutils.AwaitState(ctx, t, cc, connectivity.Connecting)
   818  
   819  	nrErr := errors.New("error from name resolver")
   820  	r.ReportError(nrErr)
   821  
   822  	// RPCs should fail with deadline exceed error as long as they are in
   823  	// CONNECTING and not the error returned by the name resolver.
   824  	client := testgrpc.NewTestServiceClient(cc)
   825  	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   826  	defer sCancel()
   827  	if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); !strings.Contains(err.Error(), context.DeadlineExceeded.Error()) {
   828  		t.Fatalf("EmptyCall() failed with error: %v, want error: %v", err, context.DeadlineExceeded)
   829  	}
   830  
   831  	// Closing this channel leads to closing of the connection by our listener.
   832  	// gRPC should see this as a connection error.
   833  	close(waitForConnecting)
   834  	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
   835  	checkForConnectionError(ctx, t, cc)
   836  }
   837  
   838  // Tests the case where the pick_first LB policy receives an error from the name
   839  // resolver after receiving a good update. The previous good update though has
   840  // seen the channel move to TRANSIENT_FAILURE.  The test verifies that the
   841  // channel fails RPCs with the new error from the resolver.
   842  func (s) TestPickFirst_ResolverError_WithPreviousUpdate_TransientFailure(t *testing.T) {
   843  	lis, err := testutils.LocalTCPListener()
   844  	if err != nil {
   845  		t.Fatalf("net.Listen() failed: %v", err)
   846  	}
   847  
   848  	// Listen on a local port and act like a server that closes the connection
   849  	// without sending a server preface.
   850  	go func() {
   851  		conn, err := lis.Accept()
   852  		if err != nil {
   853  			t.Errorf("Unexpected error when accepting a connection: %v", err)
   854  		}
   855  		conn.Close()
   856  	}()
   857  
   858  	r := manual.NewBuilderWithScheme("whatever")
   859  	dopts := []grpc.DialOption{
   860  		grpc.WithTransportCredentials(insecure.NewCredentials()),
   861  		grpc.WithResolvers(r),
   862  		grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
   863  	}
   864  	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
   865  	if err != nil {
   866  		t.Fatalf("grpc.Dial() failed: %v", err)
   867  	}
   868  	t.Cleanup(func() { cc.Close() })
   869  
   870  	addrs := []resolver.Address{{Addr: lis.Addr().String()}}
   871  	r.UpdateState(resolver.State{Addresses: addrs})
   872  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   873  	defer cancel()
   874  	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
   875  	checkForConnectionError(ctx, t, cc)
   876  
   877  	// An error from the name resolver should result in RPCs failing with that
   878  	// error instead of the old error that caused the channel to move to
   879  	// TRANSIENT_FAILURE in the first place.
   880  	nrErr := errors.New("error from name resolver")
   881  	r.ReportError(nrErr)
   882  	client := testgrpc.NewTestServiceClient(cc)
   883  	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
   884  		if _, err := client.EmptyCall(ctx, &testpb.Empty{}); strings.Contains(err.Error(), nrErr.Error()) {
   885  			break
   886  		}
   887  	}
   888  	if ctx.Err() != nil {
   889  		t.Fatal("Timeout when waiting for RPCs to fail with error returned by the name resolver")
   890  	}
   891  }
   892  
   893  func checkForConnectionError(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
   894  	t.Helper()
   895  
   896  	// RPCs may fail on the client side in two ways, once the fake server closes
   897  	// the accepted connection:
   898  	// - writing the client preface succeeds, but not reading the server preface
   899  	// - writing the client preface fails
   900  	// In either case, we should see it fail with UNAVAILABLE.
   901  	client := testgrpc.NewTestServiceClient(cc)
   902  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
   903  		t.Fatalf("EmptyCall() failed with error: %v, want code %v", err, codes.Unavailable)
   904  	}
   905  }
   906  
   907  // Tests the case where the pick_first LB policy receives an update from the
   908  // name resolver with no addresses after receiving a good update. The test
   909  // verifies that the channel fails RPCs with an error indicating the fact that
   910  // the name resolver returned no addresses.
   911  func (s) TestPickFirst_ResolverError_ZeroAddresses_WithPreviousUpdate(t *testing.T) {
   912  	cc, r, backends := setupPickFirst(t, 1)
   913  
   914  	addrs := stubBackendsToResolverAddrs(backends)
   915  	r.UpdateState(resolver.State{Addresses: addrs})
   916  
   917  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   918  	defer cancel()
   919  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   920  		t.Fatal(err)
   921  	}
   922  
   923  	r.UpdateState(resolver.State{})
   924  	wantErr := "produced zero addresses"
   925  	client := testgrpc.NewTestServiceClient(cc)
   926  	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
   927  		if _, err := client.EmptyCall(ctx, &testpb.Empty{}); strings.Contains(err.Error(), wantErr) {
   928  			break
   929  		}
   930  	}
   931  	if ctx.Err() != nil {
   932  		t.Fatal("Timeout when waiting for RPCs to fail with error returned by the name resolver")
   933  	}
   934  }
   935  

View as plain text