1
16
17 package cdsbalancer
18
19 import (
20 "context"
21 "encoding/json"
22 "fmt"
23 "strings"
24 "testing"
25 "time"
26
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/connectivity"
30 "google.golang.org/grpc/internal/pretty"
31 "google.golang.org/grpc/internal/stubserver"
32 "google.golang.org/grpc/internal/testutils"
33 "google.golang.org/grpc/internal/testutils/xds/e2e"
34 "google.golang.org/grpc/serviceconfig"
35 "google.golang.org/grpc/status"
36 "google.golang.org/grpc/xds/internal/balancer/clusterresolver"
37
38 v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
39 v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
40 testgrpc "google.golang.org/grpc/interop/grpc_testing"
41 testpb "google.golang.org/grpc/interop/grpc_testing"
42 )
43
44
45
46 func makeAggregateClusterResource(name string, childNames []string) *v3clusterpb.Cluster {
47 return e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
48 ClusterName: name,
49 Type: e2e.ClusterTypeAggregate,
50 ChildNames: childNames,
51 })
52 }
53
54
55
56 func makeLogicalDNSClusterResource(name, dnsHost string, dnsPort uint32) *v3clusterpb.Cluster {
57 return e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
58 ClusterName: name,
59 Type: e2e.ClusterTypeLogicalDNS,
60 DNSHostName: dnsHost,
61 DNSPort: dnsPort,
62 })
63 }
64
65
66
67
68
69
70 func (s) TestAggregateClusterSuccess_LeafNode(t *testing.T) {
71 tests := []struct {
72 name string
73 firstClusterResource *v3clusterpb.Cluster
74 secondClusterResource *v3clusterpb.Cluster
75 wantFirstChildCfg serviceconfig.LoadBalancingConfig
76 wantSecondChildCfg serviceconfig.LoadBalancingConfig
77 }{
78 {
79 name: "eds",
80 firstClusterResource: e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone),
81 secondClusterResource: e2e.DefaultCluster(clusterName, serviceName+"-new", e2e.SecurityLevelNone),
82 wantFirstChildCfg: &clusterresolver.LBConfig{
83 DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
84 Cluster: clusterName,
85 Type: clusterresolver.DiscoveryMechanismTypeEDS,
86 EDSServiceName: serviceName,
87 OutlierDetection: json.RawMessage(`{}`),
88 }},
89 XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
90 },
91 wantSecondChildCfg: &clusterresolver.LBConfig{
92 DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
93 Cluster: clusterName,
94 Type: clusterresolver.DiscoveryMechanismTypeEDS,
95 EDSServiceName: serviceName + "-new",
96 OutlierDetection: json.RawMessage(`{}`),
97 }},
98 XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
99 },
100 },
101 {
102 name: "dns",
103 firstClusterResource: makeLogicalDNSClusterResource(clusterName, "dns_host", uint32(8080)),
104 secondClusterResource: makeLogicalDNSClusterResource(clusterName, "dns_host_new", uint32(8080)),
105 wantFirstChildCfg: &clusterresolver.LBConfig{
106 DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
107 Cluster: clusterName,
108 Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS,
109 DNSHostname: "dns_host:8080",
110 OutlierDetection: json.RawMessage(`{}`),
111 }},
112 XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
113 },
114 wantSecondChildCfg: &clusterresolver.LBConfig{
115 DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
116 Cluster: clusterName,
117 Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS,
118 DNSHostname: "dns_host_new:8080",
119 OutlierDetection: json.RawMessage(`{}`),
120 }},
121 XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
122 },
123 },
124 }
125
126 for _, test := range tests {
127 t.Run(test.name, func(t *testing.T) {
128 lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
129 mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
130
131
132
133 resources := e2e.UpdateOptions{
134 NodeID: nodeID,
135 Clusters: []*v3clusterpb.Cluster{test.firstClusterResource},
136 SkipValidation: true,
137 }
138 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
139 defer cancel()
140 if err := mgmtServer.Update(ctx, resources); err != nil {
141 t.Fatal(err)
142 }
143 if err := compareLoadBalancingConfig(ctx, lbCfgCh, test.wantFirstChildCfg); err != nil {
144 t.Fatal(err)
145 }
146
147
148
149 resources.Clusters[0] = test.secondClusterResource
150 if err := mgmtServer.Update(ctx, resources); err != nil {
151 t.Fatal(err)
152 }
153 if err := compareLoadBalancingConfig(ctx, lbCfgCh, test.wantSecondChildCfg); err != nil {
154 t.Fatal(err)
155 }
156 })
157 }
158 }
159
160
161
162
163
164
165
166
167
168
169
170 func (s) TestAggregateClusterSuccess_ThenUpdateChildClusters(t *testing.T) {
171 lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
172 mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
173
174
175
176
177
178 resources := e2e.UpdateOptions{
179 NodeID: nodeID,
180 Clusters: []*v3clusterpb.Cluster{
181 makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
182 e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone),
183 },
184 SkipValidation: true,
185 }
186 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
187 defer cancel()
188 if err := mgmtServer.Update(ctx, resources); err != nil {
189 t.Fatal(err)
190 }
191
192
193
194 select {
195 case cfg := <-lbCfgCh:
196 t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg))
197 case <-time.After(defaultTestShortTimeout):
198 }
199
200
201
202 resources.Clusters = append(resources.Clusters, makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort))
203 if err := mgmtServer.Update(ctx, resources); err != nil {
204 t.Fatal(err)
205 }
206
207 wantChildCfg := &clusterresolver.LBConfig{
208 DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{
209 {
210 Cluster: edsClusterName,
211 Type: clusterresolver.DiscoveryMechanismTypeEDS,
212 EDSServiceName: serviceName,
213 OutlierDetection: json.RawMessage(`{}`),
214 },
215 {
216 Cluster: dnsClusterName,
217 Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS,
218 DNSHostname: fmt.Sprintf("%s:%d", dnsHostName, dnsPort),
219 OutlierDetection: json.RawMessage(`{}`),
220 },
221 },
222 XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
223 }
224 if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
225 t.Fatal(err)
226 }
227
228 const dnsClusterNameNew = dnsClusterName + "-new"
229 const dnsHostNameNew = dnsHostName + "-new"
230 resources = e2e.UpdateOptions{
231 NodeID: nodeID,
232 Clusters: []*v3clusterpb.Cluster{
233 makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterNameNew}),
234 e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone),
235 makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
236 makeLogicalDNSClusterResource(dnsClusterNameNew, dnsHostNameNew, dnsPort),
237 },
238 SkipValidation: true,
239 }
240 if err := mgmtServer.Update(ctx, resources); err != nil {
241 t.Fatal(err)
242 }
243 wantChildCfg = &clusterresolver.LBConfig{
244 DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{
245 {
246 Cluster: edsClusterName,
247 Type: clusterresolver.DiscoveryMechanismTypeEDS,
248 EDSServiceName: serviceName,
249 OutlierDetection: json.RawMessage(`{}`),
250 },
251 {
252 Cluster: dnsClusterNameNew,
253 Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS,
254 DNSHostname: fmt.Sprintf("%s:%d", dnsHostNameNew, dnsPort),
255 OutlierDetection: json.RawMessage(`{}`),
256 },
257 },
258 XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
259 }
260 if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
261 t.Fatal(err)
262 }
263 }
264
265
266
267
268
269
270
271
272
273 func (s) TestAggregateClusterSuccess_ThenChangeRootToEDS(t *testing.T) {
274 lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
275 mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
276
277
278
279 resources := e2e.UpdateOptions{
280 NodeID: nodeID,
281 Clusters: []*v3clusterpb.Cluster{
282 makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
283 e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone),
284 makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
285 },
286 SkipValidation: true,
287 }
288 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
289 defer cancel()
290 if err := mgmtServer.Update(ctx, resources); err != nil {
291 t.Fatal(err)
292 }
293
294 wantChildCfg := &clusterresolver.LBConfig{
295 DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{
296 {
297 Cluster: edsClusterName,
298 Type: clusterresolver.DiscoveryMechanismTypeEDS,
299 EDSServiceName: serviceName,
300 OutlierDetection: json.RawMessage(`{}`),
301 },
302 {
303 Cluster: dnsClusterName,
304 Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS,
305 DNSHostname: fmt.Sprintf("%s:%d", dnsHostName, dnsPort),
306 OutlierDetection: json.RawMessage(`{}`),
307 },
308 },
309 XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
310 }
311 if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
312 t.Fatal(err)
313 }
314
315 resources = e2e.UpdateOptions{
316 NodeID: nodeID,
317 Clusters: []*v3clusterpb.Cluster{
318 e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone),
319 makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
320 },
321 SkipValidation: true,
322 }
323 if err := mgmtServer.Update(ctx, resources); err != nil {
324 t.Fatal(err)
325 }
326 wantChildCfg = &clusterresolver.LBConfig{
327 DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
328 Cluster: clusterName,
329 Type: clusterresolver.DiscoveryMechanismTypeEDS,
330 EDSServiceName: serviceName,
331 OutlierDetection: json.RawMessage(`{}`),
332 }},
333 XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
334 }
335 if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
336 t.Fatal(err)
337 }
338 }
339
340
341
342
343
344
345 func (s) TestAggregatedClusterSuccess_SwitchBetweenLeafAndAggregate(t *testing.T) {
346 lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
347 mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
348
349
350 resources := e2e.UpdateOptions{
351 NodeID: nodeID,
352 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
353 SkipValidation: true,
354 }
355 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
356 defer cancel()
357 if err := mgmtServer.Update(ctx, resources); err != nil {
358 t.Fatal(err)
359 }
360 wantChildCfg := &clusterresolver.LBConfig{
361 DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
362 Cluster: clusterName,
363 Type: clusterresolver.DiscoveryMechanismTypeEDS,
364 EDSServiceName: serviceName,
365 OutlierDetection: json.RawMessage(`{}`),
366 }},
367 XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
368 }
369 if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
370 t.Fatal(err)
371 }
372
373
374
375 resources = e2e.UpdateOptions{
376 NodeID: nodeID,
377 Clusters: []*v3clusterpb.Cluster{
378 makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}),
379 e2e.DefaultCluster(edsClusterName, serviceName, e2e.SecurityLevelNone),
380 makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort),
381 },
382 SkipValidation: true,
383 }
384 if err := mgmtServer.Update(ctx, resources); err != nil {
385 t.Fatal(err)
386 }
387 wantChildCfg = &clusterresolver.LBConfig{
388 DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{
389 {
390 Cluster: edsClusterName,
391 Type: clusterresolver.DiscoveryMechanismTypeEDS,
392 EDSServiceName: serviceName,
393 OutlierDetection: json.RawMessage(`{}`),
394 },
395 {
396 Cluster: dnsClusterName,
397 Type: clusterresolver.DiscoveryMechanismTypeLogicalDNS,
398 DNSHostname: fmt.Sprintf("%s:%d", dnsHostName, dnsPort),
399 OutlierDetection: json.RawMessage(`{}`),
400 },
401 },
402 XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
403 }
404 if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
405 t.Fatal(err)
406 }
407
408
409 resources = e2e.UpdateOptions{
410 NodeID: nodeID,
411 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
412 SkipValidation: true,
413 }
414 if err := mgmtServer.Update(ctx, resources); err != nil {
415 t.Fatal(err)
416 }
417 wantChildCfg = &clusterresolver.LBConfig{
418 DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
419 Cluster: clusterName,
420 Type: clusterresolver.DiscoveryMechanismTypeEDS,
421 EDSServiceName: serviceName,
422 OutlierDetection: json.RawMessage(`{}`),
423 }},
424 XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
425 }
426 if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
427 t.Fatal(err)
428 }
429 }
430
431
432
433
434
435
436 func (s) TestAggregatedClusterFailure_ExceedsMaxStackDepth(t *testing.T) {
437 mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
438
439 resources := e2e.UpdateOptions{
440 NodeID: nodeID,
441 Clusters: []*v3clusterpb.Cluster{
442 makeAggregateClusterResource(clusterName, []string{clusterName + "-1"}),
443 makeAggregateClusterResource(clusterName+"-1", []string{clusterName + "-2"}),
444 makeAggregateClusterResource(clusterName+"-2", []string{clusterName + "-3"}),
445 makeAggregateClusterResource(clusterName+"-3", []string{clusterName + "-4"}),
446 makeAggregateClusterResource(clusterName+"-4", []string{clusterName + "-5"}),
447 makeAggregateClusterResource(clusterName+"-5", []string{clusterName + "-6"}),
448 makeAggregateClusterResource(clusterName+"-6", []string{clusterName + "-7"}),
449 makeAggregateClusterResource(clusterName+"-7", []string{clusterName + "-8"}),
450 makeAggregateClusterResource(clusterName+"-8", []string{clusterName + "-9"}),
451 makeAggregateClusterResource(clusterName+"-9", []string{clusterName + "-10"}),
452 makeAggregateClusterResource(clusterName+"-10", []string{clusterName + "-11"}),
453 makeAggregateClusterResource(clusterName+"-11", []string{clusterName + "-12"}),
454 makeAggregateClusterResource(clusterName+"-12", []string{clusterName + "-13"}),
455 makeAggregateClusterResource(clusterName+"-13", []string{clusterName + "-14"}),
456 makeAggregateClusterResource(clusterName+"-14", []string{clusterName + "-15"}),
457 makeAggregateClusterResource(clusterName+"-15", []string{clusterName + "-16"}),
458 e2e.DefaultCluster(clusterName+"-16", serviceName, e2e.SecurityLevelNone),
459 },
460 SkipValidation: true,
461 }
462 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
463 defer cancel()
464 if err := mgmtServer.Update(ctx, resources); err != nil {
465 t.Fatal(err)
466 }
467
468 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
469
470 const wantErr = "aggregate cluster graph exceeds max depth"
471 client := testgrpc.NewTestServiceClient(cc)
472 _, err := client.EmptyCall(ctx, &testpb.Empty{})
473 if code := status.Code(err); code != codes.Unavailable {
474 t.Fatalf("EmptyCall() failed with code: %v, want %v", code, codes.Unavailable)
475 }
476 if err != nil && !strings.Contains(err.Error(), wantErr) {
477 t.Fatalf("EmptyCall() failed with err: %v, want err containing: %v", err, wantErr)
478 }
479
480
481 server := stubserver.StartTestService(t, nil)
482 t.Cleanup(server.Stop)
483
484
485
486 resources = e2e.UpdateOptions{
487 NodeID: nodeID,
488 Clusters: []*v3clusterpb.Cluster{
489 makeAggregateClusterResource(clusterName, []string{clusterName + "-1"}),
490 makeAggregateClusterResource(clusterName+"-1", []string{clusterName + "-2"}),
491 makeAggregateClusterResource(clusterName+"-2", []string{clusterName + "-3"}),
492 makeAggregateClusterResource(clusterName+"-3", []string{clusterName + "-4"}),
493 makeAggregateClusterResource(clusterName+"-4", []string{clusterName + "-5"}),
494 makeAggregateClusterResource(clusterName+"-5", []string{clusterName + "-6"}),
495 makeAggregateClusterResource(clusterName+"-6", []string{clusterName + "-7"}),
496 makeAggregateClusterResource(clusterName+"-7", []string{clusterName + "-8"}),
497 makeAggregateClusterResource(clusterName+"-8", []string{clusterName + "-9"}),
498 makeAggregateClusterResource(clusterName+"-9", []string{clusterName + "-10"}),
499 makeAggregateClusterResource(clusterName+"-10", []string{clusterName + "-11"}),
500 makeAggregateClusterResource(clusterName+"-11", []string{clusterName + "-12"}),
501 makeAggregateClusterResource(clusterName+"-12", []string{clusterName + "-13"}),
502 makeAggregateClusterResource(clusterName+"-13", []string{clusterName + "-14"}),
503 makeAggregateClusterResource(clusterName+"-14", []string{clusterName + "-15"}),
504 e2e.DefaultCluster(clusterName+"-15", serviceName, e2e.SecurityLevelNone),
505 },
506 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
507 SkipValidation: true,
508 }
509 if err := mgmtServer.Update(ctx, resources); err != nil {
510 t.Fatal(err)
511 }
512
513
514 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
515 t.Fatalf("EmptyCall() failed: %v", err)
516 }
517 }
518
519
520
521
522
523 func (s) TestAggregatedClusterSuccess_DiamondDependency(t *testing.T) {
524 lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
525 mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
526
527
528
529
530
531
532 const (
533 clusterNameA = clusterName
534 clusterNameB = clusterName + "-B"
535 clusterNameC = clusterName + "-C"
536 clusterNameD = clusterName + "-D"
537 )
538 resources := e2e.UpdateOptions{
539 NodeID: nodeID,
540 Clusters: []*v3clusterpb.Cluster{
541 makeAggregateClusterResource(clusterNameA, []string{clusterNameB, clusterNameC}),
542 makeAggregateClusterResource(clusterNameB, []string{clusterNameD}),
543 e2e.DefaultCluster(clusterNameD, serviceName, e2e.SecurityLevelNone),
544 },
545 SkipValidation: true,
546 }
547 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
548 defer cancel()
549 if err := mgmtServer.Update(ctx, resources); err != nil {
550 t.Fatal(err)
551 }
552
553
554
555 select {
556 case cfg := <-lbCfgCh:
557 t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg))
558 case <-time.After(defaultTestShortTimeout):
559 }
560
561
562
563
564 resources.Clusters = append(resources.Clusters, makeAggregateClusterResource(clusterNameC, []string{clusterNameD}))
565 if err := mgmtServer.Update(ctx, resources); err != nil {
566 t.Fatal(err)
567 }
568
569 wantChildCfg := &clusterresolver.LBConfig{
570 DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
571 Cluster: clusterNameD,
572 Type: clusterresolver.DiscoveryMechanismTypeEDS,
573 EDSServiceName: serviceName,
574 OutlierDetection: json.RawMessage(`{}`),
575 }},
576 XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
577 }
578 if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
579 t.Fatal(err)
580 }
581 }
582
583
584
585
586
587
588
589 func (s) TestAggregatedClusterSuccess_IgnoreDups(t *testing.T) {
590 lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
591 mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
592
593
594
595
596
597
598 const (
599 clusterNameA = clusterName
600 clusterNameB = clusterName + "-B"
601 clusterNameC = clusterName + "-C"
602 clusterNameD = clusterName + "-D"
603 )
604 resources := e2e.UpdateOptions{
605 NodeID: nodeID,
606 Clusters: []*v3clusterpb.Cluster{
607 makeAggregateClusterResource(clusterNameA, []string{clusterNameB, clusterNameC}),
608 makeAggregateClusterResource(clusterNameB, []string{clusterNameC, clusterNameD}),
609 e2e.DefaultCluster(clusterNameD, serviceName, e2e.SecurityLevelNone),
610 },
611 SkipValidation: true,
612 }
613 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
614 defer cancel()
615 if err := mgmtServer.Update(ctx, resources); err != nil {
616 t.Fatal(err)
617 }
618
619
620
621 select {
622 case cfg := <-lbCfgCh:
623 t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg))
624 case <-time.After(defaultTestShortTimeout):
625 }
626
627
628
629
630 resources.Clusters = append(resources.Clusters, e2e.DefaultCluster(clusterNameC, serviceName, e2e.SecurityLevelNone))
631 if err := mgmtServer.Update(ctx, resources); err != nil {
632 t.Fatal(err)
633 }
634
635 wantChildCfg := &clusterresolver.LBConfig{
636 DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{
637 {
638 Cluster: clusterNameC,
639 Type: clusterresolver.DiscoveryMechanismTypeEDS,
640 EDSServiceName: serviceName,
641 OutlierDetection: json.RawMessage(`{}`),
642 },
643 {
644 Cluster: clusterNameD,
645 Type: clusterresolver.DiscoveryMechanismTypeEDS,
646 EDSServiceName: serviceName,
647 OutlierDetection: json.RawMessage(`{}`),
648 },
649 },
650 XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
651 }
652 if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
653 t.Fatal(err)
654 }
655 }
656
657
658
659
660
661
662
663
664
665 func (s) TestAggregatedCluster_NodeChildOfItself(t *testing.T) {
666 lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
667 mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
668
669 const (
670 clusterNameA = clusterName
671 clusterNameB = clusterName + "-B"
672 )
673
674
675 resources := e2e.UpdateOptions{
676 NodeID: nodeID,
677 Clusters: []*v3clusterpb.Cluster{makeAggregateClusterResource(clusterNameA, []string{clusterNameA})},
678 SkipValidation: true,
679 }
680 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
681 defer cancel()
682 if err := mgmtServer.Update(ctx, resources); err != nil {
683 t.Fatal(err)
684 }
685
686 select {
687 case cfg := <-lbCfgCh:
688 t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg))
689 case <-time.After(defaultTestShortTimeout):
690 }
691
692 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
693
694
695 client := testgrpc.NewTestServiceClient(cc)
696 _, err := client.EmptyCall(ctx, &testpb.Empty{})
697 if gotCode, wantCode := status.Code(err), codes.Unavailable; gotCode != wantCode {
698 t.Fatalf("EmptyCall() failed with code: %v, want %v", gotCode, wantCode)
699 }
700 const wantErr = "aggregate cluster graph has no leaf clusters"
701 if !strings.Contains(err.Error(), wantErr) {
702 t.Fatalf("EmptyCall() failed with err: %v, want error containing %s", err, wantErr)
703 }
704
705
706 server := stubserver.StartTestService(t, nil)
707 t.Cleanup(server.Stop)
708
709
710 resources = e2e.UpdateOptions{
711 NodeID: nodeID,
712 Clusters: []*v3clusterpb.Cluster{
713 makeAggregateClusterResource(clusterNameA, []string{clusterNameB}),
714 e2e.DefaultCluster(clusterNameB, serviceName, e2e.SecurityLevelNone),
715 },
716 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
717 SkipValidation: true,
718 }
719 if err := mgmtServer.Update(ctx, resources); err != nil {
720 t.Fatal(err)
721 }
722
723
724 wantChildCfg := &clusterresolver.LBConfig{
725 DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
726 Cluster: clusterNameB,
727 Type: clusterresolver.DiscoveryMechanismTypeEDS,
728 EDSServiceName: serviceName,
729 OutlierDetection: json.RawMessage(`{}`),
730 }},
731 XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
732 }
733 if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
734 t.Fatal(err)
735 }
736
737
738 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
739 t.Fatalf("EmptyCall() failed: %v", err)
740 }
741 }
742
743
744
745
746
747
748
749 func (s) TestAggregatedCluster_CycleWithNoLeafNode(t *testing.T) {
750 lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
751 mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
752
753 const (
754 clusterNameA = clusterName
755 clusterNameB = clusterName + "-B"
756 )
757
758
759 resources := e2e.UpdateOptions{
760 NodeID: nodeID,
761 Clusters: []*v3clusterpb.Cluster{
762 makeAggregateClusterResource(clusterNameA, []string{clusterNameB}),
763 makeAggregateClusterResource(clusterNameB, []string{clusterNameA}),
764 },
765 SkipValidation: true,
766 }
767 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
768 defer cancel()
769 if err := mgmtServer.Update(ctx, resources); err != nil {
770 t.Fatal(err)
771 }
772
773 select {
774 case cfg := <-lbCfgCh:
775 t.Fatalf("Child policy received configuration when not expected to: %s", pretty.ToJSON(cfg))
776 case <-time.After(defaultTestShortTimeout):
777 }
778
779 testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
780
781
782 client := testgrpc.NewTestServiceClient(cc)
783 _, err := client.EmptyCall(ctx, &testpb.Empty{})
784 if gotCode, wantCode := status.Code(err), codes.Unavailable; gotCode != wantCode {
785 t.Fatalf("EmptyCall() failed with code: %v, want %v", gotCode, wantCode)
786 }
787 const wantErr = "aggregate cluster graph has no leaf clusters"
788 if !strings.Contains(err.Error(), wantErr) {
789 t.Fatalf("EmptyCall() failed with err: %v, want %s", err, wantErr)
790 }
791 }
792
793
794
795
796
797 func (s) TestAggregatedCluster_CycleWithLeafNode(t *testing.T) {
798 lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
799 mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
800
801
802 server := stubserver.StartTestService(t, nil)
803 t.Cleanup(server.Stop)
804
805 const (
806 clusterNameA = clusterName
807 clusterNameB = clusterName + "-B"
808 clusterNameC = clusterName + "-C"
809 )
810
811
812 resources := e2e.UpdateOptions{
813 NodeID: nodeID,
814 Clusters: []*v3clusterpb.Cluster{
815 makeAggregateClusterResource(clusterNameA, []string{clusterNameB}),
816 makeAggregateClusterResource(clusterNameB, []string{clusterNameA, clusterNameC}),
817 e2e.DefaultCluster(clusterNameC, serviceName, e2e.SecurityLevelNone),
818 },
819 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
820 SkipValidation: true,
821 }
822 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
823 defer cancel()
824 if err := mgmtServer.Update(ctx, resources); err != nil {
825 t.Fatal(err)
826 }
827
828
829 wantChildCfg := &clusterresolver.LBConfig{
830 DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
831 Cluster: clusterNameC,
832 Type: clusterresolver.DiscoveryMechanismTypeEDS,
833 EDSServiceName: serviceName,
834 OutlierDetection: json.RawMessage(`{}`),
835 }},
836 XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
837 }
838 if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
839 t.Fatal(err)
840 }
841
842
843 client := testgrpc.NewTestServiceClient(cc)
844 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
845 t.Fatalf("EmptyCall() failed: %v", err)
846 }
847 }
848
View as plain text