
Source file src/google.golang.org/grpc/xds/internal/balancer/clustermanager/clustermanager_test.go

Documentation: google.golang.org/grpc/xds/internal/balancer/clustermanager

/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package clustermanager

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal/balancer/stub"
	"google.golang.org/grpc/internal/grpctest"
	"google.golang.org/grpc/internal/hierarchy"
	"google.golang.org/grpc/internal/testutils"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/status"
)

type s struct {
	grpctest.Tester
}

func Test(t *testing.T) {
	grpctest.RunSubTests(t, s{})
}
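
// grpctest.RunSubTests uses reflection to run, as a subtest, every method on
// s whose name begins with "Test"; tests declared as plain top-level
// functions (such as TestClusterPicks below) are run by "go test" directly.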

const (
	defaultTestTimeout      = 5 * time.Second
	defaultTestShortTimeout = 10 * time.Millisecond
	testBackendAddrsCount   = 12
)

var testBackendAddrStrs []string

func init() {
	for i := 0; i < testBackendAddrsCount; i++ {
		testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
	}
}
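
// The loop above synthesizes deterministic, well-formed addresses
// ("0.0.0.0:0", "1.1.1.1:1", ..., "11.11.11.11:11"). They are never dialed;
// the tests only need distinct resolver.Address values to hand to SubConns.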

func testPick(t *testing.T, p balancer.Picker, info balancer.PickInfo, wantSC balancer.SubConn, wantErr error) {
	t.Helper()
	for i := 0; i < 5; i++ {
		gotSCSt, err := p.Pick(info)
		if fmt.Sprint(err) != fmt.Sprint(wantErr) {
			t.Fatalf("picker.Pick(%+v), got error %v, want %v", info, err, wantErr)
		}
		if gotSCSt.SubConn != wantSC {
			t.Fatalf("picker.Pick(%+v), got %v, want SubConn=%v", info, gotSCSt, wantSC)
		}
	}
}
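
// The tests below rely on the internal hierarchy attribute to route
// addresses to child balancers: the cluster manager consumes the first path
// element when routing an address and hands the child an address with that
// element stripped, which is what the tests assert via hierarchy.Get. A
// minimal sketch of the round trip (exampleHierarchyRoundTrip is an
// illustrative helper, not exercised by any test):
func exampleHierarchyRoundTrip() {
	addr := resolver.Address{Addr: "10.0.0.1:443"}
	// Tag the address with the child balancer it belongs to.
	addr = hierarchy.Set(addr, []string{"cds:cluster_1"})
	fmt.Println(hierarchy.Get(addr)) // prints [cds:cluster_1]
}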

func TestClusterPicks(t *testing.T) {
	cc := testutils.NewBalancerClientConn(t)
	builder := balancer.Get(balancerName)
	parser := builder.(balancer.ConfigParser)
	bal := builder.Build(cc, balancer.BuildOptions{})

	configJSON1 := `{
"children": {
	"cds:cluster_1":{ "childPolicy": [{"round_robin":""}] },
	"cds:cluster_2":{ "childPolicy": [{"round_robin":""}] }
}
}`
	config1, err := parser.ParseConfig([]byte(configJSON1))
	if err != nil {
		t.Fatalf("failed to parse balancer config: %v", err)
	}

	// Send the config, and two addresses with hierarchy paths
	// ["cds:cluster_1"] and ["cds:cluster_2"].
	wantAddrs := []resolver.Address{
		{Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
		{Addr: testBackendAddrStrs[1], BalancerAttributes: nil},
	}
	if err := bal.UpdateClientConnState(balancer.ClientConnState{
		ResolverState: resolver.State{Addresses: []resolver.Address{
			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
			hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
		}},
		BalancerConfig: config1,
	}); err != nil {
		t.Fatalf("failed to update ClientConn state: %v", err)
	}

	m1 := make(map[resolver.Address]balancer.SubConn)
	// Verify that a subconn is created with each address, and that the
	// hierarchy path in the address is cleared.
	for range wantAddrs {
		addrs := <-cc.NewSubConnAddrsCh
		if len(hierarchy.Get(addrs[0])) != 0 {
			t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes)
		}
		sc := <-cc.NewSubConnCh
		// Clear the attributes before adding to map.
		addrs[0].BalancerAttributes = nil
		m1[addrs[0]] = sc
		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
	}

	p1 := <-cc.NewPickerCh
	for _, tt := range []struct {
		pickInfo balancer.PickInfo
		wantSC   balancer.SubConn
		wantErr  error
	}{
		{
			pickInfo: balancer.PickInfo{
				Ctx: SetPickedCluster(context.Background(), "cds:cluster_1"),
			},
			wantSC: m1[wantAddrs[0]],
		},
		{
			pickInfo: balancer.PickInfo{
				Ctx: SetPickedCluster(context.Background(), "cds:cluster_2"),
			},
			wantSC: m1[wantAddrs[1]],
		},
		{
			pickInfo: balancer.PickInfo{
				Ctx: SetPickedCluster(context.Background(), "notacluster"),
			},
			wantErr: status.Errorf(codes.Unavailable, `unknown cluster selected for RPC: "notacluster"`),
		},
	} {
		testPick(t, p1, tt.pickInfo, tt.wantSC, tt.wantErr)
	}
}
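
// Pick routing is driven entirely by the cluster name stamped onto the RPC's
// context via SetPickedCluster. A minimal sketch of the round trip, assuming
// this version of the package also exports the test-only accessor
// GetPickedClusterForTesting (examplePickedCluster is illustrative only):
func examplePickedCluster() {
	ctx := SetPickedCluster(context.Background(), "cds:cluster_1")
	fmt.Println(GetPickedClusterForTesting(ctx)) // prints cds:cluster_1
}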

// TestConfigUpdateAddCluster covers the case where the balancer receives a
// config update with an extra cluster.
func TestConfigUpdateAddCluster(t *testing.T) {
	cc := testutils.NewBalancerClientConn(t)
	builder := balancer.Get(balancerName)
	parser := builder.(balancer.ConfigParser)
	bal := builder.Build(cc, balancer.BuildOptions{})

	configJSON1 := `{
"children": {
	"cds:cluster_1":{ "childPolicy": [{"round_robin":""}] },
	"cds:cluster_2":{ "childPolicy": [{"round_robin":""}] }
}
}`
	config1, err := parser.ParseConfig([]byte(configJSON1))
	if err != nil {
		t.Fatalf("failed to parse balancer config: %v", err)
	}

	// Send the config, and two addresses with hierarchy paths
	// ["cds:cluster_1"] and ["cds:cluster_2"].
	wantAddrs := []resolver.Address{
		{Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
		{Addr: testBackendAddrStrs[1], BalancerAttributes: nil},
	}
	if err := bal.UpdateClientConnState(balancer.ClientConnState{
		ResolverState: resolver.State{Addresses: []resolver.Address{
			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
			hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
		}},
		BalancerConfig: config1,
	}); err != nil {
		t.Fatalf("failed to update ClientConn state: %v", err)
	}

	m1 := make(map[resolver.Address]balancer.SubConn)
	// Verify that a subconn is created with each address, and that the
	// hierarchy path in the address is cleared.
	for range wantAddrs {
		addrs := <-cc.NewSubConnAddrsCh
		if len(hierarchy.Get(addrs[0])) != 0 {
			t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes)
		}
		sc := <-cc.NewSubConnCh
		// Clear the attributes before adding to map.
		addrs[0].BalancerAttributes = nil
		m1[addrs[0]] = sc
		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
	}

	p1 := <-cc.NewPickerCh
	for _, tt := range []struct {
		pickInfo balancer.PickInfo
		wantSC   balancer.SubConn
		wantErr  error
	}{
		{
			pickInfo: balancer.PickInfo{
				Ctx: SetPickedCluster(context.Background(), "cds:cluster_1"),
			},
			wantSC: m1[wantAddrs[0]],
		},
		{
			pickInfo: balancer.PickInfo{
				Ctx: SetPickedCluster(context.Background(), "cds:cluster_2"),
			},
			wantSC: m1[wantAddrs[1]],
		},
		{
			pickInfo: balancer.PickInfo{
				Ctx: SetPickedCluster(context.Background(), "cds:notacluster"),
			},
			wantErr: status.Errorf(codes.Unavailable, `unknown cluster selected for RPC: "cds:notacluster"`),
		},
	} {
		testPick(t, p1, tt.pickInfo, tt.wantSC, tt.wantErr)
	}

	// A config update that adds a third cluster. Expect a new subconn and a
	// picker update.
	configJSON2 := `{
"children": {
	"cds:cluster_1":{ "childPolicy": [{"round_robin":""}] },
	"cds:cluster_2":{ "childPolicy": [{"round_robin":""}] },
	"cds:cluster_3":{ "childPolicy": [{"round_robin":""}] }
}
}`
	config2, err := parser.ParseConfig([]byte(configJSON2))
	if err != nil {
		t.Fatalf("failed to parse balancer config: %v", err)
	}
	wantAddrs = append(wantAddrs, resolver.Address{Addr: testBackendAddrStrs[2], BalancerAttributes: nil})
	if err := bal.UpdateClientConnState(balancer.ClientConnState{
		ResolverState: resolver.State{Addresses: []resolver.Address{
			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
			hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
			hierarchy.Set(wantAddrs[2], []string{"cds:cluster_3"}),
		}},
		BalancerConfig: config2,
	}); err != nil {
		t.Fatalf("failed to update ClientConn state: %v", err)
	}

	// Expect exactly one new subconn.
	addrs := <-cc.NewSubConnAddrsCh
	if len(hierarchy.Get(addrs[0])) != 0 {
		t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes)
	}
	sc := <-cc.NewSubConnCh
	// Clear the attributes before adding to map.
	addrs[0].BalancerAttributes = nil
	m1[addrs[0]] = sc
	sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
	sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})

	// There should be no more new subconns.
	select {
	case <-time.After(time.Millisecond * 500):
	case <-cc.NewSubConnCh:
		addrs := <-cc.NewSubConnAddrsCh
		t.Fatalf("unexpected NewSubConn with address %v", addrs)
	}

	p2 := <-cc.NewPickerCh
	for _, tt := range []struct {
		pickInfo balancer.PickInfo
		wantSC   balancer.SubConn
		wantErr  error
	}{
		{
			pickInfo: balancer.PickInfo{
				Ctx: SetPickedCluster(context.Background(), "cds:cluster_1"),
			},
			wantSC: m1[wantAddrs[0]],
		},
		{
			pickInfo: balancer.PickInfo{
				Ctx: SetPickedCluster(context.Background(), "cds:cluster_2"),
			},
			wantSC: m1[wantAddrs[1]],
		},
		{
			pickInfo: balancer.PickInfo{
				Ctx: SetPickedCluster(context.Background(), "cds:cluster_3"),
			},
			wantSC: m1[wantAddrs[2]],
		},
		{
			pickInfo: balancer.PickInfo{
				Ctx: SetPickedCluster(context.Background(), "cds:notacluster"),
			},
			wantErr: status.Errorf(codes.Unavailable, `unknown cluster selected for RPC: "cds:notacluster"`),
		},
	} {
		testPick(t, p2, tt.pickInfo, tt.wantSC, tt.wantErr)
	}
}
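
// The balancer's config parser can also be exercised on its own; a minimal
// sketch (exampleParseConfig is illustrative only, and relies on the
// "round_robin" policy being registered, as the tests above do):
func exampleParseConfig() {
	parser := balancer.Get(balancerName).(balancer.ConfigParser)
	cfg, err := parser.ParseConfig([]byte(`{"children":{"c":{"childPolicy":[{"round_robin":""}]}}}`))
	fmt.Println(cfg != nil, err) // prints: true <nil>
}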

// TestRoutingConfigUpdateDeleteAll covers the case where the balancer
// receives a config update with no clusters. Picks should then fail with
// details in the error.
func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
	cc := testutils.NewBalancerClientConn(t)
	builder := balancer.Get(balancerName)
	parser := builder.(balancer.ConfigParser)
	bal := builder.Build(cc, balancer.BuildOptions{})

	configJSON1 := `{
"children": {
	"cds:cluster_1":{ "childPolicy": [{"round_robin":""}] },
	"cds:cluster_2":{ "childPolicy": [{"round_robin":""}] }
}
}`
	config1, err := parser.ParseConfig([]byte(configJSON1))
	if err != nil {
		t.Fatalf("failed to parse balancer config: %v", err)
	}

	// Send the config, and two addresses with hierarchy paths
	// ["cds:cluster_1"] and ["cds:cluster_2"].
	wantAddrs := []resolver.Address{
		{Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
		{Addr: testBackendAddrStrs[1], BalancerAttributes: nil},
	}
	if err := bal.UpdateClientConnState(balancer.ClientConnState{
		ResolverState: resolver.State{Addresses: []resolver.Address{
			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
			hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
		}},
		BalancerConfig: config1,
	}); err != nil {
		t.Fatalf("failed to update ClientConn state: %v", err)
	}

	m1 := make(map[resolver.Address]balancer.SubConn)
	// Verify that a subconn is created with each address, and that the
	// hierarchy path in the address is cleared.
	for range wantAddrs {
		addrs := <-cc.NewSubConnAddrsCh
		if len(hierarchy.Get(addrs[0])) != 0 {
			t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes)
		}
		sc := <-cc.NewSubConnCh
		// Clear the attributes before adding to map.
		addrs[0].BalancerAttributes = nil
		m1[addrs[0]] = sc
		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
	}

	p1 := <-cc.NewPickerCh
	for _, tt := range []struct {
		pickInfo balancer.PickInfo
		wantSC   balancer.SubConn
		wantErr  error
	}{
		{
			pickInfo: balancer.PickInfo{
				Ctx: SetPickedCluster(context.Background(), "cds:cluster_1"),
			},
			wantSC: m1[wantAddrs[0]],
		},
		{
			pickInfo: balancer.PickInfo{
				Ctx: SetPickedCluster(context.Background(), "cds:cluster_2"),
			},
			wantSC: m1[wantAddrs[1]],
		},
		{
			pickInfo: balancer.PickInfo{
				Ctx: SetPickedCluster(context.Background(), "cds:notacluster"),
			},
			wantErr: status.Errorf(codes.Unavailable, `unknown cluster selected for RPC: "cds:notacluster"`),
		},
	} {
		testPick(t, p1, tt.pickInfo, tt.wantSC, tt.wantErr)
	}

	// A config update with no clusters.
	configJSON2 := `{}`
	config2, err := parser.ParseConfig([]byte(configJSON2))
	if err != nil {
		t.Fatalf("failed to parse balancer config: %v", err)
	}
	if err := bal.UpdateClientConnState(balancer.ClientConnState{
		BalancerConfig: config2,
	}); err != nil {
		t.Fatalf("failed to update ClientConn state: %v", err)
	}

	// Expect both subconns to be shut down.
	for range wantAddrs {
		select {
		case <-time.After(time.Millisecond * 500):
			t.Fatalf("timed out waiting for subconn shutdown")
		case <-cc.ShutdownSubConnCh:
		}
	}

	p2 := <-cc.NewPickerCh
	for i := 0; i < 5; i++ {
		gotSCSt, err := p2.Pick(balancer.PickInfo{Ctx: SetPickedCluster(context.Background(), "cds:notacluster")})
		if fmt.Sprint(err) != status.Errorf(codes.Unavailable, `unknown cluster selected for RPC: "cds:notacluster"`).Error() {
			t.Fatalf("picker.Pick, got %v, %v, want error %v", gotSCSt, err, `unknown cluster selected for RPC: "cds:notacluster"`)
		}
	}

	// Resend the previous config with clusters.
	if err := bal.UpdateClientConnState(balancer.ClientConnState{
		ResolverState: resolver.State{Addresses: []resolver.Address{
			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
			hierarchy.Set(wantAddrs[1], []string{"cds:cluster_2"}),
		}},
		BalancerConfig: config1,
	}); err != nil {
		t.Fatalf("failed to update ClientConn state: %v", err)
	}

	m2 := make(map[resolver.Address]balancer.SubConn)
	// Verify that a subconn is created with each address, and that the
	// hierarchy path in the address is cleared.
	for range wantAddrs {
		addrs := <-cc.NewSubConnAddrsCh
		if len(hierarchy.Get(addrs[0])) != 0 {
			t.Fatalf("NewSubConn with address %+v, attrs %+v, want address with hierarchy cleared", addrs[0], addrs[0].BalancerAttributes)
		}
		sc := <-cc.NewSubConnCh
		// Clear the attributes before adding to map.
		addrs[0].BalancerAttributes = nil
		m2[addrs[0]] = sc
		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
	}

	p3 := <-cc.NewPickerCh
	for _, tt := range []struct {
		pickInfo balancer.PickInfo
		wantSC   balancer.SubConn
		wantErr  error
	}{
		{
			pickInfo: balancer.PickInfo{
				Ctx: SetPickedCluster(context.Background(), "cds:cluster_1"),
			},
			wantSC: m2[wantAddrs[0]],
		},
		{
			pickInfo: balancer.PickInfo{
				Ctx: SetPickedCluster(context.Background(), "cds:cluster_2"),
			},
			wantSC: m2[wantAddrs[1]],
		},
		{
			pickInfo: balancer.PickInfo{
				Ctx: SetPickedCluster(context.Background(), "cds:notacluster"),
			},
			wantErr: status.Errorf(codes.Unavailable, `unknown cluster selected for RPC: "cds:notacluster"`),
		},
	} {
		testPick(t, p3, tt.pickInfo, tt.wantSC, tt.wantErr)
	}
}
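
// Pick failures for unknown clusters surface as status errors, so callers
// can branch on the code rather than the message. A minimal sketch
// (exampleUnknownClusterErr is illustrative only):
func exampleUnknownClusterErr() {
	err := status.Errorf(codes.Unavailable, "unknown cluster selected for RPC: %q", "cds:notacluster")
	fmt.Println(status.Code(err)) // prints Unavailable
}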

func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) {
	const (
		userAgent          = "ua"
		defaultTestTimeout = 1 * time.Second
	)

	// Set up the stub balancer such that we can read the build options
	// passed to it in the UpdateClientConnState method.
	ccsCh := testutils.NewChannel()
	bOpts := balancer.BuildOptions{
		DialCreds:       insecure.NewCredentials(),
		CustomUserAgent: userAgent,
	}
	stub.Register(t.Name(), stub.BalancerFuncs{
		UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
			if !cmp.Equal(bd.BuildOptions, bOpts) {
				err := fmt.Errorf("buildOptions in child balancer: %v, want %v", bd.BuildOptions, bOpts)
				ccsCh.Send(err)
				return err
			}
			ccsCh.Send(nil)
			return nil
		},
	})

	cc := testutils.NewBalancerClientConn(t)
	builder := balancer.Get(balancerName)
	parser := builder.(balancer.ConfigParser)
	bal := builder.Build(cc, bOpts)

	configJSON1 := fmt.Sprintf(`{
"children": {
	"cds:cluster_1":{ "childPolicy": [{"%s":""}] }
}
}`, t.Name())
	config1, err := parser.ParseConfig([]byte(configJSON1))
	if err != nil {
		t.Fatalf("failed to parse balancer config: %v", err)
	}

	if err := bal.UpdateClientConnState(balancer.ClientConnState{BalancerConfig: config1}); err != nil {
		t.Fatalf("failed to update ClientConn state: %v", err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	v, err := ccsCh.Receive(ctx)
	if err != nil {
		t.Fatalf("timed out waiting for UpdateClientConnState result: %v", err)
	}
	if v != nil {
		t.Fatal(v)
	}
}

const initIdleBalancerName = "test-init-Idle-balancer"

var errTestInitIdle = fmt.Errorf("init Idle balancer error 0")

func init() {
	stub.Register(initIdleBalancerName, stub.BalancerFuncs{
		UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
			sc, err := bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{
				StateListener: func(state balancer.SubConnState) {
					err := fmt.Errorf("wrong picker error")
					if state.ConnectivityState == connectivity.Idle {
						err = errTestInitIdle
					}
					bd.ClientConn.UpdateState(balancer.State{
						ConnectivityState: state.ConnectivityState,
						Picker:            &testutils.TestConstPicker{Err: err},
					})
				},
			})
			if err != nil {
				return err
			}
			sc.Connect()
			return nil
		},
	})
}
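
// The stub registered above simulates a child policy whose first report is
// Idle: its SubConn state listener mirrors each SubConn state into a
// balancer.State, with a const picker that returns errTestInitIdle once the
// SubConn goes Idle. TestInitialIdle uses it to drive the aggregated state.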

// TestInitialIdle covers the case where the child reports Idle: the overall
// state should then be Idle.
func TestInitialIdle(t *testing.T) {
	cc := testutils.NewBalancerClientConn(t)
	builder := balancer.Get(balancerName)
	parser := builder.(balancer.ConfigParser)
	bal := builder.Build(cc, balancer.BuildOptions{})

	configJSON1 := `{
"children": {
	"cds:cluster_1":{ "childPolicy": [{"test-init-Idle-balancer":""}] }
}
}`
	config1, err := parser.ParseConfig([]byte(configJSON1))
	if err != nil {
		t.Fatalf("failed to parse balancer config: %v", err)
	}

	// Send the config, and an address with hierarchy path ["cds:cluster_1"].
	wantAddrs := []resolver.Address{
		{Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
	}
	if err := bal.UpdateClientConnState(balancer.ClientConnState{
		ResolverState: resolver.State{Addresses: []resolver.Address{
			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
		}},
		BalancerConfig: config1,
	}); err != nil {
		t.Fatalf("failed to update ClientConn state: %v", err)
	}

	// Expect a new subconn for the address, and report Idle on it.
	for range wantAddrs {
		sc := <-cc.NewSubConnCh
		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Idle})
	}

	if state1 := <-cc.NewStateCh; state1 != connectivity.Idle {
		t.Fatalf("Received aggregated state: %v, want Idle", state1)
	}
}

// TestClusterGracefulSwitch tests the graceful switch functionality for a
// child of the cluster manager. At first, the child is configured as a round
// robin load balancer, and thus should behave accordingly. The test then
// gracefully switches this child to a pick first load balancer. Once that
// balancer updates its state and completes the graceful switch process, the
// new picker should reflect this change.
func TestClusterGracefulSwitch(t *testing.T) {
	cc := testutils.NewBalancerClientConn(t)
	builder := balancer.Get(balancerName)
	parser := builder.(balancer.ConfigParser)
	bal := builder.Build(cc, balancer.BuildOptions{})

	configJSON1 := `{
"children": {
	"csp:cluster":{ "childPolicy": [{"round_robin":""}] }
}
}`
	config1, err := parser.ParseConfig([]byte(configJSON1))
	if err != nil {
		t.Fatalf("failed to parse balancer config: %v", err)
	}
	wantAddrs := []resolver.Address{
		{Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
		{Addr: testBackendAddrStrs[1], BalancerAttributes: nil},
	}
	if err := bal.UpdateClientConnState(balancer.ClientConnState{
		ResolverState: resolver.State{Addresses: []resolver.Address{
			hierarchy.Set(wantAddrs[0], []string{"csp:cluster"}),
		}},
		BalancerConfig: config1,
	}); err != nil {
		t.Fatalf("failed to update ClientConn state: %v", err)
	}

	sc1 := <-cc.NewSubConnCh
	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
	p1 := <-cc.NewPickerCh
	pi := balancer.PickInfo{
		Ctx: SetPickedCluster(context.Background(), "csp:cluster"),
	}
	testPick(t, p1, pi, sc1, nil)

	childPolicyName := t.Name()
	stub.Register(childPolicyName, stub.BalancerFuncs{
		Init: func(bd *stub.BalancerData) {
			bd.Data = balancer.Get(grpc.PickFirstBalancerName).Build(bd.ClientConn, bd.BuildOptions)
		},
		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
			bal := bd.Data.(balancer.Balancer)
			return bal.UpdateClientConnState(ccs)
		},
	})
	// Same cluster, different balancer type.
	configJSON2 := fmt.Sprintf(`{
"children": {
	"csp:cluster":{ "childPolicy": [{"%s":""}] }
}
}`, childPolicyName)
	config2, err := parser.ParseConfig([]byte(configJSON2))
	if err != nil {
		t.Fatalf("failed to parse balancer config: %v", err)
	}
	if err := bal.UpdateClientConnState(balancer.ClientConnState{
		ResolverState: resolver.State{Addresses: []resolver.Address{
			hierarchy.Set(wantAddrs[1], []string{"csp:cluster"}),
		}},
		BalancerConfig: config2,
	}); err != nil {
		t.Fatalf("failed to update ClientConn state: %v", err)
	}
	sc2 := <-cc.NewSubConnCh
	// Update the pick first balancer's SubConn as CONNECTING. This causes
	// the pick first balancer to UpdateState() with CONNECTING, which
	// shouldn't send a picker update back, as the graceful switch process is
	// not complete.
	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
	defer cancel()
	select {
	case <-cc.NewPickerCh:
		t.Fatalf("No new picker should have been sent because the graceful switch process has not completed")
	case <-ctx.Done():
	}

	// Update the pick first balancer's SubConn as READY. This causes the
	// pick first balancer to UpdateState() with READY, which should send a
	// picker update back, as the graceful switch process is complete. This
	// picker should always pick the SubConn created by the pick first
	// balancer.
	sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
	p2 := <-cc.NewPickerCh
	testPick(t, p2, pi, sc2, nil)
	// Completion of the graceful switch for the child should cause the
	// SubConns of the balancer being switched away from to be shut down.
	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	select {
	case <-ctx.Done():
		t.Fatalf("timed out waiting for sc.Shutdown()")
	case rsc := <-cc.ShutdownSubConnCh:
		// The SubConn shut down should be the one created by the child
		// before the switch.
		if rsc != sc1 {
			t.Fatalf("Shutdown() got: %v, want %v", rsc, sc1)
		}
	}
}
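
// The sequencing asserted above is the graceful-switch contract: the
// outgoing child's picker keeps serving while the pending child is still
// CONNECTING, the pending child takes over only once it reports READY, and
// only then are the outgoing child's SubConns shut down.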

// tcc wraps a testutils.BalancerClientConn and stores all state transitions
// in a slice.
type tcc struct {
	*testutils.BalancerClientConn
	states []balancer.State
}

func (t *tcc) UpdateState(bs balancer.State) {
	t.states = append(t.states, bs)
	t.BalancerClientConn.UpdateState(bs)
}

func (s) TestUpdateStatePauses(t *testing.T) {
	cc := &tcc{BalancerClientConn: testutils.NewBalancerClientConn(t)}

	balFuncs := stub.BalancerFuncs{
		UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error {
			bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: nil})
			bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: nil})
			return nil
		},
	}
	stub.Register("update_state_balancer", balFuncs)

	builder := balancer.Get(balancerName)
	parser := builder.(balancer.ConfigParser)
	bal := builder.Build(cc, balancer.BuildOptions{})

	configJSON1 := `{
"children": {
	"cds:cluster_1":{ "childPolicy": [{"update_state_balancer":""}] }
}
}`
	config1, err := parser.ParseConfig([]byte(configJSON1))
	if err != nil {
		t.Fatalf("failed to parse balancer config: %v", err)
	}

	// Send the config, and an address with hierarchy path ["cds:cluster_1"].
	wantAddrs := []resolver.Address{
		{Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
	}
	if err := bal.UpdateClientConnState(balancer.ClientConnState{
		ResolverState: resolver.State{Addresses: []resolver.Address{
			hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
		}},
		BalancerConfig: config1,
	}); err != nil {
		t.Fatalf("failed to update ClientConn state: %v", err)
	}

	// Verify that the only state update is the second one called by the child.
	if len(cc.states) != 1 || cc.states[0].ConnectivityState != connectivity.Ready {
		t.Fatalf("cc.states = %v; want [connectivity.Ready]", cc.states)
	}
}
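
// The cluster manager pauses child state updates while it is handling an
// UpdateClientConnState call, so the child's intermediate TransientFailure
// above never reaches the ClientConn; only the final Ready update does.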