1
18
19
20
21
22
23
24 package grpclb
25
26 import (
27 "context"
28 "errors"
29 "fmt"
30 "sync"
31 "time"
32
33 "google.golang.org/grpc"
34 "google.golang.org/grpc/balancer"
35 "google.golang.org/grpc/balancer/base"
36 grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
37 "google.golang.org/grpc/connectivity"
38 "google.golang.org/grpc/credentials"
39 "google.golang.org/grpc/grpclog"
40 "google.golang.org/grpc/internal"
41 "google.golang.org/grpc/internal/backoff"
42 internalgrpclog "google.golang.org/grpc/internal/grpclog"
43 "google.golang.org/grpc/internal/pretty"
44 "google.golang.org/grpc/internal/resolver/dns"
45 "google.golang.org/grpc/resolver"
46 "google.golang.org/grpc/resolver/manual"
47 "google.golang.org/protobuf/types/known/durationpb"
48
49 lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
50 )
51
// Package-level constants for the grpclb balancer.
const (
	// grpclbName is the name reported by lbBuilder.Name.
	grpclbName = "grpclb"

	// lbTokenKey is a metadata key; presumably it carries the per-backend LB
	// token (its use is outside this chunk).
	lbTokenKey = "lb-token"

	// defaultFallbackTimeout is the fallback timeout used by newLBBuilder.
	defaultFallbackTimeout = 10 * time.Second
)

