...

Source file src/google.golang.org/grpc/xds/internal/balancer/clusterimpl/balancer_test.go

Documentation: google.golang.org/grpc/xds/internal/balancer/clusterimpl

     1  /*
     2   *
     3   * Copyright 2020 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package clusterimpl
    20  
    21  import (
    22  	"context"
    23  	"errors"
    24  	"fmt"
    25  	"strings"
    26  	"testing"
    27  	"time"
    28  
    29  	"github.com/google/go-cmp/cmp"
    30  	"github.com/google/go-cmp/cmp/cmpopts"
    31  	"google.golang.org/grpc/balancer"
    32  	"google.golang.org/grpc/balancer/base"
    33  	"google.golang.org/grpc/balancer/roundrobin"
    34  	"google.golang.org/grpc/connectivity"
    35  	"google.golang.org/grpc/internal/balancer/stub"
    36  	"google.golang.org/grpc/internal/grpctest"
    37  	internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
    38  	"google.golang.org/grpc/internal/testutils"
    39  	"google.golang.org/grpc/internal/xds"
    40  	"google.golang.org/grpc/internal/xds/bootstrap"
    41  	"google.golang.org/grpc/resolver"
    42  	xdsinternal "google.golang.org/grpc/xds/internal"
    43  	"google.golang.org/grpc/xds/internal/testutils/fakeclient"
    44  	"google.golang.org/grpc/xds/internal/xdsclient"
    45  	"google.golang.org/grpc/xds/internal/xdsclient/load"
    46  
    47  	v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
    48  )
    49  
    50  const (
    51  	defaultTestTimeout      = 5 * time.Second
    52  	defaultShortTestTimeout = 100 * time.Microsecond
    53  
    54  	testClusterName = "test-cluster"
    55  	testServiceName = "test-eds-service"
    56  
    57  	testNamedMetricsKey1 = "test-named1"
    58  	testNamedMetricsKey2 = "test-named2"
    59  )
    60  
    61  var (
    62  	testBackendAddrs = []resolver.Address{
    63  		{Addr: "1.1.1.1:1"},
    64  	}
    65  	testLRSServerConfig = &bootstrap.ServerConfig{
    66  		ServerURI: "trafficdirector.googleapis.com:443",
    67  		Creds: bootstrap.ChannelCreds{
    68  			Type: "google_default",
    69  		},
    70  	}
    71  
    72  	cmpOpts = cmp.Options{
    73  		cmpopts.EquateEmpty(),
    74  		cmpopts.IgnoreFields(load.Data{}, "ReportInterval"),
    75  	}
    76  	toleranceCmpOpt = cmpopts.EquateApprox(0, 1e-5)
    77  )
    78  
    79  type s struct {
    80  	grpctest.Tester
    81  }
    82  
    83  func Test(t *testing.T) {
    84  	grpctest.RunSubTests(t, s{})
    85  }
    86  
    87  func init() {
    88  	NewRandomWRR = testutils.NewTestWRR
    89  }
    90  
    91  // TestDropByCategory verifies that the balancer correctly drops the picks, and
    92  // that the drops are reported.
    93  func (s) TestDropByCategory(t *testing.T) {
    94  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    95  	defer cancel()
    96  
    97  	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
    98  	xdsC := fakeclient.NewClient()
    99  
   100  	builder := balancer.Get(Name)
   101  	cc := testutils.NewBalancerClientConn(t)
   102  	b := builder.Build(cc, balancer.BuildOptions{})
   103  	defer b.Close()
   104  
   105  	const (
   106  		dropReason      = "test-dropping-category"
   107  		dropNumerator   = 1
   108  		dropDenominator = 2
   109  	)
   110  	if err := b.UpdateClientConnState(balancer.ClientConnState{
   111  		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
   112  		BalancerConfig: &LBConfig{
   113  			Cluster:             testClusterName,
   114  			EDSServiceName:      testServiceName,
   115  			LoadReportingServer: testLRSServerConfig,
   116  			DropCategories: []DropConfig{{
   117  				Category:           dropReason,
   118  				RequestsPerMillion: million * dropNumerator / dropDenominator,
   119  			}},
   120  			ChildPolicy: &internalserviceconfig.BalancerConfig{
   121  				Name: roundrobin.Name,
   122  			},
   123  		},
   124  	}); err != nil {
   125  		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
   126  	}
   127  
   128  	got, err := xdsC.WaitForReportLoad(ctx)
   129  	if err != nil {
   130  		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
   131  	}
   132  	if got.Server != testLRSServerConfig {
   133  		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
   134  	}
   135  
   136  	sc1 := <-cc.NewSubConnCh
   137  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   138  	// This should get the connecting picker.
   139  	if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
   140  		t.Fatal(err.Error())
   141  	}
   142  
   143  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   144  	// Test pick with one backend.
   145  
   146  	const rpcCount = 20
   147  	if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
   148  		for i := 0; i < rpcCount; i++ {
   149  			gotSCSt, err := p.Pick(balancer.PickInfo{})
   150  			// Even RPCs are dropped.
   151  			if i%2 == 0 {
   152  				if err == nil || !strings.Contains(err.Error(), "dropped") {
   153  					return fmt.Errorf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
   154  				}
   155  				continue
   156  			}
   157  			if err != nil || gotSCSt.SubConn != sc1 {
   158  				return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
   159  			}
   160  			if gotSCSt.Done != nil {
   161  				gotSCSt.Done(balancer.DoneInfo{})
   162  			}
   163  		}
   164  		return nil
   165  	}); err != nil {
   166  		t.Fatal(err.Error())
   167  	}
   168  
   169  	// Dump load data from the store and compare with expected counts.
   170  	loadStore := xdsC.LoadStore()
   171  	if loadStore == nil {
   172  		t.Fatal("loadStore is nil in xdsClient")
   173  	}
   174  	const dropCount = rpcCount * dropNumerator / dropDenominator
   175  	wantStatsData0 := []*load.Data{{
   176  		Cluster:    testClusterName,
   177  		Service:    testServiceName,
   178  		TotalDrops: dropCount,
   179  		Drops:      map[string]uint64{dropReason: dropCount},
   180  		LocalityStats: map[string]load.LocalityData{
   181  			assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: rpcCount - dropCount}},
   182  		},
   183  	}}
   184  
   185  	gotStatsData0 := loadStore.Stats([]string{testClusterName})
   186  	if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
   187  		t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
   188  	}
   189  
   190  	// Send an update with new drop configs.
   191  	const (
   192  		dropReason2      = "test-dropping-category-2"
   193  		dropNumerator2   = 1
   194  		dropDenominator2 = 4
   195  	)
   196  	if err := b.UpdateClientConnState(balancer.ClientConnState{
   197  		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
   198  		BalancerConfig: &LBConfig{
   199  			Cluster:             testClusterName,
   200  			EDSServiceName:      testServiceName,
   201  			LoadReportingServer: testLRSServerConfig,
   202  			DropCategories: []DropConfig{{
   203  				Category:           dropReason2,
   204  				RequestsPerMillion: million * dropNumerator2 / dropDenominator2,
   205  			}},
   206  			ChildPolicy: &internalserviceconfig.BalancerConfig{
   207  				Name: roundrobin.Name,
   208  			},
   209  		},
   210  	}); err != nil {
   211  		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
   212  	}
   213  
   214  	if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
   215  		for i := 0; i < rpcCount; i++ {
   216  			gotSCSt, err := p.Pick(balancer.PickInfo{})
   217  			// Even RPCs are dropped.
   218  			if i%4 == 0 {
   219  				if err == nil || !strings.Contains(err.Error(), "dropped") {
   220  					return fmt.Errorf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err)
   221  				}
   222  				continue
   223  			}
   224  			if err != nil || gotSCSt.SubConn != sc1 {
   225  				return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
   226  			}
   227  			if gotSCSt.Done != nil {
   228  				gotSCSt.Done(balancer.DoneInfo{})
   229  			}
   230  		}
   231  		return nil
   232  	}); err != nil {
   233  		t.Fatal(err.Error())
   234  	}
   235  
   236  	const dropCount2 = rpcCount * dropNumerator2 / dropDenominator2
   237  	wantStatsData1 := []*load.Data{{
   238  		Cluster:    testClusterName,
   239  		Service:    testServiceName,
   240  		TotalDrops: dropCount2,
   241  		Drops:      map[string]uint64{dropReason2: dropCount2},
   242  		LocalityStats: map[string]load.LocalityData{
   243  			assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: rpcCount - dropCount2}},
   244  		},
   245  	}}
   246  
   247  	gotStatsData1 := loadStore.Stats([]string{testClusterName})
   248  	if diff := cmp.Diff(gotStatsData1, wantStatsData1, cmpOpts); diff != "" {
   249  		t.Fatalf("got unexpected reports, diff (-got, +want): %v", diff)
   250  	}
   251  }
   252  
   253  // TestDropCircuitBreaking verifies that the balancer correctly drops the picks
   254  // due to circuit breaking, and that the drops are reported.
   255  func (s) TestDropCircuitBreaking(t *testing.T) {
   256  	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
   257  	xdsC := fakeclient.NewClient()
   258  
   259  	builder := balancer.Get(Name)
   260  	cc := testutils.NewBalancerClientConn(t)
   261  	b := builder.Build(cc, balancer.BuildOptions{})
   262  	defer b.Close()
   263  
   264  	var maxRequest uint32 = 50
   265  	if err := b.UpdateClientConnState(balancer.ClientConnState{
   266  		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
   267  		BalancerConfig: &LBConfig{
   268  			Cluster:               testClusterName,
   269  			EDSServiceName:        testServiceName,
   270  			LoadReportingServer:   testLRSServerConfig,
   271  			MaxConcurrentRequests: &maxRequest,
   272  			ChildPolicy: &internalserviceconfig.BalancerConfig{
   273  				Name: roundrobin.Name,
   274  			},
   275  		},
   276  	}); err != nil {
   277  		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
   278  	}
   279  
   280  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   281  	defer cancel()
   282  
   283  	got, err := xdsC.WaitForReportLoad(ctx)
   284  	if err != nil {
   285  		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
   286  	}
   287  	if got.Server != testLRSServerConfig {
   288  		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
   289  	}
   290  
   291  	sc1 := <-cc.NewSubConnCh
   292  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   293  	// This should get the connecting picker.
   294  	if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
   295  		t.Fatal(err.Error())
   296  	}
   297  
   298  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   299  	// Test pick with one backend.
   300  	const rpcCount = 100
   301  	if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
   302  		dones := []func(){}
   303  		for i := 0; i < rpcCount; i++ {
   304  			gotSCSt, err := p.Pick(balancer.PickInfo{})
   305  			if i < 50 && err != nil {
   306  				return fmt.Errorf("The first 50%% picks should be non-drops, got error %v", err)
   307  			} else if i > 50 && err == nil {
   308  				return fmt.Errorf("The second 50%% picks should be drops, got error <nil>")
   309  			}
   310  			dones = append(dones, func() {
   311  				if gotSCSt.Done != nil {
   312  					gotSCSt.Done(balancer.DoneInfo{})
   313  				}
   314  			})
   315  		}
   316  		for _, done := range dones {
   317  			done()
   318  		}
   319  
   320  		dones = []func(){}
   321  		// Pick without drops.
   322  		for i := 0; i < 50; i++ {
   323  			gotSCSt, err := p.Pick(balancer.PickInfo{})
   324  			if err != nil {
   325  				t.Errorf("The third 50%% picks should be non-drops, got error %v", err)
   326  			}
   327  			dones = append(dones, func() {
   328  				if gotSCSt.Done != nil {
   329  					gotSCSt.Done(balancer.DoneInfo{})
   330  				}
   331  			})
   332  		}
   333  		for _, done := range dones {
   334  			done()
   335  		}
   336  
   337  		return nil
   338  	}); err != nil {
   339  		t.Fatal(err.Error())
   340  	}
   341  
   342  	// Dump load data from the store and compare with expected counts.
   343  	loadStore := xdsC.LoadStore()
   344  	if loadStore == nil {
   345  		t.Fatal("loadStore is nil in xdsClient")
   346  	}
   347  
   348  	wantStatsData0 := []*load.Data{{
   349  		Cluster:    testClusterName,
   350  		Service:    testServiceName,
   351  		TotalDrops: uint64(maxRequest),
   352  		LocalityStats: map[string]load.LocalityData{
   353  			assertString(xdsinternal.LocalityID{}.ToString): {RequestStats: load.RequestData{Succeeded: uint64(rpcCount - maxRequest + 50)}},
   354  		},
   355  	}}
   356  
   357  	gotStatsData0 := loadStore.Stats([]string{testClusterName})
   358  	if diff := cmp.Diff(gotStatsData0, wantStatsData0, cmpOpts); diff != "" {
   359  		t.Fatalf("got unexpected drop reports, diff (-got, +want): %v", diff)
   360  	}
   361  }
   362  
   363  // TestPickerUpdateAfterClose covers the case where a child policy sends a
   364  // picker update after the cluster_impl policy is closed. Because picker updates
   365  // are handled in the run() goroutine, which exits before Close() returns, we
   366  // expect the above picker update to be dropped.
   367  func (s) TestPickerUpdateAfterClose(t *testing.T) {
   368  	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
   369  	xdsC := fakeclient.NewClient()
   370  
   371  	builder := balancer.Get(Name)
   372  	cc := testutils.NewBalancerClientConn(t)
   373  	b := builder.Build(cc, balancer.BuildOptions{})
   374  
   375  	// Create a stub balancer which waits for the cluster_impl policy to be
   376  	// closed before sending a picker update (upon receipt of a subConn state
   377  	// change).
   378  	closeCh := make(chan struct{})
   379  	const childPolicyName = "stubBalancer-TestPickerUpdateAfterClose"
   380  	stub.Register(childPolicyName, stub.BalancerFuncs{
   381  		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
   382  			// Create a subConn which will be used later on to test the race
   383  			// between StateListener() and Close().
   384  			sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{
   385  				StateListener: func(balancer.SubConnState) {
   386  					go func() {
   387  						// Wait for Close() to be called on the parent policy before
   388  						// sending the picker update.
   389  						<-closeCh
   390  						bd.ClientConn.UpdateState(balancer.State{
   391  							Picker: base.NewErrPicker(errors.New("dummy error picker")),
   392  						})
   393  					}()
   394  				},
   395  			})
   396  			if err != nil {
   397  				return err
   398  			}
   399  			sc.Connect()
   400  			return nil
   401  		},
   402  	})
   403  
   404  	var maxRequest uint32 = 50
   405  	if err := b.UpdateClientConnState(balancer.ClientConnState{
   406  		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
   407  		BalancerConfig: &LBConfig{
   408  			Cluster:               testClusterName,
   409  			EDSServiceName:        testServiceName,
   410  			MaxConcurrentRequests: &maxRequest,
   411  			ChildPolicy: &internalserviceconfig.BalancerConfig{
   412  				Name: childPolicyName,
   413  			},
   414  		},
   415  	}); err != nil {
   416  		b.Close()
   417  		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
   418  	}
   419  
   420  	// Send a subConn state change to trigger a picker update. The stub balancer
   421  	// that we use as the child policy will not send a picker update until the
   422  	// parent policy is closed.
   423  	sc1 := <-cc.NewSubConnCh
   424  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   425  	b.Close()
   426  	close(closeCh)
   427  
   428  	select {
   429  	case <-cc.NewPickerCh:
   430  		t.Fatalf("unexpected picker update after balancer is closed")
   431  	case <-time.After(defaultShortTestTimeout):
   432  	}
   433  }
   434  
   435  // TestClusterNameInAddressAttributes covers the case that cluster name is
   436  // attached to the subconn address attributes.
   437  func (s) TestClusterNameInAddressAttributes(t *testing.T) {
   438  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   439  	defer cancel()
   440  
   441  	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
   442  	xdsC := fakeclient.NewClient()
   443  
   444  	builder := balancer.Get(Name)
   445  	cc := testutils.NewBalancerClientConn(t)
   446  	b := builder.Build(cc, balancer.BuildOptions{})
   447  	defer b.Close()
   448  
   449  	if err := b.UpdateClientConnState(balancer.ClientConnState{
   450  		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
   451  		BalancerConfig: &LBConfig{
   452  			Cluster:        testClusterName,
   453  			EDSServiceName: testServiceName,
   454  			ChildPolicy: &internalserviceconfig.BalancerConfig{
   455  				Name: roundrobin.Name,
   456  			},
   457  		},
   458  	}); err != nil {
   459  		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
   460  	}
   461  
   462  	sc1 := <-cc.NewSubConnCh
   463  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   464  	// This should get the connecting picker.
   465  	if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
   466  		t.Fatal(err.Error())
   467  	}
   468  
   469  	addrs1 := <-cc.NewSubConnAddrsCh
   470  	if got, want := addrs1[0].Addr, testBackendAddrs[0].Addr; got != want {
   471  		t.Fatalf("sc is created with addr %v, want %v", got, want)
   472  	}
   473  	cn, ok := xds.GetXDSHandshakeClusterName(addrs1[0].Attributes)
   474  	if !ok || cn != testClusterName {
   475  		t.Fatalf("sc is created with addr with cluster name %v, %v, want cluster name %v", cn, ok, testClusterName)
   476  	}
   477  
   478  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   479  	// Test pick with one backend.
   480  	if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
   481  		t.Fatal(err.Error())
   482  	}
   483  
   484  	const testClusterName2 = "test-cluster-2"
   485  	var addr2 = resolver.Address{Addr: "2.2.2.2"}
   486  	if err := b.UpdateClientConnState(balancer.ClientConnState{
   487  		ResolverState: xdsclient.SetClient(resolver.State{Addresses: []resolver.Address{addr2}}, xdsC),
   488  		BalancerConfig: &LBConfig{
   489  			Cluster:        testClusterName2,
   490  			EDSServiceName: testServiceName,
   491  			ChildPolicy: &internalserviceconfig.BalancerConfig{
   492  				Name: roundrobin.Name,
   493  			},
   494  		},
   495  	}); err != nil {
   496  		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
   497  	}
   498  
   499  	addrs2 := <-cc.NewSubConnAddrsCh
   500  	if got, want := addrs2[0].Addr, addr2.Addr; got != want {
   501  		t.Fatalf("sc is created with addr %v, want %v", got, want)
   502  	}
   503  	// New addresses should have the new cluster name.
   504  	cn2, ok := xds.GetXDSHandshakeClusterName(addrs2[0].Attributes)
   505  	if !ok || cn2 != testClusterName2 {
   506  		t.Fatalf("sc is created with addr with cluster name %v, %v, want cluster name %v", cn2, ok, testClusterName2)
   507  	}
   508  }
   509  
   510  // TestReResolution verifies that when a SubConn turns transient failure,
   511  // re-resolution is triggered.
   512  func (s) TestReResolution(t *testing.T) {
   513  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   514  	defer cancel()
   515  
   516  	defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName)
   517  	xdsC := fakeclient.NewClient()
   518  
   519  	builder := balancer.Get(Name)
   520  	cc := testutils.NewBalancerClientConn(t)
   521  	b := builder.Build(cc, balancer.BuildOptions{})
   522  	defer b.Close()
   523  
   524  	if err := b.UpdateClientConnState(balancer.ClientConnState{
   525  		ResolverState: xdsclient.SetClient(resolver.State{Addresses: testBackendAddrs}, xdsC),
   526  		BalancerConfig: &LBConfig{
   527  			Cluster:        testClusterName,
   528  			EDSServiceName: testServiceName,
   529  			ChildPolicy: &internalserviceconfig.BalancerConfig{
   530  				Name: roundrobin.Name,
   531  			},
   532  		},
   533  	}); err != nil {
   534  		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
   535  	}
   536  
   537  	sc1 := <-cc.NewSubConnCh
   538  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   539  	// This should get the connecting picker.
   540  	if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
   541  		t.Fatal(err.Error())
   542  	}
   543  
   544  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
   545  	// This should get the transient failure picker.
   546  	if err := cc.WaitForErrPicker(ctx); err != nil {
   547  		t.Fatal(err.Error())
   548  	}
   549  
   550  	// The transient failure should trigger a re-resolution.
   551  	select {
   552  	case <-cc.ResolveNowCh:
   553  	case <-time.After(defaultTestTimeout):
   554  		t.Fatalf("timeout waiting for ResolveNow()")
   555  	}
   556  
   557  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   558  	// Test pick with one backend.
   559  	if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil {
   560  		t.Fatal(err.Error())
   561  	}
   562  
   563  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
   564  	// This should get the transient failure picker.
   565  	if err := cc.WaitForErrPicker(ctx); err != nil {
   566  		t.Fatal(err.Error())
   567  	}
   568  
   569  	// The transient failure should trigger a re-resolution.
   570  	select {
   571  	case <-cc.ResolveNowCh:
   572  	case <-time.After(defaultTestTimeout):
   573  		t.Fatalf("timeout waiting for ResolveNow()")
   574  	}
   575  }
   576  
   577  func (s) TestLoadReporting(t *testing.T) {
   578  	var testLocality = xdsinternal.LocalityID{
   579  		Region:  "test-region",
   580  		Zone:    "test-zone",
   581  		SubZone: "test-sub-zone",
   582  	}
   583  
   584  	xdsC := fakeclient.NewClient()
   585  
   586  	builder := balancer.Get(Name)
   587  	cc := testutils.NewBalancerClientConn(t)
   588  	b := builder.Build(cc, balancer.BuildOptions{})
   589  	defer b.Close()
   590  
   591  	addrs := make([]resolver.Address, len(testBackendAddrs))
   592  	for i, a := range testBackendAddrs {
   593  		addrs[i] = xdsinternal.SetLocalityID(a, testLocality)
   594  	}
   595  	if err := b.UpdateClientConnState(balancer.ClientConnState{
   596  		ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
   597  		BalancerConfig: &LBConfig{
   598  			Cluster:             testClusterName,
   599  			EDSServiceName:      testServiceName,
   600  			LoadReportingServer: testLRSServerConfig,
   601  			// Locality:                testLocality,
   602  			ChildPolicy: &internalserviceconfig.BalancerConfig{
   603  				Name: roundrobin.Name,
   604  			},
   605  		},
   606  	}); err != nil {
   607  		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
   608  	}
   609  
   610  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   611  	defer cancel()
   612  
   613  	got, err := xdsC.WaitForReportLoad(ctx)
   614  	if err != nil {
   615  		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
   616  	}
   617  	if got.Server != testLRSServerConfig {
   618  		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
   619  	}
   620  
   621  	sc1 := <-cc.NewSubConnCh
   622  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
   623  	// This should get the connecting picker.
   624  	if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil {
   625  		t.Fatal(err.Error())
   626  	}
   627  
   628  	sc1.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
   629  	// Test pick with one backend.
   630  	const successCount = 5
   631  	const errorCount = 5
   632  	if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error {
   633  		for i := 0; i < successCount; i++ {
   634  			gotSCSt, err := p.Pick(balancer.PickInfo{})
   635  			if gotSCSt.SubConn != sc1 {
   636  				return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
   637  			}
   638  			lr := &v3orcapb.OrcaLoadReport{
   639  				NamedMetrics: map[string]float64{testNamedMetricsKey1: 3.14, testNamedMetricsKey2: 2.718},
   640  			}
   641  			gotSCSt.Done(balancer.DoneInfo{ServerLoad: lr})
   642  		}
   643  		for i := 0; i < errorCount; i++ {
   644  			gotSCSt, err := p.Pick(balancer.PickInfo{})
   645  			if gotSCSt.SubConn != sc1 {
   646  				return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1)
   647  			}
   648  			gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")})
   649  		}
   650  		return nil
   651  	}); err != nil {
   652  		t.Fatal(err.Error())
   653  	}
   654  
   655  	// Dump load data from the store and compare with expected counts.
   656  	loadStore := xdsC.LoadStore()
   657  	if loadStore == nil {
   658  		t.Fatal("loadStore is nil in xdsClient")
   659  	}
   660  	sds := loadStore.Stats([]string{testClusterName})
   661  	if len(sds) == 0 {
   662  		t.Fatalf("loads for cluster %v not found in store", testClusterName)
   663  	}
   664  	sd := sds[0]
   665  	if sd.Cluster != testClusterName || sd.Service != testServiceName {
   666  		t.Fatalf("got unexpected load for %q, %q, want %q, %q", sd.Cluster, sd.Service, testClusterName, testServiceName)
   667  	}
   668  	testLocalityJSON, _ := testLocality.ToString()
   669  	localityData, ok := sd.LocalityStats[testLocalityJSON]
   670  	if !ok {
   671  		t.Fatalf("loads for %v not found in store", testLocality)
   672  	}
   673  	reqStats := localityData.RequestStats
   674  	if reqStats.Succeeded != successCount {
   675  		t.Errorf("got succeeded %v, want %v", reqStats.Succeeded, successCount)
   676  	}
   677  	if reqStats.Errored != errorCount {
   678  		t.Errorf("got errord %v, want %v", reqStats.Errored, errorCount)
   679  	}
   680  	if reqStats.InProgress != 0 {
   681  		t.Errorf("got inProgress %v, want %v", reqStats.InProgress, 0)
   682  	}
   683  	wantLoadStats := map[string]load.ServerLoadData{
   684  		testNamedMetricsKey1: {Count: 5, Sum: 15.7},  // aggregation of 5 * 3.14 = 15.7
   685  		testNamedMetricsKey2: {Count: 5, Sum: 13.59}, // aggregation of 5 * 2.718 = 13.59
   686  	}
   687  	if diff := cmp.Diff(wantLoadStats, localityData.LoadStats, toleranceCmpOpt); diff != "" {
   688  		t.Errorf("localityData.LoadStats returned unexpected diff (-want +got):\n%s", diff)
   689  	}
   690  	b.Close()
   691  	if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
   692  		t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
   693  	}
   694  }
   695  
   696  // TestUpdateLRSServer covers the cases
   697  // - the init config specifies "" as the LRS server
   698  // - config modifies LRS server to a different string
   699  // - config sets LRS server to nil to stop load reporting
   700  func (s) TestUpdateLRSServer(t *testing.T) {
   701  	var testLocality = xdsinternal.LocalityID{
   702  		Region:  "test-region",
   703  		Zone:    "test-zone",
   704  		SubZone: "test-sub-zone",
   705  	}
   706  
   707  	xdsC := fakeclient.NewClient()
   708  
   709  	builder := balancer.Get(Name)
   710  	cc := testutils.NewBalancerClientConn(t)
   711  	b := builder.Build(cc, balancer.BuildOptions{})
   712  	defer b.Close()
   713  
   714  	addrs := make([]resolver.Address, len(testBackendAddrs))
   715  	for i, a := range testBackendAddrs {
   716  		addrs[i] = xdsinternal.SetLocalityID(a, testLocality)
   717  	}
   718  	if err := b.UpdateClientConnState(balancer.ClientConnState{
   719  		ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
   720  		BalancerConfig: &LBConfig{
   721  			Cluster:             testClusterName,
   722  			EDSServiceName:      testServiceName,
   723  			LoadReportingServer: testLRSServerConfig,
   724  			ChildPolicy: &internalserviceconfig.BalancerConfig{
   725  				Name: roundrobin.Name,
   726  			},
   727  		},
   728  	}); err != nil {
   729  		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
   730  	}
   731  
   732  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   733  	defer cancel()
   734  
   735  	got, err := xdsC.WaitForReportLoad(ctx)
   736  	if err != nil {
   737  		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err)
   738  	}
   739  	if got.Server != testLRSServerConfig {
   740  		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got.Server, testLRSServerConfig)
   741  	}
   742  
   743  	testLRSServerConfig2 := &bootstrap.ServerConfig{
   744  		ServerURI: "trafficdirector-another.googleapis.com:443",
   745  		Creds: bootstrap.ChannelCreds{
   746  			Type: "google_default",
   747  		},
   748  	}
   749  	// Update LRS server to a different name.
   750  	if err := b.UpdateClientConnState(balancer.ClientConnState{
   751  		ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
   752  		BalancerConfig: &LBConfig{
   753  			Cluster:             testClusterName,
   754  			EDSServiceName:      testServiceName,
   755  			LoadReportingServer: testLRSServerConfig2,
   756  			ChildPolicy: &internalserviceconfig.BalancerConfig{
   757  				Name: roundrobin.Name,
   758  			},
   759  		},
   760  	}); err != nil {
   761  		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
   762  	}
   763  	if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
   764  		t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
   765  	}
   766  	got2, err2 := xdsC.WaitForReportLoad(ctx)
   767  	if err2 != nil {
   768  		t.Fatalf("xdsClient.ReportLoad failed with error: %v", err2)
   769  	}
   770  	if got2.Server != testLRSServerConfig2 {
   771  		t.Fatalf("xdsClient.ReportLoad called with {%q}: want {%q}", got2.Server, testLRSServerConfig2)
   772  	}
   773  
   774  	// Update LRS server to nil, to disable LRS.
   775  	if err := b.UpdateClientConnState(balancer.ClientConnState{
   776  		ResolverState: xdsclient.SetClient(resolver.State{Addresses: addrs}, xdsC),
   777  		BalancerConfig: &LBConfig{
   778  			Cluster:        testClusterName,
   779  			EDSServiceName: testServiceName,
   780  			ChildPolicy: &internalserviceconfig.BalancerConfig{
   781  				Name: roundrobin.Name,
   782  			},
   783  		},
   784  	}); err != nil {
   785  		t.Fatalf("unexpected error from UpdateClientConnState: %v", err)
   786  	}
   787  	if err := xdsC.WaitForCancelReportLoad(ctx); err != nil {
   788  		t.Fatalf("unexpected error waiting form load report to be canceled: %v", err)
   789  	}
   790  
   791  	shortCtx, shortCancel := context.WithTimeout(context.Background(), defaultShortTestTimeout)
   792  	defer shortCancel()
   793  	if s, err := xdsC.WaitForReportLoad(shortCtx); err != context.DeadlineExceeded {
   794  		t.Fatalf("unexpected load report to server: %q", s)
   795  	}
   796  }
   797  
   798  func assertString(f func() (string, error)) string {
   799  	s, err := f()
   800  	if err != nil {
   801  		panic(err.Error())
   802  	}
   803  	return s
   804  }
   805  

View as plain text