...

Source file src/google.golang.org/grpc/internal/balancergroup/balancergroup.go

Documentation: google.golang.org/grpc/internal/balancergroup

     1  /*
     2   * Copyright 2019 gRPC authors.
     3   *
     4   * Licensed under the Apache License, Version 2.0 (the "License");
     5   * you may not use this file except in compliance with the License.
     6   * You may obtain a copy of the License at
     7   *
     8   *     http://www.apache.org/licenses/LICENSE-2.0
     9   *
    10   * Unless required by applicable law or agreed to in writing, software
    11   * distributed under the License is distributed on an "AS IS" BASIS,
    12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13   * See the License for the specific language governing permissions and
    14   * limitations under the License.
    15   */
    16  
    17  // Package balancergroup implements a utility struct to bind multiple balancers
    18  // into one balancer.
    19  package balancergroup
    20  
    21  import (
    22  	"fmt"
    23  	"sync"
    24  	"time"
    25  
    26  	"google.golang.org/grpc/balancer"
    27  	"google.golang.org/grpc/connectivity"
    28  	"google.golang.org/grpc/internal/balancer/gracefulswitch"
    29  	"google.golang.org/grpc/internal/cache"
    30  	"google.golang.org/grpc/internal/grpclog"
    31  	"google.golang.org/grpc/resolver"
    32  )
    33  
    34  // subBalancerWrapper is used to keep the configurations that will be used to start
    35  // the underlying balancer. It can be used to start/stop the underlying
    36  // balancer.
    37  //
    38  // When the config changes, it will pass the update to the underlying balancer
    39  // if it exists.
    40  //
    41  // TODO: move to a separate file?
    42  type subBalancerWrapper struct {
    43  	// subBalancerWrapper is passed to the sub-balancer as a ClientConn
    44  	// wrapper, only to keep the state and picker.  When sub-balancer is
    45  	// restarted while in cache, the picker needs to be resent.
    46  	//
    47  	// It also contains the sub-balancer ID, so the parent balancer group can
    48  	// keep track of SubConn/pickers and the sub-balancers they belong to. Some
    49  	// of the actions are forwarded to the parent ClientConn with no change.
    50  	// Some are forwarded to the balancer group with the sub-balancer ID.
    51  	balancer.ClientConn
    52  	id    string
    53  	group *BalancerGroup
    54  
    55  	mu    sync.Mutex
    56  	state balancer.State
    57  
    58  	// The static part of the sub-balancer. Keeps the balancer builder and
    59  	// addresses. To be used when restarting the sub-balancer.
    60  	builder balancer.Builder
    61  	// Options to be passed to sub-balancer at the time of creation.
    62  	buildOpts balancer.BuildOptions
    63  	// ccState is a cache of the addresses/balancer config, so when the balancer
    64  	// is restarted after close, it will get the previous update. It's a pointer
    65  	// and is set to nil at init, so when the balancer is built for the first
    66  	// time (not a restart), it won't receive an empty update. Note that this
    67  	// isn't reset to nil when the underlying balancer is closed.
    68  	ccState *balancer.ClientConnState
    69  	// The dynamic part of sub-balancer. Only used when balancer group is
    70  	// started. Gets cleared when sub-balancer is closed.
    71  	balancer *gracefulswitch.Balancer
    72  }
    73  
    74  // UpdateState overrides balancer.ClientConn, to keep state and picker.
    75  func (sbc *subBalancerWrapper) UpdateState(state balancer.State) {
    76  	sbc.mu.Lock()
    77  	sbc.state = state
    78  	sbc.group.updateBalancerState(sbc.id, state)
    79  	sbc.mu.Unlock()
    80  }
    81  
    82  // NewSubConn overrides balancer.ClientConn, so balancer group can keep track of
    83  // the relation between subconns and sub-balancers.
    84  func (sbc *subBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
    85  	return sbc.group.newSubConn(sbc, addrs, opts)
    86  }
    87  
    88  func (sbc *subBalancerWrapper) updateBalancerStateWithCachedPicker() {
    89  	sbc.mu.Lock()
    90  	if sbc.state.Picker != nil {
    91  		sbc.group.updateBalancerState(sbc.id, sbc.state)
    92  	}
    93  	sbc.mu.Unlock()
    94  }
    95  
    96  func (sbc *subBalancerWrapper) startBalancer() {
    97  	if sbc.balancer == nil {
    98  		sbc.balancer = gracefulswitch.NewBalancer(sbc, sbc.buildOpts)
    99  	}
   100  	sbc.group.logger.Infof("Creating child policy of type %q for locality %q", sbc.builder.Name(), sbc.id)
   101  	sbc.balancer.SwitchTo(sbc.builder)
   102  	if sbc.ccState != nil {
   103  		sbc.balancer.UpdateClientConnState(*sbc.ccState)
   104  	}
   105  }
   106  
   107  // exitIdle invokes the sub-balancer's ExitIdle method. Returns a boolean
   108  // indicating whether or not the operation was completed.
   109  func (sbc *subBalancerWrapper) exitIdle() (complete bool) {
   110  	b := sbc.balancer
   111  	if b == nil {
   112  		return true
   113  	}
   114  	b.ExitIdle()
   115  	return true
   116  }
   117  
   118  func (sbc *subBalancerWrapper) updateClientConnState(s balancer.ClientConnState) error {
   119  	sbc.ccState = &s
   120  	b := sbc.balancer
   121  	if b == nil {
   122  		// This sub-balancer was closed. This should never happen because
   123  		// sub-balancers are closed when the locality is removed from EDS, or
   124  		// the balancer group is closed. There should be no further address
   125  		// updates when either of these happens.
   126  		//
   127  		// This will be a common case with priority support, because a
   128  		// sub-balancer (and the whole balancer group) could be closed because
   129  		// it's the lower priority, but it can still get address updates.
   130  		return nil
   131  	}
   132  	return b.UpdateClientConnState(s)
   133  }
   134  
   135  func (sbc *subBalancerWrapper) resolverError(err error) {
   136  	b := sbc.balancer
   137  	if b == nil {
   138  		// This sub-balancer was closed. This should never happen because
   139  		// sub-balancers are closed when the locality is removed from EDS, or
   140  		// the balancer group is closed. There should be no further address
   141  		// updates when either of these happens.
   142  		//
   143  		// This will be a common case with priority support, because a
   144  		// sub-balancer (and the whole balancer group) could be closed because
   145  		// it's the lower priority, but it can still get address updates.
   146  		return
   147  	}
   148  	b.ResolverError(err)
   149  }
   150  
   151  func (sbc *subBalancerWrapper) gracefulSwitch(builder balancer.Builder) {
   152  	sbc.builder = builder
   153  	b := sbc.balancer
   154  	// If an Add stored the builder but did not start the balancer (because the
   155  	// group is not started), the graceful switch balancer is nil; in that case
   156  	// we still record the most recent builder above so it is used later.
   157  	// The graceful switch balancer's presence tracks whether the balancer
   158  	// group is started: nil when closed, present when started.
   159  	if sbc.balancer != nil {
   160  		sbc.group.logger.Infof("Switching child policy %v to type %v", sbc.id, sbc.builder.Name())
   161  		b.SwitchTo(sbc.builder)
   162  	}
   163  }
   164  
   165  func (sbc *subBalancerWrapper) stopBalancer() {
   166  	if sbc.balancer == nil {
   167  		return
   168  	}
   169  	sbc.balancer.Close()
   170  	sbc.balancer = nil
   171  }
   172  
   173  // BalancerGroup takes a list of balancers, and makes them into one balancer.
   174  //
   175  // Note that this struct doesn't implement balancer.Balancer, because it's not
   176  // intended to be used directly as a balancer. It's expected to be used as a
   177  // sub-balancer manager by a high level balancer.
   178  //
   179  //	Updates from ClientConn are forwarded to sub-balancers
   180  //	- service config update
   181  //	- address update
   182  //	- subConn state change
   183  //	  - find the corresponding balancer and forward
   184  //
   185  //	Actions from sub-balancers are forwarded to parent ClientConn
   186  //	- new/remove SubConn
   187  //	- picker update and health states change
   188  //	  - sub-pickers are sent to an aggregator provided by the parent, which
   189  //	    will group them into a group-picker. The aggregated connectivity state is
   190  //	    also handled by the aggregator.
   191  //	- resolveNow
   192  //
   193  // Sub-balancers are only built when the balancer group is started. If the
   194  // balancer group is closed, the sub-balancers are also closed. And it's
   195  // guaranteed that no updates will be sent to parent ClientConn from a closed
   196  // balancer group.
   197  type BalancerGroup struct {
   198  	cc        balancer.ClientConn
   199  	buildOpts balancer.BuildOptions
   200  	logger    *grpclog.PrefixLogger
   201  
   202  	// stateAggregator is where the state/picker updates will be sent to. It's
   203  	// provided by the parent balancer, to build a picker with all the
   204  	// sub-pickers.
   205  	stateAggregator BalancerStateAggregator
   206  
   207  	// outgoingMu guards all operations in the direction:
   208  	// ClientConn-->Sub-balancer. Including start, stop, resolver updates and
   209  	// SubConn state changes.
   210  	//
   211  	// The corresponding boolean outgoingStarted is used to stop further updates
   212  	// to sub-balancers after they are closed.
   213  	outgoingMu         sync.Mutex
   214  	outgoingStarted    bool
   215  	idToBalancerConfig map[string]*subBalancerWrapper
   216  	// Cache for sub-balancers when they are removed. This is `nil` if caching
   217  	// is disabled by passing `0` for `Options.SubBalancerCloseTimeout`.
   218  	deletedBalancerCache *cache.TimeoutCache
   219  
   220  	// incomingMu is to make sure this balancer group doesn't send updates to cc
   221  	// after it's closed.
   222  	//
   223  	// We don't share the mutex to avoid deadlocks (e.g. a call to a sub-balancer
   224  	// may call back into the balancer group inline, which causes a deadlock if
   225  	// they require the same mutex).
   226  	//
   227  	// We should never need to hold multiple locks at the same time in this
   228  	// struct. The case where two locks are held can only happen when the
   229  	// underlying balancer calls back into the balancer group inline. So there
   230  	// is an implicit lock acquisition order: outgoingMu is locked before
   231  	// incomingMu.
   232  
   233  	// incomingMu guards all operations in the direction:
   234  	// Sub-balancer-->ClientConn. Including NewSubConn, RemoveSubConn. It also
   235  	// guards the map from SubConn to balancer ID, so updateSubConnState needs
   236  	// to hold it shortly to potentially delete from the map.
   237  	//
   238  	// UpdateState is called by the balancer state aggregator, which decides
   239  	// when and whether to call it.
   240  	//
   241  	// The corresponding boolean incomingStarted is used to stop further updates
   242  	// from sub-balancers after they are closed.
   243  	incomingMu      sync.Mutex
   244  	incomingStarted bool // This boolean only guards calls back to ClientConn.
   245  	scToSubBalancer map[balancer.SubConn]*subBalancerWrapper
   246  }
   247  
   248  // Options wraps the arguments to be passed to the BalancerGroup ctor.
   249  type Options struct {
   250  	// CC is a reference to the parent balancer.ClientConn.
   251  	CC balancer.ClientConn
   252  	// BuildOpts contains build options to be used when creating sub-balancers.
   253  	BuildOpts balancer.BuildOptions
   254  	// StateAggregator is an implementation of the BalancerStateAggregator
   255  	// interface to aggregate picker and connectivity states from sub-balancers.
   256  	StateAggregator BalancerStateAggregator
   257  	// Logger is a group specific prefix logger.
   258  	Logger *grpclog.PrefixLogger
   259  	// SubBalancerCloseTimeout is the amount of time deleted sub-balancers spend
   260  	// in the idle cache. A value of zero here disables caching of deleted
   261  	// sub-balancers.
   262  	SubBalancerCloseTimeout time.Duration
   263  }
   264  
   265  // New creates a new BalancerGroup. Note that the BalancerGroup
   266  // needs to be started to work.
   267  func New(opts Options) *BalancerGroup {
   268  	var bc *cache.TimeoutCache
   269  	if opts.SubBalancerCloseTimeout != time.Duration(0) {
   270  		bc = cache.NewTimeoutCache(opts.SubBalancerCloseTimeout)
   271  	}
   272  
   273  	return &BalancerGroup{
   274  		cc:              opts.CC,
   275  		buildOpts:       opts.BuildOpts,
   276  		stateAggregator: opts.StateAggregator,
   277  		logger:          opts.Logger,
   278  
   279  		deletedBalancerCache: bc,
   280  		idToBalancerConfig:   make(map[string]*subBalancerWrapper),
   281  		scToSubBalancer:      make(map[balancer.SubConn]*subBalancerWrapper),
   282  	}
   283  }
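
A minimal usage sketch (not part of the source file): how a parent LB policy
inside the gRPC module might construct and start a BalancerGroup. The package
name, the parentBalancer type, and the aggregator/logger values below are
hypothetical placeholders; only New, Options, Add and Start are API from this
file, and round_robin is assumed to be a registered balancer.

package parentpolicy // hypothetical package inside the gRPC module

import (
	"time"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/internal/balancergroup"
	"google.golang.org/grpc/internal/grpclog"
)

type parentBalancer struct {
	cc     balancer.ClientConn                   // parent ClientConn, from balancer.Builder.Build
	agg    balancergroup.BalancerStateAggregator // aggregator implemented by the parent policy
	logger *grpclog.PrefixLogger
	bg     *balancergroup.BalancerGroup
}

func (p *parentBalancer) startGroup(opts balancer.BuildOptions) {
	p.bg = balancergroup.New(balancergroup.Options{
		CC:                      p.cc,
		BuildOpts:               opts,
		StateAggregator:         p.agg,
		Logger:                  p.logger,
		SubBalancerCloseTimeout: 15 * time.Second, // 0 disables caching of removed children.
	})
	// Children added before Start are only built once Start is called.
	if b := balancer.Get("round_robin"); b != nil { // assumes round_robin is registered
		p.bg.Add("child-1", b)
	}
	p.bg.Start()
}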
   284  
   285  // Start starts the balancer group, including building all the sub-balancers,
   286  // and sending the existing addresses to them.
   287  //
   288  // A BalancerGroup can be closed and started later. When a BalancerGroup is
   289  // closed, it can still receive address updates, which will be applied when
   290  // restarted.
   291  func (bg *BalancerGroup) Start() {
   292  	bg.incomingMu.Lock()
   293  	bg.incomingStarted = true
   294  	bg.incomingMu.Unlock()
   295  
   296  	bg.outgoingMu.Lock()
   297  	if bg.outgoingStarted {
   298  		bg.outgoingMu.Unlock()
   299  		return
   300  	}
   301  
   302  	for _, config := range bg.idToBalancerConfig {
   303  		config.startBalancer()
   304  	}
   305  	bg.outgoingStarted = true
   306  	bg.outgoingMu.Unlock()
   307  }
   308  
   309  // AddWithClientConn adds a balancer with the given id to the group. The
   310  // balancer is built with a balancer builder registered with balancerName. The
   311  // given ClientConn is passed to the newly built balancer instead of the
   312  // one passed to balancergroup.New().
   313  //
   314  // TODO: Get rid of the existing Add() API and replace it with this.
   315  func (bg *BalancerGroup) AddWithClientConn(id, balancerName string, cc balancer.ClientConn) error {
   316  	bg.logger.Infof("Adding child policy of type %q for locality %q", balancerName, id)
   317  	builder := balancer.Get(balancerName)
   318  	if builder == nil {
   319  		return fmt.Errorf("unregistered balancer name %q", balancerName)
   320  	}
   321  
   322  	// Store data in static map, and then check to see if bg is started.
   323  	bg.outgoingMu.Lock()
   324  	defer bg.outgoingMu.Unlock()
   325  	var sbc *subBalancerWrapper
   326  	// If outgoingStarted is true, search in the cache. Otherwise, the cache is
   327  	// guaranteed to be empty, so searching is unnecessary. Also, skip the cache
   328  	// if caching is disabled.
   329  	if bg.outgoingStarted && bg.deletedBalancerCache != nil {
   330  		if old, ok := bg.deletedBalancerCache.Remove(id); ok {
   331  			if bg.logger.V(2) {
   332  				bg.logger.Infof("Removing and reusing child policy of type %q for locality %q from the balancer cache", balancerName, id)
   333  				bg.logger.Infof("Number of items remaining in the balancer cache: %d", bg.deletedBalancerCache.Len())
   334  			}
   335  
   336  			sbc, _ = old.(*subBalancerWrapper)
   337  			if sbc != nil && sbc.builder != builder {
   338  				// If the sub-balancer in the cache was built with a different
   339  				// balancer builder, don't use it. Clean up the old balancer and
   340  				// behave as if the sub-balancer was not found in the cache.
   341  				//
   342  				// NOTE that this will also drop the cached addresses for this
   343  				// sub-balancer, which seems to be reasonable.
   344  				sbc.stopBalancer()
   345  				// cleanupSubConns must be done before the new balancer starts,
   346  				// otherwise new SubConns created by the new balancer might be
   347  				// removed by mistake.
   348  				bg.cleanupSubConns(sbc)
   349  				sbc = nil
   350  			}
   351  		}
   352  	}
   353  	if sbc == nil {
   354  		sbc = &subBalancerWrapper{
   355  			ClientConn: cc,
   356  			id:         id,
   357  			group:      bg,
   358  			builder:    builder,
   359  			buildOpts:  bg.buildOpts,
   360  		}
   361  		if bg.outgoingStarted {
   362  			// Only start the balancer if bg is started. Otherwise, we only keep the
   363  			// static data.
   364  			sbc.startBalancer()
   365  		}
   366  	} else {
   367  		// When bringing back a sub-balancer from the cache, re-send the cached
   368  		// picker and state.
   369  		sbc.updateBalancerStateWithCachedPicker()
   370  	}
   371  	bg.idToBalancerConfig[id] = sbc
   372  	return nil
   373  }
   374  
   375  // Add adds a balancer built by builder to the group, with given id.
   376  func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
   377  	bg.AddWithClientConn(id, builder.Name(), bg.cc)
   378  }
   379  
   380  // UpdateBuilder updates the builder for a current child, starting the Graceful
   381  // Switch process for that child.
   382  //
   383  // TODO: update this API to take the name of the new builder instead.
   384  func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) {
   385  	bg.outgoingMu.Lock()
   386  	defer bg.outgoingMu.Unlock()
   387  	// This does not deal with the balancer cache because this call should come
   388  	// after an Add call for the given child balancer. If the child is removed,
   389  	// the caller will call Add again when the child comes back, which will then
   390  	// deal with the balancer cache.
   391  	sbc := bg.idToBalancerConfig[id]
   392  	if sbc == nil {
   393  		// Simply ignore it if not present; don't return an error.
   394  		return
   395  	}
   396  	sbc.gracefulSwitch(builder)
   397  }
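
A brief hypothetical continuation of the earlier sketch: switching an existing
child to a different registered policy type via UpdateBuilder. The child's
graceful switch balancer handles the transition, so the old policy is not torn
down abruptly. pick_first is assumed to be a registered balancer.

// Switch "child-1" (added in the earlier sketch) to pick_first.
if b := balancer.Get("pick_first"); b != nil {
	p.bg.UpdateBuilder("child-1", b)
}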
   398  
   399  // Remove removes the balancer with id from the group.
   400  //
   401  // It doesn't close the balancer immediately. If caching is enabled, the
   402  // balancer is kept in a cache, and the cleanup work (closing the sub-balancer
   403  // and removing its subconns) is done when the cache timeout expires.
   404  func (bg *BalancerGroup) Remove(id string) {
   405  	bg.logger.Infof("Removing child policy for locality %q", id)
   406  
   407  	bg.outgoingMu.Lock()
   408  
   409  	sbToRemove, ok := bg.idToBalancerConfig[id]
   410  	if !ok {
   411  		bg.logger.Errorf("Child policy for locality %q does not exist in the balancer group", id)
   412  		bg.outgoingMu.Unlock()
   413  		return
   414  	}
   415  
   416  	// Unconditionally remove the sub-balancer config from the map.
   417  	delete(bg.idToBalancerConfig, id)
   418  	if !bg.outgoingStarted {
   419  		// Nothing needs to be done here, since we wouldn't have created the
   420  		// sub-balancer.
   421  		bg.outgoingMu.Unlock()
   422  		return
   423  	}
   424  
   425  	if bg.deletedBalancerCache != nil {
   426  		if bg.logger.V(2) {
   427  			bg.logger.Infof("Adding child policy for locality %q to the balancer cache", id)
   428  			bg.logger.Infof("Number of items remaining in the balancer cache: %d", bg.deletedBalancerCache.Len())
   429  		}
   430  
   431  		bg.deletedBalancerCache.Add(id, sbToRemove, func() {
   432  			if bg.logger.V(2) {
   433  				bg.logger.Infof("Removing child policy for locality %q from the balancer cache after timeout", id)
   434  				bg.logger.Infof("Number of items remaining in the balancer cache: %d", bg.deletedBalancerCache.Len())
   435  			}
   436  
   437  			// A sub-balancer evicted from the timeout cache needs to be
   438  			// closed and its subConns need to be removed, unconditionally.
   439  			// There is a possibility that a sub-balancer might be removed
   440  			// (thereby moving it to the cache) around the same time that
   441  			// the balancergroup is closed, and by the time we get here the
   442  			// balancergroup might be closed. Checking for `outgoingStarted
   443  			// == true` at that point could lead to a leaked sub-balancer.
   444  			bg.outgoingMu.Lock()
   445  			sbToRemove.stopBalancer()
   446  			bg.outgoingMu.Unlock()
   447  			bg.cleanupSubConns(sbToRemove)
   448  		})
   449  		bg.outgoingMu.Unlock()
   450  		return
   451  	}
   452  
   453  	// Remove the sub-balancer with immediate effect if we are not caching.
   454  	sbToRemove.stopBalancer()
   455  	bg.outgoingMu.Unlock()
   456  	bg.cleanupSubConns(sbToRemove)
   457  }
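
A brief hypothetical continuation of the same sketch, illustrating the caching
behavior described above (only when SubBalancerCloseTimeout is non-zero):

// Remove parks "child-1" in the deleted-balancer cache rather than closing it
// immediately.
p.bg.Remove("child-1")

// Re-adding the same id with the same builder before the cache timeout fires
// reuses the cached child and re-sends its cached picker, instead of building
// a fresh balancer.
if b := balancer.Get("round_robin"); b != nil {
	p.bg.Add("child-1", b)
}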
   458  
   459  // Remove(id) doesn't do cleanup for the sub-balancer. This function does the
   460  // cleanup after the cache timeout.
   461  func (bg *BalancerGroup) cleanupSubConns(config *subBalancerWrapper) {
   462  	bg.incomingMu.Lock()
   463  	// Remove SubConns. This is only done after the balancer is
   464  	// actually closed.
   465  	//
   466  	// NOTE: if NewSubConn is called by this (closed) balancer later, the
   467  	// SubConn will be leaked. This shouldn't happen if the balancer
   468  	// implementation is correct. To make sure this never happens, we need to
   469  	// add another layer (balancer manager) between balancer group and the
   470  	// sub-balancers.
   471  	for sc, b := range bg.scToSubBalancer {
   472  		if b == config {
   473  			delete(bg.scToSubBalancer, sc)
   474  		}
   475  	}
   476  	bg.incomingMu.Unlock()
   477  }
   478  
   479  // connect attempts to connect to all subConns belonging to sb.
   480  func (bg *BalancerGroup) connect(sb *subBalancerWrapper) {
   481  	bg.incomingMu.Lock()
   482  	for sc, b := range bg.scToSubBalancer {
   483  		if b == sb {
   484  			sc.Connect()
   485  		}
   486  	}
   487  	bg.incomingMu.Unlock()
   488  }
   489  
   490  // Following are actions from the parent grpc.ClientConn, forwarded to sub-balancers.
   491  
   492  // updateSubConnState forwards the update to cb and updates scToSubBalancer if
   493  // needed.
   494  func (bg *BalancerGroup) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) {
   495  	bg.incomingMu.Lock()
   496  	if _, ok := bg.scToSubBalancer[sc]; !ok {
   497  		bg.incomingMu.Unlock()
   498  		return
   499  	}
   500  	if state.ConnectivityState == connectivity.Shutdown {
   501  		// Only delete sc from the map when state changed to Shutdown.
   502  		delete(bg.scToSubBalancer, sc)
   503  	}
   504  	bg.incomingMu.Unlock()
   505  
   506  	bg.outgoingMu.Lock()
   507  	if cb != nil {
   508  		cb(state)
   509  	}
   510  	bg.outgoingMu.Unlock()
   511  }
   512  
   513  // UpdateSubConnState is not expected to be called: SubConn state updates are
   514  // delivered via the StateListener installed in newSubConn, so this logs an error.
   515  func (bg *BalancerGroup) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
   516  	bg.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
   517  }
   518  
   519  // UpdateClientConnState handles ClientConnState (including balancer config and
   520  // addresses) from the resolver. It finds the balancer and forwards the update.
   521  func (bg *BalancerGroup) UpdateClientConnState(id string, s balancer.ClientConnState) error {
   522  	bg.outgoingMu.Lock()
   523  	defer bg.outgoingMu.Unlock()
   524  	if config, ok := bg.idToBalancerConfig[id]; ok {
   525  		return config.updateClientConnState(s)
   526  	}
   527  	return nil
   528  }
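
A brief hypothetical continuation of the earlier sketch: forwarding a resolver
update to one child. The parent decides which addresses (and which parsed
balancer config, if any) belong to each child id; addrs below is a placeholder
[]resolver.Address.

err := p.bg.UpdateClientConnState("child-1", balancer.ClientConnState{
	ResolverState: resolver.State{Addresses: addrs},
})
if err != nil {
	// A nil error means the child accepted the update (or no child with that
	// id exists); the parent decides how to surface a failure.
	p.logger.Warningf("child-1 rejected the update: %v", err)
}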
   529  
   530  // ResolverError forwards resolver errors to all sub-balancers.
   531  func (bg *BalancerGroup) ResolverError(err error) {
   532  	bg.outgoingMu.Lock()
   533  	for _, config := range bg.idToBalancerConfig {
   534  		config.resolverError(err)
   535  	}
   536  	bg.outgoingMu.Unlock()
   537  }
   538  
   539  // Following are actions from sub-balancers, forwarded to ClientConn.
   540  
   541  // newSubConn: forward to ClientConn, and also record the mapping from sc to
   542  // balancer, so state updates will find the right balancer.
   543  //
   544  // One note about removing SubConns: the removal is only forwarded to the
   545  // ClientConn; sc is deleted from the map only when its state changes to
   546  // Shutdown. Since it's just forwarding the action, there's no need for a
   547  // removeSubConn() wrapper function.
   548  func (bg *BalancerGroup) newSubConn(config *subBalancerWrapper, addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
   549  	// NOTE: if the balancer with this id was already removed, this should also
   550  	// return an error. But since we call stopBalancer when removing the
   551  	// balancer, this shouldn't happen.
   552  	bg.incomingMu.Lock()
   553  	if !bg.incomingStarted {
   554  		bg.incomingMu.Unlock()
   555  		return nil, fmt.Errorf("NewSubConn is called after balancer group is closed")
   556  	}
   557  	var sc balancer.SubConn
   558  	oldListener := opts.StateListener
   559  	opts.StateListener = func(state balancer.SubConnState) { bg.updateSubConnState(sc, state, oldListener) }
   560  	sc, err := bg.cc.NewSubConn(addrs, opts)
   561  	if err != nil {
   562  		bg.incomingMu.Unlock()
   563  		return nil, err
   564  	}
   565  	bg.scToSubBalancer[sc] = config
   566  	bg.incomingMu.Unlock()
   567  	return sc, nil
   568  }
   569  
   570  // updateBalancerState: forward the new state to balancer state aggregator. The
   571  // aggregator will create an aggregated picker and an aggregated connectivity
   572  // state, then forward to ClientConn.
   573  func (bg *BalancerGroup) updateBalancerState(id string, state balancer.State) {
   574  	bg.logger.Infof("Balancer state update from locality %v, new state: %+v", id, state)
   575  
   576  	// Send the new state to the aggregator, without holding the incomingMu.
   577  	// incomingMu is meant to protect all calls to the parent ClientConn; this
   578  	// update doesn't necessarily trigger a call to ClientConn, and should
   579  	// already be protected by the aggregator's mutex if necessary.
   580  	if bg.stateAggregator != nil {
   581  		bg.stateAggregator.UpdateState(id, state)
   582  	}
   583  }
   584  
   585  // Close closes the balancer. It stops sub-balancers, and removes the subconns.
   586  // The BalancerGroup can be restarted later.
   587  func (bg *BalancerGroup) Close() {
   588  	bg.incomingMu.Lock()
   589  	if bg.incomingStarted {
   590  		bg.incomingStarted = false
   591  		// Also remove all SubConns.
   592  		for sc := range bg.scToSubBalancer {
   593  			sc.Shutdown()
   594  			delete(bg.scToSubBalancer, sc)
   595  		}
   596  	}
   597  	bg.incomingMu.Unlock()
   598  
   599  	// Clear(true) runs the clear function to close the sub-balancers in the
   600  	// cache. It must be called outside of the outgoing mutex.
   601  	if bg.deletedBalancerCache != nil {
   602  		bg.deletedBalancerCache.Clear(true)
   603  	}
   604  
   605  	bg.outgoingMu.Lock()
   606  	if bg.outgoingStarted {
   607  		bg.outgoingStarted = false
   608  		for _, config := range bg.idToBalancerConfig {
   609  			config.stopBalancer()
   610  		}
   611  	}
   612  	bg.outgoingMu.Unlock()
   613  }
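
As the comments above note, Close does not forget the children that were added:
a hypothetical continuation of the sketch showing a close/restart cycle.

// Close stops the running children (and any children still in the cache), but
// keeps their configuration in the group.
p.bg.Close()

// Start rebuilds the children added earlier; each one receives its last cached
// ClientConnState, if it had received one before.
p.bg.Start()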
   614  
   615  // ExitIdle should be invoked when the parent LB policy's ExitIdle is invoked.
   616  // It will trigger ExitIdle on all sub-balancers, or reconnect their subconns
   617  // if ExitIdle is not supported.
   618  func (bg *BalancerGroup) ExitIdle() {
   619  	bg.outgoingMu.Lock()
   620  	for _, config := range bg.idToBalancerConfig {
   621  		if !config.exitIdle() {
   622  			bg.connect(config)
   623  		}
   624  	}
   625  	bg.outgoingMu.Unlock()
   626  }
   627  
   628  // ExitIdleOne instructs the sub-balancer `id` to exit IDLE state, if
   629  // appropriate and possible.
   630  func (bg *BalancerGroup) ExitIdleOne(id string) {
   631  	bg.outgoingMu.Lock()
   632  	if config := bg.idToBalancerConfig[id]; config != nil {
   633  		if !config.exitIdle() {
   634  			bg.connect(config)
   635  		}
   636  	}
   637  	bg.outgoingMu.Unlock()
   638  }
   639  
