...

Source file src/google.golang.org/grpc/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go

Documentation: google.golang.org/grpc/xds/internal/balancer/clusterresolver/e2e_test

     1  /*
     2   * Copyright 2023 gRPC authors.
     3   *
     4   * Licensed under the Apache License, Version 2.0 (the "License");
     5   * you may not use this file except in compliance with the License.
     6   * You may obtain a copy of the License at
     7   *
     8   *     http://www.apache.org/licenses/LICENSE-2.0
     9   *
    10   * Unless required by applicable law or agreed to in writing, software
    11   * distributed under the License is distributed on an "AS IS" BASIS,
    12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13   * See the License for the specific language governing permissions and
    14   * limitations under the License.
    15   */
    16  
    17  package e2e_test
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net"
    23  	"sort"
    24  	"strconv"
    25  	"strings"
    26  	"testing"
    27  	"time"
    28  
    29  	"github.com/google/go-cmp/cmp"
    30  	"google.golang.org/grpc"
    31  	"google.golang.org/grpc/codes"
    32  	"google.golang.org/grpc/connectivity"
    33  	"google.golang.org/grpc/credentials/insecure"
    34  	"google.golang.org/grpc/internal"
    35  	"google.golang.org/grpc/internal/stubserver"
    36  	"google.golang.org/grpc/internal/testutils/pickfirst"
    37  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    38  	"google.golang.org/grpc/internal/xds/bootstrap"
    39  	"google.golang.org/grpc/peer"
    40  	"google.golang.org/grpc/resolver"
    41  	"google.golang.org/grpc/resolver/manual"
    42  	"google.golang.org/grpc/serviceconfig"
    43  	"google.golang.org/grpc/status"
    44  	xdstestutils "google.golang.org/grpc/xds/internal/testutils"
    45  	"google.golang.org/grpc/xds/internal/xdsclient"
    46  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
    47  	"google.golang.org/protobuf/types/known/wrapperspb"
    48  
    49  	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
    50  	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    51  	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
    52  	v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
    53  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    54  	testpb "google.golang.org/grpc/interop/grpc_testing"
    55  )
    56  
    57  // makeAggregateClusterResource returns an aggregate cluster resource with the
    58  // given name and list of child names.
    59  func makeAggregateClusterResource(name string, childNames []string) *v3clusterpb.Cluster {
    60  	return e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
    61  		ClusterName: name,
    62  		Type:        e2e.ClusterTypeAggregate,
    63  		ChildNames:  childNames,
    64  	})
    65  }
    66  
    67  // makeLogicalDNSClusterResource returns a LOGICAL_DNS cluster resource with the
    68  // given name and given DNS host and port.
    69  func makeLogicalDNSClusterResource(name, dnsHost string, dnsPort uint32) *v3clusterpb.Cluster {
    70  	return e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
    71  		ClusterName: name,
    72  		Type:        e2e.ClusterTypeLogicalDNS,
    73  		DNSHostName: dnsHost,
    74  		DNSPort:     dnsPort,
    75  	})
    76  }
    77  
    78  // setupDNS unregisters the DNS resolver and registers a manual resolver for the
    79  // same scheme. This allows the test to mock the DNS resolution by supplying the
    80  // addresses of the test backends.
    81  //
    82  // Returns the following:
    83  //   - a channel onto which the DNS target being resolved is written to by the
    84  //     mock DNS resolver
    85  //   - a channel to notify close of the DNS resolver
    86  //   - a channel to notify re-resolution requests to the DNS resolver
    87  //   - a manual resolver which is used to mock the actual DNS resolution
    88  //   - a cleanup function which re-registers the original DNS resolver
    89  func setupDNS() (chan resolver.Target, chan struct{}, chan resolver.ResolveNowOptions, *manual.Resolver, func()) {
    90  	targetCh := make(chan resolver.Target, 1)
    91  	closeCh := make(chan struct{}, 1)
    92  	resolveNowCh := make(chan resolver.ResolveNowOptions, 1)
    93  
    94  	mr := manual.NewBuilderWithScheme("dns")
    95  	mr.BuildCallback = func(target resolver.Target, _ resolver.ClientConn, _ resolver.BuildOptions) { targetCh <- target }
    96  	mr.CloseCallback = func() { closeCh <- struct{}{} }
    97  	mr.ResolveNowCallback = func(opts resolver.ResolveNowOptions) { resolveNowCh <- opts }
    98  
    99  	dnsResolverBuilder := resolver.Get("dns")
   100  	resolver.Register(mr)
   101  
   102  	return targetCh, closeCh, resolveNowCh, mr, func() { resolver.Register(dnsResolverBuilder) }
   103  }
   104  
   105  // TestAggregateCluster_WithTwoEDSClusters tests the case where the top-level
   106  // cluster resource is an aggregate cluster. It verifies that RPCs fail when the
   107  // management server has not responded to all requested EDS resources, and also
   108  // that RPCs are routed to the highest priority cluster once all requested EDS
   109  // resources have been sent by the management server.
   110  func (s) TestAggregateCluster_WithTwoEDSClusters(t *testing.T) {
   111  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   112  	defer cancel()
   113  
   114  	// Start an xDS management server that pushes the EDS resource names onto a
   115  	// channel when requested.
   116  	edsResourceNameCh := make(chan []string, 1)
   117  	managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
   118  		OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
   119  			if req.GetTypeUrl() != version.V3EndpointsURL {
   120  				return nil
   121  			}
   122  			if len(req.GetResourceNames()) == 0 {
   123  				// This is the case for ACKs. Do nothing here.
   124  				return nil
   125  			}
   126  			select {
   127  			case edsResourceNameCh <- req.GetResourceNames():
   128  			case <-ctx.Done():
   129  			}
   130  			return nil
   131  		},
   132  		AllowResourceSubset: true,
   133  	})
   134  	defer cleanup()
   135  
   136  	// Start two test backends and extract their host and port. The first
   137  	// backend belongs to EDS cluster "cluster-1", while the second backend
   138  	// belongs to EDS cluster "cluster-2".
   139  	servers, cleanup2 := startTestServiceBackends(t, 2)
   140  	defer cleanup2()
   141  	addrs, ports := backendAddressesAndPorts(t, servers)
   142  
   143  	// Configure an aggregate cluster, two EDS clusters and only one endpoints
   144  	// resource (corresponding to the first EDS cluster) in the management
   145  	// server.
   146  	const clusterName1 = clusterName + "-cluster-1"
   147  	const clusterName2 = clusterName + "-cluster-2"
   148  	resources := e2e.UpdateOptions{
   149  		NodeID: nodeID,
   150  		Clusters: []*v3clusterpb.Cluster{
   151  			makeAggregateClusterResource(clusterName, []string{clusterName1, clusterName2}),
   152  			e2e.DefaultCluster(clusterName1, "", e2e.SecurityLevelNone),
   153  			e2e.DefaultCluster(clusterName2, "", e2e.SecurityLevelNone),
   154  		},
   155  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(clusterName1, "localhost", []uint32{uint32(ports[0])})},
   156  		SkipValidation: true,
   157  	}
   158  	if err := managementServer.Update(ctx, resources); err != nil {
   159  		t.Fatal(err)
   160  	}
   161  
   162  	// Create xDS client, configure cds_experimental LB policy with a manual
   163  	// resolver, and dial the test backends.
   164  	cc, cleanup := setupAndDial(t, bootstrapContents)
   165  	defer cleanup()
   166  
   167  	// Wait for both EDS resources to be requested.
   168  	func() {
   169  		for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
   170  			select {
   171  			case names := <-edsResourceNameCh:
   172  				// Copy and sort the sortedNames to avoid racing with an
   173  				// OnStreamRequest call.
   174  				sortedNames := make([]string, len(names))
   175  				copy(sortedNames, names)
   176  				sort.Strings(sortedNames)
   177  				if cmp.Equal(sortedNames, []string{clusterName1, clusterName2}) {
   178  					return
   179  				}
   180  			default:
   181  			}
   182  		}
   183  	}()
   184  	if ctx.Err() != nil {
   185  		t.Fatalf("Timeout when waiting for all EDS resources %v to be requested", []string{clusterName1, clusterName2})
   186  	}
   187  
   188  	// Make an RPC with a short deadline. We expect this RPC to not succeed
   189  	// because the management server has not responded with all EDS resources
   190  	// requested.
   191  	client := testgrpc.NewTestServiceClient(cc)
   192  	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   193  	defer sCancel()
   194  	if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
   195  		t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded)
   196  	}
   197  
   198  	// Update the management server with the second EDS resource.
   199  	resources.Endpoints = append(resources.Endpoints, e2e.DefaultEndpoint(clusterName2, "localhost", []uint32{uint32(ports[1])}))
   200  	if err := managementServer.Update(ctx, resources); err != nil {
   201  		t.Fatal(err)
   202  	}
   203  
   204  	// Make an RPC and ensure that it gets routed to cluster-1, implicitly
   205  	// higher priority than cluster-2.
   206  	peer := &peer.Peer{}
   207  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
   208  		t.Fatalf("EmptyCall() failed: %v", err)
   209  	}
   210  	if peer.Addr.String() != addrs[0].Addr {
   211  		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
   212  	}
   213  }
   214  
   215  // TestAggregateCluster_WithTwoEDSClusters_PrioritiesChange tests the case where
   216  // the top-level cluster resource is an aggregate cluster. It verifies that RPCs
   217  // are routed to the highest priority EDS cluster.
   218  func (s) TestAggregateCluster_WithTwoEDSClusters_PrioritiesChange(t *testing.T) {
   219  	// Start an xDS management server.
   220  	managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
   221  	defer cleanup()
   222  
   223  	// Start two test backends and extract their host and port. The first
   224  	// backend belongs to EDS cluster "cluster-1", while the second backend
   225  	// belongs to EDS cluster "cluster-2".
   226  	servers, cleanup2 := startTestServiceBackends(t, 2)
   227  	defer cleanup2()
   228  	addrs, ports := backendAddressesAndPorts(t, servers)
   229  
   230  	// Configure an aggregate cluster, two EDS clusters and the corresponding
   231  	// endpoints resources in the management server.
   232  	const clusterName1 = clusterName + "cluster-1"
   233  	const clusterName2 = clusterName + "cluster-2"
   234  	resources := e2e.UpdateOptions{
   235  		NodeID: nodeID,
   236  		Clusters: []*v3clusterpb.Cluster{
   237  			makeAggregateClusterResource(clusterName, []string{clusterName1, clusterName2}),
   238  			e2e.DefaultCluster(clusterName1, "", e2e.SecurityLevelNone),
   239  			e2e.DefaultCluster(clusterName2, "", e2e.SecurityLevelNone),
   240  		},
   241  		Endpoints: []*v3endpointpb.ClusterLoadAssignment{
   242  			e2e.DefaultEndpoint(clusterName1, "localhost", []uint32{uint32(ports[0])}),
   243  			e2e.DefaultEndpoint(clusterName2, "localhost", []uint32{uint32(ports[1])}),
   244  		},
   245  		SkipValidation: true,
   246  	}
   247  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   248  	defer cancel()
   249  	if err := managementServer.Update(ctx, resources); err != nil {
   250  		t.Fatal(err)
   251  	}
   252  
   253  	// Create xDS client, configure cds_experimental LB policy with a manual
   254  	// resolver, and dial the test backends.
   255  	cc, cleanup := setupAndDial(t, bootstrapContents)
   256  	defer cleanup()
   257  
   258  	// Make an RPC and ensure that it gets routed to cluster-1, implicitly
   259  	// higher priority than cluster-2.
   260  	client := testgrpc.NewTestServiceClient(cc)
   261  	peer := &peer.Peer{}
   262  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
   263  		t.Fatalf("EmptyCall() failed: %v", err)
   264  	}
   265  	if peer.Addr.String() != addrs[0].Addr {
   266  		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
   267  	}
   268  
   269  	// Swap the priorities of the EDS clusters in the aggregate cluster.
   270  	resources.Clusters = []*v3clusterpb.Cluster{
   271  		makeAggregateClusterResource(clusterName, []string{clusterName2, clusterName1}),
   272  		e2e.DefaultCluster(clusterName1, "", e2e.SecurityLevelNone),
   273  		e2e.DefaultCluster(clusterName2, "", e2e.SecurityLevelNone),
   274  	}
   275  	if err := managementServer.Update(ctx, resources); err != nil {
   276  		t.Fatal(err)
   277  	}
   278  
   279  	// Wait for RPCs to get routed to cluster-2, which is now implicitly higher
   280  	// priority than cluster-1, after the priority switch above.
   281  	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
   282  		if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
   283  			t.Fatalf("EmptyCall() failed: %v", err)
   284  		}
   285  		if peer.Addr.String() == addrs[1].Addr {
   286  			break
   287  		}
   288  	}
   289  	if ctx.Err() != nil {
   290  		t.Fatal("Timeout waiting for RPCs to be routed to cluster-2 after priority switch")
   291  	}
   292  }
   293  
   294  func hostAndPortFromAddress(t *testing.T, addr string) (string, uint32) {
   295  	t.Helper()
   296  
   297  	host, p, err := net.SplitHostPort(addr)
   298  	if err != nil {
   299  		t.Fatalf("Invalid serving address: %v", addr)
   300  	}
   301  	port, err := strconv.ParseUint(p, 10, 32)
   302  	if err != nil {
   303  		t.Fatalf("Invalid serving port %q: %v", p, err)
   304  	}
   305  	return host, uint32(port)
   306  }
   307  
   308  // TestAggregateCluster_WithOneDNSCluster tests the case where the top-level
   309  // cluster resource is an aggregate cluster that resolves to a single
   310  // LOGICAL_DNS cluster. The test verifies that RPCs can be made to backends that
   311  // make up the LOGICAL_DNS cluster.
   312  func (s) TestAggregateCluster_WithOneDNSCluster(t *testing.T) {
   313  	// Start an xDS management server.
   314  	managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
   315  	defer cleanup2()
   316  
   317  	// Start a test service backend.
   318  	server := stubserver.StartTestService(t, nil)
   319  	defer server.Stop()
   320  	host, port := hostAndPortFromAddress(t, server.Address)
   321  
   322  	// Configure an aggregate cluster pointing to a single LOGICAL_DNS cluster.
   323  	const dnsClusterName = clusterName + "-dns"
   324  	resources := e2e.UpdateOptions{
   325  		NodeID: nodeID,
   326  		Clusters: []*v3clusterpb.Cluster{
   327  			makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
   328  			makeLogicalDNSClusterResource(dnsClusterName, host, uint32(port)),
   329  		},
   330  		SkipValidation: true,
   331  	}
   332  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   333  	defer cancel()
   334  	if err := managementServer.Update(ctx, resources); err != nil {
   335  		t.Fatal(err)
   336  	}
   337  
   338  	// Create xDS client, configure cds_experimental LB policy with a manual
   339  	// resolver, and dial the test backends.
   340  	cc, cleanup := setupAndDial(t, bootstrapContents)
   341  	defer cleanup()
   342  
   343  	// Make an RPC and ensure that it gets routed to the first backend since the
   344  	// child policy for a LOGICAL_DNS cluster is pick_first by default.
   345  	client := testgrpc.NewTestServiceClient(cc)
   346  	peer := &peer.Peer{}
   347  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
   348  		t.Fatalf("EmptyCall() failed: %v", err)
   349  	}
   350  	if peer.Addr.String() != server.Address {
   351  		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address)
   352  	}
   353  }
   354  
   355  // Tests the case where the top-level cluster resource is an aggregate cluster
   356  // that resolves to a single LOGICAL_DNS cluster. The specified dns hostname is
   357  // expected to fail url parsing. The test verifies that the channel moves to
   358  // TRANSIENT_FAILURE.
   359  func (s) TestAggregateCluster_WithOneDNSCluster_ParseFailure(t *testing.T) {
   360  	// Start an xDS management server.
   361  	managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
   362  	defer cleanup2()
   363  
   364  	// Configure an aggregate cluster pointing to a single LOGICAL_DNS cluster.
   365  	const dnsClusterName = clusterName + "-dns"
   366  	resources := e2e.UpdateOptions{
   367  		NodeID: nodeID,
   368  		Clusters: []*v3clusterpb.Cluster{
   369  			makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
   370  			makeLogicalDNSClusterResource(dnsClusterName, "%gh&%ij", uint32(8080)),
   371  		},
   372  		SkipValidation: true,
   373  	}
   374  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   375  	defer cancel()
   376  	if err := managementServer.Update(ctx, resources); err != nil {
   377  		t.Fatal(err)
   378  	}
   379  
   380  	// Create xDS client, configure cds_experimental LB policy with a manual
   381  	// resolver, and dial the test backends.
   382  	cc, cleanup := setupAndDial(t, bootstrapContents)
   383  	defer cleanup()
   384  
   385  	// Ensure that the ClientConn moves to TransientFailure.
   386  	for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() {
   387  		if !cc.WaitForStateChange(ctx, state) {
   388  			t.Fatalf("Timed out waiting for state change. got %v; want %v", state, connectivity.TransientFailure)
   389  		}
   390  	}
   391  }
   392  
   393  // Tests the case where the top-level cluster resource is an aggregate cluster
   394  // that resolves to a single LOGICAL_DNS cluster. The test verifies that RPCs
   395  // can be made to backends that make up the LOGICAL_DNS cluster. The hostname of
   396  // the LOGICAL_DNS cluster is updated, and the test verifies that RPCs can be
   397  // made to backends that the new hostname resolves to.
   398  func (s) TestAggregateCluster_WithOneDNSCluster_HostnameChange(t *testing.T) {
   399  	// Start an xDS management server.
   400  	managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
   401  	defer cleanup1()
   402  
   403  	// Start two test backends and extract their host and port. The first
   404  	// backend is used initially for the LOGICAL_DNS cluster and an update
   405  	// switches the cluster to use the second backend.
   406  	servers, cleanup2 := startTestServiceBackends(t, 2)
   407  	defer cleanup2()
   408  
   409  	// Configure an aggregate cluster pointing to a single LOGICAL_DNS cluster.
   410  	const dnsClusterName = clusterName + "-dns"
   411  	dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[0].Address)
   412  	resources := e2e.UpdateOptions{
   413  		NodeID: nodeID,
   414  		Clusters: []*v3clusterpb.Cluster{
   415  			makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
   416  			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
   417  		},
   418  		SkipValidation: true,
   419  	}
   420  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   421  	defer cancel()
   422  	if err := managementServer.Update(ctx, resources); err != nil {
   423  		t.Fatal(err)
   424  	}
   425  
   426  	// Create xDS client, configure cds_experimental LB policy with a manual
   427  	// resolver, and dial the test backends.
   428  	cc, cleanup := setupAndDial(t, bootstrapContents)
   429  	defer cleanup()
   430  
   431  	// Make an RPC and ensure that it gets routed to the first backend since the
   432  	// child policy for a LOGICAL_DNS cluster is pick_first by default.
   433  	client := testgrpc.NewTestServiceClient(cc)
   434  	peer := &peer.Peer{}
   435  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
   436  		t.Fatalf("EmptyCall() failed: %v", err)
   437  	}
   438  	if peer.Addr.String() != servers[0].Address {
   439  		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, servers[0].Address)
   440  	}
   441  
   442  	// Update the LOGICAL_DNS cluster's hostname to point to the second backend.
   443  	dnsHostName, dnsPort = hostAndPortFromAddress(t, servers[1].Address)
   444  	resources = e2e.UpdateOptions{
   445  		NodeID: nodeID,
   446  		Clusters: []*v3clusterpb.Cluster{
   447  			makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
   448  			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
   449  		},
   450  		SkipValidation: true,
   451  	}
   452  	if err := managementServer.Update(ctx, resources); err != nil {
   453  		t.Fatal(err)
   454  	}
   455  
   456  	// Ensure that traffic moves to the second backend eventually.
   457  	for ctx.Err() == nil {
   458  		if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
   459  			t.Fatalf("EmptyCall() failed: %v", err)
   460  		}
   461  		if peer.Addr.String() == servers[1].Address {
   462  			break
   463  		}
   464  	}
   465  	if ctx.Err() != nil {
   466  		t.Fatal("Timeout when waiting for RPCs to switch to the second backend")
   467  	}
   468  }
   469  
   470  // TestAggregateCluster_WithEDSAndDNS tests the case where the top-level cluster
   471  // resource is an aggregate cluster that resolves to an EDS and a LOGICAL_DNS
   472  // cluster. The test verifies that RPCs fail until both clusters are resolved to
   473  // endpoints, and RPCs are routed to the higher priority EDS cluster.
   474  func (s) TestAggregateCluster_WithEDSAndDNS(t *testing.T) {
   475  	dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
   476  	defer cleanup1()
   477  
   478  	// Start an xDS management server that pushes the name of the requested EDS
   479  	// resource onto a channel.
   480  	edsResourceCh := make(chan string, 1)
   481  	managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
   482  		OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
   483  			if req.GetTypeUrl() != version.V3EndpointsURL {
   484  				return nil
   485  			}
   486  			if len(req.GetResourceNames()) > 0 {
   487  				select {
   488  				case edsResourceCh <- req.GetResourceNames()[0]:
   489  				default:
   490  				}
   491  			}
   492  			return nil
   493  		},
   494  		AllowResourceSubset: true,
   495  	})
   496  	defer cleanup2()
   497  
   498  	// Start two test backends and extract their host and port. The first
   499  	// backend is used for the EDS cluster and the second backend is used for
   500  	// the LOGICAL_DNS cluster.
   501  	servers, cleanup3 := startTestServiceBackends(t, 2)
   502  	defer cleanup3()
   503  	addrs, ports := backendAddressesAndPorts(t, servers)
   504  
   505  	// Configure an aggregate cluster pointing to an EDS and DNS cluster. Also
   506  	// configure an endpoints resource for the EDS cluster.
   507  	const (
   508  		edsClusterName = clusterName + "-eds"
   509  		dnsClusterName = clusterName + "-dns"
   510  		dnsHostName    = "dns_host"
   511  		dnsPort        = uint32(8080)
   512  	)
   513  	resources := e2e.UpdateOptions{
   514  		NodeID: nodeID,
   515  		Clusters: []*v3clusterpb.Cluster{
   516  			makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
   517  			e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
   518  			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
   519  		},
   520  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})},
   521  		SkipValidation: true,
   522  	}
   523  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   524  	defer cancel()
   525  	if err := managementServer.Update(ctx, resources); err != nil {
   526  		t.Fatal(err)
   527  	}
   528  
   529  	// Create xDS client, configure cds_experimental LB policy with a manual
   530  	// resolver, and dial the test backends.
   531  	cc, cleanup := setupAndDial(t, bootstrapContents)
   532  	defer cleanup()
   533  
   534  	// Ensure that an EDS request is sent for the expected resource name.
   535  	select {
   536  	case <-ctx.Done():
   537  		t.Fatal("Timeout when waiting for EDS request to be received on the management server")
   538  	case name := <-edsResourceCh:
   539  		if name != edsClusterName {
   540  			t.Fatalf("Received EDS request with resource name %q, want %q", name, edsClusterName)
   541  		}
   542  	}
   543  
   544  	// Ensure that the DNS resolver is started for the expected target.
   545  	select {
   546  	case <-ctx.Done():
   547  		t.Fatal("Timeout when waiting for DNS resolver to be started")
   548  	case target := <-dnsTargetCh:
   549  		got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
   550  		if got != want {
   551  			t.Fatalf("DNS resolution started for target %q, want %q", got, want)
   552  		}
   553  	}
   554  
   555  	// Make an RPC with a short deadline. We expect this RPC to not succeed
   556  	// because the DNS resolver has not responded with endpoint addresses.
   557  	client := testgrpc.NewTestServiceClient(cc)
   558  	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   559  	defer sCancel()
   560  	if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
   561  		t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded)
   562  	}
   563  
   564  	// Update DNS resolver with test backend addresses.
   565  	dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})
   566  
   567  	// Make an RPC and ensure that it gets routed to the first backend since the
   568  	// EDS cluster is of higher priority than the LOGICAL_DNS cluster.
   569  	peer := &peer.Peer{}
   570  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
   571  		t.Fatalf("EmptyCall() failed: %v", err)
   572  	}
   573  	if peer.Addr.String() != addrs[0].Addr {
   574  		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
   575  	}
   576  }
   577  
   578  // TestAggregateCluster_SwitchEDSAndDNS tests the case where the top-level
   579  // cluster resource is an aggregate cluster. It initially resolves to a single
   580  // EDS cluster. The test verifies that RPCs are routed to backends in the EDS
   581  // cluster. Subsequently, the aggregate cluster resolves to a single DNS
   582  // cluster. The test verifies that RPCs are successful, this time to backends in
   583  // the DNS cluster.
   584  func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) {
   585  	// Start an xDS management server.
   586  	managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
   587  	defer cleanup2()
   588  
   589  	// Start two test backends and extract their host and port. The first
   590  	// backend is used for the EDS cluster and the second backend is used for
   591  	// the LOGICAL_DNS cluster.
   592  	servers, cleanup3 := startTestServiceBackends(t, 2)
   593  	defer cleanup3()
   594  	addrs, ports := backendAddressesAndPorts(t, servers)
   595  	dnsHostName, dnsPort := hostAndPortFromAddress(t, addrs[1].Addr)
   596  
   597  	// Configure an aggregate cluster pointing to a single EDS cluster. Also,
   598  	// configure the underlying EDS cluster (and the corresponding endpoints
   599  	// resource) and DNS cluster (will be used later in the test).
   600  	const dnsClusterName = clusterName + "-dns"
   601  	resources := e2e.UpdateOptions{
   602  		NodeID: nodeID,
   603  		Clusters: []*v3clusterpb.Cluster{
   604  			makeAggregateClusterResource(clusterName, []string{edsServiceName}),
   605  			e2e.DefaultCluster(edsServiceName, "", e2e.SecurityLevelNone),
   606  			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
   607  		},
   608  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(ports[0])})},
   609  		SkipValidation: true,
   610  	}
   611  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   612  	defer cancel()
   613  	if err := managementServer.Update(ctx, resources); err != nil {
   614  		t.Fatal(err)
   615  	}
   616  
   617  	// Create xDS client, configure cds_experimental LB policy with a manual
   618  	// resolver, and dial the test backends.
   619  	cc, cleanup := setupAndDial(t, bootstrapContents)
   620  	defer cleanup()
   621  
   622  	// Ensure that the RPC is routed to the appropriate backend.
   623  	client := testgrpc.NewTestServiceClient(cc)
   624  	peer := &peer.Peer{}
   625  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
   626  		t.Fatalf("EmptyCall() failed: %v", err)
   627  	}
   628  	if peer.Addr.String() != addrs[0].Addr {
   629  		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
   630  	}
   631  
   632  	// Update the aggregate cluster to point to a single DNS cluster.
   633  	resources.Clusters = []*v3clusterpb.Cluster{
   634  		makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
   635  		e2e.DefaultCluster(edsServiceName, "", e2e.SecurityLevelNone),
   636  		makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
   637  	}
   638  	if err := managementServer.Update(ctx, resources); err != nil {
   639  		t.Fatal(err)
   640  	}
   641  
   642  	// Ensure that start getting routed to the backend corresponding to the
   643  	// LOGICAL_DNS cluster.
   644  	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
   645  		client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer))
   646  		if peer.Addr.String() == addrs[1].Addr {
   647  			break
   648  		}
   649  	}
   650  	if ctx.Err() != nil {
   651  		t.Fatalf("Timeout when waiting for RPCs to be routed to backend %q in the DNS cluster", addrs[1].Addr)
   652  	}
   653  }
   654  
   655  // TestAggregateCluster_BadEDS_GoodToBadDNS tests the case where the top-level
   656  // cluster is an aggregate cluster that resolves to an EDS and LOGICAL_DNS
   657  // cluster. The test first asserts that no RPCs can be made after receiving an
   658  // EDS response with zero endpoints because no update has been received from the
   659  // DNS resolver yet. Once the DNS resolver pushes an update, the test verifies
   660  // that we switch to the DNS cluster and can make a successful RPC. At this
   661  // point when the DNS cluster returns an error, the test verifies that RPCs are
   662  // still successful. This is the expected behavior because the cluster resolver
   663  // policy eats errors from DNS Resolver after it has returned an error.
   664  func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) {
   665  	dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
   666  	defer cleanup1()
   667  
   668  	// Start an xDS management server.
   669  	managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
   670  	defer cleanup2()
   671  
   672  	// Start two test backends.
   673  	servers, cleanup3 := startTestServiceBackends(t, 2)
   674  	defer cleanup3()
   675  	addrs, _ := backendAddressesAndPorts(t, servers)
   676  
   677  	// Configure an aggregate cluster pointing to an EDS and LOGICAL_DNS
   678  	// cluster. Also configure an endpoints resource for the EDS cluster which
   679  	// triggers a NACK.
   680  	const (
   681  		edsClusterName = clusterName + "-eds"
   682  		dnsClusterName = clusterName + "-dns"
   683  		dnsHostName    = "dns_host"
   684  		dnsPort        = uint32(8080)
   685  	)
   686  	emptyEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil)
   687  	resources := e2e.UpdateOptions{
   688  		NodeID: nodeID,
   689  		Clusters: []*v3clusterpb.Cluster{
   690  			makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
   691  			e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
   692  			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
   693  		},
   694  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{emptyEndpointResource},
   695  		SkipValidation: true,
   696  	}
   697  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   698  	defer cancel()
   699  	if err := managementServer.Update(ctx, resources); err != nil {
   700  		t.Fatal(err)
   701  	}
   702  
   703  	// Create xDS client, configure cds_experimental LB policy with a manual
   704  	// resolver, and dial the test backends.
   705  	cc, cleanup := setupAndDial(t, bootstrapContents)
   706  	defer cleanup()
   707  
   708  	// Make an RPC with a short deadline. We expect this RPC to not succeed
   709  	// because the EDS resource came back with no endpoints, and we are yet to
   710  	// push an update through the DNS resolver.
   711  	client := testgrpc.NewTestServiceClient(cc)
   712  	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   713  	defer sCancel()
   714  	if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
   715  		t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded)
   716  	}
   717  
   718  	// Ensure that the DNS resolver is started for the expected target.
   719  	select {
   720  	case <-ctx.Done():
   721  		t.Fatal("Timeout when waiting for DNS resolver to be started")
   722  	case target := <-dnsTargetCh:
   723  		got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
   724  		if got != want {
   725  			t.Fatalf("DNS resolution started for target %q, want %q", got, want)
   726  		}
   727  	}
   728  
   729  	// Update DNS resolver with test backend addresses.
   730  	dnsR.UpdateState(resolver.State{Addresses: addrs})
   731  
   732  	// Ensure that RPCs start getting routed to the first backend since the
   733  	// child policy for a LOGICAL_DNS cluster is pick_first by default.
   734  	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
   735  		peer := &peer.Peer{}
   736  		if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
   737  			t.Logf("EmptyCall() failed: %v", err)
   738  			continue
   739  		}
   740  		if peer.Addr.String() == addrs[0].Addr {
   741  			break
   742  		}
   743  	}
   744  	if ctx.Err() != nil {
   745  		t.Fatalf("Timeout when waiting for RPCs to be routed to backend %q in the DNS cluster", addrs[0].Addr)
   746  	}
   747  
   748  	// Push an error from the DNS resolver as well.
   749  	dnsErr := fmt.Errorf("DNS error")
   750  	dnsR.ReportError(dnsErr)
   751  
   752  	// Ensure that RPCs continue to succeed for the next second.
   753  	for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
   754  		peer := &peer.Peer{}
   755  		if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
   756  			t.Fatalf("EmptyCall() failed: %v", err)
   757  		}
   758  		if peer.Addr.String() != addrs[0].Addr {
   759  			t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
   760  		}
   761  	}
   762  }
   763  
   764  // TestAggregateCluster_BadEDS_GoodToBadDNS tests the case where the top-level
   765  // cluster is an aggregate cluster that resolves to an EDS and LOGICAL_DNS
   766  // cluster. The test first sends an EDS response which triggers an NACK. Once
   767  // the DNS resolver pushes an update, the test verifies that we switch to the
   768  // DNS cluster and can make a successful RPC.
   769  func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) {
   770  	// Start an xDS management server.
   771  	managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
   772  	defer cleanup2()
   773  
   774  	// Start a test service backend.
   775  	server := stubserver.StartTestService(t, nil)
   776  	defer server.Stop()
   777  	dnsHostName, dnsPort := hostAndPortFromAddress(t, server.Address)
   778  
   779  	// Configure an aggregate cluster pointing to an EDS and LOGICAL_DNS
   780  	// cluster. Also configure an empty endpoints resource for the EDS cluster
   781  	// that contains no endpoints.
   782  	const (
   783  		edsClusterName = clusterName + "-eds"
   784  		dnsClusterName = clusterName + "-dns"
   785  	)
   786  	nackEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil)
   787  	nackEndpointResource.Endpoints = []*v3endpointpb.LocalityLbEndpoints{
   788  		{
   789  			LoadBalancingWeight: &wrapperspb.UInt32Value{
   790  				Value: 0, // causes an NACK
   791  			},
   792  		},
   793  	}
   794  	resources := e2e.UpdateOptions{
   795  		NodeID: nodeID,
   796  		Clusters: []*v3clusterpb.Cluster{
   797  			makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
   798  			e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
   799  			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
   800  		},
   801  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{nackEndpointResource},
   802  		SkipValidation: true,
   803  	}
   804  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   805  	defer cancel()
   806  	if err := managementServer.Update(ctx, resources); err != nil {
   807  		t.Fatal(err)
   808  	}
   809  
   810  	// Create xDS client, configure cds_experimental LB policy with a manual
   811  	// resolver, and dial the test backends.
   812  	cc, cleanup := setupAndDial(t, bootstrapContents)
   813  	defer cleanup()
   814  
   815  	// Ensure that RPCs start getting routed to the first backend since the
   816  	// child policy for a LOGICAL_DNS cluster is pick_first by default.
   817  	pickfirst.CheckRPCsToBackend(ctx, cc, resolver.Address{Addr: server.Address})
   818  }
   819  
   820  // TestAggregateCluster_BadDNS_GoodEDS tests the case where the top-level
   821  // cluster is an aggregate cluster that resolves to an LOGICAL_DNS and EDS
   822  // cluster. When the DNS Resolver returns an error and EDS cluster returns a
   823  // good update, this test verifies the cluster_resolver balancer correctly falls
   824  // back from the LOGICAL_DNS cluster to the EDS cluster.
   825  func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) {
   826  	// Start an xDS management server.
   827  	managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
   828  	defer cleanup2()
   829  
   830  	// Start a test service backend.
   831  	server := stubserver.StartTestService(t, nil)
   832  	defer server.Stop()
   833  	_, edsPort := hostAndPortFromAddress(t, server.Address)
   834  
   835  	// Configure an aggregate cluster pointing to an LOGICAL_DNS and EDS
   836  	// cluster. Also configure an endpoints resource for the EDS cluster.
   837  	const (
   838  		edsClusterName = clusterName + "-eds"
   839  		dnsClusterName = clusterName + "-dns"
   840  	)
   841  	resources := e2e.UpdateOptions{
   842  		NodeID: nodeID,
   843  		Clusters: []*v3clusterpb.Cluster{
   844  			makeAggregateClusterResource(clusterName, []string{dnsClusterName, edsClusterName}),
   845  			makeLogicalDNSClusterResource(dnsClusterName, "bad.ip.v4.address", 8080),
   846  			e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
   847  		},
   848  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(edsPort)})},
   849  		SkipValidation: true,
   850  	}
   851  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   852  	defer cancel()
   853  	if err := managementServer.Update(ctx, resources); err != nil {
   854  		t.Fatal(err)
   855  	}
   856  
   857  	// Create xDS client, configure cds_experimental LB policy with a manual
   858  	// resolver, and dial the test backends.
   859  	cc, cleanup := setupAndDial(t, bootstrapContents)
   860  	defer cleanup()
   861  
   862  	// RPCs should work, higher level DNS cluster errors so should fallback to
   863  	// EDS cluster.
   864  	client := testgrpc.NewTestServiceClient(cc)
   865  	peer := &peer.Peer{}
   866  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
   867  		t.Fatalf("EmptyCall() failed: %v", err)
   868  	}
   869  	if peer.Addr.String() != server.Address {
   870  		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address)
   871  	}
   872  }
   873  
   874  // TestAggregateCluster_BadEDS_BadDNS tests the case where the top-level cluster
   875  // is an aggregate cluster that resolves to an EDS and LOGICAL_DNS cluster. When
   876  // the EDS request returns a resource that contains no endpoints, the test
   877  // verifies that we switch to the DNS cluster. When the DNS cluster returns an
   878  // error, the test verifies that RPCs fail with the error triggered by the DNS
   879  // Discovery Mechanism (from sending an empty address list down).
   880  func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
   881  	// Start an xDS management server.
   882  	managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
   883  	defer cleanup2()
   884  
   885  	// Configure an aggregate cluster pointing to an EDS and LOGICAL_DNS
   886  	// cluster. Also configure an empty endpoints resource for the EDS cluster
   887  	// that contains no endpoints.
   888  	const (
   889  		edsClusterName = clusterName + "-eds"
   890  		dnsClusterName = clusterName + "-dns"
   891  	)
   892  	emptyEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil)
   893  	resources := e2e.UpdateOptions{
   894  		NodeID: nodeID,
   895  		Clusters: []*v3clusterpb.Cluster{
   896  			makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
   897  			e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
   898  			makeLogicalDNSClusterResource(dnsClusterName, "bad.ip.v4.address", 8080),
   899  		},
   900  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{emptyEndpointResource},
   901  		SkipValidation: true,
   902  	}
   903  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   904  	defer cancel()
   905  	if err := managementServer.Update(ctx, resources); err != nil {
   906  		t.Fatal(err)
   907  	}
   908  
   909  	// Create xDS client, configure cds_experimental LB policy with a manual
   910  	// resolver, and dial the test backends.
   911  	cc, cleanup := setupAndDial(t, bootstrapContents)
   912  	defer cleanup()
   913  
   914  	// Ensure that the error from the DNS Resolver leads to an empty address
   915  	// update for both priorities.
   916  	client := testgrpc.NewTestServiceClient(cc)
   917  	for ctx.Err() == nil {
   918  		_, err := client.EmptyCall(ctx, &testpb.Empty{})
   919  		if err == nil {
   920  			t.Fatal("EmptyCall() succeeded when expected to fail")
   921  		}
   922  		if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "produced zero addresses") {
   923  			break
   924  		}
   925  	}
   926  	if ctx.Err() != nil {
   927  		t.Fatalf("Timeout when waiting for RPCs to fail with expected code and error")
   928  	}
   929  }
   930  
   931  // TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate tests the
   932  // scenario where the top-level cluster is an aggregate cluster that resolves to
   933  // an EDS and LOGICAL_DNS cluster. The management server first sends a good EDS
   934  // response for the EDS cluster and the test verifies that RPCs get routed to
   935  // the EDS cluster. The management server then sends a bad EDS response. The
   936  // test verifies that the cluster_resolver LB policy continues to use the
   937  // previously received good update and that RPCs still get routed to the EDS
   938  // cluster.
   939  func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *testing.T) {
   940  	// Start an xDS management server.
   941  	mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
   942  	defer cleanup2()
   943  
   944  	// Start two test backends and extract their host and port. The first
   945  	// backend is used for the EDS cluster and the second backend is used for
   946  	// the LOGICAL_DNS cluster.
   947  	servers, cleanup3 := startTestServiceBackends(t, 2)
   948  	defer cleanup3()
   949  	addrs, ports := backendAddressesAndPorts(t, servers)
   950  	dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[1].Address)
   951  
   952  	// Configure an aggregate cluster pointing to an EDS and DNS cluster. Also
   953  	// configure an endpoints resource for the EDS cluster.
   954  	const (
   955  		edsClusterName = clusterName + "-eds"
   956  		dnsClusterName = clusterName + "-dns"
   957  	)
   958  	resources := e2e.UpdateOptions{
   959  		NodeID: nodeID,
   960  		Clusters: []*v3clusterpb.Cluster{
   961  			makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
   962  			e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
   963  			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
   964  		},
   965  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})},
   966  		SkipValidation: true,
   967  	}
   968  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   969  	defer cancel()
   970  	if err := mgmtServer.Update(ctx, resources); err != nil {
   971  		t.Fatal(err)
   972  	}
   973  
   974  	// Create xDS client, configure cds_experimental LB policy with a manual
   975  	// resolver, and dial the test backends.
   976  	cc, cleanup := setupAndDial(t, bootstrapContents)
   977  	defer cleanup()
   978  
   979  	// Make an RPC and ensure that it gets routed to the first backend since the
   980  	// EDS cluster is of higher priority than the LOGICAL_DNS cluster.
   981  	client := testgrpc.NewTestServiceClient(cc)
   982  	peer := &peer.Peer{}
   983  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
   984  		t.Fatalf("EmptyCall() failed: %v", err)
   985  	}
   986  	if peer.Addr.String() != addrs[0].Addr {
   987  		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
   988  	}
   989  
   990  	// Push an EDS resource from the management server that is expected to be
   991  	// NACKed by the xDS client. Since the cluster_resolver LB policy has a
   992  	// previously received good EDS resource, it will continue to use that.
   993  	resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
   994  	if err := mgmtServer.Update(ctx, resources); err != nil {
   995  		t.Fatal(err)
   996  	}
   997  
   998  	// Ensure that RPCs continue to get routed to the EDS cluster for the next
   999  	// second.
  1000  	for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
  1001  		if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
  1002  			t.Fatalf("EmptyCall() failed: %v", err)
  1003  		}
  1004  		if peer.Addr.String() != addrs[0].Addr {
  1005  			t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
  1006  		}
  1007  	}
  1008  }
  1009  
  1010  // TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate tests the
  1011  // scenario where the top-level cluster is an aggregate cluster that resolves to
  1012  // an EDS and LOGICAL_DNS cluster.  The management server sends a bad EDS
  1013  // response. The test verifies that the cluster_resolver LB policy falls back to
  1014  // the LOGICAL_DNS cluster, because it is supposed to treat the bad EDS response
  1015  // as though it received an update with no endpoints.
  1016  func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *testing.T) {
  1017  	// Start an xDS management server.
  1018  	mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
  1019  	defer cleanup2()
  1020  
  1021  	// Start two test backends and extract their host and port. The first
  1022  	// backend is used for the EDS cluster and the second backend is used for
  1023  	// the LOGICAL_DNS cluster.
  1024  	servers, cleanup3 := startTestServiceBackends(t, 2)
  1025  	defer cleanup3()
  1026  	addrs, ports := backendAddressesAndPorts(t, servers)
  1027  	dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[1].Address)
  1028  
  1029  	// Configure an aggregate cluster pointing to an EDS and DNS cluster.
  1030  	const (
  1031  		edsClusterName = clusterName + "-eds"
  1032  		dnsClusterName = clusterName + "-dns"
  1033  	)
  1034  	resources := e2e.UpdateOptions{
  1035  		NodeID: nodeID,
  1036  		Clusters: []*v3clusterpb.Cluster{
  1037  			makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
  1038  			e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
  1039  			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
  1040  		},
  1041  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})},
  1042  		SkipValidation: true,
  1043  	}
  1044  
  1045  	// Set a load balancing weight of 0 for the backend in the EDS resource.
  1046  	// This is expected to be NACKed by the xDS client. Since the
  1047  	// cluster_resolver LB policy has no previously received good EDS resource,
  1048  	// it will treat this as though it received an update with no endpoints.
  1049  	resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
  1050  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1051  	defer cancel()
  1052  	if err := mgmtServer.Update(ctx, resources); err != nil {
  1053  		t.Fatal(err)
  1054  	}
  1055  
  1056  	// Create xDS client, configure cds_experimental LB policy with a manual
  1057  	// resolver, and dial the test backends.
  1058  	cc, cleanup := setupAndDial(t, bootstrapContents)
  1059  	defer cleanup()
  1060  
  1061  	// Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster.
  1062  	peer := &peer.Peer{}
  1063  	client := testgrpc.NewTestServiceClient(cc)
  1064  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
  1065  		t.Fatalf("EmptyCall() failed: %v", err)
  1066  	}
  1067  	if peer.Addr.String() != addrs[1].Addr {
  1068  		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[1].Addr)
  1069  	}
  1070  }
  1071  
  1072  // TestAggregateCluster_Fallback_EDS_ResourceNotFound tests the scenario where
  1073  // the top-level cluster is an aggregate cluster that resolves to an EDS and
  1074  // LOGICAL_DNS cluster.  The management server does not respond with the EDS
  1075  // cluster. The test verifies that the cluster_resolver LB policy falls back to
  1076  // the LOGICAL_DNS cluster in this case.
  1077  func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {
  1078  	// Start an xDS management server.
  1079  	mgmtServer, nodeID, _, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
  1080  	defer cleanup2()
  1081  
  1082  	// Start a test backend for the LOGICAL_DNS cluster.
  1083  	server := stubserver.StartTestService(t, nil)
  1084  	defer server.Stop()
  1085  	dnsHostName, dnsPort := hostAndPortFromAddress(t, server.Address)
  1086  
  1087  	// Configure an aggregate cluster pointing to an EDS and DNS cluster. No
  1088  	// endpoints are configured for the EDS cluster.
  1089  	const (
  1090  		edsClusterName = clusterName + "-eds"
  1091  		dnsClusterName = clusterName + "-dns"
  1092  	)
  1093  	resources := e2e.UpdateOptions{
  1094  		NodeID: nodeID,
  1095  		Clusters: []*v3clusterpb.Cluster{
  1096  			makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
  1097  			e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
  1098  			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
  1099  		},
  1100  		SkipValidation: true,
  1101  	}
  1102  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1103  	defer cancel()
  1104  	if err := mgmtServer.Update(ctx, resources); err != nil {
  1105  		t.Fatal(err)
  1106  	}
  1107  
  1108  	// Create an xDS client talking to the above management server, configured
  1109  	// with a short watch expiry timeout.
  1110  	xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
  1111  		XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
  1112  		NodeProto: &v3corepb.Node{Id: nodeID},
  1113  	}, defaultTestWatchExpiryTimeout, time.Duration(0))
  1114  	if err != nil {
  1115  		t.Fatalf("failed to create xds client: %v", err)
  1116  	}
  1117  	defer close()
  1118  
  1119  	// Create a manual resolver and push a service config specifying the use of
  1120  	// the cds LB policy as the top-level LB policy, and a corresponding config
  1121  	// with a single cluster.
  1122  	r := manual.NewBuilderWithScheme("whatever")
  1123  	jsonSC := fmt.Sprintf(`{
  1124  			"loadBalancingConfig":[{
  1125  				"cds_experimental":{
  1126  					"cluster": "%s"
  1127  				}
  1128  			}]
  1129  		}`, clusterName)
  1130  	scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
  1131  	r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))
  1132  
  1133  	// Create a ClientConn.
  1134  	cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
  1135  	if err != nil {
  1136  		t.Fatalf("failed to dial local test server: %v", err)
  1137  	}
  1138  	defer cc.Close()
  1139  
  1140  	// Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster.
  1141  	// Even though the EDS cluster is of higher priority, since the management
  1142  	// server does not respond with an EDS resource, the cluster_resolver LB
  1143  	// policy is expected to fallback to the LOGICAL_DNS cluster once the watch
  1144  	// timeout expires.
  1145  	peer := &peer.Peer{}
  1146  	client := testgrpc.NewTestServiceClient(cc)
  1147  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
  1148  		t.Fatalf("EmptyCall() failed: %v", err)
  1149  	}
  1150  	if peer.Addr.String() != server.Address {
  1151  		t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address)
  1152  	}
  1153  }
  1154  

View as plain text