Source file src/google.golang.org/grpc/internal/balancergroup/balancergroup_test.go

Documentation: google.golang.org/grpc/internal/balancergroup

     1  /*
     2   * Copyright 2019 gRPC authors.
     3   *
     4   * Licensed under the Apache License, Version 2.0 (the "License");
     5   * you may not use this file except in compliance with the License.
     6   * You may obtain a copy of the License at
     7   *
     8   *     http://www.apache.org/licenses/LICENSE-2.0
     9   *
    10   * Unless required by applicable law or agreed to in writing, software
    11   * distributed under the License is distributed on an "AS IS" BASIS,
    12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13   * See the License for the specific language governing permissions and
    14   * limitations under the License.
    15   */
    16  
    17  package balancergroup
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"testing"
    23  	"time"
    24  
    25  	"google.golang.org/grpc"
    26  	"google.golang.org/grpc/balancer"
    27  	"google.golang.org/grpc/balancer/roundrobin"
    28  	"google.golang.org/grpc/balancer/weightedtarget/weightedaggregator"
    29  	"google.golang.org/grpc/connectivity"
    30  	"google.golang.org/grpc/credentials/insecure"
    31  	"google.golang.org/grpc/internal/balancer/stub"
    32  	"google.golang.org/grpc/internal/channelz"
    33  	"google.golang.org/grpc/internal/grpctest"
    34  	"google.golang.org/grpc/internal/testutils"
    35  	"google.golang.org/grpc/resolver"
    36  )
    37  
    38  const (
    39  	defaultTestTimeout      = 5 * time.Second
    40  	defaultTestShortTimeout = 10 * time.Millisecond
    41  )
    42  
    43  var (
    44  	rrBuilder        = balancer.Get(roundrobin.Name)
    45  	testBalancerIDs  = []string{"b1", "b2", "b3"}
    46  	testBackendAddrs []resolver.Address
    47  )
    48  
    49  const testBackendAddrsCount = 12
    50  
    51  func init() {
    52  	for i := 0; i < testBackendAddrsCount; i++ {
    53  		testBackendAddrs = append(testBackendAddrs, resolver.Address{Addr: fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i)})
    54  	}
    55  }
    56  
    57  type s struct {
    58  	grpctest.Tester
    59  }
    60  
    61  func Test(t *testing.T) {
    62  	grpctest.RunSubTests(t, s{})
    63  }
    64  
    65  // Create a new balancer group, add balancers and backends, but do not start it.
    66  // - b1, weight 2, backends [0,1]
    67  // - b2, weight 1, backends [2,3]
    68  // Start the balancer group and check behavior.
    69  //
    70  // Close the balancer group, call add/remove/change weight/change address.
    71  // - b2, weight 3, backends [0,3]
    72  // - b3, weight 1, backends [1,2]
    73  // Start the balancer group again and check the behavior.
    74  func (s) TestBalancerGroup_start_close(t *testing.T) {
    75  	cc := testutils.NewBalancerClientConn(t)
    76  	gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
    77  	gator.Start()
    78  	bg := New(Options{
    79  		CC:                      cc,
    80  		BuildOpts:               balancer.BuildOptions{},
    81  		StateAggregator:         gator,
    82  		Logger:                  nil,
    83  		SubBalancerCloseTimeout: time.Duration(0),
    84  	})
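        	// NOTE: a SubBalancerCloseTimeout of zero presumably disables the idle
        	// cache for removed sub-balancers, so they are closed immediately on
        	// removal (contrast with initBalancerGroupForCachingTest below).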
    85  
    86  	// Add two balancers to group and send two resolved addresses to both
    87  	// balancers.
    88  	gator.Add(testBalancerIDs[0], 2)
    89  	bg.Add(testBalancerIDs[0], rrBuilder)
    90  	bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
    91  	gator.Add(testBalancerIDs[1], 1)
    92  	bg.Add(testBalancerIDs[1], rrBuilder)
    93  	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
    94  
    95  	bg.Start()
    96  
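        	// The two sub-balancers have two addresses each, so four SubConn creations
        	// are expected. Drive each SubConn through CONNECTING to READY to simulate
        	// established connections.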
    97  	m1 := make(map[resolver.Address]balancer.SubConn)
    98  	for i := 0; i < 4; i++ {
    99  		addrs := <-cc.NewSubConnAddrsCh
   100  		sc := <-cc.NewSubConnCh
   101  		m1[addrs[0]] = sc
   102  		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   103  		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   104  	}
   105  
   106  	// Test roundrobin on the last picker.
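        	// With weights 2:1, each of b1's backends (addrs 0 and 1) is expected to be
        	// picked twice for every single pick of each of b2's backends (addrs 2, 3).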
   107  	p1 := <-cc.NewPickerCh
   108  	want := []balancer.SubConn{
   109  		m1[testBackendAddrs[0]], m1[testBackendAddrs[0]],
   110  		m1[testBackendAddrs[1]], m1[testBackendAddrs[1]],
   111  		m1[testBackendAddrs[2]], m1[testBackendAddrs[3]],
   112  	}
   113  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p1)); err != nil {
   114  		t.Fatalf("want %v, got %v", want, err)
   115  	}
   116  
   117  	gator.Stop()
   118  	bg.Close()
   119  	for i := 0; i < 4; i++ {
   120  		(<-cc.ShutdownSubConnCh).UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
   121  	}
   122  
   123  	// Add b3, weight 1, backends [1,2].
   124  	gator.Add(testBalancerIDs[2], 1)
   125  	bg.Add(testBalancerIDs[2], rrBuilder)
   126  	bg.UpdateClientConnState(testBalancerIDs[2], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[1:3]}})
   127  
   128  	// Remove b1.
   129  	gator.Remove(testBalancerIDs[0])
   130  	bg.Remove(testBalancerIDs[0])
   131  
   132  	// Update b2 to weight 3, backends [0,3].
   133  	gator.UpdateWeight(testBalancerIDs[1], 3)
   134  	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: append([]resolver.Address(nil), testBackendAddrs[0], testBackendAddrs[3])}})
   135  
   136  	gator.Start()
   137  	bg.Start()
   138  
   139  	m2 := make(map[resolver.Address]balancer.SubConn)
   140  	for i := 0; i < 4; i++ {
   141  		addrs := <-cc.NewSubConnAddrsCh
   142  		sc := <-cc.NewSubConnCh
   143  		m2[addrs[0]] = sc
   144  		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   145  		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   146  	}
   147  
   148  	// Test roundrobin on the last picker.
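        	// With weights 3:1, each of b2's backends (addrs 0 and 3) is expected to be
        	// picked three times for every single pick of each of b3's backends (1, 2).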
   149  	p2 := <-cc.NewPickerCh
   150  	want = []balancer.SubConn{
   151  		m2[testBackendAddrs[0]], m2[testBackendAddrs[0]], m2[testBackendAddrs[0]],
   152  		m2[testBackendAddrs[3]], m2[testBackendAddrs[3]], m2[testBackendAddrs[3]],
   153  		m2[testBackendAddrs[1]], m2[testBackendAddrs[2]],
   154  	}
   155  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p2)); err != nil {
   156  		t.Fatalf("want %v, got %v", want, err)
   157  	}
   158  }
   159  
   160  // Test that balancer group start() doesn't deadlock if the balancer calls back
   161  // into balancer group inline when it gets an update.
   162  //
   163  // The potential deadlock can happen if we
   164  //   - hold a lock and send updates to balancer (e.g. update resolved addresses)
   165  //   - the balancer calls back (NewSubConn or update picker) inline
   166  //
   167  // The callback will try to hold the same lock again, which will cause a
   168  // deadlock.
   169  //
   170  // This test starts the balancer group with a stub child balancer and sends it
   171  // address updates. It's expected that start() returns without blocking, i.e.
   172  // that no such deadlock occurs.
   173  func (s) TestBalancerGroup_start_close_deadlock(t *testing.T) {
   174  	const balancerName = "stub-TestBalancerGroup_start_close_deadlock"
   175  	stub.Register(balancerName, stub.BalancerFuncs{})
   176  	builder := balancer.Get(balancerName)
   177  
   178  	cc := testutils.NewBalancerClientConn(t)
   179  	gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
   180  	gator.Start()
   181  	bg := New(Options{
   182  		CC:                      cc,
   183  		BuildOpts:               balancer.BuildOptions{},
   184  		StateAggregator:         gator,
   185  		Logger:                  nil,
   186  		SubBalancerCloseTimeout: time.Duration(0),
   187  	})
   188  
   189  	gator.Add(testBalancerIDs[0], 2)
   190  	bg.Add(testBalancerIDs[0], builder)
   191  	bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
   192  	gator.Add(testBalancerIDs[1], 1)
   193  	bg.Add(testBalancerIDs[1], builder)
   194  	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
   195  
   196  	bg.Start()
   197  }
   198  
   199  // initBalancerGroupForCachingTest creates a balancer group, and initializes it
   200  // to be ready for caching tests.
   201  //
   202  // Two rr balancers are added to bg, each with 2 ready subConns. A sub-balancer
   203  // is removed later, so the balancer group returned has one sub-balancer in its
   204  // own map, and one sub-balancer in cache.
   205  func initBalancerGroupForCachingTest(t *testing.T, idleCacheTimeout time.Duration) (*weightedaggregator.Aggregator, *BalancerGroup, *testutils.BalancerClientConn, map[resolver.Address]*testutils.TestSubConn) {
   206  	cc := testutils.NewBalancerClientConn(t)
   207  	gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
   208  	gator.Start()
   209  	bg := New(Options{
   210  		CC:                      cc,
   211  		BuildOpts:               balancer.BuildOptions{},
   212  		StateAggregator:         gator,
   213  		Logger:                  nil,
   214  		SubBalancerCloseTimeout: idleCacheTimeout,
   215  	})
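        	// SubBalancerCloseTimeout acts as the idle cache timeout here: a removed
        	// sub-balancer (and its SubConns) is kept in cache for this long before
        	// being closed, so that a quick re-add can reuse it.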
   216  
   217  	// Add two balancers to group and send two resolved addresses to both
   218  	// balancers.
   219  	gator.Add(testBalancerIDs[0], 2)
   220  	bg.Add(testBalancerIDs[0], rrBuilder)
   221  	bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
   222  	gator.Add(testBalancerIDs[1], 1)
   223  	bg.Add(testBalancerIDs[1], rrBuilder)
   224  	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}})
   225  
   226  	bg.Start()
   227  
   228  	m1 := make(map[resolver.Address]*testutils.TestSubConn)
   229  	for i := 0; i < 4; i++ {
   230  		addrs := <-cc.NewSubConnAddrsCh
   231  		sc := <-cc.NewSubConnCh
   232  		m1[addrs[0]] = sc
   233  		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   234  		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   235  	}
   236  
   237  	// Test roundrobin on the last picker.
   238  	p1 := <-cc.NewPickerCh
   239  	want := []balancer.SubConn{
   240  		m1[testBackendAddrs[0]], m1[testBackendAddrs[0]],
   241  		m1[testBackendAddrs[1]], m1[testBackendAddrs[1]],
   242  		m1[testBackendAddrs[2]], m1[testBackendAddrs[3]],
   243  	}
   244  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p1)); err != nil {
   245  		t.Fatalf("want %v, got %v", want, err)
   246  	}
   247  
   248  	gator.Remove(testBalancerIDs[1])
   249  	bg.Remove(testBalancerIDs[1])
   250  	// Verify that no SubConn is shut down immediately after the sub-balancer is
   251  	// removed; SubConns are only shut down after the close timeout.
   252  	for i := 0; i < 10; i++ {
   253  		select {
   254  		case sc := <-cc.ShutdownSubConnCh:
   255  			t.Fatalf("Got request to shut down subconn %v, want no shut down subconn (because subconns were still in cache)", sc)
   256  		default:
   257  		}
   258  		time.Sleep(time.Millisecond)
   259  	}
   260  	// Test roundrobin on the picker with only sub-balancer0.
   261  	p2 := <-cc.NewPickerCh
   262  	want = []balancer.SubConn{
   263  		m1[testBackendAddrs[0]], m1[testBackendAddrs[1]],
   264  	}
   265  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p2)); err != nil {
   266  		t.Fatalf("want %v, got %v", want, err)
   267  	}
   268  
   269  	return gator, bg, cc, m1
   270  }
   271  
   272  // Test that if a sub-balancer is removed, and re-added within close timeout,
   273  // the subConns won't be re-created.
   274  func (s) TestBalancerGroup_locality_caching(t *testing.T) {
   275  	gator, bg, cc, addrToSC := initBalancerGroupForCachingTest(t, defaultTestTimeout)
   276  
   277  	// Turn down subconn for addr2, shouldn't get picker update because
   278  	// sub-balancer1 was removed.
   279  	addrToSC[testBackendAddrs[2]].UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
   280  	for i := 0; i < 10; i++ {
   281  		select {
   282  		case <-cc.NewPickerCh:
   283  			t.Fatalf("Got new picker, want no new picker (because the sub-balancer was removed)")
   284  		default:
   285  		}
   286  		time.Sleep(defaultTestShortTimeout)
   287  	}
   288  
   289  	// Re-add sub-balancer-1. Because its subconns were in cache, no new subconns
   290  	// should be created. But a new picker will still be generated, with subconn
   291  	// states up to date.
   292  	gator.Add(testBalancerIDs[1], 1)
   293  	bg.Add(testBalancerIDs[1], rrBuilder)
   294  
   295  	p3 := <-cc.NewPickerCh
   296  	want := []balancer.SubConn{
   297  		addrToSC[testBackendAddrs[0]], addrToSC[testBackendAddrs[0]],
   298  		addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]],
   299  		// addr2 is down, b2 only has addr3 in READY state.
   300  		addrToSC[testBackendAddrs[3]], addrToSC[testBackendAddrs[3]],
   301  	}
   302  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p3)); err != nil {
   303  		t.Fatalf("want %v, got %v", want, err)
   304  	}
   305  
   306  	for i := 0; i < 10; i++ {
   307  		select {
   308  		case <-cc.NewSubConnAddrsCh:
   309  			t.Fatalf("Got new subconn, want no new subconn (because subconns were still in cache)")
   310  		default:
   311  		}
   312  		time.Sleep(defaultTestShortTimeout)
   313  	}
   314  }
   315  
   316  // Sub-balancers are put in cache when they are removed. If the balancer group
   317  // is closed within the close timeout, all subconns should still be shut down
   318  // immediately.
   319  func (s) TestBalancerGroup_locality_caching_close_group(t *testing.T) {
   320  	_, bg, cc, addrToSC := initBalancerGroupForCachingTest(t, defaultTestTimeout)
   321  
   322  	bg.Close()
   323  	// The balancer group is closed. The subconns should be shutdown immediately.
   324  	shutdownTimeout := time.After(time.Millisecond * 500)
   325  	scToShutdown := map[balancer.SubConn]int{
   326  		addrToSC[testBackendAddrs[0]]: 1,
   327  		addrToSC[testBackendAddrs[1]]: 1,
   328  		addrToSC[testBackendAddrs[2]]: 1,
   329  		addrToSC[testBackendAddrs[3]]: 1,
   330  	}
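        	// Every SubConn is expected to be shut down exactly once: the two owned by
        	// the remaining sub-balancer (addrs 0, 1) and the two held by the
        	// sub-balancer that is still in cache (addrs 2, 3).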
   331  	for i := 0; i < len(scToShutdown); i++ {
   332  		select {
   333  		case sc := <-cc.ShutdownSubConnCh:
   334  			c := scToShutdown[sc]
   335  			if c == 0 {
   336  				t.Fatalf("Got Shutdown for %v when there's %d shutdown expected", sc, c)
   337  			}
   338  			scToShutdown[sc] = c - 1
   339  		case <-shutdownTimeout:
   340  			t.Fatalf("timeout waiting for subConns (from balancer in cache) to be shut down")
   341  		}
   342  	}
   343  }
   344  
   345  // Sub-balancers in cache will be closed if not re-added within timeout, and
   346  // subConns will be shut down.
   347  func (s) TestBalancerGroup_locality_caching_not_readd_within_timeout(t *testing.T) {
   348  	_, _, cc, addrToSC := initBalancerGroupForCachingTest(t, time.Second)
   349  
   350  	// The sub-balancer is not re-added within timeout. The subconns should be
   351  	// shut down.
   352  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   353  	defer cancel()
   354  	scToShutdown := map[balancer.SubConn]int{
   355  		addrToSC[testBackendAddrs[2]]: 1,
   356  		addrToSC[testBackendAddrs[3]]: 1,
   357  	}
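        	// Only the cached sub-balancer's SubConns (addrs 2 and 3) are expected to
        	// be shut down, once the one-second cache timeout expires.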
   358  	for i := 0; i < len(scToShutdown); i++ {
   359  		select {
   360  		case sc := <-cc.ShutdownSubConnCh:
   361  			c := scToShutdown[sc]
   362  			if c == 0 {
   363  				t.Fatalf("Got Shutdown for %v when there's %d shutdown expected", sc, c)
   364  			}
   365  			scToShutdown[sc] = c - 1
   366  		case <-ctx.Done():
   367  			t.Fatalf("timeout waiting for subConns (from balancer in cache) to be shut down")
   368  		}
   369  	}
   370  }
   371  
   372  // Wrap the rr builder, so it behaves the same, but has a different name.
   373  type noopBalancerBuilderWrapper struct {
   374  	balancer.Builder
   375  }
   376  
   377  func init() {
   378  	balancer.Register(&noopBalancerBuilderWrapper{Builder: rrBuilder})
   379  }
   380  
   381  func (*noopBalancerBuilderWrapper) Name() string {
   382  	return "noopBalancerBuilderWrapper"
   383  }
   384  
   385  // After removing a sub-balancer, re-add it with the same ID but a different
   386  // balancer builder. Old subconns should be shut down, and new ones created.
   387  func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *testing.T) {
   388  	gator, bg, cc, addrToSC := initBalancerGroupForCachingTest(t, defaultTestTimeout)
   389  
   390  	// Re-add sub-balancer-1, but with a different balancer builder. The
   391  	// sub-balancer was still in cache, but can't be reused. This should cause
   392  	// old sub-balancer's subconns to be shut down immediately, and new
   393  	// subconns to be created.
   394  	gator.Add(testBalancerIDs[1], 1)
   395  	bg.Add(testBalancerIDs[1], &noopBalancerBuilderWrapper{rrBuilder})
   396  
   397  	// The cached sub-balancer should be closed, and the subconns should be
   398  	// shut down immediately.
   399  	shutdownTimeout := time.After(time.Millisecond * 500)
   400  	scToShutdown := map[balancer.SubConn]int{
   401  		addrToSC[testBackendAddrs[2]]: 1,
   402  		addrToSC[testBackendAddrs[3]]: 1,
   403  	}
   404  	for i := 0; i < len(scToShutdown); i++ {
   405  		select {
   406  		case sc := <-cc.ShutdownSubConnCh:
   407  			c := scToShutdown[sc]
   408  			if c == 0 {
   409  				t.Fatalf("Got Shutdown for %v when there's %d shutdown expected", sc, c)
   410  			}
   411  			scToShutdown[sc] = c - 1
   412  		case <-shutdownTimeout:
   413  			t.Fatalf("timeout waiting for subConns (from balancer in cache) to be shut down")
   414  		}
   415  	}
   416  
   417  	bg.UpdateClientConnState(testBalancerIDs[1], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[4:6]}})
   418  
   419  	newSCTimeout := time.After(time.Millisecond * 500)
   420  	scToAdd := map[resolver.Address]int{
   421  		testBackendAddrs[4]: 1,
   422  		testBackendAddrs[5]: 1,
   423  	}
   424  	for i := 0; i < len(scToAdd); i++ {
   425  		select {
   426  		case addr := <-cc.NewSubConnAddrsCh:
   427  			c := scToAdd[addr[0]]
   428  			if c == 0 {
   429  				t.Fatalf("Got newSubConn for %v when there's %d new expected", addr, c)
   430  			}
   431  			scToAdd[addr[0]] = c - 1
   432  			sc := <-cc.NewSubConnCh
   433  			addrToSC[addr[0]] = sc
   434  			sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   435  			sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   436  		case <-newSCTimeout:
   437  			t.Fatalf("timeout waiting for subConns (from new sub-balancer) to be created")
   438  		}
   439  	}
   440  
   441  	// Test roundrobin on the new picker.
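        	// With weights 2:1, sub-balancer 0's backends (addrs 0 and 1) are each
        	// picked twice for every pick of the rebuilt sub-balancer's new backends
        	// (addrs 4 and 5).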
   442  	p3 := <-cc.NewPickerCh
   443  	want := []balancer.SubConn{
   444  		addrToSC[testBackendAddrs[0]], addrToSC[testBackendAddrs[0]],
   445  		addrToSC[testBackendAddrs[1]], addrToSC[testBackendAddrs[1]],
   446  		addrToSC[testBackendAddrs[4]], addrToSC[testBackendAddrs[5]],
   447  	}
   448  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p3)); err != nil {
   449  		t.Fatalf("want %v, got %v", want, err)
   450  	}
   451  }
   452  
   453  // After removing a sub-balancer, it will be kept in cache. Make sure that this
   454  // sub-balancer's Close is called when the balancer group is closed.
   455  func (s) TestBalancerGroup_CloseStopsBalancerInCache(t *testing.T) {
   456  	const balancerName = "stub-TestBalancerGroup_check_close"
   457  	closed := make(chan struct{})
   458  	stub.Register(balancerName, stub.BalancerFuncs{Close: func(_ *stub.BalancerData) {
   459  		close(closed)
   460  	}})
   461  	builder := balancer.Get(balancerName)
   462  
   463  	gator, bg, _, _ := initBalancerGroupForCachingTest(t, time.Second)
   464  
   465  	// Add a balancer, and then remove it.
   466  	gator.Add(testBalancerIDs[2], 1)
   467  	bg.Add(testBalancerIDs[2], builder)
   468  	gator.Remove(testBalancerIDs[2])
   469  	bg.Remove(testBalancerIDs[2])
   470  
   471  	// Immediately close balancergroup, before the cache timeout.
   472  	bg.Close()
   473  
   474  	// Make sure the removed child balancer is closed eventually.
   475  	select {
   476  	case <-closed:
   477  	case <-time.After(time.Second * 2):
   478  		t.Fatalf("timeout waiting for the child balancer in cache to be closed")
   479  	}
   480  }
   481  
   482  // TestBalancerGroupBuildOptions verifies that the balancer.BuildOptions passed
   483  // to the balancergroup at creation time is passed to child policies.
   484  func (s) TestBalancerGroupBuildOptions(t *testing.T) {
   485  	const (
   486  		balancerName = "stubBalancer-TestBalancerGroupBuildOptions"
   487  		userAgent    = "ua"
   488  	)
   489  
   490  	// Set up the stub balancer such that we can read the build options passed to
   491  	// it in the UpdateClientConnState method.
   492  	bOpts := balancer.BuildOptions{
   493  		DialCreds:       insecure.NewCredentials(),
   494  		ChannelzParent:  channelz.RegisterChannel(nil, "test channel"),
   495  		CustomUserAgent: userAgent,
   496  	}
   497  	stub.Register(balancerName, stub.BalancerFuncs{
   498  		UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
   499  			if bd.BuildOptions.DialCreds != bOpts.DialCreds || bd.BuildOptions.ChannelzParent != bOpts.ChannelzParent || bd.BuildOptions.CustomUserAgent != bOpts.CustomUserAgent {
   500  				return fmt.Errorf("buildOptions in child balancer: %v, want %v", bd.BuildOptions, bOpts)
   501  			}
   502  			return nil
   503  		},
   504  	})
   505  	cc := testutils.NewBalancerClientConn(t)
   506  	bg := New(Options{
   507  		CC:              cc,
   508  		BuildOpts:       bOpts,
   509  		StateAggregator: nil,
   510  		Logger:          nil,
   511  	})
   512  	bg.Start()
   513  
   514  	// Add the stub balancer built above as a child policy.
   515  	balancerBuilder := balancer.Get(balancerName)
   516  	bg.Add(testBalancerIDs[0], balancerBuilder)
   517  
   518  	// Send an empty clientConn state change. This should trigger the
   519  	// verification of the buildOptions being passed to the child policy.
   520  	if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{}); err != nil {
   521  		t.Fatal(err)
   522  	}
   523  }
   524  
   525  func (s) TestBalancerExitIdleOne(t *testing.T) {
   526  	const balancerName = "stub-balancer-test-balancergroup-exit-idle-one"
   527  	exitIdleCh := make(chan struct{}, 1)
   528  	stub.Register(balancerName, stub.BalancerFuncs{
   529  		ExitIdle: func(*stub.BalancerData) {
   530  			exitIdleCh <- struct{}{}
   531  		},
   532  	})
   533  	cc := testutils.NewBalancerClientConn(t)
   534  	bg := New(Options{
   535  		CC:              cc,
   536  		BuildOpts:       balancer.BuildOptions{},
   537  		StateAggregator: nil,
   538  		Logger:          nil,
   539  	})
   540  	bg.Start()
   541  	defer bg.Close()
   542  
   543  	// Add the stub balancer built above as a child policy.
   544  	builder := balancer.Get(balancerName)
   545  	bg.Add(testBalancerIDs[0], builder)
   546  
   547  	// Call ExitIdle on the child policy.
   548  	bg.ExitIdleOne(testBalancerIDs[0])
   549  	select {
   550  	case <-time.After(time.Second):
   551  		t.Fatal("Timeout when waiting for ExitIdle to be invoked on child policy")
   552  	case <-exitIdleCh:
   553  	}
   554  }
   555  
   556  // TestBalancerGracefulSwitch tests the graceful switch functionality for a
   557  // child of the balancer group. At first, the child is configured as a round
   558  // robin load balancer, and thus should behave accordingly. The test then
   559  // gracefully switches this child to a custom type which only creates a SubConn
   560  // for the second passed in address and also only picks that created SubConn.
   561  // The new aggregated picker should reflect this change for the child.
   562  func (s) TestBalancerGracefulSwitch(t *testing.T) {
   563  	cc := testutils.NewBalancerClientConn(t)
   564  	gator := weightedaggregator.New(cc, nil, testutils.NewTestWRR)
   565  	gator.Start()
   566  	bg := New(Options{
   567  		CC:              cc,
   568  		BuildOpts:       balancer.BuildOptions{},
   569  		StateAggregator: gator,
   570  		Logger:          nil,
   571  	})
   572  	gator.Add(testBalancerIDs[0], 1)
   573  	bg.Add(testBalancerIDs[0], rrBuilder)
   574  	bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[0:2]}})
   575  
   576  	bg.Start()
   577  
   578  	m1 := make(map[resolver.Address]balancer.SubConn)
   579  	scs := make(map[balancer.SubConn]bool)
   580  	for i := 0; i < 2; i++ {
   581  		addrs := <-cc.NewSubConnAddrsCh
   582  		sc := <-cc.NewSubConnCh
   583  		m1[addrs[0]] = sc
   584  		scs[sc] = true
   585  		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   586  		sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   587  	}
   588  
   589  	p1 := <-cc.NewPickerCh
   590  	want := []balancer.SubConn{
   591  		m1[testBackendAddrs[0]], m1[testBackendAddrs[1]],
   592  	}
   593  	if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p1)); err != nil {
   594  		t.Fatal(err)
   595  	}
   596  
   597  	// The balancer type for testBalancerIDs[0] is currently Round Robin. Now,
   598  	// change it to a balancer with logically different behavior (creating a
   599  	// SubConn only for the second address in the address list and always picking
   600  	// that SubConn), and see if the downstream behavior reflects that change.
   601  	childPolicyName := t.Name()
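        	// The stub child below wraps a pick_first balancer and strips the first
        	// address from every update it forwards, which yields the behavior
        	// described above.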
   602  	stub.Register(childPolicyName, stub.BalancerFuncs{
   603  		Init: func(bd *stub.BalancerData) {
   604  			bd.Data = balancer.Get(grpc.PickFirstBalancerName).Build(bd.ClientConn, bd.BuildOptions)
   605  		},
   606  		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
   607  			ccs.ResolverState.Addresses = ccs.ResolverState.Addresses[1:]
   608  			bal := bd.Data.(balancer.Balancer)
   609  			return bal.UpdateClientConnState(ccs)
   610  		},
   611  	})
   612  	builder := balancer.Get(childPolicyName)
   613  	bg.UpdateBuilder(testBalancerIDs[0], builder)
   614  	if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}); err != nil {
   615  		t.Fatalf("error updating ClientConn state: %v", err)
   616  	}
   617  
   618  	addrs := <-cc.NewSubConnAddrsCh
   619  	if addrs[0].Addr != testBackendAddrs[3].Addr {
   620  		// Verifies that the update was forwarded to the newly created balancer, as
   621  		// the wrapped pick first balancer will drop the first address.
   622  		t.Fatalf("newSubConn called with wrong address, want: %v, got: %v", testBackendAddrs[3].Addr, addrs[0].Addr)
   623  	}
   624  	sc := <-cc.NewSubConnCh
   625  
   626  	// Update the pick first balancer's SubConn to CONNECTING. This will cause
   627  	// the pick first balancer to UpdateState() with CONNECTING, which shouldn't
   628  	// send a Picker update back, as the Graceful Switch process is not complete.
   629  	sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   630  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
   631  	defer cancel()
   632  	select {
   633  	case <-cc.NewPickerCh:
   634  		t.Fatalf("No new picker should have been sent due to the Graceful Switch process not completing")
   635  	case <-ctx.Done():
   636  	}
   637  
   638  	// Update the pick first balancer's SubConn to READY. This will cause the
   639  	// pick first balancer to UpdateState() with READY, which should send a
   640  	// Picker update back, as the Graceful Switch process is complete. This
   641  	// Picker should always pick the SubConn created by pick first, which
   642  	// corresponds to address 3.
   643  	sc.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   644  	p2 := <-cc.NewPickerCh
   645  	pr, err := p2.Pick(balancer.PickInfo{})
   646  	if err != nil {
   647  		t.Fatalf("error picking: %v", err)
   648  	}
   649  	if pr.SubConn != sc {
   650  		t.Fatalf("picker.Pick(), want %v, got %v", sc, pr.SubConn)
   651  	}
   652  
   653  	// The Graceful Switch process completing for the child should cause the
   654  	// SubConns for the balancer being gracefully switched from to be shut down.
   655  	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
   656  	defer cancel()
   657  	for i := 0; i < 2; i++ {
   658  		select {
   659  		case <-ctx.Done():
   660  			t.Fatalf("error waiting for Shutdown()")
   661  		case sc := <-cc.ShutdownSubConnCh:
   662  			// The SubConn shut down should have been one of the two created
   663  			// SubConns, and both should be shut down.
   664  			if ok := scs[sc]; ok {
   665  				delete(scs, sc)
   666  				continue
   667  			} else {
   668  				t.Fatalf("Shutdown called for wrong SubConn %v, want in %v", sc, scs)
   669  			}
   670  		}
   671  	}
   672  }
   673  
