...

Source file src/google.golang.org/grpc/balancer/rls/balancer.go

Documentation: google.golang.org/grpc/balancer/rls

     1  /*
     2   *
     3   * Copyright 2020 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package rls implements the RLS LB policy.
    20  package rls
    21  
    22  import (
    23  	"encoding/json"
    24  	"errors"
    25  	"fmt"
    26  	"sync"
    27  	"sync/atomic"
    28  	"time"
    29  	"unsafe"
    30  
    31  	"google.golang.org/grpc/balancer"
    32  	"google.golang.org/grpc/connectivity"
    33  	"google.golang.org/grpc/grpclog"
    34  	"google.golang.org/grpc/internal"
    35  	"google.golang.org/grpc/internal/backoff"
    36  	"google.golang.org/grpc/internal/balancergroup"
    37  	"google.golang.org/grpc/internal/buffer"
    38  	internalgrpclog "google.golang.org/grpc/internal/grpclog"
    39  	"google.golang.org/grpc/internal/grpcsync"
    40  	"google.golang.org/grpc/internal/pretty"
    41  	"google.golang.org/grpc/resolver"
    42  )
    43  
const (
	// Name is the name of the RLS LB policy. Its value is taken from
	// internal.RLSLoadBalancingPolicyName.
	//
	// It currently has an experimental suffix which would be removed once
	// end-to-end testing of the policy is completed.
	Name = internal.RLSLoadBalancingPolicyName
	// Default frequency for data cache purging: the period of the ticker
	// returned by dataCachePurgeTicker, on whose ticks purgeDataCache() evicts
	// expired entries from the data cache.
	periodicCachePurgeFreq = time.Minute
)
    53  
var (
	// logger is the component logger for the RLS LB policy. Balancer and
	// picker instances wrap it in a prefix logger carrying their address.
	logger = grpclog.Component("rls")
	// errBalancerClosed is returned by UpdateClientConnState once Close has
	// been invoked.
	errBalancerClosed = errors.New("rls LB policy is closed")

	// Below defined vars for overriding in unit tests.

	// Default exponential backoff strategy for data cache entries. A fresh
	// backoffState using it is installed when the control channel gets back
	// to READY (see run()).
	defaultBackoffStrategy = backoff.Strategy(backoff.DefaultExponential)
	// Ticker used for periodic data cache purging. Called once per balancer
	// in Build; the ticker is stopped in Close.
	dataCachePurgeTicker = func() *time.Ticker { return time.NewTicker(periodicCachePurgeFreq) }
	// We want every cache entry to live in the cache for at least this
	// duration. If we encounter a cache entry whose minimum expiration time is
	// in the future, we abort the LRU pass, which may temporarily leave the
	// cache being too large. This is necessary to ensure that in cases where
	// the cache is too small, when we receive an RLS Response, we keep the
	// resulting cache entry around long enough for the pending incoming
	// requests to be re-processed through the new Picker. If we didn't do this,
	// then we'd risk throwing away each RLS response as we receive it, in which
	// case we would fail to actually route any of our incoming requests.
	minEvictDuration = 5 * time.Second

	// Following functions are no-ops in actual code, but can be overridden in
	// tests to give tests visibility into exactly when certain events happen:
	//   - clientConnUpdateHook runs when UpdateClientConnState returns.
	//   - dataCachePurgeHook is copied into each balancer by Build and runs
	//     after every periodic purge pass.
	//   - resetBackoffHook runs after backoff state has been reset because the
	//     control channel got back to READY.
	clientConnUpdateHook = func() {}
	dataCachePurgeHook   = func() {}
	resetBackoffHook     = func() {}
)
    81  
    82  func init() {
    83  	balancer.Register(&rlsBB{})
    84  }
    85  
    86  type rlsBB struct{}
    87  
    88  func (rlsBB) Name() string {
    89  	return Name
    90  }
    91  
    92  func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
    93  	lb := &rlsBalancer{
    94  		closed:             grpcsync.NewEvent(),
    95  		done:               grpcsync.NewEvent(),
    96  		cc:                 cc,
    97  		bopts:              opts,
    98  		purgeTicker:        dataCachePurgeTicker(),
    99  		dataCachePurgeHook: dataCachePurgeHook,
   100  		lbCfg:              &lbConfig{},
   101  		pendingMap:         make(map[cacheKey]*backoffState),
   102  		childPolicies:      make(map[string]*childPolicyWrapper),
   103  		updateCh:           buffer.NewUnbounded(),
   104  	}
   105  	lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
   106  	lb.dataCache = newDataCache(maxCacheSize, lb.logger)
   107  	lb.bg = balancergroup.New(balancergroup.Options{
   108  		CC:                      cc,
   109  		BuildOpts:               opts,
   110  		StateAggregator:         lb,
   111  		Logger:                  lb.logger,
   112  		SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
   113  	})
   114  	lb.bg.Start()
   115  	go lb.run()
   116  	return lb
   117  }
   118  
// rlsBalancer implements the RLS LB policy.
//
// It owns a control channel to the RLS server, a data cache of RLS responses
// and a BalancerGroup of child policies keyed by target. Child policy state
// updates, control channel READY notifications and picker-resume requests are
// pushed on updateCh and handled serially by the run() goroutine.
type rlsBalancer struct {
	closed             *grpcsync.Event // Fires when Close() is invoked. Guarded by stateMu.
	done               *grpcsync.Event // Fires when run() exits; Close() blocks on it.
	cc                 balancer.ClientConn
	bopts              balancer.BuildOptions
	purgeTicker        *time.Ticker // Drives periodic eviction in purgeDataCache().
	dataCachePurgeHook func()       // Invoked after every purge pass; overridden in tests.
	logger             *internalgrpclog.PrefixLogger

	// If both cacheMu and stateMu need to be acquired, the former must be
	// acquired first to prevent a deadlock. This order restriction is due to the
	// fact that in places where we need to acquire both the locks, we always
	// start off reading the cache.

	// cacheMu guards access to the data cache and pending requests map. We
	// cannot use an RWMutex here since even an operation like
	// dataCache.getEntry() modifies the underlying LRU, which is implemented as
	// a doubly linked list.
	cacheMu    sync.Mutex
	dataCache  *dataCache                 // Cache of RLS data.
	pendingMap map[cacheKey]*backoffState // Map of pending RLS requests.

	// stateMu guards access to all LB policy state.
	stateMu            sync.Mutex
	lbCfg              *lbConfig        // Most recently received service config.
	childPolicyBuilder balancer.Builder // Cached child policy builder.
	resolverState      resolver.State   // Cached resolver state.
	ctrlCh             *controlChannel  // Control channel to the RLS server.
	// BalancerGroup holding one child policy per target.
	bg *balancergroup.BalancerGroup
	// Reference-counted child policy wrappers, keyed by target.
	childPolicies map[string]*childPolicyWrapper
	// Wrapper for the default target from the config, if any.
	defaultPolicy *childPolicyWrapper
	// A reference to the most recent picker sent to gRPC as part of a state
	// update is cached in this field so that we can release the reference to the
	// default child policy wrapper when a new picker is created. See
	// sendNewPickerLocked() for details.
	lastPicker *rlsPicker
	// Set during UpdateClientConnState when pushing updates to child policies.
	// Prevents state updates from child policies causing new pickers to be sent
	// up the channel. Cleared after all child policies have processed the
	// updates sent to them, after which a new picker is sent up the channel.
	inhibitPickerUpdates bool

	// Channel on which all updates are pushed. Processed in run(). Closed by
	// Close().
	updateCh *buffer.Unbounded
}
   165  
// resumePickerUpdates is pushed on updateCh by UpdateClientConnState after a
// new config has been handed to the child policies. When run() processes it,
// picker updates are re-enabled, a new picker is sent, and done is closed to
// unblock UpdateClientConnState.
type resumePickerUpdates struct {
	done chan struct{}
}

// childPolicyIDAndState wraps a child policy id and its state update. It is
// pushed on updateCh by UpdateState and handled in run().
type childPolicyIDAndState struct {
	id    string
	state balancer.State
}

// controlChannelReady is pushed on updateCh by the callback handed to
// newControlChannel (the control channel getting back to READY). run() then
// resets the backoff state of data cache entries.
type controlChannelReady struct{}
   177  
   178  // run is a long-running goroutine which handles all the updates that the
   179  // balancer wishes to handle. The appropriate updateHandler will push the update
   180  // on to a channel that this goroutine will select on, thereby the handling of
   181  // the update will happen asynchronously.
   182  func (b *rlsBalancer) run() {
   183  	// We exit out of the for loop below only after `Close()` has been invoked.
   184  	// Firing the done event here will ensure that Close() returns only after
   185  	// all goroutines are done.
   186  	defer func() { b.done.Fire() }()
   187  
   188  	// Wait for purgeDataCache() goroutine to exit before returning from here.
   189  	doneCh := make(chan struct{})
   190  	defer func() {
   191  		<-doneCh
   192  	}()
   193  	go b.purgeDataCache(doneCh)
   194  
   195  	for {
   196  		select {
   197  		case u, ok := <-b.updateCh.Get():
   198  			if !ok {
   199  				return
   200  			}
   201  			b.updateCh.Load()
   202  			switch update := u.(type) {
   203  			case childPolicyIDAndState:
   204  				b.handleChildPolicyStateUpdate(update.id, update.state)
   205  			case controlChannelReady:
   206  				b.logger.Infof("Resetting backoff state after control channel getting back to READY")
   207  				b.cacheMu.Lock()
   208  				updatePicker := b.dataCache.resetBackoffState(&backoffState{bs: defaultBackoffStrategy})
   209  				b.cacheMu.Unlock()
   210  				if updatePicker {
   211  					b.sendNewPicker()
   212  				}
   213  				resetBackoffHook()
   214  			case resumePickerUpdates:
   215  				b.stateMu.Lock()
   216  				b.logger.Infof("Resuming picker updates after config propagation to child policies")
   217  				b.inhibitPickerUpdates = false
   218  				b.sendNewPickerLocked()
   219  				close(update.done)
   220  				b.stateMu.Unlock()
   221  			default:
   222  				b.logger.Errorf("Unsupported update type %T", update)
   223  			}
   224  		case <-b.closed.Done():
   225  			return
   226  		}
   227  	}
   228  }
   229  
   230  // purgeDataCache is a long-running goroutine which periodically deletes expired
   231  // entries. An expired entry is one for which both the expiryTime and
   232  // backoffExpiryTime are in the past.
   233  func (b *rlsBalancer) purgeDataCache(doneCh chan struct{}) {
   234  	defer close(doneCh)
   235  
   236  	for {
   237  		select {
   238  		case <-b.closed.Done():
   239  			return
   240  		case <-b.purgeTicker.C:
   241  			b.cacheMu.Lock()
   242  			updatePicker := b.dataCache.evictExpiredEntries()
   243  			b.cacheMu.Unlock()
   244  			if updatePicker {
   245  				b.sendNewPicker()
   246  			}
   247  			b.dataCachePurgeHook()
   248  		}
   249  	}
   250  }
   251  
   252  func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
   253  	defer clientConnUpdateHook()
   254  
   255  	b.stateMu.Lock()
   256  	if b.closed.HasFired() {
   257  		b.stateMu.Unlock()
   258  		b.logger.Warningf("Received service config after balancer close: %s", pretty.ToJSON(ccs.BalancerConfig))
   259  		return errBalancerClosed
   260  	}
   261  
   262  	newCfg := ccs.BalancerConfig.(*lbConfig)
   263  	if b.lbCfg.Equal(newCfg) {
   264  		b.stateMu.Unlock()
   265  		b.logger.Infof("New service config matches existing config")
   266  		return nil
   267  	}
   268  
   269  	b.logger.Infof("Delaying picker updates until config is propagated to and processed by child policies")
   270  	b.inhibitPickerUpdates = true
   271  
   272  	// When the RLS server name changes, the old control channel needs to be
   273  	// swapped out for a new one. All state associated with the throttling
   274  	// algorithm is stored on a per-control-channel basis; when we swap out
   275  	// channels, we also swap out the throttling state.
   276  	b.handleControlChannelUpdate(newCfg)
   277  
   278  	// Any changes to child policy name or configuration needs to be handled by
   279  	// either creating new child policies or pushing updates to existing ones.
   280  	b.resolverState = ccs.ResolverState
   281  	b.handleChildPolicyConfigUpdate(newCfg, &ccs)
   282  
   283  	// Resize the cache if the size in the config has changed.
   284  	resizeCache := newCfg.cacheSizeBytes != b.lbCfg.cacheSizeBytes
   285  
   286  	// Update the copy of the config in the LB policy before releasing the lock.
   287  	b.lbCfg = newCfg
   288  
   289  	// Enqueue an event which will notify us when the above update has been
   290  	// propagated to all child policies, and the child policies have all
   291  	// processed their updates, and we have sent a picker update.
   292  	done := make(chan struct{})
   293  	b.updateCh.Put(resumePickerUpdates{done: done})
   294  	b.stateMu.Unlock()
   295  	<-done
   296  
   297  	if resizeCache {
   298  		// If the new config changes reduces the size of the data cache, we
   299  		// might have to evict entries to get the cache size down to the newly
   300  		// specified size.
   301  		//
   302  		// And we cannot do this operation above (where we compute the
   303  		// `resizeCache` boolean) because `cacheMu` needs to be grabbed before
   304  		// `stateMu` if we are to hold both locks at the same time.
   305  		b.cacheMu.Lock()
   306  		b.dataCache.resize(newCfg.cacheSizeBytes)
   307  		b.cacheMu.Unlock()
   308  	}
   309  	return nil
   310  }
   311  
   312  // handleControlChannelUpdate handles updates to service config fields which
   313  // influence the control channel to the RLS server.
   314  //
   315  // Caller must hold lb.stateMu.
   316  func (b *rlsBalancer) handleControlChannelUpdate(newCfg *lbConfig) {
   317  	if newCfg.lookupService == b.lbCfg.lookupService && newCfg.lookupServiceTimeout == b.lbCfg.lookupServiceTimeout {
   318  		return
   319  	}
   320  
   321  	// Create a new control channel and close the existing one.
   322  	b.logger.Infof("Creating control channel to RLS server at: %v", newCfg.lookupService)
   323  	backToReadyFn := func() {
   324  		b.updateCh.Put(controlChannelReady{})
   325  	}
   326  	ctrlCh, err := newControlChannel(newCfg.lookupService, newCfg.controlChannelServiceConfig, newCfg.lookupServiceTimeout, b.bopts, backToReadyFn)
   327  	if err != nil {
   328  		// This is very uncommon and usually represents a non-transient error.
   329  		// There is not much we can do here other than wait for another update
   330  		// which might fix things.
   331  		b.logger.Errorf("Failed to create control channel to %q: %v", newCfg.lookupService, err)
   332  		return
   333  	}
   334  	if b.ctrlCh != nil {
   335  		b.ctrlCh.close()
   336  	}
   337  	b.ctrlCh = ctrlCh
   338  }
   339  
   340  // handleChildPolicyConfigUpdate handles updates to service config fields which
   341  // influence child policy configuration.
   342  //
   343  // Caller must hold lb.stateMu.
   344  func (b *rlsBalancer) handleChildPolicyConfigUpdate(newCfg *lbConfig, ccs *balancer.ClientConnState) {
   345  	// Update child policy builder first since other steps are dependent on this.
   346  	if b.childPolicyBuilder == nil || b.childPolicyBuilder.Name() != newCfg.childPolicyName {
   347  		b.logger.Infof("Child policy changed to %q", newCfg.childPolicyName)
   348  		b.childPolicyBuilder = balancer.Get(newCfg.childPolicyName)
   349  		for _, cpw := range b.childPolicies {
   350  			// If the child policy has changed, we need to remove the old policy
   351  			// from the BalancerGroup and add a new one. The BalancerGroup takes
   352  			// care of closing the old one in this case.
   353  			b.bg.Remove(cpw.target)
   354  			b.bg.Add(cpw.target, b.childPolicyBuilder)
   355  		}
   356  	}
   357  
   358  	configSentToDefault := false
   359  	if b.lbCfg.defaultTarget != newCfg.defaultTarget {
   360  		// If the default target has changed, create a new childPolicyWrapper for
   361  		// the new target if required. If a new wrapper is created, add it to the
   362  		// childPolicies map and the BalancerGroup.
   363  		b.logger.Infof("Default target in LB config changing from %q to %q", b.lbCfg.defaultTarget, newCfg.defaultTarget)
   364  		cpw := b.childPolicies[newCfg.defaultTarget]
   365  		if cpw == nil {
   366  			cpw = newChildPolicyWrapper(newCfg.defaultTarget)
   367  			b.childPolicies[newCfg.defaultTarget] = cpw
   368  			b.bg.Add(newCfg.defaultTarget, b.childPolicyBuilder)
   369  			b.logger.Infof("Child policy %q added to BalancerGroup", newCfg.defaultTarget)
   370  		}
   371  		if err := b.buildAndPushChildPolicyConfigs(newCfg.defaultTarget, newCfg, ccs); err != nil {
   372  			cpw.lamify(err)
   373  		}
   374  
   375  		// If an old default exists, release its reference. If this was the last
   376  		// reference, remove the child policy from the BalancerGroup and remove the
   377  		// corresponding entry the childPolicies map.
   378  		if b.defaultPolicy != nil {
   379  			if b.defaultPolicy.releaseRef() {
   380  				delete(b.childPolicies, b.lbCfg.defaultTarget)
   381  				b.bg.Remove(b.defaultPolicy.target)
   382  			}
   383  		}
   384  		b.defaultPolicy = cpw
   385  		configSentToDefault = true
   386  	}
   387  
   388  	// No change in configuration affecting child policies. Return early.
   389  	if b.lbCfg.childPolicyName == newCfg.childPolicyName && b.lbCfg.childPolicyTargetField == newCfg.childPolicyTargetField && childPolicyConfigEqual(b.lbCfg.childPolicyConfig, newCfg.childPolicyConfig) {
   390  		return
   391  	}
   392  
   393  	// If fields affecting child policy configuration have changed, the changes
   394  	// are pushed to the childPolicyWrapper which handles them appropriately.
   395  	for _, cpw := range b.childPolicies {
   396  		if configSentToDefault && cpw.target == newCfg.defaultTarget {
   397  			// Default target has already been taken care of.
   398  			continue
   399  		}
   400  		if err := b.buildAndPushChildPolicyConfigs(cpw.target, newCfg, ccs); err != nil {
   401  			cpw.lamify(err)
   402  		}
   403  	}
   404  }
   405  
   406  // buildAndPushChildPolicyConfigs builds the final child policy configuration by
   407  // adding the `targetField` to the base child policy configuration received in
   408  // RLS LB policy configuration. The `targetField` is set to target and
   409  // configuration is pushed to the child policy through the BalancerGroup.
   410  //
   411  // Caller must hold lb.stateMu.
   412  func (b *rlsBalancer) buildAndPushChildPolicyConfigs(target string, newCfg *lbConfig, ccs *balancer.ClientConnState) error {
   413  	jsonTarget, err := json.Marshal(target)
   414  	if err != nil {
   415  		return fmt.Errorf("failed to marshal child policy target %q: %v", target, err)
   416  	}
   417  
   418  	config := newCfg.childPolicyConfig
   419  	targetField := newCfg.childPolicyTargetField
   420  	config[targetField] = jsonTarget
   421  	jsonCfg, err := json.Marshal(config)
   422  	if err != nil {
   423  		return fmt.Errorf("failed to marshal child policy config %+v: %v", config, err)
   424  	}
   425  
   426  	parser, _ := b.childPolicyBuilder.(balancer.ConfigParser)
   427  	parsedCfg, err := parser.ParseConfig(jsonCfg)
   428  	if err != nil {
   429  		return fmt.Errorf("childPolicy config parsing failed: %v", err)
   430  	}
   431  
   432  	state := balancer.ClientConnState{ResolverState: ccs.ResolverState, BalancerConfig: parsedCfg}
   433  	b.logger.Infof("Pushing new state to child policy %q: %+v", target, state)
   434  	if err := b.bg.UpdateClientConnState(target, state); err != nil {
   435  		b.logger.Warningf("UpdateClientConnState(%q, %+v) failed : %v", target, ccs, err)
   436  	}
   437  	return nil
   438  }
   439  
// ResolverError forwards a name resolver error to the BalancerGroup holding
// the child policies.
func (b *rlsBalancer) ResolverError(err error) {
	b.bg.ResolverError(err)
}

// UpdateSubConnState is not expected to be called on the RLS LB policy; a
// call is only logged as an error. NOTE(review): presumably SubConn state
// updates reach the child policies (which own the SubConns) directly —
// confirm against the balancergroup implementation.
func (b *rlsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}
   447  
   448  func (b *rlsBalancer) Close() {
   449  	b.stateMu.Lock()
   450  	b.closed.Fire()
   451  	b.purgeTicker.Stop()
   452  	if b.ctrlCh != nil {
   453  		b.ctrlCh.close()
   454  	}
   455  	b.bg.Close()
   456  	b.stateMu.Unlock()
   457  
   458  	b.cacheMu.Lock()
   459  	b.dataCache.stop()
   460  	b.cacheMu.Unlock()
   461  
   462  	b.updateCh.Close()
   463  
   464  	<-b.done.Done()
   465  }
   466  
// ExitIdle forwards the request to exit IDLE to the BalancerGroup holding the
// child policies.
func (b *rlsBalancer) ExitIdle() {
	b.bg.ExitIdle()
}
   470  
   471  // sendNewPickerLocked pushes a new picker on to the channel.
   472  //
   473  // Note that regardless of what connectivity state is reported, the policy will
   474  // return its own picker, and not a picker that unconditionally queues
   475  // (typically used for IDLE or CONNECTING) or a picker that unconditionally
   476  // fails (typically used for TRANSIENT_FAILURE). This is required because,
   477  // irrespective of the connectivity state, we need to able to perform RLS
   478  // lookups for incoming RPCs and affect the status of queued RPCs based on the
   479  // receipt of RLS responses.
   480  //
   481  // Caller must hold lb.stateMu.
   482  func (b *rlsBalancer) sendNewPickerLocked() {
   483  	aggregatedState := b.aggregatedConnectivityState()
   484  
   485  	// Acquire a separate reference for the picker. This is required to ensure
   486  	// that the wrapper held by the old picker is not closed when the default
   487  	// target changes in the config, and a new wrapper is created for the new
   488  	// default target. See handleChildPolicyConfigUpdate() for how config changes
   489  	// affecting the default target are handled.
   490  	if b.defaultPolicy != nil {
   491  		b.defaultPolicy.acquireRef()
   492  	}
   493  	picker := &rlsPicker{
   494  		kbm:           b.lbCfg.kbMap,
   495  		origEndpoint:  b.bopts.Target.Endpoint(),
   496  		lb:            b,
   497  		defaultPolicy: b.defaultPolicy,
   498  		ctrlCh:        b.ctrlCh,
   499  		maxAge:        b.lbCfg.maxAge,
   500  		staleAge:      b.lbCfg.staleAge,
   501  		bg:            b.bg,
   502  	}
   503  	picker.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-picker %p] ", picker))
   504  	state := balancer.State{
   505  		ConnectivityState: aggregatedState,
   506  		Picker:            picker,
   507  	}
   508  
   509  	if !b.inhibitPickerUpdates {
   510  		b.logger.Infof("New balancer.State: %+v", state)
   511  		b.cc.UpdateState(state)
   512  	} else {
   513  		b.logger.Infof("Delaying picker update: %+v", state)
   514  	}
   515  
   516  	if b.lastPicker != nil {
   517  		if b.defaultPolicy != nil {
   518  			b.defaultPolicy.releaseRef()
   519  		}
   520  	}
   521  	b.lastPicker = picker
   522  }
   523  
   524  func (b *rlsBalancer) sendNewPicker() {
   525  	b.stateMu.Lock()
   526  	defer b.stateMu.Unlock()
   527  	if b.closed.HasFired() {
   528  		return
   529  	}
   530  	b.sendNewPickerLocked()
   531  }
   532  
   533  // The aggregated connectivity state reported is determined as follows:
   534  //   - If there is at least one child policy in state READY, the connectivity
   535  //     state is READY.
   536  //   - Otherwise, if there is at least one child policy in state CONNECTING, the
   537  //     connectivity state is CONNECTING.
   538  //   - Otherwise, if there is at least one child policy in state IDLE, the
   539  //     connectivity state is IDLE.
   540  //   - Otherwise, all child policies are in TRANSIENT_FAILURE, and the
   541  //     connectivity state is TRANSIENT_FAILURE.
   542  //
   543  // If the RLS policy has no child policies and no configured default target,
   544  // then we will report connectivity state IDLE.
   545  //
   546  // Caller must hold lb.stateMu.
   547  func (b *rlsBalancer) aggregatedConnectivityState() connectivity.State {
   548  	if len(b.childPolicies) == 0 && b.lbCfg.defaultTarget == "" {
   549  		return connectivity.Idle
   550  	}
   551  
   552  	var readyN, connectingN, idleN int
   553  	for _, cpw := range b.childPolicies {
   554  		state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
   555  		switch state.ConnectivityState {
   556  		case connectivity.Ready:
   557  			readyN++
   558  		case connectivity.Connecting:
   559  			connectingN++
   560  		case connectivity.Idle:
   561  			idleN++
   562  		}
   563  	}
   564  
   565  	switch {
   566  	case readyN > 0:
   567  		return connectivity.Ready
   568  	case connectingN > 0:
   569  		return connectivity.Connecting
   570  	case idleN > 0:
   571  		return connectivity.Idle
   572  	default:
   573  		return connectivity.TransientFailure
   574  	}
   575  }
   576  
// UpdateState is an implementation of the balancergroup.BalancerStateAggregator
// interface. The actual state aggregation functionality is handled
// asynchronously. This method only pushes the state update on to a channel
// read and dispatched by the run() goroutine, which hands it to
// handleChildPolicyStateUpdate().
func (b *rlsBalancer) UpdateState(id string, state balancer.State) {
	b.updateCh.Put(childPolicyIDAndState{id: id, state: state})
}
   584  
   585  // handleChildPolicyStateUpdate provides the state aggregator functionality for
   586  // the BalancerGroup.
   587  //
   588  // This method is invoked by the BalancerGroup whenever a child policy sends a
   589  // state update. We cache the child policy's connectivity state and picker for
   590  // two reasons:
   591  //   - to suppress connectivity state transitions from TRANSIENT_FAILURE to states
   592  //     other than READY
   593  //   - to delegate picks to child policies
   594  func (b *rlsBalancer) handleChildPolicyStateUpdate(id string, newState balancer.State) {
   595  	b.stateMu.Lock()
   596  	defer b.stateMu.Unlock()
   597  
   598  	cpw := b.childPolicies[id]
   599  	if cpw == nil {
   600  		// All child policies start with an entry in the map. If ID is not in
   601  		// map, it's either been removed, or never existed.
   602  		b.logger.Warningf("Received state update %+v for missing child policy %q", newState, id)
   603  		return
   604  	}
   605  
   606  	oldState := (*balancer.State)(atomic.LoadPointer(&cpw.state))
   607  	if oldState.ConnectivityState == connectivity.TransientFailure && newState.ConnectivityState == connectivity.Connecting {
   608  		// Ignore state transitions from TRANSIENT_FAILURE to CONNECTING, and thus
   609  		// fail pending RPCs instead of queuing them indefinitely when all
   610  		// subChannels are failing, even if the subChannels are bouncing back and
   611  		// forth between CONNECTING and TRANSIENT_FAILURE.
   612  		return
   613  	}
   614  	atomic.StorePointer(&cpw.state, unsafe.Pointer(&newState))
   615  	b.logger.Infof("Child policy %q has new state %+v", id, newState)
   616  	b.sendNewPickerLocked()
   617  }
   618  
   619  // acquireChildPolicyReferences attempts to acquire references to
   620  // childPolicyWrappers corresponding to the passed in targets. If there is no
   621  // childPolicyWrapper corresponding to one of the targets, a new one is created
   622  // and added to the BalancerGroup.
   623  func (b *rlsBalancer) acquireChildPolicyReferences(targets []string) []*childPolicyWrapper {
   624  	b.stateMu.Lock()
   625  	var newChildPolicies []*childPolicyWrapper
   626  	for _, target := range targets {
   627  		// If the target exists in the LB policy's childPolicies map. a new
   628  		// reference is taken here and added to the new list.
   629  		if cpw := b.childPolicies[target]; cpw != nil {
   630  			cpw.acquireRef()
   631  			newChildPolicies = append(newChildPolicies, cpw)
   632  			continue
   633  		}
   634  
   635  		// If the target does not exist in the child policy map, then a new
   636  		// child policy wrapper is created and added to the new list.
   637  		cpw := newChildPolicyWrapper(target)
   638  		b.childPolicies[target] = cpw
   639  		b.bg.Add(target, b.childPolicyBuilder)
   640  		b.logger.Infof("Child policy %q added to BalancerGroup", target)
   641  		newChildPolicies = append(newChildPolicies, cpw)
   642  		if err := b.buildAndPushChildPolicyConfigs(target, b.lbCfg, &balancer.ClientConnState{
   643  			ResolverState: b.resolverState,
   644  		}); err != nil {
   645  			cpw.lamify(err)
   646  		}
   647  	}
   648  	b.stateMu.Unlock()
   649  	return newChildPolicies
   650  }
   651  
   652  // releaseChildPolicyReferences releases references to childPolicyWrappers
   653  // corresponding to the passed in targets. If the release reference was the last
   654  // one, the child policy is removed from the BalancerGroup.
   655  func (b *rlsBalancer) releaseChildPolicyReferences(targets []string) {
   656  	b.stateMu.Lock()
   657  	for _, target := range targets {
   658  		if cpw := b.childPolicies[target]; cpw.releaseRef() {
   659  			delete(b.childPolicies, cpw.target)
   660  			b.bg.Remove(cpw.target)
   661  		}
   662  	}
   663  	b.stateMu.Unlock()
   664  }
   665  

View as plain text