1
18
19
20 package ringhash
21
22 import (
23 "encoding/json"
24 "errors"
25 "fmt"
26 "sync"
27
28 "google.golang.org/grpc/balancer"
29 "google.golang.org/grpc/balancer/base"
30 "google.golang.org/grpc/balancer/weightedroundrobin"
31 "google.golang.org/grpc/connectivity"
32 "google.golang.org/grpc/internal/grpclog"
33 "google.golang.org/grpc/internal/pretty"
34 "google.golang.org/grpc/resolver"
35 "google.golang.org/grpc/serviceconfig"
36 )
37
38
const (
	// Name is the name under which the ring_hash balancer builder is
	// registered and which bb.Name reports.
	Name = "ring_hash_experimental"
)
40
41 func init() {
42 balancer.Register(bb{})
43 }
44
// bb is the stateless builder for the ring_hash balancer. It provides Build,
// Name and ParseConfig.
type bb struct{}
46
47 func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
48 b := &ringhashBalancer{
49 cc: cc,
50 subConns: resolver.NewAddressMap(),
51 scStates: make(map[balancer.SubConn]*subConn),
52 csEvltr: &connectivityStateEvaluator{},
53 }
54 b.logger = prefixLogger(b)
55 b.logger.Infof("Created")
56 return b
57 }
58
59 func (bb) Name() string {
60 return Name
61 }
62
63 func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
64 return parseConfig(c)
65 }
66
// subConn wraps a balancer.SubConn with the bookkeeping the ring_hash
// balancer uses to track its effective state and to decide when to trigger
// connection attempts.
type subConn struct {
	// addr is the address string (resolver.Address.Addr) of this SubConn.
	addr string
	// weight is the address weight from getWeightAttribute. It is updated
	// in place by updateAddresses when the resolver reports a new weight.
	weight uint32
	sc     balancer.SubConn
	logger *grpclog.PrefixLogger

	// mu guards the fields below. setState, effectiveState, queueConnect and
	// isAttemptingToConnect all take it.
	mu sync.RWMutex

	// state is the raw connectivity state most recently passed to setState.
	state connectivity.State

	// failing is set when the SubConn reports TransientFailure and is
	// cleared only when it reports Ready. While it is set, effectiveState
	// reports TransientFailure instead of Idle or Connecting.
	failing bool

	// connectQueued records that queueConnect was called while the SubConn
	// was not Idle. setState calls Connect once the SubConn moves to Idle.
	// It is cleared on Connecting and Ready.
	connectQueued bool

	// attemptingToConnect is set by queueConnect and cleared on Ready,
	// Shutdown, or on Idle when no connect is queued. updateSubConnState does
	// not trigger a new connection attempt while any SubConn has it set.
	attemptingToConnect bool
}
108
109
110
111
112
113 func (sc *subConn) setState(s connectivity.State) {
114 sc.mu.Lock()
115 defer sc.mu.Unlock()
116 switch s {
117 case connectivity.Idle:
118
119 if sc.connectQueued {
120 sc.connectQueued = false
121 sc.logger.Infof("Executing a queued connect for subConn moving to state: %v", sc.state)
122 sc.sc.Connect()
123 } else {
124 sc.attemptingToConnect = false
125 }
126 case connectivity.Connecting:
127
128
129 sc.connectQueued = false
130 case connectivity.Ready:
131
132
133 sc.connectQueued = false
134 sc.attemptingToConnect = false
135
136 sc.failing = false
137 case connectivity.TransientFailure:
138
139 sc.failing = true
140 case connectivity.Shutdown:
141 sc.attemptingToConnect = false
142 }
143 sc.state = s
144 }
145
146
147
148
149 func (sc *subConn) effectiveState() connectivity.State {
150 sc.mu.RLock()
151 defer sc.mu.RUnlock()
152 if sc.failing && (sc.state == connectivity.Idle || sc.state == connectivity.Connecting) {
153 return connectivity.TransientFailure
154 }
155 return sc.state
156 }
157
158
159
160
161 func (sc *subConn) queueConnect() {
162 sc.mu.Lock()
163 defer sc.mu.Unlock()
164 sc.attemptingToConnect = true
165 if sc.state == connectivity.Idle {
166 sc.logger.Infof("Executing a queued connect for subConn in state: %v", sc.state)
167 sc.sc.Connect()
168 return
169 }
170
171
172 sc.logger.Infof("Queueing a connect for subConn in state: %v", sc.state)
173 sc.connectQueued = true
174 }
175
176 func (sc *subConn) isAttemptingToConnect() bool {
177 sc.mu.Lock()
178 defer sc.mu.Unlock()
179 return sc.attemptingToConnect
180 }
181
// ringhashBalancer implements the ring_hash LB policy. It keeps one SubConn
// per resolved address, builds a ring over them (newRing), and aggregates
// the SubConns' effective states into an overall connectivity state that it
// reports to the ClientConn together with a picker.
type ringhashBalancer struct {
	cc     balancer.ClientConn
	logger *grpclog.PrefixLogger

	// config is the most recent LB config received in
	// UpdateClientConnState; it is nil until the first update.
	config *LBConfig
	// subConns maps resolver addresses to their *subConn. An entry is removed
	// (and its SubConn shut down) as soon as the address is absent from a
	// resolver update.
	subConns *resolver.AddressMap
	// scStates maps each balancer.SubConn to its *subConn. Entries for removed
	// SubConns stay here until the SubConn reports Shutdown, at which point
	// updateSubConnState deletes them.
	scStates map[balancer.SubConn]*subConn

	// ring is rebuilt when the addresses, weights or ring size bounds change.
	ring *ring
	// picker is the most recently generated picker: an error picker in
	// TransientFailure, otherwise one built from ring.
	picker balancer.Picker
	// csEvltr aggregates per-SubConn effective state transitions into state.
	csEvltr *connectivityStateEvaluator
	state   connectivity.State

	// resolverErr is the last resolver error (cleared by a successful
	// UpdateClientConnState). connErr is the connection error from the last
	// SubConn TransientFailure. mergeErrors reports both via the error picker.
	resolverErr error
	connErr     error
}
201
202
203
204
205
206
207
208
209
210
211
212
213 func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
214 var addrsUpdated bool
215
216 addrsSet := resolver.NewAddressMap()
217 for _, addr := range addrs {
218 addrsSet.Set(addr, true)
219 newWeight := getWeightAttribute(addr)
220 if val, ok := b.subConns.Get(addr); !ok {
221 var sc balancer.SubConn
222 opts := balancer.NewSubConnOptions{
223 HealthCheckEnabled: true,
224 StateListener: func(state balancer.SubConnState) { b.updateSubConnState(sc, state) },
225 }
226 sc, err := b.cc.NewSubConn([]resolver.Address{addr}, opts)
227 if err != nil {
228 b.logger.Warningf("Failed to create new SubConn: %v", err)
229 continue
230 }
231 scs := &subConn{addr: addr.Addr, weight: newWeight, sc: sc}
232 scs.logger = subConnPrefixLogger(b, scs)
233 scs.setState(connectivity.Idle)
234 b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle)
235 b.subConns.Set(addr, scs)
236 b.scStates[sc] = scs
237 addrsUpdated = true
238 } else {
239
240
241
242
243
244
245
246 scInfo := val.(*subConn)
247 if oldWeight := scInfo.weight; oldWeight != newWeight {
248 scInfo.weight = newWeight
249 b.subConns.Set(addr, scInfo)
250
251 addrsUpdated = true
252 }
253 }
254 }
255 for _, addr := range b.subConns.Keys() {
256
257 if _, ok := addrsSet.Get(addr); !ok {
258 v, _ := b.subConns.Get(addr)
259 scInfo := v.(*subConn)
260 scInfo.sc.Shutdown()
261 b.subConns.Delete(addr)
262 addrsUpdated = true
263
264
265 }
266 }
267 return addrsUpdated
268 }
269
270 func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
271 b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
272 newConfig, ok := s.BalancerConfig.(*LBConfig)
273 if !ok {
274 return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
275 }
276
277
278
279
280 regenerateRing := b.updateAddresses(s.ResolverState.Addresses)
281
282
283
284 if b.config == nil || b.config.MinRingSize != newConfig.MinRingSize || b.config.MaxRingSize != newConfig.MaxRingSize {
285 regenerateRing = true
286 }
287 b.config = newConfig
288
289
290
291
292
293 if len(s.ResolverState.Addresses) == 0 {
294 b.ResolverError(errors.New("produced zero addresses"))
295 return balancer.ErrBadResolverState
296 }
297
298 if regenerateRing {
299
300
301 b.ring = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize, b.logger)
302 b.regeneratePicker()
303 b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
304 }
305
306
307 b.resolverErr = nil
308 return nil
309 }
310
311 func (b *ringhashBalancer) ResolverError(err error) {
312 b.resolverErr = err
313 if b.subConns.Len() == 0 {
314 b.state = connectivity.TransientFailure
315 }
316
317 if b.state != connectivity.TransientFailure {
318
319
320 return
321 }
322 b.regeneratePicker()
323 b.cc.UpdateState(balancer.State{
324 ConnectivityState: b.state,
325 Picker: b.picker,
326 })
327 }
328
329 func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
330 b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
331 }
332
333
334
335
336
337
338
339
340
341
342
343
// updateSubConnState is the StateListener registered for every SubConn
// created in updateAddresses. It records the new raw state on the matching
// subConn, folds the change in effective state into the aggregated balancer
// state, and pushes a new picker whenever the SubConn's effective state
// changed.
//
// While the aggregated state is Connecting or TransientFailure and no SubConn
// is attempting to connect, it triggers a connection attempt. The attempt
// goes to the next SubConn in the ring after this one (skipping duplicates),
// or to this SubConn if there is no such next SubConn.
func (b *ringhashBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	s := state.ConnectivityState
	if logger.V(2) {
		b.logger.Infof("Handle SubConn state change: %p, %v", sc, s)
	}
	scs, ok := b.scStates[sc]
	if !ok {
		b.logger.Infof("Received state change for an unknown SubConn: %p, %v", sc, s)
		return
	}
	// Effective (not raw) states are compared and aggregated. A SubConn that
	// has failed keeps reporting TransientFailure while it cycles through
	// Idle/Connecting.
	oldSCState := scs.effectiveState()
	scs.setState(s)
	newSCState := scs.effectiveState()
	b.logger.Infof("SubConn's effective old state was: %v, new state is %v", oldSCState, newSCState)

	b.state = b.csEvltr.recordTransition(oldSCState, newSCState)

	switch s {
	case connectivity.TransientFailure:
		// Keep the error so the TransientFailure picker can report it.
		b.connErr = state.ConnectionError
	case connectivity.Shutdown:
		// updateAddresses shuts down removed SubConns but leaves their
		// scStates entry in place; drop it now that Shutdown has arrived.
		delete(b.scStates, sc)
	}

	if oldSCState != newSCState {
		// NOTE(review): presumably the picker snapshots SubConn states, which
		// is why it is regenerated on every effective state change; confirm
		// in newPicker.
		b.regeneratePicker()
		b.logger.Infof("Pushing new state %v and picker %p", b.state, b.picker)
		b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
	}

	switch b.state {
	case connectivity.Connecting, connectivity.TransientFailure:
		// Ensure at least one SubConn is trying to connect; presumably nothing
		// else would move the balancer out of this state otherwise. Connecting
		// is included because recordTransition reports Connecting, not
		// TransientFailure, when exactly one of several SubConns is in
		// TransientFailure.
		//
		// A SubConn removed by updateAddresses is no longer in b.subConns, so
		// if it was the one attempting to connect, its Shutdown update reaches
		// this code and triggers another SubConn.
		for _, v := range b.subConns.Values() {
			sc := v.(*subConn)
			if sc.isAttemptingToConnect() {
				return
			}
		}

		// Nobody is attempting to connect: try the next distinct SubConn in
		// the ring after this one.
		sc := nextSkippingDuplicatesSubConn(b.ring, scs)
		if sc != nil {
			sc.queueConnect()
			return
		}

		// No other SubConn was found (NOTE(review): presumably the ring holds
		// a single distinct SubConn). Fall back to this SubConn so that some
		// connection attempt is always in progress.
		scs.queueConnect()
	}
}
417
418
419
420 func (b *ringhashBalancer) mergeErrors() error {
421
422
423 if b.connErr == nil {
424 return fmt.Errorf("last resolver error: %v", b.resolverErr)
425 }
426 if b.resolverErr == nil {
427 return fmt.Errorf("last connection error: %v", b.connErr)
428 }
429 return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr)
430 }
431
432 func (b *ringhashBalancer) regeneratePicker() {
433 if b.state == connectivity.TransientFailure {
434 b.picker = base.NewErrPicker(b.mergeErrors())
435 return
436 }
437 b.picker = newPicker(b.ring, b.logger)
438 }
439
440 func (b *ringhashBalancer) Close() {
441 b.logger.Infof("Shutdown")
442 }
443
444 func (b *ringhashBalancer) ExitIdle() {
445
446
447 }
448
449
450
451
452
// connectivityStateEvaluator aggregates the effective states of the
// balancer's SubConns into a single connectivity state; recordTransition
// updates the counters and computes the result.
type connectivityStateEvaluator struct {
	// sum counts SubConns that are not in Shutdown. recordTransition
	// increments it on a transition out of Shutdown (how new SubConns are
	// recorded) and decrements it on a transition into Shutdown.
	sum uint64
	// nums[s] is the number of SubConns currently in state s, indexed by
	// connectivity.State. The counters are unsigned, so decrements rely on
	// uint64 wraparound.
	nums [5]uint64
}
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472 func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
473
474 for idx, state := range []connectivity.State{oldState, newState} {
475 updateVal := 2*uint64(idx) - 1
476 cse.nums[state] += updateVal
477 }
478 if oldState == connectivity.Shutdown {
479
480
481 cse.sum++
482 }
483 if newState == connectivity.Shutdown {
484 cse.sum--
485 }
486
487 if cse.nums[connectivity.Ready] > 0 {
488 return connectivity.Ready
489 }
490 if cse.nums[connectivity.TransientFailure] > 1 {
491 return connectivity.TransientFailure
492 }
493 if cse.nums[connectivity.Connecting] > 0 {
494 return connectivity.Connecting
495 }
496 if cse.nums[connectivity.TransientFailure] > 0 && cse.sum > 1 {
497 return connectivity.Connecting
498 }
499 if cse.nums[connectivity.Idle] > 0 {
500 return connectivity.Idle
501 }
502 return connectivity.TransientFailure
503 }
504
505
506
507
508
509
510
511
512 func getWeightAttribute(addr resolver.Address) uint32 {
513 w := weightedroundrobin.GetAddrInfo(addr).Weight
514 if w == 0 {
515 return 1
516 }
517 return w
518 }
519
View as plain text