1 /* 2 * 3 * Copyright 2023 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 package weightedroundrobin 20 21 import ( 22 "math" 23 ) 24 25 type scheduler interface { 26 nextIndex() int 27 } 28 29 // newScheduler uses scWeights to create a new scheduler for selecting subconns 30 // in a picker. It will return a round robin implementation if at least 31 // len(scWeights)-1 are zero or there is only a single subconn, otherwise it 32 // will return an Earliest Deadline First (EDF) scheduler implementation that 33 // selects the subchannels according to their weights. 34 func newScheduler(scWeights []float64, inc func() uint32) scheduler { 35 n := len(scWeights) 36 if n == 0 { 37 return nil 38 } 39 if n == 1 { 40 return &rrScheduler{numSCs: 1, inc: inc} 41 } 42 sum := float64(0) 43 numZero := 0 44 max := float64(0) 45 for _, w := range scWeights { 46 sum += w 47 if w > max { 48 max = w 49 } 50 if w == 0 { 51 numZero++ 52 } 53 } 54 if numZero >= n-1 { 55 return &rrScheduler{numSCs: uint32(n), inc: inc} 56 } 57 unscaledMean := sum / float64(n-numZero) 58 scalingFactor := maxWeight / max 59 mean := uint16(math.Round(scalingFactor * unscaledMean)) 60 61 weights := make([]uint16, n) 62 allEqual := true 63 for i, w := range scWeights { 64 if w == 0 { 65 // Backends with weight = 0 use the mean. 66 weights[i] = mean 67 } else { 68 scaledWeight := uint16(math.Round(scalingFactor * w)) 69 weights[i] = scaledWeight 70 if scaledWeight != mean { 71 allEqual = false 72 } 73 } 74 } 75 76 if allEqual { 77 return &rrScheduler{numSCs: uint32(n), inc: inc} 78 } 79 80 logger.Infof("using edf scheduler with weights: %v", weights) 81 return &edfScheduler{weights: weights, inc: inc} 82 } 83 84 const maxWeight = math.MaxUint16 85 86 // edfScheduler implements EDF using the same algorithm as grpc-c++ here: 87 // 88 // https://github.com/grpc/grpc/blob/master/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.cc 89 type edfScheduler struct { 90 inc func() uint32 91 weights []uint16 92 } 93 94 // Returns the index in s.weights for the picker to choose. 95 func (s *edfScheduler) nextIndex() int { 96 const offset = maxWeight / 2 97 98 for { 99 idx := uint64(s.inc()) 100 101 // The sequence number (idx) is split in two: the lower %n gives the 102 // index of the backend, and the rest gives the number of times we've 103 // iterated through all backends. `generation` is used to 104 // deterministically decide whether we pick or skip the backend on this 105 // iteration, in proportion to the backend's weight. 106 107 backendIndex := idx % uint64(len(s.weights)) 108 generation := idx / uint64(len(s.weights)) 109 weight := uint64(s.weights[backendIndex]) 110 111 // We pick a backend `weight` times per `maxWeight` generations. The 112 // multiply and modulus ~evenly spread out the picks for a given 113 // backend between different generations. The offset by `backendIndex` 114 // helps to reduce the chance of multiple consecutive non-picks: if we 115 // have two consecutive backends with an equal, say, 80% weight of the 116 // max, with no offset we would see 1/5 generations that skipped both. 117 // TODO(b/190488683): add test for offset efficacy. 118 mod := uint64(weight*generation+backendIndex*offset) % maxWeight 119 120 if mod < maxWeight-weight { 121 continue 122 } 123 return int(backendIndex) 124 } 125 } 126 127 // A simple RR scheduler to use for fallback when fewer than two backends have 128 // non-zero weights, or all backends have the same weight, or when only one 129 // subconn exists. 130 type rrScheduler struct { 131 inc func() uint32 132 numSCs uint32 133 } 134 135 func (s *rrScheduler) nextIndex() int { 136 idx := s.inc() 137 return int(idx % s.numSCs) 138 } 139