...

Source file src/google.golang.org/grpc/xds/internal/balancer/ringhash/ringhash.go

Documentation: google.golang.org/grpc/xds/internal/balancer/ringhash

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package ringhash implements the ringhash balancer.
    20  package ringhash
    21  
    22  import (
    23  	"encoding/json"
    24  	"errors"
    25  	"fmt"
    26  	"sync"
    27  
    28  	"google.golang.org/grpc/balancer"
    29  	"google.golang.org/grpc/balancer/base"
    30  	"google.golang.org/grpc/balancer/weightedroundrobin"
    31  	"google.golang.org/grpc/connectivity"
    32  	"google.golang.org/grpc/internal/grpclog"
    33  	"google.golang.org/grpc/internal/pretty"
    34  	"google.golang.org/grpc/resolver"
    35  	"google.golang.org/grpc/serviceconfig"
    36  )
    37  
    38  // Name is the name of the ring_hash balancer.
    39  const Name = "ring_hash_experimental"
    40  
    41  func init() {
    42  	balancer.Register(bb{})
    43  }
    44  
    45  type bb struct{}
    46  
    47  func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
    48  	b := &ringhashBalancer{
    49  		cc:       cc,
    50  		subConns: resolver.NewAddressMap(),
    51  		scStates: make(map[balancer.SubConn]*subConn),
    52  		csEvltr:  &connectivityStateEvaluator{},
    53  	}
    54  	b.logger = prefixLogger(b)
    55  	b.logger.Infof("Created")
    56  	return b
    57  }
    58  
    59  func (bb) Name() string {
    60  	return Name
    61  }
    62  
    63  func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
    64  	return parseConfig(c)
    65  }
    66  
    67  type subConn struct {
    68  	addr   string
    69  	weight uint32
    70  	sc     balancer.SubConn
    71  	logger *grpclog.PrefixLogger
    72  
    73  	mu sync.RWMutex
    74  	// This is the actual state of this SubConn (as updated by the ClientConn).
    75  	// The effective state can be different, see comment of attemptedToConnect.
    76  	state connectivity.State
    77  	// failing is whether this SubConn is in a failing state. A subConn is
    78  	// considered to be in a failing state if it was previously in
    79  	// TransientFailure.
    80  	//
    81  	// This affects the effective connectivity state of this SubConn, e.g.
    82  	// - if the actual state is Idle or Connecting, but this SubConn is failing,
    83  	// the effective state is TransientFailure.
    84  	//
    85  	// This is used in pick(). E.g. if a subConn is Idle, but has failing as
    86  	// true, pick() will
    87  	// - consider this SubConn as TransientFailure, and check the state of the
    88  	// next SubConn.
    89  	// - trigger Connect() (note that normally a SubConn in real
    90  	// TransientFailure cannot Connect())
    91  	//
    92  	// A subConn starts in non-failing (failing is false). A transition to
    93  	// TransientFailure sets failing to true (and it stays true). A transition
    94  	// to Ready sets failing to false.
    95  	failing bool
    96  	// connectQueued is true if a Connect() was queued for this SubConn while
    97  	// it's not in Idle (most likely was in TransientFailure). A Connect() will
    98  	// be triggered on this SubConn when it turns Idle.
    99  	//
   100  	// When connectivity state is updated to Idle for this SubConn, if
   101  	// connectQueued is true, Connect() will be called on the SubConn.
   102  	connectQueued bool
   103  	// attemptingToConnect indicates if this subconn is attempting to connect.
   104  	// It's set when queueConnect is called. It's unset when the state is
   105  	// changed to Ready/Shutdown, or Idle (and if connectQueued is false).
   106  	attemptingToConnect bool
   107  }
   108  
   109  // setState updates the state of this SubConn.
   110  //
   111  // It also handles the queued Connect(). If the new state is Idle, and a
   112  // Connect() was queued, this SubConn will be triggered to Connect().
   113  func (sc *subConn) setState(s connectivity.State) {
   114  	sc.mu.Lock()
   115  	defer sc.mu.Unlock()
   116  	switch s {
   117  	case connectivity.Idle:
   118  		// Trigger Connect() if new state is Idle, and there is a queued connect.
   119  		if sc.connectQueued {
   120  			sc.connectQueued = false
   121  			sc.logger.Infof("Executing a queued connect for subConn moving to state: %v", sc.state)
   122  			sc.sc.Connect()
   123  		} else {
   124  			sc.attemptingToConnect = false
   125  		}
   126  	case connectivity.Connecting:
   127  		// Clear connectQueued if the SubConn isn't failing. This state
   128  		// transition is unlikely to happen, but handle this just in case.
   129  		sc.connectQueued = false
   130  	case connectivity.Ready:
   131  		// Clear connectQueued if the SubConn isn't failing. This state
   132  		// transition is unlikely to happen, but handle this just in case.
   133  		sc.connectQueued = false
   134  		sc.attemptingToConnect = false
   135  		// Set to a non-failing state.
   136  		sc.failing = false
   137  	case connectivity.TransientFailure:
   138  		// Set to a failing state.
   139  		sc.failing = true
   140  	case connectivity.Shutdown:
   141  		sc.attemptingToConnect = false
   142  	}
   143  	sc.state = s
   144  }
   145  
   146  // effectiveState returns the effective state of this SubConn. It can be
   147  // different from the actual state, e.g. Idle while the subConn is failing is
   148  // considered TransientFailure. Read comment of field failing for other cases.
   149  func (sc *subConn) effectiveState() connectivity.State {
   150  	sc.mu.RLock()
   151  	defer sc.mu.RUnlock()
   152  	if sc.failing && (sc.state == connectivity.Idle || sc.state == connectivity.Connecting) {
   153  		return connectivity.TransientFailure
   154  	}
   155  	return sc.state
   156  }
   157  
   158  // queueConnect sets a boolean so that when the SubConn state changes to Idle,
   159  // it's Connect() will be triggered. If the SubConn state is already Idle, it
   160  // will just call Connect().
   161  func (sc *subConn) queueConnect() {
   162  	sc.mu.Lock()
   163  	defer sc.mu.Unlock()
   164  	sc.attemptingToConnect = true
   165  	if sc.state == connectivity.Idle {
   166  		sc.logger.Infof("Executing a queued connect for subConn in state: %v", sc.state)
   167  		sc.sc.Connect()
   168  		return
   169  	}
   170  	// Queue this connect, and when this SubConn switches back to Idle (happens
   171  	// after backoff in TransientFailure), it will Connect().
   172  	sc.logger.Infof("Queueing a connect for subConn in state: %v", sc.state)
   173  	sc.connectQueued = true
   174  }
   175  
   176  func (sc *subConn) isAttemptingToConnect() bool {
   177  	sc.mu.Lock()
   178  	defer sc.mu.Unlock()
   179  	return sc.attemptingToConnect
   180  }
   181  
   182  type ringhashBalancer struct {
   183  	cc     balancer.ClientConn
   184  	logger *grpclog.PrefixLogger
   185  
   186  	config   *LBConfig
   187  	subConns *resolver.AddressMap // Map from resolver.Address to `*subConn`.
   188  	scStates map[balancer.SubConn]*subConn
   189  
   190  	// ring is always in sync with subConns. When subConns change, a new ring is
   191  	// generated. Note that address weights updates (they are keys in the
   192  	// subConns map) also regenerates the ring.
   193  	ring    *ring
   194  	picker  balancer.Picker
   195  	csEvltr *connectivityStateEvaluator
   196  	state   connectivity.State
   197  
   198  	resolverErr error // the last error reported by the resolver; cleared on successful resolution
   199  	connErr     error // the last connection error; cleared upon leaving TransientFailure
   200  }
   201  
   202  // updateAddresses creates new SubConns and removes SubConns, based on the
   203  // address update.
   204  //
   205  // The return value is whether the new address list is different from the
   206  // previous. True if
   207  // - an address was added
   208  // - an address was removed
   209  // - an address's weight was updated
   210  //
   211  // Note that this function doesn't trigger SubConn connecting, so all the new
   212  // SubConn states are Idle.
   213  func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
   214  	var addrsUpdated bool
   215  	// addrsSet is the set converted from addrs, used for quick lookup.
   216  	addrsSet := resolver.NewAddressMap()
   217  	for _, addr := range addrs {
   218  		addrsSet.Set(addr, true)
   219  		newWeight := getWeightAttribute(addr)
   220  		if val, ok := b.subConns.Get(addr); !ok {
   221  			var sc balancer.SubConn
   222  			opts := balancer.NewSubConnOptions{
   223  				HealthCheckEnabled: true,
   224  				StateListener:      func(state balancer.SubConnState) { b.updateSubConnState(sc, state) },
   225  			}
   226  			sc, err := b.cc.NewSubConn([]resolver.Address{addr}, opts)
   227  			if err != nil {
   228  				b.logger.Warningf("Failed to create new SubConn: %v", err)
   229  				continue
   230  			}
   231  			scs := &subConn{addr: addr.Addr, weight: newWeight, sc: sc}
   232  			scs.logger = subConnPrefixLogger(b, scs)
   233  			scs.setState(connectivity.Idle)
   234  			b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle)
   235  			b.subConns.Set(addr, scs)
   236  			b.scStates[sc] = scs
   237  			addrsUpdated = true
   238  		} else {
   239  			// We have seen this address before and created a subConn for it. If the
   240  			// weight associated with the address has changed, update the subConns map
   241  			// with the new weight. This will be used when a new ring is created.
   242  			//
   243  			// There is no need to call UpdateAddresses on the subConn at this point
   244  			// since *only* the weight attribute has changed, and that does not affect
   245  			// subConn uniqueness.
   246  			scInfo := val.(*subConn)
   247  			if oldWeight := scInfo.weight; oldWeight != newWeight {
   248  				scInfo.weight = newWeight
   249  				b.subConns.Set(addr, scInfo)
   250  				// Return true to force recreation of the ring.
   251  				addrsUpdated = true
   252  			}
   253  		}
   254  	}
   255  	for _, addr := range b.subConns.Keys() {
   256  		// addr was removed by resolver.
   257  		if _, ok := addrsSet.Get(addr); !ok {
   258  			v, _ := b.subConns.Get(addr)
   259  			scInfo := v.(*subConn)
   260  			scInfo.sc.Shutdown()
   261  			b.subConns.Delete(addr)
   262  			addrsUpdated = true
   263  			// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
   264  			// The entry will be deleted in updateSubConnState.
   265  		}
   266  	}
   267  	return addrsUpdated
   268  }
   269  
   270  func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
   271  	b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
   272  	newConfig, ok := s.BalancerConfig.(*LBConfig)
   273  	if !ok {
   274  		return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
   275  	}
   276  
   277  	// If addresses were updated, whether it resulted in SubConn
   278  	// creation/deletion, or just weight update, we need to regenerate the ring
   279  	// and send a new picker.
   280  	regenerateRing := b.updateAddresses(s.ResolverState.Addresses)
   281  
   282  	// If the ring configuration has changed, we need to regenerate the ring and
   283  	// send a new picker.
   284  	if b.config == nil || b.config.MinRingSize != newConfig.MinRingSize || b.config.MaxRingSize != newConfig.MaxRingSize {
   285  		regenerateRing = true
   286  	}
   287  	b.config = newConfig
   288  
   289  	// If resolver state contains no addresses, return an error so ClientConn
   290  	// will trigger re-resolve. Also records this as an resolver error, so when
   291  	// the overall state turns transient failure, the error message will have
   292  	// the zero address information.
   293  	if len(s.ResolverState.Addresses) == 0 {
   294  		b.ResolverError(errors.New("produced zero addresses"))
   295  		return balancer.ErrBadResolverState
   296  	}
   297  
   298  	if regenerateRing {
   299  		// Ring creation is guaranteed to not fail because we call newRing()
   300  		// with a non-empty subConns map.
   301  		b.ring = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize, b.logger)
   302  		b.regeneratePicker()
   303  		b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
   304  	}
   305  
   306  	// Successful resolution; clear resolver error and return nil.
   307  	b.resolverErr = nil
   308  	return nil
   309  }
   310  
   311  func (b *ringhashBalancer) ResolverError(err error) {
   312  	b.resolverErr = err
   313  	if b.subConns.Len() == 0 {
   314  		b.state = connectivity.TransientFailure
   315  	}
   316  
   317  	if b.state != connectivity.TransientFailure {
   318  		// The picker will not change since the balancer does not currently
   319  		// report an error.
   320  		return
   321  	}
   322  	b.regeneratePicker()
   323  	b.cc.UpdateState(balancer.State{
   324  		ConnectivityState: b.state,
   325  		Picker:            b.picker,
   326  	})
   327  }
   328  
   329  func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
   330  	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
   331  }
   332  
