...

Source file src/google.golang.org/grpc/xds/internal/balancer/outlierdetection/balancer.go

Documentation: google.golang.org/grpc/xds/internal/balancer/outlierdetection

     1  /*
     2   *
     3   * Copyright 2022 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package outlierdetection provides an implementation of the outlier detection
    20  // LB policy, as defined in
    21  // https://github.com/grpc/proposal/blob/master/A50-xds-outlier-detection.md.
    22  package outlierdetection
    23  
    24  import (
    25  	"encoding/json"
    26  	"fmt"
    27  	"math"
    28  	"strings"
    29  	"sync"
    30  	"sync/atomic"
    31  	"time"
    32  	"unsafe"
    33  
    34  	"google.golang.org/grpc/balancer"
    35  	"google.golang.org/grpc/connectivity"
    36  	"google.golang.org/grpc/internal/balancer/gracefulswitch"
    37  	"google.golang.org/grpc/internal/buffer"
    38  	"google.golang.org/grpc/internal/channelz"
    39  	"google.golang.org/grpc/internal/grpclog"
    40  	"google.golang.org/grpc/internal/grpcrand"
    41  	"google.golang.org/grpc/internal/grpcsync"
    42  	iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
    43  	"google.golang.org/grpc/resolver"
    44  	"google.golang.org/grpc/serviceconfig"
    45  )
    46  
    47  // Globals to stub out in tests.
    48  var (
    49  	afterFunc = time.AfterFunc
    50  	now       = time.Now
    51  )
    52  
// Name is the name of the outlier detection balancer. It is the value
// returned by the builder's Name method.
const Name = "outlier_detection_experimental"
    55  
// init registers the outlier detection balancer builder with the balancer
// package.
func init() {
	balancer.Register(bb{})
}
    59  
// bb is the stateless builder for the outlier detection balancer. It provides
// the Build, ParseConfig and Name methods.
type bb struct{}
    61  
    62  func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
    63  	b := &outlierDetectionBalancer{
    64  		cc:             cc,
    65  		closed:         grpcsync.NewEvent(),
    66  		done:           grpcsync.NewEvent(),
    67  		addrs:          make(map[string]*addressInfo),
    68  		scWrappers:     make(map[balancer.SubConn]*subConnWrapper),
    69  		scUpdateCh:     buffer.NewUnbounded(),
    70  		pickerUpdateCh: buffer.NewUnbounded(),
    71  		channelzParent: bOpts.ChannelzParent,
    72  	}
    73  	b.logger = prefixLogger(b)
    74  	b.logger.Infof("Created")
    75  	b.child = gracefulswitch.NewBalancer(b, bOpts)
    76  	go b.run()
    77  	return b
    78  }
    79  
    80  func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
    81  	lbCfg := &LBConfig{
    82  		// Default top layer values as documented in A50.
    83  		Interval:           iserviceconfig.Duration(10 * time.Second),
    84  		BaseEjectionTime:   iserviceconfig.Duration(30 * time.Second),
    85  		MaxEjectionTime:    iserviceconfig.Duration(300 * time.Second),
    86  		MaxEjectionPercent: 10,
    87  	}
    88  
    89  	// This unmarshalling handles underlying layers sre and fpe which have their
    90  	// own defaults for their fields if either sre or fpe are present.
    91  	if err := json.Unmarshal(s, lbCfg); err != nil { // Validates child config if present as well.
    92  		return nil, fmt.Errorf("xds: unable to unmarshal LBconfig: %s, error: %v", string(s), err)
    93  	}
    94  
    95  	// Note: in the xds flow, these validations will never fail. The xdsclient
    96  	// performs the same validations as here on the xds Outlier Detection
    97  	// resource before parsing resource into JSON which this function gets
    98  	// called with. A50 defines two separate places for these validations to
    99  	// take place, the xdsclient and this ParseConfig method. "When parsing a
   100  	// config from JSON, if any of these requirements is violated, that should
   101  	// be treated as a parsing error." - A50
   102  	switch {
   103  	// "The google.protobuf.Duration fields interval, base_ejection_time, and
   104  	// max_ejection_time must obey the restrictions in the
   105  	// google.protobuf.Duration documentation and they must have non-negative
   106  	// values." - A50
   107  	// Approximately 290 years is the maximum time that time.Duration (int64)
   108  	// can represent. The restrictions on the protobuf.Duration field are to be
   109  	// within +-10000 years. Thus, just check for negative values.
   110  	case lbCfg.Interval < 0:
   111  		return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.interval = %s; must be >= 0", lbCfg.Interval)
   112  	case lbCfg.BaseEjectionTime < 0:
   113  		return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.base_ejection_time = %s; must be >= 0", lbCfg.BaseEjectionTime)
   114  	case lbCfg.MaxEjectionTime < 0:
   115  		return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.max_ejection_time = %s; must be >= 0", lbCfg.MaxEjectionTime)
   116  
   117  	// "The fields max_ejection_percent,
   118  	// success_rate_ejection.enforcement_percentage,
   119  	// failure_percentage_ejection.threshold, and
   120  	// failure_percentage.enforcement_percentage must have values less than or
   121  	// equal to 100." - A50
   122  	case lbCfg.MaxEjectionPercent > 100:
   123  		return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.max_ejection_percent = %v; must be <= 100", lbCfg.MaxEjectionPercent)
   124  	case lbCfg.SuccessRateEjection != nil && lbCfg.SuccessRateEjection.EnforcementPercentage > 100:
   125  		return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.SuccessRateEjection.enforcement_percentage = %v; must be <= 100", lbCfg.SuccessRateEjection.EnforcementPercentage)
   126  	case lbCfg.FailurePercentageEjection != nil && lbCfg.FailurePercentageEjection.Threshold > 100:
   127  		return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.FailurePercentageEjection.threshold = %v; must be <= 100", lbCfg.FailurePercentageEjection.Threshold)
   128  	case lbCfg.FailurePercentageEjection != nil && lbCfg.FailurePercentageEjection.EnforcementPercentage > 100:
   129  		return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.FailurePercentageEjection.enforcement_percentage = %v; must be <= 100", lbCfg.FailurePercentageEjection.EnforcementPercentage)
   130  	}
   131  	return lbCfg, nil
   132  }
   133  
// Name returns the package-level Name constant.
func (bb) Name() string {
	return Name
}
   137  
// scUpdate wraps a subConn update to be sent to the child balancer. It is
// queued on scUpdateCh by updateSubConnState and consumed by
// handleSubConnUpdate; scw is the wrapper of the SubConn concerned and state
// is the state that was reported for it.
type scUpdate struct {
	scw   *subConnWrapper
	state balancer.SubConnState
}
   143  
// ejectionUpdate describes an ejection or unejection of a SubConn. The run
// goroutine dispatches it to handleEjectedUpdate.
type ejectionUpdate struct {
	// scw is the wrapper of the SubConn being ejected or unejected.
	scw       *subConnWrapper
	isEjected bool // true for ejected, false for unejected
}
   148  
// lbCfgUpdate carries a new LB config from UpdateClientConnState to the run
// goroutine, where it is processed by handleLBConfigUpdate.
type lbCfgUpdate struct {
	// lbCfg is the newly received configuration.
	lbCfg *LBConfig
	// to make sure picker is updated synchronously. handleLBConfigUpdate
	// closes done when it has finished, and UpdateClientConnState blocks on it
	// before returning.
	done chan struct{}
}
   154  
// outlierDetectionBalancer is the outlier detection LB policy. It sits between
// the parent balancer.ClientConn (cc) and a gracefulswitch child balancer: it
// wraps the SubConns and pickers exchanged between the two, and periodically
// runs intervalTimerAlgorithm to eject or uneject addresses.
type outlierDetectionBalancer struct {
	// These fields are safe to be accessed without holding any mutex because
	// they are synchronized in run(), which makes these field accesses happen
	// serially.
	//
	// childState is the latest balancer state received from the child.
	childState balancer.State
	// recentPickerNoop represents whether the most recent picker sent upward to
	// the balancer.ClientConn is a noop picker, which doesn't count RPC's. Used
	// to suppress redundant picker updates.
	recentPickerNoop bool

	// closed is fired by Close() and checked by run() to stop processing
	// updates. done is fired when run() returns; Close() waits on it. cc is the
	// parent ClientConn passed to Build. logger is created by prefixLogger in
	// Build. channelzParent is copied from the build options.
	closed         *grpcsync.Event
	done           *grpcsync.Event
	cc             balancer.ClientConn
	logger         *grpclog.PrefixLogger
	channelzParent channelz.Identifier

	// childMu guards calls into child (to uphold the balancer.Balancer API
	// guarantee of synchronous calls).
	childMu sync.Mutex
	child   *gracefulswitch.Balancer

	// mu guards access to the following fields. It also helps to synchronize
	// behaviors of the following events: config updates, firing of the interval
	// timer, SubConn State updates, SubConn address updates, and child state
	// updates.
	//
	// For example, when we receive a config update in the middle of the
	// interval timer algorithm, which uses knobs present in the config, the
	// balancer will wait for the interval timer algorithm to finish before
	// persisting the new configuration.
	//
	// Another example would be the updating of the addrs map, such as from a
	// SubConn address update in the middle of the interval timer algorithm
	// which uses addrs. This balancer waits for the interval timer algorithm to
	// finish before making the update to the addrs map.
	//
	// This mutex is never held at the same time as childMu (within the context
	// of a single goroutine).
	//
	// Field summary:
	//   - addrs maps an address string to its addressInfo; it is reconciled
	//     with the resolver's address list in UpdateClientConnState.
	//   - cfg is the most recently accepted LB config.
	//   - scWrappers maps a SubConn created on cc to its subConnWrapper; the
	//     entry is removed when that SubConn reports Shutdown.
	//   - timerStartTime is when the current interval started; it is reset to
	//     the zero time when a no-op config is received.
	//   - intervalTimer is the timer that runs intervalTimerAlgorithm.
	//   - inhibitPickerUpdates is set while UpdateClientConnState is in
	//     progress; while set, child state updates are not forwarded to cc.
	//   - updateUnconditionally records that a child state update arrived
	//     while picker updates were inhibited.
	mu                    sync.Mutex
	addrs                 map[string]*addressInfo
	cfg                   *LBConfig
	scWrappers            map[balancer.SubConn]*subConnWrapper
	timerStartTime        time.Time
	intervalTimer         *time.Timer
	inhibitPickerUpdates  bool
	updateUnconditionally bool
	numAddrsEjected       int // For fast calculations of percentage of addrs ejected

	// scUpdateCh carries *scUpdate and *ejectionUpdate values to run().
	// pickerUpdateCh carries balancer.State and lbCfgUpdate values to run().
	scUpdateCh     *buffer.Unbounded
	pickerUpdateCh *buffer.Unbounded
}
   208  
   209  // noopConfig returns whether this balancer is configured with a logical no-op
   210  // configuration or not.
   211  //
   212  // Caller must hold b.mu.
   213  func (b *outlierDetectionBalancer) noopConfig() bool {
   214  	return b.cfg.SuccessRateEjection == nil && b.cfg.FailurePercentageEjection == nil
   215  }
   216  
   217  // onIntervalConfig handles logic required specifically on the receipt of a
   218  // configuration which specifies to count RPC's and periodically perform passive
   219  // health checking based on heuristics defined in configuration every configured
   220  // interval.
   221  //
   222  // Caller must hold b.mu.
   223  func (b *outlierDetectionBalancer) onIntervalConfig() {
   224  	var interval time.Duration
   225  	if b.timerStartTime.IsZero() {
   226  		b.timerStartTime = time.Now()
   227  		for _, addrInfo := range b.addrs {
   228  			addrInfo.callCounter.clear()
   229  		}
   230  		interval = time.Duration(b.cfg.Interval)
   231  	} else {
   232  		interval = time.Duration(b.cfg.Interval) - now().Sub(b.timerStartTime)
   233  		if interval < 0 {
   234  			interval = 0
   235  		}
   236  	}
   237  	b.intervalTimer = afterFunc(interval, b.intervalTimerAlgorithm)
   238  }
   239  
   240  // onNoopConfig handles logic required specifically on the receipt of a
   241  // configuration which specifies the balancer to be a noop.
   242  //
   243  // Caller must hold b.mu.
   244  func (b *outlierDetectionBalancer) onNoopConfig() {
   245  	// "If a config is provided with both the `success_rate_ejection` and
   246  	// `failure_percentage_ejection` fields unset, skip starting the timer and
   247  	// do the following:"
   248  	// "Unset the timer start timestamp."
   249  	b.timerStartTime = time.Time{}
   250  	for _, addrInfo := range b.addrs {
   251  		// "Uneject all currently ejected addresses."
   252  		if !addrInfo.latestEjectionTimestamp.IsZero() {
   253  			b.unejectAddress(addrInfo)
   254  		}
   255  		// "Reset each address's ejection time multiplier to 0."
   256  		addrInfo.ejectionTimeMultiplier = 0
   257  	}
   258  }
   259  
// UpdateClientConnState applies a new resolver state and LB config.
//
// It rejects the update if the config is not an *LBConfig or if the child
// policy named in it is not registered. Otherwise it switches the child
// balancer when the child policy name changed, stores the config, reconciles
// the addrs map with the resolver's addresses, restarts or disables the
// interval timer, and forwards the update to the child. Before returning it
// waits for the run goroutine to process the config, so that any resulting
// picker update has been sent to the parent ClientConn. The returned error is
// the one from the child's UpdateClientConnState.
func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
	lbCfg, ok := s.BalancerConfig.(*LBConfig)
	if !ok {
		b.logger.Errorf("received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig)
		return balancer.ErrBadResolverState
	}

	// Reject whole config if child policy doesn't exist, don't persist it for
	// later.
	bb := balancer.Get(lbCfg.ChildPolicy.Name)
	if bb == nil {
		return fmt.Errorf("outlier detection: child balancer %q not registered", lbCfg.ChildPolicy.Name)
	}

	// It is safe to read b.cfg here without holding the mutex, as the only
	// write to b.cfg happens later in this function. This function is part of
	// the balancer.Balancer API, so it is guaranteed to be called in a
	// synchronous manner, so it cannot race with this read.
	if b.cfg == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name {
		b.childMu.Lock()
		err := b.child.SwitchTo(bb)
		if err != nil {
			b.childMu.Unlock()
			return fmt.Errorf("outlier detection: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err)
		}
		b.childMu.Unlock()
	}

	b.mu.Lock()
	// Inhibit child picker updates until this UpdateClientConnState() call
	// completes. If needed, a picker update containing the no-op config bit
	// determined from this config and most recent state from the child will be
	// sent synchronously upward at the end of this UpdateClientConnState()
	// call.
	b.inhibitPickerUpdates = true
	b.updateUnconditionally = false
	b.cfg = lbCfg

	// Reconcile b.addrs with the resolver's address list: add entries for new
	// addresses, keep existing entries, and drop entries for addresses that
	// are no longer present.
	addrs := make(map[string]bool, len(s.ResolverState.Addresses))
	for _, addr := range s.ResolverState.Addresses {
		addrs[addr.Addr] = true
		if _, ok := b.addrs[addr.Addr]; !ok {
			b.addrs[addr.Addr] = newAddressInfo()
		}
	}
	for addr := range b.addrs {
		if !addrs[addr] {
			delete(b.addrs, addr)
		}
	}

	// Stop any running timer; onIntervalConfig re-arms it if the new config
	// is not a no-op.
	if b.intervalTimer != nil {
		b.intervalTimer.Stop()
	}

	if b.noopConfig() {
		b.onNoopConfig()
	} else {
		b.onIntervalConfig()
	}
	b.mu.Unlock()

	// b.mu is released before taking childMu; the two are never held together.
	b.childMu.Lock()
	err := b.child.UpdateClientConnState(balancer.ClientConnState{
		ResolverState:  s.ResolverState,
		BalancerConfig: b.cfg.ChildPolicy.Config,
	})
	b.childMu.Unlock()

	// Hand the config to the run goroutine and block until
	// handleLBConfigUpdate has closed done.
	done := make(chan struct{})
	b.pickerUpdateCh.Put(lbCfgUpdate{
		lbCfg: lbCfg,
		done:  done,
	})
	<-done

	return err
}
   338  
// ResolverError forwards the resolver error to the child balancer while
// holding childMu.
func (b *outlierDetectionBalancer) ResolverError(err error) {
	b.childMu.Lock()
	defer b.childMu.Unlock()
	b.child.ResolverError(err)
}
   344  
   345  func (b *outlierDetectionBalancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
   346  	b.mu.Lock()
   347  	defer b.mu.Unlock()
   348  	scw, ok := b.scWrappers[sc]
   349  	if !ok {
   350  		// Shouldn't happen if passed down a SubConnWrapper to child on SubConn
   351  		// creation.
   352  		b.logger.Errorf("UpdateSubConnState called with SubConn that has no corresponding SubConnWrapper")
   353  		return
   354  	}
   355  	if state.ConnectivityState == connectivity.Shutdown {
   356  		delete(b.scWrappers, scw.SubConn)
   357  	}
   358  	b.scUpdateCh.Put(&scUpdate{
   359  		scw:   scw,
   360  		state: state,
   361  	})
   362  }
   363  
// UpdateSubConnState is not expected to be called; SubConn state changes
// arrive through the listener installed in NewSubConn instead. A call is only
// logged as an error.
func (b *outlierDetectionBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
	b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}
   367  
// Close shuts the balancer down. It fires closed so that run() exits, waits
// for run() to finish, closes the child balancer, closes both update buffers,
// and finally stops the interval timer if one is set.
func (b *outlierDetectionBalancer) Close() {
	b.closed.Fire()
	// Wait for run() to return so that no further updates are dispatched.
	<-b.done.Done()
	b.childMu.Lock()
	b.child.Close()
	b.childMu.Unlock()

	b.scUpdateCh.Close()
	b.pickerUpdateCh.Close()

	b.mu.Lock()
	defer b.mu.Unlock()
	if b.intervalTimer != nil {
		b.intervalTimer.Stop()
	}
}
   384  
// ExitIdle forwards the call to the child balancer while holding childMu.
func (b *outlierDetectionBalancer) ExitIdle() {
	b.childMu.Lock()
	defer b.childMu.Unlock()
	b.child.ExitIdle()
}
   390  
// wrappedPicker delegates to the child policy's picker, and when the request
// finishes, it increments the corresponding counter in the map entry referenced
// by the subConnWrapper that was picked. If both the `success_rate_ejection`
// and `failure_percentage_ejection` fields are unset in the configuration, this
// picker will not count.
//
// childPicker is the picker Pick delegates to. noopPicker, when true, disables
// the call counting done in the Done callback.
type wrappedPicker struct {
	childPicker balancer.Picker
	noopPicker  bool
}
   400  
   401  func (wp *wrappedPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
   402  	pr, err := wp.childPicker.Pick(info)
   403  	if err != nil {
   404  		return balancer.PickResult{}, err
   405  	}
   406  
   407  	done := func(di balancer.DoneInfo) {
   408  		if !wp.noopPicker {
   409  			incrementCounter(pr.SubConn, di)
   410  		}
   411  		if pr.Done != nil {
   412  			pr.Done(di)
   413  		}
   414  	}
   415  	scw, ok := pr.SubConn.(*subConnWrapper)
   416  	if !ok {
   417  		// This can never happen, but check is present for defensive
   418  		// programming.
   419  		logger.Errorf("Picked SubConn from child picker is not a SubConnWrapper")
   420  		return balancer.PickResult{
   421  			SubConn:  pr.SubConn,
   422  			Done:     done,
   423  			Metadata: pr.Metadata,
   424  		}, nil
   425  	}
   426  	return balancer.PickResult{
   427  		SubConn:  scw.SubConn,
   428  		Done:     done,
   429  		Metadata: pr.Metadata,
   430  	}, nil
   431  }
   432  
   433  func incrementCounter(sc balancer.SubConn, info balancer.DoneInfo) {
   434  	scw, ok := sc.(*subConnWrapper)
   435  	if !ok {
   436  		// Shouldn't happen, as comes from child
   437  		return
   438  	}
   439  
   440  	// scw.addressInfo and callCounter.activeBucket can be written to
   441  	// concurrently (the pointers themselves). Thus, protect the reads here with
   442  	// atomics to prevent data corruption. There exists a race in which you read
   443  	// the addressInfo or active bucket pointer and then that pointer points to
   444  	// deprecated memory. If this goroutine yields the processor, in between
   445  	// reading the addressInfo pointer and writing to the active bucket,
   446  	// UpdateAddresses can switch the addressInfo the scw points to. Writing to
   447  	// an outdated addresses is a very small race and tolerable. After reading
   448  	// callCounter.activeBucket in this picker a swap call can concurrently
   449  	// change what activeBucket points to. A50 says to swap the pointer, which
   450  	// will cause this race to write to deprecated memory the interval timer
   451  	// algorithm will never read, which makes this race alright.
   452  	addrInfo := (*addressInfo)(atomic.LoadPointer(&scw.addressInfo))
   453  	if addrInfo == nil {
   454  		return
   455  	}
   456  	ab := (*bucket)(atomic.LoadPointer(&addrInfo.callCounter.activeBucket))
   457  
   458  	if info.Err == nil {
   459  		atomic.AddUint32(&ab.numSuccesses, 1)
   460  	} else {
   461  		atomic.AddUint32(&ab.numFailures, 1)
   462  	}
   463  }
   464  
// UpdateState queues the balancer state s on pickerUpdateCh; the run goroutine
// processes it in handleChildStateUpdate.
func (b *outlierDetectionBalancer) UpdateState(s balancer.State) {
	b.pickerUpdateCh.Put(s)
}
   468  
// NewSubConn creates a SubConn on the parent ClientConn and returns it wrapped
// in a subConnWrapper. The wrapper is recorded in scWrappers, and when the
// SubConn has exactly one address that is present in the addrs map, it is
// attached to that address's entry and ejected right away if the address is
// currently ejected. An error from the parent ClientConn is returned as is.
func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
	// sc is declared before the listener closure so that the closure can refer
	// to the SubConn that is only assigned by the NewSubConn call below.
	var sc balancer.SubConn
	oldListener := opts.StateListener
	// Route state updates through updateSubConnState; the caller's listener is
	// kept on the wrapper.
	opts.StateListener = func(state balancer.SubConnState) { b.updateSubConnState(sc, state) }
	sc, err := b.cc.NewSubConn(addrs, opts)
	if err != nil {
		return nil, err
	}
	scw := &subConnWrapper{
		SubConn:    sc,
		addresses:  addrs,
		scUpdateCh: b.scUpdateCh,
		listener:   oldListener,
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	b.scWrappers[sc] = scw
	// Only SubConns with exactly one address are associated with an address
	// map entry.
	if len(addrs) != 1 {
		return scw, nil
	}
	addrInfo, ok := b.addrs[addrs[0].Addr]
	if !ok {
		return scw, nil
	}
	addrInfo.sws = append(addrInfo.sws, scw)
	atomic.StorePointer(&scw.addressInfo, unsafe.Pointer(addrInfo))
	// A non-zero ejection timestamp means the address is currently ejected.
	if !addrInfo.latestEjectionTimestamp.IsZero() {
		scw.eject()
	}
	return scw, nil
}
   500  
// RemoveSubConn is not expected to be called on this balancer; a call is only
// logged as an error.
func (b *outlierDetectionBalancer) RemoveSubConn(sc balancer.SubConn) {
	b.logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc)
}
   504  
   505  // appendIfPresent appends the scw to the address, if the address is present in
   506  // the Outlier Detection balancers address map. Returns nil if not present, and
   507  // the map entry if present.
   508  //
   509  // Caller must hold b.mu.
   510  func (b *outlierDetectionBalancer) appendIfPresent(addr string, scw *subConnWrapper) *addressInfo {
   511  	addrInfo, ok := b.addrs[addr]
   512  	if !ok {
   513  		return nil
   514  	}
   515  
   516  	addrInfo.sws = append(addrInfo.sws, scw)
   517  	atomic.StorePointer(&scw.addressInfo, unsafe.Pointer(addrInfo))
   518  	return addrInfo
   519  }
   520  
   521  // removeSubConnFromAddressesMapEntry removes the scw from its map entry if
   522  // present.
   523  //
   524  // Caller must hold b.mu.
   525  func (b *outlierDetectionBalancer) removeSubConnFromAddressesMapEntry(scw *subConnWrapper) {
   526  	addrInfo := (*addressInfo)(atomic.LoadPointer(&scw.addressInfo))
   527  	if addrInfo == nil {
   528  		return
   529  	}
   530  	for i, sw := range addrInfo.sws {
   531  		if scw == sw {
   532  			addrInfo.sws = append(addrInfo.sws[:i], addrInfo.sws[i+1:]...)
   533  			return
   534  		}
   535  	}
   536  }
   537  
// UpdateAddresses forwards the new address list for sc to the parent
// ClientConn and then updates the outlier detection bookkeeping for the
// wrapper. What happens depends on whether the old and new address lists each
// consist of exactly one address:
//
//   - single to single: if the address changed, the wrapper moves from the old
//     address entry to the new one and takes on the new address's ejection
//     state (unejected if the new address is not tracked).
//   - single to multiple/none: the wrapper is detached from its entry, that
//     entry's call counter is cleared and the wrapper is unejected.
//   - multiple/none to single: the wrapper is attached to the new address's
//     entry, if tracked, and ejected if that address is ejected.
//   - multiple/none to multiple/none: nothing to do.
//
// Calls with a sc that is not a *subConnWrapper are ignored.
func (b *outlierDetectionBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
	scw, ok := sc.(*subConnWrapper)
	if !ok {
		// Return, shouldn't happen if passed up scw
		return
	}

	b.cc.UpdateAddresses(scw.SubConn, addrs)
	b.mu.Lock()
	defer b.mu.Unlock()

	// Note that 0 addresses is a valid update/state for a SubConn to be in.
	// This is correctly handled by this algorithm (handled as part of a non singular
	// old address/new address).
	switch {
	case len(scw.addresses) == 1 && len(addrs) == 1: // single address to single address
		// If the updated address is the same, then there is nothing to do
		// past this point.
		if scw.addresses[0].Addr == addrs[0].Addr {
			return
		}
		b.removeSubConnFromAddressesMapEntry(scw)
		addrInfo := b.appendIfPresent(addrs[0].Addr, scw)
		if addrInfo == nil { // uneject unconditionally because could have come from an ejected address
			scw.uneject()
			break
		}
		if addrInfo.latestEjectionTimestamp.IsZero() { // relay new updated subconn state
			scw.uneject()
		} else {
			scw.eject()
		}
	case len(scw.addresses) == 1: // single address to multiple/no addresses
		b.removeSubConnFromAddressesMapEntry(scw)
		// The wrapper still points at its previous entry here; clear that
		// entry's call counter.
		addrInfo := (*addressInfo)(atomic.LoadPointer(&scw.addressInfo))
		if addrInfo != nil {
			addrInfo.callCounter.clear()
		}
		scw.uneject()
	case len(addrs) == 1: // multiple/no addresses to single address
		addrInfo := b.appendIfPresent(addrs[0].Addr, scw)
		if addrInfo != nil && !addrInfo.latestEjectionTimestamp.IsZero() {
			scw.eject()
		}
	} // otherwise multiple/no addresses to multiple/no addresses; ignore

	// Remember the new address list for the next update.
	scw.addresses = addrs
}
   586  
// ResolveNow forwards the call to the parent ClientConn.
func (b *outlierDetectionBalancer) ResolveNow(opts resolver.ResolveNowOptions) {
	b.cc.ResolveNow(opts)
}
   590  
// Target returns the target reported by the parent ClientConn.
func (b *outlierDetectionBalancer) Target() string {
	return b.cc.Target()
}
   594  
   595  func max(x, y time.Duration) time.Duration {
   596  	if x < y {
   597  		return y
   598  	}
   599  	return x
   600  }
   601  
   602  func min(x, y time.Duration) time.Duration {
   603  	if x < y {
   604  		return x
   605  	}
   606  	return y
   607  }
   608  
   609  // handleSubConnUpdate stores the recent state and forward the update
   610  // if the SubConn is not ejected.
   611  func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) {
   612  	scw := u.scw
   613  	scw.latestState = u.state
   614  	if !scw.ejected {
   615  		if scw.listener != nil {
   616  			b.childMu.Lock()
   617  			scw.listener(u.state)
   618  			b.childMu.Unlock()
   619  		}
   620  	}
   621  }
   622  
   623  // handleEjectedUpdate handles any SubConns that get ejected/unejected, and
   624  // forwards the appropriate corresponding subConnState to the child policy.
   625  func (b *outlierDetectionBalancer) handleEjectedUpdate(u *ejectionUpdate) {
   626  	scw := u.scw
   627  	scw.ejected = u.isEjected
   628  	// If scw.latestState has never been written to will default to connectivity
   629  	// IDLE, which is fine.
   630  	stateToUpdate := scw.latestState
   631  	if u.isEjected {
   632  		stateToUpdate = balancer.SubConnState{
   633  			ConnectivityState: connectivity.TransientFailure,
   634  		}
   635  	}
   636  	if scw.listener != nil {
   637  		b.childMu.Lock()
   638  		scw.listener(stateToUpdate)
   639  		b.childMu.Unlock()
   640  	}
   641  }
   642  
   643  // handleChildStateUpdate forwards the picker update wrapped in a wrapped picker
   644  // with the noop picker bit present.
   645  func (b *outlierDetectionBalancer) handleChildStateUpdate(u balancer.State) {
   646  	b.childState = u
   647  	b.mu.Lock()
   648  	if b.inhibitPickerUpdates {
   649  		// If a child's state is updated during the suppression of child
   650  		// updates, the synchronous handleLBConfigUpdate function with respect
   651  		// to UpdateClientConnState should return a picker unconditionally.
   652  		b.updateUnconditionally = true
   653  		b.mu.Unlock()
   654  		return
   655  	}
   656  	noopCfg := b.noopConfig()
   657  	b.mu.Unlock()
   658  	b.recentPickerNoop = noopCfg
   659  	b.cc.UpdateState(balancer.State{
   660  		ConnectivityState: b.childState.ConnectivityState,
   661  		Picker: &wrappedPicker{
   662  			childPicker: b.childState.Picker,
   663  			noopPicker:  noopCfg,
   664  		},
   665  	})
   666  }
   667  
   668  // handleLBConfigUpdate compares whether the new config is a noop config or not,
   669  // to the noop bit in the picker if present. It updates the picker if this bit
   670  // changed compared to the picker currently in use.
   671  func (b *outlierDetectionBalancer) handleLBConfigUpdate(u lbCfgUpdate) {
   672  	lbCfg := u.lbCfg
   673  	noopCfg := lbCfg.SuccessRateEjection == nil && lbCfg.FailurePercentageEjection == nil
   674  	// If the child has sent it's first update and this config flips the noop
   675  	// bit compared to the most recent picker update sent upward, then a new
   676  	// picker with this updated bit needs to be forwarded upward. If a child
   677  	// update was received during the suppression of child updates within
   678  	// UpdateClientConnState(), then a new picker needs to be forwarded with
   679  	// this updated state, irregardless of whether this new configuration flips
   680  	// the bit.
   681  	if b.childState.Picker != nil && noopCfg != b.recentPickerNoop || b.updateUnconditionally {
   682  		b.recentPickerNoop = noopCfg
   683  		b.cc.UpdateState(balancer.State{
   684  			ConnectivityState: b.childState.ConnectivityState,
   685  			Picker: &wrappedPicker{
   686  				childPicker: b.childState.Picker,
   687  				noopPicker:  noopCfg,
   688  			},
   689  		})
   690  	}
   691  	b.inhibitPickerUpdates = false
   692  	b.updateUnconditionally = false
   693  	close(u.done)
   694  }
   695  
// run is the balancer's event loop, started as a goroutine by Build. It
// handles updates from scUpdateCh and pickerUpdateCh one at a time, which is
// what makes the handle* methods run serially. It returns when the balancer
// has been closed or when either channel has been closed, and fires b.done on
// the way out.
func (b *outlierDetectionBalancer) run() {
	defer b.done.Fire()
	for {
		select {
		case update, ok := <-b.scUpdateCh.Get():
			if !ok {
				return
			}
			// Load is called after every successful read from Get.
			b.scUpdateCh.Load()
			if b.closed.HasFired() { // don't send SubConn updates to child after the balancer has been closed
				return
			}
			switch u := update.(type) {
			case *scUpdate:
				b.handleSubConnUpdate(u)
			case *ejectionUpdate:
				b.handleEjectedUpdate(u)
			}
		case update, ok := <-b.pickerUpdateCh.Get():
			if !ok {
				return
			}
			// Load is called after every successful read from Get.
			b.pickerUpdateCh.Load()
			if b.closed.HasFired() { // don't send picker updates to grpc after the balancer has been closed
				return
			}
			switch u := update.(type) {
			case balancer.State:
				b.handleChildStateUpdate(u)
			case lbCfgUpdate:
				b.handleLBConfigUpdate(u)
			}
		case <-b.closed.Done():
			return
		}
	}
}
   733  
   734  // intervalTimerAlgorithm ejects and unejects addresses based on the Outlier
   735  // Detection configuration and data about each address from the previous
   736  // interval.
   737  func (b *outlierDetectionBalancer) intervalTimerAlgorithm() {
   738  	b.mu.Lock()
   739  	defer b.mu.Unlock()
   740  	b.timerStartTime = time.Now()
   741  
   742  	for _, addrInfo := range b.addrs {
   743  		addrInfo.callCounter.swap()
   744  	}
   745  
   746  	if b.cfg.SuccessRateEjection != nil {
   747  		b.successRateAlgorithm()
   748  	}
   749  
   750  	if b.cfg.FailurePercentageEjection != nil {
   751  		b.failurePercentageAlgorithm()
   752  	}
   753  
   754  	for _, addrInfo := range b.addrs {
   755  		if addrInfo.latestEjectionTimestamp.IsZero() && addrInfo.ejectionTimeMultiplier > 0 {
   756  			addrInfo.ejectionTimeMultiplier--
   757  			continue
   758  		}
   759  		if addrInfo.latestEjectionTimestamp.IsZero() {
   760  			// Address is already not ejected, so no need to check for whether
   761  			// to uneject the address below.
   762  			continue
   763  		}
   764  		et := time.Duration(b.cfg.BaseEjectionTime) * time.Duration(addrInfo.ejectionTimeMultiplier)
   765  		met := max(time.Duration(b.cfg.BaseEjectionTime), time.Duration(b.cfg.MaxEjectionTime))
   766  		uet := addrInfo.latestEjectionTimestamp.Add(min(et, met))
   767  		if now().After(uet) {
   768  			b.unejectAddress(addrInfo)
   769  		}
   770  	}
   771  
   772  	// This conditional only for testing (since the interval timer algorithm is
   773  	// called manually), will never hit in production.
   774  	if b.intervalTimer != nil {
   775  		b.intervalTimer.Stop()
   776  	}
   777  	b.intervalTimer = afterFunc(time.Duration(b.cfg.Interval), b.intervalTimerAlgorithm)
   778  }
   779  
   780  // addrsWithAtLeastRequestVolume returns a slice of address information of all
   781  // addresses with at least request volume passed in.
   782  //
   783  // Caller must hold b.mu.
   784  func (b *outlierDetectionBalancer) addrsWithAtLeastRequestVolume(requestVolume uint32) []*addressInfo {
   785  	var addrs []*addressInfo
   786  	for _, addrInfo := range b.addrs {
   787  		bucket := addrInfo.callCounter.inactiveBucket
   788  		rv := bucket.numSuccesses + bucket.numFailures
   789  		if rv >= requestVolume {
   790  			addrs = append(addrs, addrInfo)
   791  		}
   792  	}
   793  	return addrs
   794  }
   795  
   796  // meanAndStdDev returns the mean and std dev of the fractions of successful
   797  // requests of the addresses passed in.
   798  //
   799  // Caller must hold b.mu.
   800  func (b *outlierDetectionBalancer) meanAndStdDev(addrs []*addressInfo) (float64, float64) {
   801  	var totalFractionOfSuccessfulRequests float64
   802  	var mean float64
   803  	for _, addrInfo := range addrs {
   804  		bucket := addrInfo.callCounter.inactiveBucket
   805  		rv := bucket.numSuccesses + bucket.numFailures
   806  		totalFractionOfSuccessfulRequests += float64(bucket.numSuccesses) / float64(rv)
   807  	}
   808  	mean = totalFractionOfSuccessfulRequests / float64(len(addrs))
   809  	var sumOfSquares float64
   810  	for _, addrInfo := range addrs {
   811  		bucket := addrInfo.callCounter.inactiveBucket
   812  		rv := bucket.numSuccesses + bucket.numFailures
   813  		devFromMean := (float64(bucket.numSuccesses) / float64(rv)) - mean
   814  		sumOfSquares += devFromMean * devFromMean
   815  	}
   816  	variance := sumOfSquares / float64(len(addrs))
   817  	return mean, math.Sqrt(variance)
   818  }
   819  
   820  // successRateAlgorithm ejects any addresses where the success rate falls below
   821  // the other addresses according to mean and standard deviation, and if overall
   822  // applicable from other set heuristics.
   823  //
   824  // Caller must hold b.mu.
   825  func (b *outlierDetectionBalancer) successRateAlgorithm() {
   826  	addrsToConsider := b.addrsWithAtLeastRequestVolume(b.cfg.SuccessRateEjection.RequestVolume)
   827  	if len(addrsToConsider) < int(b.cfg.SuccessRateEjection.MinimumHosts) {
   828  		return
   829  	}
   830  	mean, stddev := b.meanAndStdDev(addrsToConsider)
   831  	for _, addrInfo := range addrsToConsider {
   832  		bucket := addrInfo.callCounter.inactiveBucket
   833  		ejectionCfg := b.cfg.SuccessRateEjection
   834  		if float64(b.numAddrsEjected)/float64(len(b.addrs))*100 >= float64(b.cfg.MaxEjectionPercent) {
   835  			return
   836  		}
   837  		successRate := float64(bucket.numSuccesses) / float64(bucket.numSuccesses+bucket.numFailures)
   838  		requiredSuccessRate := mean - stddev*(float64(ejectionCfg.StdevFactor)/1000)
   839  		if successRate < requiredSuccessRate {
   840  			channelz.Infof(logger, b.channelzParent, "SuccessRate algorithm detected outlier: %s. Parameters: successRate=%f, mean=%f, stddev=%f, requiredSuccessRate=%f", addrInfo, successRate, mean, stddev, requiredSuccessRate)
   841  			if uint32(grpcrand.Int31n(100)) < ejectionCfg.EnforcementPercentage {
   842  				b.ejectAddress(addrInfo)
   843  			}
   844  		}
   845  	}
   846  }
   847  
   848  // failurePercentageAlgorithm ejects any addresses where the failure percentage
   849  // rate exceeds a set enforcement percentage, if overall applicable from other
   850  // set heuristics.
   851  //
   852  // Caller must hold b.mu.
   853  func (b *outlierDetectionBalancer) failurePercentageAlgorithm() {
   854  	addrsToConsider := b.addrsWithAtLeastRequestVolume(b.cfg.FailurePercentageEjection.RequestVolume)
   855  	if len(addrsToConsider) < int(b.cfg.FailurePercentageEjection.MinimumHosts) {
   856  		return
   857  	}
   858  
   859  	for _, addrInfo := range addrsToConsider {
   860  		bucket := addrInfo.callCounter.inactiveBucket
   861  		ejectionCfg := b.cfg.FailurePercentageEjection
   862  		if float64(b.numAddrsEjected)/float64(len(b.addrs))*100 >= float64(b.cfg.MaxEjectionPercent) {
   863  			return
   864  		}
   865  		failurePercentage := (float64(bucket.numFailures) / float64(bucket.numSuccesses+bucket.numFailures)) * 100
   866  		if failurePercentage > float64(b.cfg.FailurePercentageEjection.Threshold) {
   867  			channelz.Infof(logger, b.channelzParent, "FailurePercentage algorithm detected outlier: %s, failurePercentage=%f", addrInfo, failurePercentage)
   868  			if uint32(grpcrand.Int31n(100)) < ejectionCfg.EnforcementPercentage {
   869  				b.ejectAddress(addrInfo)
   870  			}
   871  		}
   872  	}
   873  }
   874  
   875  // Caller must hold b.mu.
   876  func (b *outlierDetectionBalancer) ejectAddress(addrInfo *addressInfo) {
   877  	b.numAddrsEjected++
   878  	addrInfo.latestEjectionTimestamp = b.timerStartTime
   879  	addrInfo.ejectionTimeMultiplier++
   880  	for _, sbw := range addrInfo.sws {
   881  		sbw.eject()
   882  		channelz.Infof(logger, b.channelzParent, "Subchannel ejected: %s", sbw)
   883  	}
   884  
   885  }
   886  
   887  // Caller must hold b.mu.
   888  func (b *outlierDetectionBalancer) unejectAddress(addrInfo *addressInfo) {
   889  	b.numAddrsEjected--
   890  	addrInfo.latestEjectionTimestamp = time.Time{}
   891  	for _, sbw := range addrInfo.sws {
   892  		sbw.uneject()
   893  		channelz.Infof(logger, b.channelzParent, "Subchannel unejected: %s", sbw)
   894  	}
   895  }
   896  
// addressInfo contains the runtime information about an address that pertains
// to Outlier Detection. This struct and all of its fields are protected by
// outlierDetectionBalancer.mu in the case where it is accessed through the
// address map. In the case of Picker callbacks, the writes to the activeBucket
// of callCounter are protected by atomically loading and storing
// unsafe.Pointers (see further explanation in incrementCounter()).
type addressInfo struct {
	// The call result counter object. The ejection algorithms read the
	// success and failure counts of its inactiveBucket.
	callCounter *callCounter

	// The latest ejection timestamp, or zero if the address is currently not
	// ejected. It is set to the balancer's timerStartTime when the address is
	// ejected and reset to the zero time when it is unejected.
	latestEjectionTimestamp time.Time

	// The current ejection time multiplier, starting at 0. It is incremented
	// when the address is ejected and decremented (never below 0) on each
	// interval pass during which the address is not ejected. The configured
	// BaseEjectionTime is multiplied by this value to obtain how long an
	// ejection lasts, before the cap is applied.
	ejectionTimeMultiplier int64

	// A list of subchannel wrapper objects that correspond to this address.
	// All of them are ejected and unejected together with the address.
	sws []*subConnWrapper
}
   917  
   918  func (a *addressInfo) String() string {
   919  	var res strings.Builder
   920  	res.WriteString("[")
   921  	for _, sw := range a.sws {
   922  		res.WriteString(sw.String())
   923  	}
   924  	res.WriteString("]")
   925  	return res.String()
   926  }
   927  
   928  func newAddressInfo() *addressInfo {
   929  	return &addressInfo{
   930  		callCounter: newCallCounter(),
   931  	}
   932  }
   933  

View as plain text