1
18
19
20
21
22 package outlierdetection
23
24 import (
25 "encoding/json"
26 "fmt"
27 "math"
28 "strings"
29 "sync"
30 "sync/atomic"
31 "time"
32 "unsafe"
33
34 "google.golang.org/grpc/balancer"
35 "google.golang.org/grpc/connectivity"
36 "google.golang.org/grpc/internal/balancer/gracefulswitch"
37 "google.golang.org/grpc/internal/buffer"
38 "google.golang.org/grpc/internal/channelz"
39 "google.golang.org/grpc/internal/grpclog"
40 "google.golang.org/grpc/internal/grpcrand"
41 "google.golang.org/grpc/internal/grpcsync"
42 iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
43 "google.golang.org/grpc/resolver"
44 "google.golang.org/grpc/serviceconfig"
45 )
46
47
// Indirections over the time package. The balancer schedules its interval
// timer through afterFunc and reads the clock through now, so both can be
// replaced (presumably by tests — confirm against the test files).
var (
	// afterFunc defaults to time.AfterFunc.
	afterFunc = time.AfterFunc
	// now defaults to time.Now.
	now = time.Now
)
52
53
// Name is the name reported by the outlier detection balancer builder (see
// bb.Name), i.e. the name under which it is registered in init.
const Name = "outlier_detection_experimental"
55
// init registers the outlier detection balancer builder with the balancer
// registry.
func init() {
	balancer.Register(bb{})
}
59
// bb is the stateless builder for the outlier detection balancer; it provides
// Build, ParseConfig and Name.
type bb struct{}
61
62 func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
63 b := &outlierDetectionBalancer{
64 cc: cc,
65 closed: grpcsync.NewEvent(),
66 done: grpcsync.NewEvent(),
67 addrs: make(map[string]*addressInfo),
68 scWrappers: make(map[balancer.SubConn]*subConnWrapper),
69 scUpdateCh: buffer.NewUnbounded(),
70 pickerUpdateCh: buffer.NewUnbounded(),
71 channelzParent: bOpts.ChannelzParent,
72 }
73 b.logger = prefixLogger(b)
74 b.logger.Infof("Created")
75 b.child = gracefulswitch.NewBalancer(b, bOpts)
76 go b.run()
77 return b
78 }
79
80 func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
81 lbCfg := &LBConfig{
82
83 Interval: iserviceconfig.Duration(10 * time.Second),
84 BaseEjectionTime: iserviceconfig.Duration(30 * time.Second),
85 MaxEjectionTime: iserviceconfig.Duration(300 * time.Second),
86 MaxEjectionPercent: 10,
87 }
88
89
90
91 if err := json.Unmarshal(s, lbCfg); err != nil {
92 return nil, fmt.Errorf("xds: unable to unmarshal LBconfig: %s, error: %v", string(s), err)
93 }
94
95
96
97
98
99
100
101
102 switch {
103
104
105
106
107
108
109
110 case lbCfg.Interval < 0:
111 return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.interval = %s; must be >= 0", lbCfg.Interval)
112 case lbCfg.BaseEjectionTime < 0:
113 return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.base_ejection_time = %s; must be >= 0", lbCfg.BaseEjectionTime)
114 case lbCfg.MaxEjectionTime < 0:
115 return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.max_ejection_time = %s; must be >= 0", lbCfg.MaxEjectionTime)
116
117
118
119
120
121
122 case lbCfg.MaxEjectionPercent > 100:
123 return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.max_ejection_percent = %v; must be <= 100", lbCfg.MaxEjectionPercent)
124 case lbCfg.SuccessRateEjection != nil && lbCfg.SuccessRateEjection.EnforcementPercentage > 100:
125 return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.SuccessRateEjection.enforcement_percentage = %v; must be <= 100", lbCfg.SuccessRateEjection.EnforcementPercentage)
126 case lbCfg.FailurePercentageEjection != nil && lbCfg.FailurePercentageEjection.Threshold > 100:
127 return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.FailurePercentageEjection.threshold = %v; must be <= 100", lbCfg.FailurePercentageEjection.Threshold)
128 case lbCfg.FailurePercentageEjection != nil && lbCfg.FailurePercentageEjection.EnforcementPercentage > 100:
129 return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.FailurePercentageEjection.enforcement_percentage = %v; must be <= 100", lbCfg.FailurePercentageEjection.EnforcementPercentage)
130 }
131 return lbCfg, nil
132 }
133
// Name returns the balancer name, which is the package-level Name constant.
func (bb) Name() string {
	return Name
}
137
138
// scUpdate is a SubConn state change queued on scUpdateCh by
// updateSubConnState and consumed by handleSubConnUpdate.
type scUpdate struct {
	// scw is the wrapper of the SubConn whose state changed.
	scw *subConnWrapper
	// state is the new state reported for that SubConn.
	state balancer.SubConnState
}
143
// ejectionUpdate is an ejection or unejection of a SubConn, consumed from
// scUpdateCh by handleEjectedUpdate.
type ejectionUpdate struct {
	// scw is the wrapper being ejected or unejected.
	scw *subConnWrapper
	// isEjected is true for an ejection and false for an unejection.
	isEjected bool
}
148
// lbCfgUpdate is queued on pickerUpdateCh by UpdateClientConnState and
// consumed by handleLBConfigUpdate.
type lbCfgUpdate struct {
	// lbCfg is the newly received configuration.
	lbCfg *LBConfig

	// done is closed by handleLBConfigUpdate once the update has been
	// processed; UpdateClientConnState blocks on it before returning.
	done chan struct{}
}
154
// outlierDetectionBalancer wraps a child balancer, counts call results per
// address through wrapped pickers, and periodically ejects/unejects addresses
// based on the configured success rate and failure percentage algorithms.
type outlierDetectionBalancer struct {
	// childState is the most recent state received from the child. In the
	// code of this file it is written by handleChildStateUpdate and read by
	// handleLBConfigUpdate, both of which are invoked from run.
	childState balancer.State

	// recentPickerNoop is the noop bit of the picker most recently sent to
	// cc. Like childState, it is only touched from the handlers driven by
	// run.
	recentPickerNoop bool

	// closed is fired by Close; run exits once it observes it.
	closed *grpcsync.Event
	// done is fired when run returns; Close waits on it.
	done *grpcsync.Event
	// cc is the parent ClientConn that SubConn and state operations are
	// forwarded to.
	cc             balancer.ClientConn
	logger         *grpclog.PrefixLogger
	channelzParent channelz.Identifier

	// childMu is held around every call into child and around SubConn
	// listener callbacks.
	childMu sync.Mutex
	child   *gracefulswitch.Balancer

	// mu is held by most methods in this file while they access the fields
	// below.
	// NOTE(review): handleLBConfigUpdate resets inhibitPickerUpdates and
	// updateUnconditionally without holding mu — confirm this is intended.
	mu sync.Mutex
	// addrs maps an address string (resolver.Address.Addr) to its tracking
	// state; it is rebuilt from the resolver state in UpdateClientConnState.
	addrs map[string]*addressInfo
	// cfg is the most recently received configuration.
	cfg *LBConfig
	// scWrappers maps the SubConn returned by cc.NewSubConn to its wrapper.
	scWrappers map[balancer.SubConn]*subConnWrapper
	// timerStartTime is when the current interval started; it is reset to
	// the zero time on a noop config.
	timerStartTime time.Time
	// intervalTimer fires intervalTimerAlgorithm; nil until first scheduled.
	intervalTimer *time.Timer
	// inhibitPickerUpdates is set by UpdateClientConnState and cleared by
	// handleLBConfigUpdate; while set, child state updates are not forwarded.
	inhibitPickerUpdates bool
	// updateUnconditionally records that a child state update arrived while
	// picker updates were inhibited, so a picker must be sent once the
	// config update is processed.
	updateUnconditionally bool
	// numAddrsEjected counts currently ejected addresses.
	numAddrsEjected int

	// scUpdateCh carries *scUpdate and *ejectionUpdate values to run.
	scUpdateCh *buffer.Unbounded
	// pickerUpdateCh carries balancer.State and lbCfgUpdate values to run.
	pickerUpdateCh *buffer.Unbounded
}
208
209
210
211
212
// noopConfig reports whether the current config enables neither ejection
// algorithm (both SuccessRateEjection and FailurePercentageEjection are nil).
// It dereferences b.cfg, so a config must already have been stored.
func (b *outlierDetectionBalancer) noopConfig() bool {
	return b.cfg.SuccessRateEjection == nil && b.cfg.FailurePercentageEjection == nil
}
216
217
218
219
220
221
222
223 func (b *outlierDetectionBalancer) onIntervalConfig() {
224 var interval time.Duration
225 if b.timerStartTime.IsZero() {
226 b.timerStartTime = time.Now()
227 for _, addrInfo := range b.addrs {
228 addrInfo.callCounter.clear()
229 }
230 interval = time.Duration(b.cfg.Interval)
231 } else {
232 interval = time.Duration(b.cfg.Interval) - now().Sub(b.timerStartTime)
233 if interval < 0 {
234 interval = 0
235 }
236 }
237 b.intervalTimer = afterFunc(interval, b.intervalTimerAlgorithm)
238 }
239
240
241
242
243
244 func (b *outlierDetectionBalancer) onNoopConfig() {
245
246
247
248
249 b.timerStartTime = time.Time{}
250 for _, addrInfo := range b.addrs {
251
252 if !addrInfo.latestEjectionTimestamp.IsZero() {
253 b.unejectAddress(addrInfo)
254 }
255
256 addrInfo.ejectionTimeMultiplier = 0
257 }
258 }
259
260 func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
261 lbCfg, ok := s.BalancerConfig.(*LBConfig)
262 if !ok {
263 b.logger.Errorf("received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
264 return balancer.ErrBadResolverState
265 }
266
267
268
269 bb := balancer.Get(lbCfg.ChildPolicy.Name)
270 if bb == nil {
271 return fmt.Errorf("outlier detection: child balancer %q not registered", lbCfg.ChildPolicy.Name)
272 }
273
274
275
276
277
278 if b.cfg == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name {
279 b.childMu.Lock()
280 err := b.child.SwitchTo(bb)
281 if err != nil {
282 b.childMu.Unlock()
283 return fmt.Errorf("outlier detection: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err)
284 }
285 b.childMu.Unlock()
286 }
287
288 b.mu.Lock()
289
290
291
292
293
294 b.inhibitPickerUpdates = true
295 b.updateUnconditionally = false
296 b.cfg = lbCfg
297
298 addrs := make(map[string]bool, len(s.ResolverState.Addresses))
299 for _, addr := range s.ResolverState.Addresses {
300 addrs[addr.Addr] = true
301 if _, ok := b.addrs[addr.Addr]; !ok {
302 b.addrs[addr.Addr] = newAddressInfo()
303 }
304 }
305 for addr := range b.addrs {
306 if !addrs[addr] {
307 delete(b.addrs, addr)
308 }
309 }
310
311 if b.intervalTimer != nil {
312 b.intervalTimer.Stop()
313 }
314
315 if b.noopConfig() {
316 b.onNoopConfig()
317 } else {
318 b.onIntervalConfig()
319 }
320 b.mu.Unlock()
321
322 b.childMu.Lock()
323 err := b.child.UpdateClientConnState(balancer.ClientConnState{
324 ResolverState: s.ResolverState,
325 BalancerConfig: b.cfg.ChildPolicy.Config,
326 })
327 b.childMu.Unlock()
328
329 done := make(chan struct{})
330 b.pickerUpdateCh.Put(lbCfgUpdate{
331 lbCfg: lbCfg,
332 done: done,
333 })
334 <-done
335
336 return err
337 }
338
// ResolverError forwards the resolver error to the child balancer while
// holding childMu.
func (b *outlierDetectionBalancer) ResolverError(err error) {
	b.childMu.Lock()
	defer b.childMu.Unlock()
	b.child.ResolverError(err)
}
344
345 func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
346 b.mu.Lock()
347 defer b.mu.Unlock()
348 scw, ok := b.scWrappers[sc]
349 if !ok {
350
351
352 b.logger.Errorf("UpdateSubConnState called with SubConn that has no corresponding SubConnWrapper")
353 return
354 }
355 if state.ConnectivityState == connectivity.Shutdown {
356 delete(b.scWrappers, scw.SubConn)
357 }
358 b.scUpdateCh.Put(&scUpdate{
359 scw: scw,
360 state: state,
361 })
362 }
363
// UpdateSubConnState only logs an error: this balancer receives SubConn state
// through the listener installed in NewSubConn (updateSubConnState) instead.
func (b *outlierDetectionBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}
367
// Close shuts the balancer down. It fires closed, waits for run to exit
// (done), closes the child under childMu, closes both update channels, and
// finally stops the interval timer (if any) under mu.
func (b *outlierDetectionBalancer) Close() {
	b.closed.Fire()
	// Wait for the run goroutine to return before tearing anything down.
	<-b.done.Done()
	b.childMu.Lock()
	b.child.Close()
	b.childMu.Unlock()

	b.scUpdateCh.Close()
	b.pickerUpdateCh.Close()

	b.mu.Lock()
	defer b.mu.Unlock()
	if b.intervalTimer != nil {
		b.intervalTimer.Stop()
	}
}
384
// ExitIdle forwards the call to the child balancer while holding childMu.
func (b *outlierDetectionBalancer) ExitIdle() {
	b.childMu.Lock()
	defer b.childMu.Unlock()
	b.child.ExitIdle()
}
390
391
392
393
394
395
// wrappedPicker delegates picks to the child's picker and wraps the pick's
// Done callback so call results can be counted per address.
type wrappedPicker struct {
	// childPicker is the picker produced by the child balancer.
	childPicker balancer.Picker
	// noopPicker disables call counting when true.
	noopPicker bool
}
400
401 func (wp *wrappedPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
402 pr, err := wp.childPicker.Pick(info)
403 if err != nil {
404 return balancer.PickResult{}, err
405 }
406
407 done := func(di balancer.DoneInfo) {
408 if !wp.noopPicker {
409 incrementCounter(pr.SubConn, di)
410 }
411 if pr.Done != nil {
412 pr.Done(di)
413 }
414 }
415 scw, ok := pr.SubConn.(*subConnWrapper)
416 if !ok {
417
418
419 logger.Errorf("Picked SubConn from child picker is not a SubConnWrapper")
420 return balancer.PickResult{
421 SubConn: pr.SubConn,
422 Done: done,
423 Metadata: pr.Metadata,
424 }, nil
425 }
426 return balancer.PickResult{
427 SubConn: scw.SubConn,
428 Done: done,
429 Metadata: pr.Metadata,
430 }, nil
431 }
432
433 func incrementCounter(sc balancer.SubConn, info balancer.DoneInfo) {
434 scw, ok := sc.(*subConnWrapper)
435 if !ok {
436
437 return
438 }
439
440
441
442
443
444
445
446
447
448
449
450
451
452 addrInfo := (*addressInfo)(atomic.LoadPointer(&scw.addressInfo))
453 if addrInfo == nil {
454 return
455 }
456 ab := (*bucket)(atomic.LoadPointer(&addrInfo.callCounter.activeBucket))
457
458 if info.Err == nil {
459 atomic.AddUint32(&ab.numSuccesses, 1)
460 } else {
461 atomic.AddUint32(&ab.numFailures, 1)
462 }
463 }
464
// UpdateState receives a state update from the child and queues it on
// pickerUpdateCh; run later passes it to handleChildStateUpdate.
func (b *outlierDetectionBalancer) UpdateState(s balancer.State) {
	b.pickerUpdateCh.Put(s)
}
468
469 func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
470 var sc balancer.SubConn
471 oldListener := opts.StateListener
472 opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state) }
473 sc, err := b.cc.NewSubConn(addrs, opts)
474 if err != nil {
475 return nil, err
476 }
477 scw := &subConnWrapper{
478 SubConn: sc,
479 addresses: addrs,
480 scUpdateCh: b.scUpdateCh,
481 listener: oldListener,
482 }
483 b.mu.Lock()
484 defer b.mu.Unlock()
485 b.scWrappers[sc] = scw
486 if len(addrs) != 1 {
487 return scw, nil
488 }
489 addrInfo, ok := b.addrs[addrs[0].Addr]
490 if !ok {
491 return scw, nil
492 }
493 addrInfo.sws = append(addrInfo.sws, scw)
494 atomic.StorePointer(&scw.addressInfo, unsafe.Pointer(addrInfo))
495 if !addrInfo.latestEjectionTimestamp.IsZero() {
496 scw.eject()
497 }
498 return scw, nil
499 }
500
// RemoveSubConn only logs an error; this balancer does not expect it to be
// called.
func (b *outlierDetectionBalancer) RemoveSubConn(sc balancer.SubConn) {
	b.logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc)
}
504
505
506
507
508
509
510 func (b *outlierDetectionBalancer) appendIfPresent(addr string, scw *subConnWrapper) *addressInfo {
511 addrInfo, ok := b.addrs[addr]
512 if !ok {
513 return nil
514 }
515
516 addrInfo.sws = append(addrInfo.sws, scw)
517 atomic.StorePointer(&scw.addressInfo, unsafe.Pointer(addrInfo))
518 return addrInfo
519 }
520
521
522
523
524
525 func (b *outlierDetectionBalancer) removeSubConnFromAddressesMapEntry(scw *subConnWrapper) {
526 addrInfo := (*addressInfo)(atomic.LoadPointer(&scw.addressInfo))
527 if addrInfo == nil {
528 return
529 }
530 for i, sw := range addrInfo.sws {
531 if scw == sw {
532 addrInfo.sws = append(addrInfo.sws[:i], addrInfo.sws[i+1:]...)
533 return
534 }
535 }
536 }
537
538 func (b *outlierDetectionBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
539 scw, ok := sc.(*subConnWrapper)
540 if !ok {
541
542 return
543 }
544
545 b.cc.UpdateAddresses(scw.SubConn, addrs)
546 b.mu.Lock()
547 defer b.mu.Unlock()
548
549
550
551
552 switch {
553 case len(scw.addresses) == 1 && len(addrs) == 1:
554
555
556 if scw.addresses[0].Addr == addrs[0].Addr {
557 return
558 }
559 b.removeSubConnFromAddressesMapEntry(scw)
560 addrInfo := b.appendIfPresent(addrs[0].Addr, scw)
561 if addrInfo == nil {
562 scw.uneject()
563 break
564 }
565 if addrInfo.latestEjectionTimestamp.IsZero() {
566 scw.uneject()
567 } else {
568 scw.eject()
569 }
570 case len(scw.addresses) == 1:
571 b.removeSubConnFromAddressesMapEntry(scw)
572 addrInfo := (*addressInfo)(atomic.LoadPointer(&scw.addressInfo))
573 if addrInfo != nil {
574 addrInfo.callCounter.clear()
575 }
576 scw.uneject()
577 case len(addrs) == 1:
578 addrInfo := b.appendIfPresent(addrs[0].Addr, scw)
579 if addrInfo != nil && !addrInfo.latestEjectionTimestamp.IsZero() {
580 scw.eject()
581 }
582 }
583
584 scw.addresses = addrs
585 }
586
// ResolveNow forwards the request to the parent ClientConn.
func (b *outlierDetectionBalancer) ResolveNow(opts resolver.ResolveNowOptions) {
	b.cc.ResolveNow(opts)
}
590
// Target returns the target reported by the parent ClientConn.
func (b *outlierDetectionBalancer) Target() string {
	return b.cc.Target()
}
594
// max returns the larger of the two durations.
func max(x, y time.Duration) time.Duration {
	if y > x {
		return y
	}
	return x
}
601
// min returns the smaller of the two durations.
func min(x, y time.Duration) time.Duration {
	if y > x {
		return x
	}
	return y
}
608
609
610
611 func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) {
612 scw := u.scw
613 scw.latestState = u.state
614 if !scw.ejected {
615 if scw.listener != nil {
616 b.childMu.Lock()
617 scw.listener(u.state)
618 b.childMu.Unlock()
619 }
620 }
621 }
622
623
624
625 func (b *outlierDetectionBalancer) handleEjectedUpdate(u *ejectionUpdate) {
626 scw := u.scw
627 scw.ejected = u.isEjected
628
629
630 stateToUpdate := scw.latestState
631 if u.isEjected {
632 stateToUpdate = balancer.SubConnState{
633 ConnectivityState: connectivity.TransientFailure,
634 }
635 }
636 if scw.listener != nil {
637 b.childMu.Lock()
638 scw.listener(stateToUpdate)
639 b.childMu.Unlock()
640 }
641 }
642
643
644
645 func (b *outlierDetectionBalancer) handleChildStateUpdate(u balancer.State) {
646 b.childState = u
647 b.mu.Lock()
648 if b.inhibitPickerUpdates {
649
650
651
652 b.updateUnconditionally = true
653 b.mu.Unlock()
654 return
655 }
656 noopCfg := b.noopConfig()
657 b.mu.Unlock()
658 b.recentPickerNoop = noopCfg
659 b.cc.UpdateState(balancer.State{
660 ConnectivityState: b.childState.ConnectivityState,
661 Picker: &wrappedPicker{
662 childPicker: b.childState.Picker,
663 noopPicker: noopCfg,
664 },
665 })
666 }
667
668
669
670
671 func (b *outlierDetectionBalancer) handleLBConfigUpdate(u lbCfgUpdate) {
672 lbCfg := u.lbCfg
673 noopCfg := lbCfg.SuccessRateEjection == nil && lbCfg.FailurePercentageEjection == nil
674
675
676
677
678
679
680
681 if b.childState.Picker != nil && noopCfg != b.recentPickerNoop || b.updateUnconditionally {
682 b.recentPickerNoop = noopCfg
683 b.cc.UpdateState(balancer.State{
684 ConnectivityState: b.childState.ConnectivityState,
685 Picker: &wrappedPicker{
686 childPicker: b.childState.Picker,
687 noopPicker: noopCfg,
688 },
689 })
690 }
691 b.inhibitPickerUpdates = false
692 b.updateUnconditionally = false
693 close(u.done)
694 }
695
// run is the balancer's background loop, started by Build. It dispatches
// items from scUpdateCh (*scUpdate, *ejectionUpdate) and pickerUpdateCh
// (balancer.State, lbCfgUpdate) to their handlers, one at a time. It returns,
// firing done, when either channel is closed or closed has fired.
func (b *outlierDetectionBalancer) run() {
	defer b.done.Fire()
	for {
		select {
		case update, ok := <-b.scUpdateCh.Get():
			if !ok {
				return
			}
			// Presumably makes the next buffered item available on Get()
			// — confirm against buffer.Unbounded.
			b.scUpdateCh.Load()
			// Drop the update if the balancer was closed in the meantime.
			if b.closed.HasFired() {
				return
			}
			switch u := update.(type) {
			case *scUpdate:
				b.handleSubConnUpdate(u)
			case *ejectionUpdate:
				b.handleEjectedUpdate(u)
			}
		case update, ok := <-b.pickerUpdateCh.Get():
			if !ok {
				return
			}
			b.pickerUpdateCh.Load()
			if b.closed.HasFired() {
				return
			}
			switch u := update.(type) {
			case balancer.State:
				b.handleChildStateUpdate(u)
			case lbCfgUpdate:
				b.handleLBConfigUpdate(u)
			}
		case <-b.closed.Done():
			return
		}
	}
}
733
734
735
736
737 func (b *outlierDetectionBalancer) intervalTimerAlgorithm() {
738 b.mu.Lock()
739 defer b.mu.Unlock()
740 b.timerStartTime = time.Now()
741
742 for _, addrInfo := range b.addrs {
743 addrInfo.callCounter.swap()
744 }
745
746 if b.cfg.SuccessRateEjection != nil {
747 b.successRateAlgorithm()
748 }
749
750 if b.cfg.FailurePercentageEjection != nil {
751 b.failurePercentageAlgorithm()
752 }
753
754 for _, addrInfo := range b.addrs {
755 if addrInfo.latestEjectionTimestamp.IsZero() && addrInfo.ejectionTimeMultiplier > 0 {
756 addrInfo.ejectionTimeMultiplier--
757 continue
758 }
759 if addrInfo.latestEjectionTimestamp.IsZero() {
760
761
762 continue
763 }
764 et := time.Duration(b.cfg.BaseEjectionTime) * time.Duration(addrInfo.ejectionTimeMultiplier)
765 met := max(time.Duration(b.cfg.BaseEjectionTime), time.Duration(b.cfg.MaxEjectionTime))
766 uet := addrInfo.latestEjectionTimestamp.Add(min(et, met))
767 if now().After(uet) {
768 b.unejectAddress(addrInfo)
769 }
770 }
771
772
773
774 if b.intervalTimer != nil {
775 b.intervalTimer.Stop()
776 }
777 b.intervalTimer = afterFunc(time.Duration(b.cfg.Interval), b.intervalTimerAlgorithm)
778 }
779
780
781
782
783
784 func (b *outlierDetectionBalancer) addrsWithAtLeastRequestVolume(requestVolume uint32) []*addressInfo {
785 var addrs []*addressInfo
786 for _, addrInfo := range b.addrs {
787 bucket := addrInfo.callCounter.inactiveBucket
788 rv := bucket.numSuccesses + bucket.numFailures
789 if rv >= requestVolume {
790 addrs = append(addrs, addrInfo)
791 }
792 }
793 return addrs
794 }
795
796
797
798
799
800 func (b *outlierDetectionBalancer) meanAndStdDev(addrs []*addressInfo) (float64, float64) {
801 var totalFractionOfSuccessfulRequests float64
802 var mean float64
803 for _, addrInfo := range addrs {
804 bucket := addrInfo.callCounter.inactiveBucket
805 rv := bucket.numSuccesses + bucket.numFailures
806 totalFractionOfSuccessfulRequests += float64(bucket.numSuccesses) / float64(rv)
807 }
808 mean = totalFractionOfSuccessfulRequests / float64(len(addrs))
809 var sumOfSquares float64
810 for _, addrInfo := range addrs {
811 bucket := addrInfo.callCounter.inactiveBucket
812 rv := bucket.numSuccesses + bucket.numFailures
813 devFromMean := (float64(bucket.numSuccesses) / float64(rv)) - mean
814 sumOfSquares += devFromMean * devFromMean
815 }
816 variance := sumOfSquares / float64(len(addrs))
817 return mean, math.Sqrt(variance)
818 }
819
820
821
822
823
824
825 func (b *outlierDetectionBalancer) successRateAlgorithm() {
826 addrsToConsider := b.addrsWithAtLeastRequestVolume(b.cfg.SuccessRateEjection.RequestVolume)
827 if len(addrsToConsider) < int(b.cfg.SuccessRateEjection.MinimumHosts) {
828 return
829 }
830 mean, stddev := b.meanAndStdDev(addrsToConsider)
831 for _, addrInfo := range addrsToConsider {
832 bucket := addrInfo.callCounter.inactiveBucket
833 ejectionCfg := b.cfg.SuccessRateEjection
834 if float64(b.numAddrsEjected)/float64(len(b.addrs))*100 >= float64(b.cfg.MaxEjectionPercent) {
835 return
836 }
837 successRate := float64(bucket.numSuccesses) / float64(bucket.numSuccesses+bucket.numFailures)
838 requiredSuccessRate := mean - stddev*(float64(ejectionCfg.StdevFactor)/1000)
839 if successRate < requiredSuccessRate {
840 channelz.Infof(logger, b.channelzParent, "SuccessRate algorithm detected outlier: %s. Parameters: successRate=%f, mean=%f, stddev=%f, requiredSuccessRate=%f", addrInfo, successRate, mean, stddev, requiredSuccessRate)
841 if uint32(grpcrand.Int31n(100)) < ejectionCfg.EnforcementPercentage {
842 b.ejectAddress(addrInfo)
843 }
844 }
845 }
846 }
847
848
849
850
851
852
853 func (b *outlierDetectionBalancer) failurePercentageAlgorithm() {
854 addrsToConsider := b.addrsWithAtLeastRequestVolume(b.cfg.FailurePercentageEjection.RequestVolume)
855 if len(addrsToConsider) < int(b.cfg.FailurePercentageEjection.MinimumHosts) {
856 return
857 }
858
859 for _, addrInfo := range addrsToConsider {
860 bucket := addrInfo.callCounter.inactiveBucket
861 ejectionCfg := b.cfg.FailurePercentageEjection
862 if float64(b.numAddrsEjected)/float64(len(b.addrs))*100 >= float64(b.cfg.MaxEjectionPercent) {
863 return
864 }
865 failurePercentage := (float64(bucket.numFailures) / float64(bucket.numSuccesses+bucket.numFailures)) * 100
866 if failurePercentage > float64(b.cfg.FailurePercentageEjection.Threshold) {
867 channelz.Infof(logger, b.channelzParent, "FailurePercentage algorithm detected outlier: %s, failurePercentage=%f", addrInfo, failurePercentage)
868 if uint32(grpcrand.Int31n(100)) < ejectionCfg.EnforcementPercentage {
869 b.ejectAddress(addrInfo)
870 }
871 }
872 }
873 }
874
875
876 func (b *outlierDetectionBalancer) ejectAddress(addrInfo *addressInfo) {
877 b.numAddrsEjected++
878 addrInfo.latestEjectionTimestamp = b.timerStartTime
879 addrInfo.ejectionTimeMultiplier++
880 for _, sbw := range addrInfo.sws {
881 sbw.eject()
882 channelz.Infof(logger, b.channelzParent, "Subchannel ejected: %s", sbw)
883 }
884
885 }
886
887
888 func (b *outlierDetectionBalancer) unejectAddress(addrInfo *addressInfo) {
889 b.numAddrsEjected--
890 addrInfo.latestEjectionTimestamp = time.Time{}
891 for _, sbw := range addrInfo.sws {
892 sbw.uneject()
893 channelz.Infof(logger, b.channelzParent, "Subchannel unejected: %s", sbw)
894 }
895 }
896
897
898
899
900
901
902
// addressInfo is the per-address state tracked by the outlier detection
// balancer; instances are the values of outlierDetectionBalancer.addrs.
type addressInfo struct {
	// callCounter holds the success/failure counts for this address. Its
	// active bucket is updated atomically by incrementCounter; its inactive
	// bucket is read by the ejection algorithms.
	callCounter *callCounter

	// latestEjectionTimestamp is the interval start time at which the
	// address was last ejected; the zero time means it is not ejected.
	latestEjectionTimestamp time.Time

	// ejectionTimeMultiplier scales the base ejection time. It is
	// incremented on each ejection, decremented each interval while the
	// address is not ejected, and reset to 0 on a noop config.
	ejectionTimeMultiplier int64

	// sws are the SubConn wrappers currently attached to this address.
	sws []*subConnWrapper
}
917
918 func (a *addressInfo) String() string {
919 var res strings.Builder
920 res.WriteString("[")
921 for _, sw := range a.sws {
922 res.WriteString(sw.String())
923 }
924 res.WriteString("]")
925 return res.String()
926 }
927
928 func newAddressInfo() *addressInfo {
929 return &addressInfo{
930 callCounter: newCallCounter(),
931 }
932 }
933
View as plain text