...

Source file src/google.golang.org/grpc/balancer/leastrequest/leastrequest.go

Documentation: google.golang.org/grpc/balancer/leastrequest

     1  /*
     2   *
     3   * Copyright 2023 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package leastrequest implements a least request load balancer.
    20  package leastrequest
    21  
    22  import (
    23  	"encoding/json"
    24  	"fmt"
    25  	"sync/atomic"
    26  
    27  	"google.golang.org/grpc/balancer"
    28  	"google.golang.org/grpc/balancer/base"
    29  	"google.golang.org/grpc/grpclog"
    30  	"google.golang.org/grpc/internal/grpcrand"
    31  	"google.golang.org/grpc/serviceconfig"
    32  )
    33  
    34  // grpcranduint32 is a global to stub out in tests.
    35  var grpcranduint32 = grpcrand.Uint32
    36  
    37  // Name is the name of the least request balancer.
    38  const Name = "least_request_experimental"
    39  
    40  var logger = grpclog.Component("least-request")
    41  
    42  func init() {
    43  	balancer.Register(bb{})
    44  }
    45  
    46  // LBConfig is the balancer config for least_request_experimental balancer.
    47  type LBConfig struct {
    48  	serviceconfig.LoadBalancingConfig `json:"-"`
    49  
    50  	// ChoiceCount is the number of random SubConns to sample to find the one
    51  	// with the fewest outstanding requests. If unset, defaults to 2. If set to
    52  	// < 2, the config will be rejected, and if set to > 10, will become 10.
    53  	ChoiceCount uint32 `json:"choiceCount,omitempty"`
    54  }
    55  
    56  type bb struct{}
    57  
    58  func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
    59  	lbConfig := &LBConfig{
    60  		ChoiceCount: 2,
    61  	}
    62  	if err := json.Unmarshal(s, lbConfig); err != nil {
    63  		return nil, fmt.Errorf("least-request: unable to unmarshal LBConfig: %v", err)
    64  	}
    65  	// "If `choice_count < 2`, the config will be rejected." - A48
    66  	if lbConfig.ChoiceCount < 2 { // sweet
    67  		return nil, fmt.Errorf("least-request: lbConfig.choiceCount: %v, must be >= 2", lbConfig.ChoiceCount)
    68  	}
    69  	// "If a LeastRequestLoadBalancingConfig with a choice_count > 10 is
    70  	// received, the least_request_experimental policy will set choice_count =
    71  	// 10." - A48
    72  	if lbConfig.ChoiceCount > 10 {
    73  		lbConfig.ChoiceCount = 10
    74  	}
    75  	return lbConfig, nil
    76  }
    77  
    78  func (bb) Name() string {
    79  	return Name
    80  }
    81  
    82  func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
    83  	b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*atomic.Int32)}
    84  	baseBuilder := base.NewBalancerBuilder(Name, b, base.Config{HealthCheck: true})
    85  	b.Balancer = baseBuilder.Build(cc, bOpts)
    86  	return b
    87  }
    88  
    89  type leastRequestBalancer struct {
    90  	// Embeds balancer.Balancer because needs to intercept UpdateClientConnState
    91  	// to learn about choiceCount.
    92  	balancer.Balancer
    93  
    94  	choiceCount uint32
    95  	scRPCCounts map[balancer.SubConn]*atomic.Int32 // Hold onto RPC counts to keep track for subsequent picker updates.
    96  }
    97  
    98  func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
    99  	lrCfg, ok := s.BalancerConfig.(*LBConfig)
   100  	if !ok {
   101  		logger.Errorf("least-request: received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
   102  		return balancer.ErrBadResolverState
   103  	}
   104  
   105  	lrb.choiceCount = lrCfg.ChoiceCount
   106  	return lrb.Balancer.UpdateClientConnState(s)
   107  }
   108  
   109  type scWithRPCCount struct {
   110  	sc      balancer.SubConn
   111  	numRPCs *atomic.Int32
   112  }
   113  
   114  func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picker {
   115  	logger.Infof("least-request: Build called with info: %v", info)
   116  	if len(info.ReadySCs) == 0 {
   117  		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
   118  	}
   119  
   120  	for sc := range lrb.scRPCCounts {
   121  		if _, ok := info.ReadySCs[sc]; !ok { // If no longer ready, no more need for the ref to count active RPCs.
   122  			delete(lrb.scRPCCounts, sc)
   123  		}
   124  	}
   125  
   126  	// Create new refs if needed.
   127  	for sc := range info.ReadySCs {
   128  		if _, ok := lrb.scRPCCounts[sc]; !ok {
   129  			lrb.scRPCCounts[sc] = new(atomic.Int32)
   130  		}
   131  	}
   132  
   133  	// Copy refs to counters into picker.
   134  	scs := make([]scWithRPCCount, 0, len(info.ReadySCs))
   135  	for sc := range info.ReadySCs {
   136  		scs = append(scs, scWithRPCCount{
   137  			sc:      sc,
   138  			numRPCs: lrb.scRPCCounts[sc], // guaranteed to be present due to algorithm
   139  		})
   140  	}
   141  
   142  	return &picker{
   143  		choiceCount: lrb.choiceCount,
   144  		subConns:    scs,
   145  	}
   146  }
   147  
   148  type picker struct {
   149  	// choiceCount is the number of random SubConns to find the one with
   150  	// the least request.
   151  	choiceCount uint32
   152  	// Built out when receives list of ready RPCs.
   153  	subConns []scWithRPCCount
   154  }
   155  
   156  func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
   157  	var pickedSC *scWithRPCCount
   158  	var pickedSCNumRPCs int32
   159  	for i := 0; i < int(p.choiceCount); i++ {
   160  		index := grpcranduint32() % uint32(len(p.subConns))
   161  		sc := p.subConns[index]
   162  		n := sc.numRPCs.Load()
   163  		if pickedSC == nil || n < pickedSCNumRPCs {
   164  			pickedSC = &sc
   165  			pickedSCNumRPCs = n
   166  		}
   167  	}
   168  	// "The counter for a subchannel should be atomically incremented by one
   169  	// after it has been successfully picked by the picker." - A48
   170  	pickedSC.numRPCs.Add(1)
   171  	// "the picker should add a callback for atomically decrementing the
   172  	// subchannel counter once the RPC finishes (regardless of Status code)." -
   173  	// A48.
   174  	done := func(balancer.DoneInfo) {
   175  		pickedSC.numRPCs.Add(-1)
   176  	}
   177  	return balancer.PickResult{
   178  		SubConn: pickedSC.sc,
   179  		Done:    done,
   180  	}, nil
   181  }
   182  

View as plain text