1
18
19 package xds_test
20
21 import (
22 "context"
23 "fmt"
24 "io"
25 "net"
26 "strconv"
27 "testing"
28
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/credentials/insecure"
32 xdscreds "google.golang.org/grpc/credentials/xds"
33 "google.golang.org/grpc/internal/testutils"
34 "google.golang.org/grpc/internal/testutils/xds/e2e"
35 "google.golang.org/grpc/status"
36 "google.golang.org/grpc/xds"
37
38 testgrpc "google.golang.org/grpc/interop/grpc_testing"
39 testpb "google.golang.org/grpc/interop/grpc_testing"
40 )
41
42 type testService struct {
43 testgrpc.TestServiceServer
44 }
45
46 func (*testService) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) {
47 return &testpb.Empty{}, nil
48 }
49
50 func (*testService) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
51 return &testpb.SimpleResponse{}, nil
52 }
53
54 func (*testService) FullDuplexCall(stream testgrpc.TestService_FullDuplexCallServer) error {
55 for {
56 _, err := stream.Recv()
57 if err == io.EOF {
58 return nil
59 }
60 }
61 }
62
63 func testModeChangeServerOption(t *testing.T) grpc.ServerOption {
64
65
66
67
68
69
70
71
72 return xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
73 t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
74 })
75 }
76
77
78
79
80
81
82
83
84
85 func setupGRPCServer(t *testing.T, bootstrapContents []byte) (net.Listener, func()) {
86 t.Helper()
87
88
89 creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{
90 FallbackCreds: insecure.NewCredentials(),
91 })
92 if err != nil {
93 t.Fatal(err)
94 }
95
96
97 server, err := xds.NewGRPCServer(grpc.Creds(creds), testModeChangeServerOption(t), xds.BootstrapContentsForTesting(bootstrapContents))
98 if err != nil {
99 t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
100 }
101 testgrpc.RegisterTestServiceServer(server, &testService{})
102
103
104 lis, err := testutils.LocalTCPListener()
105 if err != nil {
106 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
107 }
108
109 go func() {
110 if err := server.Serve(lis); err != nil {
111 t.Errorf("Serve() failed: %v", err)
112 }
113 }()
114
115 return lis, func() {
116 server.Stop()
117 }
118 }
119
// hostPortFromListener splits the address of lis into its host and numeric
// port.
//
// It returns an error when the address is not in host:port form, or when the
// port is not an unsigned decimal number in the range 0-65535. The port is
// parsed as unsigned so that a negative value is rejected instead of being
// wrapped to a large uint32. Errors wrap the underlying cause, so callers can
// inspect it with errors.Is or errors.As.
func hostPortFromListener(lis net.Listener) (string, uint32, error) {
	addr := lis.Addr().String()
	host, p, err := net.SplitHostPort(addr)
	if err != nil {
		return "", 0, fmt.Errorf("net.SplitHostPort(%s) failed: %w", addr, err)
	}
	port, err := strconv.ParseUint(p, 10, 16)
	if err != nil {
		return "", 0, fmt.Errorf("strconv.ParseUint(%s, 10, 16) failed: %w", p, err)
	}
	return host, uint32(port), nil
}
131
132
133
134
135
136
137
138
139
140
141
142 func (s) TestServerSideXDS_Fallback(t *testing.T) {
143 managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
144 defer cleanup1()
145
146 lis, cleanup2 := setupGRPCServer(t, bootstrapContents)
147 defer cleanup2()
148
149
150
151
152 host, port, err := hostPortFromListener(lis)
153 if err != nil {
154 t.Fatalf("failed to retrieve host and port of server: %v", err)
155 }
156 const serviceName = "my-service-fallback"
157 resources := e2e.DefaultClientResources(e2e.ResourceParams{
158 DialTarget: serviceName,
159 NodeID: nodeID,
160 Host: host,
161 Port: port,
162 SecLevel: e2e.SecurityLevelNone,
163 })
164
165
166
167
168 inboundLis := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")
169 resources.Listeners = append(resources.Listeners, inboundLis)
170
171
172 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
173 defer cancel()
174 if err := managementServer.Update(ctx, resources); err != nil {
175 t.Fatal(err)
176 }
177
178
179 creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{
180 FallbackCreds: insecure.NewCredentials(),
181 })
182 if err != nil {
183 t.Fatal(err)
184 }
185
186
187 cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(resolver))
188 if err != nil {
189 t.Fatalf("failed to dial local test server: %v", err)
190 }
191 defer cc.Close()
192
193 client := testgrpc.NewTestServiceClient(cc)
194 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
195 t.Errorf("rpc EmptyCall() failed: %v", err)
196 }
197 }
198
199
200
201
202
203
204
205
206
207
208
209 func (s) TestServerSideXDS_FileWatcherCerts(t *testing.T) {
210 tests := []struct {
211 name string
212 secLevel e2e.SecurityLevel
213 }{
214 {
215 name: "tls",
216 secLevel: e2e.SecurityLevelTLS,
217 },
218 {
219 name: "mtls",
220 secLevel: e2e.SecurityLevelMTLS,
221 },
222 }
223 for _, test := range tests {
224 t.Run(test.name, func(t *testing.T) {
225 managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
226 defer cleanup1()
227
228 lis, cleanup2 := setupGRPCServer(t, bootstrapContents)
229 defer cleanup2()
230
231
232
233 host, port, err := hostPortFromListener(lis)
234 if err != nil {
235 t.Fatalf("failed to retrieve host and port of server: %v", err)
236 }
237
238
239
240
241 serviceName := "my-service-file-watcher-certs-" + test.name
242 resources := e2e.DefaultClientResources(e2e.ResourceParams{
243 DialTarget: serviceName,
244 NodeID: nodeID,
245 Host: host,
246 Port: port,
247 SecLevel: test.secLevel,
248 })
249
250
251
252
253 inboundLis := e2e.DefaultServerListener(host, port, test.secLevel, "routeName")
254 resources.Listeners = append(resources.Listeners, inboundLis)
255
256
257 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
258 defer cancel()
259 if err := managementServer.Update(ctx, resources); err != nil {
260 t.Fatal(err)
261 }
262
263
264 creds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{
265 FallbackCreds: insecure.NewCredentials(),
266 })
267 if err != nil {
268 t.Fatal(err)
269 }
270
271
272 cc, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(creds), grpc.WithResolvers(resolver))
273 if err != nil {
274 t.Fatalf("failed to dial local test server: %v", err)
275 }
276 defer cc.Close()
277
278 client := testgrpc.NewTestServiceClient(cc)
279 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
280 t.Fatalf("rpc EmptyCall() failed: %v", err)
281 }
282 })
283 }
284 }
285
286
287
288
289
290
291
292
293
294 func (s) TestServerSideXDS_SecurityConfigChange(t *testing.T) {
295 managementServer, nodeID, bootstrapContents, resolver, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
296 defer cleanup1()
297
298 lis, cleanup2 := setupGRPCServer(t, bootstrapContents)
299 defer cleanup2()
300
301
302
303
304
305 host, port, err := hostPortFromListener(lis)
306 if err != nil {
307 t.Fatalf("failed to retrieve host and port of server: %v", err)
308 }
309 const serviceName = "my-service-security-config-change"
310 resources := e2e.DefaultClientResources(e2e.ResourceParams{
311 DialTarget: serviceName,
312 NodeID: nodeID,
313 Host: host,
314 Port: port,
315 SecLevel: e2e.SecurityLevelNone,
316 })
317
318
319
320
321 inboundLis := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")
322 resources.Listeners = append(resources.Listeners, inboundLis)
323
324
325 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
326 defer cancel()
327 if err := managementServer.Update(ctx, resources); err != nil {
328 t.Fatal(err)
329 }
330
331
332 xdsCreds, err := xdscreds.NewClientCredentials(xdscreds.ClientOptions{
333 FallbackCreds: insecure.NewCredentials(),
334 })
335 if err != nil {
336 t.Fatal(err)
337 }
338
339
340 xdsCC, err := grpc.DialContext(ctx, fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(xdsCreds), grpc.WithResolvers(resolver))
341 if err != nil {
342 t.Fatalf("failed to dial local test server: %v", err)
343 }
344 defer xdsCC.Close()
345
346 client := testgrpc.NewTestServiceClient(xdsCC)
347 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
348 t.Fatalf("rpc EmptyCall() failed: %v", err)
349 }
350
351
352
353 tlsCreds := e2e.CreateClientTLSCredentials(t)
354 tlsCC, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithTransportCredentials(tlsCreds))
355 if err != nil {
356 t.Fatalf("failed to dial local test server: %v", err)
357 }
358 defer tlsCC.Close()
359
360
361 client = testgrpc.NewTestServiceClient(tlsCC)
362 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
363 t.Fatal("rpc EmptyCall() succeeded when expected to fail")
364 }
365
366
367
368 resources = e2e.DefaultClientResources(e2e.ResourceParams{
369 DialTarget: serviceName,
370 NodeID: nodeID,
371 Host: host,
372 Port: port,
373 SecLevel: e2e.SecurityLevelMTLS,
374 })
375 inboundLis = e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS, "routeName")
376 resources.Listeners = append(resources.Listeners, inboundLis)
377 if err := managementServer.Update(ctx, resources); err != nil {
378 t.Fatal(err)
379 }
380
381
382 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
383 t.Fatalf("rpc EmptyCall() failed: %v", err)
384 }
385 }
386
View as plain text