1
18
19
20
21
22
23
24
25
26 package weightedaggregator
27
28 import (
29 "fmt"
30 "sync"
31
32 "google.golang.org/grpc/balancer"
33 "google.golang.org/grpc/balancer/base"
34 "google.golang.org/grpc/connectivity"
35 "google.golang.org/grpc/internal/grpclog"
36 "google.golang.org/grpc/internal/wrr"
37 )
38
39 type weightedPickerState struct {
40 weight uint32
41 state balancer.State
42
43
44
45
46
47 stateToAggregate connectivity.State
48 }
49
50 func (s *weightedPickerState) String() string {
51 return fmt.Sprintf("weight:%v,picker:%p,state:%v,stateToAggregate:%v", s.weight, s.state.Picker, s.state.ConnectivityState, s.stateToAggregate)
52 }
53
54
55 type Aggregator struct {
56 cc balancer.ClientConn
57 logger *grpclog.PrefixLogger
58 newWRR func() wrr.WRR
59
60 csEvltr *balancer.ConnectivityStateEvaluator
61
62 mu sync.Mutex
63
64
65
66
67 started bool
68
69
70
71
72 idToPickerState map[string]*weightedPickerState
73
74 pauseUpdateState bool
75
76
77 needUpdateStateOnResume bool
78 }
79
80
81 func New(cc balancer.ClientConn, logger *grpclog.PrefixLogger, newWRR func() wrr.WRR) *Aggregator {
82 return &Aggregator{
83 cc: cc,
84 logger: logger,
85 newWRR: newWRR,
86 csEvltr: &balancer.ConnectivityStateEvaluator{},
87 idToPickerState: make(map[string]*weightedPickerState),
88 }
89 }
90
91
92
93 func (wbsa *Aggregator) Start() {
94 wbsa.mu.Lock()
95 defer wbsa.mu.Unlock()
96 wbsa.started = true
97 }
98
99
100
101 func (wbsa *Aggregator) Stop() {
102 wbsa.mu.Lock()
103 defer wbsa.mu.Unlock()
104 wbsa.started = false
105 wbsa.clearStates()
106 }
107
108
109
110 func (wbsa *Aggregator) Add(id string, weight uint32) {
111 wbsa.mu.Lock()
112 defer wbsa.mu.Unlock()
113 wbsa.idToPickerState[id] = &weightedPickerState{
114 weight: weight,
115
116
117
118 state: balancer.State{
119 ConnectivityState: connectivity.Connecting,
120 Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
121 },
122 stateToAggregate: connectivity.Connecting,
123 }
124 wbsa.csEvltr.RecordTransition(connectivity.Shutdown, connectivity.Connecting)
125
126 wbsa.buildAndUpdateLocked()
127 }
128
129
130
131 func (wbsa *Aggregator) Remove(id string) {
132 wbsa.mu.Lock()
133 defer wbsa.mu.Unlock()
134 if _, ok := wbsa.idToPickerState[id]; !ok {
135 return
136 }
137
138
139
140 wbsa.csEvltr.RecordTransition(wbsa.idToPickerState[id].stateToAggregate, connectivity.Shutdown)
141
142
143 delete(wbsa.idToPickerState, id)
144 wbsa.buildAndUpdateLocked()
145 }
146
147
148
149
150 func (wbsa *Aggregator) UpdateWeight(id string, newWeight uint32) {
151 wbsa.mu.Lock()
152 defer wbsa.mu.Unlock()
153 pState, ok := wbsa.idToPickerState[id]
154 if !ok {
155 return
156 }
157 pState.weight = newWeight
158 }
159
160
161
162
163 func (wbsa *Aggregator) PauseStateUpdates() {
164 wbsa.mu.Lock()
165 defer wbsa.mu.Unlock()
166 wbsa.pauseUpdateState = true
167 wbsa.needUpdateStateOnResume = false
168 }
169
170
171
172 func (wbsa *Aggregator) ResumeStateUpdates() {
173 wbsa.mu.Lock()
174 defer wbsa.mu.Unlock()
175 wbsa.pauseUpdateState = false
176 if wbsa.needUpdateStateOnResume {
177 wbsa.cc.UpdateState(wbsa.build())
178 }
179 }
180
181
182
183 func (wbsa *Aggregator) NeedUpdateStateOnResume() {
184 wbsa.mu.Lock()
185 defer wbsa.mu.Unlock()
186 wbsa.needUpdateStateOnResume = true
187 }
188
189
190
191
192
193 func (wbsa *Aggregator) UpdateState(id string, newState balancer.State) {
194 wbsa.mu.Lock()
195 defer wbsa.mu.Unlock()
196 state, ok := wbsa.idToPickerState[id]
197 if !ok {
198
199
200 return
201 }
202
203 if !(state.state.ConnectivityState == connectivity.TransientFailure && newState.ConnectivityState == connectivity.Connecting) {
204
205
206
207
208 wbsa.csEvltr.RecordTransition(state.stateToAggregate, newState.ConnectivityState)
209 state.stateToAggregate = newState.ConnectivityState
210 }
211 state.state = newState
212
213 wbsa.buildAndUpdateLocked()
214 }
215
216
217
218
219
220 func (wbsa *Aggregator) clearStates() {
221 for _, pState := range wbsa.idToPickerState {
222 pState.state = balancer.State{
223 ConnectivityState: connectivity.Connecting,
224 Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
225 }
226 pState.stateToAggregate = connectivity.Connecting
227 }
228 }
229
230
231
232
233
234 func (wbsa *Aggregator) buildAndUpdateLocked() {
235 if !wbsa.started {
236 return
237 }
238 if wbsa.pauseUpdateState {
239
240
241 wbsa.needUpdateStateOnResume = true
242 return
243 }
244
245 wbsa.cc.UpdateState(wbsa.build())
246 }
247
248
249
250
251 func (wbsa *Aggregator) build() balancer.State {
252 wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState)
253
254
255 pickers := make([]weightedPickerState, 0, len(wbsa.idToPickerState))
256
257 switch aggState := wbsa.csEvltr.CurrentState(); aggState {
258 case connectivity.Connecting:
259 return balancer.State{
260 ConnectivityState: aggState,
261 Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)}
262 case connectivity.TransientFailure:
263
264 for _, ps := range wbsa.idToPickerState {
265 pickers = append(pickers, *ps)
266 }
267 return balancer.State{
268 ConnectivityState: aggState,
269 Picker: newWeightedPickerGroup(pickers, wbsa.newWRR)}
270 default:
271 for _, ps := range wbsa.idToPickerState {
272 if ps.stateToAggregate == connectivity.Ready {
273 pickers = append(pickers, *ps)
274 }
275 }
276 return balancer.State{
277 ConnectivityState: aggState,
278 Picker: newWeightedPickerGroup(pickers, wbsa.newWRR)}
279 }
280
281 }
282
283 type weightedPickerGroup struct {
284 w wrr.WRR
285 }
286
287
288
289
290
291
292 func newWeightedPickerGroup(readyWeightedPickers []weightedPickerState, newWRR func() wrr.WRR) *weightedPickerGroup {
293 w := newWRR()
294 for _, ps := range readyWeightedPickers {
295 w.Add(ps.state.Picker, int64(ps.weight))
296 }
297
298 return &weightedPickerGroup{
299 w: w,
300 }
301 }
302
303 func (pg *weightedPickerGroup) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
304 p, ok := pg.w.Next().(balancer.Picker)
305 if !ok {
306 return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
307 }
308 return p.Pick(info)
309 }
310
View as plain text