...

Source file src/google.golang.org/grpc/xds/internal/balancer/outlierdetection/balancer_test.go

Documentation: google.golang.org/grpc/xds/internal/balancer/outlierdetection

     1  /*
     2   *
     3   * Copyright 2022 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package outlierdetection
    20  
    21  import (
    22  	"context"
    23  	"encoding/json"
    24  	"errors"
    25  	"fmt"
    26  	"math"
    27  	"strings"
    28  	"sync"
    29  	"testing"
    30  	"time"
    31  
    32  	"github.com/google/go-cmp/cmp"
    33  	"github.com/google/go-cmp/cmp/cmpopts"
    34  	"google.golang.org/grpc/balancer"
    35  	"google.golang.org/grpc/connectivity"
    36  	"google.golang.org/grpc/internal/balancer/stub"
    37  	"google.golang.org/grpc/internal/channelz"
    38  	"google.golang.org/grpc/internal/grpcsync"
    39  	"google.golang.org/grpc/internal/grpctest"
    40  	iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
    41  	"google.golang.org/grpc/internal/testutils"
    42  	"google.golang.org/grpc/resolver"
    43  	"google.golang.org/grpc/serviceconfig"
    44  	"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
    45  )
    46  
    47  var (
    48  	defaultTestTimeout      = 5 * time.Second
    49  	defaultTestShortTimeout = 10 * time.Millisecond
    50  )
    51  
// s is the receiver type on which the test methods in this file are declared.
// It embeds grpctest.Tester, and Test hands an s value to
// grpctest.RunSubTests.
type s struct {
	grpctest.Tester
}
    55  
// Test is the `go test` entry point for this file. It passes an s value to
// grpctest.RunSubTests (presumably running every test method declared on s as
// a subtest -- confirm against the grpctest package).
func Test(t *testing.T) {
	grpctest.RunSubTests(t, s{})
}
    59  
    60  // TestParseConfig verifies the ParseConfig() method in the Outlier Detection
    61  // Balancer.
    62  func (s) TestParseConfig(t *testing.T) {
    63  	const errParseConfigName = "errParseConfigBalancer"
    64  	stub.Register(errParseConfigName, stub.BalancerFuncs{
    65  		ParseConfig: func(json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
    66  			return nil, errors.New("some error")
    67  		},
    68  	})
    69  
    70  	parser := bb{}
    71  	const (
    72  		defaultInterval                       = iserviceconfig.Duration(10 * time.Second)
    73  		defaultBaseEjectionTime               = iserviceconfig.Duration(30 * time.Second)
    74  		defaultMaxEjectionTime                = iserviceconfig.Duration(300 * time.Second)
    75  		defaultMaxEjectionPercent             = 10
    76  		defaultSuccessRateStdevFactor         = 1900
    77  		defaultEnforcingSuccessRate           = 100
    78  		defaultSuccessRateMinimumHosts        = 5
    79  		defaultSuccessRateRequestVolume       = 100
    80  		defaultFailurePercentageThreshold     = 85
    81  		defaultEnforcingFailurePercentage     = 0
    82  		defaultFailurePercentageMinimumHosts  = 5
    83  		defaultFailurePercentageRequestVolume = 50
    84  	)
    85  	tests := []struct {
    86  		name    string
    87  		input   string
    88  		wantCfg serviceconfig.LoadBalancingConfig
    89  		wantErr string
    90  	}{
    91  		{
    92  			name: "no-fields-set-should-get-default",
    93  			input: `{
    94  				"childPolicy": [
    95  				{
    96  					"xds_cluster_impl_experimental": {
    97  						"cluster": "test_cluster"
    98  					}
    99  				}
   100  				]
   101  			}`,
   102  			wantCfg: &LBConfig{
   103  				Interval:           defaultInterval,
   104  				BaseEjectionTime:   defaultBaseEjectionTime,
   105  				MaxEjectionTime:    defaultMaxEjectionTime,
   106  				MaxEjectionPercent: defaultMaxEjectionPercent,
   107  				ChildPolicy: &iserviceconfig.BalancerConfig{
   108  					Name: "xds_cluster_impl_experimental",
   109  					Config: &clusterimpl.LBConfig{
   110  						Cluster: "test_cluster",
   111  					},
   112  				},
   113  			},
   114  		},
   115  
   116  		{
   117  			name: "some-top-level-fields-set",
   118  			input: `{
   119  				"interval": "15s",
   120  				"maxEjectionTime": "350s",
   121  				"childPolicy": [
   122  				{
   123  					"xds_cluster_impl_experimental": {
   124  						"cluster": "test_cluster"
   125  					}
   126  				}
   127  				]
   128  			}`,
   129  			// Should get set fields + defaults for unset fields.
   130  			wantCfg: &LBConfig{
   131  				Interval:           iserviceconfig.Duration(15 * time.Second),
   132  				BaseEjectionTime:   defaultBaseEjectionTime,
   133  				MaxEjectionTime:    iserviceconfig.Duration(350 * time.Second),
   134  				MaxEjectionPercent: defaultMaxEjectionPercent,
   135  				ChildPolicy: &iserviceconfig.BalancerConfig{
   136  					Name: "xds_cluster_impl_experimental",
   137  					Config: &clusterimpl.LBConfig{
   138  						Cluster: "test_cluster",
   139  					},
   140  				},
   141  			},
   142  		},
   143  		{
   144  			name: "success-rate-ejection-present-but-no-fields",
   145  			input: `{
   146  				"successRateEjection": {},
   147                  "childPolicy": [
   148  				{
   149  					"xds_cluster_impl_experimental": {
   150  						"cluster": "test_cluster"
   151  					}
   152  				}
   153  				]
   154  			}`,
   155  			// Should get defaults of success-rate-ejection struct.
   156  			wantCfg: &LBConfig{
   157  				Interval:           defaultInterval,
   158  				BaseEjectionTime:   defaultBaseEjectionTime,
   159  				MaxEjectionTime:    defaultMaxEjectionTime,
   160  				MaxEjectionPercent: defaultMaxEjectionPercent,
   161  				SuccessRateEjection: &SuccessRateEjection{
   162  					StdevFactor:           defaultSuccessRateStdevFactor,
   163  					EnforcementPercentage: defaultEnforcingSuccessRate,
   164  					MinimumHosts:          defaultSuccessRateMinimumHosts,
   165  					RequestVolume:         defaultSuccessRateRequestVolume,
   166  				},
   167  				ChildPolicy: &iserviceconfig.BalancerConfig{
   168  					Name: "xds_cluster_impl_experimental",
   169  					Config: &clusterimpl.LBConfig{
   170  						Cluster: "test_cluster",
   171  					},
   172  				},
   173  			},
   174  		},
   175  		{
   176  			name: "success-rate-ejection-present-partially-set",
   177  			input: `{
   178  				"successRateEjection": {
   179  					"stdevFactor": 1000,
   180  					"minimumHosts": 5
   181  				},
   182                  "childPolicy": [
   183  				{
   184  					"xds_cluster_impl_experimental": {
   185  						"cluster": "test_cluster"
   186  					}
   187  				}
   188  				]
   189  			}`,
   190  			// Should get set fields + defaults for others in success rate
   191  			// ejection layer.
   192  			wantCfg: &LBConfig{
   193  				Interval:           defaultInterval,
   194  				BaseEjectionTime:   defaultBaseEjectionTime,
   195  				MaxEjectionTime:    defaultMaxEjectionTime,
   196  				MaxEjectionPercent: defaultMaxEjectionPercent,
   197  				SuccessRateEjection: &SuccessRateEjection{
   198  					StdevFactor:           1000,
   199  					EnforcementPercentage: defaultEnforcingSuccessRate,
   200  					MinimumHosts:          5,
   201  					RequestVolume:         defaultSuccessRateRequestVolume,
   202  				},
   203  				ChildPolicy: &iserviceconfig.BalancerConfig{
   204  					Name: "xds_cluster_impl_experimental",
   205  					Config: &clusterimpl.LBConfig{
   206  						Cluster: "test_cluster",
   207  					},
   208  				},
   209  			},
   210  		},
   211  		{
   212  			name: "success-rate-ejection-present-fully-set",
   213  			input: `{
   214  				"successRateEjection": {
   215  					"stdevFactor": 1000,
   216  					"enforcementPercentage": 50,
   217  					"minimumHosts": 5,
   218  					"requestVolume": 50
   219  				},
   220                  "childPolicy": [
   221  				{
   222  					"xds_cluster_impl_experimental": {
   223  						"cluster": "test_cluster"
   224  					}
   225  				}
   226  				]
   227  			}`,
   228  			wantCfg: &LBConfig{
   229  				Interval:           defaultInterval,
   230  				BaseEjectionTime:   defaultBaseEjectionTime,
   231  				MaxEjectionTime:    defaultMaxEjectionTime,
   232  				MaxEjectionPercent: defaultMaxEjectionPercent,
   233  				SuccessRateEjection: &SuccessRateEjection{
   234  					StdevFactor:           1000,
   235  					EnforcementPercentage: 50,
   236  					MinimumHosts:          5,
   237  					RequestVolume:         50,
   238  				},
   239  				ChildPolicy: &iserviceconfig.BalancerConfig{
   240  					Name: "xds_cluster_impl_experimental",
   241  					Config: &clusterimpl.LBConfig{
   242  						Cluster: "test_cluster",
   243  					},
   244  				},
   245  			},
   246  		},
   247  		{
   248  			name: "failure-percentage-ejection-present-but-no-fields",
   249  			input: `{
   250  				"failurePercentageEjection": {},
   251                  "childPolicy": [
   252  				{
   253  					"xds_cluster_impl_experimental": {
   254  						"cluster": "test_cluster"
   255  					}
   256  				}
   257  				]
   258  			}`,
   259  			// Should get defaults of failure percentage ejection layer.
   260  			wantCfg: &LBConfig{
   261  				Interval:           defaultInterval,
   262  				BaseEjectionTime:   defaultBaseEjectionTime,
   263  				MaxEjectionTime:    defaultMaxEjectionTime,
   264  				MaxEjectionPercent: defaultMaxEjectionPercent,
   265  				FailurePercentageEjection: &FailurePercentageEjection{
   266  					Threshold:             defaultFailurePercentageThreshold,
   267  					EnforcementPercentage: defaultEnforcingFailurePercentage,
   268  					MinimumHosts:          defaultFailurePercentageMinimumHosts,
   269  					RequestVolume:         defaultFailurePercentageRequestVolume,
   270  				},
   271  				ChildPolicy: &iserviceconfig.BalancerConfig{
   272  					Name: "xds_cluster_impl_experimental",
   273  					Config: &clusterimpl.LBConfig{
   274  						Cluster: "test_cluster",
   275  					},
   276  				},
   277  			},
   278  		},
   279  		{
   280  			name: "failure-percentage-ejection-present-partially-set",
   281  			input: `{
   282  				"failurePercentageEjection": {
   283  					"threshold": 80,
   284  					"minimumHosts": 10
   285  				},
   286                  "childPolicy": [
   287  				{
   288  					"xds_cluster_impl_experimental": {
   289  						"cluster": "test_cluster"
   290  					}
   291  				}
   292  				]
   293  			}`,
   294  			// Should get set fields + defaults for others in success rate
   295  			// ejection layer.
   296  			wantCfg: &LBConfig{
   297  				Interval:           defaultInterval,
   298  				BaseEjectionTime:   defaultBaseEjectionTime,
   299  				MaxEjectionTime:    defaultMaxEjectionTime,
   300  				MaxEjectionPercent: defaultMaxEjectionPercent,
   301  				FailurePercentageEjection: &FailurePercentageEjection{
   302  					Threshold:             80,
   303  					EnforcementPercentage: defaultEnforcingFailurePercentage,
   304  					MinimumHosts:          10,
   305  					RequestVolume:         defaultFailurePercentageRequestVolume,
   306  				},
   307  				ChildPolicy: &iserviceconfig.BalancerConfig{
   308  					Name: "xds_cluster_impl_experimental",
   309  					Config: &clusterimpl.LBConfig{
   310  						Cluster: "test_cluster",
   311  					},
   312  				},
   313  			},
   314  		},
   315  		{
   316  			name: "failure-percentage-ejection-present-fully-set",
   317  			input: `{
   318  				"failurePercentageEjection": {
   319  					"threshold": 80,
   320  					"enforcementPercentage": 100,
   321  					"minimumHosts": 10,
   322  					"requestVolume": 40
   323                  },
   324                  "childPolicy": [
   325  				{
   326  					"xds_cluster_impl_experimental": {
   327  						"cluster": "test_cluster"
   328  					}
   329  				}
   330  				]
   331  			}`,
   332  			wantCfg: &LBConfig{
   333  				Interval:           defaultInterval,
   334  				BaseEjectionTime:   defaultBaseEjectionTime,
   335  				MaxEjectionTime:    defaultMaxEjectionTime,
   336  				MaxEjectionPercent: defaultMaxEjectionPercent,
   337  				FailurePercentageEjection: &FailurePercentageEjection{
   338  					Threshold:             80,
   339  					EnforcementPercentage: 100,
   340  					MinimumHosts:          10,
   341  					RequestVolume:         40,
   342  				},
   343  				ChildPolicy: &iserviceconfig.BalancerConfig{
   344  					Name: "xds_cluster_impl_experimental",
   345  					Config: &clusterimpl.LBConfig{
   346  						Cluster: "test_cluster",
   347  					},
   348  				},
   349  			},
   350  		},
   351  		{ // to make sure zero values aren't overwritten by defaults
   352  			name: "lb-config-every-field-set-zero-value",
   353  			input: `{
   354  				"interval": "0s",
   355  				"baseEjectionTime": "0s",
   356  				"maxEjectionTime": "0s",
   357  				"maxEjectionPercent": 0,
   358  				"successRateEjection": {
   359  					"stdevFactor": 0,
   360  					"enforcementPercentage": 0,
   361  					"minimumHosts": 0,
   362  					"requestVolume": 0
   363  				},
   364  				"failurePercentageEjection": {
   365  					"threshold": 0,
   366  					"enforcementPercentage": 0,
   367  					"minimumHosts": 0,
   368  					"requestVolume": 0
   369  				},
   370                  "childPolicy": [
   371  				{
   372  					"xds_cluster_impl_experimental": {
   373  						"cluster": "test_cluster"
   374  					}
   375  				}
   376  				]
   377  			}`,
   378  			wantCfg: &LBConfig{
   379  				SuccessRateEjection:       &SuccessRateEjection{},
   380  				FailurePercentageEjection: &FailurePercentageEjection{},
   381  				ChildPolicy: &iserviceconfig.BalancerConfig{
   382  					Name: "xds_cluster_impl_experimental",
   383  					Config: &clusterimpl.LBConfig{
   384  						Cluster: "test_cluster",
   385  					},
   386  				},
   387  			},
   388  		},
   389  		{
   390  			name: "lb-config-every-field-set",
   391  			input: `{
   392  				"interval": "10s",
   393  				"baseEjectionTime": "30s",
   394  				"maxEjectionTime": "300s",
   395  				"maxEjectionPercent": 10,
   396  				"successRateEjection": {
   397  					"stdevFactor": 1900,
   398  					"enforcementPercentage": 100,
   399  					"minimumHosts": 5,
   400  					"requestVolume": 100
   401  				},
   402  				"failurePercentageEjection": {
   403  					"threshold": 85,
   404  					"enforcementPercentage": 5,
   405  					"minimumHosts": 5,
   406  					"requestVolume": 50
   407  				},
   408                  "childPolicy": [
   409  				{
   410  					"xds_cluster_impl_experimental": {
   411  						"cluster": "test_cluster"
   412  					}
   413  				}
   414  				]
   415  			}`,
   416  			wantCfg: &LBConfig{
   417  				Interval:           iserviceconfig.Duration(10 * time.Second),
   418  				BaseEjectionTime:   iserviceconfig.Duration(30 * time.Second),
   419  				MaxEjectionTime:    iserviceconfig.Duration(300 * time.Second),
   420  				MaxEjectionPercent: 10,
   421  				SuccessRateEjection: &SuccessRateEjection{
   422  					StdevFactor:           1900,
   423  					EnforcementPercentage: 100,
   424  					MinimumHosts:          5,
   425  					RequestVolume:         100,
   426  				},
   427  				FailurePercentageEjection: &FailurePercentageEjection{
   428  					Threshold:             85,
   429  					EnforcementPercentage: 5,
   430  					MinimumHosts:          5,
   431  					RequestVolume:         50,
   432  				},
   433  				ChildPolicy: &iserviceconfig.BalancerConfig{
   434  					Name: "xds_cluster_impl_experimental",
   435  					Config: &clusterimpl.LBConfig{
   436  						Cluster: "test_cluster",
   437  					},
   438  				},
   439  			},
   440  		},
   441  		{
   442  			name:    "interval-is-negative",
   443  			input:   `{"interval": "-10s"}`,
   444  			wantErr: "OutlierDetectionLoadBalancingConfig.interval = -10s; must be >= 0",
   445  		},
   446  		{
   447  			name:    "base-ejection-time-is-negative",
   448  			input:   `{"baseEjectionTime": "-10s"}`,
   449  			wantErr: "OutlierDetectionLoadBalancingConfig.base_ejection_time = -10s; must be >= 0",
   450  		},
   451  		{
   452  			name:    "max-ejection-time-is-negative",
   453  			input:   `{"maxEjectionTime": "-10s"}`,
   454  			wantErr: "OutlierDetectionLoadBalancingConfig.max_ejection_time = -10s; must be >= 0",
   455  		},
   456  		{
   457  			name:    "max-ejection-percent-is-greater-than-100",
   458  			input:   `{"maxEjectionPercent": 150}`,
   459  			wantErr: "OutlierDetectionLoadBalancingConfig.max_ejection_percent = 150; must be <= 100",
   460  		},
   461  		{
   462  			name: "enforcement-percentage-success-rate-is-greater-than-100",
   463  			input: `{
   464  				"successRateEjection": {
   465  					"enforcementPercentage": 150
   466  				}
   467  			}`,
   468  			wantErr: "OutlierDetectionLoadBalancingConfig.SuccessRateEjection.enforcement_percentage = 150; must be <= 100",
   469  		},
   470  		{
   471  			name: "failure-percentage-threshold-is-greater-than-100",
   472  			input: `{
   473  				"failurePercentageEjection": {
   474  					"threshold": 150
   475  				}
   476  			}`,
   477  			wantErr: "OutlierDetectionLoadBalancingConfig.FailurePercentageEjection.threshold = 150; must be <= 100",
   478  		},
   479  		{
   480  			name: "enforcement-percentage-failure-percentage-ejection-is-greater-than-100",
   481  			input: `{
   482  				"failurePercentageEjection": {
   483  					"enforcementPercentage": 150
   484  				}
   485  			}`,
   486  			wantErr: "OutlierDetectionLoadBalancingConfig.FailurePercentageEjection.enforcement_percentage = 150; must be <= 100",
   487  		},
   488  		{
   489  			name: "child-policy-present-but-parse-error",
   490  			input: `{
   491  				"childPolicy": [
   492  				{
   493  					"errParseConfigBalancer": {
   494  						"cluster": "test_cluster"
   495  					}
   496  				}
   497  			]
   498  			}`,
   499  			wantErr: "error parsing loadBalancingConfig for policy \"errParseConfigBalancer\"",
   500  		},
   501  		{
   502  			name: "no-supported-child-policy",
   503  			input: `{
   504  				"childPolicy": [
   505  				{
   506  					"doesNotExistBalancer": {
   507  						"cluster": "test_cluster"
   508  					}
   509  				}
   510  			]
   511  			}`,
   512  			wantErr: "invalid loadBalancingConfig: no supported policies found",
   513  		},
   514  	}
   515  	for _, test := range tests {
   516  		t.Run(test.name, func(t *testing.T) {
   517  			gotCfg, gotErr := parser.ParseConfig(json.RawMessage(test.input))
   518  			if gotErr != nil && !strings.Contains(gotErr.Error(), test.wantErr) {
   519  				t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
   520  			}
   521  			if (gotErr != nil) != (test.wantErr != "") {
   522  				t.Fatalf("ParseConfig(%v) = %v, wantErr %v", test.input, gotErr, test.wantErr)
   523  			}
   524  			if test.wantErr != "" {
   525  				return
   526  			}
   527  			if diff := cmp.Diff(gotCfg, test.wantCfg); diff != "" {
   528  				t.Fatalf("parseConfig(%v) got unexpected output, diff (-got +want): %v", string(test.input), diff)
   529  			}
   530  		})
   531  	}
   532  }
   533  
   534  func (lbc *LBConfig) Equal(lbc2 *LBConfig) bool {
   535  	if !lbc.EqualIgnoringChildPolicy(lbc2) {
   536  		return false
   537  	}
   538  	return cmp.Equal(lbc.ChildPolicy, lbc2.ChildPolicy)
   539  }
   540  
// subConnWithState pairs a SubConn with a SubConnState. The StateListener
// callbacks registered in this file send values of this type on a test
// channel so that a test can check which SubConn received which state.
type subConnWithState struct {
	sc    balancer.SubConn
	state balancer.SubConnState
}
   545  
   546  func setup(t *testing.T) (*outlierDetectionBalancer, *testutils.BalancerClientConn, func()) {
   547  	t.Helper()
   548  	builder := balancer.Get(Name)
   549  	if builder == nil {
   550  		t.Fatalf("balancer.Get(%q) returned nil", Name)
   551  	}
   552  	tcc := testutils.NewBalancerClientConn(t)
   553  	ch := channelz.RegisterChannel(nil, "test channel")
   554  	t.Cleanup(func() { channelz.RemoveEntry(ch.ID) })
   555  	odB := builder.Build(tcc, balancer.BuildOptions{ChannelzParent: ch})
   556  	return odB.(*outlierDetectionBalancer), tcc, odB.Close
   557  }
   558  
// emptyChildConfig is a child policy configuration with no fields of its own;
// it only embeds serviceconfig.LoadBalancingConfig. The tests in this file use
// it as the Config of stub child policies.
type emptyChildConfig struct {
	serviceconfig.LoadBalancingConfig
}
   562  
   563  // TestChildBasicOperations tests basic operations of the Outlier Detection
   564  // Balancer and it's interaction with it's child. The following scenarios are
   565  // tested, in a step by step fashion:
   566  // 1. The Outlier Detection Balancer receives it's first good configuration. The
   567  // balancer is expected to create a child and sent the child it's configuration.
   568  // 2. The Outlier Detection Balancer receives new configuration that specifies a
   569  // child's type, and the new type immediately reports READY inline. The first
   570  // child balancer should be closed and the second child balancer should receive
   571  // a config update.
   572  // 3. The Outlier Detection Balancer is closed. The second child balancer should
   573  // be closed.
   574  func (s) TestChildBasicOperations(t *testing.T) {
   575  	bc := emptyChildConfig{}
   576  
   577  	ccsCh := testutils.NewChannel()
   578  	closeCh := testutils.NewChannel()
   579  
   580  	stub.Register(t.Name()+"child1", stub.BalancerFuncs{
   581  		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
   582  			ccsCh.Send(ccs.BalancerConfig)
   583  			return nil
   584  		},
   585  		Close: func(bd *stub.BalancerData) {
   586  			closeCh.Send(nil)
   587  		},
   588  	})
   589  
   590  	stub.Register(t.Name()+"child2", stub.BalancerFuncs{
   591  		UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
   592  			// UpdateState inline to READY to complete graceful switch process
   593  			// synchronously from any UpdateClientConnState call.
   594  			bd.ClientConn.UpdateState(balancer.State{
   595  				ConnectivityState: connectivity.Ready,
   596  				Picker:            &testutils.TestConstPicker{},
   597  			})
   598  			ccsCh.Send(nil)
   599  			return nil
   600  		},
   601  		Close: func(bd *stub.BalancerData) {
   602  			closeCh.Send(nil)
   603  		},
   604  	})
   605  
   606  	od, tcc, _ := setup(t)
   607  
   608  	// This first config update should cause a child to be built and forwarded
   609  	// it's first update.
   610  	od.UpdateClientConnState(balancer.ClientConnState{
   611  		BalancerConfig: &LBConfig{
   612  			ChildPolicy: &iserviceconfig.BalancerConfig{
   613  				Name:   t.Name() + "child1",
   614  				Config: bc,
   615  			},
   616  		},
   617  	})
   618  
   619  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   620  	defer cancel()
   621  	cr, err := ccsCh.Receive(ctx)
   622  	if err != nil {
   623  		t.Fatalf("timed out waiting for UpdateClientConnState on the first child balancer: %v", err)
   624  	}
   625  	if _, ok := cr.(emptyChildConfig); !ok {
   626  		t.Fatalf("Received child policy config of type %T, want %T", cr, emptyChildConfig{})
   627  	}
   628  
   629  	// This Update Client Conn State call should cause the first child balancer
   630  	// to close, and a new child to be created and also forwarded it's first
   631  	// config update.
   632  	od.UpdateClientConnState(balancer.ClientConnState{
   633  		BalancerConfig: &LBConfig{
   634  			Interval: math.MaxInt64,
   635  			ChildPolicy: &iserviceconfig.BalancerConfig{
   636  				Name:   t.Name() + "child2",
   637  				Config: emptyChildConfig{},
   638  			},
   639  		},
   640  	})
   641  
   642  	// Verify inline UpdateState() call from the new child eventually makes it's
   643  	// way to the Test Client Conn.
   644  	select {
   645  	case <-ctx.Done():
   646  		t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn")
   647  	case state := <-tcc.NewStateCh:
   648  		if state != connectivity.Ready {
   649  			t.Fatalf("ClientConn received connectivity state %v, want %v", state, connectivity.Ready)
   650  		}
   651  	}
   652  
   653  	// Verify the first child balancer closed.
   654  	if _, err = closeCh.Receive(ctx); err != nil {
   655  		t.Fatalf("timed out waiting for the first child balancer to be closed: %v", err)
   656  	}
   657  	// Verify the second child balancer received it's first config update.
   658  	if _, err = ccsCh.Receive(ctx); err != nil {
   659  		t.Fatalf("timed out waiting for UpdateClientConnState on the second child balancer: %v", err)
   660  	}
   661  	// Closing the Outlier Detection Balancer should close the newly created
   662  	// child.
   663  	od.Close()
   664  	if _, err = closeCh.Receive(ctx); err != nil {
   665  		t.Fatalf("timed out waiting for the second child balancer to be closed: %v", err)
   666  	}
   667  }
   668  
   669  // TestUpdateAddresses tests the functionality of UpdateAddresses and any
   670  // changes in the addresses/plurality of those addresses for a SubConn. The
   671  // Balancer is set up with two upstreams, with one of the upstreams being
   672  // ejected. Initially, there is one SubConn for each address. The following
   673  // scenarios are tested, in a step by step fashion:
   674  // 1. The SubConn not currently ejected switches addresses to the address that
   675  // is ejected. This should cause the SubConn to get ejected.
   676  // 2. Update this same SubConn to multiple addresses. This should cause the
   677  // SubConn to get unejected, as it is no longer being tracked by Outlier
   678  // Detection at that point.
   679  // 3. Update this same SubConn to different addresses, still multiple. This
   680  // should be a noop, as the SubConn is still no longer being tracked by Outlier
   681  // Detection.
   682  // 4. Update this same SubConn to the a single address which is ejected. This
   683  // should cause the SubConn to be ejected.
   684  func (s) TestUpdateAddresses(t *testing.T) {
   685  	scsCh := testutils.NewChannel()
   686  	var scw1, scw2 balancer.SubConn
   687  	var err error
   688  	stub.Register(t.Name(), stub.BalancerFuncs{
   689  		UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
   690  			scw1, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{
   691  				StateListener: func(state balancer.SubConnState) { scsCh.Send(subConnWithState{sc: scw1, state: state}) },
   692  			})
   693  			if err != nil {
   694  				t.Errorf("error in od.NewSubConn call: %v", err)
   695  			}
   696  			scw2, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address2"}}, balancer.NewSubConnOptions{
   697  				StateListener: func(state balancer.SubConnState) { scsCh.Send(subConnWithState{sc: scw2, state: state}) },
   698  			})
   699  			if err != nil {
   700  				t.Errorf("error in od.NewSubConn call: %v", err)
   701  			}
   702  			bd.ClientConn.UpdateState(balancer.State{
   703  				ConnectivityState: connectivity.Ready,
   704  				Picker: &rrPicker{
   705  					scs: []balancer.SubConn{scw1, scw2},
   706  				},
   707  			})
   708  			return nil
   709  		},
   710  	})
   711  
   712  	od, tcc, cleanup := setup(t)
   713  	defer cleanup()
   714  
   715  	od.UpdateClientConnState(balancer.ClientConnState{
   716  		ResolverState: resolver.State{
   717  			Addresses: []resolver.Address{
   718  				{Addr: "address1"},
   719  				{Addr: "address2"},
   720  			},
   721  		},
   722  		BalancerConfig: &LBConfig{
   723  			Interval:           iserviceconfig.Duration(10 * time.Second),
   724  			BaseEjectionTime:   iserviceconfig.Duration(30 * time.Second),
   725  			MaxEjectionTime:    iserviceconfig.Duration(300 * time.Second),
   726  			MaxEjectionPercent: 10,
   727  			FailurePercentageEjection: &FailurePercentageEjection{
   728  				Threshold:             50,
   729  				EnforcementPercentage: 100,
   730  				MinimumHosts:          2,
   731  				RequestVolume:         3,
   732  			},
   733  			ChildPolicy: &iserviceconfig.BalancerConfig{
   734  				Name:   t.Name(),
   735  				Config: emptyChildConfig{},
   736  			},
   737  		},
   738  	})
   739  
   740  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   741  	defer cancel()
   742  
   743  	// Setup the system to where one address is ejected and one address
   744  	// isn't.
   745  	select {
   746  	case <-ctx.Done():
   747  		t.Fatal("timeout while waiting for a UpdateState call on the ClientConn")
   748  	case picker := <-tcc.NewPickerCh:
   749  		pi, err := picker.Pick(balancer.PickInfo{})
   750  		if err != nil {
   751  			t.Fatalf("picker.Pick failed with error: %v", err)
   752  		}
   753  		// Simulate 5 successful RPC calls on the first SubConn (the first call
   754  		// to picker.Pick).
   755  		for c := 0; c < 5; c++ {
   756  			pi.Done(balancer.DoneInfo{})
   757  		}
   758  		pi, err = picker.Pick(balancer.PickInfo{})
   759  		if err != nil {
   760  			t.Fatalf("picker.Pick failed with error: %v", err)
   761  		}
   762  		// Simulate 5 failed RPC calls on the second SubConn (the second call to
   763  		// picker.Pick). Thus, when the interval timer algorithm is run, the
   764  		// second SubConn's address should be ejected, which will allow us to
   765  		// further test UpdateAddresses() logic.
   766  		for c := 0; c < 5; c++ {
   767  			pi.Done(balancer.DoneInfo{Err: errors.New("some error")})
   768  		}
   769  		od.intervalTimerAlgorithm()
   770  		// verify StateListener() got called with TRANSIENT_FAILURE for child
   771  		// with address that was ejected.
   772  		gotSCWS, err := scsCh.Receive(ctx)
   773  		if err != nil {
   774  			t.Fatalf("Error waiting for Sub Conn update: %v", err)
   775  		}
   776  		if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
   777  			sc:    scw2,
   778  			state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure},
   779  		}); err != nil {
   780  			t.Fatalf("Error in Sub Conn update: %v", err)
   781  		}
   782  	}
   783  
   784  	// Update scw1 to another address that is currently ejected. This should
   785  	// cause scw1 to get ejected.
   786  	od.UpdateAddresses(scw1, []resolver.Address{{Addr: "address2"}})
   787  
   788  	// Verify that update addresses gets forwarded to ClientConn.
   789  	select {
   790  	case <-ctx.Done():
   791  		t.Fatal("timeout while waiting for a UpdateState call on the ClientConn")
   792  	case <-tcc.UpdateAddressesAddrsCh:
   793  	}
   794  	// Verify scw1 got ejected (StateListener called with TRANSIENT_FAILURE).
   795  	gotSCWS, err := scsCh.Receive(ctx)
   796  	if err != nil {
   797  		t.Fatalf("Error waiting for Sub Conn update: %v", err)
   798  	}
   799  	if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
   800  		sc:    scw1,
   801  		state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure},
   802  	}); err != nil {
   803  		t.Fatalf("Error in Sub Conn update: %v", err)
   804  	}
   805  
   806  	// Update scw1 to multiple addresses. This should cause scw1 to get
   807  	// unejected, as is it no longer being tracked for Outlier Detection.
   808  	od.UpdateAddresses(scw1, []resolver.Address{
   809  		{Addr: "address1"},
   810  		{Addr: "address2"},
   811  	})
   812  	// Verify scw1 got unejected (StateListener called with recent state).
   813  	gotSCWS, err = scsCh.Receive(ctx)
   814  	if err != nil {
   815  		t.Fatalf("Error waiting for Sub Conn update: %v", err)
   816  	}
   817  	if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
   818  		sc:    scw1,
   819  		state: balancer.SubConnState{ConnectivityState: connectivity.Idle},
   820  	}); err != nil {
   821  		t.Fatalf("Error in Sub Conn update: %v", err)
   822  	}
   823  
   824  	// Update scw1 to a different multiple addresses list. A change of addresses
   825  	// in which the plurality goes from multiple to multiple should be a no-op,
   826  	// as the address continues to be ignored by outlier detection.
   827  	od.UpdateAddresses(scw1, []resolver.Address{
   828  		{Addr: "address2"},
   829  		{Addr: "address3"},
   830  	})
   831  	// Verify no downstream effects.
   832  	sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
   833  	defer cancel()
   834  	if _, err := scsCh.Receive(sCtx); err == nil {
   835  		t.Fatalf("no SubConn update should have been sent (no SubConn got ejected/unejected)")
   836  	}
   837  
   838  	// Update scw1 back to a single address, which is ejected. This should cause
   839  	// the SubConn to be re-ejected.
   840  	od.UpdateAddresses(scw1, []resolver.Address{{Addr: "address2"}})
   841  	// Verify scw1 got ejected (StateListener called with TRANSIENT FAILURE).
   842  	gotSCWS, err = scsCh.Receive(ctx)
   843  	if err != nil {
   844  		t.Fatalf("Error waiting for Sub Conn update: %v", err)
   845  	}
   846  	if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
   847  		sc:    scw1,
   848  		state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure},
   849  	}); err != nil {
   850  		t.Fatalf("Error in Sub Conn update: %v", err)
   851  	}
   852  }
   853  
   854  func scwsEqual(gotSCWS subConnWithState, wantSCWS subConnWithState) error {
   855  	if gotSCWS.sc != wantSCWS.sc || !cmp.Equal(gotSCWS.state, wantSCWS.state, cmp.AllowUnexported(subConnWrapper{}, addressInfo{}), cmpopts.IgnoreFields(subConnWrapper{}, "scUpdateCh")) {
   856  		return fmt.Errorf("received SubConnState: %+v, want %+v", gotSCWS, wantSCWS)
   857  	}
   858  	return nil
   859  }
   860  
   861  type rrPicker struct {
   862  	scs  []balancer.SubConn
   863  	next int
   864  }
   865  
   866  func (rrp *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
   867  	sc := rrp.scs[rrp.next]
   868  	rrp.next = (rrp.next + 1) % len(rrp.scs)
   869  	return balancer.PickResult{SubConn: sc}, nil
   870  }
   871  
   872  // TestDurationOfInterval tests the configured interval timer.
   873  // The following scenarios are tested:
   874  // 1. The Outlier Detection Balancer receives it's first config. The balancer
   875  // should configure the timer with whatever is directly specified on the config.
   876  // 2. The Outlier Detection Balancer receives a subsequent config. The balancer
   877  // should configure with whatever interval is configured minus the difference
   878  // between the current time and the previous start timestamp.
   879  // 3. The Outlier Detection Balancer receives a no-op configuration. The
   880  // balancer should not configure a timer at all.
   881  func (s) TestDurationOfInterval(t *testing.T) {
   882  	stub.Register(t.Name(), stub.BalancerFuncs{})
   883  
   884  	od, _, cleanup := setup(t)
   885  	defer func(af func(d time.Duration, f func()) *time.Timer) {
   886  		cleanup()
   887  		afterFunc = af
   888  	}(afterFunc)
   889  
   890  	durationChan := testutils.NewChannel()
   891  	afterFunc = func(dur time.Duration, _ func()) *time.Timer {
   892  		durationChan.Send(dur)
   893  		return time.NewTimer(math.MaxInt64)
   894  	}
   895  
   896  	od.UpdateClientConnState(balancer.ClientConnState{
   897  		BalancerConfig: &LBConfig{
   898  			Interval: iserviceconfig.Duration(8 * time.Second),
   899  			SuccessRateEjection: &SuccessRateEjection{
   900  				StdevFactor:           1900,
   901  				EnforcementPercentage: 100,
   902  				MinimumHosts:          5,
   903  				RequestVolume:         100,
   904  			},
   905  			ChildPolicy: &iserviceconfig.BalancerConfig{
   906  				Name:   t.Name(),
   907  				Config: emptyChildConfig{},
   908  			},
   909  		},
   910  	})
   911  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   912  	defer cancel()
   913  	d, err := durationChan.Receive(ctx)
   914  	if err != nil {
   915  		t.Fatalf("Error receiving duration from afterFunc() call: %v", err)
   916  	}
   917  	dur := d.(time.Duration)
   918  	// The configured duration should be 8 seconds - what the balancer was
   919  	// configured with.
   920  	if dur != 8*time.Second {
   921  		t.Fatalf("configured duration should have been 8 seconds to start timer")
   922  	}
   923  
   924  	// Override time.Now to time.Now() + 5 seconds. This will represent 5
   925  	// seconds already passing for the next check in UpdateClientConnState.
   926  	defer func(n func() time.Time) {
   927  		now = n
   928  	}(now)
   929  	now = func() time.Time {
   930  		return time.Now().Add(time.Second * 5)
   931  	}
   932  
   933  	// UpdateClientConnState with an interval of 9 seconds. Due to 5 seconds
   934  	// already passing (from overridden time.Now function), this should start an
   935  	// interval timer of ~4 seconds.
   936  	od.UpdateClientConnState(balancer.ClientConnState{
   937  		BalancerConfig: &LBConfig{
   938  			Interval: iserviceconfig.Duration(9 * time.Second),
   939  			SuccessRateEjection: &SuccessRateEjection{
   940  				StdevFactor:           1900,
   941  				EnforcementPercentage: 100,
   942  				MinimumHosts:          5,
   943  				RequestVolume:         100,
   944  			},
   945  			ChildPolicy: &iserviceconfig.BalancerConfig{
   946  				Name:   t.Name(),
   947  				Config: emptyChildConfig{},
   948  			},
   949  		},
   950  	})
   951  
   952  	d, err = durationChan.Receive(ctx)
   953  	if err != nil {
   954  		t.Fatalf("Error receiving duration from afterFunc() call: %v", err)
   955  	}
   956  	dur = d.(time.Duration)
   957  	if dur.Seconds() < 3.5 || 4.5 < dur.Seconds() {
   958  		t.Fatalf("configured duration should have been around 4 seconds to start timer")
   959  	}
   960  
   961  	// UpdateClientConnState with a no-op config. This shouldn't configure the
   962  	// interval timer at all due to it being a no-op.
   963  	od.UpdateClientConnState(balancer.ClientConnState{
   964  		BalancerConfig: &LBConfig{
   965  			Interval: iserviceconfig.Duration(10 * time.Second),
   966  			ChildPolicy: &iserviceconfig.BalancerConfig{
   967  				Name:   t.Name(),
   968  				Config: emptyChildConfig{},
   969  			},
   970  		},
   971  	})
   972  
   973  	// No timer should have been started.
   974  	sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
   975  	defer cancel()
   976  	if _, err = durationChan.Receive(sCtx); err == nil {
   977  		t.Fatal("No timer should have started.")
   978  	}
   979  }
   980  
   981  // TestEjectUnejectSuccessRate tests the functionality of the interval timer
   982  // algorithm when configured with SuccessRateEjection. The Outlier Detection
   983  // Balancer will be set up with 3 SubConns, each with a different address.
   984  // It tests the following scenarios, in a step by step fashion:
   985  // 1. The three addresses each have 5 successes. The interval timer algorithm should
   986  // not eject any of the addresses.
   987  // 2. Two of the addresses have 5 successes, the third has five failures. The
   988  // interval timer algorithm should eject the third address with five failures.
   989  // 3. The interval timer algorithm is run at a later time past max ejection
   990  // time. The interval timer algorithm should uneject the third address.
   991  func (s) TestEjectUnejectSuccessRate(t *testing.T) {
   992  	scsCh := testutils.NewChannel()
   993  	var scw1, scw2, scw3 balancer.SubConn
   994  	var err error
   995  	stub.Register(t.Name(), stub.BalancerFuncs{
   996  		UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
   997  			scw1, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{
   998  				StateListener: func(state balancer.SubConnState) { scsCh.Send(subConnWithState{sc: scw1, state: state}) },
   999  			})
  1000  			if err != nil {
  1001  				t.Errorf("error in od.NewSubConn call: %v", err)
  1002  			}
  1003  			scw2, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address2"}}, balancer.NewSubConnOptions{
  1004  				StateListener: func(state balancer.SubConnState) { scsCh.Send(subConnWithState{sc: scw2, state: state}) },
  1005  			})
  1006  			if err != nil {
  1007  				t.Errorf("error in od.NewSubConn call: %v", err)
  1008  			}
  1009  			scw3, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address3"}}, balancer.NewSubConnOptions{
  1010  				StateListener: func(state balancer.SubConnState) { scsCh.Send(subConnWithState{sc: scw3, state: state}) },
  1011  			})
  1012  			if err != nil {
  1013  				t.Errorf("error in od.NewSubConn call: %v", err)
  1014  			}
  1015  			bd.ClientConn.UpdateState(balancer.State{
  1016  				ConnectivityState: connectivity.Ready,
  1017  				Picker: &rrPicker{
  1018  					scs: []balancer.SubConn{scw1, scw2, scw3},
  1019  				},
  1020  			})
  1021  			return nil
  1022  		},
  1023  	})
  1024  
  1025  	od, tcc, cleanup := setup(t)
  1026  	defer func() {
  1027  		cleanup()
  1028  	}()
  1029  
  1030  	od.UpdateClientConnState(balancer.ClientConnState{
  1031  		ResolverState: resolver.State{
  1032  			Addresses: []resolver.Address{
  1033  				{Addr: "address1"},
  1034  				{Addr: "address2"},
  1035  				{Addr: "address3"},
  1036  			},
  1037  		},
  1038  		BalancerConfig: &LBConfig{
  1039  			Interval:           math.MaxInt64, // so the interval will never run unless called manually in test.
  1040  			BaseEjectionTime:   iserviceconfig.Duration(30 * time.Second),
  1041  			MaxEjectionTime:    iserviceconfig.Duration(300 * time.Second),
  1042  			MaxEjectionPercent: 10,
  1043  			FailurePercentageEjection: &FailurePercentageEjection{
  1044  				Threshold:             50,
  1045  				EnforcementPercentage: 100,
  1046  				MinimumHosts:          3,
  1047  				RequestVolume:         3,
  1048  			},
  1049  			ChildPolicy: &iserviceconfig.BalancerConfig{
  1050  				Name:   t.Name(),
  1051  				Config: emptyChildConfig{},
  1052  			},
  1053  		},
  1054  	})
  1055  
  1056  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1057  	defer cancel()
  1058  
  1059  	select {
  1060  	case <-ctx.Done():
  1061  		t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn")
  1062  	case picker := <-tcc.NewPickerCh:
  1063  		// Set each of the three upstream addresses to have five successes each.
  1064  		// This should cause none of the addresses to be ejected as none of them
  1065  		// are outliers according to the success rate algorithm.
  1066  		for i := 0; i < 3; i++ {
  1067  			pi, err := picker.Pick(balancer.PickInfo{})
  1068  			if err != nil {
  1069  				t.Fatalf("picker.Pick failed with error: %v", err)
  1070  			}
  1071  			for c := 0; c < 5; c++ {
  1072  				pi.Done(balancer.DoneInfo{})
  1073  			}
  1074  		}
  1075  
  1076  		od.intervalTimerAlgorithm()
  1077  
  1078  		// verify no StateListener() call on the child, as no addresses got
  1079  		// ejected (ejected address will cause an StateListener call).
  1080  		sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
  1081  		defer cancel()
  1082  		if _, err := scsCh.Receive(sCtx); err == nil {
  1083  			t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)")
  1084  		}
  1085  
  1086  		// Since no addresses are ejected, a SubConn update should forward down
  1087  		// to the child.
  1088  		od.updateSubConnState(scw1.(*subConnWrapper).SubConn, balancer.SubConnState{
  1089  			ConnectivityState: connectivity.Connecting,
  1090  		})
  1091  
  1092  		gotSCWS, err := scsCh.Receive(ctx)
  1093  		if err != nil {
  1094  			t.Fatalf("Error waiting for Sub Conn update: %v", err)
  1095  		}
  1096  		if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
  1097  			sc:    scw1,
  1098  			state: balancer.SubConnState{ConnectivityState: connectivity.Connecting},
  1099  		}); err != nil {
  1100  			t.Fatalf("Error in Sub Conn update: %v", err)
  1101  		}
  1102  
  1103  		// Set two of the upstream addresses to have five successes each, and
  1104  		// one of the upstream addresses to have five failures. This should
  1105  		// cause the address which has five failures to be ejected according to
  1106  		// the SuccessRateAlgorithm.
  1107  		for i := 0; i < 2; i++ {
  1108  			pi, err := picker.Pick(balancer.PickInfo{})
  1109  			if err != nil {
  1110  				t.Fatalf("picker.Pick failed with error: %v", err)
  1111  			}
  1112  			for c := 0; c < 5; c++ {
  1113  				pi.Done(balancer.DoneInfo{})
  1114  			}
  1115  		}
  1116  		pi, err := picker.Pick(balancer.PickInfo{})
  1117  		if err != nil {
  1118  			t.Fatalf("picker.Pick failed with error: %v", err)
  1119  		}
  1120  		for c := 0; c < 5; c++ {
  1121  			pi.Done(balancer.DoneInfo{Err: errors.New("some error")})
  1122  		}
  1123  
  1124  		// should eject address that always errored.
  1125  		od.intervalTimerAlgorithm()
  1126  		// Due to the address being ejected, the SubConn with that address
  1127  		// should be ejected, meaning a TRANSIENT_FAILURE connectivity state
  1128  		// gets reported to the child.
  1129  		gotSCWS, err = scsCh.Receive(ctx)
  1130  		if err != nil {
  1131  			t.Fatalf("Error waiting for Sub Conn update: %v", err)
  1132  		}
  1133  		if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
  1134  			sc:    scw3,
  1135  			state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure},
  1136  		}); err != nil {
  1137  			t.Fatalf("Error in Sub Conn update: %v", err)
  1138  		}
  1139  		// Only one address should be ejected.
  1140  		sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
  1141  		defer cancel()
  1142  		if _, err := scsCh.Receive(sCtx); err == nil {
  1143  			t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)")
  1144  		}
  1145  
  1146  		// Now that an address is ejected, SubConn updates for SubConns using
  1147  		// that address should not be forwarded downward. These SubConn updates
  1148  		// will be cached to update the child sometime in the future when the
  1149  		// address gets unejected.
  1150  		od.updateSubConnState(pi.SubConn, balancer.SubConnState{
  1151  			ConnectivityState: connectivity.Connecting,
  1152  		})
  1153  		sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
  1154  		defer cancel()
  1155  		if _, err := scsCh.Receive(sCtx); err == nil {
  1156  			t.Fatalf("SubConn update should not have been forwarded (the SubConn is ejected)")
  1157  		}
  1158  
  1159  		// Override now to cause the interval timer algorithm to always uneject
  1160  		// the ejected address. This will always uneject the ejected address
  1161  		// because this time is set way past the max ejection time set in the
  1162  		// configuration, which will make the next interval timer algorithm run
  1163  		// uneject any ejected addresses.
  1164  		defer func(n func() time.Time) {
  1165  			now = n
  1166  		}(now)
  1167  		now = func() time.Time {
  1168  			return time.Now().Add(time.Second * 1000)
  1169  		}
  1170  		od.intervalTimerAlgorithm()
  1171  
  1172  		// unejected SubConn should report latest persisted state - which is
  1173  		// connecting from earlier.
  1174  		gotSCWS, err = scsCh.Receive(ctx)
  1175  		if err != nil {
  1176  			t.Fatalf("Error waiting for Sub Conn update: %v", err)
  1177  		}
  1178  		if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
  1179  			sc:    scw3,
  1180  			state: balancer.SubConnState{ConnectivityState: connectivity.Connecting},
  1181  		}); err != nil {
  1182  			t.Fatalf("Error in Sub Conn update: %v", err)
  1183  		}
  1184  	}
  1185  }
  1186  
  1187  // TestEjectFailureRate tests the functionality of the interval timer algorithm
  1188  // when configured with FailurePercentageEjection, and also the functionality of
  1189  // noop configuration. The Outlier Detection Balancer will be set up with 3
  1190  // SubConns, each with a different address. It tests the following scenarios, in
  1191  // a step by step fashion:
  1192  // 1. The three addresses each have 5 successes. The interval timer algorithm
  1193  // should not eject any of the addresses.
  1194  // 2. Two of the addresses have 5 successes, the third has five failures. The
  1195  // interval timer algorithm should eject the third address with five failures.
  1196  // 3. The Outlier Detection Balancer receives a subsequent noop config update.
  1197  // The balancer should uneject all ejected addresses.
  1198  func (s) TestEjectFailureRate(t *testing.T) {
  1199  	scsCh := testutils.NewChannel()
  1200  	var scw1, scw2, scw3 balancer.SubConn
  1201  	var err error
  1202  	stub.Register(t.Name(), stub.BalancerFuncs{
  1203  		UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
  1204  			if scw1 != nil { // UpdateClientConnState was already called, no need to recreate SubConns.
  1205  				return nil
  1206  			}
  1207  			scw1, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{
  1208  				StateListener: func(state balancer.SubConnState) { scsCh.Send(subConnWithState{sc: scw1, state: state}) },
  1209  			})
  1210  			if err != nil {
  1211  				t.Errorf("error in od.NewSubConn call: %v", err)
  1212  			}
  1213  			scw2, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address2"}}, balancer.NewSubConnOptions{
  1214  				StateListener: func(state balancer.SubConnState) { scsCh.Send(subConnWithState{sc: scw2, state: state}) },
  1215  			})
  1216  			if err != nil {
  1217  				t.Errorf("error in od.NewSubConn call: %v", err)
  1218  			}
  1219  			scw3, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address3"}}, balancer.NewSubConnOptions{
  1220  				StateListener: func(state balancer.SubConnState) { scsCh.Send(subConnWithState{sc: scw3, state: state}) },
  1221  			})
  1222  			if err != nil {
  1223  				t.Errorf("error in od.NewSubConn call: %v", err)
  1224  			}
  1225  			return nil
  1226  		},
  1227  	})
  1228  
  1229  	od, tcc, cleanup := setup(t)
  1230  	defer func() {
  1231  		cleanup()
  1232  	}()
  1233  
  1234  	od.UpdateClientConnState(balancer.ClientConnState{
  1235  		ResolverState: resolver.State{
  1236  			Addresses: []resolver.Address{
  1237  				{Addr: "address1"},
  1238  				{Addr: "address2"},
  1239  				{Addr: "address3"},
  1240  			},
  1241  		},
  1242  		BalancerConfig: &LBConfig{
  1243  			Interval:           math.MaxInt64, // so the interval will never run unless called manually in test.
  1244  			BaseEjectionTime:   iserviceconfig.Duration(30 * time.Second),
  1245  			MaxEjectionTime:    iserviceconfig.Duration(300 * time.Second),
  1246  			MaxEjectionPercent: 10,
  1247  			SuccessRateEjection: &SuccessRateEjection{
  1248  				StdevFactor:           500,
  1249  				EnforcementPercentage: 100,
  1250  				MinimumHosts:          3,
  1251  				RequestVolume:         3,
  1252  			},
  1253  			ChildPolicy: &iserviceconfig.BalancerConfig{
  1254  				Name:   t.Name(),
  1255  				Config: emptyChildConfig{},
  1256  			},
  1257  		},
  1258  	})
  1259  
  1260  	od.UpdateState(balancer.State{
  1261  		ConnectivityState: connectivity.Ready,
  1262  		Picker: &rrPicker{
  1263  			scs: []balancer.SubConn{scw1, scw2, scw3},
  1264  		},
  1265  	})
  1266  
  1267  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1268  	defer cancel()
  1269  
  1270  	select {
  1271  	case <-ctx.Done():
  1272  		t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn")
  1273  	case picker := <-tcc.NewPickerCh:
  1274  		// Set each upstream address to have five successes each. This should
  1275  		// cause none of the addresses to be ejected as none of them are below
  1276  		// the failure percentage threshold.
  1277  		for i := 0; i < 3; i++ {
  1278  			pi, err := picker.Pick(balancer.PickInfo{})
  1279  			if err != nil {
  1280  				t.Fatalf("picker.Pick failed with error: %v", err)
  1281  			}
  1282  			for c := 0; c < 5; c++ {
  1283  				pi.Done(balancer.DoneInfo{})
  1284  			}
  1285  		}
  1286  
  1287  		od.intervalTimerAlgorithm()
  1288  		sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
  1289  		defer cancel()
  1290  		if _, err := scsCh.Receive(sCtx); err == nil {
  1291  			t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)")
  1292  		}
  1293  
  1294  		// Set two upstream addresses to have five successes each, and one
  1295  		// upstream address to have five failures. This should cause the address
  1296  		// with five failures to be ejected according to the Failure Percentage
  1297  		// Algorithm.
  1298  		for i := 0; i < 2; i++ {
  1299  			pi, err := picker.Pick(balancer.PickInfo{})
  1300  			if err != nil {
  1301  				t.Fatalf("picker.Pick failed with error: %v", err)
  1302  			}
  1303  			for c := 0; c < 5; c++ {
  1304  				pi.Done(balancer.DoneInfo{})
  1305  			}
  1306  		}
  1307  		pi, err := picker.Pick(balancer.PickInfo{})
  1308  		if err != nil {
  1309  			t.Fatalf("picker.Pick failed with error: %v", err)
  1310  		}
  1311  		for c := 0; c < 5; c++ {
  1312  			pi.Done(balancer.DoneInfo{Err: errors.New("some error")})
  1313  		}
  1314  
  1315  		// should eject address that always errored.
  1316  		od.intervalTimerAlgorithm()
  1317  
  1318  		// verify StateListener() got called with TRANSIENT_FAILURE for child
  1319  		// in address that was ejected.
  1320  		gotSCWS, err := scsCh.Receive(ctx)
  1321  		if err != nil {
  1322  			t.Fatalf("Error waiting for Sub Conn update: %v", err)
  1323  		}
  1324  		if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
  1325  			sc:    scw3,
  1326  			state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure},
  1327  		}); err != nil {
  1328  			t.Fatalf("Error in Sub Conn update: %v", err)
  1329  		}
  1330  
  1331  		// verify only one address got ejected.
  1332  		sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
  1333  		defer cancel()
  1334  		if _, err := scsCh.Receive(sCtx); err == nil {
  1335  			t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)")
  1336  		}
  1337  
  1338  		// upon the Outlier Detection balancer being reconfigured with a noop
  1339  		// configuration, every ejected SubConn should be unejected.
  1340  		od.UpdateClientConnState(balancer.ClientConnState{
  1341  			ResolverState: resolver.State{
  1342  				Addresses: []resolver.Address{
  1343  					{Addr: "address1"},
  1344  					{Addr: "address2"},
  1345  					{Addr: "address3"},
  1346  				},
  1347  			},
  1348  			BalancerConfig: &LBConfig{
  1349  				Interval:           math.MaxInt64,
  1350  				BaseEjectionTime:   iserviceconfig.Duration(30 * time.Second),
  1351  				MaxEjectionTime:    iserviceconfig.Duration(300 * time.Second),
  1352  				MaxEjectionPercent: 10,
  1353  				ChildPolicy: &iserviceconfig.BalancerConfig{
  1354  					Name:   t.Name(),
  1355  					Config: emptyChildConfig{},
  1356  				},
  1357  			},
  1358  		})
  1359  		gotSCWS, err = scsCh.Receive(ctx)
  1360  		if err != nil {
  1361  			t.Fatalf("Error waiting for Sub Conn update: %v", err)
  1362  		}
  1363  		if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
  1364  			sc:    scw3,
  1365  			state: balancer.SubConnState{ConnectivityState: connectivity.Idle},
  1366  		}); err != nil {
  1367  			t.Fatalf("Error in Sub Conn update: %v", err)
  1368  		}
  1369  	}
  1370  }
  1371  
  1372  // TestConcurrentOperations calls different operations on the balancer in
  1373  // separate goroutines to test for any race conditions and deadlocks. It also
  1374  // uses a child balancer which verifies that no operations on the child get
  1375  // called after the child balancer is closed.
  1376  func (s) TestConcurrentOperations(t *testing.T) {
  1377  	closed := grpcsync.NewEvent()
  1378  	stub.Register(t.Name(), stub.BalancerFuncs{
  1379  		UpdateClientConnState: func(*stub.BalancerData, balancer.ClientConnState) error {
  1380  			if closed.HasFired() {
  1381  				t.Error("UpdateClientConnState was called after Close(), which breaks the balancer API")
  1382  			}
  1383  			return nil
  1384  		},
  1385  		ResolverError: func(*stub.BalancerData, error) {
  1386  			if closed.HasFired() {
  1387  				t.Error("ResolverError was called after Close(), which breaks the balancer API")
  1388  			}
  1389  		},
  1390  		Close: func(*stub.BalancerData) {
  1391  			closed.Fire()
  1392  		},
  1393  		ExitIdle: func(*stub.BalancerData) {
  1394  			if closed.HasFired() {
  1395  				t.Error("ExitIdle was called after Close(), which breaks the balancer API")
  1396  			}
  1397  		},
  1398  	})
  1399  
  1400  	od, tcc, cleanup := setup(t)
  1401  	defer func() {
  1402  		cleanup()
  1403  	}()
  1404  
  1405  	od.UpdateClientConnState(balancer.ClientConnState{
  1406  		ResolverState: resolver.State{
  1407  			Addresses: []resolver.Address{
  1408  				{Addr: "address1"},
  1409  				{Addr: "address2"},
  1410  				{Addr: "address3"},
  1411  			},
  1412  		},
  1413  		BalancerConfig: &LBConfig{
  1414  			Interval:           math.MaxInt64, // so the interval will never run unless called manually in test.
  1415  			BaseEjectionTime:   iserviceconfig.Duration(30 * time.Second),
  1416  			MaxEjectionTime:    iserviceconfig.Duration(300 * time.Second),
  1417  			MaxEjectionPercent: 10,
  1418  			SuccessRateEjection: &SuccessRateEjection{ // Have both Success Rate and Failure Percentage to step through all the interval timer code
  1419  				StdevFactor:           500,
  1420  				EnforcementPercentage: 100,
  1421  				MinimumHosts:          3,
  1422  				RequestVolume:         3,
  1423  			},
  1424  			FailurePercentageEjection: &FailurePercentageEjection{
  1425  				Threshold:             50,
  1426  				EnforcementPercentage: 100,
  1427  				MinimumHosts:          3,
  1428  				RequestVolume:         3,
  1429  			},
  1430  			ChildPolicy: &iserviceconfig.BalancerConfig{
  1431  				Name:   t.Name(),
  1432  				Config: emptyChildConfig{},
  1433  			},
  1434  		},
  1435  	})
  1436  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1437  	defer cancel()
  1438  
  1439  	scw1, err := od.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
  1440  	if err != nil {
  1441  		t.Fatalf("error in od.NewSubConn call: %v", err)
  1442  	}
  1443  	if err != nil {
  1444  		t.Fatalf("error in od.NewSubConn call: %v", err)
  1445  	}
  1446  
  1447  	scw2, err := od.NewSubConn([]resolver.Address{{Addr: "address2"}}, balancer.NewSubConnOptions{})
  1448  	if err != nil {
  1449  		t.Fatalf("error in od.NewSubConn call: %v", err)
  1450  	}
  1451  
  1452  	scw3, err := od.NewSubConn([]resolver.Address{{Addr: "address3"}}, balancer.NewSubConnOptions{})
  1453  	if err != nil {
  1454  		t.Fatalf("error in od.NewSubConn call: %v", err)
  1455  	}
  1456  
  1457  	od.UpdateState(balancer.State{
  1458  		ConnectivityState: connectivity.Ready,
  1459  		Picker: &rrPicker{
  1460  			scs: []balancer.SubConn{scw2, scw3},
  1461  		},
  1462  	})
  1463  
  1464  	var picker balancer.Picker
  1465  	select {
  1466  	case <-ctx.Done():
  1467  		t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn")
  1468  	case picker = <-tcc.NewPickerCh:
  1469  	}
  1470  
  1471  	finished := make(chan struct{})
  1472  	var wg sync.WaitGroup
  1473  	wg.Add(1)
  1474  	go func() {
  1475  		defer wg.Done()
  1476  		for {
  1477  			select {
  1478  			case <-finished:
  1479  				return
  1480  			default:
  1481  			}
  1482  			pi, err := picker.Pick(balancer.PickInfo{})
  1483  			if err != nil {
  1484  				continue
  1485  			}
  1486  			pi.Done(balancer.DoneInfo{})
  1487  			pi.Done(balancer.DoneInfo{Err: errors.New("some error")})
  1488  			time.Sleep(1 * time.Nanosecond)
  1489  		}
  1490  	}()
  1491  
  1492  	wg.Add(1)
  1493  	go func() {
  1494  		defer wg.Done()
  1495  		for {
  1496  			select {
  1497  			case <-finished:
  1498  				return
  1499  			default:
  1500  			}
  1501  			od.intervalTimerAlgorithm()
  1502  		}
  1503  	}()
  1504  
  1505  	// call Outlier Detection's balancer.ClientConn operations asynchronously.
  1506  	// balancer.ClientConn operations have no guarantee from the API to be
  1507  	// called synchronously.
  1508  	wg.Add(1)
  1509  	go func() {
  1510  		defer wg.Done()
  1511  		for {
  1512  			select {
  1513  			case <-finished:
  1514  				return
  1515  			default:
  1516  			}
  1517  			od.UpdateState(balancer.State{
  1518  				ConnectivityState: connectivity.Ready,
  1519  				Picker: &rrPicker{
  1520  					scs: []balancer.SubConn{scw2, scw3},
  1521  				},
  1522  			})
  1523  			time.Sleep(1 * time.Nanosecond)
  1524  		}
  1525  	}()
  1526  
  1527  	wg.Add(1)
  1528  	go func() {
  1529  		defer wg.Done()
  1530  		od.NewSubConn([]resolver.Address{{Addr: "address4"}}, balancer.NewSubConnOptions{})
  1531  	}()
  1532  
  1533  	wg.Add(1)
  1534  	go func() {
  1535  		defer wg.Done()
  1536  		scw1.Shutdown()
  1537  	}()
  1538  
  1539  	wg.Add(1)
  1540  	go func() {
  1541  		defer wg.Done()
  1542  		od.UpdateAddresses(scw2, []resolver.Address{{Addr: "address3"}})
  1543  	}()
  1544  
  1545  	// Call balancer.Balancers synchronously in this goroutine, upholding the
  1546  	// balancer.Balancer API guarantee of synchronous calls.
  1547  	od.UpdateClientConnState(balancer.ClientConnState{ // This will delete addresses and flip to no op
  1548  		ResolverState: resolver.State{
  1549  			Addresses: []resolver.Address{{Addr: "address1"}},
  1550  		},
  1551  		BalancerConfig: &LBConfig{
  1552  			Interval: math.MaxInt64,
  1553  			ChildPolicy: &iserviceconfig.BalancerConfig{
  1554  				Name:   t.Name(),
  1555  				Config: emptyChildConfig{},
  1556  			},
  1557  		},
  1558  	})
  1559  
  1560  	// Call balancer.Balancers synchronously in this goroutine, upholding the
  1561  	// balancer.Balancer API guarantee.
  1562  	od.updateSubConnState(scw1.(*subConnWrapper).SubConn, balancer.SubConnState{
  1563  		ConnectivityState: connectivity.Connecting,
  1564  	})
  1565  	od.ResolverError(errors.New("some error"))
  1566  	od.ExitIdle()
  1567  	od.Close()
  1568  	close(finished)
  1569  	wg.Wait()
  1570  }
  1571  

View as plain text