1
16
17 package e2e_test
18
19 import (
20 "context"
21 "fmt"
22 "net"
23 "sort"
24 "strconv"
25 "strings"
26 "testing"
27 "time"
28
29 "github.com/google/go-cmp/cmp"
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/codes"
32 "google.golang.org/grpc/connectivity"
33 "google.golang.org/grpc/credentials/insecure"
34 "google.golang.org/grpc/internal"
35 "google.golang.org/grpc/internal/stubserver"
36 "google.golang.org/grpc/internal/testutils/pickfirst"
37 "google.golang.org/grpc/internal/testutils/xds/e2e"
38 "google.golang.org/grpc/internal/xds/bootstrap"
39 "google.golang.org/grpc/peer"
40 "google.golang.org/grpc/resolver"
41 "google.golang.org/grpc/resolver/manual"
42 "google.golang.org/grpc/serviceconfig"
43 "google.golang.org/grpc/status"
44 xdstestutils "google.golang.org/grpc/xds/internal/testutils"
45 "google.golang.org/grpc/xds/internal/xdsclient"
46 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
47 "google.golang.org/protobuf/types/known/wrapperspb"
48
49 v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
50 v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
51 v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
52 v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
53 testgrpc "google.golang.org/grpc/interop/grpc_testing"
54 testpb "google.golang.org/grpc/interop/grpc_testing"
55 )
56
57
58
59 func makeAggregateClusterResource(name string, childNames []string) *v3clusterpb.Cluster {
60 return e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
61 ClusterName: name,
62 Type: e2e.ClusterTypeAggregate,
63 ChildNames: childNames,
64 })
65 }
66
67
68
69 func makeLogicalDNSClusterResource(name, dnsHost string, dnsPort uint32) *v3clusterpb.Cluster {
70 return e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
71 ClusterName: name,
72 Type: e2e.ClusterTypeLogicalDNS,
73 DNSHostName: dnsHost,
74 DNSPort: dnsPort,
75 })
76 }
77
78
79
80
81
82
83
84
85
86
87
88
89 func setupDNS() (chan resolver.Target, chan struct{}, chan resolver.ResolveNowOptions, *manual.Resolver, func()) {
90 targetCh := make(chan resolver.Target, 1)
91 closeCh := make(chan struct{}, 1)
92 resolveNowCh := make(chan resolver.ResolveNowOptions, 1)
93
94 mr := manual.NewBuilderWithScheme("dns")
95 mr.BuildCallback = func(target resolver.Target, _ resolver.ClientConn, _ resolver.BuildOptions) { targetCh <- target }
96 mr.CloseCallback = func() { closeCh <- struct{}{} }
97 mr.ResolveNowCallback = func(opts resolver.ResolveNowOptions) { resolveNowCh <- opts }
98
99 dnsResolverBuilder := resolver.Get("dns")
100 resolver.Register(mr)
101
102 return targetCh, closeCh, resolveNowCh, mr, func() { resolver.Register(dnsResolverBuilder) }
103 }
104
105
106
107
108
109
110 func (s) TestAggregateCluster_WithTwoEDSClusters(t *testing.T) {
111 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
112 defer cancel()
113
114
115
116 edsResourceNameCh := make(chan []string, 1)
117 managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
118 OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
119 if req.GetTypeUrl() != version.V3EndpointsURL {
120 return nil
121 }
122 if len(req.GetResourceNames()) == 0 {
123
124 return nil
125 }
126 select {
127 case edsResourceNameCh <- req.GetResourceNames():
128 case <-ctx.Done():
129 }
130 return nil
131 },
132 AllowResourceSubset: true,
133 })
134 defer cleanup()
135
136
137
138
139 servers, cleanup2 := startTestServiceBackends(t, 2)
140 defer cleanup2()
141 addrs, ports := backendAddressesAndPorts(t, servers)
142
143
144
145
146 const clusterName1 = clusterName + "-cluster-1"
147 const clusterName2 = clusterName + "-cluster-2"
148 resources := e2e.UpdateOptions{
149 NodeID: nodeID,
150 Clusters: []*v3clusterpb.Cluster{
151 makeAggregateClusterResource(clusterName, []string{clusterName1, clusterName2}),
152 e2e.DefaultCluster(clusterName1, "", e2e.SecurityLevelNone),
153 e2e.DefaultCluster(clusterName2, "", e2e.SecurityLevelNone),
154 },
155 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(clusterName1, "localhost", []uint32{uint32(ports[0])})},
156 SkipValidation: true,
157 }
158 if err := managementServer.Update(ctx, resources); err != nil {
159 t.Fatal(err)
160 }
161
162
163
164 cc, cleanup := setupAndDial(t, bootstrapContents)
165 defer cleanup()
166
167
168 func() {
169 for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
170 select {
171 case names := <-edsResourceNameCh:
172
173
174 sortedNames := make([]string, len(names))
175 copy(sortedNames, names)
176 sort.Strings(sortedNames)
177 if cmp.Equal(sortedNames, []string{clusterName1, clusterName2}) {
178 return
179 }
180 default:
181 }
182 }
183 }()
184 if ctx.Err() != nil {
185 t.Fatalf("Timeout when waiting for all EDS resources %v to be requested", []string{clusterName1, clusterName2})
186 }
187
188
189
190
191 client := testgrpc.NewTestServiceClient(cc)
192 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
193 defer sCancel()
194 if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
195 t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded)
196 }
197
198
199 resources.Endpoints = append(resources.Endpoints, e2e.DefaultEndpoint(clusterName2, "localhost", []uint32{uint32(ports[1])}))
200 if err := managementServer.Update(ctx, resources); err != nil {
201 t.Fatal(err)
202 }
203
204
205
206 peer := &peer.Peer{}
207 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
208 t.Fatalf("EmptyCall() failed: %v", err)
209 }
210 if peer.Addr.String() != addrs[0].Addr {
211 t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
212 }
213 }
214
215
216
217
218 func (s) TestAggregateCluster_WithTwoEDSClusters_PrioritiesChange(t *testing.T) {
219
220 managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
221 defer cleanup()
222
223
224
225
226 servers, cleanup2 := startTestServiceBackends(t, 2)
227 defer cleanup2()
228 addrs, ports := backendAddressesAndPorts(t, servers)
229
230
231
232 const clusterName1 = clusterName + "cluster-1"
233 const clusterName2 = clusterName + "cluster-2"
234 resources := e2e.UpdateOptions{
235 NodeID: nodeID,
236 Clusters: []*v3clusterpb.Cluster{
237 makeAggregateClusterResource(clusterName, []string{clusterName1, clusterName2}),
238 e2e.DefaultCluster(clusterName1, "", e2e.SecurityLevelNone),
239 e2e.DefaultCluster(clusterName2, "", e2e.SecurityLevelNone),
240 },
241 Endpoints: []*v3endpointpb.ClusterLoadAssignment{
242 e2e.DefaultEndpoint(clusterName1, "localhost", []uint32{uint32(ports[0])}),
243 e2e.DefaultEndpoint(clusterName2, "localhost", []uint32{uint32(ports[1])}),
244 },
245 SkipValidation: true,
246 }
247 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
248 defer cancel()
249 if err := managementServer.Update(ctx, resources); err != nil {
250 t.Fatal(err)
251 }
252
253
254
255 cc, cleanup := setupAndDial(t, bootstrapContents)
256 defer cleanup()
257
258
259
260 client := testgrpc.NewTestServiceClient(cc)
261 peer := &peer.Peer{}
262 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
263 t.Fatalf("EmptyCall() failed: %v", err)
264 }
265 if peer.Addr.String() != addrs[0].Addr {
266 t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
267 }
268
269
270 resources.Clusters = []*v3clusterpb.Cluster{
271 makeAggregateClusterResource(clusterName, []string{clusterName2, clusterName1}),
272 e2e.DefaultCluster(clusterName1, "", e2e.SecurityLevelNone),
273 e2e.DefaultCluster(clusterName2, "", e2e.SecurityLevelNone),
274 }
275 if err := managementServer.Update(ctx, resources); err != nil {
276 t.Fatal(err)
277 }
278
279
280
281 for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
282 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
283 t.Fatalf("EmptyCall() failed: %v", err)
284 }
285 if peer.Addr.String() == addrs[1].Addr {
286 break
287 }
288 }
289 if ctx.Err() != nil {
290 t.Fatal("Timeout waiting for RPCs to be routed to cluster-2 after priority switch")
291 }
292 }
293
// hostAndPortFromAddress splits addr, which must be of the form "host:port",
// into its host and its numeric port. The port must fit in 32 bits.
//
// Any parse failure is fatal to the test t. Both failure messages include the
// offending input and the underlying error so the cause is visible in the
// test log.
func hostAndPortFromAddress(t *testing.T, addr string) (string, uint32) {
	t.Helper()

	host, p, err := net.SplitHostPort(addr)
	if err != nil {
		t.Fatalf("Invalid serving address %q: %v", addr, err)
	}
	// A bit size of 32 makes ParseUint reject values that would not fit in the
	// uint32 returned below.
	port, err := strconv.ParseUint(p, 10, 32)
	if err != nil {
		t.Fatalf("Invalid serving port %q: %v", p, err)
	}
	return host, uint32(port)
}
307
308
309
310
311
312 func (s) TestAggregateCluster_WithOneDNSCluster(t *testing.T) {
313
314 managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
315 defer cleanup2()
316
317
318 server := stubserver.StartTestService(t, nil)
319 defer server.Stop()
320 host, port := hostAndPortFromAddress(t, server.Address)
321
322
323 const dnsClusterName = clusterName + "-dns"
324 resources := e2e.UpdateOptions{
325 NodeID: nodeID,
326 Clusters: []*v3clusterpb.Cluster{
327 makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
328 makeLogicalDNSClusterResource(dnsClusterName, host, uint32(port)),
329 },
330 SkipValidation: true,
331 }
332 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
333 defer cancel()
334 if err := managementServer.Update(ctx, resources); err != nil {
335 t.Fatal(err)
336 }
337
338
339
340 cc, cleanup := setupAndDial(t, bootstrapContents)
341 defer cleanup()
342
343
344
345 client := testgrpc.NewTestServiceClient(cc)
346 peer := &peer.Peer{}
347 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
348 t.Fatalf("EmptyCall() failed: %v", err)
349 }
350 if peer.Addr.String() != server.Address {
351 t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address)
352 }
353 }
354
355
356
357
358
359 func (s) TestAggregateCluster_WithOneDNSCluster_ParseFailure(t *testing.T) {
360
361 managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
362 defer cleanup2()
363
364
365 const dnsClusterName = clusterName + "-dns"
366 resources := e2e.UpdateOptions{
367 NodeID: nodeID,
368 Clusters: []*v3clusterpb.Cluster{
369 makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
370 makeLogicalDNSClusterResource(dnsClusterName, "%gh&%ij", uint32(8080)),
371 },
372 SkipValidation: true,
373 }
374 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
375 defer cancel()
376 if err := managementServer.Update(ctx, resources); err != nil {
377 t.Fatal(err)
378 }
379
380
381
382 cc, cleanup := setupAndDial(t, bootstrapContents)
383 defer cleanup()
384
385
386 for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() {
387 if !cc.WaitForStateChange(ctx, state) {
388 t.Fatalf("Timed out waiting for state change. got %v; want %v", state, connectivity.TransientFailure)
389 }
390 }
391 }
392
393
394
395
396
397
398 func (s) TestAggregateCluster_WithOneDNSCluster_HostnameChange(t *testing.T) {
399
400 managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
401 defer cleanup1()
402
403
404
405
406 servers, cleanup2 := startTestServiceBackends(t, 2)
407 defer cleanup2()
408
409
410 const dnsClusterName = clusterName + "-dns"
411 dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[0].Address)
412 resources := e2e.UpdateOptions{
413 NodeID: nodeID,
414 Clusters: []*v3clusterpb.Cluster{
415 makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
416 makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
417 },
418 SkipValidation: true,
419 }
420 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
421 defer cancel()
422 if err := managementServer.Update(ctx, resources); err != nil {
423 t.Fatal(err)
424 }
425
426
427
428 cc, cleanup := setupAndDial(t, bootstrapContents)
429 defer cleanup()
430
431
432
433 client := testgrpc.NewTestServiceClient(cc)
434 peer := &peer.Peer{}
435 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
436 t.Fatalf("EmptyCall() failed: %v", err)
437 }
438 if peer.Addr.String() != servers[0].Address {
439 t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, servers[0].Address)
440 }
441
442
443 dnsHostName, dnsPort = hostAndPortFromAddress(t, servers[1].Address)
444 resources = e2e.UpdateOptions{
445 NodeID: nodeID,
446 Clusters: []*v3clusterpb.Cluster{
447 makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
448 makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
449 },
450 SkipValidation: true,
451 }
452 if err := managementServer.Update(ctx, resources); err != nil {
453 t.Fatal(err)
454 }
455
456
457 for ctx.Err() == nil {
458 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
459 t.Fatalf("EmptyCall() failed: %v", err)
460 }
461 if peer.Addr.String() == servers[1].Address {
462 break
463 }
464 }
465 if ctx.Err() != nil {
466 t.Fatal("Timeout when waiting for RPCs to switch to the second backend")
467 }
468 }
469
470
471
472
473
474 func (s) TestAggregateCluster_WithEDSAndDNS(t *testing.T) {
475 dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
476 defer cleanup1()
477
478
479
480 edsResourceCh := make(chan string, 1)
481 managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
482 OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
483 if req.GetTypeUrl() != version.V3EndpointsURL {
484 return nil
485 }
486 if len(req.GetResourceNames()) > 0 {
487 select {
488 case edsResourceCh <- req.GetResourceNames()[0]:
489 default:
490 }
491 }
492 return nil
493 },
494 AllowResourceSubset: true,
495 })
496 defer cleanup2()
497
498
499
500
501 servers, cleanup3 := startTestServiceBackends(t, 2)
502 defer cleanup3()
503 addrs, ports := backendAddressesAndPorts(t, servers)
504
505
506
507 const (
508 edsClusterName = clusterName + "-eds"
509 dnsClusterName = clusterName + "-dns"
510 dnsHostName = "dns_host"
511 dnsPort = uint32(8080)
512 )
513 resources := e2e.UpdateOptions{
514 NodeID: nodeID,
515 Clusters: []*v3clusterpb.Cluster{
516 makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
517 e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
518 makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
519 },
520 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})},
521 SkipValidation: true,
522 }
523 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
524 defer cancel()
525 if err := managementServer.Update(ctx, resources); err != nil {
526 t.Fatal(err)
527 }
528
529
530
531 cc, cleanup := setupAndDial(t, bootstrapContents)
532 defer cleanup()
533
534
535 select {
536 case <-ctx.Done():
537 t.Fatal("Timeout when waiting for EDS request to be received on the management server")
538 case name := <-edsResourceCh:
539 if name != edsClusterName {
540 t.Fatalf("Received EDS request with resource name %q, want %q", name, edsClusterName)
541 }
542 }
543
544
545 select {
546 case <-ctx.Done():
547 t.Fatal("Timeout when waiting for DNS resolver to be started")
548 case target := <-dnsTargetCh:
549 got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
550 if got != want {
551 t.Fatalf("DNS resolution started for target %q, want %q", got, want)
552 }
553 }
554
555
556
557 client := testgrpc.NewTestServiceClient(cc)
558 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
559 defer sCancel()
560 if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
561 t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded)
562 }
563
564
565 dnsR.UpdateState(resolver.State{Addresses: addrs[1:]})
566
567
568
569 peer := &peer.Peer{}
570 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
571 t.Fatalf("EmptyCall() failed: %v", err)
572 }
573 if peer.Addr.String() != addrs[0].Addr {
574 t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
575 }
576 }
577
578
579
580
581
582
583
584 func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) {
585
586 managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
587 defer cleanup2()
588
589
590
591
592 servers, cleanup3 := startTestServiceBackends(t, 2)
593 defer cleanup3()
594 addrs, ports := backendAddressesAndPorts(t, servers)
595 dnsHostName, dnsPort := hostAndPortFromAddress(t, addrs[1].Addr)
596
597
598
599
600 const dnsClusterName = clusterName + "-dns"
601 resources := e2e.UpdateOptions{
602 NodeID: nodeID,
603 Clusters: []*v3clusterpb.Cluster{
604 makeAggregateClusterResource(clusterName, []string{edsServiceName}),
605 e2e.DefaultCluster(edsServiceName, "", e2e.SecurityLevelNone),
606 makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
607 },
608 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(ports[0])})},
609 SkipValidation: true,
610 }
611 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
612 defer cancel()
613 if err := managementServer.Update(ctx, resources); err != nil {
614 t.Fatal(err)
615 }
616
617
618
619 cc, cleanup := setupAndDial(t, bootstrapContents)
620 defer cleanup()
621
622
623 client := testgrpc.NewTestServiceClient(cc)
624 peer := &peer.Peer{}
625 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
626 t.Fatalf("EmptyCall() failed: %v", err)
627 }
628 if peer.Addr.String() != addrs[0].Addr {
629 t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
630 }
631
632
633 resources.Clusters = []*v3clusterpb.Cluster{
634 makeAggregateClusterResource(clusterName, []string{dnsClusterName}),
635 e2e.DefaultCluster(edsServiceName, "", e2e.SecurityLevelNone),
636 makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
637 }
638 if err := managementServer.Update(ctx, resources); err != nil {
639 t.Fatal(err)
640 }
641
642
643
644 for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
645 client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer))
646 if peer.Addr.String() == addrs[1].Addr {
647 break
648 }
649 }
650 if ctx.Err() != nil {
651 t.Fatalf("Timeout when waiting for RPCs to be routed to backend %q in the DNS cluster", addrs[1].Addr)
652 }
653 }
654
655
656
657
658
659
660
661
662
663
664 func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) {
665 dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS()
666 defer cleanup1()
667
668
669 managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
670 defer cleanup2()
671
672
673 servers, cleanup3 := startTestServiceBackends(t, 2)
674 defer cleanup3()
675 addrs, _ := backendAddressesAndPorts(t, servers)
676
677
678
679
680 const (
681 edsClusterName = clusterName + "-eds"
682 dnsClusterName = clusterName + "-dns"
683 dnsHostName = "dns_host"
684 dnsPort = uint32(8080)
685 )
686 emptyEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil)
687 resources := e2e.UpdateOptions{
688 NodeID: nodeID,
689 Clusters: []*v3clusterpb.Cluster{
690 makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
691 e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
692 makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
693 },
694 Endpoints: []*v3endpointpb.ClusterLoadAssignment{emptyEndpointResource},
695 SkipValidation: true,
696 }
697 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
698 defer cancel()
699 if err := managementServer.Update(ctx, resources); err != nil {
700 t.Fatal(err)
701 }
702
703
704
705 cc, cleanup := setupAndDial(t, bootstrapContents)
706 defer cleanup()
707
708
709
710
711 client := testgrpc.NewTestServiceClient(cc)
712 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
713 defer sCancel()
714 if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
715 t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded)
716 }
717
718
719 select {
720 case <-ctx.Done():
721 t.Fatal("Timeout when waiting for DNS resolver to be started")
722 case target := <-dnsTargetCh:
723 got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort)
724 if got != want {
725 t.Fatalf("DNS resolution started for target %q, want %q", got, want)
726 }
727 }
728
729
730 dnsR.UpdateState(resolver.State{Addresses: addrs})
731
732
733
734 for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) {
735 peer := &peer.Peer{}
736 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
737 t.Logf("EmptyCall() failed: %v", err)
738 continue
739 }
740 if peer.Addr.String() == addrs[0].Addr {
741 break
742 }
743 }
744 if ctx.Err() != nil {
745 t.Fatalf("Timeout when waiting for RPCs to be routed to backend %q in the DNS cluster", addrs[0].Addr)
746 }
747
748
749 dnsErr := fmt.Errorf("DNS error")
750 dnsR.ReportError(dnsErr)
751
752
753 for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
754 peer := &peer.Peer{}
755 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
756 t.Fatalf("EmptyCall() failed: %v", err)
757 }
758 if peer.Addr.String() != addrs[0].Addr {
759 t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
760 }
761 }
762 }
763
764
765
766
767
768
769 func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) {
770
771 managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
772 defer cleanup2()
773
774
775 server := stubserver.StartTestService(t, nil)
776 defer server.Stop()
777 dnsHostName, dnsPort := hostAndPortFromAddress(t, server.Address)
778
779
780
781
782 const (
783 edsClusterName = clusterName + "-eds"
784 dnsClusterName = clusterName + "-dns"
785 )
786 nackEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil)
787 nackEndpointResource.Endpoints = []*v3endpointpb.LocalityLbEndpoints{
788 {
789 LoadBalancingWeight: &wrapperspb.UInt32Value{
790 Value: 0,
791 },
792 },
793 }
794 resources := e2e.UpdateOptions{
795 NodeID: nodeID,
796 Clusters: []*v3clusterpb.Cluster{
797 makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
798 e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
799 makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
800 },
801 Endpoints: []*v3endpointpb.ClusterLoadAssignment{nackEndpointResource},
802 SkipValidation: true,
803 }
804 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
805 defer cancel()
806 if err := managementServer.Update(ctx, resources); err != nil {
807 t.Fatal(err)
808 }
809
810
811
812 cc, cleanup := setupAndDial(t, bootstrapContents)
813 defer cleanup()
814
815
816
817 pickfirst.CheckRPCsToBackend(ctx, cc, resolver.Address{Addr: server.Address})
818 }
819
820
821
822
823
824
825 func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) {
826
827 managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
828 defer cleanup2()
829
830
831 server := stubserver.StartTestService(t, nil)
832 defer server.Stop()
833 _, edsPort := hostAndPortFromAddress(t, server.Address)
834
835
836
837 const (
838 edsClusterName = clusterName + "-eds"
839 dnsClusterName = clusterName + "-dns"
840 )
841 resources := e2e.UpdateOptions{
842 NodeID: nodeID,
843 Clusters: []*v3clusterpb.Cluster{
844 makeAggregateClusterResource(clusterName, []string{dnsClusterName, edsClusterName}),
845 makeLogicalDNSClusterResource(dnsClusterName, "bad.ip.v4.address", 8080),
846 e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
847 },
848 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(edsPort)})},
849 SkipValidation: true,
850 }
851 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
852 defer cancel()
853 if err := managementServer.Update(ctx, resources); err != nil {
854 t.Fatal(err)
855 }
856
857
858
859 cc, cleanup := setupAndDial(t, bootstrapContents)
860 defer cleanup()
861
862
863
864 client := testgrpc.NewTestServiceClient(cc)
865 peer := &peer.Peer{}
866 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
867 t.Fatalf("EmptyCall() failed: %v", err)
868 }
869 if peer.Addr.String() != server.Address {
870 t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address)
871 }
872 }
873
874
875
876
877
878
879
880 func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) {
881
882 managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
883 defer cleanup2()
884
885
886
887
888 const (
889 edsClusterName = clusterName + "-eds"
890 dnsClusterName = clusterName + "-dns"
891 )
892 emptyEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil)
893 resources := e2e.UpdateOptions{
894 NodeID: nodeID,
895 Clusters: []*v3clusterpb.Cluster{
896 makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
897 e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone),
898 makeLogicalDNSClusterResource(dnsClusterName, "bad.ip.v4.address", 8080),
899 },
900 Endpoints: []*v3endpointpb.ClusterLoadAssignment{emptyEndpointResource},
901 SkipValidation: true,
902 }
903 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
904 defer cancel()
905 if err := managementServer.Update(ctx, resources); err != nil {
906 t.Fatal(err)
907 }
908
909
910
911 cc, cleanup := setupAndDial(t, bootstrapContents)
912 defer cleanup()
913
914
915
916 client := testgrpc.NewTestServiceClient(cc)
917 for ctx.Err() == nil {
918 _, err := client.EmptyCall(ctx, &testpb.Empty{})
919 if err == nil {
920 t.Fatal("EmptyCall() succeeded when expected to fail")
921 }
922 if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "produced zero addresses") {
923 break
924 }
925 }
926 if ctx.Err() != nil {
927 t.Fatalf("Timeout when waiting for RPCs to fail with expected code and error")
928 }
929 }
930
931
932
933
934
935
936
937
938
939 func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *testing.T) {
940
941 mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
942 defer cleanup2()
943
944
945
946
947 servers, cleanup3 := startTestServiceBackends(t, 2)
948 defer cleanup3()
949 addrs, ports := backendAddressesAndPorts(t, servers)
950 dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[1].Address)
951
952
953
954 const (
955 edsClusterName = clusterName + "-eds"
956 dnsClusterName = clusterName + "-dns"
957 )
958 resources := e2e.UpdateOptions{
959 NodeID: nodeID,
960 Clusters: []*v3clusterpb.Cluster{
961 makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
962 e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
963 makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
964 },
965 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})},
966 SkipValidation: true,
967 }
968 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
969 defer cancel()
970 if err := mgmtServer.Update(ctx, resources); err != nil {
971 t.Fatal(err)
972 }
973
974
975
976 cc, cleanup := setupAndDial(t, bootstrapContents)
977 defer cleanup()
978
979
980
981 client := testgrpc.NewTestServiceClient(cc)
982 peer := &peer.Peer{}
983 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
984 t.Fatalf("EmptyCall() failed: %v", err)
985 }
986 if peer.Addr.String() != addrs[0].Addr {
987 t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
988 }
989
990
991
992
993 resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
994 if err := mgmtServer.Update(ctx, resources); err != nil {
995 t.Fatal(err)
996 }
997
998
999
1000 for end := time.Now().Add(time.Second); time.Now().Before(end); <-time.After(defaultTestShortTimeout) {
1001 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil {
1002 t.Fatalf("EmptyCall() failed: %v", err)
1003 }
1004 if peer.Addr.String() != addrs[0].Addr {
1005 t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr)
1006 }
1007 }
1008 }
1009
1010
1011
1012
1013
1014
1015
1016 func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *testing.T) {
1017
1018 mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
1019 defer cleanup2()
1020
1021
1022
1023
1024 servers, cleanup3 := startTestServiceBackends(t, 2)
1025 defer cleanup3()
1026 addrs, ports := backendAddressesAndPorts(t, servers)
1027 dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[1].Address)
1028
1029
1030 const (
1031 edsClusterName = clusterName + "-eds"
1032 dnsClusterName = clusterName + "-dns"
1033 )
1034 resources := e2e.UpdateOptions{
1035 NodeID: nodeID,
1036 Clusters: []*v3clusterpb.Cluster{
1037 makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
1038 e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
1039 makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
1040 },
1041 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsClusterName, "localhost", []uint32{uint32(ports[0])})},
1042 SkipValidation: true,
1043 }
1044
1045
1046
1047
1048
1049 resources.Endpoints[0].Endpoints[0].LbEndpoints[0].LoadBalancingWeight = &wrapperspb.UInt32Value{Value: 0}
1050 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1051 defer cancel()
1052 if err := mgmtServer.Update(ctx, resources); err != nil {
1053 t.Fatal(err)
1054 }
1055
1056
1057
1058 cc, cleanup := setupAndDial(t, bootstrapContents)
1059 defer cleanup()
1060
1061
1062 peer := &peer.Peer{}
1063 client := testgrpc.NewTestServiceClient(cc)
1064 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
1065 t.Fatalf("EmptyCall() failed: %v", err)
1066 }
1067 if peer.Addr.String() != addrs[1].Addr {
1068 t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[1].Addr)
1069 }
1070 }
1071
1072
1073
1074
1075
1076
1077 func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) {
1078
1079 mgmtServer, nodeID, _, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true})
1080 defer cleanup2()
1081
1082
1083 server := stubserver.StartTestService(t, nil)
1084 defer server.Stop()
1085 dnsHostName, dnsPort := hostAndPortFromAddress(t, server.Address)
1086
1087
1088
1089 const (
1090 edsClusterName = clusterName + "-eds"
1091 dnsClusterName = clusterName + "-dns"
1092 )
1093 resources := e2e.UpdateOptions{
1094 NodeID: nodeID,
1095 Clusters: []*v3clusterpb.Cluster{
1096 makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
1097 e2e.DefaultCluster(edsClusterName, "", e2e.SecurityLevelNone),
1098 makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
1099 },
1100 SkipValidation: true,
1101 }
1102 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1103 defer cancel()
1104 if err := mgmtServer.Update(ctx, resources); err != nil {
1105 t.Fatal(err)
1106 }
1107
1108
1109
1110 xdsClient, close, err := xdsclient.NewWithConfigForTesting(&bootstrap.Config{
1111 XDSServer: xdstestutils.ServerConfigForAddress(t, mgmtServer.Address),
1112 NodeProto: &v3corepb.Node{Id: nodeID},
1113 }, defaultTestWatchExpiryTimeout, time.Duration(0))
1114 if err != nil {
1115 t.Fatalf("failed to create xds client: %v", err)
1116 }
1117 defer close()
1118
1119
1120
1121
1122 r := manual.NewBuilderWithScheme("whatever")
1123 jsonSC := fmt.Sprintf(`{
1124 "loadBalancingConfig":[{
1125 "cds_experimental":{
1126 "cluster": "%s"
1127 }
1128 }]
1129 }`, clusterName)
1130 scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
1131 r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))
1132
1133
1134 cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
1135 if err != nil {
1136 t.Fatalf("failed to dial local test server: %v", err)
1137 }
1138 defer cc.Close()
1139
1140
1141
1142
1143
1144
1145 peer := &peer.Peer{}
1146 client := testgrpc.NewTestServiceClient(cc)
1147 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil {
1148 t.Fatalf("EmptyCall() failed: %v", err)
1149 }
1150 if peer.Addr.String() != server.Address {
1151 t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address)
1152 }
1153 }
1154
View as plain text