...

Source file src/google.golang.org/grpc/xds/internal/balancer/clustermanager/balancerstateaggregator.go

Documentation: google.golang.org/grpc/xds/internal/balancer/clustermanager

     1  /*
     2   *
     3   * Copyright 2020 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package clustermanager
    20  
    21  import (
    22  	"fmt"
    23  	"sync"
    24  
    25  	"google.golang.org/grpc/balancer"
    26  	"google.golang.org/grpc/balancer/base"
    27  	"google.golang.org/grpc/connectivity"
    28  	"google.golang.org/grpc/internal/grpclog"
    29  )
    30  
    31  type subBalancerState struct {
    32  	state balancer.State
    33  	// stateToAggregate is the connectivity state used only for state
    34  	// aggregation. It could be different from state.ConnectivityState. For
    35  	// example when a sub-balancer transitions from TransientFailure to
    36  	// connecting, state.ConnectivityState is Connecting, but stateToAggregate
    37  	// is still TransientFailure.
    38  	stateToAggregate connectivity.State
    39  }
    40  
    41  func (s *subBalancerState) String() string {
    42  	return fmt.Sprintf("picker:%p,state:%v,stateToAggregate:%v", s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
    43  }
    44  
    45  type balancerStateAggregator struct {
    46  	cc     balancer.ClientConn
    47  	logger *grpclog.PrefixLogger
    48  
    49  	mu sync.Mutex
    50  	// If started is false, no updates should be sent to the parent cc. A closed
    51  	// sub-balancer could still send pickers to this aggregator. This makes sure
    52  	// that no updates will be forwarded to the parent when the whole balancer
    53  	// group and state aggregator are closed.
    54  	started bool
    55  	// All balancer IDs exist as keys in this map, even if the balancer group
    56  	// is not started.
    57  	//
    58  	// If an ID is not in the map, it was either removed or never added.
    59  	idToPickerState map[string]*subBalancerState
    60  	// Set when UpdateState call propagation is paused.
    61  	pauseUpdateState bool
    62  	// Set when UpdateState call propagation is paused and an UpdateState call
    63  	// is suppressed.
    64  	needUpdateStateOnResume bool
    65  }
    66  
    67  func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLogger) *balancerStateAggregator {
    68  	return &balancerStateAggregator{
    69  		cc:              cc,
    70  		logger:          logger,
    71  		idToPickerState: make(map[string]*subBalancerState),
    72  	}
    73  }
    74  
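// Illustrative sketch, not part of the original source: how a parent balancer
// might wire the aggregator up. The child names below are hypothetical.
//
//	bsa := newBalancerStateAggregator(cc, logger)
//	bsa.start()
//	bsa.add("cluster_a")
//	bsa.add("cluster_b")
//	// Each child then reports its state through bsa.UpdateState(id, state).
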
    75  // start starts the aggregator. It can be called after close to restart the
    76  // aggregator.
    77  func (bsa *balancerStateAggregator) start() {
    78  	bsa.mu.Lock()
    79  	defer bsa.mu.Unlock()
    80  	bsa.started = true
    81  }
    82  
    83  // close closes the aggregator. When the aggregator is closed, it won't call
    84  // the parent ClientConn to update the balancer state.
    85  func (bsa *balancerStateAggregator) close() {
    86  	bsa.mu.Lock()
    87  	defer bsa.mu.Unlock()
    88  	bsa.started = false
    89  	bsa.clearStates()
    90  }
    91  
    92  // add adds a sub-balancer with the given id. It adds a placeholder state, and
    93  // waits for the real sub-balancer to report its state.
    94  //
    95  // This is called when there's a new child.
    96  func (bsa *balancerStateAggregator) add(id string) {
    97  	bsa.mu.Lock()
    98  	defer bsa.mu.Unlock()
    99  	bsa.idToPickerState[id] = &subBalancerState{
   100  		// Start everything in CONNECTING, so if one of the sub-balancers
   101  		// reports TransientFailure, the RPCs will still wait for the other
   102  		// sub-balancers.
   103  		state: balancer.State{
   104  			ConnectivityState: connectivity.Connecting,
   105  			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
   106  		},
   107  		stateToAggregate: connectivity.Connecting,
   108  	}
   109  }
   110  
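// Illustrative sketch, not part of the original source: the effect of the
// CONNECTING placeholder created by add. Assume "cluster_a" (hypothetical
// name) has already reported TransientFailure and "cluster_b" is new:
//
//	bsa.add("cluster_b")
//	// idToPickerState["cluster_a"].stateToAggregate == TransientFailure
//	// idToPickerState["cluster_b"].stateToAggregate == Connecting (placeholder)
//	// build() aggregates to Connecting, so picks routed to "cluster_b" queue
//	// with balancer.ErrNoSubConnAvailable instead of failing immediately.
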
   111  // remove removes the sub-balancer state. Future updates from this sub-balancer,
   112  // if any, will be ignored.
   113  //
   114  // This is called when a child is removed.
   115  func (bsa *balancerStateAggregator) remove(id string) {
   116  	bsa.mu.Lock()
   117  	defer bsa.mu.Unlock()
   118  	if _, ok := bsa.idToPickerState[id]; !ok {
   119  		return
   120  	}
   121  	// Remove the ID and its picker from the picker map. This also causes future
   122  	// updates for this ID to be ignored.
   123  	delete(bsa.idToPickerState, id)
   124  }
   125  
   126  // pauseStateUpdates causes UpdateState calls to not propagate to the parent
   127  // ClientConn. The last state will be remembered and propagated when
   128  // resumeStateUpdates is called.
   129  func (bsa *balancerStateAggregator) pauseStateUpdates() {
   130  	bsa.mu.Lock()
   131  	defer bsa.mu.Unlock()
   132  	bsa.pauseUpdateState = true
   133  	bsa.needUpdateStateOnResume = false
   134  }
   135  
   136  // resumeStateUpdates resumes propagating UpdateState calls to the parent, and
   137  // calls UpdateState on the parent if any update was suppressed while paused.
   138  func (bsa *balancerStateAggregator) resumeStateUpdates() {
   139  	bsa.mu.Lock()
   140  	defer bsa.mu.Unlock()
   141  	bsa.pauseUpdateState = false
   142  	if bsa.needUpdateStateOnResume {
   143  		bsa.cc.UpdateState(bsa.build())
   144  	}
   145  }
   146  
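// Illustrative sketch, not part of the original source: how a parent balancer
// might pause updates while pushing a new configuration to its children, so
// that at most one aggregated update reaches the parent ClientConn.
//
//	bsa.pauseStateUpdates()
//	// ... update the set of children and push the new config; UpdateState
//	// calls made by children in the meantime are aggregated but not
//	// forwarded ...
//	bsa.resumeStateUpdates() // forwards one update if any was suppressed
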
   147  // UpdateState is called to report a balancer state change from a sub-balancer.
   148  // It's usually called by the balancer group.
   149  //
   150  // It calls parent ClientConn's UpdateState with the new aggregated state.
   151  func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State) {
   152  	bsa.mu.Lock()
   153  	defer bsa.mu.Unlock()
   154  	pickerSt, ok := bsa.idToPickerState[id]
   155  	if !ok {
   156  		// All state starts with an entry in idToPickerState. If the ID is not in
   157  		// the map, it was either removed or never added.
   158  		return
   159  	}
   160  	if !(pickerSt.state.ConnectivityState == connectivity.TransientFailure && state.ConnectivityState == connectivity.Connecting) {
   161  		// If the old state is TransientFailure and the new state is Connecting,
   162  		// keep stateToAggregate as TransientFailure, to prevent the aggregated
   163  		// state from always being CONNECTING. In all other cases, stateToAggregate
   164  		// is the same as state.ConnectivityState.
   165  		pickerSt.stateToAggregate = state.ConnectivityState
   166  	}
   167  	pickerSt.state = state
   168  
   169  	if !bsa.started {
   170  		return
   171  	}
   172  	if bsa.pauseUpdateState {
   173  		// If updates are paused, do not call UpdateState, but remember that we
   174  		// need to call it when they are resumed.
   175  		bsa.needUpdateStateOnResume = true
   176  		return
   177  	}
   178  	bsa.cc.UpdateState(bsa.build())
   179  }
   180  
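// Illustrative sketch, not part of the original source: the
// TransientFailure->Connecting special case handled above. Assume the child
// "cluster_a" (hypothetical name) previously reported TransientFailure:
//
//	bsa.UpdateState("cluster_a", balancer.State{
//		ConnectivityState: connectivity.Connecting,
//		Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
//	})
//	// state.ConnectivityState is now Connecting, but stateToAggregate stays
//	// TransientFailure, so the aggregated state does not flip back to
//	// CONNECTING for a child that is still effectively failing.
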
   181  // clearStates resets everything to the initial state (Connecting), but keeps
   182  // the entries in the map.
   183  //
   184  // Caller must hold bsa.mu.
   185  func (bsa *balancerStateAggregator) clearStates() {
   186  	for _, pState := range bsa.idToPickerState {
   187  		pState.state = balancer.State{
   188  			ConnectivityState: connectivity.Connecting,
   189  			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
   190  		}
   191  		pState.stateToAggregate = connectivity.Connecting
   192  	}
   193  }
   194  
   195  // buildAndUpdate combines the sub-states from all sub-balancers into one
   196  // state, and updates the parent ClientConn with it.
   197  func (bsa *balancerStateAggregator) buildAndUpdate() {
   198  	bsa.mu.Lock()
   199  	defer bsa.mu.Unlock()
   200  	if !bsa.started {
   201  		return
   202  	}
   203  	if bsa.pauseUpdateState {
   204  		// If updates are paused, do not call UpdateState, but remember that we
   205  		// need to call it when they are resumed.
   206  		bsa.needUpdateStateOnResume = true
   207  		return
   208  	}
   209  	bsa.cc.UpdateState(bsa.build())
   210  }
   211  
   212  // build combines the sub-states into one. The returned picker does a child pick.
   213  //
   214  // Caller must hold bsa.mu.
   215  func (bsa *balancerStateAggregator) build() balancer.State {
   216  	// TODO: the majority of this function (and UpdateState) is exactly the same
   217  	// as weighted_target's state aggregator. Try to make a general utility
   218  	// function/struct to handle the logic.
   219  	//
   220  	// One option: make a SubBalancerState that handles Update(State), including
   221  	// handling the special connecting after ready, as in UpdateState(). Then a
   222  	// function to calculate the aggregated connectivity state as in this
   223  	// function.
   224  	//
   225  	// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
   226  	// state.
   227  	var readyN, connectingN, idleN int
   228  	for _, ps := range bsa.idToPickerState {
   229  		switch ps.stateToAggregate {
   230  		case connectivity.Ready:
   231  			readyN++
   232  		case connectivity.Connecting:
   233  			connectingN++
   234  		case connectivity.Idle:
   235  			idleN++
   236  		}
   237  	}
   238  	var aggregatedState connectivity.State
   239  	switch {
   240  	case readyN > 0:
   241  		aggregatedState = connectivity.Ready
   242  	case connectingN > 0:
   243  		aggregatedState = connectivity.Connecting
   244  	case idleN > 0:
   245  		aggregatedState = connectivity.Idle
   246  	default:
   247  		aggregatedState = connectivity.TransientFailure
   248  	}
   249  
   250  	// The picker's returned error might not be consistent with the
   251  	// aggregated state. This LB policy always builds the picker with all
   252  	// sub-pickers (not only the ready ones), so even if the overall state is
   253  	// Ready, picks for certain RPCs can behave like Connecting or
   254  	// TransientFailure.
   255  	bsa.logger.Infof("Child pickers: %+v", bsa.idToPickerState)
   256  	return balancer.State{
   257  		ConnectivityState: aggregatedState,
   258  		Picker:            newPickerGroup(bsa.idToPickerState),
   259  	}
   260  }
   261  
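// Illustrative examples, not part of the original source: the aggregation in
// build() prefers Ready, then Connecting, then Idle, and falls back to
// TransientFailure.
//
//	// children {Ready, TransientFailure}            -> Ready
//	// children {Connecting, TransientFailure}       -> Connecting
//	// children {Idle, TransientFailure}             -> Idle
//	// children {TransientFailure, TransientFailure} -> TransientFailure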
