...

Source file src/google.golang.org/grpc/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go

Documentation: google.golang.org/grpc/xds/internal/balancer/cdsbalancer

     1  /*
     2   * Copyright 2019 gRPC authors.
     3   *
     4   * Licensed under the Apache License, Version 2.0 (the "License");
     5   * you may not use this file except in compliance with the License.
     6   * You may obtain a copy of the License at
     7   *
     8   *     http://www.apache.org/licenses/LICENSE-2.0
     9   *
    10   * Unless required by applicable law or agreed to in writing, software
    11   * distributed under the License is distributed on an "AS IS" BASIS,
    12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13   * See the License for the specific language governing permissions and
    14   * limitations under the License.
    15   */
    16  
    17  package cdsbalancer
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"errors"
    23  	"fmt"
    24  	"strings"
    25  	"testing"
    26  	"time"
    27  
    28  	"github.com/google/go-cmp/cmp"
    29  	"google.golang.org/grpc"
    30  	"google.golang.org/grpc/balancer"
    31  	"google.golang.org/grpc/codes"
    32  	"google.golang.org/grpc/connectivity"
    33  	"google.golang.org/grpc/credentials/insecure"
    34  	"google.golang.org/grpc/internal"
    35  	"google.golang.org/grpc/internal/balancer/stub"
    36  	"google.golang.org/grpc/internal/grpctest"
    37  	"google.golang.org/grpc/internal/stubserver"
    38  	"google.golang.org/grpc/internal/testutils"
    39  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    40  	"google.golang.org/grpc/internal/xds/bootstrap"
    41  	"google.golang.org/grpc/resolver"
    42  	"google.golang.org/grpc/resolver/manual"
    43  	"google.golang.org/grpc/serviceconfig"
    44  	"google.golang.org/grpc/status"
    45  	"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
    46  	"google.golang.org/grpc/xds/internal/xdsclient"
    47  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
    48  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
    49  	"google.golang.org/protobuf/types/known/durationpb"
    50  	"google.golang.org/protobuf/types/known/wrapperspb"
    51  
    52  	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
    53  	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    54  	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
    55  	v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
    56  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    57  	testpb "google.golang.org/grpc/interop/grpc_testing"
    58  
    59  	_ "google.golang.org/grpc/xds/internal/balancer/ringhash" // Register the ring_hash LB policy
    60  )
    61  
    62  const (
    63  	clusterName             = "cluster1"
    64  	edsClusterName          = clusterName + "-eds"
    65  	dnsClusterName          = clusterName + "-dns"
    66  	serviceName             = "service1"
    67  	dnsHostName             = "dns_host"
    68  	dnsPort                 = uint32(8080)
    69  	defaultTestTimeout      = 5 * time.Second
    70  	defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
    71  )
    72  
    73  type s struct {
    74  	grpctest.Tester
    75  }
    76  
    77  func Test(t *testing.T) {
    78  	grpctest.RunSubTests(t, s{})
    79  }
    80  
    81  func waitForResourceNames(ctx context.Context, resourceNamesCh chan []string, wantNames []string) error {
    82  	for ctx.Err() == nil {
    83  		select {
    84  		case <-ctx.Done():
    85  		case gotNames := <-resourceNamesCh:
    86  			if cmp.Equal(gotNames, wantNames) {
    87  				return nil
    88  			}
    89  		}
    90  	}
    91  	if ctx.Err() != nil {
    92  		return fmt.Errorf("Timeout when waiting for appropriate Cluster resources to be requested")
    93  	}
    94  	return nil
    95  }
    96  
    97  // Registers a wrapped cluster_resolver LB policy (child policy of the cds LB
    98  // policy) for the duration of this test that retains all the functionality of
    99  // the former, but makes certain events available for inspection by the test.
   100  //
   101  // Returns the following:
   102  // - a channel to read received load balancing configuration
   103  // - a channel to read received resolver error
   104  // - a channel that is closed when ExitIdle() is called
   105  // - a channel that is closed when the balancer is closed
   106  func registerWrappedClusterResolverPolicy(t *testing.T) (chan serviceconfig.LoadBalancingConfig, chan error, chan struct{}, chan struct{}) {
   107  	clusterresolverBuilder := balancer.Get(clusterresolver.Name)
   108  	internal.BalancerUnregister(clusterresolverBuilder.Name())
   109  
   110  	lbCfgCh := make(chan serviceconfig.LoadBalancingConfig, 1)
   111  	resolverErrCh := make(chan error, 1)
   112  	exitIdleCh := make(chan struct{})
   113  	closeCh := make(chan struct{})
   114  
   115  	stub.Register(clusterresolver.Name, stub.BalancerFuncs{
   116  		Init: func(bd *stub.BalancerData) {
   117  			bd.Data = clusterresolverBuilder.Build(bd.ClientConn, bd.BuildOptions)
   118  		},
   119  		ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
   120  			return clusterresolverBuilder.(balancer.ConfigParser).ParseConfig(lbCfg)
   121  		},
   122  		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
   123  			select {
   124  			case lbCfgCh <- ccs.BalancerConfig:
   125  			default:
   126  			}
   127  			bal := bd.Data.(balancer.Balancer)
   128  			return bal.UpdateClientConnState(ccs)
   129  		},
   130  		ResolverError: func(bd *stub.BalancerData, err error) {
   131  			select {
   132  			case resolverErrCh <- err:
   133  			default:
   134  			}
   135  			bal := bd.Data.(balancer.Balancer)
   136  			bal.ResolverError(err)
   137  		},
   138  		ExitIdle: func(bd *stub.BalancerData) {
   139  			bal := bd.Data.(balancer.Balancer)
   140  			bal.(balancer.ExitIdler).ExitIdle()
   141  			close(exitIdleCh)
   142  		},
   143  		Close: func(bd *stub.BalancerData) {
   144  			bal := bd.Data.(balancer.Balancer)
   145  			bal.Close()
   146  			close(closeCh)
   147  		},
   148  	})
   149  	t.Cleanup(func() { balancer.Register(clusterresolverBuilder) })
   150  
   151  	return lbCfgCh, resolverErrCh, exitIdleCh, closeCh
   152  }
   153  
   154  // Registers a wrapped cds LB policy for the duration of this test that retains
   155  // all the functionality of the original cds LB policy, but makes the newly
   156  // built policy available to the test to directly invoke any balancer methods.
   157  //
   158  // Returns a channel on which the newly built cds LB policy is written to.
   159  func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer {
   160  	cdsBuilder := balancer.Get(cdsName)
   161  	internal.BalancerUnregister(cdsBuilder.Name())
   162  	cdsBalancerCh := make(chan balancer.Balancer, 1)
   163  	stub.Register(cdsBuilder.Name(), stub.BalancerFuncs{
   164  		Init: func(bd *stub.BalancerData) {
   165  			bal := cdsBuilder.Build(bd.ClientConn, bd.BuildOptions)
   166  			bd.Data = bal
   167  			cdsBalancerCh <- bal
   168  		},
   169  		ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
   170  			return cdsBuilder.(balancer.ConfigParser).ParseConfig(lbCfg)
   171  		},
   172  		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
   173  			bal := bd.Data.(balancer.Balancer)
   174  			return bal.UpdateClientConnState(ccs)
   175  		},
   176  		Close: func(bd *stub.BalancerData) {
   177  			bal := bd.Data.(balancer.Balancer)
   178  			bal.Close()
   179  		},
   180  	})
   181  	t.Cleanup(func() { balancer.Register(cdsBuilder) })
   182  
   183  	return cdsBalancerCh
   184  }
   185  
   186  // Performs the following setup required for tests:
   187  //   - Spins up an xDS management server
   188  //   - Creates an xDS client talking to this management server
   189  //   - Creates a manual resolver that configures the cds LB policy as the
   190  //     top-level policy, and pushes an initial configuration to it
   191  //   - Creates a gRPC channel with the above manual resolver
   192  //
   193  // Returns the following:
   194  //   - the xDS management server
   195  //   - the nodeID expected by the management server
   196  //   - the grpc channel to the test backend service
   197  //   - the manual resolver configured on the channel
   198  //   - the xDS cient used the grpc channel
   199  //   - a channel on which requested cluster resource names are sent
   200  //   - a channel used to signal that previously requested cluster resources are
   201  //     no longer requested
   202  func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) {
   203  	t.Helper()
   204  
   205  	cdsResourceRequestedCh := make(chan []string, 1)
   206  	cdsResourceCanceledCh := make(chan struct{}, 1)
   207  	mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
   208  		OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
   209  			if req.GetTypeUrl() == version.V3ClusterURL {
   210  				switch len(req.GetResourceNames()) {
   211  				case 0:
   212  					select {
   213  					case cdsResourceCanceledCh <- struct{}{}:
   214  					default:
   215  					}
   216  				default:
   217  					select {
   218  					case cdsResourceRequestedCh <- req.GetResourceNames():
   219  					default:
   220  					}
   221  				}
   222  			}
   223  			return nil
   224  		},
   225  		// Required for aggregate clusters as all resources cannot be requested
   226  		// at once.
   227  		AllowResourceSubset: true,
   228  	})
   229  	t.Cleanup(cleanup)
   230  
   231  	xdsC, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
   232  	if err != nil {
   233  		t.Fatalf("Failed to create xDS client: %v", err)
   234  	}
   235  	t.Cleanup(xdsClose)
   236  
   237  	r := manual.NewBuilderWithScheme("whatever")
   238  	jsonSC := fmt.Sprintf(`{
   239  			"loadBalancingConfig":[{
   240  				"cds_experimental":{
   241  					"cluster": "%s"
   242  				}
   243  			}]
   244  		}`, clusterName)
   245  	scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
   246  	r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsC))
   247  
   248  	cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
   249  	if err != nil {
   250  		t.Fatalf("Failed to dial local test server: %v", err)
   251  	}
   252  	t.Cleanup(func() { cc.Close() })
   253  
   254  	return mgmtServer, nodeID, cc, r, xdsC, cdsResourceRequestedCh, cdsResourceCanceledCh
   255  }
   256  
   257  // Helper function to compare the load balancing configuration received on the
   258  // channel with the expected one. Both configs are marshalled to JSON and then
   259  // compared.
   260  //
   261  // Returns an error if marshalling to JSON fails, or if the load balancing
   262  // configurations don't match, or if the context deadline expires before reading
   263  // a child policy configuration off of the lbCfgCh.
   264  func compareLoadBalancingConfig(ctx context.Context, lbCfgCh chan serviceconfig.LoadBalancingConfig, wantChildCfg serviceconfig.LoadBalancingConfig) error {
   265  	wantJSON, err := json.Marshal(wantChildCfg)
   266  	if err != nil {
   267  		return fmt.Errorf("failed to marshal expected child config to JSON: %v", err)
   268  	}
   269  	select {
   270  	case lbCfg := <-lbCfgCh:
   271  		gotJSON, err := json.Marshal(lbCfg)
   272  		if err != nil {
   273  			return fmt.Errorf("failed to marshal received LB config into JSON: %v", err)
   274  		}
   275  		if diff := cmp.Diff(wantJSON, gotJSON); diff != "" {
   276  			return fmt.Errorf("child policy received unexpected diff in config (-want +got):\n%s", diff)
   277  		}
   278  	case <-ctx.Done():
   279  		return fmt.Errorf("timeout when waiting for child policy to receive its configuration")
   280  	}
   281  	return nil
   282  }
   283  
   284  // Tests the functionality that handles LB policy configuration. Verifies that
   285  // the appropriate xDS resource is requested corresponding to the provided LB
   286  // policy configuration. Also verifies that when the LB policy receives the same
   287  // configuration again, it does not send out a new request, and when the
   288  // configuration changes, it stops requesting the old cluster resource and
   289  // starts requesting the new one.
   290  func (s) TestConfigurationUpdate_Success(t *testing.T) {
   291  	_, _, _, r, xdsClient, cdsResourceRequestedCh, _ := setupWithManagementServer(t)
   292  
   293  	// Verify that the specified cluster resource is requested.
   294  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   295  	defer cancel()
   296  	wantNames := []string{clusterName}
   297  	if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
   298  		t.Fatal(err)
   299  	}
   300  
   301  	// Push the same configuration again.
   302  	jsonSC := fmt.Sprintf(`{
   303  			"loadBalancingConfig":[{
   304  				"cds_experimental":{
   305  					"cluster": "%s"
   306  				}
   307  			}]
   308  		}`, clusterName)
   309  	scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
   310  	r.UpdateState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))
   311  
   312  	// Verify that a new CDS request is not sent.
   313  	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   314  	defer sCancel()
   315  	select {
   316  	case <-sCtx.Done():
   317  	case gotNames := <-cdsResourceRequestedCh:
   318  		t.Fatalf("CDS resources %v requested when none expected", gotNames)
   319  	}
   320  
   321  	// Push an updated configuration with a different cluster name.
   322  	newClusterName := clusterName + "-new"
   323  	jsonSC = fmt.Sprintf(`{
   324  			"loadBalancingConfig":[{
   325  				"cds_experimental":{
   326  					"cluster": "%s"
   327  				}
   328  			}]
   329  		}`, newClusterName)
   330  	scpr = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
   331  	r.UpdateState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))
   332  
   333  	// Verify that the new cluster name is requested and the old one is no
   334  	// longer requested.
   335  	wantNames = []string{newClusterName}
   336  	if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
   337  		t.Fatal(err)
   338  	}
   339  }
   340  
   341  // Tests the case where a configuration with an empty cluster name is pushed to
   342  // the CDS LB policy. Verifies that ErrBadResolverState is returned.
   343  func (s) TestConfigurationUpdate_EmptyCluster(t *testing.T) {
   344  	// Setup a management server and an xDS client to talk to it.
   345  	_, _, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
   346  	t.Cleanup(cleanup)
   347  	xdsClient, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
   348  	if err != nil {
   349  		t.Fatalf("Failed to create xDS client: %v", err)
   350  	}
   351  	t.Cleanup(xdsClose)
   352  
   353  	// Create a manual resolver that configures the CDS LB policy as the
   354  	// top-level LB policy on the channel, and pushes a configuration with an
   355  	// empty cluster name. Also, register a callback with the manual resolver to
   356  	// receive the error returned by the balancer when a configuration with an
   357  	// empty cluster name is pushed.
   358  	r := manual.NewBuilderWithScheme("whatever")
   359  	updateStateErrCh := make(chan error, 1)
   360  	r.UpdateStateCallback = func(err error) { updateStateErrCh <- err }
   361  	jsonSC := `{
   362  			"loadBalancingConfig":[{
   363  				"cds_experimental":{
   364  					"cluster": ""
   365  				}
   366  			}]
   367  		}`
   368  	scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
   369  	r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))
   370  
   371  	// Create a ClientConn with the above manual resolver.
   372  	cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
   373  	if err != nil {
   374  		t.Fatalf("Failed to dial: %v", err)
   375  	}
   376  	t.Cleanup(func() { cc.Close() })
   377  
   378  	select {
   379  	case <-time.After(defaultTestTimeout):
   380  		t.Fatalf("Timed out waiting for error from the LB policy")
   381  	case err := <-updateStateErrCh:
   382  		if err != balancer.ErrBadResolverState {
   383  			t.Fatalf("For a configuration update with an empty cluster name, got error %v from the LB policy, want %v", err, balancer.ErrBadResolverState)
   384  		}
   385  	}
   386  }
   387  
   388  // Tests the case where a configuration with a missing xDS client is pushed to
   389  // the CDS LB policy. Verifies that ErrBadResolverState is returned.
   390  func (s) TestConfigurationUpdate_MissingXdsClient(t *testing.T) {
   391  	// Create a manual resolver that configures the CDS LB policy as the
   392  	// top-level LB policy on the channel, and pushes a configuration that is
   393  	// missing the xDS client.  Also, register a callback with the manual
   394  	// resolver to receive the error returned by the balancer.
   395  	r := manual.NewBuilderWithScheme("whatever")
   396  	updateStateErrCh := make(chan error, 1)
   397  	r.UpdateStateCallback = func(err error) { updateStateErrCh <- err }
   398  	jsonSC := `{
   399  			"loadBalancingConfig":[{
   400  				"cds_experimental":{
   401  					"cluster": "foo"
   402  				}
   403  			}]
   404  		}`
   405  	scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
   406  	r.InitialState(resolver.State{ServiceConfig: scpr})
   407  
   408  	// Create a ClientConn with the above manual resolver.
   409  	cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
   410  	if err != nil {
   411  		t.Fatalf("Failed to dial: %v", err)
   412  	}
   413  	t.Cleanup(func() { cc.Close() })
   414  
   415  	select {
   416  	case <-time.After(defaultTestTimeout):
   417  		t.Fatalf("Timed out waiting for error from the LB policy")
   418  	case err := <-updateStateErrCh:
   419  		if err != balancer.ErrBadResolverState {
   420  			t.Fatalf("For a configuration update missing the xDS client, got error %v from the LB policy, want %v", err, balancer.ErrBadResolverState)
   421  		}
   422  	}
   423  }
   424  
   425  // Tests success scenarios where the cds LB policy receives a cluster resource
   426  // from the management server. Verifies that the load balancing configuration
   427  // pushed to the child is as expected.
   428  func (s) TestClusterUpdate_Success(t *testing.T) {
   429  	tests := []struct {
   430  		name            string
   431  		clusterResource *v3clusterpb.Cluster
   432  		wantChildCfg    serviceconfig.LoadBalancingConfig
   433  	}{
   434  		{
   435  			name: "happy-case-with-circuit-breakers",
   436  			clusterResource: func() *v3clusterpb.Cluster {
   437  				c := e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)
   438  				c.CircuitBreakers = &v3clusterpb.CircuitBreakers{
   439  					Thresholds: []*v3clusterpb.CircuitBreakers_Thresholds{
   440  						{
   441  							Priority:    v3corepb.RoutingPriority_DEFAULT,
   442  							MaxRequests: wrapperspb.UInt32(512),
   443  						},
   444  						{
   445  							Priority:    v3corepb.RoutingPriority_HIGH,
   446  							MaxRequests: nil,
   447  						},
   448  					},
   449  				}
   450  				return c
   451  			}(),
   452  			wantChildCfg: &clusterresolver.LBConfig{
   453  				DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
   454  					Cluster:               clusterName,
   455  					Type:                  clusterresolver.DiscoveryMechanismTypeEDS,
   456  					EDSServiceName:        serviceName,
   457  					MaxConcurrentRequests: newUint32(512),
   458  					OutlierDetection:      json.RawMessage(`{}`),
   459  				}},
   460  				XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
   461  			},
   462  		},
   463  		{
   464  			name: "happy-case-with-ring-hash-lb-policy",
   465  			clusterResource: func() *v3clusterpb.Cluster {
   466  				c := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
   467  					ClusterName:   clusterName,
   468  					ServiceName:   serviceName,
   469  					SecurityLevel: e2e.SecurityLevelNone,
   470  					Policy:        e2e.LoadBalancingPolicyRingHash,
   471  				})
   472  				c.LbConfig = &v3clusterpb.Cluster_RingHashLbConfig_{
   473  					RingHashLbConfig: &v3clusterpb.Cluster_RingHashLbConfig{
   474  						MinimumRingSize: &wrapperspb.UInt64Value{Value: 100},
   475  						MaximumRingSize: &wrapperspb.UInt64Value{Value: 1000},
   476  					},
   477  				}
   478  				return c
   479  			}(),
   480  			wantChildCfg: &clusterresolver.LBConfig{
   481  				DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
   482  					Cluster:          clusterName,
   483  					Type:             clusterresolver.DiscoveryMechanismTypeEDS,
   484  					EDSServiceName:   serviceName,
   485  					OutlierDetection: json.RawMessage(`{}`),
   486  				}},
   487  				XDSLBPolicy: json.RawMessage(`[{"ring_hash_experimental": {"minRingSize":100, "maxRingSize":1000}}]`),
   488  			},
   489  		},
   490  		{
   491  			name: "happy-case-outlier-detection-xds-defaults", // OD proto set but no proto fields set
   492  			clusterResource: func() *v3clusterpb.Cluster {
   493  				c := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
   494  					ClusterName:   clusterName,
   495  					ServiceName:   serviceName,
   496  					SecurityLevel: e2e.SecurityLevelNone,
   497  					Policy:        e2e.LoadBalancingPolicyRingHash,
   498  				})
   499  				c.OutlierDetection = &v3clusterpb.OutlierDetection{}
   500  				return c
   501  			}(),
   502  			wantChildCfg: &clusterresolver.LBConfig{
   503  				DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
   504  					Cluster:          clusterName,
   505  					Type:             clusterresolver.DiscoveryMechanismTypeEDS,
   506  					EDSServiceName:   serviceName,
   507  					OutlierDetection: json.RawMessage(`{"successRateEjection":{}}`),
   508  				}},
   509  				XDSLBPolicy: json.RawMessage(`[{"ring_hash_experimental": {"minRingSize":1024, "maxRingSize":8388608}}]`),
   510  			},
   511  		},
   512  		{
   513  			name: "happy-case-outlier-detection-all-fields-set",
   514  			clusterResource: func() *v3clusterpb.Cluster {
   515  				c := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
   516  					ClusterName:   clusterName,
   517  					ServiceName:   serviceName,
   518  					SecurityLevel: e2e.SecurityLevelNone,
   519  					Policy:        e2e.LoadBalancingPolicyRingHash,
   520  				})
   521  				c.OutlierDetection = &v3clusterpb.OutlierDetection{
   522  					Interval:                       durationpb.New(10 * time.Second),
   523  					BaseEjectionTime:               durationpb.New(30 * time.Second),
   524  					MaxEjectionTime:                durationpb.New(300 * time.Second),
   525  					MaxEjectionPercent:             wrapperspb.UInt32(10),
   526  					SuccessRateStdevFactor:         wrapperspb.UInt32(1900),
   527  					EnforcingSuccessRate:           wrapperspb.UInt32(100),
   528  					SuccessRateMinimumHosts:        wrapperspb.UInt32(5),
   529  					SuccessRateRequestVolume:       wrapperspb.UInt32(100),
   530  					FailurePercentageThreshold:     wrapperspb.UInt32(85),
   531  					EnforcingFailurePercentage:     wrapperspb.UInt32(5),
   532  					FailurePercentageMinimumHosts:  wrapperspb.UInt32(5),
   533  					FailurePercentageRequestVolume: wrapperspb.UInt32(50),
   534  				}
   535  				return c
   536  			}(),
   537  			wantChildCfg: &clusterresolver.LBConfig{
   538  				DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
   539  					Cluster:        clusterName,
   540  					Type:           clusterresolver.DiscoveryMechanismTypeEDS,
   541  					EDSServiceName: serviceName,
   542  					OutlierDetection: json.RawMessage(`{
   543  						"interval": "10s",
   544  						"baseEjectionTime": "30s",
   545  						"maxEjectionTime": "300s",
   546  						"maxEjectionPercent": 10,
   547  						"successRateEjection": {
   548  							"stdevFactor": 1900,
   549  							"enforcementPercentage": 100,
   550  							"minimumHosts": 5,
   551  							"requestVolume": 100
   552  						},
   553  						"failurePercentageEjection": {
   554  							"threshold": 85,
   555  							"enforcementPercentage": 5,
   556  							"minimumHosts": 5,
   557  							"requestVolume": 50
   558  						}
   559  					}`),
   560  				}},
   561  				XDSLBPolicy: json.RawMessage(`[{"ring_hash_experimental": {"minRingSize":1024, "maxRingSize":8388608}}]`),
   562  			},
   563  		},
   564  	}
   565  
   566  	for _, test := range tests {
   567  		t.Run(test.name, func(t *testing.T) {
   568  			lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
   569  			mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
   570  
   571  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   572  			defer cancel()
   573  			if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
   574  				NodeID:         nodeID,
   575  				Clusters:       []*v3clusterpb.Cluster{test.clusterResource},
   576  				SkipValidation: true,
   577  			}); err != nil {
   578  				t.Fatal(err)
   579  			}
   580  
   581  			if err := compareLoadBalancingConfig(ctx, lbCfgCh, test.wantChildCfg); err != nil {
   582  				t.Fatal(err)
   583  			}
   584  		})
   585  	}
   586  }
   587  
   588  // Tests a single success scenario where the cds LB policy receives a cluster
   589  // resource from the management server with LRS enabled. Verifies that the load
   590  // balancing configuration pushed to the child is as expected.
   591  func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) {
   592  	lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
   593  	mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
   594  
   595  	clusterResource := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
   596  		ClusterName: clusterName,
   597  		ServiceName: serviceName,
   598  		EnableLRS:   true,
   599  	})
   600  	wantChildCfg := &clusterresolver.LBConfig{
   601  		DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
   602  			Cluster:        clusterName,
   603  			Type:           clusterresolver.DiscoveryMechanismTypeEDS,
   604  			EDSServiceName: serviceName,
   605  			LoadReportingServer: &bootstrap.ServerConfig{
   606  				ServerURI: mgmtServer.Address,
   607  				Creds:     bootstrap.ChannelCreds{Type: "insecure"},
   608  			},
   609  			OutlierDetection: json.RawMessage(`{}`),
   610  		}},
   611  		XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
   612  	}
   613  
   614  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   615  	defer cancel()
   616  	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
   617  		NodeID:         nodeID,
   618  		Clusters:       []*v3clusterpb.Cluster{clusterResource},
   619  		SkipValidation: true,
   620  	}); err != nil {
   621  		t.Fatal(err)
   622  	}
   623  
   624  	if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
   625  		t.Fatal(err)
   626  	}
   627  }
   628  
   629  // Tests scenarios for a bad cluster update received from the management server.
   630  //
   631  //   - when a bad cluster resource update is received without any previous good
   632  //     update from the management server, the cds LB policy is expected to put
   633  //     the channel in TRANSIENT_FAILURE.
   634  //   - when a bad cluster resource update is received after a previous good
   635  //     update from the management server, the cds LB policy is expected to
   636  //     continue using the previous good update.
   637  //   - when the cluster resource is removed after a previous good
   638  //     update from the management server, the cds LB policy is expected to put
   639  //     the channel in TRANSIENT_FAILURE.
   640  func (s) TestClusterUpdate_Failure(t *testing.T) {
   641  	_, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t)
   642  	mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)
   643  
   644  	// Verify that the specified cluster resource is requested.
   645  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   646  	defer cancel()
   647  	wantNames := []string{clusterName}
   648  	if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
   649  		t.Fatal(err)
   650  	}
   651  
   652  	// Configure the management server to return a cluster resource that
   653  	// contains a config_source_specifier for the `lrs_server` field which is not
   654  	// set to `self`, and hence is expected to be NACKed by the client.
   655  	cluster := e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)
   656  	cluster.LrsServer = &v3corepb.ConfigSource{ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{}}
   657  	resources := e2e.UpdateOptions{
   658  		NodeID:         nodeID,
   659  		Clusters:       []*v3clusterpb.Cluster{cluster},
   660  		SkipValidation: true,
   661  	}
   662  	if err := mgmtServer.Update(ctx, resources); err != nil {
   663  		t.Fatal(err)
   664  	}
   665  
   666  	// Verify that the watch for the cluster resource is not cancelled.
   667  	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   668  	defer sCancel()
   669  	select {
   670  	case <-sCtx.Done():
   671  	case <-cdsResourceCanceledCh:
   672  		t.Fatal("Watch for cluster resource is cancelled when not expected to")
   673  	}
   674  
   675  	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
   676  
   677  	// Ensure that the NACK error is propagated to the RPC caller.
   678  	const wantClusterNACKErr = "unsupported config_source_specifier"
   679  	client := testgrpc.NewTestServiceClient(cc)
   680  	_, err := client.EmptyCall(ctx, &testpb.Empty{})
   681  	if code := status.Code(err); code != codes.Unavailable {
   682  		t.Fatalf("EmptyCall() failed with code: %v, want %v", code, codes.Unavailable)
   683  	}
   684  	if err != nil && !strings.Contains(err.Error(), wantClusterNACKErr) {
   685  		t.Fatalf("EmptyCall() failed with err: %v, want err containing: %v", err, wantClusterNACKErr)
   686  	}
   687  
   688  	// Start a test service backend.
   689  	server := stubserver.StartTestService(t, nil)
   690  	t.Cleanup(server.Stop)
   691  
   692  	// Configure cluster and endpoints resources in the management server.
   693  	resources = e2e.UpdateOptions{
   694  		NodeID:         nodeID,
   695  		Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
   696  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
   697  		SkipValidation: true,
   698  	}
   699  	if err := mgmtServer.Update(ctx, resources); err != nil {
   700  		t.Fatal(err)
   701  	}
   702  
   703  	// Verify that a successful RPC can be made.
   704  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   705  		t.Fatalf("EmptyCall() failed: %v", err)
   706  	}
   707  
   708  	// Send the bad cluster resource again.
   709  	resources = e2e.UpdateOptions{
   710  		NodeID:         nodeID,
   711  		Clusters:       []*v3clusterpb.Cluster{cluster},
   712  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
   713  		SkipValidation: true,
   714  	}
   715  	if err := mgmtServer.Update(ctx, resources); err != nil {
   716  		t.Fatal(err)
   717  	}
   718  
   719  	// Verify that the watch for the cluster resource is not cancelled.
   720  	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
   721  	defer sCancel()
   722  	select {
   723  	case <-sCtx.Done():
   724  	case <-cdsResourceCanceledCh:
   725  		t.Fatal("Watch for cluster resource is cancelled when not expected to")
   726  	}
   727  
   728  	// Verify that a successful RPC can be made, using the previously received
   729  	// good configuration.
   730  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   731  		t.Fatalf("EmptyCall() failed: %v", err)
   732  	}
   733  
   734  	// Verify that the resolver error is pushed to the child policy.
   735  	select {
   736  	case err := <-resolverErrCh:
   737  		if !strings.Contains(err.Error(), wantClusterNACKErr) {
   738  			t.Fatalf("Error pushed to child policy is %v, want %v", err, wantClusterNACKErr)
   739  		}
   740  	case <-ctx.Done():
   741  		t.Fatal("Timeout when waiting for resolver error to be pushed to the child policy")
   742  	}
   743  
   744  	// Remove the cluster resource from the management server, triggering a
   745  	// resource-not-found error.
   746  	resources = e2e.UpdateOptions{
   747  		NodeID:         nodeID,
   748  		SkipValidation: true,
   749  	}
   750  	if err := mgmtServer.Update(ctx, resources); err != nil {
   751  		t.Fatal(err)
   752  	}
   753  
   754  	// Verify that the watch for the cluster resource is not cancelled.
   755  	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
   756  	defer sCancel()
   757  	select {
   758  	case <-sCtx.Done():
   759  	case <-cdsResourceCanceledCh:
   760  		t.Fatal("Watch for cluster resource is cancelled when not expected to")
   761  	}
   762  
   763  	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
   764  
   765  	// Ensure RPC fails with Unavailable. The actual error message depends on
   766  	// the picker returned from the priority LB policy, and therefore not
   767  	// checking for it here.
   768  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
   769  		t.Fatalf("EmptyCall() failed with code: %v, want %v", status.Code(err), codes.Unavailable)
   770  	}
   771  }
   772  
   773  // Tests the following scenarios for resolver errors:
   774  //   - when a resolver error is received without any previous good update from the
   775  //     management server, the cds LB policy is expected to put the channel in
   776  //     TRANSIENT_FAILURE.
   777  //   - when a resolver error is received (one that is not a resource-not-found
   778  //     error), with a previous good update from the management server, the cds LB
   779  //     policy is expected to push the error down the child policy, but is expected
   780  //     to continue to use the previously received good configuration.
   781  //   - when a resolver error is received (one that is a resource-not-found
   782  //     error, which is usually the case when the LDS resource is removed),
   783  //     with a previous good update from the management server, the cds LB policy
   784  //     is expected to push the error down the child policy and put the channel in
   785  //     TRANSIENT_FAILURE. It is also expected to cancel the CDS watch.
   786  func (s) TestResolverError(t *testing.T) {
   787  	_, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t)
   788  	mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)
   789  
   790  	// Verify that the specified cluster resource is requested.
   791  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   792  	defer cancel()
   793  	wantNames := []string{clusterName}
   794  	if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
   795  		t.Fatal(err)
   796  	}
   797  
   798  	// Push a resolver error that is not a resource-not-found error.
   799  	resolverErr := errors.New("resolver-error-not-a-resource-not-found-error")
   800  	r.ReportError(resolverErr)
   801  
   802  	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
   803  
   804  	// Drain the resolver error channel.
   805  	select {
   806  	case <-resolverErrCh:
   807  	default:
   808  	}
   809  
   810  	// Ensure that the resolver error is propagated to the RPC caller.
   811  	client := testgrpc.NewTestServiceClient(cc)
   812  	_, err := client.EmptyCall(ctx, &testpb.Empty{})
   813  	if code := status.Code(err); code != codes.Unavailable {
   814  		t.Fatalf("EmptyCall() failed with code: %v, want %v", code, codes.Unavailable)
   815  	}
   816  	if err != nil && !strings.Contains(err.Error(), resolverErr.Error()) {
   817  		t.Fatalf("EmptyCall() failed with err: %v, want %v", err, resolverErr)
   818  	}
   819  
   820  	// Also verify that the watch for the cluster resource is not cancelled.
   821  	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   822  	defer sCancel()
   823  	select {
   824  	case <-sCtx.Done():
   825  	case <-cdsResourceCanceledCh:
   826  		t.Fatal("Watch for cluster resource is cancelled when not expected to")
   827  	}
   828  
   829  	// Start a test service backend.
   830  	server := stubserver.StartTestService(t, nil)
   831  	t.Cleanup(server.Stop)
   832  
   833  	// Configure good cluster and endpoints resources in the management server.
   834  	resources := e2e.UpdateOptions{
   835  		NodeID:         nodeID,
   836  		Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
   837  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
   838  		SkipValidation: true,
   839  	}
   840  	if err := mgmtServer.Update(ctx, resources); err != nil {
   841  		t.Fatal(err)
   842  	}
   843  
   844  	// Verify that a successful RPC can be made.
   845  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   846  		t.Fatalf("EmptyCall() failed: %v", err)
   847  	}
   848  
   849  	// Again push a resolver error that is not a resource-not-found error.
   850  	r.ReportError(resolverErr)
   851  
   852  	// And again verify that the watch for the cluster resource is not
   853  	// cancelled.
   854  	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
   855  	defer sCancel()
   856  	select {
   857  	case <-sCtx.Done():
   858  	case <-cdsResourceCanceledCh:
   859  		t.Fatal("Watch for cluster resource is cancelled when not expected to")
   860  	}
   861  
   862  	// Verify that a successful RPC can be made, using the previously received
   863  	// good configuration.
   864  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   865  		t.Fatalf("EmptyCall() failed: %v", err)
   866  	}
   867  
   868  	// Verify that the resolver error is pushed to the child policy.
   869  	select {
   870  	case err := <-resolverErrCh:
   871  		if err != resolverErr {
   872  			t.Fatalf("Error pushed to child policy is %v, want %v", err, resolverErr)
   873  		}
   874  	case <-ctx.Done():
   875  		t.Fatal("Timeout when waiting for resolver error to be pushed to the child policy")
   876  	}
   877  
   878  	// Push a resource-not-found-error this time around.
   879  	resolverErr = xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds resource not found error")
   880  	r.ReportError(resolverErr)
   881  
   882  	// Wait for the CDS resource to be not requested anymore.
   883  	select {
   884  	case <-ctx.Done():
   885  		t.Fatal("Timeout when waiting for CDS resource to be not requested")
   886  	case <-cdsResourceCanceledCh:
   887  	}
   888  
   889  	// Verify that the resolver error is pushed to the child policy.
   890  	select {
   891  	case err := <-resolverErrCh:
   892  		if err != resolverErr {
   893  			t.Fatalf("Error pushed to child policy is %v, want %v", err, resolverErr)
   894  		}
   895  	case <-ctx.Done():
   896  		t.Fatal("Timeout when waiting for resolver error to be pushed to the child policy")
   897  	}
   898  
   899  	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
   900  
   901  	// Ensure RPC fails with Unavailable. The actual error message depends on
   902  	// the picker returned from the priority LB policy, and therefore not
   903  	// checking for it here.
   904  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
   905  		t.Fatalf("EmptyCall() failed with code: %v, want %v", status.Code(err), codes.Unavailable)
   906  	}
   907  }
   908  
   909  // Tests that closing the cds LB policy results in the cluster resource watch
   910  // being cancelled and the child policy being closed.
   911  func (s) TestClose(t *testing.T) {
   912  	cdsBalancerCh := registerWrappedCDSPolicy(t)
   913  	_, _, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t)
   914  	mgmtServer, nodeID, cc, _, _, _, cdsResourceCanceledCh := setupWithManagementServer(t)
   915  
   916  	// Start a test service backend.
   917  	server := stubserver.StartTestService(t, nil)
   918  	t.Cleanup(server.Stop)
   919  
   920  	// Configure cluster and endpoints resources in the management server.
   921  	resources := e2e.UpdateOptions{
   922  		NodeID:         nodeID,
   923  		Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
   924  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
   925  		SkipValidation: true,
   926  	}
   927  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   928  	defer cancel()
   929  	if err := mgmtServer.Update(ctx, resources); err != nil {
   930  		t.Fatal(err)
   931  	}
   932  
   933  	// Verify that a successful RPC can be made.
   934  	client := testgrpc.NewTestServiceClient(cc)
   935  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   936  		t.Fatalf("EmptyCall() failed: %v", err)
   937  	}
   938  
   939  	// Retrieve the cds LB policy and close it.
   940  	var cdsBal balancer.Balancer
   941  	select {
   942  	case cdsBal = <-cdsBalancerCh:
   943  	case <-ctx.Done():
   944  		t.Fatal("Timeout when waiting for cds LB policy to be created")
   945  	}
   946  	cdsBal.Close()
   947  
   948  	// Wait for the CDS resource to be not requested anymore.
   949  	select {
   950  	case <-ctx.Done():
   951  		t.Fatal("Timeout when waiting for CDS resource to be not requested")
   952  	case <-cdsResourceCanceledCh:
   953  	}
   954  	// Wait for the child policy to be closed.
   955  	select {
   956  	case <-ctx.Done():
   957  		t.Fatal("Timeout when waiting for the child policy to be closed")
   958  	case <-childPolicyCloseCh:
   959  	}
   960  }
   961  
   962  // Tests that calling ExitIdle on the cds LB policy results in the call being
   963  // propagated to the child policy.
   964  func (s) TestExitIdle(t *testing.T) {
   965  	cdsBalancerCh := registerWrappedCDSPolicy(t)
   966  	_, _, exitIdleCh, _ := registerWrappedClusterResolverPolicy(t)
   967  	mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
   968  
   969  	// Start a test service backend.
   970  	server := stubserver.StartTestService(t, nil)
   971  	t.Cleanup(server.Stop)
   972  
   973  	// Configure cluster and endpoints resources in the management server.
   974  	resources := e2e.UpdateOptions{
   975  		NodeID:         nodeID,
   976  		Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
   977  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
   978  		SkipValidation: true,
   979  	}
   980  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   981  	defer cancel()
   982  	if err := mgmtServer.Update(ctx, resources); err != nil {
   983  		t.Fatal(err)
   984  	}
   985  
   986  	// Verify that a successful RPC can be made.
   987  	client := testgrpc.NewTestServiceClient(cc)
   988  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   989  		t.Fatalf("EmptyCall() failed: %v", err)
   990  	}
   991  
   992  	// Retrieve the cds LB policy and call ExitIdle() on it.
   993  	var cdsBal balancer.Balancer
   994  	select {
   995  	case cdsBal = <-cdsBalancerCh:
   996  	case <-ctx.Done():
   997  		t.Fatal("Timeout when waiting for cds LB policy to be created")
   998  	}
   999  	cdsBal.(balancer.ExitIdler).ExitIdle()
  1000  
  1001  	// Wait for ExitIdle to be called on the child policy.
  1002  	select {
  1003  	case <-ctx.Done():
  1004  		t.Fatal("Timeout when waiting for the child policy to be closed")
  1005  	case <-exitIdleCh:
  1006  	}
  1007  }
  1008  
  1009  // TestParseConfig verifies the ParseConfig() method in the CDS balancer.
  1010  func (s) TestParseConfig(t *testing.T) {
  1011  	bb := balancer.Get(cdsName)
  1012  	if bb == nil {
  1013  		t.Fatalf("balancer.Get(%q) returned nil", cdsName)
  1014  	}
  1015  	parser, ok := bb.(balancer.ConfigParser)
  1016  	if !ok {
  1017  		t.Fatalf("balancer %q does not implement the ConfigParser interface", cdsName)
  1018  	}
  1019  
  1020  	tests := []struct {
  1021  		name    string
  1022  		input   json.RawMessage
  1023  		wantCfg serviceconfig.LoadBalancingConfig
  1024  		wantErr bool
  1025  	}{
  1026  		{
  1027  			name:    "good-config",
  1028  			input:   json.RawMessage(`{"Cluster": "cluster1"}`),
  1029  			wantCfg: &lbConfig{ClusterName: "cluster1"},
  1030  		},
  1031  		{
  1032  			name:    "unknown-fields-in-config",
  1033  			input:   json.RawMessage(`{"Unknown": "foobar"}`),
  1034  			wantCfg: &lbConfig{ClusterName: ""},
  1035  		},
  1036  		{
  1037  			name:    "empty-config",
  1038  			input:   json.RawMessage(""),
  1039  			wantErr: true,
  1040  		},
  1041  		{
  1042  			name:    "bad-config",
  1043  			input:   json.RawMessage(`{"Cluster": 5}`),
  1044  			wantErr: true,
  1045  		},
  1046  	}
  1047  
  1048  	for _, test := range tests {
  1049  		t.Run(test.name, func(t *testing.T) {
  1050  			gotCfg, gotErr := parser.ParseConfig(test.input)
  1051  			if (gotErr != nil) != test.wantErr {
  1052  				t.Fatalf("ParseConfig(%v) = %v, wantErr %v", string(test.input), gotErr, test.wantErr)
  1053  			}
  1054  			if test.wantErr {
  1055  				return
  1056  			}
  1057  			if !cmp.Equal(gotCfg, test.wantCfg) {
  1058  				t.Fatalf("ParseConfig(%v) = %v, want %v", string(test.input), gotCfg, test.wantCfg)
  1059  			}
  1060  		})
  1061  	}
  1062  }
  1063  
  1064  func newUint32(i uint32) *uint32 {
  1065  	return &i
  1066  }
  1067  

View as plain text