...

Source file src/google.golang.org/grpc/xds/internal/balancer/cdsbalancer/aggregate_cluster_test.go

Documentation: google.golang.org/grpc/xds/internal/balancer/cdsbalancer

     1  /*
     2   * Copyright 2021 gRPC authors.
     3   *
     4   * Licensed under the Apache License, Version 2.0 (the "License");
     5   * you may not use this file except in compliance with the License.
     6   * You may obtain a copy of the License at
     7   *
     8   *     http://www.apache.org/licenses/LICENSE-2.0
     9   *
    10   * Unless required by applicable law or agreed to in writing, software
    11   * distributed under the License is distributed on an "AS IS" BASIS,
    12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13   * See the License for the specific language governing permissions and
    14   * limitations under the License.
    15   */
    16  
    17  package cdsbalancer
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"fmt"
    23  	"strings"
    24  	"testing"
    25  	"time"
    26  
    27  	"google.golang.org/grpc"
    28  	"google.golang.org/grpc/codes"
    29  	"google.golang.org/grpc/connectivity"
    30  	"google.golang.org/grpc/internal/pretty"
    31  	"google.golang.org/grpc/internal/stubserver"
    32  	"google.golang.org/grpc/internal/testutils"
    33  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    34  	"google.golang.org/grpc/serviceconfig"
    35  	"google.golang.org/grpc/status"
    36  	"google.golang.org/grpc/xds/internal/balancer/clusterresolver"
    37  
    38  	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
    39  	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
    40  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    41  	testpb "google.golang.org/grpc/interop/grpc_testing"
    42  )
    43  
    44  // makeAggregateClusterResource returns an aggregate cluster resource with the
    45  // given name and list of child names.
    46  func makeAggregateClusterResource(name string, childNames []string) *v3clusterpb.Cluster {
    47  	return e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
    48  		ClusterName: name,
    49  		Type:        e2e.ClusterTypeAggregate,
    50  		ChildNames:  childNames,
    51  	})
    52  }
    53  
    54  // makeLogicalDNSClusterResource returns a LOGICAL_DNS cluster resource with the
    55  // given name and given DNS host and port.
    56  func makeLogicalDNSClusterResource(name, dnsHost string, dnsPort uint32) *v3clusterpb.Cluster {
    57  	return e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
    58  		ClusterName: name,
    59  		Type:        e2e.ClusterTypeLogicalDNS,
    60  		DNSHostName: dnsHost,
    61  		DNSPort:     dnsPort,
    62  	})
    63  }
    64  
    65  // Tests the case where the cluster resource requested by the cds LB policy is a
    66  // leaf cluster. The management server sends two updates for the same leaf
    67  // cluster resource. The test verifies that the load balancing configuration
    68  // pushed to the cluster_resolver LB policy is contains the expected discovery
    69  // mechanism corresponding to the leaf cluster, on both occasions.
    70  func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) {
    71  	tests := []struct {
    72  		name                  string
    73  		firstClusterResource  *v3clusterpb.Cluster
    74  		secondClusterResource *v3clusterpb.Cluster
    75  		wantFirstChildCfg     serviceconfig.LoadBalancingConfig
    76  		wantSecondChildCfg    serviceconfig.LoadBalancingConfig
    77  	}{
    78  		{
    79  			name:                  "eds",
    80  			firstClusterResource:  e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone),
    81  			secondClusterResource: e2e.DefaultCluster(clusterName, serviceName+"-new", e2e.SecurityLevelNone),
    82  			wantFirstChildCfg: &clusterresolver.LBConfig{
    83  				DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
    84  					Cluster:          clusterName,
    85  					Type:             clusterresolver.DiscoveryMechanismTypeEDS,
    86  					EDSServiceName:   serviceName,
    87  					OutlierDetection: json.RawMessage(`{}`),
    88  				}},
    89  				XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
    90  			},
    91  			wantSecondChildCfg: &clusterresolver.LBConfig{
    92  				DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
    93  					Cluster:          clusterName,
    94  					Type:             clusterresolver.DiscoveryMechanismTypeEDS,
    95  					EDSServiceName:   serviceName + "-new",
    96  					OutlierDetection: json.RawMessage(`{}`),
    97  				}},
    98  				XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
    99  			},
   100  		},
   101  		{
   102  			name:                  "dns",
   103  			firstClusterResource:  makeLogicalDNSClusterResource(clusterName, "dns_host", uint32(8080)),
   104  			secondClusterResource: makeLogicalDNSClusterResource(clusterName, "dns_host_new", uint32(8080)),
   105  			wantFirstChildCfg: &clusterresolver.LBConfig{
   106  				DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
   107  					Cluster:          clusterName,
   108  					Type:             clusterresolver.DiscoveryMechanismTypeLogicalDNS,
   109  					DNSHostname:      "dns_host:8080",
   110  					OutlierDetection: json.RawMessage(`{}`),
   111  				}},
   112  				XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
   113  			},
   114  			wantSecondChildCfg: &clusterresolver.LBConfig{
   115  				DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
   116  					Cluster:          clusterName,
   117  					Type:             clusterresolver.DiscoveryMechanismTypeLogicalDNS,
   118  					DNSHostname:      "dns_host_new:8080",
   119  					OutlierDetection: json.RawMessage(`{}`),
   120  				}},
   121  				XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
   122  			},
   123  		},
   124  	}
   125  
   126  	for _, test := range tests {
   127  		t.Run(test.name, func(t *testing.T) {
   128  			lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
   129  			mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
   130  
   131  			// Push the first cluster resource through the management server and
   132  			// verify the configuration pushed to the child policy.
   133  			resources := e2e.UpdateOptions{
   134  				NodeID:         nodeID,
   135  				Clusters:       []*v3clusterpb.Cluster{test.firstClusterResource},
   136  				SkipValidation: true,
   137  			}
   138  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   139  			defer cancel()
   140  			if err := mgmtServer.Update(ctx, resources); err != nil {
   141  				t.Fatal(err)
   142  			}
   143  			if err := compareLoadBalancingConfig(ctx, lbCfgCh, test.wantFirstChildCfg); err != nil {
   144  				t.Fatal(err)
   145  			}
   146  
   147  			// Push the second cluster resource through the management server and
   148  			// verify the configuration pushed to the child policy.
   149  			resources.Clusters[0] = test.secondClusterResource
   150  			if err := mgmtServer.Update(ctx, resources); err != nil {
   151  				t.Fatal(err)
   152  			}
   153  			if err := compareLoadBalancingConfig(ctx, lbCfgCh, test.wantSecondChildCfg); err != nil {
   154  				t.Fatal(err)
   155  			}
   156  		})
   157  	}
   158  }
   159  
   160  // Tests the case where the cluster resource requested by the cds LB policy is
   161  // an aggregate cluster root pointing to two child clusters, one of type EDS and
   162  // the other of type LogicalDNS. The test verifies that load balancing
   163  // configuration is pushed to the cluster_resolver LB policy only when all child
   164  // clusters are resolved and it also verifies that the pushed configuration
   165  // contains the expected discovery mechanisms. The test then updates the
   166  // aggregate cluster to point to two child clusters, the same leaf cluster of
   167  // type EDS and a different leaf cluster of type LogicalDNS and verifies that
   168  // the load balancing configuration pushed to the cluster_resolver LB policy
   169  // contains the expected discovery mechanisms.
   170  func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) {
   171  	lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
   172  	mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
   173  
   174  	// Configure the management server with the aggregate cluster resource
   175  	// pointing to two child clusters, one EDS and one LogicalDNS. Include the
   176  	// resource corresponding to the EDS cluster here, but don't include
   177  	// resource corresponding to the LogicalDNS cluster yet.
   178  	resources := e2e.UpdateOptions{
   179  		NodeID: nodeID,
   180  		Clusters: []*v3clusterpb.Cluster{
   181  			makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
   182  			e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone),
   183  		},
   184  		SkipValidation: true,
   185  	}
   186  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   187  	defer cancel()
   188  	if err := mgmtServer.Update(ctx, resources); err != nil {
   189  		t.Fatal(err)
   190  	}
   191  
   192  	// Verify that no configuration is pushed to the child policy yet, because
   193  	// not all clusters making up the aggregate cluster have been resolved yet.
   194  	select {
   195  	case cfg := <-lbCfgCh:
   196  		t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg))
   197  	case <-time.After(defaultTestShortTimeout):
   198  	}
   199  
   200  	// Now configure the LogicalDNS cluster in the management server. This
   201  	// should result in configuration being pushed down to the child policy.
   202  	resources.Clusters = append(resources.Clusters, makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort))
   203  	if err := mgmtServer.Update(ctx, resources); err != nil {
   204  		t.Fatal(err)
   205  	}
   206  
   207  	wantChildCfg := &clusterresolver.LBConfig{
   208  		DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{
   209  			{
   210  				Cluster:          edsClusterName,
   211  				Type:             clusterresolver.DiscoveryMechanismTypeEDS,
   212  				EDSServiceName:   serviceName,
   213  				OutlierDetection: json.RawMessage(`{}`),
   214  			},
   215  			{
   216  				Cluster:          dnsClusterName,
   217  				Type:             clusterresolver.DiscoveryMechanismTypeLogicalDNS,
   218  				DNSHostname:      fmt.Sprintf("%s:%d", dnsHostName, dnsPort),
   219  				OutlierDetection: json.RawMessage(`{}`),
   220  			},
   221  		},
   222  		XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
   223  	}
   224  	if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
   225  		t.Fatal(err)
   226  	}
   227  
   228  	const dnsClusterNameNew = dnsClusterName + "-new"
   229  	const dnsHostNameNew = dnsHostName + "-new"
   230  	resources = e2e.UpdateOptions{
   231  		NodeID: nodeID,
   232  		Clusters: []*v3clusterpb.Cluster{
   233  			makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterNameNew}),
   234  			e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone),
   235  			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
   236  			makeLogicalDNSClusterResource(dnsClusterNameNew, dnsHostNameNew, dnsPort),
   237  		},
   238  		SkipValidation: true,
   239  	}
   240  	if err := mgmtServer.Update(ctx, resources); err != nil {
   241  		t.Fatal(err)
   242  	}
   243  	wantChildCfg = &clusterresolver.LBConfig{
   244  		DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{
   245  			{
   246  				Cluster:          edsClusterName,
   247  				Type:             clusterresolver.DiscoveryMechanismTypeEDS,
   248  				EDSServiceName:   serviceName,
   249  				OutlierDetection: json.RawMessage(`{}`),
   250  			},
   251  			{
   252  				Cluster:          dnsClusterNameNew,
   253  				Type:             clusterresolver.DiscoveryMechanismTypeLogicalDNS,
   254  				DNSHostname:      fmt.Sprintf("%s:%d", dnsHostNameNew, dnsPort),
   255  				OutlierDetection: json.RawMessage(`{}`),
   256  			},
   257  		},
   258  		XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
   259  	}
   260  	if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
   261  		t.Fatal(err)
   262  	}
   263  }
   264  
   265  // Tests the case where the cluster resource requested by the cds LB policy is
   266  // an aggregate cluster root pointing to two child clusters, one of type EDS and
   267  // the other of type LogicalDNS. The test verifies that the load balancing
   268  // configuration pushed to the cluster_resolver LB policy contains the discovery
   269  // mechanisms for both child clusters. The test then updates the root cluster
   270  // resource requested by the cds LB policy to a leaf cluster of type EDS and
   271  // verifies the load balancing configuration pushed to the cluster_resolver LB
   272  // policy contains a single discovery mechanism.
   273  func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) {
   274  	lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
   275  	mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
   276  
   277  	// Configure the management server with the aggregate cluster resource
   278  	// pointing to two child clusters.
   279  	resources := e2e.UpdateOptions{
   280  		NodeID: nodeID,
   281  		Clusters: []*v3clusterpb.Cluster{
   282  			makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
   283  			e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone),
   284  			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
   285  		},
   286  		SkipValidation: true,
   287  	}
   288  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   289  	defer cancel()
   290  	if err := mgmtServer.Update(ctx, resources); err != nil {
   291  		t.Fatal(err)
   292  	}
   293  
   294  	wantChildCfg := &clusterresolver.LBConfig{
   295  		DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{
   296  			{
   297  				Cluster:          edsClusterName,
   298  				Type:             clusterresolver.DiscoveryMechanismTypeEDS,
   299  				EDSServiceName:   serviceName,
   300  				OutlierDetection: json.RawMessage(`{}`),
   301  			},
   302  			{
   303  				Cluster:          dnsClusterName,
   304  				Type:             clusterresolver.DiscoveryMechanismTypeLogicalDNS,
   305  				DNSHostname:      fmt.Sprintf("%s:%d", dnsHostName, dnsPort),
   306  				OutlierDetection: json.RawMessage(`{}`),
   307  			},
   308  		},
   309  		XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
   310  	}
   311  	if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
   312  		t.Fatal(err)
   313  	}
   314  
   315  	resources = e2e.UpdateOptions{
   316  		NodeID: nodeID,
   317  		Clusters: []*v3clusterpb.Cluster{
   318  			e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone),
   319  			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
   320  		},
   321  		SkipValidation: true,
   322  	}
   323  	if err := mgmtServer.Update(ctx, resources); err != nil {
   324  		t.Fatal(err)
   325  	}
   326  	wantChildCfg = &clusterresolver.LBConfig{
   327  		DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
   328  			Cluster:          clusterName,
   329  			Type:             clusterresolver.DiscoveryMechanismTypeEDS,
   330  			EDSServiceName:   serviceName,
   331  			OutlierDetection: json.RawMessage(`{}`),
   332  		}},
   333  		XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
   334  	}
   335  	if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
   336  		t.Fatal(err)
   337  	}
   338  }
   339  
   340  // Tests the case where a requested cluster resource switches between being a
   341  // leaf and an aggregate cluster pointing to an EDS and LogicalDNS child
   342  // cluster. In each of these cases, the test verifies that the load balancing
   343  // configuration pushed to the cluster_resolver LB policy contains the expected
   344  // discovery mechanisms.
   345  func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T) {
   346  	lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
   347  	mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
   348  
   349  	// Start off with the requested cluster being a leaf EDS cluster.
   350  	resources := e2e.UpdateOptions{
   351  		NodeID:         nodeID,
   352  		Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
   353  		SkipValidation: true,
   354  	}
   355  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   356  	defer cancel()
   357  	if err := mgmtServer.Update(ctx, resources); err != nil {
   358  		t.Fatal(err)
   359  	}
   360  	wantChildCfg := &clusterresolver.LBConfig{
   361  		DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
   362  			Cluster:          clusterName,
   363  			Type:             clusterresolver.DiscoveryMechanismTypeEDS,
   364  			EDSServiceName:   serviceName,
   365  			OutlierDetection: json.RawMessage(`{}`),
   366  		}},
   367  		XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
   368  	}
   369  	if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
   370  		t.Fatal(err)
   371  	}
   372  
   373  	// Switch the requested cluster to be an aggregate cluster pointing to two
   374  	// child clusters.
   375  	resources = e2e.UpdateOptions{
   376  		NodeID: nodeID,
   377  		Clusters: []*v3clusterpb.Cluster{
   378  			makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
   379  			e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone),
   380  			makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
   381  		},
   382  		SkipValidation: true,
   383  	}
   384  	if err := mgmtServer.Update(ctx, resources); err != nil {
   385  		t.Fatal(err)
   386  	}
   387  	wantChildCfg = &clusterresolver.LBConfig{
   388  		DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{
   389  			{
   390  				Cluster:          edsClusterName,
   391  				Type:             clusterresolver.DiscoveryMechanismTypeEDS,
   392  				EDSServiceName:   serviceName,
   393  				OutlierDetection: json.RawMessage(`{}`),
   394  			},
   395  			{
   396  				Cluster:          dnsClusterName,
   397  				Type:             clusterresolver.DiscoveryMechanismTypeLogicalDNS,
   398  				DNSHostname:      fmt.Sprintf("%s:%d", dnsHostName, dnsPort),
   399  				OutlierDetection: json.RawMessage(`{}`),
   400  			},
   401  		},
   402  		XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
   403  	}
   404  	if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
   405  		t.Fatal(err)
   406  	}
   407  
   408  	// Switch the cluster back to a leaf EDS cluster.
   409  	resources = e2e.UpdateOptions{
   410  		NodeID:         nodeID,
   411  		Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
   412  		SkipValidation: true,
   413  	}
   414  	if err := mgmtServer.Update(ctx, resources); err != nil {
   415  		t.Fatal(err)
   416  	}
   417  	wantChildCfg = &clusterresolver.LBConfig{
   418  		DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
   419  			Cluster:          clusterName,
   420  			Type:             clusterresolver.DiscoveryMechanismTypeEDS,
   421  			EDSServiceName:   serviceName,
   422  			OutlierDetection: json.RawMessage(`{}`),
   423  		}},
   424  		XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
   425  	}
   426  	if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
   427  		t.Fatal(err)
   428  	}
   429  }
   430  
   431  // Tests the scenario where an aggregate cluster exceeds the maximum depth,
   432  // which is 16. Verfies that the channel moves to TRANSIENT_FAILURE, and the
   433  // error is propagated to RPC callers. The test then modifies the graph to no
   434  // longer exceed maximum depth, but be at the maximum allowed depth, and
   435  // verifies that an RPC can be made successfully.
   436  func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) {
   437  	mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
   438  
   439  	resources := e2e.UpdateOptions{
   440  		NodeID: nodeID,
   441  		Clusters: []*v3clusterpb.Cluster{
   442  			makeAggregateClusterResource(clusterName, []string{clusterName + "-1"}),
   443  			makeAggregateClusterResource(clusterName+"-1", []string{clusterName + "-2"}),
   444  			makeAggregateClusterResource(clusterName+"-2", []string{clusterName + "-3"}),
   445  			makeAggregateClusterResource(clusterName+"-3", []string{clusterName + "-4"}),
   446  			makeAggregateClusterResource(clusterName+"-4", []string{clusterName + "-5"}),
   447  			makeAggregateClusterResource(clusterName+"-5", []string{clusterName + "-6"}),
   448  			makeAggregateClusterResource(clusterName+"-6", []string{clusterName + "-7"}),
   449  			makeAggregateClusterResource(clusterName+"-7", []string{clusterName + "-8"}),
   450  			makeAggregateClusterResource(clusterName+"-8", []string{clusterName + "-9"}),
   451  			makeAggregateClusterResource(clusterName+"-9", []string{clusterName + "-10"}),
   452  			makeAggregateClusterResource(clusterName+"-10", []string{clusterName + "-11"}),
   453  			makeAggregateClusterResource(clusterName+"-11", []string{clusterName + "-12"}),
   454  			makeAggregateClusterResource(clusterName+"-12", []string{clusterName + "-13"}),
   455  			makeAggregateClusterResource(clusterName+"-13", []string{clusterName + "-14"}),
   456  			makeAggregateClusterResource(clusterName+"-14", []string{clusterName + "-15"}),
   457  			makeAggregateClusterResource(clusterName+"-15", []string{clusterName + "-16"}),
   458  			e2e.DefaultCluster(clusterName+"-16", serviceName, e2e.SecurityLevelNone),
   459  		},
   460  		SkipValidation: true,
   461  	}
   462  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   463  	defer cancel()
   464  	if err := mgmtServer.Update(ctx, resources); err != nil {
   465  		t.Fatal(err)
   466  	}
   467  
   468  	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
   469  
   470  	const wantErr = "aggregate cluster graph exceeds max depth"
   471  	client := testgrpc.NewTestServiceClient(cc)
   472  	_, err := client.EmptyCall(ctx, &testpb.Empty{})
   473  	if code := status.Code(err); code != codes.Unavailable {
   474  		t.Fatalf("EmptyCall() failed with code: %v, want %v", code, codes.Unavailable)
   475  	}
   476  	if err != nil && !strings.Contains(err.Error(), wantErr) {
   477  		t.Fatalf("EmptyCall() failed with err: %v, want err containing: %v", err, wantErr)
   478  	}
   479  
   480  	// Start a test service backend.
   481  	server := stubserver.StartTestService(t, nil)
   482  	t.Cleanup(server.Stop)
   483  
   484  	// Update the aggregate cluster resource to no longer exceed max depth, and
   485  	// be at the maximum depth allowed.
   486  	resources = e2e.UpdateOptions{
   487  		NodeID: nodeID,
   488  		Clusters: []*v3clusterpb.Cluster{
   489  			makeAggregateClusterResource(clusterName, []string{clusterName + "-1"}),
   490  			makeAggregateClusterResource(clusterName+"-1", []string{clusterName + "-2"}),
   491  			makeAggregateClusterResource(clusterName+"-2", []string{clusterName + "-3"}),
   492  			makeAggregateClusterResource(clusterName+"-3", []string{clusterName + "-4"}),
   493  			makeAggregateClusterResource(clusterName+"-4", []string{clusterName + "-5"}),
   494  			makeAggregateClusterResource(clusterName+"-5", []string{clusterName + "-6"}),
   495  			makeAggregateClusterResource(clusterName+"-6", []string{clusterName + "-7"}),
   496  			makeAggregateClusterResource(clusterName+"-7", []string{clusterName + "-8"}),
   497  			makeAggregateClusterResource(clusterName+"-8", []string{clusterName + "-9"}),
   498  			makeAggregateClusterResource(clusterName+"-9", []string{clusterName + "-10"}),
   499  			makeAggregateClusterResource(clusterName+"-10", []string{clusterName + "-11"}),
   500  			makeAggregateClusterResource(clusterName+"-11", []string{clusterName + "-12"}),
   501  			makeAggregateClusterResource(clusterName+"-12", []string{clusterName + "-13"}),
   502  			makeAggregateClusterResource(clusterName+"-13", []string{clusterName + "-14"}),
   503  			makeAggregateClusterResource(clusterName+"-14", []string{clusterName + "-15"}),
   504  			e2e.DefaultCluster(clusterName+"-15", serviceName, e2e.SecurityLevelNone),
   505  		},
   506  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
   507  		SkipValidation: true,
   508  	}
   509  	if err := mgmtServer.Update(ctx, resources); err != nil {
   510  		t.Fatal(err)
   511  	}
   512  
   513  	// Verify that a successful RPC can be made.
   514  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   515  		t.Fatalf("EmptyCall() failed: %v", err)
   516  	}
   517  }
   518  
   519  // Tests a diamond shaped aggregate cluster (A->[B,C]; B->D; C->D). Verifies
   520  // that the load balancing configuration pushed to the cluster_resolver LB
   521  // policy specifies cluster D only once. Also verifies that configuration is
   522  // pushed only after all child clusters are resolved.
   523  func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) {
   524  	lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
   525  	mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
   526  
   527  	// Configure the management server with an aggregate cluster resource having
   528  	// a diamond dependency pattern, (A->[B,C]; B->D; C->D). Includes resources
   529  	// for cluster A, B and D, but don't include the resource for cluster C yet.
   530  	// This will help us verify that no configuration is pushed to the child
   531  	// policy until the whole cluster graph is resolved.
   532  	const (
   533  		clusterNameA = clusterName // cluster name in cds LB policy config
   534  		clusterNameB = clusterName + "-B"
   535  		clusterNameC = clusterName + "-C"
   536  		clusterNameD = clusterName + "-D"
   537  	)
   538  	resources := e2e.UpdateOptions{
   539  		NodeID: nodeID,
   540  		Clusters: []*v3clusterpb.Cluster{
   541  			makeAggregateClusterResource(clusterNameA, []string{clusterNameB, clusterNameC}),
   542  			makeAggregateClusterResource(clusterNameB, []string{clusterNameD}),
   543  			e2e.DefaultCluster(clusterNameD, serviceName, e2e.SecurityLevelNone),
   544  		},
   545  		SkipValidation: true,
   546  	}
   547  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   548  	defer cancel()
   549  	if err := mgmtServer.Update(ctx, resources); err != nil {
   550  		t.Fatal(err)
   551  	}
   552  
   553  	// Verify that no configuration is pushed to the child policy yet, because
   554  	// not all clusters making up the aggregate cluster have been resolved yet.
   555  	select {
   556  	case cfg := <-lbCfgCh:
   557  		t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg))
   558  	case <-time.After(defaultTestShortTimeout):
   559  	}
   560  
   561  	// Now configure the resource for cluster C in the management server,
   562  	// thereby completing the cluster graph. This should result in configuration
   563  	// being pushed down to the child policy.
   564  	resources.Clusters = append(resources.Clusters, makeAggregateClusterResource(clusterNameC, []string{clusterNameD}))
   565  	if err := mgmtServer.Update(ctx, resources); err != nil {
   566  		t.Fatal(err)
   567  	}
   568  
   569  	wantChildCfg := &clusterresolver.LBConfig{
   570  		DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
   571  			Cluster:          clusterNameD,
   572  			Type:             clusterresolver.DiscoveryMechanismTypeEDS,
   573  			EDSServiceName:   serviceName,
   574  			OutlierDetection: json.RawMessage(`{}`),
   575  		}},
   576  		XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
   577  	}
   578  	if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
   579  		t.Fatal(err)
   580  	}
   581  }
   582  
   583  // Tests the case where the aggregate cluster graph contains duplicates (A->[B,
   584  // C]; B->[C, D]). Verifies that the load balancing configuration pushed to the
   585  // cluster_resolver LB policy does not contain duplicates, and that the
   586  // discovery mechanism corresponding to cluster C is of higher priority than the
   587  // discovery mechanism for cluster D. Also verifies that the configuration is
   588  // pushed only after all child clusters are resolved.
   589  func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) {
   590  	lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
   591  	mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
   592  
   593  	// Configure the management server with an aggregate cluster resource that
   594  	// has duplicates in the graph, (A->[B, C]; B->[C, D]). Include resources
   595  	// for clusters A, B and D, but don't configure the resource for cluster C
   596  	// yet. This will help us verify that no configuration is pushed to the
   597  	// child policy until the whole cluster graph is resolved.
   598  	const (
   599  		clusterNameA = clusterName // cluster name in cds LB policy config
   600  		clusterNameB = clusterName + "-B"
   601  		clusterNameC = clusterName + "-C"
   602  		clusterNameD = clusterName + "-D"
   603  	)
   604  	resources := e2e.UpdateOptions{
   605  		NodeID: nodeID,
   606  		Clusters: []*v3clusterpb.Cluster{
   607  			makeAggregateClusterResource(clusterNameA, []string{clusterNameB, clusterNameC}),
   608  			makeAggregateClusterResource(clusterNameB, []string{clusterNameC, clusterNameD}),
   609  			e2e.DefaultCluster(clusterNameD, serviceName, e2e.SecurityLevelNone),
   610  		},
   611  		SkipValidation: true,
   612  	}
   613  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   614  	defer cancel()
   615  	if err := mgmtServer.Update(ctx, resources); err != nil {
   616  		t.Fatal(err)
   617  	}
   618  
   619  	// Verify that no configuration is pushed to the child policy yet, because
   620  	// not all clusters making up the aggregate cluster have been resolved yet.
   621  	select {
   622  	case cfg := <-lbCfgCh:
   623  		t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg))
   624  	case <-time.After(defaultTestShortTimeout):
   625  	}
   626  
   627  	// Now configure the resource for cluster C in the management server,
   628  	// thereby completing the cluster graph. This should result in configuration
   629  	// being pushed down to the child policy.
   630  	resources.Clusters = append(resources.Clusters, e2e.DefaultCluster(clusterNameC, serviceName, e2e.SecurityLevelNone))
   631  	if err := mgmtServer.Update(ctx, resources); err != nil {
   632  		t.Fatal(err)
   633  	}
   634  
   635  	wantChildCfg := &clusterresolver.LBConfig{
   636  		DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{
   637  			{
   638  				Cluster:          clusterNameC,
   639  				Type:             clusterresolver.DiscoveryMechanismTypeEDS,
   640  				EDSServiceName:   serviceName,
   641  				OutlierDetection: json.RawMessage(`{}`),
   642  			},
   643  			{
   644  				Cluster:          clusterNameD,
   645  				Type:             clusterresolver.DiscoveryMechanismTypeEDS,
   646  				EDSServiceName:   serviceName,
   647  				OutlierDetection: json.RawMessage(`{}`),
   648  			},
   649  		},
   650  		XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
   651  	}
   652  	if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
   653  		t.Fatal(err)
   654  	}
   655  }
   656  
   657  // Tests the scenario where the aggregate cluster graph has a node that has
   658  // child node of itself. The case for this is A -> A, and since there is no base
   659  // cluster (EDS or Logical DNS), no configuration should be pushed to the child
   660  // policy.  The channel is expected to move to TRANSIENT_FAILURE and RPCs are
   661  // expected to fail with code UNAVAILABLE and an error message specifying that
   662  // the aggregate cluster grpah no leaf clusters.  Then the test updates A -> B,
   663  // where B is a leaf EDS cluster. Verifies that configuration is pushed to the
   664  // child policy and that an RPC can be successfully made.
   665  func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
   666  	lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
   667  	mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
   668  
   669  	const (
   670  		clusterNameA = clusterName // cluster name in cds LB policy config
   671  		clusterNameB = clusterName + "-B"
   672  	)
   673  	// Configure the management server with an aggregate cluster resource whose
   674  	// child is itself.
   675  	resources := e2e.UpdateOptions{
   676  		NodeID:         nodeID,
   677  		Clusters:       []*v3clusterpb.Cluster{makeAggregateClusterResource(clusterNameA, []string{clusterNameA})},
   678  		SkipValidation: true,
   679  	}
   680  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   681  	defer cancel()
   682  	if err := mgmtServer.Update(ctx, resources); err != nil {
   683  		t.Fatal(err)
   684  	}
   685  
   686  	select {
   687  	case cfg := <-lbCfgCh:
   688  		t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg))
   689  	case <-time.After(defaultTestShortTimeout):
   690  	}
   691  
   692  	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
   693  
   694  	// Verify that the RPC fails with expected code.
   695  	client := testgrpc.NewTestServiceClient(cc)
   696  	_, err := client.EmptyCall(ctx, &testpb.Empty{})
   697  	if gotCode, wantCode := status.Code(err), codes.Unavailable; gotCode != wantCode {
   698  		t.Fatalf("EmptyCall() failed with code: %v, want %v", gotCode, wantCode)
   699  	}
   700  	const wantErr = "aggregate cluster graph has no leaf clusters"
   701  	if !strings.Contains(err.Error(), wantErr) {
   702  		t.Fatalf("EmptyCall() failed with err: %v, want error containing %s", err, wantErr)
   703  	}
   704  
   705  	// Start a test service backend.
   706  	server := stubserver.StartTestService(t, nil)
   707  	t.Cleanup(server.Stop)
   708  
   709  	// Update the aggregate cluster to point to a leaf EDS cluster.
   710  	resources = e2e.UpdateOptions{
   711  		NodeID: nodeID,
   712  		Clusters: []*v3clusterpb.Cluster{
   713  			makeAggregateClusterResource(clusterNameA, []string{clusterNameB}),
   714  			e2e.DefaultCluster(clusterNameB, serviceName, e2e.SecurityLevelNone),
   715  		},
   716  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
   717  		SkipValidation: true,
   718  	}
   719  	if err := mgmtServer.Update(ctx, resources); err != nil {
   720  		t.Fatal(err)
   721  	}
   722  
   723  	// Verify the configuration pushed to the child policy.
   724  	wantChildCfg := &clusterresolver.LBConfig{
   725  		DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
   726  			Cluster:          clusterNameB,
   727  			Type:             clusterresolver.DiscoveryMechanismTypeEDS,
   728  			EDSServiceName:   serviceName,
   729  			OutlierDetection: json.RawMessage(`{}`),
   730  		}},
   731  		XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
   732  	}
   733  	if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
   734  		t.Fatal(err)
   735  	}
   736  
   737  	// Verify that a successful RPC can be made.
   738  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   739  		t.Fatalf("EmptyCall() failed: %v", err)
   740  	}
   741  }
   742  
   743  // Tests the scenario where the aggregate cluster graph contains a cycle and
   744  // contains no leaf clusters. The case used here is [A -> B, B -> A]. As there
   745  // are no leaf clusters in this graph, no configuration should be pushed to the
   746  // child policy. The channel is expected to move to TRANSIENT_FAILURE and RPCs
   747  // are expected to fail with code UNAVAILABLE and an error message specifying
   748  // that the aggregate cluster graph has no leaf clusters.
   749  func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) {
   750  	lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
   751  	mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
   752  
   753  	const (
   754  		clusterNameA = clusterName // cluster name in cds LB policy config
   755  		clusterNameB = clusterName + "-B"
   756  	)
   757  	// Configure the management server with an aggregate cluster resource graph
   758  	// that contains a cycle and no leaf clusters.
   759  	resources := e2e.UpdateOptions{
   760  		NodeID: nodeID,
   761  		Clusters: []*v3clusterpb.Cluster{
   762  			makeAggregateClusterResource(clusterNameA, []string{clusterNameB}),
   763  			makeAggregateClusterResource(clusterNameB, []string{clusterNameA}),
   764  		},
   765  		SkipValidation: true,
   766  	}
   767  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   768  	defer cancel()
   769  	if err := mgmtServer.Update(ctx, resources); err != nil {
   770  		t.Fatal(err)
   771  	}
   772  
   773  	select {
   774  	case cfg := <-lbCfgCh:
   775  		t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg))
   776  	case <-time.After(defaultTestShortTimeout):
   777  	}
   778  
   779  	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
   780  
   781  	// Verify that the RPC fails with expected code.
   782  	client := testgrpc.NewTestServiceClient(cc)
   783  	_, err := client.EmptyCall(ctx, &testpb.Empty{})
   784  	if gotCode, wantCode := status.Code(err), codes.Unavailable; gotCode != wantCode {
   785  		t.Fatalf("EmptyCall() failed with code: %v, want %v", gotCode, wantCode)
   786  	}
   787  	const wantErr = "aggregate cluster graph has no leaf clusters"
   788  	if !strings.Contains(err.Error(), wantErr) {
   789  		t.Fatalf("EmptyCall() failed with err: %v, want %s", err, wantErr)
   790  	}
   791  }
   792  
   793  // Tests the scenario where the aggregate cluster graph contains a cycle and
   794  // also contains a leaf cluster. The case used here is [A -> B, B -> A, C]. As
   795  // there is a leaf cluster in this graph , configuration should be pushed to the
   796  // child policy and RPCs should get routed to that leaf cluster.
   797  func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) {
   798  	lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
   799  	mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
   800  
   801  	// Start a test service backend.
   802  	server := stubserver.StartTestService(t, nil)
   803  	t.Cleanup(server.Stop)
   804  
   805  	const (
   806  		clusterNameA = clusterName // cluster name in cds LB policy config
   807  		clusterNameB = clusterName + "-B"
   808  		clusterNameC = clusterName + "-C"
   809  	)
   810  	// Configure the management server with an aggregate cluster resource graph
   811  	// that contains a cycle, but also contains a leaf cluster.
   812  	resources := e2e.UpdateOptions{
   813  		NodeID: nodeID,
   814  		Clusters: []*v3clusterpb.Cluster{
   815  			makeAggregateClusterResource(clusterNameA, []string{clusterNameB}),
   816  			makeAggregateClusterResource(clusterNameB, []string{clusterNameA, clusterNameC}),
   817  			e2e.DefaultCluster(clusterNameC, serviceName, e2e.SecurityLevelNone),
   818  		},
   819  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
   820  		SkipValidation: true,
   821  	}
   822  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   823  	defer cancel()
   824  	if err := mgmtServer.Update(ctx, resources); err != nil {
   825  		t.Fatal(err)
   826  	}
   827  
   828  	// Verify the configuration pushed to the child policy.
   829  	wantChildCfg := &clusterresolver.LBConfig{
   830  		DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
   831  			Cluster:          clusterNameC,
   832  			Type:             clusterresolver.DiscoveryMechanismTypeEDS,
   833  			EDSServiceName:   serviceName,
   834  			OutlierDetection: json.RawMessage(`{}`),
   835  		}},
   836  		XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
   837  	}
   838  	if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
   839  		t.Fatal(err)
   840  	}
   841  
   842  	// Verify that a successful RPC can be made.
   843  	client := testgrpc.NewTestServiceClient(cc)
   844  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   845  		t.Fatalf("EmptyCall() failed: %v", err)
   846  	}
   847  }
   848  

View as plain text