...

Source file src/google.golang.org/grpc/balancer/weightedtarget/weightedaggregator/aggregator.go

Documentation: google.golang.org/grpc/balancer/weightedtarget/weightedaggregator

/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

// Package weightedaggregator implements a state aggregator for the
// weighted_target balancer.
//
// This is a separate package so it can be shared by weighted_target and eds.
// The eds balancer will be refactored to use weighted_target directly. After
// that, all functions and structs in this package can be moved to package
// weightedtarget and unexported.
package weightedaggregator

import (
	"fmt"
	"sync"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/balancer/base"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/internal/grpclog"
	"google.golang.org/grpc/internal/wrr"
)

type weightedPickerState struct {
	weight uint32
	state  balancer.State
	// stateToAggregate is the connectivity state used only for state
	// aggregation. It could be different from state.ConnectivityState. For
	// example, when a sub-balancer transitions from TransientFailure to
	// Connecting, state.ConnectivityState is Connecting, but stateToAggregate
	// is still TransientFailure.
	stateToAggregate connectivity.State
}

func (s *weightedPickerState) String() string {
	return fmt.Sprintf("weight:%v,picker:%p,state:%v,stateToAggregate:%v", s.weight, s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
}

// Aggregator is the weighted balancer state aggregator.
type Aggregator struct {
	cc     balancer.ClientConn
	logger *grpclog.PrefixLogger
	newWRR func() wrr.WRR

	csEvltr *balancer.ConnectivityStateEvaluator

	mu sync.Mutex
	// If started is false, no updates should be sent to the parent cc. A
	// closed sub-balancer could still send pickers to this aggregator. This
	// makes sure that no updates are forwarded to the parent when the whole
	// balancer group and state aggregator are closed.
	started bool
	// All balancer IDs exist as keys in this map, even if the balancer group
	// is not started.
	//
	// If an ID is not in the map, it was either removed or never added.
	idToPickerState map[string]*weightedPickerState
	// Set when UpdateState call propagation is paused.
	pauseUpdateState bool
	// Set when UpdateState call propagation is paused and an UpdateState call
	// is suppressed.
	needUpdateStateOnResume bool
}

// New creates a new weighted balancer state aggregator.
func New(cc balancer.ClientConn, logger *grpclog.PrefixLogger, newWRR func() wrr.WRR) *Aggregator {
	return &Aggregator{
		cc:              cc,
		logger:          logger,
		newWRR:          newWRR,
		csEvltr:         &balancer.ConnectivityStateEvaluator{},
		idToPickerState: make(map[string]*weightedPickerState),
	}
}

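// A minimal usage sketch (illustrative only, not part of the upstream file;
// parentCC and logger are assumed to be provided by the enclosing balancer,
// and wrr.NewRandom is one possible WRR constructor):
//
//	wbsa := weightedaggregator.New(parentCC, logger, wrr.NewRandom)
//	wbsa.Start()
//	wbsa.Add("cluster-a", 2) // placeholder in CONNECTING until a real update
//	wbsa.Add("cluster-b", 1)
//	// ... sub-balancers report their states via wbsa.UpdateState ...
//	wbsa.Stop()
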
// Start starts the aggregator. It can be called after Stop to restart the
// aggregator.
func (wbsa *Aggregator) Start() {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.started = true
}

// Stop stops the aggregator. When the aggregator is stopped, it won't call
// the parent ClientConn to update the balancer state.
func (wbsa *Aggregator) Stop() {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.started = false
	wbsa.clearStates()
}

// Add adds a sub-balancer state with weight. It adds a placeholder, and waits
// for the real sub-balancer to update state.
func (wbsa *Aggregator) Add(id string, weight uint32) {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.idToPickerState[id] = &weightedPickerState{
		weight: weight,
		// Start everything in CONNECTING, so if one of the sub-balancers
		// reports TransientFailure, the RPCs will still wait for the other
		// sub-balancers.
		state: balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
		},
		stateToAggregate: connectivity.Connecting,
	}
	wbsa.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Connecting)

	wbsa.buildAndUpdateLocked()
}

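// A sketch of why the CONNECTING placeholder above matters (hypothetical
// IDs and error; errors.New stands in for a real sub-balancer error): with
// two sub-balancers added, if "a" reports TransientFailure while "b" is
// still in its placeholder state, the aggregate sees one Connecting and one
// TransientFailure child, so RPCs keep queuing with ErrNoSubConnAvailable
// instead of failing immediately:
//
//	wbsa.Add("a", 1)
//	wbsa.Add("b", 1)
//	wbsa.UpdateState("a", balancer.State{
//		ConnectivityState: connectivity.TransientFailure,
//		Picker:            base.NewErrPicker(errors.New("a is down")),
//	})
//	// Aggregated state: CONNECTING (numConnecting > 0).
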
// Remove removes the sub-balancer state. Future updates from this
// sub-balancer, if any, will be ignored.
func (wbsa *Aggregator) Remove(id string) {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	if _, ok := wbsa.idToPickerState[id]; !ok {
		return
	}
	// Record a transition to Shutdown so that csEvltr drops the deleted
	// sub-balancer's previous state from its aggregate counts. csEvltr does
	// not count Shutdown itself, so only the old state's count changes.
	wbsa.csEvltr.RecordTransition(wbsa.idToPickerState[id].stateToAggregate, connectivity.Shutdown)
	// Remove id and picker from the picker map. This also causes future
	// updates for this ID to be ignored.
	delete(wbsa.idToPickerState, id)
	wbsa.buildAndUpdateLocked()
}

// UpdateWeight updates the weight for the given id. Note that this doesn't
// trigger an update to the parent ClientConn. The caller should decide when
// an update is necessary and trigger it separately (e.g. via
// NeedUpdateStateOnResume before ResumeStateUpdates).
func (wbsa *Aggregator) UpdateWeight(id string, newWeight uint32) {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	pState, ok := wbsa.idToPickerState[id]
	if !ok {
		return
	}
	pState.weight = newWeight
}

// PauseStateUpdates causes UpdateState calls to not propagate to the parent
// ClientConn. The last state will be remembered and propagated when
// ResumeStateUpdates is called.
func (wbsa *Aggregator) PauseStateUpdates() {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.pauseUpdateState = true
	wbsa.needUpdateStateOnResume = false
}

// ResumeStateUpdates will resume propagating UpdateState calls to the parent,
// and call UpdateState on the parent if any UpdateState call was suppressed.
func (wbsa *Aggregator) ResumeStateUpdates() {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.pauseUpdateState = false
	if wbsa.needUpdateStateOnResume {
		wbsa.cc.UpdateState(wbsa.build())
	}
}

// NeedUpdateStateOnResume sets the needUpdateStateOnResume bool to true,
// letting a picker update be sent once ResumeStateUpdates is called.
func (wbsa *Aggregator) NeedUpdateStateOnResume() {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	wbsa.needUpdateStateOnResume = true
}

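// A typical batching pattern using the three methods above (a sketch with
// hypothetical IDs; the real call sites live in the weighted_target
// balancer):
//
//	wbsa.PauseStateUpdates()
//	wbsa.Remove("old-cluster")           // would normally trigger an update
//	wbsa.Add("new-cluster", 3)           // would normally trigger an update
//	wbsa.UpdateWeight("kept-cluster", 5) // never triggers an update itself
//	wbsa.ResumeStateUpdates()            // sends at most one aggregated update
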
// UpdateState is called to report a balancer state change from a
// sub-balancer. It's usually called by the balancer group.
//
// It calls the parent ClientConn's UpdateState with the new aggregated state.
func (wbsa *Aggregator) UpdateState(id string, newState balancer.State) {
	wbsa.mu.Lock()
	defer wbsa.mu.Unlock()
	state, ok := wbsa.idToPickerState[id]
	if !ok {
		// All sub-balancers start with an entry in idToPickerState. If an ID
		// is not in the map, it was either removed or never existed.
		return
	}

	if !(state.state.ConnectivityState == connectivity.TransientFailure && newState.ConnectivityState == connectivity.Connecting) {
		// If the old state is TransientFailure and the new state is
		// Connecting, don't update stateToAggregate, to prevent the
		// aggregated state from always being CONNECTING. Otherwise,
		// stateToAggregate is the same as state.ConnectivityState.
		wbsa.csEvltr.RecordTransition(state.stateToAggregate, newState.ConnectivityState)
		state.stateToAggregate = newState.ConnectivityState
	}
	state.state = newState

	wbsa.buildAndUpdateLocked()
}

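// Illustration of the TransientFailure special case above (hypothetical
// sub-balancer "a" and picker p): if "a" is in TransientFailure and then
// reports Connecting, its picker and state.state are refreshed, but
// stateToAggregate stays TransientFailure so the aggregate cannot flap back
// to CONNECTING:
//
//	// before: state=TransientFailure, stateToAggregate=TransientFailure
//	wbsa.UpdateState("a", balancer.State{ConnectivityState: connectivity.Connecting, Picker: p})
//	// after:  state=Connecting,       stateToAggregate=TransientFailure
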
// clearStates resets everything to the init state (Connecting) but keeps the
// entries in the map (to keep the weights).
//
// Caller must hold wbsa.mu.
func (wbsa *Aggregator) clearStates() {
	for _, pState := range wbsa.idToPickerState {
		pState.state = balancer.State{
			ConnectivityState: connectivity.Connecting,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
		}
		pState.stateToAggregate = connectivity.Connecting
	}
}

// buildAndUpdateLocked aggregates the connectivity states of the
// sub-balancers, builds a new picker and sends an update to the parent
// ClientConn.
//
// Caller must hold wbsa.mu.
func (wbsa *Aggregator) buildAndUpdateLocked() {
	if !wbsa.started {
		return
	}
	if wbsa.pauseUpdateState {
		// If updates are paused, do not call UpdateState, but remember that we
		// need to call it when they are resumed.
		wbsa.needUpdateStateOnResume = true
		return
	}

	wbsa.cc.UpdateState(wbsa.build())
}

// build combines sub-states into one.
//
// Caller must hold wbsa.mu.
func (wbsa *Aggregator) build() balancer.State {
	wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState)

	// Make sure the picker's return error is consistent with the aggregated
	// state.
	pickers := make([]weightedPickerState, 0, len(wbsa.idToPickerState))

	switch aggState := wbsa.csEvltr.CurrentState(); aggState {
	case connectivity.Connecting:
		return balancer.State{
			ConnectivityState: aggState,
			Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable)}
	case connectivity.TransientFailure:
		// This means that all sub-balancers are now in TransientFailure.
		for _, ps := range wbsa.idToPickerState {
			pickers = append(pickers, *ps)
		}
		return balancer.State{
			ConnectivityState: aggState,
			Picker:            newWeightedPickerGroup(pickers, wbsa.newWRR)}
	default:
		for _, ps := range wbsa.idToPickerState {
			if ps.stateToAggregate == connectivity.Ready {
				pickers = append(pickers, *ps)
			}
		}
		return balancer.State{
			ConnectivityState: aggState,
			Picker:            newWeightedPickerGroup(pickers, wbsa.newWRR)}
	}
}

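// A worked example of the aggregation rule above (illustrative only):
//
//	a: Ready            weight 3 -> included in the picker group
//	b: Connecting       weight 1 -> excluded (not Ready)
//	c: TransientFailure weight 1 -> excluded (not Ready)
//
// csEvltr reports READY because at least one child is Ready, so the default
// case builds a weightedPickerGroup over {a} only. Only when every child is
// in TransientFailure does the group include all pickers, so the errors
// returned to RPCs reflect the actual sub-balancer failures.
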
type weightedPickerGroup struct {
	w wrr.WRR
}

// newWeightedPickerGroup takes pickers with weights, and groups them into one
// picker.
//
// Note it only takes ready pickers. The slice shouldn't contain non-ready
// pickers.
func newWeightedPickerGroup(readyWeightedPickers []weightedPickerState, newWRR func() wrr.WRR) *weightedPickerGroup {
	w := newWRR()
	for _, ps := range readyWeightedPickers {
		w.Add(ps.state.Picker, int64(ps.weight))
	}

	return &weightedPickerGroup{
		w: w,
	}
}

func (pg *weightedPickerGroup) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	p, ok := pg.w.Next().(balancer.Picker)
	if !ok {
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	}
	return p.Pick(info)
}
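
// A sketch of the resulting pick behavior (hypothetical ready pickers pA and
// pB; wrr.NewRandom is one possible WRR implementation): with weights 3 and
// 1, Next returns pA for roughly three out of every four picks:
//
//	pg := newWeightedPickerGroup([]weightedPickerState{
//		{weight: 3, state: balancer.State{Picker: pA}},
//		{weight: 1, state: balancer.State{Picker: pB}},
//	}, wrr.NewRandom)
//	res, err := pg.Pick(balancer.PickInfo{})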
