1
18
19 package xds_test
20
21 import (
22 "context"
23 "fmt"
24 "net"
25 "sync"
26 "testing"
27 "time"
28
29 "github.com/google/uuid"
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/connectivity"
32 "google.golang.org/grpc/credentials/insecure"
33 "google.golang.org/grpc/internal"
34 "google.golang.org/grpc/internal/stubserver"
35 "google.golang.org/grpc/internal/testutils"
36 "google.golang.org/grpc/internal/testutils/xds/bootstrap"
37 "google.golang.org/grpc/internal/testutils/xds/e2e"
38 "google.golang.org/grpc/resolver"
39 "google.golang.org/grpc/xds"
40
41 clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
42 endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
43 listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
44 routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
45 testgrpc "google.golang.org/grpc/interop/grpc_testing"
46 testpb "google.golang.org/grpc/interop/grpc_testing"
47 )
48
// Names of the xDS resources used by the tests in this file. Every name other
// than serviceName is built by prefixing serviceName.
const (
	// serviceName is the dial target of the client channel and the domain of
	// the virtual host in the route configuration.
	serviceName = "my-service-xds"
	// rdsName is the name of the route configuration resource.
	rdsName = "route-" + serviceName
	// cdsName1 and cdsName2 are the names of the two cluster resources.
	cdsName1 = "cluster1-" + serviceName
	cdsName2 = "cluster2-" + serviceName
	// edsName1 and edsName2 are the names of the endpoint resources used by
	// cdsName1 and cdsName2 respectively.
	edsName1 = "eds1-" + serviceName
	edsName2 = "eds2-" + serviceName
)
57
var (
	// defaultRouteConfigWithTwoRoutes is a route configuration named rdsName
	// with a single virtual host for serviceName. It contains two prefix
	// routes: "/grpc.testing.TestService/EmptyCall" is routed to cdsName1 and
	// "/grpc.testing.TestService/UnaryCall" is routed to cdsName2.
	defaultRouteConfigWithTwoRoutes = &routepb.RouteConfiguration{
		Name: rdsName,
		VirtualHosts: []*routepb.VirtualHost{{
			Domains: []string{serviceName},
			Routes: []*routepb.Route{
				{
					Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/EmptyCall"}},
					Action: &routepb.Route_Route{Route: &routepb.RouteAction{
						ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: cdsName1},
					}},
				},
				{
					Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/UnaryCall"}},
					Action: &routepb.Route_Route{Route: &routepb.RouteAction{
						ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: cdsName2},
					}},
				},
			},
		}},
	}
)
83
84
85
86
87
88
89
90
91
92 func (s) TestIgnoreResourceDeletionOnClient(t *testing.T) {
93 server1 := stubserver.StartTestService(t, nil)
94 t.Cleanup(server1.Stop)
95
96 server2 := stubserver.StartTestService(t, nil)
97 t.Cleanup(server2.Stop)
98
99 initialResourceOnServer := func(nodeID string) e2e.UpdateOptions {
100 return e2e.UpdateOptions{
101 NodeID: nodeID,
102 Listeners: []*listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
103 Routes: []*routepb.RouteConfiguration{defaultRouteConfigWithTwoRoutes},
104 Clusters: []*clusterpb.Cluster{
105 e2e.DefaultCluster(cdsName1, edsName1, e2e.SecurityLevelNone),
106 e2e.DefaultCluster(cdsName2, edsName2, e2e.SecurityLevelNone),
107 },
108 Endpoints: []*endpointpb.ClusterLoadAssignment{
109 e2e.DefaultEndpoint(edsName1, "localhost", []uint32{testutils.ParsePort(t, server1.Address)}),
110 e2e.DefaultEndpoint(edsName2, "localhost", []uint32{testutils.ParsePort(t, server2.Address)}),
111 },
112 SkipValidation: true,
113 }
114 }
115
116 tests := []struct {
117 name string
118 updateResource func(r *e2e.UpdateOptions)
119 }{
120 {
121 name: "listener",
122 updateResource: func(r *e2e.UpdateOptions) {
123 r.Listeners = nil
124 },
125 },
126 {
127 name: "cluster",
128 updateResource: func(r *e2e.UpdateOptions) {
129 r.Clusters = nil
130 },
131 },
132 }
133 for _, test := range tests {
134 t.Run(fmt.Sprintf("%s resource deletion ignored", test.name), func(t *testing.T) {
135 testResourceDeletionIgnored(t, initialResourceOnServer, test.updateResource)
136 })
137 t.Run(fmt.Sprintf("%s resource deletion not ignored", test.name), func(t *testing.T) {
138 testResourceDeletionNotIgnored(t, initialResourceOnServer, test.updateResource)
139 })
140 }
141 }
142
143
144
145
146
147 func testResourceDeletionIgnored(t *testing.T, initialResource func(string) e2e.UpdateOptions, updateResource func(r *e2e.UpdateOptions)) {
148 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
149 t.Cleanup(cancel)
150 mgmtServer := startManagementServer(t)
151 nodeID := uuid.New().String()
152 bs := generateBootstrapContents(t, mgmtServer.Address, true, nodeID)
153 xdsR := xdsResolverBuilder(t, bs)
154 resources := initialResource(nodeID)
155
156
157 if err := mgmtServer.Update(ctx, resources); err != nil {
158 t.Fatal(err)
159 }
160
161 cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsR))
162 if err != nil {
163 t.Fatalf("Failed to dial local test server: %v.", err)
164 }
165 t.Cleanup(func() { cc.Close() })
166
167 if err := verifyRPCtoAllEndpoints(cc); err != nil {
168 t.Fatal(err)
169 }
170
171
172 updateResource(&resources)
173 if err := mgmtServer.Update(ctx, resources); err != nil {
174 t.Fatal(err)
175 }
176
177
178
179
180
181 timer := time.NewTimer(500 * time.Millisecond)
182 ticker := time.NewTicker(50 * time.Millisecond)
183 t.Cleanup(ticker.Stop)
184 for {
185 if err := verifyRPCtoAllEndpoints(cc); err != nil {
186 t.Fatal(err)
187 }
188 select {
189 case <-ctx.Done():
190 return
191 case <-timer.C:
192 return
193 case <-ticker.C:
194 }
195 }
196 }
197
198
199
200
201
202 func testResourceDeletionNotIgnored(t *testing.T, initialResource func(string) e2e.UpdateOptions, updateResource func(r *e2e.UpdateOptions)) {
203 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout*1000)
204 t.Cleanup(cancel)
205 mgmtServer := startManagementServer(t)
206 nodeID := uuid.New().String()
207 bs := generateBootstrapContents(t, mgmtServer.Address, false, nodeID)
208 xdsR := xdsResolverBuilder(t, bs)
209 resources := initialResource(nodeID)
210
211
212 if err := mgmtServer.Update(ctx, resources); err != nil {
213 t.Fatal(err)
214 }
215
216 cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsR))
217 if err != nil {
218 t.Fatalf("failed to dial local test server: %v", err)
219 }
220 t.Cleanup(func() { cc.Close() })
221
222 if err := verifyRPCtoAllEndpoints(cc); err != nil {
223 t.Fatal(err)
224 }
225
226
227 updateResource(&resources)
228 if err := mgmtServer.Update(ctx, resources); err != nil {
229 t.Fatal(err)
230 }
231
232
233 client := testgrpc.NewTestServiceClient(cc)
234 wg := sync.WaitGroup{}
235 wg.Add(2)
236 go func() {
237 defer wg.Done()
238 for ; ctx.Err() == nil; <-time.After(10 * time.Millisecond) {
239 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
240 return
241 }
242 }
243 }()
244 go func() {
245 defer wg.Done()
246 for ; ctx.Err() == nil; <-time.After(10 * time.Millisecond) {
247 if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
248 return
249 }
250 }
251 }()
252
253 wg.Wait()
254 if ctx.Err() != nil {
255 t.Fatal("Context expired before RPCs failed.")
256 }
257 }
258
259
260 func startManagementServer(t *testing.T) *e2e.ManagementServer {
261 t.Helper()
262 mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
263 if err != nil {
264 t.Fatalf("Failed to start management server: %v", err)
265 }
266 t.Cleanup(mgmtServer.Stop)
267 return mgmtServer
268 }
269
270
271 func generateBootstrapContents(t *testing.T, serverURI string, ignoreResourceDeletion bool, nodeID string) []byte {
272 t.Helper()
273 bootstrapContents, err := bootstrap.Contents(bootstrap.Options{
274 NodeID: nodeID,
275 ServerURI: serverURI,
276 ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate,
277 IgnoreResourceDeletion: ignoreResourceDeletion,
278 })
279 if err != nil {
280 t.Fatal(err)
281 }
282 return bootstrapContents
283 }
284
285
286
287 func xdsResolverBuilder(t *testing.T, bs []byte) resolver.Builder {
288 t.Helper()
289 resolverBuilder := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))
290 xdsR, err := resolverBuilder(bs)
291 if err != nil {
292 t.Fatalf("Creating xDS resolver for testing failed for config %q: %v", string(bs), err)
293 }
294 return xdsR
295 }
296
297
298
299
300 func setupGRPCServerWithModeChangeChannelAndServe(t *testing.T, bootstrapContents []byte, lis net.Listener) chan connectivity.ServingMode {
301 t.Helper()
302 updateCh := make(chan connectivity.ServingMode, 1)
303
304
305 modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
306 t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
307 updateCh <- args.Mode
308 })
309 server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
310 if err != nil {
311 t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
312 }
313 t.Cleanup(server.Stop)
314 testgrpc.RegisterTestServiceServer(server, &testService{})
315
316
317 go func() {
318 if err := server.Serve(lis); err != nil {
319 t.Errorf("Serve() failed: %v", err)
320 }
321 }()
322
323 return updateCh
324 }
325
326
327
328
329 func resourceWithListenerForGRPCServer(t *testing.T, nodeID string) (e2e.UpdateOptions, net.Listener) {
330 t.Helper()
331 lis, err := testutils.LocalTCPListener()
332 if err != nil {
333 t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
334 }
335 t.Cleanup(func() { lis.Close() })
336 host, port, err := hostPortFromListener(lis)
337 if err != nil {
338 t.Fatalf("Failed to retrieve host and port of listener at %q: %v", lis.Addr(), err)
339 }
340 listener := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")
341 resources := e2e.UpdateOptions{
342 NodeID: nodeID,
343 Listeners: []*listenerpb.Listener{listener},
344 }
345 return resources, lis
346 }
347
348
349
350
351
352
353 func (s) TestListenerResourceDeletionOnServerIgnored(t *testing.T) {
354 mgmtServer := startManagementServer(t)
355 nodeID := uuid.New().String()
356 bs := generateBootstrapContents(t, mgmtServer.Address, true, nodeID)
357 xdsR := xdsResolverBuilder(t, bs)
358 resources, lis := resourceWithListenerForGRPCServer(t, nodeID)
359 modeChangeCh := setupGRPCServerWithModeChangeChannelAndServe(t, bs, lis)
360
361 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
362 defer cancel()
363 if err := mgmtServer.Update(ctx, resources); err != nil {
364 t.Fatal(err)
365 }
366
367
368 select {
369 case <-ctx.Done():
370 t.Fatal("Test timed out waiting for a server to change to ServingModeServing.")
371 case mode := <-modeChangeCh:
372 if mode != connectivity.ServingModeServing {
373 t.Fatalf("Server switched to mode %v, want %v", mode, connectivity.ServingModeServing)
374 }
375 }
376
377
378 cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsR))
379 if err != nil {
380 t.Fatalf("failed to dial local test server: %v", err)
381 }
382 defer cc.Close()
383
384 if err := verifyRPCtoAllEndpoints(cc); err != nil {
385 t.Fatal(err)
386 }
387
388
389 if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
390 NodeID: nodeID,
391 Listeners: []*listenerpb.Listener{},
392 }); err != nil {
393 t.Fatal(err)
394 }
395
396
397
398 timer := time.NewTimer(500 * time.Millisecond)
399 ticker := time.NewTicker(50 * time.Millisecond)
400 t.Cleanup(ticker.Stop)
401 for {
402 if err := verifyRPCtoAllEndpoints(cc); err != nil {
403 t.Fatal(err)
404 }
405 select {
406 case <-timer.C:
407 return
408 case mode := <-modeChangeCh:
409 t.Fatalf("Server switched to mode: %v when no switch was expected", mode)
410 case <-ticker.C:
411 }
412 }
413 }
414
415
416
417
418
419
420 func (s) TestListenerResourceDeletionOnServerNotIgnored(t *testing.T) {
421 mgmtServer := startManagementServer(t)
422 nodeID := uuid.New().String()
423 bs := generateBootstrapContents(t, mgmtServer.Address, false, nodeID)
424 xdsR := xdsResolverBuilder(t, bs)
425 resources, lis := resourceWithListenerForGRPCServer(t, nodeID)
426 updateCh := setupGRPCServerWithModeChangeChannelAndServe(t, bs, lis)
427
428 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
429 defer cancel()
430 if err := mgmtServer.Update(ctx, resources); err != nil {
431 t.Fatal(err)
432 }
433
434
435 select {
436 case <-ctx.Done():
437 t.Fatal("Test timed out waiting for a mode change update.")
438 case mode := <-updateCh:
439 if mode != connectivity.ServingModeServing {
440 t.Fatalf("Listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
441 }
442 }
443
444
445 cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsR))
446 if err != nil {
447 t.Fatalf("failed to dial local test server: %v", err)
448 }
449 defer cc.Close()
450 if err := verifyRPCtoAllEndpoints(cc); err != nil {
451 t.Fatal(err)
452 }
453
454 if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
455 NodeID: nodeID,
456 Listeners: []*listenerpb.Listener{},
457 }); err != nil {
458 t.Fatal(err)
459 }
460
461 select {
462 case <-ctx.Done():
463 t.Fatalf("timed out waiting for a mode change update: %v", err)
464 case mode := <-updateCh:
465 if mode != connectivity.ServingModeNotServing {
466 t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing)
467 }
468 }
469 }
470
471
472
473 func verifyRPCtoAllEndpoints(cc grpc.ClientConnInterface) error {
474 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
475 defer cancel()
476 client := testgrpc.NewTestServiceClient(cc)
477 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
478 return fmt.Errorf("rpc EmptyCall() failed: %v", err)
479 }
480 if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
481 return fmt.Errorf("rpc UnaryCall() failed: %v", err)
482 }
483 return nil
484 }
485
View as plain text