...

Source file src/google.golang.org/grpc/test/xds/xds_rls_clusterspecifier_plugin_test.go

Documentation: google.golang.org/grpc/test/xds

     1  /*
     2   *
     3   * Copyright 2022 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package xds_test
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"testing"
    25  
    26  	"google.golang.org/grpc"
    27  	"google.golang.org/grpc/credentials/insecure"
    28  	"google.golang.org/grpc/internal"
    29  	"google.golang.org/grpc/internal/stubserver"
    30  	"google.golang.org/grpc/internal/testutils"
    31  	"google.golang.org/grpc/internal/testutils/rls"
    32  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    33  	"google.golang.org/protobuf/types/known/durationpb"
    34  
    35  	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
    36  	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
    37  	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
    38  	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
    39  	rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
    40  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    41  	testpb "google.golang.org/grpc/interop/grpc_testing"
    42  
    43  	_ "google.golang.org/grpc/balancer/rls" // Register the RLS Load Balancing policy.
    44  )
    45  
    46  // defaultClientResourcesWithRLSCSP returns a set of resources (LDS, RDS, CDS, EDS) for a
    47  // client to connect to a server with a RLS Load Balancer as a child of Cluster Manager.
    48  func defaultClientResourcesWithRLSCSP(t *testing.T, lb e2e.LoadBalancingPolicy, params e2e.ResourceParams, rlsProto *rlspb.RouteLookupConfig) e2e.UpdateOptions {
    49  	routeConfigName := "route-" + params.DialTarget
    50  	clusterName := "cluster-" + params.DialTarget
    51  	endpointsName := "endpoints-" + params.DialTarget
    52  	return e2e.UpdateOptions{
    53  		NodeID:    params.NodeID,
    54  		Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)},
    55  		Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
    56  			RouteConfigName:            routeConfigName,
    57  			ListenerName:               params.DialTarget,
    58  			ClusterSpecifierType:       e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
    59  			ClusterSpecifierPluginName: "rls-csp",
    60  			ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &rlspb.RouteLookupClusterSpecifier{
    61  				RouteLookupConfig: rlsProto,
    62  			}),
    63  		})},
    64  		Clusters: []*v3clusterpb.Cluster{e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
    65  			ClusterName:   clusterName,
    66  			ServiceName:   endpointsName,
    67  			Policy:        lb,
    68  			SecurityLevel: params.SecLevel,
    69  		})},
    70  		Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, []uint32{params.Port})},
    71  	}
    72  }
    73  
    74  // TestRLSinxDS tests an xDS configured system with an RLS Balancer present.
    75  //
    76  // This test sets up the RLS Balancer using the RLS Cluster Specifier Plugin,
    77  // spins up a test service and has a fake RLS Server correctly respond with a
    78  // target corresponding to this test service. This test asserts an RPC proceeds
    79  // as normal with the RLS Balancer as part of system.
    80  func (s) TestRLSinxDS(t *testing.T) {
    81  	tests := []struct {
    82  		name     string
    83  		lbPolicy e2e.LoadBalancingPolicy
    84  	}{
    85  		{
    86  			name:     "roundrobin",
    87  			lbPolicy: e2e.LoadBalancingPolicyRoundRobin,
    88  		},
    89  		{
    90  			name:     "ringhash",
    91  			lbPolicy: e2e.LoadBalancingPolicyRingHash,
    92  		},
    93  	}
    94  	for _, test := range tests {
    95  		t.Run(test.name, func(t *testing.T) {
    96  			testRLSinxDS(t, test.lbPolicy)
    97  		})
    98  	}
    99  }
   100  
   101  func testRLSinxDS(t *testing.T, lbPolicy e2e.LoadBalancingPolicy) {
   102  	internal.RegisterRLSClusterSpecifierPluginForTesting()
   103  	defer internal.UnregisterRLSClusterSpecifierPluginForTesting()
   104  
   105  	// Set up all components and configuration necessary - management server,
   106  	// xDS resolver, fake RLS Server, and xDS configuration which specifies an
   107  	// RLS Balancer that communicates to this set up fake RLS Server.
   108  	managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
   109  	defer cleanup1()
   110  
   111  	server := stubserver.StartTestService(t, nil)
   112  	defer server.Stop()
   113  
   114  	lis := testutils.NewListenerWrapper(t, nil)
   115  	rlsServer, rlsRequestCh := rls.SetupFakeRLSServer(t, lis)
   116  	rlsProto := &rlspb.RouteLookupConfig{
   117  		GrpcKeybuilders:      []*rlspb.GrpcKeyBuilder{{Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}}}},
   118  		LookupService:        rlsServer.Address,
   119  		LookupServiceTimeout: durationpb.New(defaultTestTimeout),
   120  		CacheSizeBytes:       1024,
   121  	}
   122  
   123  	const serviceName = "my-service-client-side-xds"
   124  	resources := defaultClientResourcesWithRLSCSP(t, lbPolicy, e2e.ResourceParams{
   125  		DialTarget: serviceName,
   126  		NodeID:     nodeID,
   127  		Host:       "localhost",
   128  		Port:       testutils.ParsePort(t, server.Address),
   129  		SecLevel:   e2e.SecurityLevelNone,
   130  	}, rlsProto)
   131  
   132  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   133  	defer cancel()
   134  	if err := managementServer.Update(ctx, resources); err != nil {
   135  		t.Fatal(err)
   136  	}
   137  
   138  	// Configure the fake RLS Server to set the RLS Balancers child CDS
   139  	// Cluster's name as the target for the RPC to use.
   140  	rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rls.RouteLookupResponse {
   141  		return &rls.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{"cluster-" + serviceName}}}
   142  	})
   143  
   144  	// Create a ClientConn and make a successful RPC.
   145  	cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
   146  	if err != nil {
   147  		t.Fatalf("failed to dial local test server: %v", err)
   148  	}
   149  	defer cc.Close()
   150  
   151  	client := testgrpc.NewTestServiceClient(cc)
   152  	// Successfully sending the RPC will require the RLS Load Balancer to
   153  	// communicate with the fake RLS Server for information about the target.
   154  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   155  		t.Fatalf("rpc EmptyCall() failed: %v", err)
   156  	}
   157  
   158  	// These RLS Verifications makes sure the RLS Load Balancer is actually part
   159  	// of the xDS Configured system that correctly sends out RPC.
   160  
   161  	// Verify connection is established to RLS Server.
   162  	if _, err = lis.NewConnCh.Receive(ctx); err != nil {
   163  		t.Fatal("Timeout when waiting for RLS LB policy to create control channel")
   164  	}
   165  
   166  	// Verify an rls request is sent out to fake RLS Server.
   167  	select {
   168  	case <-ctx.Done():
   169  		t.Fatalf("Timeout when waiting for an RLS request to be sent out")
   170  	case <-rlsRequestCh:
   171  	}
   172  }
   173  

View as plain text