...

Source file src/google.golang.org/grpc/xds/internal/balancer/clusterresolver/e2e_test/eds_impl_test.go

Documentation: google.golang.org/grpc/xds/internal/balancer/clusterresolver/e2e_test

     1  /*
     2   * Copyright 2022 gRPC authors.
     3   *
     4   * Licensed under the Apache License, Version 2.0 (the "License");
     5   * you may not use this file except in compliance with the License.
     6   * You may obtain a copy of the License at
     7   *
     8   *     http://www.apache.org/licenses/LICENSE-2.0
     9   *
    10   * Unless required by applicable law or agreed to in writing, software
    11   * distributed under the License is distributed on an "AS IS" BASIS,
    12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13   * See the License for the specific language governing permissions and
    14   * limitations under the License.
    15   */
    16  
    17  package e2e_test
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"strings"
    24  	"testing"
    25  	"time"
    26  
    27  	"github.com/google/go-cmp/cmp"
    28  	"github.com/google/uuid"
    29  	"google.golang.org/grpc"
    30  	"google.golang.org/grpc/codes"
    31  	"google.golang.org/grpc/credentials/insecure"
    32  	"google.golang.org/grpc/internal"
    33  	"google.golang.org/grpc/internal/grpctest"
    34  	"google.golang.org/grpc/internal/stubserver"
    35  	"google.golang.org/grpc/internal/testutils"
    36  	rrutil "google.golang.org/grpc/internal/testutils/roundrobin"
    37  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    38  	"google.golang.org/grpc/internal/xds/bootstrap"
    39  	"google.golang.org/grpc/peer"
    40  	"google.golang.org/grpc/resolver"
    41  	"google.golang.org/grpc/resolver/manual"
    42  	"google.golang.org/grpc/serviceconfig"
    43  	"google.golang.org/grpc/status"
    44  	xdstestutils "google.golang.org/grpc/xds/internal/testutils"
    45  	"google.golang.org/grpc/xds/internal/xdsclient"
    46  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
    47  	"google.golang.org/protobuf/types/known/wrapperspb"
    48  
    49  	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
    50  	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    51  	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
    52  	v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
    53  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    54  	testpb "google.golang.org/grpc/interop/grpc_testing"
    55  
    56  	_ "google.golang.org/grpc/xds/internal/balancer/clusterresolver" // Register the "cluster_resolver_experimental" LB policy.
    57  	"google.golang.org/grpc/xds/internal/balancer/priority"
    58  )
    59  
    60  // Names of the xDS resources and EDS localities used by the tests below.
    61  const (
    62  	clusterName    = "cluster-my-service-client-side-xds"
    63  	edsServiceName = "endpoints-my-service-client-side-xds"
    64  	localityName1  = "my-locality-1"
    65  	localityName2  = "my-locality-2"
    66  	localityName3  = "my-locality-3"
    67  )
    68  
    69  // Durations used by the tests in this package.
    70  const (
    71  	defaultTestTimeout            = 5 * time.Second
    72  	defaultTestShortTimeout       = 10 * time.Millisecond
    73  	defaultTestWatchExpiryTimeout = 500 * time.Millisecond
    74  )
    71  
    72  // s is the receiver type for every test in this package. Embedding
    73  // grpctest.Tester presumably supplies shared per-test setup/teardown (see the
    74  // grpctest package).
    75  type s struct {
    76  	grpctest.Tester
    77  }
    78  
    79  // Test is the single go test entry point for this package. It hands a value
    80  // of s to grpctest.RunSubTests, which (going by its name) runs the Test*
    81  // methods defined on s as subtests.
    82  func Test(t *testing.T) {
    83  	var suite s
    84  	grpctest.RunSubTests(t, suite)
    85  }
    79  
    80  // backendAddressesAndPorts extracts the address and port of each of the
    81  // StubServers passed in and returns them. Fails the test if any of the
    82  // StubServers passed have an invalid address.
    83  func backendAddressesAndPorts(t *testing.T, servers []*stubserver.StubServer) ([]resolver.Address, []uint32) {
    84  	addrs := make([]resolver.Address, len(servers))
    85  	ports := make([]uint32, len(servers))
    86  	for i := 0; i < len(servers); i++ {
    87  		addrs[i] = resolver.Address{Addr: servers[i].Address}
    88  		ports[i] = testutils.ParsePort(t, servers[i].Address)
    89  	}
    90  	return addrs, ports
    91  }
    92  
    93  // startTestServiceBackends starts numBackends stub servers exposing the
    94  // TestService and returns them, along with a function that stops them all.
    95  //
    96  // stubserver.StartTestService hands back a server that is already running
    97  // (elsewhere in this file its result is dialed directly, with no StartServer
    98  // call), so StartServer is not called again here: a second start of the same
    99  // StubServer is redundant at best.
   100  func startTestServiceBackends(t *testing.T, numBackends int) ([]*stubserver.StubServer, func()) {
   101  	var servers []*stubserver.StubServer
   102  	for i := 0; i < numBackends; i++ {
   103  		servers = append(servers, stubserver.StartTestService(t, nil))
   104  	}
   105  
   106  	return servers, func() {
   107  		for _, server := range servers {
   108  			server.Stop()
   109  		}
   110  	}
   111  }
   106  
   107  // clientEndpointsResource returns an EDS resource for the specified nodeID,
   108  // service name and localities.
   109  func clientEndpointsResource(nodeID, edsServiceName string, localities []e2e.LocalityOptions) e2e.UpdateOptions {
   110  	return e2e.UpdateOptions{
   111  		NodeID: nodeID,
   112  		Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
   113  			ClusterName: edsServiceName,
   114  			Host:        "localhost",
   115  			Localities:  localities,
   116  		})},
   117  		SkipValidation: true,
   118  	}
   119  }
   120  
   121  // TestEDS_OneLocality tests the cluster_resolver LB policy using an EDS
   122  // resource with one locality. The following scenarios are tested:
   123  //  1. Single backend. Test verifies that RPCs reach this backend.
   124  //  2. Add a backend. Test verifies that RPCs are roundrobined across the two
   125  //     backends.
   126  //  3. Remove one backend. Test verifies that all RPCs reach the other backend.
   127  //  4. Replace the backend. Test verifies that all RPCs reach the new backend.
   128  func (s) TestEDS_OneLocality(t *testing.T) {
   129  	// Spin up a management server to receive xDS resources from.
   130  	managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
   131  	defer cleanup1()
   132  
   133  	// Start backend servers which provide an implementation of the TestService.
   134  	servers, cleanup2 := startTestServiceBackends(t, 3)
   135  	defer cleanup2()
   136  	addrs, ports := backendAddressesAndPorts(t, servers)
   137  
   138  	// Create xDS resources for consumption by the test. We start off with a
   139  	// single backend in a single EDS locality.
   140  	resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
   141  		Name:     localityName1,
   142  		Weight:   1,
   143  		Backends: []e2e.BackendOptions{{Port: ports[0]}},
   144  	}})
   145  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   146  	defer cancel()
   147  	if err := managementServer.Update(ctx, resources); err != nil {
   148  		t.Fatal(err)
   149  	}
   150  
   151  	// Create an xDS client for use by the cluster_resolver LB policy.
   152  	client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
   153  	if err != nil {
   154  		t.Fatalf("Failed to create xDS client: %v", err)
   155  	}
   156  	defer close()
   157  
   158  	// Create a manual resolver and push a service config specifying the use of
   159  	// the cluster_resolver LB policy with a single discovery mechanism.
   160  	r := manual.NewBuilderWithScheme("whatever")
   161  	jsonSC := fmt.Sprintf(`{
   162  			"loadBalancingConfig":[{
   163  				"cluster_resolver_experimental":{
   164  					"discoveryMechanisms": [{
   165  						"cluster": "%s",
   166  						"type": "EDS",
   167  						"edsServiceName": "%s",
   168  						"outlierDetection": {}
   169  					}],
   170  					"xdsLbPolicy":[{"round_robin":{}}]
   171  				}
   172  			}]
   173  		}`, clusterName, edsServiceName)
   174  	scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
   175  	r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, client))
   176  
   177  	// Create a ClientConn and make a successful RPC.
   178  	cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
   179  	if err != nil {
   180  		t.Fatalf("failed to dial local test server: %v", err)
   181  	}
   182  	defer cc.Close()
   183  
   184  	// Ensure RPCs are being roundrobined across the single backend.
   185  	testClient := testgrpc.NewTestServiceClient(cc)
   186  	if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[:1]); err != nil {
   187  		t.Fatal(err)
   188  	}
   189  
   190  	// Add a backend to the same locality, and ensure RPCs are sent in a
   191  	// roundrobin fashion across the two backends.
   192  	resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
   193  		Name:     localityName1,
   194  		Weight:   1,
   195  		Backends: []e2e.BackendOptions{{Port: ports[0]}, {Port: ports[1]}},
   196  	}})
   197  	if err := managementServer.Update(ctx, resources); err != nil {
   198  		t.Fatal(err)
   199  	}
   200  	if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[:2]); err != nil {
   201  		t.Fatal(err)
   202  	}
   203  
   204  	// Remove the first backend, and ensure all RPCs are sent to the second
   205  	// backend.
   206  	resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
   207  		Name:     localityName1,
   208  		Weight:   1,
   209  		Backends: []e2e.BackendOptions{{Port: ports[1]}},
   210  	}})
   211  	if err := managementServer.Update(ctx, resources); err != nil {
   212  		t.Fatal(err)
   213  	}
   214  	if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[1:2]); err != nil {
   215  		t.Fatal(err)
   216  	}
   217  
   218  	// Replace the backend, and ensure all RPCs are sent to the new backend.
   219  	resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
   220  		Name:     localityName1,
   221  		Weight:   1,
   222  		Backends: []e2e.BackendOptions{{Port: ports[2]}},
   223  	}})
   224  	if err := managementServer.Update(ctx, resources); err != nil {
   225  		t.Fatal(err)
   226  	}
   227  	if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[2:3]); err != nil {
   228  		t.Fatal(err)
   229  	}
   230  }
   231  
   232  // TestEDS_MultipleLocalities tests the cluster_resolver LB policy using an EDS
   233  // resource with multiple localities. The following scenarios are tested:
   234  //  1. Two localities, each with a single backend. Test verifies that RPCs are
   235  //     weighted roundrobined across these two backends.
   236  //  2. Add another locality, with a single backend. Test verifies that RPCs are
   237  //     weighted roundrobined across all the backends.
   238  //  3. Remove one locality. Test verifies that RPCs are weighted roundrobined
   239  //     across backends from the remaining localities.
   240  //  4. Add a backend to one locality. Test verifies that RPCs are weighted
   241  //     roundrobined across localities.
   242  //  5. Change the weight of one of the localities. Test verifies that RPCs are
   243  //     weighted roundrobined across the localities.
   244  //
   245  // In our LB policy tree, one of the descendents of the "cluster_resolver" LB
   246  // policy is the "weighted_target" LB policy which performs weighted roundrobin
   247  // across localities (and this has a randomness component associated with it).
   248  // Therefore, the moment we have backends from more than one locality, RPCs are
   249  // weighted roundrobined across them.
   250  func (s) TestEDS_MultipleLocalities(t *testing.T) {
   251  	// Spin up a management server to receive xDS resources from.
   252  	managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
   253  	defer cleanup1()
   254  
   255  	// Start backend servers which provide an implementation of the TestService.
   256  	servers, cleanup2 := startTestServiceBackends(t, 4)
   257  	defer cleanup2()
   258  	addrs, ports := backendAddressesAndPorts(t, servers)
   259  
   260  	// Create xDS resources for consumption by the test. We start off with two
   261  	// localities, and single backend in each of them.
   262  	resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
   263  		{
   264  			Name:     localityName1,
   265  			Weight:   1,
   266  			Backends: []e2e.BackendOptions{{Port: ports[0]}},
   267  		},
   268  		{
   269  			Name:     localityName2,
   270  			Weight:   1,
   271  			Backends: []e2e.BackendOptions{{Port: ports[1]}},
   272  		},
   273  	})
   274  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   275  	defer cancel()
   276  	if err := managementServer.Update(ctx, resources); err != nil {
   277  		t.Fatal(err)
   278  	}
   279  
   280  	// Create an xDS client for use by the cluster_resolver LB policy.
   281  	client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
   282  	if err != nil {
   283  		t.Fatalf("Failed to create xDS client: %v", err)
   284  	}
   285  	defer close()
   286  
   287  	// Create a manual resolver and push service config specifying the use of
   288  	// the cluster_resolver LB policy with a single discovery mechanism.
   289  	r := manual.NewBuilderWithScheme("whatever")
   290  	jsonSC := fmt.Sprintf(`{
   291  			"loadBalancingConfig":[{
   292  				"cluster_resolver_experimental":{
   293  					"discoveryMechanisms": [{
   294  						"cluster": "%s",
   295  						"type": "EDS",
   296  						"edsServiceName": "%s",
   297  						"outlierDetection": {}
   298  					}],
   299  					"xdsLbPolicy":[{"round_robin":{}}]
   300  				}
   301  			}]
   302  		}`, clusterName, edsServiceName)
   303  	scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
   304  	r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, client))
   305  
   306  	// Create a ClientConn and make a successful RPC.
   307  	cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
   308  	if err != nil {
   309  		t.Fatalf("failed to dial local test server: %v", err)
   310  	}
   311  	defer cc.Close()
   312  
   313  	// Ensure RPCs are being weighted roundrobined across the two backends.
   314  	testClient := testgrpc.NewTestServiceClient(cc)
   315  	if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, addrs[0:2]); err != nil {
   316  		t.Fatal(err)
   317  	}
   318  
   319  	// Add another locality with a single backend, and ensure RPCs are being
   320  	// weighted roundrobined across the three backends.
   321  	resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
   322  		{
   323  			Name:     localityName1,
   324  			Weight:   1,
   325  			Backends: []e2e.BackendOptions{{Port: ports[0]}},
   326  		},
   327  		{
   328  			Name:     localityName2,
   329  			Weight:   1,
   330  			Backends: []e2e.BackendOptions{{Port: ports[1]}},
   331  		},
   332  		{
   333  			Name:     localityName3,
   334  			Weight:   1,
   335  			Backends: []e2e.BackendOptions{{Port: ports[2]}},
   336  		},
   337  	})
   338  	if err := managementServer.Update(ctx, resources); err != nil {
   339  		t.Fatal(err)
   340  	}
   341  	if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, addrs[0:3]); err != nil {
   342  		t.Fatal(err)
   343  	}
   344  
   345  	// Remove the first locality, and ensure RPCs are being weighted
   346  	// roundrobined across the remaining two backends.
   347  	resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
   348  		{
   349  			Name:     localityName2,
   350  			Weight:   1,
   351  			Backends: []e2e.BackendOptions{{Port: ports[1]}},
   352  		},
   353  		{
   354  			Name:     localityName3,
   355  			Weight:   1,
   356  			Backends: []e2e.BackendOptions{{Port: ports[2]}},
   357  		},
   358  	})
   359  	if err := managementServer.Update(ctx, resources); err != nil {
   360  		t.Fatal(err)
   361  	}
   362  	if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, addrs[1:3]); err != nil {
   363  		t.Fatal(err)
   364  	}
   365  
   366  	// Add a backend to one locality, and ensure weighted roundrobin. Since RPCs
   367  	// are roundrobined across localities, locality2's backend will receive
   368  	// twice the traffic.
   369  	resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
   370  		{
   371  			Name:     localityName2,
   372  			Weight:   1,
   373  			Backends: []e2e.BackendOptions{{Port: ports[1]}},
   374  		},
   375  		{
   376  			Name:     localityName3,
   377  			Weight:   1,
   378  			Backends: []e2e.BackendOptions{{Port: ports[2]}, {Port: ports[3]}},
   379  		},
   380  	})
   381  	if err := managementServer.Update(ctx, resources); err != nil {
   382  		t.Fatal(err)
   383  	}
   384  	wantAddrs := []resolver.Address{addrs[1], addrs[1], addrs[2], addrs[3]}
   385  	if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, wantAddrs); err != nil {
   386  		t.Fatal(err)
   387  	}
   388  }
   389  
   390  // TestEDS_EndpointsHealth tests the cluster_resolver LB policy using an EDS
   391  // resource which specifies endpoint health information and verifies that
   392  // traffic is routed only to backends deemed capable of receiving traffic.
   393  func (s) TestEDS_EndpointsHealth(t *testing.T) {
   394  	// Spin up a management server to receive xDS resources from.
   395  	managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
   396  	defer cleanup1()
   397  
   398  	// Start backend servers which provide an implementation of the TestService.
   399  	servers, cleanup2 := startTestServiceBackends(t, 12)
   400  	defer cleanup2()
   401  	addrs, ports := backendAddressesAndPorts(t, servers)
   402  
   403  	// Create xDS resources for consumption by the test.  Two localities with
   404  	// six backends each, with two of the six backends being healthy. Both
   405  	// UNKNOWN and HEALTHY are considered by gRPC for load balancing.
   406  	resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
   407  		{
   408  			Name:   localityName1,
   409  			Weight: 1,
   410  			Backends: []e2e.BackendOptions{
   411  				{Port: ports[0], HealthStatus: v3corepb.HealthStatus_UNKNOWN},
   412  				{Port: ports[1], HealthStatus: v3corepb.HealthStatus_HEALTHY},
   413  				{Port: ports[2], HealthStatus: v3corepb.HealthStatus_UNHEALTHY},
   414  				{Port: ports[3], HealthStatus: v3corepb.HealthStatus_DRAINING},
   415  				{Port: ports[4], HealthStatus: v3corepb.HealthStatus_TIMEOUT},
   416  				{Port: ports[5], HealthStatus: v3corepb.HealthStatus_DEGRADED},
   417  			},
   418  		},
   419  		{
   420  			Name:   localityName2,
   421  			Weight: 1,
   422  			Backends: []e2e.BackendOptions{
   423  				{Port: ports[6], HealthStatus: v3corepb.HealthStatus_UNKNOWN},
   424  				{Port: ports[7], HealthStatus: v3corepb.HealthStatus_HEALTHY},
   425  				{Port: ports[8], HealthStatus: v3corepb.HealthStatus_UNHEALTHY},
   426  				{Port: ports[9], HealthStatus: v3corepb.HealthStatus_DRAINING},
   427  				{Port: ports[10], HealthStatus: v3corepb.HealthStatus_TIMEOUT},
   428  				{Port: ports[11], HealthStatus: v3corepb.HealthStatus_DEGRADED},
   429  			},
   430  		},
   431  	})
   432  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   433  	defer cancel()
   434  	if err := managementServer.Update(ctx, resources); err != nil {
   435  		t.Fatal(err)
   436  	}
   437  
   438  	// Create an xDS client for use by the cluster_resolver LB policy.
   439  	client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
   440  	if err != nil {
   441  		t.Fatalf("Failed to create xDS client: %v", err)
   442  	}
   443  	defer close()
   444  
   445  	// Create a manual resolver and push service config specifying the use of
   446  	// the cluster_resolver LB policy with a single discovery mechanism.
   447  	r := manual.NewBuilderWithScheme("whatever")
   448  	jsonSC := fmt.Sprintf(`{
   449  			"loadBalancingConfig":[{
   450  				"cluster_resolver_experimental":{
   451  					"discoveryMechanisms": [{
   452  						"cluster": "%s",
   453  						"type": "EDS",
   454  						"edsServiceName": "%s",
   455  						"outlierDetection": {}
   456  					}],
   457  					"xdsLbPolicy":[{"round_robin":{}}]
   458  				}
   459  			}]
   460  		}`, clusterName, edsServiceName)
   461  	scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
   462  	r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, client))
   463  
   464  	// Create a ClientConn and make a successful RPC.
   465  	cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
   466  	if err != nil {
   467  		t.Fatalf("failed to dial local test server: %v", err)
   468  	}
   469  	defer cc.Close()
   470  
   471  	// Ensure RPCs are being weighted roundrobined across healthy backends from
   472  	// both localities.
   473  	testClient := testgrpc.NewTestServiceClient(cc)
   474  	if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, append(addrs[0:2], addrs[6:8]...)); err != nil {
   475  		t.Fatal(err)
   476  	}
   477  }
   478  
   479  // TestEDS_EmptyUpdate tests the cluster_resolver LB policy using an EDS
   480  // resource with no localities and verifies that RPCs fail with "all priorities
   481  // removed" error.
   482  func (s) TestEDS_EmptyUpdate(t *testing.T) {
   483  	// Spin up a management server to receive xDS resources from.
   484  	managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
   485  	defer cleanup1()
   486  
   487  	// Start backend servers which provide an implementation of the TestService.
   488  	servers, cleanup2 := startTestServiceBackends(t, 4)
   489  	defer cleanup2()
   490  	addrs, ports := backendAddressesAndPorts(t, servers)
   491  
   492  	oldCacheTimeout := priority.DefaultSubBalancerCloseTimeout
   493  	priority.DefaultSubBalancerCloseTimeout = 100 * time.Microsecond
   494  	defer func() { priority.DefaultSubBalancerCloseTimeout = oldCacheTimeout }()
   495  
   496  	// Create xDS resources for consumption by the test. The first update is an
   497  	// empty update. This should put the channel in TRANSIENT_FAILURE.
   498  	resources := clientEndpointsResource(nodeID, edsServiceName, nil)
   499  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   500  	defer cancel()
   501  	if err := managementServer.Update(ctx, resources); err != nil {
   502  		t.Fatal(err)
   503  	}
   504  
   505  	// Create an xDS client for use by the cluster_resolver LB policy.
   506  	client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
   507  	if err != nil {
   508  		t.Fatalf("Failed to create xDS client: %v", err)
   509  	}
   510  	defer close()
   511  
   512  	// Create a manual resolver and push service config specifying the use of
   513  	// the cluster_resolver LB policy with a single discovery mechanism.
   514  	r := manual.NewBuilderWithScheme("whatever")
   515  	jsonSC := fmt.Sprintf(`{
   516  			"loadBalancingConfig":[{
   517  				"cluster_resolver_experimental":{
   518  					"discoveryMechanisms": [{
   519  						"cluster": "%s",
   520  						"type": "EDS",
   521  						"edsServiceName": "%s",
   522  						"outlierDetection": {}
   523  					}],
   524  					"xdsLbPolicy":[{"round_robin":{}}]
   525  				}
   526  			}]
   527  		}`, clusterName, edsServiceName)
   528  	scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
   529  	r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, client))
   530  
   531  	// Create a ClientConn and ensure that RPCs fail with "all priorities
   532  	// removed" error. This is the expected error when the cluster_resolver LB
   533  	// policy receives an EDS update with no localities.
   534  	cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
   535  	if err != nil {
   536  		t.Fatalf("failed to dial local test server: %v", err)
   537  	}
   538  	defer cc.Close()
   539  	testClient := testgrpc.NewTestServiceClient(cc)
   540  	if err := waitForProducedZeroAddressesError(ctx, t, testClient); err != nil {
   541  		t.Fatal(err)
   542  	}
   543  
   544  	// Add a locality with one backend and ensure RPCs are successful.
   545  	resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
   546  		Name:     localityName1,
   547  		Weight:   1,
   548  		Backends: []e2e.BackendOptions{{Port: ports[0]}},
   549  	}})
   550  	if err := managementServer.Update(ctx, resources); err != nil {
   551  		t.Fatal(err)
   552  	}
   553  	if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[:1]); err != nil {
   554  		t.Fatal(err)
   555  	}
   556  
   557  	// Push another empty update and ensure that RPCs fail with "all priorities
   558  	// removed" error again.
   559  	resources = clientEndpointsResource(nodeID, edsServiceName, nil)
   560  	if err := managementServer.Update(ctx, resources); err != nil {
   561  		t.Fatal(err)
   562  	}
   563  	if err := waitForProducedZeroAddressesError(ctx, t, testClient); err != nil {
   564  		t.Fatal(err)
   565  	}
   566  }
   567  
   568  // TestEDS_ResourceRemoved tests the case where the EDS resource requested by
   569  // the clusterresolver LB policy is removed from the management server. The test
   570  // verifies that the EDS watch is not canceled and that RPCs continue to succeed
   571  // with the previously received configuration.
   572  func (s) TestEDS_ResourceRemoved(t *testing.T) {
   573  	// Start an xDS management server that uses a couple of channels to
   574  	// notify the test about the following events:
   575  	// - an EDS requested with the expected resource name is requested
   576  	// - EDS resource is unrequested, i.e, an EDS request with no resource name
   577  	//   is received, which indicates that we are not longer interested in that
   578  	//   resource.
   579  	edsResourceRequestedCh := make(chan struct{}, 1)
   580  	edsResourceCanceledCh := make(chan struct{}, 1)
   581  	managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
   582  		OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
   583  			if req.GetTypeUrl() == version.V3EndpointsURL {
   584  				switch len(req.GetResourceNames()) {
   585  				case 0:
   586  					select {
   587  					case edsResourceCanceledCh <- struct{}{}:
   588  					default:
   589  					}
   590  				case 1:
   591  					if req.GetResourceNames()[0] == edsServiceName {
   592  						select {
   593  						case edsResourceRequestedCh <- struct{}{}:
   594  						default:
   595  						}
   596  					}
   597  				default:
   598  					t.Errorf("Unexpected number of resources, %d, in an EDS request", len(req.GetResourceNames()))
   599  				}
   600  			}
   601  			return nil
   602  		},
   603  	})
   604  	defer cleanup()
   605  
   606  	server := stubserver.StartTestService(t, nil)
   607  	defer server.Stop()
   608  
   609  	// Configure cluster and endpoints resources in the management server.
   610  	resources := e2e.UpdateOptions{
   611  		NodeID:         nodeID,
   612  		Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)},
   613  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
   614  		SkipValidation: true,
   615  	}
   616  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   617  	defer cancel()
   618  	if err := managementServer.Update(ctx, resources); err != nil {
   619  		t.Fatal(err)
   620  	}
   621  
   622  	// Create xDS client, configure cds_experimental LB policy with a manual
   623  	// resolver, and dial the test backends.
   624  	cc, cleanup := setupAndDial(t, bootstrapContents)
   625  	defer cleanup()
   626  
   627  	client := testgrpc.NewTestServiceClient(cc)
   628  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   629  		t.Fatalf("EmptyCall() failed: %v", err)
   630  	}
   631  
   632  	// Delete the endpoints resource from the management server.
   633  	resources.Endpoints = nil
   634  	if err := managementServer.Update(ctx, resources); err != nil {
   635  		t.Fatal(err)
   636  	}
   637  
   638  	// Ensure that RPCs continue to succeed for the next second, and that the
   639  	// EDS watch is not canceled.
   640  	for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
   641  		if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   642  			t.Fatalf("EmptyCall() failed: %v", err)
   643  		}
   644  		select {
   645  		case <-edsResourceCanceledCh:
   646  			t.Fatal("EDS watch canceled when not expected to be canceled")
   647  		default:
   648  		}
   649  	}
   650  }
   651  
   652  // TestEDS_ClusterResourceDoesNotContainEDSServiceName covers a Cluster
   653  // resource, served by the management server, that carries no EDS service
   654  // name. It checks that the cluster_resolver LB policy then requests the EDS
   655  // resource named after the cluster itself.
   656  func (s) TestEDS_ClusterResourceDoesNotContainEDSServiceName(t *testing.T) {
   657  	// The management server forwards the first resource name of each
   658  	// non-empty EDS request to edsResourceCh, dropping it if the channel is
   659  	// already full.
   660  	edsResourceCh := make(chan string, 1)
   661  	managementServer, nodeID, bootstrapContents, _, cleanupServer := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
   662  		OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
   663  			if names := req.GetResourceNames(); req.GetTypeUrl() == version.V3EndpointsURL && len(names) > 0 {
   664  				select {
   665  				case edsResourceCh <- names[0]:
   666  				default:
   667  				}
   668  			}
   669  			return nil
   670  		},
   671  	})
   672  	defer cleanupServer()
   673  
   674  	backend := stubserver.StartTestService(t, nil)
   675  	defer backend.Stop()
   676  
   677  	// Publish a cluster resource with an empty EDS service name, together with
   678  	// an endpoints resource named after the cluster.
   679  	resources := e2e.UpdateOptions{
   680  		NodeID:         nodeID,
   681  		Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, "", e2e.SecurityLevelNone)},
   682  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(clusterName, "localhost", []uint32{testutils.ParsePort(t, backend.Address)})},
   683  		SkipValidation: true,
   684  	}
   685  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   686  	defer cancel()
   687  	if err := managementServer.Update(ctx, resources); err != nil {
   688  		t.Fatal(err)
   689  	}
   690  
   691  	// setupAndDial creates the xDS client, configures the cds_experimental LB
   692  	// policy with a manual resolver, and dials the test backends.
   693  	cc, cleanupDial := setupAndDial(t, bootstrapContents)
   694  	defer cleanupDial()
   695  
   696  	testClient := testgrpc.NewTestServiceClient(cc)
   697  	if _, err := testClient.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   698  		t.Fatalf("EmptyCall() failed: %v", err)
   699  	}
   700  
   701  	// Wait for the EDS request, then check which resource name it asked for.
   702  	var gotName string
   703  	select {
   704  	case <-ctx.Done():
   705  		t.Fatal("Timeout when waiting for EDS request to be received on the management server")
   706  	case gotName = <-edsResourceCh:
   707  	}
   708  	if gotName != clusterName {
   709  		t.Fatalf("Received EDS request with resource name %q, want %q", gotName, clusterName)
   710  	}
   711  }
   709  
   710  // TestEDS_ClusterResourceUpdates verifies different scenarios with regards to
   711  // cluster resource updates.
   712  //
   713  //   - The first cluster resource contains an eds_service_name. The test verifies
   714  //     that an EDS request is sent for the received eds_service_name. It also
   715  //     verifies that a subsequent RPC gets routed to a backend belonging to that
   716  //     service name.
   717  //   - The next cluster resource update contains no eds_service_name. The test
   718  //     verifies that a subsequent EDS request is sent for the cluster_name and
   719  //     that the previously received eds_service_name is no longer requested. It
   720  //     also verifies that a subsequent RPC gets routed to a backend belonging to
   721  //     the service represented by the cluster_name.
   722  //   - The next cluster resource update changes the circuit breaking
   723  //     configuration, but does not change the service name. The test verifies
   724  //     that a subsequent RPC gets routed to the same backend as before.
   725  func (s) TestEDS_ClusterResourceUpdates(t *testing.T) {
   726  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   727  	defer cancel()
   728  
   729  	// Start an xDS management server that pushes the EDS resource names onto a
   730  	// channel.
   731  	edsResourceNameCh := make(chan []string, 1)
   732  	managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
   733  		OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
   734  			if req.GetTypeUrl() != version.V3EndpointsURL {
   735  				return nil
   736  			}
   737  			if len(req.GetResourceNames()) == 0 {
   738  				// This is the case for ACKs. Do nothing here.
   739  				return nil
   740  			}
   741  			select {
   742  			case <-ctx.Done():
   743  			case edsResourceNameCh <- req.GetResourceNames():
   744  			}
   745  			return nil
   746  		},
   747  		AllowResourceSubset: true,
   748  	})
   749  	defer cleanup()
   750  
   751  	// Start two test backends and extract their host and port. The first
   752  	// backend is used for the EDS resource identified by the eds_service_name,
   753  	// and the second backend is used for the EDS resource identified by the
   754  	// cluster_name.
   755  	servers, cleanup2 := startTestServiceBackends(t, 2)
   756  	defer cleanup2()
   757  	addrs, ports := backendAddressesAndPorts(t, servers)
   758  
   759  	// Configure cluster and endpoints resources in the management server.
   760  	resources := e2e.UpdateOptions{
   761  		NodeID:   nodeID,
   762  		Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)},
   763  		Endpoints: []*v3endpointpb.ClusterLoadAssignment{
   764  			e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(ports[0])}),
   765  			e2e.DefaultEndpoint(clusterName, "localhost", []uint32{uint32(ports[1])}),
   766  		},
   767  		SkipValidation: true,
   768  	}
   769  	if err := managementServer.Update(ctx, resources); err != nil {
   770  		t.Fatal(err)
   771  	}
   772  
   773  	// Create xDS client, configure cds_experimental LB policy with a manual
   774  	// resolver, and dial the test backends.
   775  	cc, cleanup := setupAndDial(t, bootstrapContents)
   776  	defer cleanup()
   777  
   778  	client := testgrpc.NewTestServiceClient(cc)
   779  	peer := &peer.Peer{}
   780  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
   781  		t.Fatalf("EmptyCall() failed: %v", err)
   782  	}
   783  	if peer.Addr.String() != addrs[0].Addr {
   784  		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
   785  	}
   786  
   787  	// Ensure EDS watch is registered for eds_service_name.
   788  	select {
   789  	case <-ctx.Done():
   790  		t.Fatal("Timeout when waiting for EDS request to be received on the management server")
   791  	case names := <-edsResourceNameCh:
   792  		if !cmp.Equal(names, []string{edsServiceName}) {
   793  			t.Fatalf("Received EDS request with resource names %v, want %v", names, []string{edsServiceName})
   794  		}
   795  	}
   796  
   797  	// Change the cluster resource to not contain an eds_service_name.
   798  	resources.Clusters = []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, "", e2e.SecurityLevelNone)}
   799  	if err := managementServer.Update(ctx, resources); err != nil {
   800  		t.Fatal(err)
   801  	}
   802  
   803  	// Ensure that an EDS watch for eds_service_name is canceled and new watch
   804  	// for cluster_name is registered. The actual order in which this happens is
   805  	// not deterministic, i.e the watch for old resource could be canceled
   806  	// before the new one is registered or vice-versa. In either case,
   807  	// eventually, we want to see a request to the management server for just
   808  	// the cluster_name.
   809  	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
   810  		names := <-edsResourceNameCh
   811  		if cmp.Equal(names, []string{clusterName}) {
   812  			break
   813  		}
   814  	}
   815  	if ctx.Err() != nil {
   816  		t.Fatalf("Timeout when waiting for old EDS watch %q to be canceled and new one %q to be registered", edsServiceName, clusterName)
   817  	}
   818  
   819  	// Make a RPC, and ensure that it gets routed to second backend,
   820  	// corresponding to the cluster_name.
   821  	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
   822  		if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
   823  			continue
   824  		}
   825  		if peer.Addr.String() == addrs[1].Addr {
   826  			break
   827  		}
   828  	}
   829  	if ctx.Err() != nil {
   830  		t.Fatalf("Timeout when waiting for EmptyCall() to be routed to correct backend %q", addrs[1].Addr)
   831  	}
   832  
   833  	// Change cluster resource circuit breaking count.
   834  	resources.Clusters[0].CircuitBreakers = &v3clusterpb.CircuitBreakers{
   835  		Thresholds: []*v3clusterpb.CircuitBreakers_Thresholds{
   836  			{
   837  				Priority:    v3corepb.RoutingPriority_DEFAULT,
   838  				MaxRequests: wrapperspb.UInt32(512),
   839  			},
   840  		},
   841  	}
   842  	if err := managementServer.Update(ctx, resources); err != nil {
   843  		t.Fatal(err)
   844  	}
   845  
   846  	// Ensure that RPCs continue to get routed to the second backend for the
   847  	// next second.
   848  	for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
   849  		if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
   850  			t.Fatalf("EmptyCall() failed: %v", err)
   851  		}
   852  		if peer.Addr.String() != addrs[1].Addr {
   853  			t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[1].Addr)
   854  		}
   855  	}
   856  }
   857  
   858  // TestEDS_BadUpdateWithoutPreviousGoodUpdate tests the case where the
   859  // management server sends a bad update (one that is NACKed by the xDS client).
   860  // Since the cluster_resolver LB policy does not have a previously received good
   861  // update, it is expected to treat this bad update as though it received an
   862  // update with no endpoints. Hence RPCs are expected to fail with "all
   863  // priorities removed" error.
   864  func (s) TestEDS_BadUpdateWithoutPreviousGoodUpdate(t *testing.T) {
   865  	// Spin up a management server to receive xDS resources from.
   866  	mgmtServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
   867  	defer cleanup1()
   868  
   869  	// Start a backend server that implements the TestService.
   870  	server := stubserver.StartTestService(t, nil)
   871  	defer server.Stop()
   872  
   873  	// Create an EDS resource with a load balancing weight of 0. This will
   874  	// result in the resource being NACKed by the xDS client. Since the
   875  	// cluster_resolver LB policy does not have a previously received good EDS
   876  	// update, it should treat this update as an empty EDS update.
   877  	resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
   878  		Name:     localityName1,
   879  		Weight:   1,
   880  		Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}},
   881  	}})
   882  	resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
   883  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   884  	defer cancel()
   885  	if err := mgmtServer.Update(ctx, resources); err != nil {
   886  		t.Fatal(err)
   887  	}
   888  
   889  	// Create an xDS client for use by the cluster_resolver LB policy.
   890  	xdsClient, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
   891  	if err != nil {
   892  		t.Fatalf("Failed to create xDS client: %v", err)
   893  	}
   894  	defer close()
   895  
   896  	// Create a manual resolver and push a service config specifying the use of
   897  	// the cluster_resolver LB policy with a single discovery mechanism.
   898  	r := manual.NewBuilderWithScheme("whatever")
   899  	jsonSC := fmt.Sprintf(`{
   900  			"loadBalancingConfig":[{
   901  				"cluster_resolver_experimental":{
   902  					"discoveryMechanisms": [{
   903  						"cluster": "%s",
   904  						"type": "EDS",
   905  						"edsServiceName": "%s",
   906  						"outlierDetection": {}
   907  					}],
   908  					"xdsLbPolicy":[{"round_robin":{}}]
   909  				}
   910  			}]
   911  		}`, clusterName, edsServiceName)
   912  	scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
   913  	r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))
   914  
   915  	// Create a ClientConn and verify that RPCs fail with "all priorities
   916  	// removed" error.
   917  	cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
   918  	if err != nil {
   919  		t.Fatalf("failed to dial local test server: %v", err)
   920  	}
   921  	defer cc.Close()
   922  	client := testgrpc.NewTestServiceClient(cc)
   923  	if err := waitForProducedZeroAddressesError(ctx, t, client); err != nil {
   924  		t.Fatal(err)
   925  	}
   926  }
   927  
   928  // TestEDS_BadUpdateWithPreviousGoodUpdate tests the case where the
   929  // cluster_resolver LB policy receives a good EDS update from the management
   930  // server and the test verifies that RPCs are successful. Then, a bad update is
   931  // received from the management server (one that is NACKed by the xDS client).
   932  // The test verifies that the previously received good update is still being
   933  // used and that RPCs are still successful.
   934  func (s) TestEDS_BadUpdateWithPreviousGoodUpdate(t *testing.T) {
   935  	// Spin up a management server to receive xDS resources from.
   936  	mgmtServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
   937  	defer cleanup1()
   938  
   939  	// Start a backend server that implements the TestService.
   940  	server := stubserver.StartTestService(t, nil)
   941  	defer server.Stop()
   942  
   943  	// Create an EDS resource for consumption by the test.
   944  	resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
   945  		Name:     localityName1,
   946  		Weight:   1,
   947  		Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}},
   948  	}})
   949  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   950  	defer cancel()
   951  	if err := mgmtServer.Update(ctx, resources); err != nil {
   952  		t.Fatal(err)
   953  	}
   954  
   955  	// Create an xDS client for use by the cluster_resolver LB policy.
   956  	xdsClient, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
   957  	if err != nil {
   958  		t.Fatalf("Failed to create xDS client: %v", err)
   959  	}
   960  	defer close()
   961  
   962  	// Create a manual resolver and push a service config specifying the use of
   963  	// the cluster_resolver LB policy with a single discovery mechanism.
   964  	r := manual.NewBuilderWithScheme("whatever")
   965  	jsonSC := fmt.Sprintf(`{
   966  			"loadBalancingConfig":[{
   967  				"cluster_resolver_experimental":{
   968  					"discoveryMechanisms": [{
   969  						"cluster": "%s",
   970  						"type": "EDS",
   971  						"edsServiceName": "%s",
   972  						"outlierDetection": {}
   973  					}],
   974  					"xdsLbPolicy":[{"round_robin":{}}]
   975  				}
   976  			}]
   977  		}`, clusterName, edsServiceName)
   978  	scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
   979  	r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))
   980  
   981  	// Create a ClientConn and make a successful RPC.
   982  	cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
   983  	if err != nil {
   984  		t.Fatalf("failed to dial local test server: %v", err)
   985  	}
   986  	defer cc.Close()
   987  
   988  	// Ensure RPCs are being roundrobined across the single backend.
   989  	client := testgrpc.NewTestServiceClient(cc)
   990  	if err := rrutil.CheckRoundRobinRPCs(ctx, client, []resolver.Address{{Addr: server.Address}}); err != nil {
   991  		t.Fatal(err)
   992  	}
   993  
   994  	// Update the endpoints resource in the management server with a load
   995  	// balancing weight of 0. This will result in the resource being NACKed by
   996  	// the xDS client. But since the cluster_resolver LB policy has a previously
   997  	// received good EDS update, it should continue using it.
   998  	resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
   999  	if err := mgmtServer.Update(ctx, resources); err != nil {
  1000  		t.Fatal(err)
  1001  	}
  1002  
  1003  	// Ensure that RPCs continue to succeed for the next second.
  1004  	for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
  1005  		if err := rrutil.CheckRoundRobinRPCs(ctx, client, []resolver.Address{{Addr: server.Address}}); err != nil {
  1006  			t.Fatal(err)
  1007  		}
  1008  	}
  1009  }
  1010  
  1011  // TestEDS_ResourceNotFound tests the case where the requested EDS resource does
  1012  // not exist on the management server. Once the watch timer associated with the
  1013  // requested resource expires, the cluster_resolver LB policy receives a
  1014  // "resource-not-found" callback from the xDS client and is expected to treat it
  1015  // as though it received an update with no endpoints. Hence RPCs are expected to
  1016  // fail with "all priorities removed" error.
  1017  func (s) TestEDS_ResourceNotFound(t *testing.T) {
  1018  	// Spin up a management server to receive xDS resources from.
  1019  	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
  1020  	if err != nil {
  1021  		t.Fatalf("Failed to spin up the xDS management server: %v", err)
  1022  	}
  1023  	defer mgmtServer.Stop()
  1024  
  1025  	// Create an xDS client talking to the above management server, configured
  1026  	// with a short watch expiry timeout.
  1027  	nodeID := uuid.New().String()
  1028  	xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
  1029  		XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
  1030  		NodeProto: &v3corepb.Node{Id: nodeID},
  1031  	}, defaultTestWatchExpiryTimeout, time.Duration(0))
  1032  	if err != nil {
  1033  		t.Fatalf("failed to create xds client: %v", err)
  1034  	}
  1035  	defer close()
  1036  
  1037  	// Configure no resources on the management server.
  1038  	resources := e2e.UpdateOptions{NodeID: nodeID, SkipValidation: true}
  1039  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1040  	defer cancel()
  1041  	if err := mgmtServer.Update(ctx, resources); err != nil {
  1042  		t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
  1043  	}
  1044  
  1045  	// Create a manual resolver and push a service config specifying the use of
  1046  	// the cluster_resolver LB policy with a single discovery mechanism.
  1047  	r := manual.NewBuilderWithScheme("whatever")
  1048  	jsonSC := fmt.Sprintf(`{
  1049  			"loadBalancingConfig":[{
  1050  				"cluster_resolver_experimental":{
  1051  					"discoveryMechanisms": [{
  1052  						"cluster": "%s",
  1053  						"type": "EDS",
  1054  						"edsServiceName": "%s",
  1055  						"outlierDetection": {}
  1056  					}],
  1057  					"xdsLbPolicy":[{"round_robin":{}}]
  1058  				}
  1059  			}]
  1060  		}`, clusterName, edsServiceName)
  1061  	scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
  1062  	r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))
  1063  
  1064  	// Create a ClientConn and verify that RPCs fail with "all priorities
  1065  	// removed" error.
  1066  	cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
  1067  	if err != nil {
  1068  		t.Fatalf("failed to dial local test server: %v", err)
  1069  	}
  1070  	defer cc.Close()
  1071  	client := testgrpc.NewTestServiceClient(cc)
  1072  	if err := waitForProducedZeroAddressesError(ctx, t, client); err != nil {
  1073  		t.Fatal(err)
  1074  	}
  1075  }
  1076  
  1077  // waitForAllPrioritiesRemovedError repeatedly makes RPCs using the
  1078  // TestServiceClient until they fail with an error which indicates that no
  1079  // resolver addresses have been produced. A non-nil error is returned if the
  1080  // context expires before RPCs fail with the expected error.
  1081  func waitForProducedZeroAddressesError(ctx context.Context, t *testing.T, client testgrpc.TestServiceClient) error {
  1082  	for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
  1083  		_, err := client.EmptyCall(ctx, &testpb.Empty{})
  1084  		if err == nil {
  1085  			t.Log("EmptyCall() succeeded after error in Discovery Mechanism")
  1086  			continue
  1087  		}
  1088  		if code := status.Code(err); code != codes.Unavailable {
  1089  			t.Logf("EmptyCall() returned code: %v, want: %v", code, codes.Unavailable)
  1090  			continue
  1091  		}
  1092  		if !strings.Contains(err.Error(), "produced zero addresses") {
  1093  			t.Logf("EmptyCall() = %v, want %v", err, "produced zero addresses")
  1094  			continue
  1095  		}
  1096  		return nil
  1097  	}
  1098  	return errors.New("timeout when waiting for RPCs to fail with UNAVAILABLE status and produced zero addresses")
  1099  }
  1100  

View as plain text