...

Source file src/google.golang.org/grpc/xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go

Documentation: google.golang.org/grpc/xds/internal/balancer/clusterresolver/e2e_test

     1  /*
     2   * Copyright 2023 gRPC authors.
     3   *
     4   * Licensed under the Apache License, Version 2.0 (the "License");
     5   * you may not use this file except in compliance with the License.
     6   * You may obtain a copy of the License at
     7   *
     8   *     http://www.apache.org/licenses/LICENSE-2.0
     9   *
    10   * Unless required by applicable law or agreed to in writing, software
    11   * distributed under the License is distributed on an "AS IS" BASIS,
    12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13   * See the License for the specific language governing permissions and
    14   * limitations under the License.
    15   */
    16  
    17  package e2e_test
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"fmt"
    23  	"strings"
    24  	"testing"
    25  	"time"
    26  
    27  	"github.com/google/go-cmp/cmp"
    28  	"google.golang.org/grpc"
    29  	"google.golang.org/grpc/balancer"
    30  	"google.golang.org/grpc/balancer/roundrobin"
    31  	"google.golang.org/grpc/codes"
    32  	"google.golang.org/grpc/connectivity"
    33  	"google.golang.org/grpc/credentials/insecure"
    34  	"google.golang.org/grpc/internal"
    35  	"google.golang.org/grpc/internal/balancer/stub"
    36  	iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
    37  	"google.golang.org/grpc/internal/stubserver"
    38  	"google.golang.org/grpc/internal/testutils"
    39  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    40  	"google.golang.org/grpc/resolver"
    41  	"google.golang.org/grpc/resolver/manual"
    42  	"google.golang.org/grpc/serviceconfig"
    43  	"google.golang.org/grpc/status"
    44  	"google.golang.org/grpc/xds/internal/balancer/clusterimpl"
    45  	"google.golang.org/grpc/xds/internal/balancer/outlierdetection"
    46  	"google.golang.org/grpc/xds/internal/balancer/priority"
    47  	"google.golang.org/grpc/xds/internal/balancer/wrrlocality"
    48  	"google.golang.org/grpc/xds/internal/xdsclient"
    49  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
    50  	"google.golang.org/protobuf/types/known/durationpb"
    51  	"google.golang.org/protobuf/types/known/wrapperspb"
    52  
    53  	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
    54  	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    55  	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
    56  	v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
    57  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    58  	testpb "google.golang.org/grpc/interop/grpc_testing"
    59  
    60  	_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the "cds_experimental" LB policy.
    61  )
    62  
    63  // setupAndDial performs common setup across all tests
    64  //
    65  //   - creates an xDS client with the passed in bootstrap contents
    66  //   - creates a  manual resolver that configures `cds_experimental` as the
    67  //     top-level LB policy.
    68  //   - creates a ClientConn to talk to the test backends
    69  //
    70  // Returns a function to close the ClientConn and the xDS client.
    71  func setupAndDial(t *testing.T, bootstrapContents []byte) (*grpc.ClientConn, func()) {
    72  	t.Helper()
    73  
    74  	// Create an xDS client for use by the cluster_resolver LB policy.
    75  	xdsC, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
    76  	if err != nil {
    77  		t.Fatalf("Failed to create xDS client: %v", err)
    78  	}
    79  
    80  	// Create a manual resolver and push a service config specifying the use of
    81  	// the cds LB policy as the top-level LB policy, and a corresponding config
    82  	// with a single cluster.
    83  	r := manual.NewBuilderWithScheme("whatever")
    84  	jsonSC := fmt.Sprintf(`{
    85  			"loadBalancingConfig":[{
    86  				"cds_experimental":{
    87  					"cluster": "%s"
    88  				}
    89  			}]
    90  		}`, clusterName)
    91  	scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
    92  	r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsC))
    93  
    94  	// Create a ClientConn and make a successful RPC.
    95  	cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
    96  	if err != nil {
    97  		xdsClose()
    98  		t.Fatalf("Failed to dial local test server: %v", err)
    99  	}
   100  	return cc, func() {
   101  		xdsClose()
   102  		cc.Close()
   103  	}
   104  }
   105  
   106  // TestErrorFromParentLB_ConnectionError tests the case where the parent of the
   107  // clusterresolver LB policy sends its a connection error. The parent policy,
   108  // CDS LB policy, sends a connection error when the ADS stream to the management
   109  // server breaks. The test verifies that there is no perceivable effect because
   110  // of this connection error, and that RPCs continue to work (because the LB
   111  // policies are expected to use previously received xDS resources).
   112  func (s) TestErrorFromParentLB_ConnectionError(t *testing.T) {
   113  	// Create a listener to be used by the management server. The test will
   114  	// close this listener to simulate ADS stream breakage.
   115  	lis, err := testutils.LocalTCPListener()
   116  	if err != nil {
   117  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
   118  	}
   119  
   120  	// Start an xDS management server with the above restartable listener, and
   121  	// push a channel when the stream is closed.
   122  	streamClosedCh := make(chan struct{}, 1)
   123  	managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
   124  		Listener: lis,
   125  		OnStreamClosed: func(int64, *v3corepb.Node) {
   126  			select {
   127  			case streamClosedCh <- struct{}{}:
   128  			default:
   129  			}
   130  		},
   131  	})
   132  	defer cleanup()
   133  
   134  	server := stubserver.StartTestService(t, nil)
   135  	defer server.Stop()
   136  
   137  	// Configure cluster and endpoints resources in the management server.
   138  	resources := e2e.UpdateOptions{
   139  		NodeID:         nodeID,
   140  		Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)},
   141  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
   142  		SkipValidation: true,
   143  	}
   144  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   145  	defer cancel()
   146  	if err := managementServer.Update(ctx, resources); err != nil {
   147  		t.Fatal(err)
   148  	}
   149  
   150  	// Create xDS client, configure cds_experimental LB policy with a manual
   151  	// resolver, and dial the test backends.
   152  	cc, cleanup := setupAndDial(t, bootstrapContents)
   153  	defer cleanup()
   154  
   155  	client := testgrpc.NewTestServiceClient(cc)
   156  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   157  		t.Fatalf("EmptyCall() failed: %v", err)
   158  	}
   159  
   160  	// Close the listener and ensure that the ADS stream breaks.
   161  	lis.Close()
   162  	select {
   163  	case <-ctx.Done():
   164  		t.Fatal("Timeout when waiting for ADS stream to close")
   165  	default:
   166  	}
   167  
   168  	// Ensure that RPCs continue to succeed for the next second.
   169  	for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
   170  		if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   171  			t.Fatalf("EmptyCall() failed: %v", err)
   172  		}
   173  	}
   174  }
   175  
   176  // TestErrorFromParentLB_ResourceNotFound tests the case where the parent of the
   177  // clusterresolver LB policy sends it a resource-not-found error. The parent
   178  // policy, CDS LB policy, sends a resource-not-found error when the cluster
   179  // resource associated with these LB policies is removed by the management
   180  // server. The test verifies that the associated EDS is canceled and RPCs fail.
   181  // It also ensures that when the Cluster resource is added back, the EDS
   182  // resource is re-requested and RPCs being to succeed.
   183  func (s) TestErrorFromParentLB_ResourceNotFound(t *testing.T) {
   184  	// Start an xDS management server that uses a couple of channels to
   185  	// notify the test about the following events:
   186  	// - an EDS requested with the expected resource name is requested
   187  	// - EDS resource is unrequested, i.e, an EDS request with no resource name
   188  	//   is received, which indicates that we are not longer interested in that
   189  	//   resource.
   190  	edsResourceRequestedCh := make(chan struct{}, 1)
   191  	edsResourceCanceledCh := make(chan struct{}, 1)
   192  	managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
   193  		OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
   194  			if req.GetTypeUrl() == version.V3EndpointsURL {
   195  				switch len(req.GetResourceNames()) {
   196  				case 0:
   197  					select {
   198  					case edsResourceCanceledCh <- struct{}{}:
   199  					default:
   200  					}
   201  				case 1:
   202  					if req.GetResourceNames()[0] == edsServiceName {
   203  						select {
   204  						case edsResourceRequestedCh <- struct{}{}:
   205  						default:
   206  						}
   207  					}
   208  				default:
   209  					t.Errorf("Unexpected number of resources, %d, in an EDS request", len(req.GetResourceNames()))
   210  				}
   211  			}
   212  			return nil
   213  		},
   214  	})
   215  	defer cleanup()
   216  
   217  	server := stubserver.StartTestService(t, nil)
   218  	defer server.Stop()
   219  
   220  	// Configure cluster and endpoints resources in the management server.
   221  	resources := e2e.UpdateOptions{
   222  		NodeID:         nodeID,
   223  		Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)},
   224  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
   225  		SkipValidation: true,
   226  	}
   227  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   228  	defer cancel()
   229  	if err := managementServer.Update(ctx, resources); err != nil {
   230  		t.Fatal(err)
   231  	}
   232  
   233  	// Create xDS client, configure cds_experimental LB policy with a manual
   234  	// resolver, and dial the test backends.
   235  	cc, cleanup := setupAndDial(t, bootstrapContents)
   236  	defer cleanup()
   237  
   238  	// Wait for the EDS resource to be requested.
   239  	select {
   240  	case <-ctx.Done():
   241  		t.Fatal("Timeout when waiting for EDS resource to be requested")
   242  	case <-edsResourceRequestedCh:
   243  	}
   244  
   245  	// Ensure that a successful RPC can be made.
   246  	client := testgrpc.NewTestServiceClient(cc)
   247  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   248  		t.Fatalf("EmptyCall() failed: %v", err)
   249  	}
   250  
   251  	// Delete the cluster resource from the management server.
   252  	resources.Clusters = nil
   253  	if err := managementServer.Update(ctx, resources); err != nil {
   254  		t.Fatal(err)
   255  	}
   256  
   257  	// Wait for the EDS resource to be not requested anymore.
   258  	select {
   259  	case <-ctx.Done():
   260  		t.Fatal("Timeout when waiting for EDS resource to not requested")
   261  	case <-edsResourceCanceledCh:
   262  	}
   263  
   264  	// Ensure that RPCs start to fail with expected error.
   265  	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
   266  		sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   267  		defer sCancel()
   268  		_, err := client.EmptyCall(sCtx, &testpb.Empty{})
   269  		if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "all priorities are removed") {
   270  			break
   271  		}
   272  		if err != nil {
   273  			t.Logf("EmptyCall failed: %v", err)
   274  		}
   275  	}
   276  	if ctx.Err() != nil {
   277  		t.Fatalf("RPCs did not fail after removal of Cluster resource")
   278  	}
   279  
   280  	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
   281  
   282  	// Configure cluster and endpoints resources in the management server.
   283  	resources = e2e.UpdateOptions{
   284  		NodeID:         nodeID,
   285  		Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)},
   286  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
   287  		SkipValidation: true,
   288  	}
   289  	if err := managementServer.Update(ctx, resources); err != nil {
   290  		t.Fatal(err)
   291  	}
   292  
   293  	// Wait for the EDS resource to be requested again.
   294  	select {
   295  	case <-ctx.Done():
   296  		t.Fatal("Timeout when waiting for EDS resource to be requested")
   297  	case <-edsResourceRequestedCh:
   298  	}
   299  
   300  	// Ensure that a successful RPC can be made.
   301  	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
   302  		sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   303  		defer sCancel()
   304  		if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); err != nil {
   305  			t.Logf("EmptyCall failed: %v", err)
   306  			continue
   307  		}
   308  		break
   309  	}
   310  	if ctx.Err() != nil {
   311  		t.Fatalf("RPCs did not fail after removal of Cluster resource")
   312  	}
   313  }
   314  
   315  // Test verifies that when the received Cluster resource contains outlier
   316  // detection configuration, the LB config pushed to the child policy contains
   317  // the appropriate configuration for the outlier detection LB policy.
   318  func (s) TestOutlierDetectionConfigPropagationToChildPolicy(t *testing.T) {
   319  	// Unregister the priority balancer builder for the duration of this test,
   320  	// and register a policy under the same name that makes the LB config
   321  	// pushed to it available to the test.
   322  	priorityBuilder := balancer.Get(priority.Name)
   323  	internal.BalancerUnregister(priorityBuilder.Name())
   324  	lbCfgCh := make(chan serviceconfig.LoadBalancingConfig, 1)
   325  	stub.Register(priority.Name, stub.BalancerFuncs{
   326  		Init: func(bd *stub.BalancerData) {
   327  			bd.Data = priorityBuilder.Build(bd.ClientConn, bd.BuildOptions)
   328  		},
   329  		ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
   330  			return priorityBuilder.(balancer.ConfigParser).ParseConfig(lbCfg)
   331  		},
   332  		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
   333  			select {
   334  			case lbCfgCh <- ccs.BalancerConfig:
   335  			default:
   336  			}
   337  			bal := bd.Data.(balancer.Balancer)
   338  			return bal.UpdateClientConnState(ccs)
   339  		},
   340  		Close: func(bd *stub.BalancerData) {
   341  			bal := bd.Data.(balancer.Balancer)
   342  			bal.Close()
   343  		},
   344  	})
   345  	defer balancer.Register(priorityBuilder)
   346  
   347  	managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
   348  	defer cleanup()
   349  
   350  	server := stubserver.StartTestService(t, nil)
   351  	defer server.Stop()
   352  
   353  	// Configure cluster and endpoints resources in the management server.
   354  	cluster := e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)
   355  	cluster.OutlierDetection = &v3clusterpb.OutlierDetection{
   356  		Interval:                 durationpb.New(10 * time.Second),
   357  		BaseEjectionTime:         durationpb.New(30 * time.Second),
   358  		MaxEjectionTime:          durationpb.New(300 * time.Second),
   359  		MaxEjectionPercent:       wrapperspb.UInt32(10),
   360  		SuccessRateStdevFactor:   wrapperspb.UInt32(2000),
   361  		EnforcingSuccessRate:     wrapperspb.UInt32(50),
   362  		SuccessRateMinimumHosts:  wrapperspb.UInt32(10),
   363  		SuccessRateRequestVolume: wrapperspb.UInt32(50),
   364  	}
   365  	resources := e2e.UpdateOptions{
   366  		NodeID:         nodeID,
   367  		Clusters:       []*v3clusterpb.Cluster{cluster},
   368  		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
   369  		SkipValidation: true,
   370  	}
   371  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   372  	defer cancel()
   373  	if err := managementServer.Update(ctx, resources); err != nil {
   374  		t.Fatal(err)
   375  	}
   376  
   377  	// Create xDS client, configure cds_experimental LB policy with a manual
   378  	// resolver, and dial the test backends.
   379  	_, cleanup = setupAndDial(t, bootstrapContents)
   380  	defer cleanup()
   381  
   382  	// The priority configuration generated should have Outlier Detection as a
   383  	// direct child due to Outlier Detection being turned on.
   384  	wantCfg := &priority.LBConfig{
   385  		Children: map[string]*priority.Child{
   386  			"priority-0-0": {
   387  				Config: &iserviceconfig.BalancerConfig{
   388  					Name: outlierdetection.Name,
   389  					Config: &outlierdetection.LBConfig{
   390  						Interval:           iserviceconfig.Duration(10 * time.Second), // default interval
   391  						BaseEjectionTime:   iserviceconfig.Duration(30 * time.Second),
   392  						MaxEjectionTime:    iserviceconfig.Duration(300 * time.Second),
   393  						MaxEjectionPercent: 10,
   394  						SuccessRateEjection: &outlierdetection.SuccessRateEjection{
   395  							StdevFactor:           2000,
   396  							EnforcementPercentage: 50,
   397  							MinimumHosts:          10,
   398  							RequestVolume:         50,
   399  						},
   400  						ChildPolicy: &iserviceconfig.BalancerConfig{
   401  							Name: clusterimpl.Name,
   402  							Config: &clusterimpl.LBConfig{
   403  								Cluster:        clusterName,
   404  								EDSServiceName: edsServiceName,
   405  								ChildPolicy: &iserviceconfig.BalancerConfig{
   406  									Name: wrrlocality.Name,
   407  									Config: &wrrlocality.LBConfig{
   408  										ChildPolicy: &iserviceconfig.BalancerConfig{
   409  											Name: roundrobin.Name,
   410  										},
   411  									},
   412  								},
   413  							},
   414  						},
   415  					},
   416  				},
   417  				IgnoreReresolutionRequests: true,
   418  			},
   419  		},
   420  		Priorities: []string{"priority-0-0"},
   421  	}
   422  
   423  	select {
   424  	case lbCfg := <-lbCfgCh:
   425  		gotCfg := lbCfg.(*priority.LBConfig)
   426  		if diff := cmp.Diff(wantCfg, gotCfg); diff != "" {
   427  			t.Fatalf("Child policy received unexpected diff in config (-want +got):\n%s", diff)
   428  		}
   429  	case <-ctx.Done():
   430  		t.Fatalf("Timeout when waiting for child policy to receive its configuration")
   431  	}
   432  }
   433  

View as plain text