1
16
17 package e2e_test
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "strings"
24 "testing"
25 "time"
26
27 "github.com/google/go-cmp/cmp"
28 "github.com/google/uuid"
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/credentials/insecure"
32 "google.golang.org/grpc/internal"
33 "google.golang.org/grpc/internal/grpctest"
34 "google.golang.org/grpc/internal/stubserver"
35 "google.golang.org/grpc/internal/testutils"
36 rrutil "google.golang.org/grpc/internal/testutils/roundrobin"
37 "google.golang.org/grpc/internal/testutils/xds/e2e"
38 "google.golang.org/grpc/internal/xds/bootstrap"
39 "google.golang.org/grpc/peer"
40 "google.golang.org/grpc/resolver"
41 "google.golang.org/grpc/resolver/manual"
42 "google.golang.org/grpc/serviceconfig"
43 "google.golang.org/grpc/status"
44 xdstestutils "google.golang.org/grpc/xds/internal/testutils"
45 "google.golang.org/grpc/xds/internal/xdsclient"
46 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
47 "google.golang.org/protobuf/types/known/wrapperspb"
48
49 v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
50 v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
51 v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
52 v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
53 testgrpc "google.golang.org/grpc/interop/grpc_testing"
54 testpb "google.golang.org/grpc/interop/grpc_testing"
55
56 _ "google.golang.org/grpc/xds/internal/balancer/clusterresolver"
57 "google.golang.org/grpc/xds/internal/balancer/priority"
58 )
59
const (
	// Names of the xDS cluster, EDS service and localities used in the
	// resources that the tests in this file configure on the management server.
	clusterName    = "cluster-my-service-client-side-xds"
	edsServiceName = "endpoints-my-service-client-side-xds"
	localityName1  = "my-locality-1"
	localityName2  = "my-locality-2"
	localityName3  = "my-locality-3"

	// defaultTestTimeout bounds the context created by each test.
	// defaultTestShortTimeout is the pause between iterations of the polling
	// loops. defaultTestWatchExpiryTimeout is handed to
	// xdsclient.NewWithConfigForTesting by TestEDS_ResourceNotFound.
	defaultTestTimeout            = 5 * time.Second
	defaultTestShortTimeout       = 10 * time.Millisecond
	defaultTestWatchExpiryTimeout = 500 * time.Millisecond
)
71
// s is the receiver type for every test method in this file. It embeds
// grpctest.Tester, and Test passes a value of it to grpctest.RunSubTests.
type s struct {
	grpctest.Tester
}
75
76 func Test(t *testing.T) {
77 grpctest.RunSubTests(t, s{})
78 }
79
80
81
82
83 func backendAddressesAndPorts(t *testing.T, servers []*stubserver.StubServer) ([]resolver.Address, []uint32) {
84 addrs := make([]resolver.Address, len(servers))
85 ports := make([]uint32, len(servers))
86 for i := 0; i < len(servers); i++ {
87 addrs[i] = resolver.Address{Addr: servers[i].Address}
88 ports[i] = testutils.ParsePort(t, servers[i].Address)
89 }
90 return addrs, ports
91 }
92
93 func startTestServiceBackends(t *testing.T, numBackends int) ([]*stubserver.StubServer, func()) {
94 var servers []*stubserver.StubServer
95 for i := 0; i < numBackends; i++ {
96 servers = append(servers, stubserver.StartTestService(t, nil))
97 servers[i].StartServer()
98 }
99
100 return servers, func() {
101 for _, server := range servers {
102 server.Stop()
103 }
104 }
105 }
106
107
108
109 func clientEndpointsResource(nodeID, edsServiceName string, localities []e2e.LocalityOptions) e2e.UpdateOptions {
110 return e2e.UpdateOptions{
111 NodeID: nodeID,
112 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
113 ClusterName: edsServiceName,
114 Host: "localhost",
115 Localities: localities,
116 })},
117 SkipValidation: true,
118 }
119 }
120
121
122
123
124
125
126
127
128 func (s) TestEDS_OneLocality(t *testing.T) {
129
130 managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
131 defer cleanup1()
132
133
134 servers, cleanup2 := startTestServiceBackends(t, 3)
135 defer cleanup2()
136 addrs, ports := backendAddressesAndPorts(t, servers)
137
138
139
140 resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
141 Name: localityName1,
142 Weight: 1,
143 Backends: []e2e.BackendOptions{{Port: ports[0]}},
144 }})
145 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
146 defer cancel()
147 if err := managementServer.Update(ctx, resources); err != nil {
148 t.Fatal(err)
149 }
150
151
152 client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
153 if err != nil {
154 t.Fatalf("Failed to create xDS client: %v", err)
155 }
156 defer close()
157
158
159
160 r := manual.NewBuilderWithScheme("whatever")
161 jsonSC := fmt.Sprintf(`{
162 "loadBalancingConfig":[{
163 "cluster_resolver_experimental":{
164 "discoveryMechanisms": [{
165 "cluster": "%s",
166 "type": "EDS",
167 "edsServiceName": "%s",
168 "outlierDetection": {}
169 }],
170 "xdsLbPolicy":[{"round_robin":{}}]
171 }
172 }]
173 }`, clusterName, edsServiceName)
174 scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
175 r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, client))
176
177
178 cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
179 if err != nil {
180 t.Fatalf("failed to dial local test server: %v", err)
181 }
182 defer cc.Close()
183
184
185 testClient := testgrpc.NewTestServiceClient(cc)
186 if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[:1]); err != nil {
187 t.Fatal(err)
188 }
189
190
191
192 resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
193 Name: localityName1,
194 Weight: 1,
195 Backends: []e2e.BackendOptions{{Port: ports[0]}, {Port: ports[1]}},
196 }})
197 if err := managementServer.Update(ctx, resources); err != nil {
198 t.Fatal(err)
199 }
200 if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[:2]); err != nil {
201 t.Fatal(err)
202 }
203
204
205
206 resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
207 Name: localityName1,
208 Weight: 1,
209 Backends: []e2e.BackendOptions{{Port: ports[1]}},
210 }})
211 if err := managementServer.Update(ctx, resources); err != nil {
212 t.Fatal(err)
213 }
214 if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[1:2]); err != nil {
215 t.Fatal(err)
216 }
217
218
219 resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
220 Name: localityName1,
221 Weight: 1,
222 Backends: []e2e.BackendOptions{{Port: ports[2]}},
223 }})
224 if err := managementServer.Update(ctx, resources); err != nil {
225 t.Fatal(err)
226 }
227 if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[2:3]); err != nil {
228 t.Fatal(err)
229 }
230 }
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250 func (s) TestEDS_MultipleLocalities(t *testing.T) {
251
252 managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
253 defer cleanup1()
254
255
256 servers, cleanup2 := startTestServiceBackends(t, 4)
257 defer cleanup2()
258 addrs, ports := backendAddressesAndPorts(t, servers)
259
260
261
262 resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
263 {
264 Name: localityName1,
265 Weight: 1,
266 Backends: []e2e.BackendOptions{{Port: ports[0]}},
267 },
268 {
269 Name: localityName2,
270 Weight: 1,
271 Backends: []e2e.BackendOptions{{Port: ports[1]}},
272 },
273 })
274 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
275 defer cancel()
276 if err := managementServer.Update(ctx, resources); err != nil {
277 t.Fatal(err)
278 }
279
280
281 client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
282 if err != nil {
283 t.Fatalf("Failed to create xDS client: %v", err)
284 }
285 defer close()
286
287
288
289 r := manual.NewBuilderWithScheme("whatever")
290 jsonSC := fmt.Sprintf(`{
291 "loadBalancingConfig":[{
292 "cluster_resolver_experimental":{
293 "discoveryMechanisms": [{
294 "cluster": "%s",
295 "type": "EDS",
296 "edsServiceName": "%s",
297 "outlierDetection": {}
298 }],
299 "xdsLbPolicy":[{"round_robin":{}}]
300 }
301 }]
302 }`, clusterName, edsServiceName)
303 scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
304 r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, client))
305
306
307 cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
308 if err != nil {
309 t.Fatalf("failed to dial local test server: %v", err)
310 }
311 defer cc.Close()
312
313
314 testClient := testgrpc.NewTestServiceClient(cc)
315 if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, addrs[0:2]); err != nil {
316 t.Fatal(err)
317 }
318
319
320
321 resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
322 {
323 Name: localityName1,
324 Weight: 1,
325 Backends: []e2e.BackendOptions{{Port: ports[0]}},
326 },
327 {
328 Name: localityName2,
329 Weight: 1,
330 Backends: []e2e.BackendOptions{{Port: ports[1]}},
331 },
332 {
333 Name: localityName3,
334 Weight: 1,
335 Backends: []e2e.BackendOptions{{Port: ports[2]}},
336 },
337 })
338 if err := managementServer.Update(ctx, resources); err != nil {
339 t.Fatal(err)
340 }
341 if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, addrs[0:3]); err != nil {
342 t.Fatal(err)
343 }
344
345
346
347 resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
348 {
349 Name: localityName2,
350 Weight: 1,
351 Backends: []e2e.BackendOptions{{Port: ports[1]}},
352 },
353 {
354 Name: localityName3,
355 Weight: 1,
356 Backends: []e2e.BackendOptions{{Port: ports[2]}},
357 },
358 })
359 if err := managementServer.Update(ctx, resources); err != nil {
360 t.Fatal(err)
361 }
362 if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, addrs[1:3]); err != nil {
363 t.Fatal(err)
364 }
365
366
367
368
369 resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
370 {
371 Name: localityName2,
372 Weight: 1,
373 Backends: []e2e.BackendOptions{{Port: ports[1]}},
374 },
375 {
376 Name: localityName3,
377 Weight: 1,
378 Backends: []e2e.BackendOptions{{Port: ports[2]}, {Port: ports[3]}},
379 },
380 })
381 if err := managementServer.Update(ctx, resources); err != nil {
382 t.Fatal(err)
383 }
384 wantAddrs := []resolver.Address{addrs[1], addrs[1], addrs[2], addrs[3]}
385 if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, wantAddrs); err != nil {
386 t.Fatal(err)
387 }
388 }
389
390
391
392
393 func (s) TestEDS_EndpointsHealth(t *testing.T) {
394
395 managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
396 defer cleanup1()
397
398
399 servers, cleanup2 := startTestServiceBackends(t, 12)
400 defer cleanup2()
401 addrs, ports := backendAddressesAndPorts(t, servers)
402
403
404
405
406 resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{
407 {
408 Name: localityName1,
409 Weight: 1,
410 Backends: []e2e.BackendOptions{
411 {Port: ports[0], HealthStatus: v3corepb.HealthStatus_UNKNOWN},
412 {Port: ports[1], HealthStatus: v3corepb.HealthStatus_HEALTHY},
413 {Port: ports[2], HealthStatus: v3corepb.HealthStatus_UNHEALTHY},
414 {Port: ports[3], HealthStatus: v3corepb.HealthStatus_DRAINING},
415 {Port: ports[4], HealthStatus: v3corepb.HealthStatus_TIMEOUT},
416 {Port: ports[5], HealthStatus: v3corepb.HealthStatus_DEGRADED},
417 },
418 },
419 {
420 Name: localityName2,
421 Weight: 1,
422 Backends: []e2e.BackendOptions{
423 {Port: ports[6], HealthStatus: v3corepb.HealthStatus_UNKNOWN},
424 {Port: ports[7], HealthStatus: v3corepb.HealthStatus_HEALTHY},
425 {Port: ports[8], HealthStatus: v3corepb.HealthStatus_UNHEALTHY},
426 {Port: ports[9], HealthStatus: v3corepb.HealthStatus_DRAINING},
427 {Port: ports[10], HealthStatus: v3corepb.HealthStatus_TIMEOUT},
428 {Port: ports[11], HealthStatus: v3corepb.HealthStatus_DEGRADED},
429 },
430 },
431 })
432 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
433 defer cancel()
434 if err := managementServer.Update(ctx, resources); err != nil {
435 t.Fatal(err)
436 }
437
438
439 client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
440 if err != nil {
441 t.Fatalf("Failed to create xDS client: %v", err)
442 }
443 defer close()
444
445
446
447 r := manual.NewBuilderWithScheme("whatever")
448 jsonSC := fmt.Sprintf(`{
449 "loadBalancingConfig":[{
450 "cluster_resolver_experimental":{
451 "discoveryMechanisms": [{
452 "cluster": "%s",
453 "type": "EDS",
454 "edsServiceName": "%s",
455 "outlierDetection": {}
456 }],
457 "xdsLbPolicy":[{"round_robin":{}}]
458 }
459 }]
460 }`, clusterName, edsServiceName)
461 scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
462 r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, client))
463
464
465 cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
466 if err != nil {
467 t.Fatalf("failed to dial local test server: %v", err)
468 }
469 defer cc.Close()
470
471
472
473 testClient := testgrpc.NewTestServiceClient(cc)
474 if err := rrutil.CheckWeightedRoundRobinRPCs(ctx, testClient, append(addrs[0:2], addrs[6:8]...)); err != nil {
475 t.Fatal(err)
476 }
477 }
478
479
480
481
482 func (s) TestEDS_EmptyUpdate(t *testing.T) {
483
484 managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
485 defer cleanup1()
486
487
488 servers, cleanup2 := startTestServiceBackends(t, 4)
489 defer cleanup2()
490 addrs, ports := backendAddressesAndPorts(t, servers)
491
492 oldCacheTimeout := priority.DefaultSubBalancerCloseTimeout
493 priority.DefaultSubBalancerCloseTimeout = 100 * time.Microsecond
494 defer func() { priority.DefaultSubBalancerCloseTimeout = oldCacheTimeout }()
495
496
497
498 resources := clientEndpointsResource(nodeID, edsServiceName, nil)
499 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
500 defer cancel()
501 if err := managementServer.Update(ctx, resources); err != nil {
502 t.Fatal(err)
503 }
504
505
506 client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
507 if err != nil {
508 t.Fatalf("Failed to create xDS client: %v", err)
509 }
510 defer close()
511
512
513
514 r := manual.NewBuilderWithScheme("whatever")
515 jsonSC := fmt.Sprintf(`{
516 "loadBalancingConfig":[{
517 "cluster_resolver_experimental":{
518 "discoveryMechanisms": [{
519 "cluster": "%s",
520 "type": "EDS",
521 "edsServiceName": "%s",
522 "outlierDetection": {}
523 }],
524 "xdsLbPolicy":[{"round_robin":{}}]
525 }
526 }]
527 }`, clusterName, edsServiceName)
528 scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
529 r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, client))
530
531
532
533
534 cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
535 if err != nil {
536 t.Fatalf("failed to dial local test server: %v", err)
537 }
538 defer cc.Close()
539 testClient := testgrpc.NewTestServiceClient(cc)
540 if err := waitForProducedZeroAddressesError(ctx, t, testClient); err != nil {
541 t.Fatal(err)
542 }
543
544
545 resources = clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
546 Name: localityName1,
547 Weight: 1,
548 Backends: []e2e.BackendOptions{{Port: ports[0]}},
549 }})
550 if err := managementServer.Update(ctx, resources); err != nil {
551 t.Fatal(err)
552 }
553 if err := rrutil.CheckRoundRobinRPCs(ctx, testClient, addrs[:1]); err != nil {
554 t.Fatal(err)
555 }
556
557
558
559 resources = clientEndpointsResource(nodeID, edsServiceName, nil)
560 if err := managementServer.Update(ctx, resources); err != nil {
561 t.Fatal(err)
562 }
563 if err := waitForProducedZeroAddressesError(ctx, t, testClient); err != nil {
564 t.Fatal(err)
565 }
566 }
567
568
569
570
571
572 func (s) TestEDS_ResourceRemoved(t *testing.T) {
573
574
575
576
577
578
579 edsResourceRequestedCh := make(chan struct{}, 1)
580 edsResourceCanceledCh := make(chan struct{}, 1)
581 managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
582 OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
583 if req.GetTypeUrl() == version.V3EndpointsURL {
584 switch len(req.GetResourceNames()) {
585 case 0:
586 select {
587 case edsResourceCanceledCh <- struct{}{}:
588 default:
589 }
590 case 1:
591 if req.GetResourceNames()[0] == edsServiceName {
592 select {
593 case edsResourceRequestedCh <- struct{}{}:
594 default:
595 }
596 }
597 default:
598 t.Errorf("Unexpected number of resources, %d, in an EDS request", len(req.GetResourceNames()))
599 }
600 }
601 return nil
602 },
603 })
604 defer cleanup()
605
606 server := stubserver.StartTestService(t, nil)
607 defer server.Stop()
608
609
610 resources := e2e.UpdateOptions{
611 NodeID: nodeID,
612 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)},
613 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
614 SkipValidation: true,
615 }
616 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
617 defer cancel()
618 if err := managementServer.Update(ctx, resources); err != nil {
619 t.Fatal(err)
620 }
621
622
623
624 cc, cleanup := setupAndDial(t, bootstrapContents)
625 defer cleanup()
626
627 client := testgrpc.NewTestServiceClient(cc)
628 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
629 t.Fatalf("EmptyCall() failed: %v", err)
630 }
631
632
633 resources.Endpoints = nil
634 if err := managementServer.Update(ctx, resources); err != nil {
635 t.Fatal(err)
636 }
637
638
639
640 for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
641 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
642 t.Fatalf("EmptyCall() failed: %v", err)
643 }
644 select {
645 case <-edsResourceCanceledCh:
646 t.Fatal("EDS watch canceled when not expected to be canceled")
647 default:
648 }
649 }
650 }
651
652
653
654
655
656 func (s) TestEDS_ClusterResourceDoesNotContainEDSServiceName(t *testing.T) {
657 edsResourceCh := make(chan string, 1)
658 managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
659 OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
660 if req.GetTypeUrl() != version.V3EndpointsURL {
661 return nil
662 }
663 if len(req.GetResourceNames()) > 0 {
664 select {
665 case edsResourceCh <- req.GetResourceNames()[0]:
666 default:
667 }
668 }
669 return nil
670 },
671 })
672 defer cleanup()
673
674 server := stubserver.StartTestService(t, nil)
675 defer server.Stop()
676
677
678 resources := e2e.UpdateOptions{
679 NodeID: nodeID,
680 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, "", e2e.SecurityLevelNone)},
681 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(clusterName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
682 SkipValidation: true,
683 }
684 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
685 defer cancel()
686 if err := managementServer.Update(ctx, resources); err != nil {
687 t.Fatal(err)
688 }
689
690
691
692 cc, cleanup := setupAndDial(t, bootstrapContents)
693 defer cleanup()
694
695 client := testgrpc.NewTestServiceClient(cc)
696 if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
697 t.Fatalf("EmptyCall() failed: %v", err)
698 }
699
700 select {
701 case <-ctx.Done():
702 t.Fatal("Timeout when waiting for EDS request to be received on the management server")
703 case name := <-edsResourceCh:
704 if name != clusterName {
705 t.Fatalf("Received EDS request with resource name %q, want %q", name, clusterName)
706 }
707 }
708 }
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725 func (s) TestEDS_ClusterResourceUpdates(t *testing.T) {
726 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
727 defer cancel()
728
729
730
731 edsResourceNameCh := make(chan []string, 1)
732 managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
733 OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
734 if req.GetTypeUrl() != version.V3EndpointsURL {
735 return nil
736 }
737 if len(req.GetResourceNames()) == 0 {
738
739 return nil
740 }
741 select {
742 case <-ctx.Done():
743 case edsResourceNameCh <- req.GetResourceNames():
744 }
745 return nil
746 },
747 AllowResourceSubset: true,
748 })
749 defer cleanup()
750
751
752
753
754
755 servers, cleanup2 := startTestServiceBackends(t, 2)
756 defer cleanup2()
757 addrs, ports := backendAddressesAndPorts(t, servers)
758
759
760 resources := e2e.UpdateOptions{
761 NodeID: nodeID,
762 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)},
763 Endpoints: []*v3endpointpb.ClusterLoadAssignment{
764 e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(ports[0])}),
765 e2e.DefaultEndpoint(clusterName, "localhost", []uint32{uint32(ports[1])}),
766 },
767 SkipValidation: true,
768 }
769 if err := managementServer.Update(ctx, resources); err != nil {
770 t.Fatal(err)
771 }
772
773
774
775 cc, cleanup := setupAndDial(t, bootstrapContents)
776 defer cleanup()
777
778 client := testgrpc.NewTestServiceClient(cc)
779 peer := &peer.Peer{}
780 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
781 t.Fatalf("EmptyCall() failed: %v", err)
782 }
783 if peer.Addr.String() != addrs[0].Addr {
784 t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
785 }
786
787
788 select {
789 case <-ctx.Done():
790 t.Fatal("Timeout when waiting for EDS request to be received on the management server")
791 case names := <-edsResourceNameCh:
792 if !cmp.Equal(names, []string{edsServiceName}) {
793 t.Fatalf("Received EDS request with resource names %v, want %v", names, []string{edsServiceName})
794 }
795 }
796
797
798 resources.Clusters = []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, "", e2e.SecurityLevelNone)}
799 if err := managementServer.Update(ctx, resources); err != nil {
800 t.Fatal(err)
801 }
802
803
804
805
806
807
808
809 for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
810 names := <-edsResourceNameCh
811 if cmp.Equal(names, []string{clusterName}) {
812 break
813 }
814 }
815 if ctx.Err() != nil {
816 t.Fatalf("Timeout when waiting for old EDS watch %q to be canceled and new one %q to be registered", edsServiceName, clusterName)
817 }
818
819
820
821 for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
822 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
823 continue
824 }
825 if peer.Addr.String() == addrs[1].Addr {
826 break
827 }
828 }
829 if ctx.Err() != nil {
830 t.Fatalf("Timeout when waiting for EmptyCall() to be routed to correct backend %q", addrs[1].Addr)
831 }
832
833
834 resources.Clusters[0].CircuitBreakers = &v3clusterpb.CircuitBreakers{
835 Thresholds: []*v3clusterpb.CircuitBreakers_Thresholds{
836 {
837 Priority: v3corepb.RoutingPriority_DEFAULT,
838 MaxRequests: wrapperspb.UInt32(512),
839 },
840 },
841 }
842 if err := managementServer.Update(ctx, resources); err != nil {
843 t.Fatal(err)
844 }
845
846
847
848 for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
849 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
850 t.Fatalf("EmptyCall() failed: %v", err)
851 }
852 if peer.Addr.String() != addrs[1].Addr {
853 t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[1].Addr)
854 }
855 }
856 }
857
858
859
860
861
862
863
864 func (s) TestEDS_BadUpdateWithoutPreviousGoodUpdate(t *testing.T) {
865
866 mgmtServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
867 defer cleanup1()
868
869
870 server := stubserver.StartTestService(t, nil)
871 defer server.Stop()
872
873
874
875
876
877 resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
878 Name: localityName1,
879 Weight: 1,
880 Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}},
881 }})
882 resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
883 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
884 defer cancel()
885 if err := mgmtServer.Update(ctx, resources); err != nil {
886 t.Fatal(err)
887 }
888
889
890 xdsClient, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
891 if err != nil {
892 t.Fatalf("Failed to create xDS client: %v", err)
893 }
894 defer close()
895
896
897
898 r := manual.NewBuilderWithScheme("whatever")
899 jsonSC := fmt.Sprintf(`{
900 "loadBalancingConfig":[{
901 "cluster_resolver_experimental":{
902 "discoveryMechanisms": [{
903 "cluster": "%s",
904 "type": "EDS",
905 "edsServiceName": "%s",
906 "outlierDetection": {}
907 }],
908 "xdsLbPolicy":[{"round_robin":{}}]
909 }
910 }]
911 }`, clusterName, edsServiceName)
912 scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
913 r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))
914
915
916
917 cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
918 if err != nil {
919 t.Fatalf("failed to dial local test server: %v", err)
920 }
921 defer cc.Close()
922 client := testgrpc.NewTestServiceClient(cc)
923 if err := waitForProducedZeroAddressesError(ctx, t, client); err != nil {
924 t.Fatal(err)
925 }
926 }
927
928
929
930
931
932
933
934 func (s) TestEDS_BadUpdateWithPreviousGoodUpdate(t *testing.T) {
935
936 mgmtServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
937 defer cleanup1()
938
939
940 server := stubserver.StartTestService(t, nil)
941 defer server.Stop()
942
943
944 resources := clientEndpointsResource(nodeID, edsServiceName, []e2e.LocalityOptions{{
945 Name: localityName1,
946 Weight: 1,
947 Backends: []e2e.BackendOptions{{Port: testutils.ParsePort(t, server.Address)}},
948 }})
949 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
950 defer cancel()
951 if err := mgmtServer.Update(ctx, resources); err != nil {
952 t.Fatal(err)
953 }
954
955
956 xdsClient, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
957 if err != nil {
958 t.Fatalf("Failed to create xDS client: %v", err)
959 }
960 defer close()
961
962
963
964 r := manual.NewBuilderWithScheme("whatever")
965 jsonSC := fmt.Sprintf(`{
966 "loadBalancingConfig":[{
967 "cluster_resolver_experimental":{
968 "discoveryMechanisms": [{
969 "cluster": "%s",
970 "type": "EDS",
971 "edsServiceName": "%s",
972 "outlierDetection": {}
973 }],
974 "xdsLbPolicy":[{"round_robin":{}}]
975 }
976 }]
977 }`, clusterName, edsServiceName)
978 scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
979 r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))
980
981
982 cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
983 if err != nil {
984 t.Fatalf("failed to dial local test server: %v", err)
985 }
986 defer cc.Close()
987
988
989 client := testgrpc.NewTestServiceClient(cc)
990 if err := rrutil.CheckRoundRobinRPCs(ctx, client, []resolver.Address{{Addr: server.Address}}); err != nil {
991 t.Fatal(err)
992 }
993
994
995
996
997
998 resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
999 if err := mgmtServer.Update(ctx, resources); err != nil {
1000 t.Fatal(err)
1001 }
1002
1003
1004 for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
1005 if err := rrutil.CheckRoundRobinRPCs(ctx, client, []resolver.Address{{Addr: server.Address}}); err != nil {
1006 t.Fatal(err)
1007 }
1008 }
1009 }
1010
1011
1012
1013
1014
1015
1016
1017 func (s) TestEDS_ResourceNotFound(t *testing.T) {
1018
1019 mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
1020 if err != nil {
1021 t.Fatalf("Failed to spin up the xDS management server: %v", err)
1022 }
1023 defer mgmtServer.Stop()
1024
1025
1026
1027 nodeID := uuid.New().String()
1028 xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
1029 XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
1030 NodeProto: &v3corepb.Node{Id: nodeID},
1031 }, defaultTestWatchExpiryTimeout, time.Duration(0))
1032 if err != nil {
1033 t.Fatalf("failed to create xds client: %v", err)
1034 }
1035 defer close()
1036
1037
1038 resources := e2e.UpdateOptions{NodeID: nodeID, SkipValidation: true}
1039 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1040 defer cancel()
1041 if err := mgmtServer.Update(ctx, resources); err != nil {
1042 t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
1043 }
1044
1045
1046
1047 r := manual.NewBuilderWithScheme("whatever")
1048 jsonSC := fmt.Sprintf(`{
1049 "loadBalancingConfig":[{
1050 "cluster_resolver_experimental":{
1051 "discoveryMechanisms": [{
1052 "cluster": "%s",
1053 "type": "EDS",
1054 "edsServiceName": "%s",
1055 "outlierDetection": {}
1056 }],
1057 "xdsLbPolicy":[{"round_robin":{}}]
1058 }
1059 }]
1060 }`, clusterName, edsServiceName)
1061 scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
1062 r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))
1063
1064
1065
1066 cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
1067 if err != nil {
1068 t.Fatalf("failed to dial local test server: %v", err)
1069 }
1070 defer cc.Close()
1071 client := testgrpc.NewTestServiceClient(cc)
1072 if err := waitForProducedZeroAddressesError(ctx, t, client); err != nil {
1073 t.Fatal(err)
1074 }
1075 }
1076
1077
1078
1079
1080
1081 func waitForProducedZeroAddressesError(ctx context.Context, t *testing.T, client testgrpc.TestServiceClient) error {
1082 for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
1083 _, err := client.EmptyCall(ctx, &testpb.Empty{})
1084 if err == nil {
1085 t.Log("EmptyCall() succeeded after error in Discovery Mechanism")
1086 continue
1087 }
1088 if code := status.Code(err); code != codes.Unavailable {
1089 t.Logf("EmptyCall() returned code: %v, want: %v", code, codes.Unavailable)
1090 continue
1091 }
1092 if !strings.Contains(err.Error(), "produced zero addresses") {
1093 t.Logf("EmptyCall() = %v, want %v", err, "produced zero addresses")
1094 continue
1095 }
1096 return nil
1097 }
1098 return errors.New("timeout when waiting for RPCs to fail with UNAVAILABLE status and produced zero addresses")
1099 }
1100
View as plain text