...

Source file src/google.golang.org/grpc/xds/internal/balancer/ringhash/picker.go

Documentation: google.golang.org/grpc/xds/internal/balancer/ringhash

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package ringhash
    20  
    21  import (
    22  	"fmt"
    23  
    24  	"google.golang.org/grpc/balancer"
    25  	"google.golang.org/grpc/codes"
    26  	"google.golang.org/grpc/connectivity"
    27  	"google.golang.org/grpc/internal/grpclog"
    28  	"google.golang.org/grpc/status"
    29  )
    30  
    31  type picker struct {
    32  	ring          *ring
    33  	logger        *grpclog.PrefixLogger
    34  	subConnStates map[*subConn]connectivity.State
    35  }
    36  
    37  func newPicker(ring *ring, logger *grpclog.PrefixLogger) *picker {
    38  	states := make(map[*subConn]connectivity.State)
    39  	for _, e := range ring.items {
    40  		states[e.sc] = e.sc.effectiveState()
    41  	}
    42  	return &picker{ring: ring, logger: logger, subConnStates: states}
    43  }
    44  
    45  // handleRICSResult is the return type of handleRICS. It's needed to wrap the
    46  // returned error from Pick() in a struct. With this, if the return values are
    47  // `balancer.PickResult, error, bool`, linter complains because error is not the
    48  // last return value.
    49  type handleRICSResult struct {
    50  	pr  balancer.PickResult
    51  	err error
    52  }
    53  
    54  // handleRICS generates pick result if the entry is in Ready, Idle, Connecting
    55  // or Shutdown. TransientFailure will be handled specifically after this
    56  // function returns.
    57  //
    58  // The first return value indicates if the state is in Ready, Idle, Connecting
    59  // or Shutdown. If it's true, the PickResult and error should be returned from
    60  // Pick() as is.
    61  func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) {
    62  	switch state := p.subConnStates[e.sc]; state {
    63  	case connectivity.Ready:
    64  		return handleRICSResult{pr: balancer.PickResult{SubConn: e.sc.sc}}, true
    65  	case connectivity.Idle:
    66  		// Trigger Connect() and queue the pick.
    67  		e.sc.queueConnect()
    68  		return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true
    69  	case connectivity.Connecting:
    70  		return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true
    71  	case connectivity.TransientFailure:
    72  		// Return ok==false, so TransientFailure will be handled afterwards.
    73  		return handleRICSResult{}, false
    74  	case connectivity.Shutdown:
    75  		// Shutdown can happen in a race where the old picker is called. A new
    76  		// picker should already be sent.
    77  		return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true
    78  	default:
    79  		// Should never reach this. All the connectivity states are already
    80  		// handled in the cases.
    81  		p.logger.Errorf("SubConn has undefined connectivity state: %v", state)
    82  		return handleRICSResult{err: status.Errorf(codes.Unavailable, "SubConn has undefined connectivity state: %v", state)}, true
    83  	}
    84  }
    85  
    86  func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
    87  	e := p.ring.pick(getRequestHash(info.Ctx))
    88  	if hr, ok := p.handleRICS(e); ok {
    89  		return hr.pr, hr.err
    90  	}
    91  	// ok was false, the entry is in transient failure.
    92  	return p.handleTransientFailure(e)
    93  }
    94  
    95  func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, error) {
    96  	// Queue a connect on the first picked SubConn.
    97  	e.sc.queueConnect()
    98  
    99  	// Find next entry in the ring, skipping duplicate SubConns.
   100  	e2 := nextSkippingDuplicates(p.ring, e)
   101  	if e2 == nil {
   102  		// There's no next entry available, fail the pick.
   103  		return balancer.PickResult{}, fmt.Errorf("the only SubConn is in Transient Failure")
   104  	}
   105  
   106  	// For the second SubConn, also check Ready/Idle/Connecting as if it's the
   107  	// first entry.
   108  	if hr, ok := p.handleRICS(e2); ok {
   109  		return hr.pr, hr.err
   110  	}
   111  
   112  	// The second SubConn is also in TransientFailure. Queue a connect on it.
   113  	e2.sc.queueConnect()
   114  
   115  	// If it gets here, this is after the second SubConn, and the second SubConn
   116  	// was in TransientFailure.
   117  	//
   118  	// Loop over all other SubConns:
   119  	// - If all SubConns so far are all TransientFailure, trigger Connect() on
   120  	// the TransientFailure SubConns, and keep going.
   121  	// - If there's one SubConn that's not in TransientFailure, keep checking
   122  	// the remaining SubConns (in case there's a Ready, which will be returned),
   123  	// but don't not trigger Connect() on the other SubConns.
   124  	var firstNonFailedFound bool
   125  	for ee := nextSkippingDuplicates(p.ring, e2); ee != e; ee = nextSkippingDuplicates(p.ring, ee) {
   126  		scState := p.subConnStates[ee.sc]
   127  		if scState == connectivity.Ready {
   128  			return balancer.PickResult{SubConn: ee.sc.sc}, nil
   129  		}
   130  		if firstNonFailedFound {
   131  			continue
   132  		}
   133  		if scState == connectivity.TransientFailure {
   134  			// This will queue a connect.
   135  			ee.sc.queueConnect()
   136  			continue
   137  		}
   138  		// This is a SubConn in a non-failure state. We continue to check the
   139  		// other SubConns, but remember that there was a non-failed SubConn
   140  		// seen. After this, Pick() will never trigger any SubConn to Connect().
   141  		firstNonFailedFound = true
   142  		if scState == connectivity.Idle {
   143  			// This is the first non-failed SubConn, and it is in a real Idle
   144  			// state. Trigger it to Connect().
   145  			ee.sc.queueConnect()
   146  		}
   147  	}
   148  	return balancer.PickResult{}, fmt.Errorf("no connection is Ready")
   149  }
   150  
   151  // nextSkippingDuplicates finds the next entry in the ring, with a different
   152  // subconn from the given entry.
   153  func nextSkippingDuplicates(ring *ring, entry *ringEntry) *ringEntry {
   154  	for next := ring.next(entry); next != entry; next = ring.next(next) {
   155  		if next.sc != entry.sc {
   156  			return next
   157  		}
   158  	}
   159  	// There's no qualifying next entry.
   160  	return nil
   161  }
   162  
   163  // nextSkippingDuplicatesSubConn finds the next subconn in the ring, that's
   164  // different from the given subconn.
   165  func nextSkippingDuplicatesSubConn(ring *ring, sc *subConn) *subConn {
   166  	var entry *ringEntry
   167  	for _, it := range ring.items {
   168  		if it.sc == sc {
   169  			entry = it
   170  			break
   171  		}
   172  	}
   173  	if entry == nil {
   174  		// If the given subconn is not in the ring (e.g. it was deleted), return
   175  		// the first one.
   176  		if len(ring.items) > 0 {
   177  			return ring.items[0].sc
   178  		}
   179  		return nil
   180  	}
   181  	ee := nextSkippingDuplicates(ring, entry)
   182  	if ee == nil {
   183  		return nil
   184  	}
   185  	return ee.sc
   186  }
   187  

View as plain text