// errServerTerminatedConnection reports that receiving the server list failed
// because the server ended the connection. Where it is returned is not
// visible in this chunk.
var errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection")
59 var logger = grpclog.Component("grpclb")
60
61 func convertDuration(d *durationpb.Duration) time.Duration {
62 if d == nil {
63 return 0
64 }
65 return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
66 }
67
68
69
70
71 type loadBalancerClient struct {
72 cc *grpc.ClientConn
73 }
74
75 func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...grpc.CallOption) (*balanceLoadClientStream, error) {
76 desc := &grpc.StreamDesc{
77 StreamName: "BalanceLoad",
78 ServerStreams: true,
79 ClientStreams: true,
80 }
81 stream, err := c.cc.NewStream(ctx, desc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
82 if err != nil {
83 return nil, err
84 }
85 x := &balanceLoadClientStream{stream}
86 return x, nil
87 }
88
89 type balanceLoadClientStream struct {
90 grpc.ClientStream
91 }
92
93 func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
94 return x.ClientStream.SendMsg(m)
95 }
96
97 func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
98 m := new(lbpb.LoadBalanceResponse)
99 if err := x.ClientStream.RecvMsg(m); err != nil {
100 return nil, err
101 }
102 return m, nil
103 }
104
105 func init() {
106 balancer.Register(newLBBuilder())
107 dns.EnableSRVLookups = true
108 }
109
110
111 func newLBBuilder() balancer.Builder {
112 return newLBBuilderWithFallbackTimeout(defaultFallbackTimeout)
113 }
114
115
116
117
118
119
120
121 func newLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Builder {
122 return &lbBuilder{
123 fallbackTimeout: fallbackTimeout,
124 }
125 }
126
127 type lbBuilder struct {
128 fallbackTimeout time.Duration
129 }
130
131 func (b *lbBuilder) Name() string {
132 return grpclbName
133 }
134
135 func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
136
137
138
139 mr := manual.NewBuilderWithScheme("grpclb-internal")
140
141
142
143 mr.ResolveNowCallback = cc.ResolveNow
144
145 lb := &lbBalancer{
146 cc: newLBCacheClientConn(cc),
147 dialTarget: opt.Target.Endpoint(),
148 target: opt.Target.Endpoint(),
149 opt: opt,
150 fallbackTimeout: b.fallbackTimeout,
151 doneCh: make(chan struct{}),
152
153 manualResolver: mr,
154 subConns: make(map[resolver.Address]balancer.SubConn),
155 scStates: make(map[balancer.SubConn]connectivity.State),
156 picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
157 clientStats: newRPCStats(),
158 backoff: backoff.DefaultExponential,
159 }
160 lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[grpclb %p] ", lb))
161
162 var err error
163 if opt.CredsBundle != nil {
164 lb.grpclbClientConnCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBalancer)
165 if err != nil {
166 lb.logger.Warningf("Failed to create credentials used for connecting to grpclb: %v", err)
167 }
168 lb.grpclbBackendCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBackendFromBalancer)
169 if err != nil {
170 lb.logger.Warningf("Failed to create credentials used for connecting to backends returned by grpclb: %v", err)
171 }
172 }
173
174 return lb
175 }
176
// lbBalancer is the grpclb balancer.Balancer implementation returned by
// lbBuilder.Build.
type lbBalancer struct {
	cc         *lbCacheClientConn
	dialTarget string // endpoint of the channel's dial target, set in Build
	target     string // starts as dialTarget; overridden by a non-empty ServiceName in the service config
	opt        balancer.BuildOptions
	logger     *internalgrpclog.PrefixLogger

	// usePickFirst selects pick_first handling of backends: regeneratePicker
	// then uses a single SubConn, and state updates bypass lb.cc. It is
	// updated from the service config in handleServiceConfig.
	usePickFirst bool

	// grpclbClientConnCreds is derived in Build from opt.CredsBundle with
	// CredsBundleModeBalancer. It is nil when no bundle was supplied or
	// derivation failed. Presumably used when dialing the remote balancer;
	// the use site is outside this chunk.
	grpclbClientConnCreds credentials.Bundle

	// grpclbBackendCreds is derived in Build from opt.CredsBundle with
	// CredsBundleModeBackendFromBalancer. It is nil when no bundle was
	// supplied or derivation failed. Presumably used for backends returned by
	// the remote balancer; the use site is outside this chunk.
	grpclbBackendCreds credentials.Bundle

	// fallbackTimeout is how long fallbackToBackendsAfter waits for a server
	// list before falling back to the resolver's backend addresses.
	fallbackTimeout time.Duration
	// doneCh is closed by Close; fallbackToBackendsAfter stops waiting when it
	// is closed.
	doneCh chan struct{}

	// manualResolver receives the remote balancer addresses in
	// UpdateClientConnState. It is created in Build with the
	// "grpclb-internal" scheme.
	manualResolver *manual.Resolver

	// ccRemoteLB is the connection to the remote balancer. It is nil until
	// balancer addresses are resolved, and reset to nil when they disappear.
	ccRemoteLB *remoteBalancerCCWrapper

	// backoff is the retry strategy (DefaultExponential from Build).
	// NOTE(review): presumably applied between remote balancer call attempts;
	// confirm at the use site.
	backoff backoff.Strategy

	// clientStats is handed to every lbPicker built by regeneratePicker.
	// Presumably it collects client-side load-report data.
	clientStats *rpcStats

	// mu is held by the methods in this file that read or write the fields
	// below it. Presumably it guards everything that follows.
	mu sync.Mutex

	// fullServerList is the server list passed to newLBPicker when a new
	// lbPicker is built.
	fullServerList []*lbpb.Server

	// backendAddrs is passed to refreshSubConns when handleServiceConfig
	// switches between pick_first and non-pick_first modes.
	backendAddrs []resolver.Address

	// backendAddrsWithoutMetadata fixes the order in which READY SubConns are
	// collected by regeneratePicker in non-pick_first mode. Keys are looked
	// up in subConns.
	backendAddrsWithoutMetadata []resolver.Address

	state    connectivity.State                      // aggregate state computed by aggregateSubConnStates
	subConns map[resolver.Address]balancer.SubConn   // SubConn per backend address
	scStates map[balancer.SubConn]connectivity.State // last known state per SubConn; entries are removed on Shutdown
	picker   balancer.Picker                         // picker last pushed to the parent via updateStateAndPicker

	// These flags drive the fallback decisions in updateSubConnState,
	// fallbackToBackendsAfter and UpdateClientConnState.
	remoteBalancerConnected bool
	serverListReceived      bool
	inFallback              bool

	// resolvedBackendAddrs holds the backend addresses from the latest
	// resolver update. These are the addresses used when entering fallback.
	resolvedBackendAddrs []resolver.Address
	// connErr is the error from the most recent SubConn TransientFailure.
	// It is embedded in the error picker.
	connErr error
}
240
241
242
243
244
245
246
247 func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
248 if lb.state == connectivity.TransientFailure {
249 lb.picker = base.NewErrPicker(fmt.Errorf("all SubConns are in TransientFailure, last connection error: %v", lb.connErr))
250 return
251 }
252
253 if lb.state == connectivity.Connecting {
254 lb.picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
255 return
256 }
257
258 var readySCs []balancer.SubConn
259 if lb.usePickFirst {
260 for _, sc := range lb.subConns {
261 readySCs = append(readySCs, sc)
262 break
263 }
264 } else {
265 for _, a := range lb.backendAddrsWithoutMetadata {
266 if sc, ok := lb.subConns[a]; ok {
267 if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready {
268 readySCs = append(readySCs, sc)
269 }
270 }
271 }
272 }
273
274 if len(readySCs) <= 0 {
275
276
277
278
279
280
281 lb.picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable)
282 return
283 }
284 if lb.inFallback {
285 lb.picker = newRRPicker(readySCs)
286 return
287 }
288 if resetDrop {
289 lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
290 return
291 }
292 prevLBPicker, ok := lb.picker.(*lbPicker)
293 if !ok {
294 lb.picker = newLBPicker(lb.fullServerList, readySCs, lb.clientStats)
295 return
296 }
297 prevLBPicker.updateReadySCs(readySCs)
298 }
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313 func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
314 var numConnecting uint64
315
316 for _, sc := range lb.subConns {
317 if state, ok := lb.scStates[sc]; ok {
318 switch state {
319 case connectivity.Ready:
320 return connectivity.Ready
321 case connectivity.Connecting, connectivity.Idle:
322 numConnecting++
323 }
324 }
325 }
326 if numConnecting > 0 {
327 return connectivity.Connecting
328 }
329 return connectivity.TransientFailure
330 }
331
332
333
334 func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
335 lb.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, scs)
336 }
337
338 func (lb *lbBalancer) updateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
339 s := scs.ConnectivityState
340 if lb.logger.V(2) {
341 lb.logger.Infof("SubConn state change: %p, %v", sc, s)
342 }
343 lb.mu.Lock()
344 defer lb.mu.Unlock()
345
346 oldS, ok := lb.scStates[sc]
347 if !ok {
348 if lb.logger.V(2) {
349 lb.logger.Infof("Received state change for an unknown SubConn: %p, %v", sc, s)
350 }
351 return
352 }
353 lb.scStates[sc] = s
354 switch s {
355 case connectivity.Idle:
356 sc.Connect()
357 case connectivity.Shutdown:
358
359
360 delete(lb.scStates, sc)
361 case connectivity.TransientFailure:
362 lb.connErr = scs.ConnectionError
363 }
364
365
366
367 lb.updateStateAndPicker((oldS == connectivity.Ready) != (s == connectivity.Ready), false)
368
369
370
371 if lb.state != connectivity.Ready {
372 if !lb.inFallback && !lb.remoteBalancerConnected {
373
374 lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
375 }
376 }
377 }
378
379
380
381
382
383 func (lb *lbBalancer) updateStateAndPicker(forceRegeneratePicker bool, resetDrop bool) {
384 oldAggrState := lb.state
385 lb.state = lb.aggregateSubConnStates()
386
387
388
389 if forceRegeneratePicker || (lb.state != oldAggrState) {
390 lb.regeneratePicker(resetDrop)
391 }
392 var cc balancer.ClientConn = lb.cc
393 if lb.usePickFirst {
394
395 cc = lb.cc.ClientConn
396 }
397
398 cc.UpdateState(balancer.State{ConnectivityState: lb.state, Picker: lb.picker})
399 }
400
401
402
403
404 func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
405 timer := time.NewTimer(fallbackTimeout)
406 defer timer.Stop()
407 select {
408 case <-timer.C:
409 case <-lb.doneCh:
410 return
411 }
412 lb.mu.Lock()
413 if lb.inFallback || lb.serverListReceived {
414 lb.mu.Unlock()
415 return
416 }
417
418 lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
419 lb.mu.Unlock()
420 }
421
422 func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
423 lb.mu.Lock()
424 defer lb.mu.Unlock()
425
426
427
428
429
430
431
432
433
434
435
436
437 if gc != nil {
438 target := lb.dialTarget
439 if gc.ServiceName != "" {
440 target = gc.ServiceName
441 }
442 if target != lb.target {
443 lb.target = target
444 if lb.ccRemoteLB != nil {
445 lb.ccRemoteLB.cancelRemoteBalancerCall()
446 }
447 }
448 }
449
450 newUsePickFirst := childIsPickFirst(gc)
451 if lb.usePickFirst == newUsePickFirst {
452 return
453 }
454 if lb.logger.V(2) {
455 lb.logger.Infof("Switching mode. Is pick_first used for backends? %v", newUsePickFirst)
456 }
457 lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
458 }
459
460 func (lb *lbBalancer) ResolverError(error) {
461
462
463 }
464
465 func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
466 if lb.logger.V(2) {
467 lb.logger.Infof("UpdateClientConnState: %s", pretty.ToJSON(ccs))
468 }
469 gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
470 lb.handleServiceConfig(gc)
471
472 backendAddrs := ccs.ResolverState.Addresses
473
474 var remoteBalancerAddrs []resolver.Address
475 if sd := grpclbstate.Get(ccs.ResolverState); sd != nil {
476
477
478 remoteBalancerAddrs = sd.BalancerAddresses
479 }
480
481 if len(backendAddrs)+len(remoteBalancerAddrs) == 0 {
482
483
484 return balancer.ErrBadResolverState
485 }
486
487 if len(remoteBalancerAddrs) == 0 {
488 if lb.ccRemoteLB != nil {
489 lb.ccRemoteLB.close()
490 lb.ccRemoteLB = nil
491 }
492 } else if lb.ccRemoteLB == nil {
493
494
495 if err := lb.newRemoteBalancerCCWrapper(); err != nil {
496 return err
497 }
498
499 go lb.fallbackToBackendsAfter(lb.fallbackTimeout)
500 }
501
502 if lb.ccRemoteLB != nil {
503
504
505 lb.manualResolver.UpdateState(resolver.State{Addresses: remoteBalancerAddrs})
506 }
507
508 lb.mu.Lock()
509 lb.resolvedBackendAddrs = backendAddrs
510 if len(remoteBalancerAddrs) == 0 || lb.inFallback {
511
512
513
514
515
516 lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
517 }
518 lb.mu.Unlock()
519 return nil
520 }
521
522 func (lb *lbBalancer) Close() {
523 select {
524 case <-lb.doneCh:
525 return
526 default:
527 }
528 close(lb.doneCh)
529 if lb.ccRemoteLB != nil {
530 lb.ccRemoteLB.close()
531 }
532 lb.cc.close()
533 }
534
535 func (lb *lbBalancer) ExitIdle() {}
536
View as plain text