...

Source file src/google.golang.org/grpc/balancer/grpclb/grpclb_picker.go

Documentation: google.golang.org/grpc/balancer/grpclb

     1  /*
     2   *
     3   * Copyright 2017 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package grpclb
    20  
    21  import (
    22  	"sync"
    23  	"sync/atomic"
    24  
    25  	"google.golang.org/grpc/balancer"
    26  	lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
    27  	"google.golang.org/grpc/codes"
    28  	"google.golang.org/grpc/internal/grpcrand"
    29  	"google.golang.org/grpc/status"
    30  )
    31  
// rpcStats is same as lbpb.ClientStats, except that numCallsDropped is a map
// instead of a slice.
//
// It accumulates the per-RPC counters that toClientStats later converts into
// an lbpb.ClientStats and resets.
type rpcStats struct {
	// Only access the following fields atomically.
	//
	// NOTE(review): these int64 counters are deliberately kept at the very
	// start of the struct; sync/atomic requires 64-bit alignment for them on
	// 32-bit platforms, so confirm alignment before placing any field in
	// front of them.
	numCallsStarted                        int64
	numCallsFinished                       int64
	numCallsFinishedWithClientFailedToSend int64
	numCallsFinishedKnownReceived          int64

	// mu guards numCallsDropped.
	mu sync.Mutex
	// map load_balance_token -> num_calls_dropped
	numCallsDropped map[string]int64
}
    45  
    46  func newRPCStats() *rpcStats {
    47  	return &rpcStats{
    48  		numCallsDropped: make(map[string]int64),
    49  	}
    50  }
    51  
    52  func isZeroStats(stats *lbpb.ClientStats) bool {
    53  	return len(stats.CallsFinishedWithDrop) == 0 &&
    54  		stats.NumCallsStarted == 0 &&
    55  		stats.NumCallsFinished == 0 &&
    56  		stats.NumCallsFinishedWithClientFailedToSend == 0 &&
    57  		stats.NumCallsFinishedKnownReceived == 0
    58  }
    59  
    60  // toClientStats converts rpcStats to lbpb.ClientStats, and clears rpcStats.
    61  func (s *rpcStats) toClientStats() *lbpb.ClientStats {
    62  	stats := &lbpb.ClientStats{
    63  		NumCallsStarted:                        atomic.SwapInt64(&s.numCallsStarted, 0),
    64  		NumCallsFinished:                       atomic.SwapInt64(&s.numCallsFinished, 0),
    65  		NumCallsFinishedWithClientFailedToSend: atomic.SwapInt64(&s.numCallsFinishedWithClientFailedToSend, 0),
    66  		NumCallsFinishedKnownReceived:          atomic.SwapInt64(&s.numCallsFinishedKnownReceived, 0),
    67  	}
    68  	s.mu.Lock()
    69  	dropped := s.numCallsDropped
    70  	s.numCallsDropped = make(map[string]int64)
    71  	s.mu.Unlock()
    72  	for token, count := range dropped {
    73  		stats.CallsFinishedWithDrop = append(stats.CallsFinishedWithDrop, &lbpb.ClientStatsPerToken{
    74  			LoadBalanceToken: token,
    75  			NumCalls:         count,
    76  		})
    77  	}
    78  	return stats
    79  }
    80  
    81  func (s *rpcStats) drop(token string) {
    82  	atomic.AddInt64(&s.numCallsStarted, 1)
    83  	s.mu.Lock()
    84  	s.numCallsDropped[token]++
    85  	s.mu.Unlock()
    86  	atomic.AddInt64(&s.numCallsFinished, 1)
    87  }
    88  
    89  func (s *rpcStats) failedToSend() {
    90  	atomic.AddInt64(&s.numCallsStarted, 1)
    91  	atomic.AddInt64(&s.numCallsFinishedWithClientFailedToSend, 1)
    92  	atomic.AddInt64(&s.numCallsFinished, 1)
    93  }
    94  
    95  func (s *rpcStats) knownReceived() {
    96  	atomic.AddInt64(&s.numCallsStarted, 1)
    97  	atomic.AddInt64(&s.numCallsFinishedKnownReceived, 1)
    98  	atomic.AddInt64(&s.numCallsFinished, 1)
    99  }
   100  
// rrPicker does roundrobin on subConns. It's typically used when there's no
// response from remote balancer, and grpclb falls back to the resolved
// backends.
//
// It's guaranteed that len(subConns) > 0.
//
// subConnsNext is the index into subConns that the next Pick will return.
type rrPicker struct {
	// mu guards subConnsNext, which Pick advances on every call.
	mu           sync.Mutex
	subConns     []balancer.SubConn // The subConns that were READY when taking the snapshot.
	subConnsNext int
}
   111  
   112  func newRRPicker(readySCs []balancer.SubConn) *rrPicker {
   113  	return &rrPicker{
   114  		subConns:     readySCs,
   115  		subConnsNext: grpcrand.Intn(len(readySCs)),
   116  	}
   117  }
   118  
   119  func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
   120  	p.mu.Lock()
   121  	defer p.mu.Unlock()
   122  	sc := p.subConns[p.subConnsNext]
   123  	p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
   124  	return balancer.PickResult{SubConn: sc}, nil
   125  }
   126  
// lbPicker does two layers of picks:
//
// First layer: roundrobin on all servers in serverList, including drops and backends.
// - If it picks a drop, the RPC will fail as being dropped.
// - If it picks a backend, do a second layer pick to pick the real backend.
//
// Second layer: roundrobin on all READY backends.
//
// It's guaranteed that len(serverList) > 0.
//
// serverListNext and subConnsNext are the positions of the next first-layer
// and second-layer picks respectively. subConns may be empty, in which case
// a non-drop Pick returns balancer.ErrNoSubConnAvailable.
type lbPicker struct {
	// mu guards serverListNext, subConns and subConnsNext; Pick and
	// updateReadySCs both hold it while touching them.
	mu             sync.Mutex
	serverList     []*lbpb.Server
	serverListNext int
	subConns       []balancer.SubConn // The subConns that were READY when taking the snapshot.
	subConnsNext   int

	// stats receives the drop and call-outcome counts recorded by Pick.
	stats *rpcStats
}
   145  
   146  func newLBPicker(serverList []*lbpb.Server, readySCs []balancer.SubConn, stats *rpcStats) *lbPicker {
   147  	return &lbPicker{
   148  		serverList:   serverList,
   149  		subConns:     readySCs,
   150  		subConnsNext: grpcrand.Intn(len(readySCs)),
   151  		stats:        stats,
   152  	}
   153  }
   154  
   155  func (p *lbPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
   156  	p.mu.Lock()
   157  	defer p.mu.Unlock()
   158  
   159  	// Layer one roundrobin on serverList.
   160  	s := p.serverList[p.serverListNext]
   161  	p.serverListNext = (p.serverListNext + 1) % len(p.serverList)
   162  
   163  	// If it's a drop, return an error and fail the RPC.
   164  	if s.Drop {
   165  		p.stats.drop(s.LoadBalanceToken)
   166  		return balancer.PickResult{}, status.Errorf(codes.Unavailable, "request dropped by grpclb")
   167  	}
   168  
   169  	// If not a drop but there's no ready subConns.
   170  	if len(p.subConns) <= 0 {
   171  		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
   172  	}
   173  
   174  	// Return the next ready subConn in the list, also collect rpc stats.
   175  	sc := p.subConns[p.subConnsNext]
   176  	p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
   177  	done := func(info balancer.DoneInfo) {
   178  		if !info.BytesSent {
   179  			p.stats.failedToSend()
   180  		} else if info.BytesReceived {
   181  			p.stats.knownReceived()
   182  		}
   183  	}
   184  	return balancer.PickResult{SubConn: sc, Done: done}, nil
   185  }
   186  
   187  func (p *lbPicker) updateReadySCs(readySCs []balancer.SubConn) {
   188  	p.mu.Lock()
   189  	defer p.mu.Unlock()
   190  
   191  	p.subConns = readySCs
   192  	p.subConnsNext = p.subConnsNext % len(readySCs)
   193  }
   194  

View as plain text