...

Source file src/google.golang.org/grpc/test/balancer_switching_test.go

Documentation: google.golang.org/grpc/test

     1  /*
     2   *
     3   * Copyright 2022 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package test
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"testing"
    25  
    26  	"google.golang.org/grpc"
    27  	"google.golang.org/grpc/balancer"
    28  	grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
    29  	"google.golang.org/grpc/credentials/insecure"
    30  	"google.golang.org/grpc/internal"
    31  	"google.golang.org/grpc/internal/balancer/stub"
    32  	"google.golang.org/grpc/internal/stubserver"
    33  	"google.golang.org/grpc/internal/testutils/fakegrpclb"
    34  	"google.golang.org/grpc/internal/testutils/pickfirst"
    35  	rrutil "google.golang.org/grpc/internal/testutils/roundrobin"
    36  	"google.golang.org/grpc/resolver"
    37  	"google.golang.org/grpc/resolver/manual"
    38  
    39  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    40  	testpb "google.golang.org/grpc/interop/grpc_testing"
    41  )
    42  
    43  const (
    44  	loadBalancedServiceName = "foo.bar.service"
    45  	loadBalancedServicePort = 443
    46  	wantGRPCLBTraceDesc     = `Channel switches to new LB policy "grpclb"`
    47  	wantRoundRobinTraceDesc = `Channel switches to new LB policy "round_robin"`
    48  
    49  	// This is the number of stub backends set up at the start of each test. The
    50  	// first backend is used for the "grpclb" policy and the rest are used for
    51  	// other LB policies to test balancer switching.
    52  	backendCount = 3
    53  )
    54  
    55  // setupBackendsAndFakeGRPCLB sets up backendCount number of stub server
    56  // backends and a fake grpclb server for tests which exercise balancer switch
    57  // scenarios involving grpclb.
    58  //
    59  // The fake grpclb server always returns the first of the configured stub
    60  // backends as backend addresses. So, the tests are free to use the other
    61  // backends with other LB policies to verify balancer switching scenarios.
    62  //
    63  // Returns a cleanup function to be invoked by the caller.
    64  func setupBackendsAndFakeGRPCLB(t *testing.T) ([]*stubserver.StubServer, *fakegrpclb.Server, func()) {
    65  	backends, backendsCleanup := startBackendsForBalancerSwitch(t)
    66  
    67  	lbServer, err := fakegrpclb.NewServer(fakegrpclb.ServerParams{
    68  		LoadBalancedServiceName: loadBalancedServiceName,
    69  		LoadBalancedServicePort: loadBalancedServicePort,
    70  		BackendAddresses:        []string{backends[0].Address},
    71  	})
    72  	if err != nil {
    73  		t.Fatalf("failed to create fake grpclb server: %v", err)
    74  	}
    75  	go func() {
    76  		if err := lbServer.Serve(); err != nil {
    77  			t.Errorf("fake grpclb Serve() failed: %v", err)
    78  		}
    79  	}()
    80  
    81  	return backends, lbServer, func() {
    82  		backendsCleanup()
    83  		lbServer.Stop()
    84  	}
    85  }
    86  
    87  // startBackendsForBalancerSwitch spins up a bunch of stub server backends
    88  // exposing the TestService. Returns a cleanup function to be invoked by the
    89  // caller.
    90  func startBackendsForBalancerSwitch(t *testing.T) ([]*stubserver.StubServer, func()) {
    91  	t.Helper()
    92  
    93  	backends := make([]*stubserver.StubServer, backendCount)
    94  	for i := 0; i < backendCount; i++ {
    95  		backend := &stubserver.StubServer{
    96  			EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
    97  		}
    98  		if err := backend.StartServer(); err != nil {
    99  			t.Fatalf("Failed to start backend: %v", err)
   100  		}
   101  		t.Logf("Started TestService backend at: %q", backend.Address)
   102  		backends[i] = backend
   103  	}
   104  	return backends, func() {
   105  		for _, b := range backends {
   106  			b.Stop()
   107  		}
   108  	}
   109  }
   110  
   111  // TestBalancerSwitch_Basic tests the basic scenario of switching from one LB
   112  // policy to another, as specified in the service config.
   113  func (s) TestBalancerSwitch_Basic(t *testing.T) {
   114  	backends, cleanup := startBackendsForBalancerSwitch(t)
   115  	defer cleanup()
   116  	addrs := stubBackendsToResolverAddrs(backends)
   117  
   118  	r := manual.NewBuilderWithScheme("whatever")
   119  	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
   120  	if err != nil {
   121  		t.Fatalf("grpc.Dial() failed: %v", err)
   122  	}
   123  	defer cc.Close()
   124  
   125  	// Push a resolver update without an LB policy in the service config. The
   126  	// channel should pick the default LB policy, which is pick_first.
   127  	r.UpdateState(resolver.State{Addresses: addrs})
   128  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   129  	defer cancel()
   130  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   131  		t.Fatal(err)
   132  	}
   133  
   134  	// Push a resolver update with the service config specifying "round_robin".
   135  	r.UpdateState(resolver.State{
   136  		Addresses:     addrs,
   137  		ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
   138  	})
   139  	client := testgrpc.NewTestServiceClient(cc)
   140  	if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
   141  		t.Fatal(err)
   142  	}
   143  
   144  	// Push a resolver update with the service config specifying "pick_first".
   145  	r.UpdateState(resolver.State{
   146  		Addresses:     addrs,
   147  		ServiceConfig: parseServiceConfig(t, r, pickFirstServiceConfig),
   148  	})
   149  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   150  		t.Fatal(err)
   151  	}
   152  }
   153  
   154  // TestBalancerSwitch_grpclbToPickFirst tests the scenario where the channel
   155  // starts off "grpclb", switches to "pick_first" and back.
   156  func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) {
   157  	backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
   158  	defer cleanup()
   159  
   160  	addrs := stubBackendsToResolverAddrs(backends)
   161  	r := manual.NewBuilderWithScheme("whatever")
   162  	target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
   163  	cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
   164  	if err != nil {
   165  		t.Fatalf("grpc.Dial() failed: %v", err)
   166  	}
   167  	defer cc.Close()
   168  
   169  	// Push a resolver update with a GRPCLB service config and a single address
   170  	// pointing to the grpclb server we created above. This will cause the
   171  	// channel to switch to the "grpclb" balancer, which returns a single
   172  	// backend address.
   173  	grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
   174  	state := resolver.State{ServiceConfig: grpclbConfig}
   175  	r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: lbServer.Address()}}}))
   176  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   177  	defer cancel()
   178  	client := testgrpc.NewTestServiceClient(cc)
   179  	if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[0:1]); err != nil {
   180  		t.Fatal(err)
   181  	}
   182  
   183  	// Push a resolver update containing a non-existent grpclb server address.
   184  	// This should not lead to a balancer switch.
   185  	const nonExistentServer = "non-existent-grpclb-server-address"
   186  	r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: nonExistentServer}}}))
   187  	if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
   188  		t.Fatal(err)
   189  	}
   190  
   191  	// Push a resolver update containing no grpclb server address. This should
   192  	// lead to the channel using the default LB policy which is pick_first. The
   193  	// list of addresses pushed as part of this update is different from the one
   194  	// returned by the "grpclb" balancer. So, we should see RPCs going to the
   195  	// newly configured backends, as part of the balancer switch.
   196  	emptyConfig := parseServiceConfig(t, r, `{}`)
   197  	r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: emptyConfig})
   198  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
   199  		t.Fatal(err)
   200  	}
   201  }
   202  
   203  // TestBalancerSwitch_pickFirstToGRPCLB tests the scenario where the channel
   204  // starts off with "pick_first", switches to "grpclb" and back.
   205  func (s) TestBalancerSwitch_pickFirstToGRPCLB(t *testing.T) {
   206  	backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
   207  	defer cleanup()
   208  
   209  	addrs := stubBackendsToResolverAddrs(backends)
   210  	r := manual.NewBuilderWithScheme("whatever")
   211  	target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
   212  	cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
   213  	if err != nil {
   214  		t.Fatalf("grpc.Dial() failed: %v", err)
   215  	}
   216  	defer cc.Close()
   217  
   218  	// Push a resolver update containing no grpclb server address. This should
   219  	// lead to the channel using the default LB policy which is pick_first.
   220  	r.UpdateState(resolver.State{Addresses: addrs[1:]})
   221  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   222  	defer cancel()
   223  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
   224  		t.Fatal(err)
   225  	}
   226  
   227  	// Push a resolver update with no service config and a single address pointing
   228  	// to the grpclb server we created above. This will cause the channel to
   229  	// switch to the "grpclb" balancer, which returns a single backend address.
   230  	grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
   231  	state := resolver.State{ServiceConfig: grpclbConfig}
   232  	r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: lbServer.Address()}}}))
   233  	client := testgrpc.NewTestServiceClient(cc)
   234  	if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
   235  		t.Fatal(err)
   236  	}
   237  
   238  	// Push a resolver update containing a non-existent grpclb server address.
   239  	// This should not lead to a balancer switch.
   240  	r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: "nonExistentServer"}}}))
   241  	if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
   242  		t.Fatal(err)
   243  	}
   244  
   245  	// Switch to "pick_first" again by sending no grpclb server addresses.
   246  	emptyConfig := parseServiceConfig(t, r, `{}`)
   247  	r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: emptyConfig})
   248  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
   249  		t.Fatal(err)
   250  	}
   251  }
   252  
   253  // TestBalancerSwitch_RoundRobinToGRPCLB tests the scenario where the channel
   254  // starts off with "round_robin", switches to "grpclb" and back.
   255  //
   256  // Note that this test uses the deprecated `loadBalancingPolicy` field in the
   257  // service config.
   258  func (s) TestBalancerSwitch_RoundRobinToGRPCLB(t *testing.T) {
   259  	backends, lbServer, cleanup := setupBackendsAndFakeGRPCLB(t)
   260  	defer cleanup()
   261  
   262  	addrs := stubBackendsToResolverAddrs(backends)
   263  	r := manual.NewBuilderWithScheme("whatever")
   264  	target := fmt.Sprintf("%s:///%s", r.Scheme(), loadBalancedServiceName)
   265  	cc, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
   266  	if err != nil {
   267  		t.Fatalf("grpc.Dial() failed: %v", err)
   268  	}
   269  	defer cc.Close()
   270  
   271  	// Note the use of the deprecated `loadBalancingPolicy` field here instead
   272  	// of the now recommended `loadBalancingConfig` field. The logic in the
   273  	// ClientConn which decides which balancer to switch to looks at the
   274  	// following places in the given order of preference:
   275  	// - `loadBalancingConfig` field
   276  	// - addresses of type grpclb
   277  	// - `loadBalancingPolicy` field
   278  	// If we use the `loadBalancingPolicy` field, the switch to "grpclb" later on
   279  	// in the test will not happen as the ClientConn will continue to use the LB
   280  	// policy received in the first update.
   281  	scpr := parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`)
   282  
   283  	// Push a resolver update with the service config specifying "round_robin".
   284  	r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: scpr})
   285  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   286  	defer cancel()
   287  	client := testgrpc.NewTestServiceClient(cc)
   288  	if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
   289  		t.Fatal(err)
   290  	}
   291  
   292  	// Push a resolver update with grpclb and a single balancer address
   293  	// pointing to the grpclb server we created above. This will cause the
   294  	// channel to switch to the "grpclb" balancer, which returns a single
   295  	// backend address.
   296  	grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
   297  	state := resolver.State{ServiceConfig: grpclbConfig}
   298  	r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: []resolver.Address{{Addr: lbServer.Address()}}}))
   299  	if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[:1]); err != nil {
   300  		t.Fatal(err)
   301  	}
   302  
   303  	// Switch back to "round_robin".
   304  	r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: scpr})
   305  	if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
   306  		t.Fatal(err)
   307  	}
   308  }
   309  
   310  // TestBalancerSwitch_grpclbNotRegistered tests the scenario where the grpclb
   311  // balancer is not registered. Verifies that the ClientConn falls back to the
   312  // default LB policy or the LB policy specified in the service config, and that
   313  // addresses of type "grpclb" are filtered out.
   314  func (s) TestBalancerSwitch_grpclbNotRegistered(t *testing.T) {
   315  	// Unregister the grpclb balancer builder for the duration of this test.
   316  	grpclbBuilder := balancer.Get("grpclb")
   317  	internal.BalancerUnregister(grpclbBuilder.Name())
   318  	defer balancer.Register(grpclbBuilder)
   319  
   320  	backends, cleanup := startBackendsForBalancerSwitch(t)
   321  	defer cleanup()
   322  	addrs := stubBackendsToResolverAddrs(backends)
   323  
   324  	r := manual.NewBuilderWithScheme("whatever")
   325  	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
   326  	if err != nil {
   327  		t.Fatalf("grpc.Dial() failed: %v", err)
   328  	}
   329  	defer cc.Close()
   330  
   331  	// Push a resolver update which contains a bunch of stub server backends and a
   332  	// grpclb server address. The latter should get the ClientConn to try and
   333  	// apply the grpclb policy. But since grpclb is not registered, it should
   334  	// fallback to the default LB policy which is pick_first. The ClientConn is
   335  	// also expected to filter out the grpclb address when sending the addresses
   336  	// list fo pick_first.
   337  	grpclbAddr := []resolver.Address{{Addr: "non-existent-grpclb-server-address"}}
   338  	grpclbConfig := parseServiceConfig(t, r, `{"loadBalancingPolicy": "grpclb"}`)
   339  	state := resolver.State{ServiceConfig: grpclbConfig, Addresses: addrs}
   340  	r.UpdateState(grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: grpclbAddr}))
   341  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   342  	defer cancel()
   343  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   344  		t.Fatal(err)
   345  	}
   346  
   347  	// Push a resolver update with the same addresses, but with a service config
   348  	// specifying "round_robin". The ClientConn is expected to filter out the
   349  	// grpclb address when sending the addresses list to round_robin.
   350  	r.UpdateState(resolver.State{
   351  		Addresses:     addrs,
   352  		ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
   353  	})
   354  	client := testgrpc.NewTestServiceClient(cc)
   355  	if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
   356  		t.Fatal(err)
   357  	}
   358  }
   359  
   360  // TestBalancerSwitch_OldBalancerCallsShutdownInClose tests the scenario where
   361  // the balancer being switched out calls Shutdown() in its Close()
   362  // method. Verifies that this sequence of calls doesn't lead to a deadlock.
   363  func (s) TestBalancerSwitch_OldBalancerCallsShutdownInClose(t *testing.T) {
   364  	// Register a stub balancer which calls Shutdown() from its Close().
   365  	scChan := make(chan balancer.SubConn, 1)
   366  	uccsCalled := make(chan struct{}, 1)
   367  	stub.Register(t.Name(), stub.BalancerFuncs{
   368  		UpdateClientConnState: func(data *stub.BalancerData, ccs balancer.ClientConnState) error {
   369  			sc, err := data.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{})
   370  			if err != nil {
   371  				t.Errorf("failed to create subConn: %v", err)
   372  			}
   373  			scChan <- sc
   374  			close(uccsCalled)
   375  			return nil
   376  		},
   377  		Close: func(data *stub.BalancerData) {
   378  			(<-scChan).Shutdown()
   379  		},
   380  	})
   381  
   382  	r := manual.NewBuilderWithScheme("whatever")
   383  	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
   384  	if err != nil {
   385  		t.Fatalf("grpc.Dial() failed: %v", err)
   386  	}
   387  	defer cc.Close()
   388  
   389  	// Push a resolver update specifying our stub balancer as the LB policy.
   390  	scpr := parseServiceConfig(t, r, fmt.Sprintf(`{"loadBalancingPolicy": "%v"}`, t.Name()))
   391  	r.UpdateState(resolver.State{
   392  		Addresses:     []resolver.Address{{Addr: "dummy-address"}},
   393  		ServiceConfig: scpr,
   394  	})
   395  
   396  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   397  	defer cancel()
   398  	select {
   399  	case <-ctx.Done():
   400  		t.Fatalf("timeout waiting for UpdateClientConnState to be called: %v", ctx.Err())
   401  	case <-uccsCalled:
   402  	}
   403  
   404  	// The following service config update will switch balancer from our stub
   405  	// balancer to pick_first. The former will be closed, which will call
   406  	// sc.Shutdown() inline.
   407  	//
   408  	// This is to make sure the sc.Shutdown() from Close() doesn't cause a
   409  	// deadlock (e.g. trying to grab a mutex while it's already locked).
   410  	//
   411  	// Do it in a goroutine so this test will fail with a helpful message
   412  	// (though the goroutine will still leak).
   413  	done := make(chan struct{})
   414  	go func() {
   415  		r.UpdateState(resolver.State{
   416  			Addresses:     []resolver.Address{{Addr: "dummy-address"}},
   417  			ServiceConfig: parseServiceConfig(t, r, pickFirstServiceConfig),
   418  		})
   419  		close(done)
   420  	}()
   421  
   422  	select {
   423  	case <-ctx.Done():
   424  		t.Fatalf("timeout waiting for resolver.UpdateState to finish: %v", ctx.Err())
   425  	case <-done:
   426  	}
   427  }
   428  
   429  // TestBalancerSwitch_Graceful tests the graceful switching of LB policies. It
   430  // starts off by configuring "round_robin" on the channel and ensures that RPCs
   431  // are successful. Then, it switches to a stub balancer which does not report a
   432  // picker until instructed by the test do to so. At this point, the test
   433  // verifies that RPCs are still successful using the old balancer. Then the test
   434  // asks the new balancer to report a healthy picker and the test verifies that
   435  // the RPCs get routed using the picker reported by the new balancer.
   436  func (s) TestBalancerSwitch_Graceful(t *testing.T) {
   437  	backends, cleanup := startBackendsForBalancerSwitch(t)
   438  	defer cleanup()
   439  	addrs := stubBackendsToResolverAddrs(backends)
   440  
   441  	r := manual.NewBuilderWithScheme("whatever")
   442  	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
   443  	if err != nil {
   444  		t.Fatalf("grpc.Dial() failed: %v", err)
   445  	}
   446  	defer cc.Close()
   447  
   448  	// Push a resolver update with the service config specifying "round_robin".
   449  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   450  	defer cancel()
   451  	r.UpdateState(resolver.State{
   452  		Addresses:     addrs[1:],
   453  		ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
   454  	})
   455  	client := testgrpc.NewTestServiceClient(cc)
   456  	if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
   457  		t.Fatal(err)
   458  	}
   459  
   460  	// Register a stub balancer which uses a "pick_first" balancer underneath and
   461  	// signals on a channel when it receives ClientConn updates. But it does not
   462  	// forward the ccUpdate to the underlying "pick_first" balancer until the test
   463  	// asks it to do so. This allows us to test the graceful switch functionality.
   464  	// Until the test asks the stub balancer to forward the ccUpdate, RPCs should
   465  	// get routed to the old balancer. And once the test gives the go ahead, RPCs
   466  	// should get routed to the new balancer.
   467  	ccUpdateCh := make(chan struct{})
   468  	waitToProceed := make(chan struct{})
   469  	stub.Register(t.Name(), stub.BalancerFuncs{
   470  		Init: func(bd *stub.BalancerData) {
   471  			pf := balancer.Get(grpc.PickFirstBalancerName)
   472  			bd.Data = pf.Build(bd.ClientConn, bd.BuildOptions)
   473  		},
   474  		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
   475  			bal := bd.Data.(balancer.Balancer)
   476  			close(ccUpdateCh)
   477  			go func() {
   478  				<-waitToProceed
   479  				bal.UpdateClientConnState(ccs)
   480  			}()
   481  			return nil
   482  		},
   483  	})
   484  
   485  	// Push a resolver update with the service config specifying our stub
   486  	// balancer. We should see a trace event for this balancer switch. But RPCs
   487  	// should still be routed to the old balancer since our stub balancer does not
   488  	// report a ready picker until we ask it to do so.
   489  	r.UpdateState(resolver.State{
   490  		Addresses:     addrs[:1],
   491  		ServiceConfig: r.CC.ParseServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%v": {}}]}`, t.Name())),
   492  	})
   493  	select {
   494  	case <-ctx.Done():
   495  		t.Fatal("Timeout when waiting for a ClientConnState update on the new balancer")
   496  	case <-ccUpdateCh:
   497  	}
   498  	if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs[1:]); err != nil {
   499  		t.Fatal(err)
   500  	}
   501  
   502  	// Ask our stub balancer to forward the earlier received ccUpdate to the
   503  	// underlying "pick_first" balancer which will result in a healthy picker
   504  	// being reported to the channel. RPCs should start using the new balancer.
   505  	close(waitToProceed)
   506  	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
   507  		t.Fatal(err)
   508  	}
   509  }
   510  

View as plain text