...

Source file src/google.golang.org/grpc/balancer/weightedroundrobin/balancer_test.go

Documentation: google.golang.org/grpc/balancer/weightedroundrobin

     1  /*
     2   *
     3   * Copyright 2023 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package weightedroundrobin_test
    20  
    21  import (
    22  	"context"
    23  	"encoding/json"
    24  	"fmt"
    25  	"sync"
    26  	"sync/atomic"
    27  	"testing"
    28  	"time"
    29  
    30  	"google.golang.org/grpc"
    31  	"google.golang.org/grpc/internal"
    32  	"google.golang.org/grpc/internal/grpctest"
    33  	"google.golang.org/grpc/internal/stubserver"
    34  	"google.golang.org/grpc/internal/testutils/roundrobin"
    35  	"google.golang.org/grpc/orca"
    36  	"google.golang.org/grpc/peer"
    37  	"google.golang.org/grpc/resolver"
    38  
    39  	wrr "google.golang.org/grpc/balancer/weightedroundrobin"
    40  	iwrr "google.golang.org/grpc/balancer/weightedroundrobin/internal"
    41  
    42  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    43  	testpb "google.golang.org/grpc/interop/grpc_testing"
    44  )
    45  
// s is the receiver for the tests in this file. Test passes a value of this
// type to grpctest.RunSubTests.
type s struct {
	grpctest.Tester
}
    49  
// Test is the single go-test entry point; it hands an s value to
// grpctest.RunSubTests.
func Test(t *testing.T) {
	grpctest.RunSubTests(t, s{})
}
    53  
    54  const defaultTestTimeout = 10 * time.Second
    55  const weightUpdatePeriod = 50 * time.Millisecond
    56  const weightExpirationPeriod = time.Minute
    57  const oobReportingInterval = 10 * time.Millisecond
    58  
// init sets iwrr.AllowAnyWeightUpdatePeriod before any test runs.
// NOTE(review): presumably this lets the 50ms weight update period used by
// these tests be honored instead of being clamped -- confirm in the iwrr
// package.
func init() {
	iwrr.AllowAnyWeightUpdatePeriod = true
}
    62  
    63  func boolp(b bool) *bool          { return &b }
    64  func float64p(f float64) *float64 { return &f }
    65  func stringp(s string) *string    { return &s }
    66  
// Baseline LB policy configurations used by the tests. The two differ only in
// EnableOOBLoadReport; both use a zero blackout period, a 60s weight
// expiration period, a 50ms weight update period and no error utilization
// penalty. Tests that need a variation copy one of these and replace
// individual fields.
var (
	// perCallConfig has out-of-band load reporting disabled.
	perCallConfig = iwrr.LBConfig{
		EnableOOBLoadReport:     boolp(false),
		OOBReportingPeriod:      stringp("0.005s"),
		BlackoutPeriod:          stringp("0s"),
		WeightExpirationPeriod:  stringp("60s"),
		WeightUpdatePeriod:      stringp(".050s"),
		ErrorUtilizationPenalty: float64p(0),
	}
	// oobConfig has out-of-band load reporting enabled.
	oobConfig = iwrr.LBConfig{
		EnableOOBLoadReport:     boolp(true),
		OOBReportingPeriod:      stringp("0.005s"),
		BlackoutPeriod:          stringp("0s"),
		WeightExpirationPeriod:  stringp("60s"),
		WeightUpdatePeriod:      stringp(".050s"),
		ErrorUtilizationPenalty: float64p(0),
	}
)
    85  
// testServer bundles a stub server with the two metrics recorders that
// startServer creates for it, so tests can set the load each server reports.
type testServer struct {
	*stubserver.StubServer

	oobMetrics  orca.ServerMetricsRecorder // Attached to the OOB stream.
	callMetrics orca.CallMetricsRecorder   // Attached to per-call metrics.
}
    92  
// reportType selects which load reporting mechanisms startServer enables on a
// test server.
type reportType int

const (
	reportNone reportType = iota // neither per-call nor OOB reporting is set up
	reportOOB                    // the ORCA OOB service is registered
	reportCall                   // the per-call metrics server option is installed
	reportBoth                   // both of the above
)
   101  
   102  func startServer(t *testing.T, r reportType) *testServer {
   103  	t.Helper()
   104  
   105  	smr := orca.NewServerMetricsRecorder()
   106  	cmr := orca.NewServerMetricsRecorder().(orca.CallMetricsRecorder)
   107  
   108  	ss := &stubserver.StubServer{
   109  		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
   110  			if r := orca.CallMetricsRecorderFromContext(ctx); r != nil {
   111  				// Copy metrics from what the test set in cmr into r.
   112  				sm := cmr.(orca.ServerMetricsProvider).ServerMetrics()
   113  				r.SetApplicationUtilization(sm.AppUtilization)
   114  				r.SetQPS(sm.QPS)
   115  				r.SetEPS(sm.EPS)
   116  			}
   117  			return &testpb.Empty{}, nil
   118  		},
   119  	}
   120  
   121  	var sopts []grpc.ServerOption
   122  	if r == reportCall || r == reportBoth {
   123  		sopts = append(sopts, orca.CallMetricsServerOption(nil))
   124  	}
   125  
   126  	if r == reportOOB || r == reportBoth {
   127  		oso := orca.ServiceOptions{
   128  			ServerMetricsProvider: smr,
   129  			MinReportingInterval:  10 * time.Millisecond,
   130  		}
   131  		internal.ORCAAllowAnyMinReportingInterval.(func(so *orca.ServiceOptions))(&oso)
   132  		sopts = append(sopts, stubserver.RegisterServiceServerOption(func(s *grpc.Server) {
   133  			if err := orca.Register(s, oso); err != nil {
   134  				t.Fatalf("Failed to register orca service: %v", err)
   135  			}
   136  		}))
   137  	}
   138  
   139  	if err := ss.StartServer(sopts...); err != nil {
   140  		t.Fatalf("Error starting server: %v", err)
   141  	}
   142  	t.Cleanup(ss.Stop)
   143  
   144  	return &testServer{
   145  		StubServer:  ss,
   146  		oobMetrics:  smr,
   147  		callMetrics: cmr,
   148  	}
   149  }
   150  
   151  func svcConfig(t *testing.T, wrrCfg iwrr.LBConfig) string {
   152  	t.Helper()
   153  	m, err := json.Marshal(wrrCfg)
   154  	if err != nil {
   155  		t.Fatalf("Error marshaling JSON %v: %v", wrrCfg, err)
   156  	}
   157  	sc := fmt.Sprintf(`{"loadBalancingConfig": [ {%q:%v} ] }`, wrr.Name, string(m))
   158  	t.Logf("Marshaled service config: %v", sc)
   159  	return sc
   160  }
   161  
   162  // Tests basic functionality with one address.  With only one address, load
   163  // reporting doesn't affect routing at all.
   164  func (s) TestBalancer_OneAddress(t *testing.T) {
   165  	testCases := []struct {
   166  		rt  reportType
   167  		cfg iwrr.LBConfig
   168  	}{
   169  		{rt: reportNone, cfg: perCallConfig},
   170  		{rt: reportCall, cfg: perCallConfig},
   171  		{rt: reportOOB, cfg: oobConfig},
   172  	}
   173  
   174  	for _, tc := range testCases {
   175  		t.Run(fmt.Sprintf("reportType:%v", tc.rt), func(t *testing.T) {
   176  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   177  			defer cancel()
   178  
   179  			srv := startServer(t, tc.rt)
   180  
   181  			sc := svcConfig(t, tc.cfg)
   182  			if err := srv.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
   183  				t.Fatalf("Error starting client: %v", err)
   184  			}
   185  
   186  			// Perform many RPCs to ensure the LB policy works with 1 address.
   187  			for i := 0; i < 100; i++ {
   188  				srv.callMetrics.SetQPS(float64(i))
   189  				srv.oobMetrics.SetQPS(float64(i))
   190  				if _, err := srv.Client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   191  					t.Fatalf("Error from EmptyCall: %v", err)
   192  				}
   193  				time.Sleep(time.Millisecond) // Delay; test will run 100ms and should perform ~10 weight updates
   194  			}
   195  		})
   196  	}
   197  }
   198  
   199  // Tests two addresses with ORCA reporting disabled (should fall back to pure
   200  // RR).
   201  func (s) TestBalancer_TwoAddresses_ReportingDisabled(t *testing.T) {
   202  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   203  	defer cancel()
   204  
   205  	srv1 := startServer(t, reportNone)
   206  	srv2 := startServer(t, reportNone)
   207  
   208  	sc := svcConfig(t, perCallConfig)
   209  	if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
   210  		t.Fatalf("Error starting client: %v", err)
   211  	}
   212  	addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
   213  	srv1.R.UpdateState(resolver.State{Addresses: addrs})
   214  
   215  	// Perform many RPCs to ensure the LB policy works with 2 addresses.
   216  	for i := 0; i < 20; i++ {
   217  		roundrobin.CheckRoundRobinRPCs(ctx, srv1.Client, addrs)
   218  	}
   219  }
   220  
   221  // Tests two addresses with per-call ORCA reporting enabled.  Checks the
   222  // backends are called in the appropriate ratios.
   223  func (s) TestBalancer_TwoAddresses_ReportingEnabledPerCall(t *testing.T) {
   224  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   225  	defer cancel()
   226  
   227  	srv1 := startServer(t, reportCall)
   228  	srv2 := startServer(t, reportCall)
   229  
   230  	// srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
   231  	// disproportionately to srv2 (10:1).
   232  	srv1.callMetrics.SetQPS(10.0)
   233  	srv1.callMetrics.SetApplicationUtilization(1.0)
   234  
   235  	srv2.callMetrics.SetQPS(10.0)
   236  	srv2.callMetrics.SetApplicationUtilization(.1)
   237  
   238  	sc := svcConfig(t, perCallConfig)
   239  	if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
   240  		t.Fatalf("Error starting client: %v", err)
   241  	}
   242  	addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
   243  	srv1.R.UpdateState(resolver.State{Addresses: addrs})
   244  
   245  	// Call each backend once to ensure the weights have been received.
   246  	ensureReached(ctx, t, srv1.Client, 2)
   247  
   248  	// Wait for the weight update period to allow the new weights to be processed.
   249  	time.Sleep(weightUpdatePeriod)
   250  	checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
   251  }
   252  
   253  // Tests two addresses with OOB ORCA reporting enabled.  Checks the backends
   254  // are called in the appropriate ratios.
   255  func (s) TestBalancer_TwoAddresses_ReportingEnabledOOB(t *testing.T) {
   256  	testCases := []struct {
   257  		name       string
   258  		utilSetter func(orca.ServerMetricsRecorder, float64)
   259  	}{{
   260  		name: "application_utilization",
   261  		utilSetter: func(smr orca.ServerMetricsRecorder, val float64) {
   262  			smr.SetApplicationUtilization(val)
   263  		},
   264  	}, {
   265  		name: "cpu_utilization",
   266  		utilSetter: func(smr orca.ServerMetricsRecorder, val float64) {
   267  			smr.SetCPUUtilization(val)
   268  		},
   269  	}, {
   270  		name: "application over cpu",
   271  		utilSetter: func(smr orca.ServerMetricsRecorder, val float64) {
   272  			smr.SetApplicationUtilization(val)
   273  			smr.SetCPUUtilization(2.0) // ignored because ApplicationUtilization is set
   274  		},
   275  	}}
   276  
   277  	for _, tc := range testCases {
   278  		t.Run(tc.name, func(t *testing.T) {
   279  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   280  			defer cancel()
   281  
   282  			srv1 := startServer(t, reportOOB)
   283  			srv2 := startServer(t, reportOOB)
   284  
   285  			// srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
   286  			// disproportionately to srv2 (10:1).
   287  			srv1.oobMetrics.SetQPS(10.0)
   288  			tc.utilSetter(srv1.oobMetrics, 1.0)
   289  
   290  			srv2.oobMetrics.SetQPS(10.0)
   291  			tc.utilSetter(srv2.oobMetrics, 0.1)
   292  
   293  			sc := svcConfig(t, oobConfig)
   294  			if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
   295  				t.Fatalf("Error starting client: %v", err)
   296  			}
   297  			addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
   298  			srv1.R.UpdateState(resolver.State{Addresses: addrs})
   299  
   300  			// Call each backend once to ensure the weights have been received.
   301  			ensureReached(ctx, t, srv1.Client, 2)
   302  
   303  			// Wait for the weight update period to allow the new weights to be processed.
   304  			time.Sleep(weightUpdatePeriod)
   305  			checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
   306  		})
   307  	}
   308  }
   309  
   310  // Tests two addresses with OOB ORCA reporting enabled, where the reports
   311  // change over time.  Checks the backends are called in the appropriate ratios
   312  // before and after modifying the reports.
   313  func (s) TestBalancer_TwoAddresses_UpdateLoads(t *testing.T) {
   314  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   315  	defer cancel()
   316  
   317  	srv1 := startServer(t, reportOOB)
   318  	srv2 := startServer(t, reportOOB)
   319  
   320  	// srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
   321  	// disproportionately to srv2 (10:1).
   322  	srv1.oobMetrics.SetQPS(10.0)
   323  	srv1.oobMetrics.SetApplicationUtilization(1.0)
   324  
   325  	srv2.oobMetrics.SetQPS(10.0)
   326  	srv2.oobMetrics.SetApplicationUtilization(.1)
   327  
   328  	sc := svcConfig(t, oobConfig)
   329  	if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
   330  		t.Fatalf("Error starting client: %v", err)
   331  	}
   332  	addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
   333  	srv1.R.UpdateState(resolver.State{Addresses: addrs})
   334  
   335  	// Call each backend once to ensure the weights have been received.
   336  	ensureReached(ctx, t, srv1.Client, 2)
   337  
   338  	// Wait for the weight update period to allow the new weights to be processed.
   339  	time.Sleep(weightUpdatePeriod)
   340  	checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
   341  
   342  	// Update the loads so srv2 is loaded and srv1 is not; ensure RPCs are
   343  	// routed disproportionately to srv1.
   344  	srv1.oobMetrics.SetQPS(10.0)
   345  	srv1.oobMetrics.SetApplicationUtilization(.1)
   346  
   347  	srv2.oobMetrics.SetQPS(10.0)
   348  	srv2.oobMetrics.SetApplicationUtilization(1.0)
   349  
   350  	// Wait for the weight update period to allow the new weights to be processed.
   351  	time.Sleep(weightUpdatePeriod + oobReportingInterval)
   352  	checkWeights(ctx, t, srvWeight{srv1, 10}, srvWeight{srv2, 1})
   353  }
   354  
   355  // Tests two addresses with OOB ORCA reporting enabled, then with switching to
   356  // per-call reporting.  Checks the backends are called in the appropriate
   357  // ratios before and after the change.
   358  func (s) TestBalancer_TwoAddresses_OOBThenPerCall(t *testing.T) {
   359  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   360  	defer cancel()
   361  
   362  	srv1 := startServer(t, reportBoth)
   363  	srv2 := startServer(t, reportBoth)
   364  
   365  	// srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
   366  	// disproportionately to srv2 (10:1).
   367  	srv1.oobMetrics.SetQPS(10.0)
   368  	srv1.oobMetrics.SetApplicationUtilization(1.0)
   369  
   370  	srv2.oobMetrics.SetQPS(10.0)
   371  	srv2.oobMetrics.SetApplicationUtilization(.1)
   372  
   373  	// For per-call metrics (not used initially), srv2 reports that it is
   374  	// loaded and srv1 reports low load.  After confirming OOB works, switch to
   375  	// per-call and confirm the new routing weights are applied.
   376  	srv1.callMetrics.SetQPS(10.0)
   377  	srv1.callMetrics.SetApplicationUtilization(.1)
   378  
   379  	srv2.callMetrics.SetQPS(10.0)
   380  	srv2.callMetrics.SetApplicationUtilization(1.0)
   381  
   382  	sc := svcConfig(t, oobConfig)
   383  	if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
   384  		t.Fatalf("Error starting client: %v", err)
   385  	}
   386  	addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
   387  	srv1.R.UpdateState(resolver.State{Addresses: addrs})
   388  
   389  	// Call each backend once to ensure the weights have been received.
   390  	ensureReached(ctx, t, srv1.Client, 2)
   391  
   392  	// Wait for the weight update period to allow the new weights to be processed.
   393  	time.Sleep(weightUpdatePeriod)
   394  	checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
   395  
   396  	// Update to per-call weights.
   397  	c := svcConfig(t, perCallConfig)
   398  	parsedCfg := srv1.R.CC.ParseServiceConfig(c)
   399  	if parsedCfg.Err != nil {
   400  		panic(fmt.Sprintf("Error parsing config %q: %v", c, parsedCfg.Err))
   401  	}
   402  	srv1.R.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parsedCfg})
   403  
   404  	// Wait for the weight update period to allow the new weights to be processed.
   405  	time.Sleep(weightUpdatePeriod)
   406  	checkWeights(ctx, t, srvWeight{srv1, 10}, srvWeight{srv2, 1})
   407  }
   408  
   409  // Tests two addresses with OOB ORCA reporting enabled and a non-zero error
   410  // penalty applied.
   411  func (s) TestBalancer_TwoAddresses_ErrorPenalty(t *testing.T) {
   412  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   413  	defer cancel()
   414  
   415  	srv1 := startServer(t, reportOOB)
   416  	srv2 := startServer(t, reportOOB)
   417  
   418  	// srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
   419  	// disproportionately to srv2 (10:1).  EPS values are set (but ignored
   420  	// initially due to ErrorUtilizationPenalty=0).  Later EUP will be updated
   421  	// to 0.9 which will cause the weights to be equal and RPCs to be routed
   422  	// 50/50.
   423  	srv1.oobMetrics.SetQPS(10.0)
   424  	srv1.oobMetrics.SetApplicationUtilization(1.0)
   425  	srv1.oobMetrics.SetEPS(0)
   426  	// srv1 weight before: 10.0 / 1.0 = 10.0
   427  	// srv1 weight after:  10.0 / 1.0 = 10.0
   428  
   429  	srv2.oobMetrics.SetQPS(10.0)
   430  	srv2.oobMetrics.SetApplicationUtilization(.1)
   431  	srv2.oobMetrics.SetEPS(10.0)
   432  	// srv2 weight before: 10.0 / 0.1 = 100.0
   433  	// srv2 weight after:  10.0 / 1.0 = 10.0
   434  
   435  	sc := svcConfig(t, oobConfig)
   436  	if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
   437  		t.Fatalf("Error starting client: %v", err)
   438  	}
   439  	addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
   440  	srv1.R.UpdateState(resolver.State{Addresses: addrs})
   441  
   442  	// Call each backend once to ensure the weights have been received.
   443  	ensureReached(ctx, t, srv1.Client, 2)
   444  
   445  	// Wait for the weight update period to allow the new weights to be processed.
   446  	time.Sleep(weightUpdatePeriod)
   447  	checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
   448  
   449  	// Update to include an error penalty in the weights.
   450  	newCfg := oobConfig
   451  	newCfg.ErrorUtilizationPenalty = float64p(0.9)
   452  	c := svcConfig(t, newCfg)
   453  	parsedCfg := srv1.R.CC.ParseServiceConfig(c)
   454  	if parsedCfg.Err != nil {
   455  		panic(fmt.Sprintf("Error parsing config %q: %v", c, parsedCfg.Err))
   456  	}
   457  	srv1.R.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: parsedCfg})
   458  
   459  	// Wait for the weight update period to allow the new weights to be processed.
   460  	time.Sleep(weightUpdatePeriod + oobReportingInterval)
   461  	checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
   462  }
   463  
   464  // Tests that the blackout period causes backends to use 0 as their weight
   465  // (meaning to use the average weight) until the blackout period elapses.
   466  func (s) TestBalancer_TwoAddresses_BlackoutPeriod(t *testing.T) {
   467  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   468  	defer cancel()
   469  
   470  	var mu sync.Mutex
   471  	start := time.Now()
   472  	now := start
   473  	setNow := func(t time.Time) {
   474  		mu.Lock()
   475  		defer mu.Unlock()
   476  		now = t
   477  	}
   478  
   479  	setTimeNow(func() time.Time {
   480  		mu.Lock()
   481  		defer mu.Unlock()
   482  		return now
   483  	})
   484  	t.Cleanup(func() { setTimeNow(time.Now) })
   485  
   486  	testCases := []struct {
   487  		blackoutPeriodCfg *string
   488  		blackoutPeriod    time.Duration
   489  	}{{
   490  		blackoutPeriodCfg: stringp("1s"),
   491  		blackoutPeriod:    time.Second,
   492  	}, {
   493  		blackoutPeriodCfg: nil,
   494  		blackoutPeriod:    10 * time.Second, // the default
   495  	}}
   496  	for _, tc := range testCases {
   497  		setNow(start)
   498  		srv1 := startServer(t, reportOOB)
   499  		srv2 := startServer(t, reportOOB)
   500  
   501  		// srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
   502  		// disproportionately to srv2 (10:1).
   503  		srv1.oobMetrics.SetQPS(10.0)
   504  		srv1.oobMetrics.SetApplicationUtilization(1.0)
   505  
   506  		srv2.oobMetrics.SetQPS(10.0)
   507  		srv2.oobMetrics.SetApplicationUtilization(.1)
   508  
   509  		cfg := oobConfig
   510  		cfg.BlackoutPeriod = tc.blackoutPeriodCfg
   511  		sc := svcConfig(t, cfg)
   512  		if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
   513  			t.Fatalf("Error starting client: %v", err)
   514  		}
   515  		addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
   516  		srv1.R.UpdateState(resolver.State{Addresses: addrs})
   517  
   518  		// Call each backend once to ensure the weights have been received.
   519  		ensureReached(ctx, t, srv1.Client, 2)
   520  
   521  		// Wait for the weight update period to allow the new weights to be processed.
   522  		time.Sleep(weightUpdatePeriod)
   523  		// During the blackout period (1s) we should route roughly 50/50.
   524  		checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
   525  
   526  		// Advance time to right before the blackout period ends and the weights
   527  		// should still be zero.
   528  		setNow(start.Add(tc.blackoutPeriod - time.Nanosecond))
   529  		// Wait for the weight update period to allow the new weights to be processed.
   530  		time.Sleep(weightUpdatePeriod)
   531  		checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
   532  
   533  		// Advance time to right after the blackout period ends and the weights
   534  		// should now activate.
   535  		setNow(start.Add(tc.blackoutPeriod))
   536  		// Wait for the weight update period to allow the new weights to be processed.
   537  		time.Sleep(weightUpdatePeriod)
   538  		checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
   539  	}
   540  }
   541  
   542  // Tests that the weight expiration period causes backends to use 0 as their
   543  // weight (meaning to use the average weight) once the expiration period
   544  // elapses.
   545  func (s) TestBalancer_TwoAddresses_WeightExpiration(t *testing.T) {
   546  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   547  	defer cancel()
   548  
   549  	var mu sync.Mutex
   550  	start := time.Now()
   551  	now := start
   552  	setNow := func(t time.Time) {
   553  		mu.Lock()
   554  		defer mu.Unlock()
   555  		now = t
   556  	}
   557  	setTimeNow(func() time.Time {
   558  		mu.Lock()
   559  		defer mu.Unlock()
   560  		return now
   561  	})
   562  	t.Cleanup(func() { setTimeNow(time.Now) })
   563  
   564  	srv1 := startServer(t, reportBoth)
   565  	srv2 := startServer(t, reportBoth)
   566  
   567  	// srv1 starts loaded and srv2 starts without load; ensure RPCs are routed
   568  	// disproportionately to srv2 (10:1).  Because the OOB reporting interval
   569  	// is 1 minute but the weights expire in 1 second, routing will go to 50/50
   570  	// after the weights expire.
   571  	srv1.oobMetrics.SetQPS(10.0)
   572  	srv1.oobMetrics.SetApplicationUtilization(1.0)
   573  
   574  	srv2.oobMetrics.SetQPS(10.0)
   575  	srv2.oobMetrics.SetApplicationUtilization(.1)
   576  
   577  	cfg := oobConfig
   578  	cfg.OOBReportingPeriod = stringp("60s")
   579  	sc := svcConfig(t, cfg)
   580  	if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
   581  		t.Fatalf("Error starting client: %v", err)
   582  	}
   583  	addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}}
   584  	srv1.R.UpdateState(resolver.State{Addresses: addrs})
   585  
   586  	// Call each backend once to ensure the weights have been received.
   587  	ensureReached(ctx, t, srv1.Client, 2)
   588  
   589  	// Wait for the weight update period to allow the new weights to be processed.
   590  	time.Sleep(weightUpdatePeriod)
   591  	checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
   592  
   593  	// Advance what time.Now returns to the weight expiration time minus 1s to
   594  	// ensure all weights are still honored.
   595  	setNow(start.Add(weightExpirationPeriod - time.Second))
   596  
   597  	// Wait for the weight update period to allow the new weights to be processed.
   598  	time.Sleep(weightUpdatePeriod)
   599  	checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10})
   600  
   601  	// Advance what time.Now returns to the weight expiration time plus 1s to
   602  	// ensure all weights expired and addresses are routed evenly.
   603  	setNow(start.Add(weightExpirationPeriod + time.Second))
   604  
   605  	// Wait for the weight expiration period so the weights have expired.
   606  	time.Sleep(weightUpdatePeriod)
   607  	checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 1})
   608  }
   609  
   610  // Tests logic surrounding subchannel management.
   611  func (s) TestBalancer_AddressesChanging(t *testing.T) {
   612  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   613  	defer cancel()
   614  
   615  	srv1 := startServer(t, reportBoth)
   616  	srv2 := startServer(t, reportBoth)
   617  	srv3 := startServer(t, reportBoth)
   618  	srv4 := startServer(t, reportBoth)
   619  
   620  	// srv1: weight 10
   621  	srv1.oobMetrics.SetQPS(10.0)
   622  	srv1.oobMetrics.SetApplicationUtilization(1.0)
   623  	// srv2: weight 100
   624  	srv2.oobMetrics.SetQPS(10.0)
   625  	srv2.oobMetrics.SetApplicationUtilization(.1)
   626  	// srv3: weight 20
   627  	srv3.oobMetrics.SetQPS(20.0)
   628  	srv3.oobMetrics.SetApplicationUtilization(1.0)
   629  	// srv4: weight 200
   630  	srv4.oobMetrics.SetQPS(20.0)
   631  	srv4.oobMetrics.SetApplicationUtilization(.1)
   632  
   633  	sc := svcConfig(t, oobConfig)
   634  	if err := srv1.StartClient(grpc.WithDefaultServiceConfig(sc)); err != nil {
   635  		t.Fatalf("Error starting client: %v", err)
   636  	}
   637  	srv2.Client = srv1.Client
   638  	addrs := []resolver.Address{{Addr: srv1.Address}, {Addr: srv2.Address}, {Addr: srv3.Address}}
   639  	srv1.R.UpdateState(resolver.State{Addresses: addrs})
   640  
   641  	// Call each backend once to ensure the weights have been received.
   642  	ensureReached(ctx, t, srv1.Client, 3)
   643  	time.Sleep(weightUpdatePeriod)
   644  	checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}, srvWeight{srv3, 2})
   645  
   646  	// Add backend 4
   647  	addrs = append(addrs, resolver.Address{Addr: srv4.Address})
   648  	srv1.R.UpdateState(resolver.State{Addresses: addrs})
   649  	time.Sleep(weightUpdatePeriod)
   650  	checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}, srvWeight{srv3, 2}, srvWeight{srv4, 20})
   651  
   652  	// Shutdown backend 3.  RPCs will no longer be routed to it.
   653  	srv3.Stop()
   654  	time.Sleep(weightUpdatePeriod)
   655  	checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv2, 10}, srvWeight{srv4, 20})
   656  
   657  	// Remove addresses 2 and 3.  RPCs will no longer be routed to 2 either.
   658  	addrs = []resolver.Address{{Addr: srv1.Address}, {Addr: srv4.Address}}
   659  	srv1.R.UpdateState(resolver.State{Addresses: addrs})
   660  	time.Sleep(weightUpdatePeriod)
   661  	checkWeights(ctx, t, srvWeight{srv1, 1}, srvWeight{srv4, 20})
   662  
   663  	// Re-add 2 and remove the rest.
   664  	addrs = []resolver.Address{{Addr: srv2.Address}}
   665  	srv1.R.UpdateState(resolver.State{Addresses: addrs})
   666  	time.Sleep(weightUpdatePeriod)
   667  	checkWeights(ctx, t, srvWeight{srv2, 10})
   668  
   669  	// Re-add 4.
   670  	addrs = append(addrs, resolver.Address{Addr: srv4.Address})
   671  	srv1.R.UpdateState(resolver.State{Addresses: addrs})
   672  	time.Sleep(weightUpdatePeriod)
   673  	checkWeights(ctx, t, srvWeight{srv2, 10}, srvWeight{srv4, 20})
   674  }
   675  
   676  func ensureReached(ctx context.Context, t *testing.T, c testgrpc.TestServiceClient, n int) {
   677  	t.Helper()
   678  	reached := make(map[string]struct{})
   679  	for len(reached) != n {
   680  		var peer peer.Peer
   681  		if _, err := c.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
   682  			t.Fatalf("Error from EmptyCall: %v", err)
   683  		}
   684  		reached[peer.Addr.String()] = struct{}{}
   685  	}
   686  }
   687  
// srvWeight pairs a test server with the relative weight of RPCs that
// checkWeights expects it to receive.
type srvWeight struct {
	srv *testServer
	w   int // relative weight; only the ratio between entries matters
}
   692  
// rrIterations is the number of RPCs checkWeights issues per attempt.
const rrIterations = 100
   694  
   695  // checkWeights does rrIterations RPCs and expects the different backends to be
   696  // routed in a ratio as determined by the srvWeights passed in.  Allows for
   697  // some variance (+/- 2 RPCs per backend).
   698  func checkWeights(ctx context.Context, t *testing.T, sws ...srvWeight) {
   699  	t.Helper()
   700  
   701  	c := sws[0].srv.Client
   702  
   703  	// Replace the weights with approximate counts of RPCs wanted given the
   704  	// iterations performed.
   705  	weightSum := 0
   706  	for _, sw := range sws {
   707  		weightSum += sw.w
   708  	}
   709  	for i := range sws {
   710  		sws[i].w = rrIterations * sws[i].w / weightSum
   711  	}
   712  
   713  	for attempts := 0; attempts < 10; attempts++ {
   714  		serverCounts := make(map[string]int)
   715  		for i := 0; i < rrIterations; i++ {
   716  			var peer peer.Peer
   717  			if _, err := c.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
   718  				t.Fatalf("Error from EmptyCall: %v; timed out waiting for weighted RR behavior?", err)
   719  			}
   720  			serverCounts[peer.Addr.String()]++
   721  		}
   722  		if len(serverCounts) != len(sws) {
   723  			continue
   724  		}
   725  		success := true
   726  		for _, sw := range sws {
   727  			c := serverCounts[sw.srv.Address]
   728  			if c < sw.w-2 || c > sw.w+2 {
   729  				success = false
   730  				break
   731  			}
   732  		}
   733  		if success {
   734  			t.Logf("Passed iteration %v; counts: %v", attempts, serverCounts)
   735  			return
   736  		}
   737  		t.Logf("Failed iteration %v; counts: %v; want %+v", attempts, serverCounts, sws)
   738  		time.Sleep(5 * time.Millisecond)
   739  	}
   740  	t.Fatalf("Failed to route RPCs with proper ratio")
   741  }
   742  
// init installs time.Now as the initial clock function and points
// iwrr.TimeNow at timeNow, so that later setTimeNow calls made by tests
// change the time the iwrr package observes.
func init() {
	setTimeNow(time.Now)
	iwrr.TimeNow = timeNow
}
   747  
   748  var timeNowFunc atomic.Value // func() time.Time
   749  
   750  func timeNow() time.Time {
   751  	return timeNowFunc.Load().(func() time.Time)()
   752  }
   753  
   754  func setTimeNow(f func() time.Time) {
   755  	timeNowFunc.Store(f)
   756  }
   757  

View as plain text