1
18
19 package xds_test
20
21 import (
22 "context"
23 "io"
24 "net"
25 "strings"
26 "testing"
27 "time"
28
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/connectivity"
32 "google.golang.org/grpc/credentials/insecure"
33 "google.golang.org/grpc/internal"
34 "google.golang.org/grpc/internal/grpcsync"
35 "google.golang.org/grpc/internal/testutils"
36 "google.golang.org/grpc/internal/testutils/xds/e2e"
37 "google.golang.org/grpc/status"
38 "google.golang.org/grpc/xds"
39
40 v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
41 v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
42 testgrpc "google.golang.org/grpc/interop/grpc_testing"
43 testpb "google.golang.org/grpc/interop/grpc_testing"
44 )
45
46 var (
47 errAcceptAndClose = status.New(codes.Unavailable, "")
48 )
49
50
51
52
53
54
55
56
57
58 func (s) TestServeLDSRDS(t *testing.T) {
59 managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
60 defer cleanup()
61 lis, err := testutils.LocalTCPListener()
62 if err != nil {
63 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
64 }
65
66
67
68 host, port, err := hostPortFromListener(lis)
69 if err != nil {
70 t.Fatalf("failed to retrieve host and port of server: %v", err)
71 }
72
73 listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
74 routeConfig := e2e.RouteConfigNonForwardingAction("routeName")
75
76 resources := e2e.UpdateOptions{
77 NodeID: nodeID,
78 Listeners: []*v3listenerpb.Listener{listener},
79 Routes: []*v3routepb.RouteConfiguration{routeConfig},
80 }
81
82 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
83 defer cancel()
84 if err := managementServer.Update(ctx, resources); err != nil {
85 t.Fatal(err)
86 }
87
88 serving := grpcsync.NewEvent()
89 modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
90 t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
91 if args.Mode == connectivity.ServingModeServing {
92 serving.Fire()
93 }
94 })
95
96 server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
97 if err != nil {
98 t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
99 }
100 defer server.Stop()
101 testgrpc.RegisterTestServiceServer(server, &testService{})
102 go func() {
103 if err := server.Serve(lis); err != nil {
104 t.Errorf("Serve() failed: %v", err)
105 }
106 }()
107 select {
108 case <-ctx.Done():
109 t.Fatal("timeout waiting for the xDS Server to go Serving")
110 case <-serving.Done():
111 }
112
113 cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
114 if err != nil {
115 t.Fatalf("failed to dial local test server: %v", err)
116 }
117 defer cc.Close()
118
119 waitForSuccessfulRPC(ctx, t, cc)
120
121
122
123
124 routeConfig = e2e.RouteConfigFilterAction("routeName")
125 resources = e2e.UpdateOptions{
126 NodeID: nodeID,
127 Listeners: []*v3listenerpb.Listener{listener},
128 Routes: []*v3routepb.RouteConfiguration{routeConfig},
129 }
130 if err := managementServer.Update(ctx, resources); err != nil {
131 t.Fatal(err)
132 }
133
134
135
136
137 waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding"))
138 }
139
140
141
142
143 func waitForFailedRPCWithStatus(ctx context.Context, t *testing.T, cc *grpc.ClientConn, st *status.Status) {
144 t.Helper()
145
146 c := testgrpc.NewTestServiceClient(cc)
147 ticker := time.NewTicker(10 * time.Millisecond)
148 defer ticker.Stop()
149 var err error
150 for {
151 select {
152 case <-ctx.Done():
153 t.Fatalf("failure when waiting for RPCs to fail with certain status %v: %v. most recent error received from RPC: %v", st, ctx.Err(), err)
154 case <-ticker.C:
155 _, err = c.EmptyCall(ctx, &testpb.Empty{})
156 if status.Code(err) == st.Code() && strings.Contains(err.Error(), st.Message()) {
157 t.Logf("most recent error happy case: %v", err.Error())
158 return
159 }
160 }
161 }
162 }
163
164
165
166
167
168 func (s) TestRDSNack(t *testing.T) {
169 managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
170 defer cleanup()
171 lis, err := testutils.LocalTCPListener()
172 if err != nil {
173 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
174 }
175
176
177
178 host, port, err := hostPortFromListener(lis)
179 if err != nil {
180 t.Fatalf("failed to retrieve host and port of server: %v", err)
181 }
182
183 listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
184 routeConfig := e2e.RouteConfigNoRouteMatch("routeName")
185 resources := e2e.UpdateOptions{
186 NodeID: nodeID,
187 Listeners: []*v3listenerpb.Listener{listener},
188 Routes: []*v3routepb.RouteConfiguration{routeConfig},
189 SkipValidation: true,
190 }
191
192 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
193 defer cancel()
194 if err := managementServer.Update(ctx, resources); err != nil {
195 t.Fatal(err)
196 }
197 serving := grpcsync.NewEvent()
198 modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
199 t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
200 if args.Mode == connectivity.ServingModeServing {
201 serving.Fire()
202 }
203 })
204
205 server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
206 if err != nil {
207 t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
208 }
209 defer server.Stop()
210 testgrpc.RegisterTestServiceServer(server, &testService{})
211 go func() {
212 if err := server.Serve(lis); err != nil {
213 t.Errorf("Serve() failed: %v", err)
214 }
215 }()
216
217 cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
218 if err != nil {
219 t.Fatalf("failed to dial local test server: %v", err)
220 }
221 defer cc.Close()
222
223 <-serving.Done()
224 waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
225 }
226
227
228
229
230
231
232
233 func (s) TestResourceNotFoundRDS(t *testing.T) {
234 managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
235 defer cleanup()
236 lis, err := testutils.LocalTCPListener()
237 if err != nil {
238 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
239 }
240
241
242
243 host, port, err := hostPortFromListener(lis)
244 if err != nil {
245 t.Fatalf("failed to retrieve host and port of server: %v", err)
246 }
247
248 listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
249 resources := e2e.UpdateOptions{
250 NodeID: nodeID,
251 Listeners: []*v3listenerpb.Listener{listener},
252 SkipValidation: true,
253 }
254
255 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
256 defer cancel()
257 if err := managementServer.Update(ctx, resources); err != nil {
258 t.Fatal(err)
259 }
260 serving := grpcsync.NewEvent()
261 modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
262 t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
263 if args.Mode == connectivity.ServingModeServing {
264 serving.Fire()
265 }
266 })
267
268 server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
269 if err != nil {
270 t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
271 }
272 defer server.Stop()
273 testgrpc.RegisterTestServiceServer(server, &testService{})
274 go func() {
275 if err := server.Serve(lis); err != nil {
276 t.Errorf("Serve() failed: %v", err)
277 }
278 }()
279
280 cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
281 if err != nil {
282 t.Fatalf("failed to dial local test server: %v", err)
283 }
284 defer cc.Close()
285
286 waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
287
288
289
290
291
292 loop:
293 for {
294 if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("RouteConfigResource", "routeName"); err != nil {
295 t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
296 }
297 select {
298 case <-serving.Done():
299 break loop
300 case <-ctx.Done():
301 t.Fatalf("timed out waiting for serving mode to go serving")
302 case <-time.After(time.Millisecond):
303 }
304 }
305 waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
306 }
307
308
309
310
311
312
313
314 func (s) TestServingModeChanges(t *testing.T) {
315 managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
316 defer cleanup()
317 lis, err := testutils.LocalTCPListener()
318 if err != nil {
319 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
320 }
321
322
323
324 host, port, err := hostPortFromListener(lis)
325 if err != nil {
326 t.Fatalf("failed to retrieve host and port of server: %v", err)
327 }
328
329 listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
330 resources := e2e.UpdateOptions{
331 NodeID: nodeID,
332 Listeners: []*v3listenerpb.Listener{listener},
333 SkipValidation: true,
334 }
335
336 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
337 defer cancel()
338 if err := managementServer.Update(ctx, resources); err != nil {
339 t.Fatal(err)
340 }
341
342 serving := grpcsync.NewEvent()
343 modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
344 t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
345 if args.Mode == connectivity.ServingModeServing {
346 serving.Fire()
347 }
348 })
349
350 server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
351 if err != nil {
352 t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
353 }
354 defer server.Stop()
355 testgrpc.RegisterTestServiceServer(server, &testService{})
356 go func() {
357 if err := server.Serve(lis); err != nil {
358 t.Errorf("Serve() failed: %v", err)
359 }
360 }()
361 cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
362 if err != nil {
363 t.Fatalf("failed to dial local test server: %v", err)
364 }
365 defer cc.Close()
366
367 waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
368 routeConfig := e2e.RouteConfigNonForwardingAction("routeName")
369 resources = e2e.UpdateOptions{
370 NodeID: nodeID,
371 Listeners: []*v3listenerpb.Listener{listener},
372 Routes: []*v3routepb.RouteConfiguration{routeConfig},
373 }
374 defer cancel()
375 if err := managementServer.Update(ctx, resources); err != nil {
376 t.Fatal(err)
377 }
378
379 select {
380 case <-ctx.Done():
381 t.Fatal("timeout waiting for the xDS Server to go Serving")
382 case <-serving.Done():
383 }
384
385
386
387 waitForSuccessfulRPC(ctx, t, cc)
388
389
390
391
392
393 c := testgrpc.NewTestServiceClient(cc)
394 stream, err := c.FullDuplexCall(ctx)
395 if err != nil {
396 t.Fatalf("cc.FullDuplexCall failed: %f", err)
397 }
398
399
400
401
402
403
404
405 if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("ListenerResource", listener.GetName()); err != nil {
406 t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
407 }
408
409
410
411 if err = stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
412 t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err)
413 }
414 if err = stream.CloseSend(); err != nil {
415 t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err)
416 }
417 if _, err = stream.Recv(); err != io.EOF {
418 t.Fatalf("unexpected error: %v, expected an EOF error", err)
419 }
420
421
422 waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
423 }
424
425
426
427
428
429
430
431
432
433
434
435
436
437 func (s) TestMultipleUpdatesImmediatelySwitch(t *testing.T) {
438 managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
439 defer cleanup()
440 lis, err := testutils.LocalTCPListener()
441 if err != nil {
442 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
443 }
444 host, port, err := hostPortFromListener(lis)
445 if err != nil {
446 t.Fatalf("failed to retrieve host and port of server: %v", err)
447 }
448 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
449 defer cancel()
450
451
452
453 ldsResource := e2e.ListenerResourceThreeRouteResources(host, port, e2e.SecurityLevelNone, "routeName")
454 resources := e2e.UpdateOptions{
455 NodeID: nodeID,
456 Listeners: []*v3listenerpb.Listener{ldsResource},
457 SkipValidation: true,
458 }
459 if err := managementServer.Update(ctx, resources); err != nil {
460 t.Fatal(err)
461 }
462 server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), testModeChangeServerOption(t), xds.BootstrapContentsForTesting(bootstrapContents))
463 if err != nil {
464 t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
465 }
466 defer server.Stop()
467 testgrpc.RegisterTestServiceServer(server, &testService{})
468 go func() {
469 if err := server.Serve(lis); err != nil {
470 t.Errorf("Serve() failed: %v", err)
471 }
472 }()
473
474 cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
475 if err != nil {
476 t.Fatalf("failed to dial local test server: %v", err)
477 }
478 defer cc.Close()
479
480 waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
481
482 routeConfig1 := e2e.RouteConfigNonForwardingAction("routeName")
483 routeConfig2 := e2e.RouteConfigFilterAction("routeName2")
484 routeConfig3 := e2e.RouteConfigFilterAction("routeName3")
485 resources = e2e.UpdateOptions{
486 NodeID: nodeID,
487 Listeners: []*v3listenerpb.Listener{ldsResource},
488 Routes: []*v3routepb.RouteConfiguration{routeConfig1, routeConfig2, routeConfig3},
489 SkipValidation: true,
490 }
491 if err := managementServer.Update(ctx, resources); err != nil {
492 t.Fatal(err)
493 }
494 pollForSuccessfulRPC(ctx, t, cc)
495
496 c := testgrpc.NewTestServiceClient(cc)
497 stream, err := c.FullDuplexCall(ctx)
498 if err != nil {
499 t.Fatalf("cc.FullDuplexCall failed: %f", err)
500 }
501 if err = stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
502 t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err)
503 }
504
505
506
507 ldsResource = e2e.ListenerResourceFallbackToDefault(host, port, e2e.SecurityLevelNone)
508 resources = e2e.UpdateOptions{
509 NodeID: nodeID,
510 Listeners: []*v3listenerpb.Listener{ldsResource},
511 Routes: []*v3routepb.RouteConfiguration{routeConfig1, routeConfig2, routeConfig3},
512 SkipValidation: true,
513 }
514 if err := managementServer.Update(ctx, resources); err != nil {
515 t.Fatalf("error updating management server: %v", err)
516 }
517
518
519
520
521
522
523 waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding"))
524
525
526
527
528 if err = stream.CloseSend(); err != nil {
529 t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err)
530 }
531 if _, err = stream.Recv(); err != io.EOF {
532 t.Fatalf("unexpected error: %v, expected an EOF error", err)
533 }
534
535 ldsResource = e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")
536 resources = e2e.UpdateOptions{
537 NodeID: nodeID,
538 Listeners: []*v3listenerpb.Listener{ldsResource},
539 Routes: []*v3routepb.RouteConfiguration{routeConfig1, routeConfig2, routeConfig3},
540 SkipValidation: true,
541 }
542 if err := managementServer.Update(ctx, resources); err != nil {
543 t.Fatal(err)
544 }
545
546 pollForSuccessfulRPC(ctx, t, cc)
547 }
548
549 func pollForSuccessfulRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
550 t.Helper()
551 c := testgrpc.NewTestServiceClient(cc)
552 ticker := time.NewTicker(10 * time.Millisecond)
553 defer ticker.Stop()
554 for {
555 select {
556 case <-ctx.Done():
557 t.Fatalf("timeout waiting for RPCs to succeed")
558 case <-ticker.C:
559 if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err == nil {
560 return
561 }
562 }
563 }
564 }
565
View as plain text