...
1
18
19
20 package leastrequest
21
22 import (
23 "encoding/json"
24 "fmt"
25 "sync/atomic"
26
27 "google.golang.org/grpc/balancer"
28 "google.golang.org/grpc/balancer/base"
29 "google.golang.org/grpc/grpclog"
30 "google.golang.org/grpc/internal/grpcrand"
31 "google.golang.org/grpc/serviceconfig"
32 )
33
34
35 var grpcranduint32 = grpcrand.Uint32
36
37
38 const Name = "least_request_experimental"
39
40 var logger = grpclog.Component("least-request")
41
42 func init() {
43 balancer.Register(bb{})
44 }
45
46
47 type LBConfig struct {
48 serviceconfig.LoadBalancingConfig `json:"-"`
49
50
51
52
53 ChoiceCount uint32 `json:"choiceCount,omitempty"`
54 }
55
56 type bb struct{}
57
58 func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
59 lbConfig := &LBConfig{
60 ChoiceCount: 2,
61 }
62 if err := json.Unmarshal(s, lbConfig); err != nil {
63 return nil, fmt.Errorf("least-request: unable to unmarshal LBConfig: %v", err)
64 }
65
66 if lbConfig.ChoiceCount < 2 {
67 return nil, fmt.Errorf("least-request: lbConfig.choiceCount: %v, must be >= 2", lbConfig.ChoiceCount)
68 }
69
70
71
72 if lbConfig.ChoiceCount > 10 {
73 lbConfig.ChoiceCount = 10
74 }
75 return lbConfig, nil
76 }
77
78 func (bb) Name() string {
79 return Name
80 }
81
82 func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
83 b := &leastRequestBalancer{scRPCCounts: make(map[balancer.SubConn]*atomic.Int32)}
84 baseBuilder := base.NewBalancerBuilder(Name, b, base.Config{HealthCheck: true})
85 b.Balancer = baseBuilder.Build(cc, bOpts)
86 return b
87 }
88
89 type leastRequestBalancer struct {
90
91
92 balancer.Balancer
93
94 choiceCount uint32
95 scRPCCounts map[balancer.SubConn]*atomic.Int32
96 }
97
98 func (lrb *leastRequestBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
99 lrCfg, ok := s.BalancerConfig.(*LBConfig)
100 if !ok {
101 logger.Errorf("least-request: received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
102 return balancer.ErrBadResolverState
103 }
104
105 lrb.choiceCount = lrCfg.ChoiceCount
106 return lrb.Balancer.UpdateClientConnState(s)
107 }
108
109 type scWithRPCCount struct {
110 sc balancer.SubConn
111 numRPCs *atomic.Int32
112 }
113
114 func (lrb *leastRequestBalancer) Build(info base.PickerBuildInfo) balancer.Picker {
115 logger.Infof("least-request: Build called with info: %v", info)
116 if len(info.ReadySCs) == 0 {
117 return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
118 }
119
120 for sc := range lrb.scRPCCounts {
121 if _, ok := info.ReadySCs[sc]; !ok {
122 delete(lrb.scRPCCounts, sc)
123 }
124 }
125
126
127 for sc := range info.ReadySCs {
128 if _, ok := lrb.scRPCCounts[sc]; !ok {
129 lrb.scRPCCounts[sc] = new(atomic.Int32)
130 }
131 }
132
133
134 scs := make([]scWithRPCCount, 0, len(info.ReadySCs))
135 for sc := range info.ReadySCs {
136 scs = append(scs, scWithRPCCount{
137 sc: sc,
138 numRPCs: lrb.scRPCCounts[sc],
139 })
140 }
141
142 return &picker{
143 choiceCount: lrb.choiceCount,
144 subConns: scs,
145 }
146 }
147
148 type picker struct {
149
150
151 choiceCount uint32
152
153 subConns []scWithRPCCount
154 }
155
156 func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
157 var pickedSC *scWithRPCCount
158 var pickedSCNumRPCs int32
159 for i := 0; i < int(p.choiceCount); i++ {
160 index := grpcranduint32() % uint32(len(p.subConns))
161 sc := p.subConns[index]
162 n := sc.numRPCs.Load()
163 if pickedSC == nil || n < pickedSCNumRPCs {
164 pickedSC = &sc
165 pickedSCNumRPCs = n
166 }
167 }
168
169
170 pickedSC.numRPCs.Add(1)
171
172
173
174 done := func(balancer.DoneInfo) {
175 pickedSC.numRPCs.Add(-1)
176 }
177 return balancer.PickResult{
178 SubConn: pickedSC.sc,
179 Done: done,
180 }, nil
181 }
182
View as plain text