1
18
19
20 package rls
21
22 import (
23 "encoding/json"
24 "errors"
25 "fmt"
26 "sync"
27 "sync/atomic"
28 "time"
29 "unsafe"
30
31 "google.golang.org/grpc/balancer"
32 "google.golang.org/grpc/connectivity"
33 "google.golang.org/grpc/grpclog"
34 "google.golang.org/grpc/internal"
35 "google.golang.org/grpc/internal/backoff"
36 "google.golang.org/grpc/internal/balancergroup"
37 "google.golang.org/grpc/internal/buffer"
38 internalgrpclog "google.golang.org/grpc/internal/grpclog"
39 "google.golang.org/grpc/internal/grpcsync"
40 "google.golang.org/grpc/internal/pretty"
41 "google.golang.org/grpc/resolver"
42 )
43
44 const (
45
46
47
48
49 Name = internal.RLSLoadBalancingPolicyName
50
51 periodicCachePurgeFreq = time.Minute
52 )
53
54 var (
55 logger = grpclog.Component("rls")
56 errBalancerClosed = errors.New("rls LB policy is closed")
57
58
59
60
61 defaultBackoffStrategy = backoff.Strategy(backoff.DefaultExponential)
62
63 dataCachePurgeTicker = func() *time.Ticker { return time.NewTicker(periodicCachePurgeFreq) }
64
65
66
67
68
69
70
71
72
73 minEvictDuration = 5 * time.Second
74
75
76
77 clientConnUpdateHook = func() {}
78 dataCachePurgeHook = func() {}
79 resetBackoffHook = func() {}
80 )
81
82 func init() {
83 balancer.Register(&rlsBB{})
84 }
85
86 type rlsBB struct{}
87
88 func (rlsBB) Name() string {
89 return Name
90 }
91
92 func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
93 lb := &rlsBalancer{
94 closed: grpcsync.NewEvent(),
95 done: grpcsync.NewEvent(),
96 cc: cc,
97 bopts: opts,
98 purgeTicker: dataCachePurgeTicker(),
99 dataCachePurgeHook: dataCachePurgeHook,
100 lbCfg: &lbConfig{},
101 pendingMap: make(map[cacheKey]*backoffState),
102 childPolicies: make(map[string]*childPolicyWrapper),
103 updateCh: buffer.NewUnbounded(),
104 }
105 lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
106 lb.dataCache = newDataCache(maxCacheSize, lb.logger)
107 lb.bg = balancergroup.New(balancergroup.Options{
108 CC: cc,
109 BuildOpts: opts,
110 StateAggregator: lb,
111 Logger: lb.logger,
112 SubBalancerCloseTimeout: time.Duration(0),
113 })
114 lb.bg.Start()
115 go lb.run()
116 return lb
117 }
118
119
120 type rlsBalancer struct {
121 closed *grpcsync.Event
122 done *grpcsync.Event
123 cc balancer.ClientConn
124 bopts balancer.BuildOptions
125 purgeTicker *time.Ticker
126 dataCachePurgeHook func()
127 logger *internalgrpclog.PrefixLogger
128
129
130
131
132
133
134
135
136
137
138 cacheMu sync.Mutex
139 dataCache *dataCache
140 pendingMap map[cacheKey]*backoffState
141
142
143 stateMu sync.Mutex
144 lbCfg *lbConfig
145 childPolicyBuilder balancer.Builder
146 resolverState resolver.State
147 ctrlCh *controlChannel
148 bg *balancergroup.BalancerGroup
149 childPolicies map[string]*childPolicyWrapper
150 defaultPolicy *childPolicyWrapper
151
152
153
154
155 lastPicker *rlsPicker
156
157
158
159
160 inhibitPickerUpdates bool
161
162
163 updateCh *buffer.Unbounded
164 }
165
166 type resumePickerUpdates struct {
167 done chan struct{}
168 }
169
170
171 type childPolicyIDAndState struct {
172 id string
173 state balancer.State
174 }
175
176 type controlChannelReady struct{}
177
178
179
180
181
182 func (b *rlsBalancer) run() {
183
184
185
186 defer func() { b.done.Fire() }()
187
188
189 doneCh := make(chan struct{})
190 defer func() {
191 <-doneCh
192 }()
193 go b.purgeDataCache(doneCh)
194
195 for {
196 select {
197 case u, ok := <-b.updateCh.Get():
198 if !ok {
199 return
200 }
201 b.updateCh.Load()
202 switch update := u.(type) {
203 case childPolicyIDAndState:
204 b.handleChildPolicyStateUpdate(update.id, update.state)
205 case controlChannelReady:
206 b.logger.Infof("Resetting backoff state after control channel getting back to READY")
207 b.cacheMu.Lock()
208 updatePicker := b.dataCache.resetBackoffState(&backoffState{bs: defaultBackoffStrategy})
209 b.cacheMu.Unlock()
210 if updatePicker {
211 b.sendNewPicker()
212 }
213 resetBackoffHook()
214 case resumePickerUpdates:
215 b.stateMu.Lock()
216 b.logger.Infof("Resuming picker updates after config propagation to child policies")
217 b.inhibitPickerUpdates = false
218 b.sendNewPickerLocked()
219 close(update.done)
220 b.stateMu.Unlock()
221 default:
222 b.logger.Errorf("Unsupported update type %T", update)
223 }
224 case <-b.closed.Done():
225 return
226 }
227 }
228 }
229
230
231
232
233 func (b *rlsBalancer) purgeDataCache(doneCh chan struct{}) {
234 defer close(doneCh)
235
236 for {
237 select {
238 case <-b.closed.Done():
239 return
240 case <-b.purgeTicker.C:
241 b.cacheMu.Lock()
242 updatePicker := b.dataCache.evictExpiredEntries()
243 b.cacheMu.Unlock()
244 if updatePicker {
245 b.sendNewPicker()
246 }
247 b.dataCachePurgeHook()
248 }
249 }
250 }
251
252 func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
253 defer clientConnUpdateHook()
254
255 b.stateMu.Lock()
256 if b.closed.HasFired() {
257 b.stateMu.Unlock()
258 b.logger.Warningf("Received service config after balancer close: %s", pretty.ToJSON(ccs.BalancerConfig))
259 return errBalancerClosed
260 }
261
262 newCfg := ccs.BalancerConfig.(*lbConfig)
263 if b.lbCfg.Equal(newCfg) {
264 b.stateMu.Unlock()
265 b.logger.Infof("New service config matches existing config")
266 return nil
267 }
268
269 b.logger.Infof("Delaying picker updates until config is propagated to and processed by child policies")
270 b.inhibitPickerUpdates = true
271
272
273
274
275
276 b.handleControlChannelUpdate(newCfg)
277
278
279
280 b.resolverState = ccs.ResolverState
281 b.handleChildPolicyConfigUpdate(newCfg, &ccs)
282
283
284 resizeCache := newCfg.cacheSizeBytes != b.lbCfg.cacheSizeBytes
285
286
287 b.lbCfg = newCfg
288
289
290
291
292 done := make(chan struct{})
293 b.updateCh.Put(resumePickerUpdates{done: done})
294 b.stateMu.Unlock()
295 <-done
296
297 if resizeCache {
298
299
300
301
302
303
304
305 b.cacheMu.Lock()
306 b.dataCache.resize(newCfg.cacheSizeBytes)
307 b.cacheMu.Unlock()
308 }
309 return nil
310 }
311
312
313
314
315
316 func (b *rlsBalancer) handleControlChannelUpdate(newCfg *lbConfig) {
317 if newCfg.lookupService == b.lbCfg.lookupService && newCfg.lookupServiceTimeout == b.lbCfg.lookupServiceTimeout {
318 return
319 }
320
321
322 b.logger.Infof("Creating control channel to RLS server at: %v", newCfg.lookupService)
323 backToReadyFn := func() {
324 b.updateCh.Put(controlChannelReady{})
325 }
326 ctrlCh, err := newControlChannel(newCfg.lookupService, newCfg.controlChannelServiceConfig, newCfg.lookupServiceTimeout, b.bopts, backToReadyFn)
327 if err != nil {
328
329
330
331 b.logger.Errorf("Failed to create control channel to %q: %v", newCfg.lookupService, err)
332 return
333 }
334 if b.ctrlCh != nil {
335 b.ctrlCh.close()
336 }
337 b.ctrlCh = ctrlCh
338 }
339
340
341
342
343
344 func (b *rlsBalancer) handleChildPolicyConfigUpdate(newCfg *lbConfig, ccs *balancer.ClientConnState) {
345
346 if b.childPolicyBuilder == nil || b.childPolicyBuilder.Name() != newCfg.childPolicyName {
347 b.logger.Infof("Child policy changed to %q", newCfg.childPolicyName)
348 b.childPolicyBuilder = balancer.Get(newCfg.childPolicyName)
349 for _, cpw := range b.childPolicies {
350
351
352
353 b.bg.Remove(cpw.target)
354 b.bg.Add(cpw.target, b.childPolicyBuilder)
355 }
356 }
357
358 configSentToDefault := false
359 if b.lbCfg.defaultTarget != newCfg.defaultTarget {
360
361
362
363 b.logger.Infof("Default target in LB config changing from %q to %q", b.lbCfg.defaultTarget, newCfg.defaultTarget)
364 cpw := b.childPolicies[newCfg.defaultTarget]
365 if cpw == nil {
366 cpw = newChildPolicyWrapper(newCfg.defaultTarget)
367 b.childPolicies[newCfg.defaultTarget] = cpw
368 b.bg.Add(newCfg.defaultTarget, b.childPolicyBuilder)
369 b.logger.Infof("Child policy %q added to BalancerGroup", newCfg.defaultTarget)
370 }
371 if err := b.buildAndPushChildPolicyConfigs(newCfg.defaultTarget, newCfg, ccs); err != nil {
372 cpw.lamify(err)
373 }
374
375
376
377
378 if b.defaultPolicy != nil {
379 if b.defaultPolicy.releaseRef() {
380 delete(b.childPolicies, b.lbCfg.defaultTarget)
381 b.bg.Remove(b.defaultPolicy.target)
382 }
383 }
384 b.defaultPolicy = cpw
385 configSentToDefault = true
386 }
387
388
389 if b.lbCfg.childPolicyName == newCfg.childPolicyName && b.lbCfg.childPolicyTargetField == newCfg.childPolicyTargetField && childPolicyConfigEqual(b.lbCfg.childPolicyConfig, newCfg.childPolicyConfig) {
390 return
391 }
392
393
394
395 for _, cpw := range b.childPolicies {
396 if configSentToDefault && cpw.target == newCfg.defaultTarget {
397
398 continue
399 }
400 if err := b.buildAndPushChildPolicyConfigs(cpw.target, newCfg, ccs); err != nil {
401 cpw.lamify(err)
402 }
403 }
404 }
405
406
407
408
409
410
411
412 func (b *rlsBalancer) buildAndPushChildPolicyConfigs(target string, newCfg *lbConfig, ccs *balancer.ClientConnState) error {
413 jsonTarget, err := json.Marshal(target)
414 if err != nil {
415 return fmt.Errorf("failed to marshal child policy target %q: %v", target, err)
416 }
417
418 config := newCfg.childPolicyConfig
419 targetField := newCfg.childPolicyTargetField
420 config[targetField] = jsonTarget
421 jsonCfg, err := json.Marshal(config)
422 if err != nil {
423 return fmt.Errorf("failed to marshal child policy config %+v: %v", config, err)
424 }
425
426 parser, _ := b.childPolicyBuilder.(balancer.ConfigParser)
427 parsedCfg, err := parser.ParseConfig(jsonCfg)
428 if err != nil {
429 return fmt.Errorf("childPolicy config parsing failed: %v", err)
430 }
431
432 state := balancer.ClientConnState{ResolverState: ccs.ResolverState, BalancerConfig: parsedCfg}
433 b.logger.Infof("Pushing new state to child policy %q: %+v", target, state)
434 if err := b.bg.UpdateClientConnState(target, state); err != nil {
435 b.logger.Warningf("UpdateClientConnState(%q, %+v) failed : %v", target, ccs, err)
436 }
437 return nil
438 }
439
440 func (b *rlsBalancer) ResolverError(err error) {
441 b.bg.ResolverError(err)
442 }
443
444 func (b *rlsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
445 b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
446 }
447
448 func (b *rlsBalancer) Close() {
449 b.stateMu.Lock()
450 b.closed.Fire()
451 b.purgeTicker.Stop()
452 if b.ctrlCh != nil {
453 b.ctrlCh.close()
454 }
455 b.bg.Close()
456 b.stateMu.Unlock()
457
458 b.cacheMu.Lock()
459 b.dataCache.stop()
460 b.cacheMu.Unlock()
461
462 b.updateCh.Close()
463
464 <-b.done.Done()
465 }
466
467 func (b *rlsBalancer) ExitIdle() {
468 b.bg.ExitIdle()
469 }
470
471
472
473
474
475
476
477
478
479
480
481
482 func (b *rlsBalancer) sendNewPickerLocked() {
483 aggregatedState := b.aggregatedConnectivityState()
484
485
486
487
488
489
490 if b.defaultPolicy != nil {
491 b.defaultPolicy.acquireRef()
492 }
493 picker := &rlsPicker{
494 kbm: b.lbCfg.kbMap,
495 origEndpoint: b.bopts.Target.Endpoint(),
496 lb: b,
497 defaultPolicy: b.defaultPolicy,
498 ctrlCh: b.ctrlCh,
499 maxAge: b.lbCfg.maxAge,
500 staleAge: b.lbCfg.staleAge,
501 bg: b.bg,
502 }
503 picker.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-picker %p] ", picker))
504 state := balancer.State{
505 ConnectivityState: aggregatedState,
506 Picker: picker,
507 }
508
509 if !b.inhibitPickerUpdates {
510 b.logger.Infof("New balancer.State: %+v", state)
511 b.cc.UpdateState(state)
512 } else {
513 b.logger.Infof("Delaying picker update: %+v", state)
514 }
515
516 if b.lastPicker != nil {
517 if b.defaultPolicy != nil {
518 b.defaultPolicy.releaseRef()
519 }
520 }
521 b.lastPicker = picker
522 }
523
524 func (b *rlsBalancer) sendNewPicker() {
525 b.stateMu.Lock()
526 defer b.stateMu.Unlock()
527 if b.closed.HasFired() {
528 return
529 }
530 b.sendNewPickerLocked()
531 }
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547 func (b *rlsBalancer) aggregatedConnectivityState() connectivity.State {
548 if len(b.childPolicies) == 0 && b.lbCfg.defaultTarget == "" {
549 return connectivity.Idle
550 }
551
552 var readyN, connectingN, idleN int
553 for _, cpw := range b.childPolicies {
554 state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
555 switch state.ConnectivityState {
556 case connectivity.Ready:
557 readyN++
558 case connectivity.Connecting:
559 connectingN++
560 case connectivity.Idle:
561 idleN++
562 }
563 }
564
565 switch {
566 case readyN > 0:
567 return connectivity.Ready
568 case connectingN > 0:
569 return connectivity.Connecting
570 case idleN > 0:
571 return connectivity.Idle
572 default:
573 return connectivity.TransientFailure
574 }
575 }
576
577
578
579
580
581 func (b *rlsBalancer) UpdateState(id string, state balancer.State) {
582 b.updateCh.Put(childPolicyIDAndState{id: id, state: state})
583 }
584
585
586
587
588
589
590
591
592
593
594 func (b *rlsBalancer) handleChildPolicyStateUpdate(id string, newState balancer.State) {
595 b.stateMu.Lock()
596 defer b.stateMu.Unlock()
597
598 cpw := b.childPolicies[id]
599 if cpw == nil {
600
601
602 b.logger.Warningf("Received state update %+v for missing child policy %q", newState, id)
603 return
604 }
605
606 oldState := (*balancer.State)(atomic.LoadPointer(&cpw.state))
607 if oldState.ConnectivityState == connectivity.TransientFailure && newState.ConnectivityState == connectivity.Connecting {
608
609
610
611
612 return
613 }
614 atomic.StorePointer(&cpw.state, unsafe.Pointer(&newState))
615 b.logger.Infof("Child policy %q has new state %+v", id, newState)
616 b.sendNewPickerLocked()
617 }
618
619
620
621
622
623 func (b *rlsBalancer) acquireChildPolicyReferences(targets []string) []*childPolicyWrapper {
624 b.stateMu.Lock()
625 var newChildPolicies []*childPolicyWrapper
626 for _, target := range targets {
627
628
629 if cpw := b.childPolicies[target]; cpw != nil {
630 cpw.acquireRef()
631 newChildPolicies = append(newChildPolicies, cpw)
632 continue
633 }
634
635
636
637 cpw := newChildPolicyWrapper(target)
638 b.childPolicies[target] = cpw
639 b.bg.Add(target, b.childPolicyBuilder)
640 b.logger.Infof("Child policy %q added to BalancerGroup", target)
641 newChildPolicies = append(newChildPolicies, cpw)
642 if err := b.buildAndPushChildPolicyConfigs(target, b.lbCfg, &balancer.ClientConnState{
643 ResolverState: b.resolverState,
644 }); err != nil {
645 cpw.lamify(err)
646 }
647 }
648 b.stateMu.Unlock()
649 return newChildPolicies
650 }
651
652
653
654
655 func (b *rlsBalancer) releaseChildPolicyReferences(targets []string) {
656 b.stateMu.Lock()
657 for _, target := range targets {
658 if cpw := b.childPolicies[target]; cpw.releaseRef() {
659 delete(b.childPolicies, cpw.target)
660 b.bg.Remove(cpw.target)
661 }
662 }
663 b.stateMu.Unlock()
664 }
665
View as plain text