// updateSubConnState is the StateListener installed on every SubConn created
// by updateAddresses. It records the SubConn's new actual state, folds the
// change in its effective state into the aggregated balancer state, and makes
// sure the balancer can keep making progress.
//
// Specifically, it:
//   - saves state.ConnectionError in b.connErr when the new actual state is
//     TransientFailure, so the error picker can report it;
//   - deletes the SubConn from b.scStates when the new state is Shutdown;
//   - regenerates the picker and calls cc.UpdateState whenever this SubConn's
//     effective state changed, because the picker caches SubConn states. A
//     TransientFailure update that leaves the effective state unchanged
//     (e.g. an already-failing SubConn going Connecting -> TransientFailure)
//     only refreshes b.connErr and does not push a new picker;
//   - when the aggregated state is Connecting or TransientFailure, ensures at
//     least one SubConn is attempting to connect.
func (b *ringhashBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	s := state.ConnectivityState
	// NOTE(review): logger here is presumably the package-level logger
	// declared elsewhere in this package (not b.logger).
	if logger.V(2) {
		b.logger.Infof("Handle SubConn state change: %p, %v", sc, s)
	}
	scs, ok := b.scStates[sc]
	if !ok {
		b.logger.Infof("Received state change for an unknown SubConn: %p, %v", sc, s)
		return
	}
	oldSCState := scs.effectiveState()
	scs.setState(s)
	newSCState := scs.effectiveState()
	b.logger.Infof("SubConn's effective old state was: %v, new state is %v", oldSCState, newSCState)

	// The aggregate is tracked in terms of effective states, not actual ones.
	b.state = b.csEvltr.recordTransition(oldSCState, newSCState)

	switch s {
	case connectivity.TransientFailure:
		// Save error to be reported via picker.
		b.connErr = state.ConnectionError
	case connectivity.Shutdown:
		// When an address was removed by resolver, b called Shutdown but kept
		// the sc's state in scStates. Remove state for this sc here.
		delete(b.scStates, sc)
	}

	if oldSCState != newSCState {
		// Because the picker caches the state of the subconns, we always
		// regenerate and update the picker when the effective SubConn state
		// changes.
		b.regeneratePicker()
		b.logger.Infof("Pushing new state %v and picker %p", b.state, b.picker)
		b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
	}

	switch b.state {
	case connectivity.Connecting, connectivity.TransientFailure:
		// When overall state is TransientFailure, we need to make sure at least
		// one SubConn is attempting to connect, otherwise this balancer may
		// never get picks if the parent is priority.
		//
		// Because we report Connecting as the overall state when only one
		// SubConn is in TransientFailure, we do the same check for Connecting
		// here.
		//
		// Note that this check also covers deleting SubConns due to address
		// change. E.g. if the SubConn attempting to connect is deleted, and the
		// overall state is TF. Since there must be at least one SubConn
		// attempting to connect, we need to trigger one. But since the deleted
		// SubConn will eventually send a shutdown update, this code will run
		// and trigger the next SubConn to connect.
		for _, v := range b.subConns.Values() {
			// Note: this sc shadows the balancer.SubConn parameter.
			sc := v.(*subConn)
			if sc.isAttemptingToConnect() {
				return
			}
		}
		// Trigger a SubConn (this updated SubConn's next SubConn in the ring)
		// to connect if nobody is attempting to connect.
		//
		// NOTE(review): for a Shutdown update, scs has already been removed
		// from b.subConns by updateAddresses and may not be in b.ring; whether
		// nextSkippingDuplicatesSubConn handles that depends on its
		// implementation elsewhere in the package - confirm.
		sc := nextSkippingDuplicatesSubConn(b.ring, scs)
		if sc != nil {
			sc.queueConnect()
			return
		}
		// This handles the edge case where we have a single subConn in the
		// ring. nextSkippingDuplicatesSubConn() would have returned nil. We
		// still need to ensure that some subConn is attempting to connect, in
		// order to give the LB policy a chance to move out of
		// TRANSIENT_FAILURE. Hence, we try connecting on the current subConn.
		// (If scs is not Idle, queueConnect only queues the connect and does
		// not call Connect().)
		scs.queueConnect()
	}
}
   417  
   418  // mergeErrors builds an error from the last connection error and the last
   419  // resolver error.  Must only be called if b.state is TransientFailure.
   420  func (b *ringhashBalancer) mergeErrors() error {
   421  	// connErr must always be non-nil unless there are no SubConns, in which
   422  	// case resolverErr must be non-nil.
   423  	if b.connErr == nil {
   424  		return fmt.Errorf("last resolver error: %v", b.resolverErr)
   425  	}
   426  	if b.resolverErr == nil {
   427  		return fmt.Errorf("last connection error: %v", b.connErr)
   428  	}
   429  	return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr)
   430  }
   431  
   432  func (b *ringhashBalancer) regeneratePicker() {
   433  	if b.state == connectivity.TransientFailure {
   434  		b.picker = base.NewErrPicker(b.mergeErrors())
   435  		return
   436  	}
   437  	b.picker = newPicker(b.ring, b.logger)
   438  }
   439  
   440  func (b *ringhashBalancer) Close() {
   441  	b.logger.Infof("Shutdown")
   442  }
   443  
   444  func (b *ringhashBalancer) ExitIdle() {
   445  	// ExitIdle implementation is a no-op because connections are either
   446  	// triggers from picks or from subConn state changes.
   447  }
   448  
   449  // connectivityStateEvaluator takes the connectivity states of multiple SubConns
   450  // and returns one aggregated connectivity state.
   451  //
   452  // It's not thread safe.
   453  type connectivityStateEvaluator struct {
   454  	sum  uint64
   455  	nums [5]uint64
   456  }
   457  
   458  // recordTransition records state change happening in subConn and based on that
   459  // it evaluates what aggregated state should be.
   460  //
   461  // - If there is at least one subchannel in READY state, report READY.
   462  // - If there are 2 or more subchannels in TRANSIENT_FAILURE state, report TRANSIENT_FAILURE.
   463  // - If there is at least one subchannel in CONNECTING state, report CONNECTING.
   464  // - If there is one subchannel in TRANSIENT_FAILURE and there is more than one subchannel, report state CONNECTING.
   465  // - If there is at least one subchannel in Idle state, report Idle.
   466  // - Otherwise, report TRANSIENT_FAILURE.
   467  //
   468  // Note that if there are 1 connecting, 2 transient failure, the overall state
   469  // is transient failure. This is because the second transient failure is a
   470  // fallback of the first failing SubConn, and we want to report transient
   471  // failure to failover to the lower priority.
   472  func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
   473  	// Update counters.
   474  	for idx, state := range []connectivity.State{oldState, newState} {
   475  		updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
   476  		cse.nums[state] += updateVal
   477  	}
   478  	if oldState == connectivity.Shutdown {
   479  		// There's technically no transition from Shutdown. But we record a
   480  		// Shutdown->Idle transition when a new SubConn is created.
   481  		cse.sum++
   482  	}
   483  	if newState == connectivity.Shutdown {
   484  		cse.sum--
   485  	}
   486  
   487  	if cse.nums[connectivity.Ready] > 0 {
   488  		return connectivity.Ready
   489  	}
   490  	if cse.nums[connectivity.TransientFailure] > 1 {
   491  		return connectivity.TransientFailure
   492  	}
   493  	if cse.nums[connectivity.Connecting] > 0 {
   494  		return connectivity.Connecting
   495  	}
   496  	if cse.nums[connectivity.TransientFailure] > 0 && cse.sum > 1 {
   497  		return connectivity.Connecting
   498  	}
   499  	if cse.nums[connectivity.Idle] > 0 {
   500  		return connectivity.Idle
   501  	}
   502  	return connectivity.TransientFailure
   503  }
   504  
   505  // getWeightAttribute is a convenience function which returns the value of the
   506  // weight attribute stored in the BalancerAttributes field of addr, using the
   507  // weightedroundrobin package.
   508  //
   509  // When used in the xDS context, the weight attribute is guaranteed to be
   510  // non-zero. But, when used in a non-xDS context, the weight attribute could be
   511  // unset. A Default of 1 is used in the latter case.
   512  func getWeightAttribute(addr resolver.Address) uint32 {
   513  	w := weightedroundrobin.GetAddrInfo(addr).Weight
   514  	if w == 0 {
   515  		return 1
   516  	}
   517  	return w
   518  }
   519  

View as plain text