1
18
19 package xds_test
20
21 import (
22 "context"
23 "fmt"
24 "testing"
25
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/credentials/insecure"
28 "google.golang.org/grpc/internal"
29 "google.golang.org/grpc/internal/stubserver"
30 "google.golang.org/grpc/internal/testutils"
31 "google.golang.org/grpc/internal/testutils/rls"
32 "google.golang.org/grpc/internal/testutils/xds/e2e"
33 "google.golang.org/protobuf/types/known/durationpb"
34
35 v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
36 v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
37 v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
38 v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
39 rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
40 testgrpc "google.golang.org/grpc/interop/grpc_testing"
41 testpb "google.golang.org/grpc/interop/grpc_testing"
42
43 _ "google.golang.org/grpc/balancer/rls"
44 )
45
46
47
48 func defaultClientResourcesWithRLSCSP(t *testing.T, lb e2e.LoadBalancingPolicy, params e2e.ResourceParams, rlsProto *rlspb.RouteLookupConfig) e2e.UpdateOptions {
49 routeConfigName := "route-" + params.DialTarget
50 clusterName := "cluster-" + params.DialTarget
51 endpointsName := "endpoints-" + params.DialTarget
52 return e2e.UpdateOptions{
53 NodeID: params.NodeID,
54 Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)},
55 Routes: []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
56 RouteConfigName: routeConfigName,
57 ListenerName: params.DialTarget,
58 ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
59 ClusterSpecifierPluginName: "rls-csp",
60 ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &rlspb.RouteLookupClusterSpecifier{
61 RouteLookupConfig: rlsProto,
62 }),
63 })},
64 Clusters: []*v3clusterpb.Cluster{e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
65 ClusterName: clusterName,
66 ServiceName: endpointsName,
67 Policy: lb,
68 SecurityLevel: params.SecLevel,
69 })},
70 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, []uint32{params.Port})},
71 }
72 }
73
74
75
76
77
78
79
80 func (s) TestRLSinxDS(t *testing.T) {
81 tests := []struct {
82 name string
83 lbPolicy e2e.LoadBalancingPolicy
84 }{
85 {
86 name: "roundrobin",
87 lbPolicy: e2e.LoadBalancingPolicyRoundRobin,
88 },
89 {
90 name: "ringhash",
91 lbPolicy: e2e.LoadBalancingPolicyRingHash,
92 },
93 }
94 for _, test := range tests {
95 t.Run(test.name, func(t *testing.T) {
96 testRLSinxDS(t, test.lbPolicy)
97 })
98 }
99 }
100
101 func testRLSinxDS(t *testing.T, lbPolicy e2e.LoadBalancingPolicy) {
102 internal.RegisterRLSClusterSpecifierPluginForTesting()
103 defer internal.UnregisterRLSClusterSpecifierPluginForTesting()
104
105
106
107
108 managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
109 defer cleanup1()
110
111 server := stubserver.StartTestService(t, nil)
112 defer server.Stop()
113
114 lis := testutils.NewListenerWrapper(t, nil)
115 rlsServer, rlsRequestCh := rls.SetupFakeRLSServer(t, lis)
116 rlsProto := &rlspb.RouteLookupConfig{
117 GrpcKeybuilders: []*rlspb.GrpcKeyBuilder{{Names: []*rlspb.GrpcKeyBuilder_Name{{Service: "grpc.testing.TestService"}}}},
118 LookupService: rlsServer.Address,
119 LookupServiceTimeout: durationpb.New(defaultTestTimeout),
120 CacheSizeBytes: 1024,
121 }
122
123 const serviceName = "my-service-client-side-xds"
124 resources := defaultClientResourcesWithRLSCSP(t, lbPolicy, e2e.ResourceParams{
125 DialTarget: serviceName,
126 NodeID: nodeID,
127 Host: "localhost",
128 Port: testutils.ParsePort(t, server.Address),
129 SecLevel: e2e.SecurityLevelNone,
130 }, rlsProto)
131
132 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
133 defer cancel()
134 if err := managementServer.Update(ctx, resources); err != nil {
135 t.Fatal(err)
136 }
137
138
139
140 rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rls.RouteLookupResponse {
141 return &rls.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{"cluster-" + serviceName}}}
142 })
143
144
145 cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
146 if err != nil {
147 t.Fatalf("failed to dial local test server: %v", err)
148 }
149 defer cc.Close()
150
151 client := testgrpc.NewTestServiceClient(cc)
152
153
154 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
155 t.Fatalf("rpc EmptyCall() failed: %v", err)
156 }
157
158
159
160
161
162 if _, err = lis.NewConnCh.Receive(ctx); err != nil {
163 t.Fatal("Timeout when waiting for RLS LB policy to create control channel")
164 }
165
166
167 select {
168 case <-ctx.Done():
169 t.Fatalf("Timeout when waiting for an RLS request to be sent out")
170 case <-rlsRequestCh:
171 }
172 }
173
View as plain text