...

Source file src/google.golang.org/grpc/balancer/weightedtarget/weightedtarget_test.go

Documentation: google.golang.org/grpc/balancer/weightedtarget
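
The weighted_target LB policy balances RPCs across its child policies in
proportion to per-target weights. The tests below parse configurations of the
following shape (reproduced from the test payloads in this file):

    {
      "targets": {
        "cluster_1": {"weight": 2, "childPolicy": [{"round_robin": ""}]},
        "cluster_2": {"weight": 1, "childPolicy": [{"round_robin": ""}]}
      }
    }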

     1  /*
     2   *
     3   * Copyright 2020 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package weightedtarget
    20  
    21  import (
    22  	"context"
    23  	"encoding/json"
    24  	"errors"
    25  	"fmt"
    26  	"strings"
    27  	"testing"
    28  	"time"
    29  
    30  	"github.com/google/go-cmp/cmp"
    31  	"google.golang.org/grpc/attributes"
    32  	"google.golang.org/grpc/balancer"
    33  	"google.golang.org/grpc/balancer/roundrobin"
    34  	"google.golang.org/grpc/connectivity"
    35  	"google.golang.org/grpc/internal/balancer/stub"
    36  	"google.golang.org/grpc/internal/grpctest"
    37  	"google.golang.org/grpc/internal/hierarchy"
    38  	"google.golang.org/grpc/internal/testutils"
    39  	"google.golang.org/grpc/resolver"
    40  	"google.golang.org/grpc/serviceconfig"
    41  )
    42  
    43  const (
    44  	defaultTestTimeout = 5 * time.Second
    45  )
    46  
    47  type s struct {
    48  	grpctest.Tester
    49  }
    50  
    51  func Test(t *testing.T) {
    52  	grpctest.RunSubTests(t, s{})
    53  }
    54  
    55  type testConfigBalancerBuilder struct {
    56  	balancer.Builder
    57  }
    58  
    59  func newTestConfigBalancerBuilder() *testConfigBalancerBuilder {
    60  	return &testConfigBalancerBuilder{
    61  		Builder: balancer.Get(roundrobin.Name),
    62  	}
    63  }
    64  
    65  // pickAndCheckError returns a function which takes a picker, invokes the Pick() method
    66  // multiple times and ensures that the error returned by the picker matches the provided error.
    67  func pickAndCheckError(want error) func(balancer.Picker) error {
    68  	const rpcCount = 5
    69  	return func(p balancer.Picker) error {
    70  		for i := 0; i < rpcCount; i++ {
    71  			if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), want.Error()) {
    72  				return fmt.Errorf("picker.Pick() returned error: %v, want: %v", err, want)
    73  			}
    74  		}
    75  		return nil
    76  	}
    77  }
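
        // Illustrative usage, mirroring the WaitForPicker calls later in this file:
        //
        //	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
        //	defer cancel()
        //	if err := cc.WaitForPicker(ctx, pickAndCheckError(wantSubConnErr)); err != nil {
        //		t.Fatal(err)
        //	}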
    78  
    79  func (t *testConfigBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
    80  	rr := t.Builder.Build(cc, opts)
    81  	return &testConfigBalancer{
    82  		Balancer: rr,
    83  	}
    84  }
    85  
    86  const testConfigBalancerName = "test_config_balancer"
    87  
    88  func (t *testConfigBalancerBuilder) Name() string {
    89  	return testConfigBalancerName
    90  }
    91  
    92  type stringBalancerConfig struct {
    93  	serviceconfig.LoadBalancingConfig
    94  	configStr string
    95  }
    96  
    97  func (t *testConfigBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
    98  	var cfg string
    99  	if err := json.Unmarshal(c, &cfg); err != nil {
   100  		return nil, fmt.Errorf("failed to unmarshal config in %q: %v", testConfigBalancerName, err)
   101  	}
   102  	return stringBalancerConfig{configStr: cfg}, nil
   103  }
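
        // For example (illustrative): the child policy entry
        // {"test_config_balancer": "cluster_1"} used in the tests below hands this
        // ParseConfig the JSON string `"cluster_1"`, which unmarshals into
        // stringBalancerConfig{configStr: "cluster_1"}.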
   104  
   105  // testConfigBalancer is a roundrobin balancer, but it takes the balancer config
   106  // string and adds it as an address attribute to the backend addresses.
   107  type testConfigBalancer struct {
   108  	balancer.Balancer
   109  }
   110  
   111  // configKey is the type used as the key to store balancer config in the
   112  // Attributes field of resolver.Address.
   113  type configKey struct{}
   114  
   115  func setConfigKey(addr resolver.Address, config string) resolver.Address {
   116  	addr.Attributes = addr.Attributes.WithValue(configKey{}, config)
   117  	return addr
   118  }
   119  
   120  func getConfigKey(attr *attributes.Attributes) (string, bool) {
   121  	v := attr.Value(configKey{})
   122  	name, ok := v.(string)
   123  	return name, ok
   124  }
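
        // Round-trip sketch (illustrative values):
        //
        //	addr := setConfigKey(resolver.Address{Addr: "1.1.1.1:1"}, "cluster_1")
        //	cfg, ok := getConfigKey(addr.Attributes) // cfg == "cluster_1", ok == true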
   125  
   126  func (b *testConfigBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
   127  	c, ok := s.BalancerConfig.(stringBalancerConfig)
   128  	if !ok {
   129  		return fmt.Errorf("unexpected balancer config with type %T", s.BalancerConfig)
   130  	}
   131  
   132  	addrsWithAttr := make([]resolver.Address, len(s.ResolverState.Addresses))
   133  	for i, addr := range s.ResolverState.Addresses {
   134  		addrsWithAttr[i] = setConfigKey(addr, c.configStr)
   135  	}
   136  	s.BalancerConfig = nil
   137  	s.ResolverState.Addresses = addrsWithAttr
   138  	return b.Balancer.UpdateClientConnState(s)
   139  }
   140  
   141  func (b *testConfigBalancer) Close() {
   142  	b.Balancer.Close()
   143  }
   144  
   145  var (
   146  	wtbBuilder          balancer.Builder
   147  	wtbParser           balancer.ConfigParser
   148  	testBackendAddrStrs []string
   149  )
   150  
   151  const testBackendAddrsCount = 12
   152  
   153  func init() {
   154  	balancer.Register(newTestConfigBalancerBuilder())
   155  	for i := 0; i < testBackendAddrsCount; i++ {
   156  		testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
   157  	}
   158  	wtbBuilder = balancer.Get(Name)
   159  	wtbParser = wtbBuilder.(balancer.ConfigParser)
   160  
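        	// Use the deterministic WRR implementation from testutils so that
        	// weighted pick sequences in the tests below are reproducible.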
   161  	NewRandomWRR = testutils.NewTestWRR
   162  }
   163  
   164  // TestWeightedTarget covers the cases where a sub-balancer is added and where
   165  // a sub-balancer is removed. It verifies that the addresses and balancer
   166  // configs are forwarded to the right sub-balancer. This test is intended to
   167  // test the glue code in weighted_target. It also tests an empty target config
   168  // update, which should trigger a TransientFailure state update to the ClientConn.
   169  func (s) TestWeightedTarget(t *testing.T) {
   170  	cc := testutils.NewBalancerClientConn(t)
   171  	wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
   172  	defer wtb.Close()
   173  
   174  	// Start with "cluster_1: round_robin".
   175  	config1, err := wtbParser.ParseConfig([]byte(`
   176  {
   177    "targets": {
   178      "cluster_1": {
   179        "weight":1,
   180        "childPolicy": [{"round_robin": ""}]
   181      }
   182    }
   183  }`))
   184  	if err != nil {
   185  		t.Fatalf("failed to parse balancer config: %v", err)
   186  	}
   187  
   188  	// Send the config, and an address with hierarchy path ["cluster_1"].
   189  	addr1 := resolver.Address{Addr: testBackendAddrStrs[1], Attributes: nil}
   190  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
   191  		ResolverState:  resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr1, []string{"cluster_1"})}},
   192  		BalancerConfig: config1,
   193  	}); err != nil {
   194  		t.Fatalf("failed to update ClientConn state: %v", err)
   195  	}
   196  	verifyAddressInNewSubConn(t, cc, addr1)
   197  
   198  	// Send subconn state change.
   199  	sc1 := <-cc.NewSubConnCh
   200  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   201  	<-cc.NewPickerCh
   202  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   203  	p := <-cc.NewPickerCh
   204  
   205  	// Test pick with one backend.
   206  	for i := 0; i < 5; i++ {
   207  		gotSCSt, _ := p.Pick(balancer.PickInfo{})
   208  		if gotSCSt.SubConn != sc1 {
   209  			t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
   210  		}
   211  	}
   212  
   213  	// Remove cluster_1, and add "cluster_2: test_config_balancer". The
   214  	// test_config_balancer adds an address attribute whose value is set to the
   215  	// config that is passed to it.
   216  	config2, err := wtbParser.ParseConfig([]byte(`
   217  {
   218    "targets": {
   219      "cluster_2": {
   220         "weight":1,
   221         "childPolicy": [{"test_config_balancer": "cluster_2"}]
   222      }
   223    }
   224  }`))
   225  	if err != nil {
   226  		t.Fatalf("failed to parse balancer config: %v", err)
   227  	}
   228  
   229  	// Send the config, and one address with hierarchy path ["cluster_2"].
   230  	addr2 := resolver.Address{Addr: testBackendAddrStrs[2], Attributes: nil}
   231  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
   232  		ResolverState:  resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr2, []string{"cluster_2"})}},
   233  		BalancerConfig: config2,
   234  	}); err != nil {
   235  		t.Fatalf("failed to update ClientConn state: %v", err)
   236  	}
   237  
   238  	// Expect a new subConn from the test_config_balancer which has an address
   239  	// attribute set to the config that was passed to it.
   240  	verifyAddressInNewSubConn(t, cc, setConfigKey(addr2, "cluster_2"))
   241  
   242  	// The subconn for cluster_1 should be shut down.
   243  	scShutdown := <-cc.ShutdownSubConnCh
   244  	if scShutdown != sc1 {
   245  		t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
   246  	}
   247  	scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
   248  
   249  	sc2 := <-cc.NewSubConnCh
   250  	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   251  	<-cc.NewPickerCh
   252  	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   253  	p = <-cc.NewPickerCh
   254  
   255  	// Test pick with one backend.
   256  	for i := 0; i < 5; i++ {
   257  		gotSCSt, _ := p.Pick(balancer.PickInfo{})
   258  		if gotSCSt.SubConn != sc2 {
   259  			t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2)
   260  		}
   261  	}
   262  
   263  	// Replace the child policy of "cluster_2" with "round_robin".
   264  	config3, err := wtbParser.ParseConfig([]byte(`
   265  {
   266    "targets": {
   267      "cluster_2": {
   268        "weight":1,
   269        "childPolicy": [{"round_robin": ""}]
   270      }
   271    }
   272  }`))
   273  	if err != nil {
   274  		t.Fatalf("failed to parse balancer config: %v", err)
   275  	}
   276  
   277  	// Send the config, and an address with hierarchy path ["cluster_2"].
   278  	addr3 := resolver.Address{Addr: testBackendAddrStrs[3], Attributes: nil}
   279  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
   280  		ResolverState:  resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr3, []string{"cluster_2"})}},
   281  		BalancerConfig: config3,
   282  	}); err != nil {
   283  		t.Fatalf("failed to update ClientConn state: %v", err)
   284  	}
   285  	verifyAddressInNewSubConn(t, cc, addr3)
   286  
   287  	// The subconn from the test_config_balancer should be shut down.
   288  	scShutdown = <-cc.ShutdownSubConnCh
   289  	if scShutdown != sc2 {
   290  		t.Fatalf("ShutdownSubConn, want %v, got %v", sc2, scShutdown)
   291  	}
   292  	scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
   293  
   294  	// Send subconn state change.
   295  	sc3 := <-cc.NewSubConnCh
   296  	sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   297  	<-cc.NewPickerCh
   298  	sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   299  	p = <-cc.NewPickerCh
   300  
   301  	// Test pick with one backend.
   302  	for i := 0; i < 5; i++ {
   303  		gotSCSt, _ := p.Pick(balancer.PickInfo{})
   304  		if gotSCSt.SubConn != sc3 {
   305  			t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc3)
   306  		}
   307  	}
   308  	// Update the weighted target balancer with an empty address list and no
   309  	// targets. This should cause a TransientFailure state update to the
   310  	// ClientConn.
   311  	emptyConfig, err := wtbParser.ParseConfig([]byte(`{}`))
   312  	if err != nil {
   313  		t.Fatalf("Failed to parse balancer config: %v", err)
   314  	}
   315  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
   316  		ResolverState:  resolver.State{},
   317  		BalancerConfig: emptyConfig,
   318  	}); err != nil {
   319  		t.Fatalf("Failed to update ClientConn state: %v", err)
   320  	}
   321  
   322  	state := <-cc.NewStateCh
   323  	if state != connectivity.TransientFailure {
   324  		t.Fatalf("Empty target update should have triggered a TF state update, got: %v", state)
   325  	}
   326  }
   327  
   328  // TestWeightedTarget_OneSubBalancer_AddRemoveBackend tests the case where we
   329  // have a weighted target balancer with one sub-balancer, and we add and remove
   330  // backends from the subBalancer.
   331  func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) {
   332  	cc := testutils.NewBalancerClientConn(t)
   333  	wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
   334  	defer wtb.Close()
   335  
   336  	// Start with "cluster_1: round_robin".
   337  	config, err := wtbParser.ParseConfig([]byte(`
   338  {
   339    "targets": {
   340      "cluster_1": {
   341        "weight":1,
   342        "childPolicy": [{"round_robin": ""}]
   343      }
   344    }
   345  }`))
   346  	if err != nil {
   347  		t.Fatalf("failed to parse balancer config: %v", err)
   348  	}
   349  
   350  	// Send the config, and an address with hierarchy path ["cluster_1"].
   351  	addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
   352  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
   353  		ResolverState:  resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr1, []string{"cluster_1"})}},
   354  		BalancerConfig: config,
   355  	}); err != nil {
   356  		t.Fatalf("failed to update ClientConn state: %v", err)
   357  	}
   358  	verifyAddressInNewSubConn(t, cc, addr1)
   359  
   360  	// Expect one SubConn, and move it to READY.
   361  	sc1 := <-cc.NewSubConnCh
   362  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   363  	<-cc.NewPickerCh
   364  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   365  	p := <-cc.NewPickerCh
   366  
   367  	// Test pick with one backend.
   368  	for i := 0; i < 5; i++ {
   369  		gotSCSt, _ := p.Pick(balancer.PickInfo{})
   370  		if gotSCSt.SubConn != sc1 {
   371  			t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
   372  		}
   373  	}
   374  
   375  	// Send two addresses.
   376  	addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
   377  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
   378  		ResolverState: resolver.State{Addresses: []resolver.Address{
   379  			hierarchy.Set(addr1, []string{"cluster_1"}),
   380  			hierarchy.Set(addr2, []string{"cluster_1"}),
   381  		}},
   382  		BalancerConfig: config,
   383  	}); err != nil {
   384  		t.Fatalf("failed to update ClientConn state: %v", err)
   385  	}
   386  	verifyAddressInNewSubConn(t, cc, addr2)
   387  
   388  	// Expect one new SubConn, and move it to READY.
   389  	sc2 := <-cc.NewSubConnCh
   390  	// Move it through CONNECTING to READY.
   391  	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   392  	<-cc.NewPickerCh
   393  	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   394  	p = <-cc.NewPickerCh
   395  
   396  	// Test round robin pick.
   397  	want := []balancer.SubConn{sc1, sc2}
   398  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
   399  		t.Fatalf("want %v, got %v", want, err)
   400  	}
   401  
   402  	// Remove the first address.
   403  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
   404  		ResolverState:  resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr2, []string{"cluster_1"})}},
   405  		BalancerConfig: config,
   406  	}); err != nil {
   407  		t.Fatalf("failed to update ClientConn state: %v", err)
   408  	}
   409  
   410  	// Expect one SubConn to be shut down.
   411  	scShutdown := <-cc.ShutdownSubConnCh
   412  	if scShutdown != sc1 {
   413  		t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
   414  	}
   415  	scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
   416  	p = <-cc.NewPickerCh
   417  
   418  	// Test pick with only the second SubConn.
   419  	for i := 0; i < 5; i++ {
   420  		gotSC, _ := p.Pick(balancer.PickInfo{})
   421  		if gotSC.SubConn != sc2 {
   422  			t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSC, sc2)
   423  		}
   424  	}
   425  }
   426  
   427  // TestWeightedTarget_TwoSubBalancers_OneBackend tests the case where we have a
   428  // weighted target balancer with two sub-balancers, each with one backend.
   429  func (s) TestWeightedTarget_TwoSubBalancers_OneBackend(t *testing.T) {
   430  	cc := testutils.NewBalancerClientConn(t)
   431  	wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
   432  	defer wtb.Close()
   433  
   434  	// Start with "cluster_1: test_config_balancer, cluster_2: test_config_balancer".
   435  	config, err := wtbParser.ParseConfig([]byte(`
   436  {
   437    "targets": {
   438      "cluster_1": {
   439        "weight":1,
   440        "childPolicy": [{"test_config_balancer": "cluster_1"}]
   441      },
   442      "cluster_2": {
   443        "weight":1,
   444        "childPolicy": [{"test_config_balancer": "cluster_2"}]
   445      }
   446    }
   447  }`))
   448  	if err != nil {
   449  		t.Fatalf("failed to parse balancer config: %v", err)
   450  	}
   451  
   452  	// Send the config with one address for each cluster.
   453  	addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
   454  	addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
   455  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
   456  		ResolverState: resolver.State{Addresses: []resolver.Address{
   457  			hierarchy.Set(addr1, []string{"cluster_1"}),
   458  			hierarchy.Set(addr2, []string{"cluster_2"}),
   459  		}},
   460  		BalancerConfig: config,
   461  	}); err != nil {
   462  		t.Fatalf("failed to update ClientConn state: %v", err)
   463  	}
   464  
   465  	scs := waitForNewSubConns(t, cc, 2)
   466  	verifySubConnAddrs(t, scs, map[string][]resolver.Address{
   467  		"cluster_1": {addr1},
   468  		"cluster_2": {addr2},
   469  	})
   470  
   471  	// We expect a single subConn on each subBalancer.
   472  	sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn)
   473  	sc2 := scs["cluster_2"][0].sc.(*testutils.TestSubConn)
   474  
   475  	// Send state changes for both SubConns, and wait for the picker.
   476  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   477  	<-cc.NewPickerCh
   478  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   479  	<-cc.NewPickerCh
   480  	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   481  	<-cc.NewPickerCh
   482  	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   483  	p := <-cc.NewPickerCh
   484  
   485  	// Test roundrobin on the last picker.
   486  	want := []balancer.SubConn{sc1, sc2}
   487  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
   488  		t.Fatalf("want %v, got %v", want, err)
   489  	}
   490  }
   491  
   492  // TestWeightedTarget_TwoSubBalancers_MoreBackends tests the case where we have
   493  // a weighted target balancer with two sub-balancers, each with more than one
   494  // backend.
   495  func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
   496  	cc := testutils.NewBalancerClientConn(t)
   497  	wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
   498  	defer wtb.Close()
   499  
   500  	// Start with "cluster_1: test_config_balancer, cluster_2: test_config_balancer".
   501  	config, err := wtbParser.ParseConfig([]byte(`
   502  {
   503    "targets": {
   504      "cluster_1": {
   505        "weight":1,
   506        "childPolicy": [{"test_config_balancer": "cluster_1"}]
   507      },
   508      "cluster_2": {
   509        "weight":1,
   510        "childPolicy": [{"test_config_balancer": "cluster_2"}]
   511      }
   512    }
   513  }`))
   514  	if err != nil {
   515  		t.Fatalf("failed to parse balancer config: %v", err)
   516  	}
   517  
   518  	// Send the config with two backends for each cluster.
   519  	addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
   520  	addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
   521  	addr3 := resolver.Address{Addr: testBackendAddrStrs[3]}
   522  	addr4 := resolver.Address{Addr: testBackendAddrStrs[4]}
   523  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
   524  		ResolverState: resolver.State{Addresses: []resolver.Address{
   525  			hierarchy.Set(addr1, []string{"cluster_1"}),
   526  			hierarchy.Set(addr2, []string{"cluster_1"}),
   527  			hierarchy.Set(addr3, []string{"cluster_2"}),
   528  			hierarchy.Set(addr4, []string{"cluster_2"}),
   529  		}},
   530  		BalancerConfig: config,
   531  	}); err != nil {
   532  		t.Fatalf("failed to update ClientConn state: %v", err)
   533  	}
   534  
   535  	scs := waitForNewSubConns(t, cc, 4)
   536  	verifySubConnAddrs(t, scs, map[string][]resolver.Address{
   537  		"cluster_1": {addr1, addr2},
   538  		"cluster_2": {addr3, addr4},
   539  	})
   540  
   541  	// We expect two subConns on each subBalancer.
   542  	sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn)
   543  	sc2 := scs["cluster_1"][1].sc.(*testutils.TestSubConn)
   544  	sc3 := scs["cluster_2"][0].sc.(*testutils.TestSubConn)
   545  	sc4 := scs["cluster_2"][1].sc.(*testutils.TestSubConn)
   546  
   547  	// Send state changes for all SubConns, and wait for the picker.
   548  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   549  	<-cc.NewPickerCh
   550  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   551  	<-cc.NewPickerCh
   552  	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   553  	<-cc.NewPickerCh
   554  	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   555  	<-cc.NewPickerCh
   556  	sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   557  	<-cc.NewPickerCh
   558  	sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   559  	<-cc.NewPickerCh
   560  	sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   561  	<-cc.NewPickerCh
   562  	sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   563  	p := <-cc.NewPickerCh
   564  
   565  	// Test roundrobin on the last picker. RPCs should be sent equally to all
   566  	// backends.
   567  	want := []balancer.SubConn{sc1, sc2, sc3, sc4}
   568  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
   569  		t.Fatalf("want %v, got %v", want, err)
   570  	}
   571  
   572  	// Turn sc2's connection down; picks remain RR across the two balancers, so sc1 is picked twice per cycle.
   573  	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
   574  	p = <-cc.NewPickerCh
   575  	want = []balancer.SubConn{sc1, sc1, sc3, sc4}
   576  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
   577  		t.Fatalf("want %v, got %v", want, err)
   578  	}
   579  
   580  	// Remove addr3; the subConn corresponding to it should be shut down.
   581  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
   582  		ResolverState: resolver.State{Addresses: []resolver.Address{
   583  			hierarchy.Set(addr1, []string{"cluster_1"}),
   584  			hierarchy.Set(addr2, []string{"cluster_1"}),
   585  			hierarchy.Set(addr4, []string{"cluster_2"}),
   586  		}},
   587  		BalancerConfig: config,
   588  	}); err != nil {
   589  		t.Fatalf("failed to update ClientConn state: %v", err)
   590  	}
   591  	scShutdown := <-cc.ShutdownSubConnCh
   592  	if scShutdown != sc3 {
   593  		t.Fatalf("ShutdownSubConn, want %v, got %v", sc3, scShutdown)
   594  	}
   595  	scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
   596  	p = <-cc.NewPickerCh
   597  	want = []balancer.SubConn{sc1, sc4}
   598  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
   599  		t.Fatalf("want %v, got %v", want, err)
   600  	}
   601  
   602  	// Turn sc1's connection down.
   603  	wantSubConnErr := errors.New("subConn connection error")
   604  	sc1.UpdateState(balancer.SubConnState{
   605  		ConnectivityState: connectivity.TransientFailure,
   606  		ConnectionError:   wantSubConnErr,
   607  	})
   608  	p = <-cc.NewPickerCh
   609  	want = []balancer.SubConn{sc4}
   610  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
   611  		t.Fatalf("want %v, got %v", want, err)
   612  	}
   613  
   614  	// Turn the last remaining connection to CONNECTING.
   615  	sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   616  	p = <-cc.NewPickerCh
   617  	for i := 0; i < 5; i++ {
   618  		if _, err := p.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable {
   619  			t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err)
   620  		}
   621  	}
   622  
   623  	// Turn all connections down.
   624  	sc4.UpdateState(balancer.SubConnState{
   625  		ConnectivityState: connectivity.TransientFailure,
   626  		ConnectionError:   wantSubConnErr,
   627  	})
   628  
   629  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   630  	defer cancel()
   631  	if err := cc.WaitForPicker(ctx, pickAndCheckError(wantSubConnErr)); err != nil {
   632  		t.Fatal(err)
   633  	}
   634  }
   635  
   636  // TestWeightedTarget_TwoSubBalancers_DifferentWeight_MoreBackends tests the
   637  // case where we have a weighted target balancer with two sub-balancers of
   638  // differing weights.
   639  func (s) TestWeightedTarget_TwoSubBalancers_DifferentWeight_MoreBackends(t *testing.T) {
   640  	cc := testutils.NewBalancerClientConn(t)
   641  	wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
   642  	defer wtb.Close()
   643  
   644  	// Start with two subBalancers, one with twice the weight of the other.
   645  	config, err := wtbParser.ParseConfig([]byte(`
   646  {
   647    "targets": {
   648      "cluster_1": {
   649        "weight": 2,
   650        "childPolicy": [{"test_config_balancer": "cluster_1"}]
   651      },
   652      "cluster_2": {
   653        "weight": 1,
   654        "childPolicy": [{"test_config_balancer": "cluster_2"}]
   655      }
   656    }
   657  }`))
   658  	if err != nil {
   659  		t.Fatalf("failed to parse balancer config: %v", err)
   660  	}
   661  
   662  	// Send the config with two backends for each cluster.
   663  	addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
   664  	addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
   665  	addr3 := resolver.Address{Addr: testBackendAddrStrs[3]}
   666  	addr4 := resolver.Address{Addr: testBackendAddrStrs[4]}
   667  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
   668  		ResolverState: resolver.State{Addresses: []resolver.Address{
   669  			hierarchy.Set(addr1, []string{"cluster_1"}),
   670  			hierarchy.Set(addr2, []string{"cluster_1"}),
   671  			hierarchy.Set(addr3, []string{"cluster_2"}),
   672  			hierarchy.Set(addr4, []string{"cluster_2"}),
   673  		}},
   674  		BalancerConfig: config,
   675  	}); err != nil {
   676  		t.Fatalf("failed to update ClientConn state: %v", err)
   677  	}
   678  
   679  	scs := waitForNewSubConns(t, cc, 4)
   680  	verifySubConnAddrs(t, scs, map[string][]resolver.Address{
   681  		"cluster_1": {addr1, addr2},
   682  		"cluster_2": {addr3, addr4},
   683  	})
   684  
   685  	// We expect two subConns on each subBalancer.
   686  	sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn)
   687  	sc2 := scs["cluster_1"][1].sc.(*testutils.TestSubConn)
   688  	sc3 := scs["cluster_2"][0].sc.(*testutils.TestSubConn)
   689  	sc4 := scs["cluster_2"][1].sc.(*testutils.TestSubConn)
   690  
   691  	// Send state changes for all SubConns, and wait for the picker.
   692  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   693  	<-cc.NewPickerCh
   694  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   695  	<-cc.NewPickerCh
   696  	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   697  	<-cc.NewPickerCh
   698  	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   699  	<-cc.NewPickerCh
   700  	sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   701  	<-cc.NewPickerCh
   702  	sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   703  	<-cc.NewPickerCh
   704  	sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   705  	<-cc.NewPickerCh
   706  	sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   707  	p := <-cc.NewPickerCh
   708  
   709  	// Test roundrobin on the last picker. Twice the number of RPCs should be
   710  	// sent to cluster_1 when compared to cluster_2.
   711  	want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4}
   712  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
   713  		t.Fatalf("want %v, got %v", want, err)
   714  	}
   715  }
   716  
   717  // TestWeightedTarget_ThreeSubBalancers_RemoveBalancer tests the case where we
   718  // have a weighted target balancer with three sub-balancers and we remove one of
   719  // the subBalancers.
   720  func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
   721  	cc := testutils.NewBalancerClientConn(t)
   722  	wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
   723  	defer wtb.Close()
   724  
   725  	// Start with three subBalancers, all with the same weight.
   726  	config, err := wtbParser.ParseConfig([]byte(`
   727  {
   728    "targets": {
   729      "cluster_1": {
   730        "weight": 1,
   731        "childPolicy": [{"test_config_balancer": "cluster_1"}]
   732      },
   733      "cluster_2": {
   734        "weight": 1,
   735        "childPolicy": [{"test_config_balancer": "cluster_2"}]
   736      },
   737      "cluster_3": {
   738        "weight": 1,
   739        "childPolicy": [{"test_config_balancer": "cluster_3"}]
   740      }
   741    }
   742  }`))
   743  	if err != nil {
   744  		t.Fatalf("failed to parse balancer config: %v", err)
   745  	}
   746  
   747  	// Send the config with one backend for each cluster.
   748  	addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
   749  	addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
   750  	addr3 := resolver.Address{Addr: testBackendAddrStrs[3]}
   751  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
   752  		ResolverState: resolver.State{Addresses: []resolver.Address{
   753  			hierarchy.Set(addr1, []string{"cluster_1"}),
   754  			hierarchy.Set(addr2, []string{"cluster_2"}),
   755  			hierarchy.Set(addr3, []string{"cluster_3"}),
   756  		}},
   757  		BalancerConfig: config,
   758  	}); err != nil {
   759  		t.Fatalf("failed to update ClientConn state: %v", err)
   760  	}
   761  
   762  	scs := waitForNewSubConns(t, cc, 3)
   763  	verifySubConnAddrs(t, scs, map[string][]resolver.Address{
   764  		"cluster_1": {addr1},
   765  		"cluster_2": {addr2},
   766  		"cluster_3": {addr3},
   767  	})
   768  
   769  	// We expect one subConn on each subBalancer.
   770  	sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn)
   771  	sc2 := scs["cluster_2"][0].sc.(*testutils.TestSubConn)
   772  	sc3 := scs["cluster_3"][0].sc.(*testutils.TestSubConn)
   773  
   774  	// Send state changes for all SubConns, and wait for the picker.
   775  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   776  	<-cc.NewPickerCh
   777  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   778  	<-cc.NewPickerCh
   779  	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   780  	<-cc.NewPickerCh
   781  	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   782  	<-cc.NewPickerCh
   783  	sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   784  	<-cc.NewPickerCh
   785  	sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   786  	p := <-cc.NewPickerCh
   787  
   788  	want := []balancer.SubConn{sc1, sc2, sc3}
   789  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
   790  		t.Fatalf("want %v, got %v", want, err)
   791  	}
   792  
   793  	// Remove the second balancer, while the other two are ready.
   794  	config, err = wtbParser.ParseConfig([]byte(`
   795  {
   796    "targets": {
   797      "cluster_1": {
   798        "weight": 1,
   799        "childPolicy": [{"test_config_balancer": "cluster_1"}]
   800      },
   801      "cluster_3": {
   802        "weight": 1,
   803        "childPolicy": [{"test_config_balancer": "cluster_3"}]
   804      }
   805    }
   806  }`))
   807  	if err != nil {
   808  		t.Fatalf("failed to parse balancer config: %v", err)
   809  	}
   810  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
   811  		ResolverState: resolver.State{Addresses: []resolver.Address{
   812  			hierarchy.Set(addr1, []string{"cluster_1"}),
   813  			hierarchy.Set(addr3, []string{"cluster_3"}),
   814  		}},
   815  		BalancerConfig: config,
   816  	}); err != nil {
   817  		t.Fatalf("failed to update ClientConn state: %v", err)
   818  	}
   819  
   820  	// Removing a subBalancer causes the weighted target LB policy to push a new
   821  	// picker which ensures that the removed subBalancer is not picked for RPCs.
   822  	p = <-cc.NewPickerCh
   823  
   824  	scShutdown := <-cc.ShutdownSubConnCh
   825  	if scShutdown != sc2 {
   826  		t.Fatalf("ShutdownSubConn, want %v, got %v", sc2, scShutdown)
   827  	}
   828  	want = []balancer.SubConn{sc1, sc3}
   829  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
   830  		t.Fatalf("want %v, got %v", want, err)
   831  	}
   832  
   833  	// Move balancer 3 into transient failure.
   834  	wantSubConnErr := errors.New("subConn connection error")
   835  	sc3.UpdateState(balancer.SubConnState{
   836  		ConnectivityState: connectivity.TransientFailure,
   837  		ConnectionError:   wantSubConnErr,
   838  	})
   839  	<-cc.NewPickerCh
   840  
   841  	// Remove the first balancer, while the third is in transient failure.
   842  	config, err = wtbParser.ParseConfig([]byte(`
   843  {
   844    "targets": {
   845      "cluster_3": {
   846        "weight": 1,
   847        "childPolicy": [{"test_config_balancer": "cluster_3"}]
   848      }
   849    }
   850  }`))
   851  	if err != nil {
   852  		t.Fatalf("failed to parse balancer config: %v", err)
   853  	}
   854  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
   855  		ResolverState: resolver.State{Addresses: []resolver.Address{
   856  			hierarchy.Set(addr3, []string{"cluster_3"}),
   857  		}},
   858  		BalancerConfig: config,
   859  	}); err != nil {
   860  		t.Fatalf("failed to update ClientConn state: %v", err)
   861  	}
   862  
   863  	// Removing a subBalancer causes the weighted target LB policy to push a new
   864  	// picker which ensures that the removed subBalancer is not picked for RPCs.
   865  
   866  	scShutdown = <-cc.ShutdownSubConnCh
   867  	if scShutdown != sc1 {
   868  		t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
   869  	}
   870  
   871  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   872  	defer cancel()
   873  	if err := cc.WaitForPicker(ctx, pickAndCheckError(wantSubConnErr)); err != nil {
   874  		t.Fatal(err)
   875  	}
   876  }
   877  
   878  // TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends tests the case
   879  // where we have a weighted target balancer with two sub-balancers, and we
   880  // change the weight of these subBalancers.
   881  func (s) TestWeightedTarget_TwoSubBalancers_ChangeWeight_MoreBackends(t *testing.T) {
   882  	cc := testutils.NewBalancerClientConn(t)
   883  	wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
   884  	defer wtb.Close()
   885  
   886  	// Start with two subBalancers, one with twice the weight of the other.
   887  	config, err := wtbParser.ParseConfig([]byte(`
   888  {
   889    "targets": {
   890      "cluster_1": {
   891        "weight": 2,
   892        "childPolicy": [{"test_config_balancer": "cluster_1"}]
   893      },
   894      "cluster_2": {
   895        "weight": 1,
   896        "childPolicy": [{"test_config_balancer": "cluster_2"}]
   897      }
   898    }
   899  }`))
   900  	if err != nil {
   901  		t.Fatalf("failed to parse balancer config: %v", err)
   902  	}
   903  
   904  	// Send the config with two backends for each cluster.
   905  	addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
   906  	addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
   907  	addr3 := resolver.Address{Addr: testBackendAddrStrs[3]}
   908  	addr4 := resolver.Address{Addr: testBackendAddrStrs[4]}
   909  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
   910  		ResolverState: resolver.State{Addresses: []resolver.Address{
   911  			hierarchy.Set(addr1, []string{"cluster_1"}),
   912  			hierarchy.Set(addr2, []string{"cluster_1"}),
   913  			hierarchy.Set(addr3, []string{"cluster_2"}),
   914  			hierarchy.Set(addr4, []string{"cluster_2"}),
   915  		}},
   916  		BalancerConfig: config,
   917  	}); err != nil {
   918  		t.Fatalf("failed to update ClientConn state: %v", err)
   919  	}
   920  
   921  	scs := waitForNewSubConns(t, cc, 4)
   922  	verifySubConnAddrs(t, scs, map[string][]resolver.Address{
   923  		"cluster_1": {addr1, addr2},
   924  		"cluster_2": {addr3, addr4},
   925  	})
   926  
   927  	// We expect two subConns on each subBalancer.
   928  	sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn)
   929  	sc2 := scs["cluster_1"][1].sc.(*testutils.TestSubConn)
   930  	sc3 := scs["cluster_2"][0].sc.(*testutils.TestSubConn)
   931  	sc4 := scs["cluster_2"][1].sc.(*testutils.TestSubConn)
   932  
   933  	// Send state changes for all SubConns, and wait for the picker.
   934  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   935  	<-cc.NewPickerCh
   936  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   937  	<-cc.NewPickerCh
   938  	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   939  	<-cc.NewPickerCh
   940  	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   941  	<-cc.NewPickerCh
   942  	sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   943  	<-cc.NewPickerCh
   944  	sc3.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   945  	<-cc.NewPickerCh
   946  	sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   947  	<-cc.NewPickerCh
   948  	sc4.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   949  	p := <-cc.NewPickerCh
   950  
   951  	// Test roundrobin on the last picker. Twice the number of RPCs should be
   952  	// sent to cluster_1 when compared to cluster_2.
   953  	want := []balancer.SubConn{sc1, sc1, sc2, sc2, sc3, sc4}
   954  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
   955  		t.Fatalf("want %v, got %v", want, err)
   956  	}
   957  
   958  	// Change the weight of cluster_1.
   959  	config, err = wtbParser.ParseConfig([]byte(`
   960  {
   961    "targets": {
   962      "cluster_1": {
   963        "weight": 3,
   964        "childPolicy": [{"test_config_balancer": "cluster_1"}]
   965      },
   966      "cluster_2": {
   967        "weight": 1,
   968        "childPolicy": [{"test_config_balancer": "cluster_2"}]
   969      }
   970    }
   971  }`))
   972  	if err != nil {
   973  		t.Fatalf("failed to parse balancer config: %v", err)
   974  	}
   975  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
   976  		ResolverState: resolver.State{Addresses: []resolver.Address{
   977  			hierarchy.Set(addr1, []string{"cluster_1"}),
   978  			hierarchy.Set(addr2, []string{"cluster_1"}),
   979  			hierarchy.Set(addr3, []string{"cluster_2"}),
   980  			hierarchy.Set(addr4, []string{"cluster_2"}),
   981  		}},
   982  		BalancerConfig: config,
   983  	}); err != nil {
   984  		t.Fatalf("failed to update ClientConn state: %v", err)
   985  	}
   986  
   987  	// Weight change causes a new picker to be pushed to the channel.
   988  	p = <-cc.NewPickerCh
   989  	want = []balancer.SubConn{sc1, sc1, sc1, sc2, sc2, sc2, sc3, sc4}
   990  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
   991  		t.Fatalf("want %v, got %v", want, err)
   992  	}
   993  }
   994  
   995  // TestWeightedTarget_InitOneSubBalancerTransientFailure tests that at init
   996  // time, with two sub-balancers, if one sub-balancer reports transient_failure,
   997  // the picks won't fail with transient_failure, but will instead wait for the
   998  // other sub-balancer.
   999  func (s) TestWeightedTarget_InitOneSubBalancerTransientFailure(t *testing.T) {
  1000  	cc := testutils.NewBalancerClientConn(t)
  1001  	wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
  1002  	defer wtb.Close()
  1003  
  1004  	// Start with "cluster_1: test_config_balancer, cluster_2: test_config_balancer".
  1005  	config, err := wtbParser.ParseConfig([]byte(`
  1006  {
  1007    "targets": {
  1008      "cluster_1": {
  1009        "weight":1,
  1010        "childPolicy": [{"test_config_balancer": "cluster_1"}]
  1011      },
  1012      "cluster_2": {
  1013        "weight":1,
  1014        "childPolicy": [{"test_config_balancer": "cluster_2"}]
  1015      }
  1016    }
  1017  }`))
  1018  	if err != nil {
  1019  		t.Fatalf("failed to parse balancer config: %v", err)
  1020  	}
  1021  
  1022  	// Send the config with one address for each cluster.
  1023  	addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
  1024  	addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
  1025  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
  1026  		ResolverState: resolver.State{Addresses: []resolver.Address{
  1027  			hierarchy.Set(addr1, []string{"cluster_1"}),
  1028  			hierarchy.Set(addr2, []string{"cluster_2"}),
  1029  		}},
  1030  		BalancerConfig: config,
  1031  	}); err != nil {
  1032  		t.Fatalf("failed to update ClientConn state: %v", err)
  1033  	}
  1034  
  1035  	scs := waitForNewSubConns(t, cc, 2)
  1036  	verifySubConnAddrs(t, scs, map[string][]resolver.Address{
  1037  		"cluster_1": {addr1},
  1038  		"cluster_2": {addr2},
  1039  	})
  1040  
  1041  	// We expect a single subConn on each subBalancer.
  1042  	sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn)
  1043  	_ = scs["cluster_2"][0].sc
  1044  
  1045  	// Set one subConn to TransientFailure. This will trigger one sub-balancer
  1046  	// to report transient failure.
  1047  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
  1048  
  1049  	p := <-cc.NewPickerCh
  1050  	for i := 0; i < 5; i++ {
  1051  		r, err := p.Pick(balancer.PickInfo{})
  1052  		if err != balancer.ErrNoSubConnAvailable {
  1053  			t.Fatalf("want pick to fail with %v, got result %v, err %v", balancer.ErrNoSubConnAvailable, r, err)
  1054  		}
  1055  	}
  1056  }
  1057  
  1058  // Test that with two sub-balancers, both in transient_failure, if one turns
  1059  // connecting, the overall state stays in transient_failure, and all picks
  1060  // return the transient failure error.
  1061  func (s) TestBalancerGroup_SubBalancerTurnsConnectingFromTransientFailure(t *testing.T) {
  1062  	cc := testutils.NewBalancerClientConn(t)
  1063  	wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
  1064  	defer wtb.Close()
  1065  
  1066  	// Start with "cluster_1: test_config_balancer, cluster_2: test_config_balancer".
  1067  	config, err := wtbParser.ParseConfig([]byte(`
  1068  {
  1069    "targets": {
  1070      "cluster_1": {
  1071        "weight":1,
  1072        "childPolicy": [{"test_config_balancer": "cluster_1"}]
  1073      },
  1074      "cluster_2": {
  1075        "weight":1,
  1076        "childPolicy": [{"test_config_balancer": "cluster_2"}]
  1077      }
  1078    }
  1079  }`))
  1080  	if err != nil {
  1081  		t.Fatalf("failed to parse balancer config: %v", err)
  1082  	}
  1083  
  1084  	// Send the config with one address for each cluster.
  1085  	addr1 := resolver.Address{Addr: testBackendAddrStrs[1]}
  1086  	addr2 := resolver.Address{Addr: testBackendAddrStrs[2]}
  1087  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
  1088  		ResolverState: resolver.State{Addresses: []resolver.Address{
  1089  			hierarchy.Set(addr1, []string{"cluster_1"}),
  1090  			hierarchy.Set(addr2, []string{"cluster_2"}),
  1091  		}},
  1092  		BalancerConfig: config,
  1093  	}); err != nil {
  1094  		t.Fatalf("failed to update ClientConn state: %v", err)
  1095  	}
  1096  
  1097  	scs := waitForNewSubConns(t, cc, 2)
  1098  	verifySubConnAddrs(t, scs, map[string][]resolver.Address{
  1099  		"cluster_1": {addr1},
  1100  		"cluster_2": {addr2},
  1101  	})
  1102  
  1103  	// We expect a single subConn on each subBalancer.
  1104  	sc1 := scs["cluster_1"][0].sc.(*testutils.TestSubConn)
  1105  	sc2 := scs["cluster_2"][0].sc.(*testutils.TestSubConn)
  1106  
  1107  	// Set both subConns to TransientFailure. This will put both sub-balancers
  1108  	// in transient failure.
  1109  	wantSubConnErr := errors.New("subConn connection error")
  1110  	sc1.UpdateState(balancer.SubConnState{
  1111  		ConnectivityState: connectivity.TransientFailure,
  1112  		ConnectionError:   wantSubConnErr,
  1113  	})
  1114  	<-cc.NewPickerCh
  1115  	sc2.UpdateState(balancer.SubConnState{
  1116  		ConnectivityState: connectivity.TransientFailure,
  1117  		ConnectionError:   wantSubConnErr,
  1118  	})
  1119  	p := <-cc.NewPickerCh
  1120  
  1121  	for i := 0; i < 5; i++ {
  1122  		if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), wantSubConnErr.Error()) {
  1123  			t.Fatalf("picker.Pick() returned error: %v, want: %v", err, wantSubConnErr)
  1124  		}
  1125  	}
  1126  
  1127  	// Set one subConn to Connecting. It shouldn't change the overall state.
  1128  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
  1129  	select {
  1130  	case <-time.After(100 * time.Millisecond):
  1131  	case <-cc.NewPickerCh:
  1132  		t.Fatal("received new picker from the LB policy when expecting none")
  1133  	}
  1134  
  1135  	for i := 0; i < 5; i++ {
  1136  		if _, err := p.Pick(balancer.PickInfo{}); err == nil || !strings.Contains(err.Error(), wantSubConnErr.Error()) {
  1137  			t.Fatalf("picker.Pick() returned error: %v, want: %v", err, wantSubConnErr)
  1138  		}
  1139  	}
  1140  }
  1141  
  1142  // verifyAddressInNewSubConn verifies that a SubConn is created with the
  1143  // expected address, and that the hierarchy path in the address is cleared.
  1144  func verifyAddressInNewSubConn(t *testing.T, cc *testutils.BalancerClientConn, addr resolver.Address) {
  1145  	t.Helper()
  1146  
  1147  	gotAddr := <-cc.NewSubConnAddrsCh
  1148  	wantAddr := []resolver.Address{hierarchy.Set(addr, []string{})}
  1149  	if diff := cmp.Diff(gotAddr, wantAddr, cmp.AllowUnexported(attributes.Attributes{})); diff != "" {
  1150  		t.Fatalf("got unexpected new subconn addrs: %v", diff)
  1151  	}
  1152  }
  1153  
  1154  // subConnWithAddr wraps a subConn and the address for which it was created.
  1155  type subConnWithAddr struct {
  1156  	sc   balancer.SubConn
  1157  	addr resolver.Address
  1158  }
  1159  
  1160  // waitForNewSubConns waits for `num` subConns to be created. This is expected
  1161  // to be used from tests using the "test_config_balancer" LB policy, which adds
  1162  // an address attribute whose value is set to the balancer config.
  1163  //
  1164  // The returned value is a map from subBalancer (identified by its config) to
  1165  // the subConns created by it.
  1166  func waitForNewSubConns(t *testing.T, cc *testutils.BalancerClientConn, num int) map[string][]subConnWithAddr {
  1167  	t.Helper()
  1168  
  1169  	scs := make(map[string][]subConnWithAddr)
  1170  	for i := 0; i < num; i++ {
  1171  		addrs := <-cc.NewSubConnAddrsCh
  1172  		if len(addrs) != 1 {
  1173  			t.Fatalf("received subConns with %d addresses, want 1", len(addrs))
  1174  		}
  1175  		cfg, ok := getConfigKey(addrs[0].Attributes)
  1176  		if !ok {
  1177  			t.Fatalf("received subConn address %v contains no attribute for balancer config", addrs[0])
  1178  		}
  1179  		sc := <-cc.NewSubConnCh
  1180  		scWithAddr := subConnWithAddr{sc: sc, addr: addrs[0]}
  1181  		scs[cfg] = append(scs[cfg], scWithAddr)
  1182  	}
  1183  	return scs
  1184  }
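
        // Illustrative shape of the returned map for two clusters with one
        // subConn each (hypothetical sc/addr values):
        //
        //	map[string][]subConnWithAddr{
        //		"cluster_1": {{sc: sc1, addr: addr1}},
        //		"cluster_2": {{sc: sc2, addr: addr2}},
        //	}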
  1185  
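        // verifySubConnAddrs verifies that the subConns in scs were created for the
        // expected addresses, as specified in wantSubConnAddrs.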
  1186  func verifySubConnAddrs(t *testing.T, scs map[string][]subConnWithAddr, wantSubConnAddrs map[string][]resolver.Address) {
  1187  	t.Helper()
  1188  
  1189  	if len(scs) != len(wantSubConnAddrs) {
  1190  		t.Fatalf("got new subConns %+v, want %v", scs, wantSubConnAddrs)
  1191  	}
  1192  	for cfg, scsWithAddr := range scs {
  1193  		if len(scsWithAddr) != len(wantSubConnAddrs[cfg]) {
  1194  			t.Fatalf("got new subConns %+v, want %v", scs, wantSubConnAddrs)
  1195  		}
  1196  		wantAddrs := wantSubConnAddrs[cfg]
  1197  		for i, scWithAddr := range scsWithAddr {
  1198  			if diff := cmp.Diff(wantAddrs[i].Addr, scWithAddr.addr.Addr); diff != "" {
  1199  				t.Fatalf("got unexpected new subconn addrs: %v", diff)
  1200  			}
  1201  		}
  1202  	}
  1203  }
  1204  
  1205  const initIdleBalancerName = "test-init-Idle-balancer"
  1206  
  1207  var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")
  1208  
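        // init registers a stub balancer named "test-init-Idle-balancer" that
        // creates a single subConn for the addresses it receives and, once that
        // subConn reports Idle, publishes a picker that returns errTestInitIdle.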
  1209  func init() {
  1210  	stub.Register(initIdleBalancerName, stub.BalancerFuncs{
  1211  		UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
  1212  			sc, err := bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{
  1213  				StateListener: func(state balancer.SubConnState) {
  1214  					err := fmt.Errorf("wrong picker error")
  1215  					if state.ConnectivityState == connectivity.Idle {
  1216  						err = errTestInitIdle
  1217  					}
  1218  					bd.ClientConn.UpdateState(balancer.State{
  1219  						ConnectivityState: state.ConnectivityState,
  1220  						Picker:            &testutils.TestConstPicker{Err: err},
  1221  					})
  1222  				},
  1223  			})
  1224  			if err != nil {
  1225  				return err
  1226  			}
  1227  			sc.Connect()
  1228  			return nil
  1229  		},
  1230  	})
  1231  }
  1232  
  1233  // TestInitialIdle covers the case where the child reports Idle; the overall
  1234  // state will then be Idle.
  1235  func (s) TestInitialIdle(t *testing.T) {
  1236  	cc := testutils.NewBalancerClientConn(t)
  1237  	wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
  1238  	defer wtb.Close()
  1239  
  1240  	config, err := wtbParser.ParseConfig([]byte(`
  1241  {
  1242    "targets": {
  1243      "cluster_1": {
  1244        "weight":1,
  1245        "childPolicy": [{"test-init-Idle-balancer": ""}]
  1246      }
  1247    }
  1248  }`))
  1249  	if err != nil {
  1250  		t.Fatalf("failed to parse balancer config: %v", err)
  1251  	}
  1252  
  1253  	// Send the config, and an address with hierarchy path ["cds:cluster_1"].
  1254  	addrs := []resolver.Address{{Addr: testBackendAddrStrs[0], Attributes: nil}}
  1255  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
  1256  		ResolverState:  resolver.State{Addresses: []resolver.Address{hierarchy.Set(addrs[0], []string{"cds:cluster_1"})}},
  1257  		BalancerConfig: config,
  1258  	}); err != nil {
  1259  		t.Fatalf("failed to update ClientConn state: %v", err)
  1260  	}
  1261  
  1262  	// Verify that a subconn is created with the address, and the hierarchy path
  1263  	// in the address is cleared.
  1264  	for range addrs {
  1265  		sc := <-cc.NewSubConnCh
  1266  		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
  1267  	}
  1268  
  1269  	if state := <-cc.NewStateCh; state != connectivity.Idle {
  1270  		t.Fatalf("Received aggregated state: %v, want Idle", state)
  1271  	}
  1272  }
  1273  
  1274  // TestIgnoreSubBalancerStateTransitions covers the case where the child
  1275  // reports a transition from TF to Connecting; the overall state stays TF.
  1276  func (s) TestIgnoreSubBalancerStateTransitions(t *testing.T) {
  1277  	cc := &tcc{BalancerClientConn: testutils.NewBalancerClientConn(t)}
  1278  
  1279  	wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
  1280  	defer wtb.Close()
  1281  
  1282  	config, err := wtbParser.ParseConfig([]byte(`
  1283  {
  1284    "targets": {
  1285      "cluster_1": {
  1286        "weight":1,
  1287        "childPolicy": [{"round_robin": ""}]
  1288      }
  1289    }
  1290  }`))
  1291  	if err != nil {
  1292  		t.Fatalf("failed to parse balancer config: %v", err)
  1293  	}
  1294  
  1295  	// Send the config, and an address with hierarchy path ["cluster_1"].
  1296  	addr := resolver.Address{Addr: testBackendAddrStrs[0], Attributes: nil}
  1297  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
  1298  		ResolverState:  resolver.State{Addresses: []resolver.Address{hierarchy.Set(addr, []string{"cluster_1"})}},
  1299  		BalancerConfig: config,
  1300  	}); err != nil {
  1301  		t.Fatalf("failed to update ClientConn state: %v", err)
  1302  	}
  1303  
  1304  	sc := <-cc.NewSubConnCh
  1305  	sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
  1306  	sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
  1307  
  1308  	// Verify that the SubConnState update from TF to Connecting is ignored.
  1309  	if len(cc.states) != 2 || cc.states[0].ConnectivityState != connectivity.Connecting || cc.states[1].ConnectivityState != connectivity.TransientFailure {
  1310  		t.Fatalf("cc.states = %v; want [Connecting, TransientFailure]", cc.states)
  1311  	}
  1312  }
  1313  
  1314  // tcc wraps a testutils.BalancerClientConn and stores all state transitions
  1315  // in a slice.
  1316  type tcc struct {
  1317  	*testutils.BalancerClientConn
  1318  	states []balancer.State
  1319  }
  1320  
  1321  func (t *tcc) UpdateState(bs balancer.State) {
  1322  	t.states = append(t.states, bs)
  1323  	t.BalancerClientConn.UpdateState(bs)
  1324  }
  1325  
  1326  func (s) TestUpdateStatePauses(t *testing.T) {
  1327  	cc := &tcc{BalancerClientConn: testutils.NewBalancerClientConn(t)}
  1328  
  1329  	balFuncs := stub.BalancerFuncs{
  1330  		UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error {
  1331  			bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: nil})
  1332  			bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: nil})
  1333  			return nil
  1334  		},
  1335  	}
  1336  	stub.Register("update_state_balancer", balFuncs)
  1337  
  1338  	wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
  1339  	defer wtb.Close()
  1340  
  1341  	config, err := wtbParser.ParseConfig([]byte(`
  1342  {
  1343    "targets": {
  1344      "cluster_1": {
  1345        "weight":1,
  1346        "childPolicy": [{"update_state_balancer": ""}]
  1347      }
  1348    }
  1349  }`))
  1350  	if err != nil {
  1351  		t.Fatalf("failed to parse balancer config: %v", err)
  1352  	}
  1353  
  1354  	// Send the config, and an address with hierarchy path ["cds:cluster_1"].
  1355  	addrs := []resolver.Address{{Addr: testBackendAddrStrs[0], Attributes: nil}}
  1356  	if err := wtb.UpdateClientConnState(balancer.ClientConnState{
  1357  		ResolverState:  resolver.State{Addresses: []resolver.Address{hierarchy.Set(addrs[0], []string{"cds:cluster_1"})}},
  1358  		BalancerConfig: config,
  1359  	}); err != nil {
  1360  		t.Fatalf("failed to update ClientConn state: %v", err)
  1361  	}
  1362  
  1363  	// Verify that the only state update to reach the ClientConn is the second one sent by the child.
  1364  	if len(cc.states) != 1 || cc.states[0].ConnectivityState != connectivity.Ready {
  1365  		t.Fatalf("cc.states = %v; want [connectivity.Ready]", cc.states)
  1366  	}
  1367  }
  1368  
