...

Source file src/google.golang.org/grpc/xds/internal/balancer/clusterimpl/tests/balancer_test.go

Documentation: google.golang.org/grpc/xds/internal/balancer/clusterimpl/tests

     1  /*
     2   *
     3   * Copyright 2023 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package clusterimpl_test
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"strings"
    25  	"testing"
    26  	"time"
    27  
    28  	"google.golang.org/grpc"
    29  	"google.golang.org/grpc/codes"
    30  	"google.golang.org/grpc/credentials/insecure"
    31  	"google.golang.org/grpc/internal/grpctest"
    32  	"google.golang.org/grpc/internal/stubserver"
    33  	"google.golang.org/grpc/internal/testutils"
    34  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    35  	"google.golang.org/grpc/status"
    36  
    37  	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    38  	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
    39  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    40  	testpb "google.golang.org/grpc/interop/grpc_testing"
    41  
    42  	_ "google.golang.org/grpc/xds"
    43  )
    44  
    45  const (
    46  	defaultTestTimeout      = 5 * time.Second
    47  	defaultTestShortTimeout = 100 * time.Millisecond
    48  )
    49  
    50  type s struct {
    51  	grpctest.Tester
    52  }
    53  
    54  func Test(t *testing.T) {
    55  	grpctest.RunSubTests(t, s{})
    56  }
    57  
    58  // TestConfigUpdateWithSameLoadReportingServerConfig tests the scenario where
    59  // the clusterimpl LB policy receives a config update with no change in the load
    60  // reporting server configuration. The test verifies that the existing load
    61  // repoting stream is not terminated and that a new load reporting stream is not
    62  // created.
    63  func (s) TestConfigUpdateWithSameLoadReportingServerConfig(t *testing.T) {
    64  	// Create an xDS management server that serves ADS and LRS requests.
    65  	opts := e2e.ManagementServerOptions{SupportLoadReportingService: true}
    66  	mgmtServer, nodeID, _, resolver, mgmtServerCleanup := e2e.SetupManagementServer(t, opts)
    67  	defer mgmtServerCleanup()
    68  
    69  	// Start a server backend exposing the test service.
    70  	server := stubserver.StartTestService(t, nil)
    71  	defer server.Stop()
    72  
    73  	// Configure the xDS management server with default resources. Override the
    74  	// default cluster to include an LRS server config pointing to self.
    75  	const serviceName = "my-test-xds-service"
    76  	resources := e2e.DefaultClientResources(e2e.ResourceParams{
    77  		DialTarget: serviceName,
    78  		NodeID:     nodeID,
    79  		Host:       "localhost",
    80  		Port:       testutils.ParsePort(t, server.Address),
    81  		SecLevel:   e2e.SecurityLevelNone,
    82  	})
    83  	resources.Clusters[0].LrsServer = &v3corepb.ConfigSource{
    84  		ConfigSourceSpecifier: &v3corepb.ConfigSource_Self{
    85  			Self: &v3corepb.SelfConfigSource{},
    86  		},
    87  	}
    88  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    89  	defer cancel()
    90  	if err := mgmtServer.Update(ctx, resources); err != nil {
    91  		t.Fatal(err)
    92  	}
    93  
    94  	// Create a ClientConn and make a successful RPC.
    95  	cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
    96  	if err != nil {
    97  		t.Fatalf("failed to dial local test server: %v", err)
    98  	}
    99  	defer cc.Close()
   100  
   101  	client := testgrpc.NewTestServiceClient(cc)
   102  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   103  		t.Fatalf("rpc EmptyCall() failed: %v", err)
   104  	}
   105  
   106  	// Ensure that an LRS stream is created.
   107  	if _, err := mgmtServer.LRSServer.LRSStreamOpenChan.Receive(ctx); err != nil {
   108  		t.Fatalf("Failure when waiting for an LRS stream to be opened: %v", err)
   109  	}
   110  
   111  	// Configure a new resource on the management server with drop config that
   112  	// drops all RPCs, but with no change in the load reporting server config.
   113  	resources.Endpoints = []*v3endpointpb.ClusterLoadAssignment{
   114  		e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
   115  			ClusterName: "endpoints-" + serviceName,
   116  			Host:        "localhost",
   117  			Localities: []e2e.LocalityOptions{
   118  				{
   119  					Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}},
   120  					Weight:   1,
   121  				},
   122  			},
   123  			DropPercents: map[string]int{"test-drop-everything": 100},
   124  		}),
   125  	}
   126  	if err := mgmtServer.Update(ctx, resources); err != nil {
   127  		t.Fatal(err)
   128  	}
   129  
   130  	// Repeatedly send RPCs until we sees that they are getting dropped, or the
   131  	// test context deadline expires. The former indicates that new config with
   132  	// drops has been applied.
   133  	for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
   134  		_, err := client.EmptyCall(ctx, &testpb.Empty{})
   135  		if err != nil && status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "RPC is dropped") {
   136  			break
   137  		}
   138  	}
   139  	if ctx.Err() != nil {
   140  		t.Fatalf("Timeout when waiting for RPCs to be dropped after config update")
   141  	}
   142  
   143  	// Ensure that the old LRS stream is not closed.
   144  	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   145  	defer sCancel()
   146  	if _, err := mgmtServer.LRSServer.LRSStreamCloseChan.Receive(sCtx); err == nil {
   147  		t.Fatal("LRS stream closed when expected not to")
   148  	}
   149  
   150  	// Also ensure that a new LRS stream is not created.
   151  	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
   152  	defer sCancel()
   153  	if _, err := mgmtServer.LRSServer.LRSStreamOpenChan.Receive(sCtx); err == nil {
   154  		t.Fatal("New LRS stream created when expected not to")
   155  	}
   156  }
   157  

View as plain text