1
16
17 package cdsbalancer
18
19 import (
20 "context"
21 "encoding/json"
22 "errors"
23 "fmt"
24 "strings"
25 "testing"
26 "time"
27
28 "github.com/google/go-cmp/cmp"
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/balancer"
31 "google.golang.org/grpc/codes"
32 "google.golang.org/grpc/connectivity"
33 "google.golang.org/grpc/credentials/insecure"
34 "google.golang.org/grpc/internal"
35 "google.golang.org/grpc/internal/balancer/stub"
36 "google.golang.org/grpc/internal/grpctest"
37 "google.golang.org/grpc/internal/stubserver"
38 "google.golang.org/grpc/internal/testutils"
39 "google.golang.org/grpc/internal/testutils/xds/e2e"
40 "google.golang.org/grpc/internal/xds/bootstrap"
41 "google.golang.org/grpc/resolver"
42 "google.golang.org/grpc/resolver/manual"
43 "google.golang.org/grpc/serviceconfig"
44 "google.golang.org/grpc/status"
45 "google.golang.org/grpc/xds/internal/balancer/clusterresolver"
46 "google.golang.org/grpc/xds/internal/xdsclient"
47 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
48 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
49 "google.golang.org/protobuf/types/known/durationpb"
50 "google.golang.org/protobuf/types/known/wrapperspb"
51
52 v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
53 v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
54 v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
55 v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
56 testgrpc "google.golang.org/grpc/interop/grpc_testing"
57 testpb "google.golang.org/grpc/interop/grpc_testing"
58
59 _ "google.golang.org/grpc/xds/internal/balancer/ringhash"
60 )
61
// Names, addresses, and timeouts shared by the tests in this file.
const (
	// clusterName is the cluster placed in the cds LB policy configuration
	// that the tests push through the manual resolver.
	clusterName = "cluster1"

	// The next two names are derived from clusterName. They are not
	// referenced in the visible portion of this file.
	edsClusterName = clusterName + "-eds"
	dnsClusterName = clusterName + "-dns"

	// serviceName is used as the EDS service name of the cluster resources
	// built by the tests.
	serviceName = "service1"

	// DNS host and port; not referenced in the visible portion of this file.
	dnsHostName = "dns_host"
	dnsPort     = uint32(8080)

	// defaultTestTimeout bounds the context of each test.
	defaultTestTimeout = 5 * time.Second

	// defaultTestShortTimeout bounds the waits that verify an event does NOT
	// happen.
	defaultTestShortTimeout = 10 * time.Millisecond
)
72
// s is the receiver type on which every test in this file is declared. It
// embeds grpctest.Tester and is handed to grpctest.RunSubTests by Test.
type s struct {
	grpctest.Tester
}
76
// Test is the `go test` entry point for this file. It passes an s value to
// grpctest.RunSubTests, which presumably runs each Test* method declared on s
// as a subtest (confirm against the grpctest package).
func Test(t *testing.T) {
	grpctest.RunSubTests(t, s{})
}
80
81 func waitForResourceNames(ctx context.Context, resourceNamesCh chan []string, wantNames []string) error {
82 for ctx.Err() == nil {
83 select {
84 case <-ctx.Done():
85 case gotNames := <-resourceNamesCh:
86 if cmp.Equal(gotNames, wantNames) {
87 return nil
88 }
89 }
90 }
91 if ctx.Err() != nil {
92 return fmt.Errorf("Timeout when waiting for appropriate Cluster resources to be requested")
93 }
94 return nil
95 }
96
97
98
99
100
101
102
103
104
105
106 func registerWrappedClusterResolverPolicy(t *testing.T) (chan serviceconfig.LoadBalancingConfig, chan error, chan struct{}, chan struct{}) {
107 clusterresolverBuilder := balancer.Get(clusterresolver.Name)
108 internal.BalancerUnregister(clusterresolverBuilder.Name())
109
110 lbCfgCh := make(chan serviceconfig.LoadBalancingConfig, 1)
111 resolverErrCh := make(chan error, 1)
112 exitIdleCh := make(chan struct{})
113 closeCh := make(chan struct{})
114
115 stub.Register(clusterresolver.Name, stub.BalancerFuncs{
116 Init: func(bd *stub.BalancerData) {
117 bd.Data = clusterresolverBuilder.Build(bd.ClientConn, bd.BuildOptions)
118 },
119 ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
120 return clusterresolverBuilder.(balancer.ConfigParser).ParseConfig(lbCfg)
121 },
122 UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
123 select {
124 case lbCfgCh <- ccs.BalancerConfig:
125 default:
126 }
127 bal := bd.Data.(balancer.Balancer)
128 return bal.UpdateClientConnState(ccs)
129 },
130 ResolverError: func(bd *stub.BalancerData, err error) {
131 select {
132 case resolverErrCh <- err:
133 default:
134 }
135 bal := bd.Data.(balancer.Balancer)
136 bal.ResolverError(err)
137 },
138 ExitIdle: func(bd *stub.BalancerData) {
139 bal := bd.Data.(balancer.Balancer)
140 bal.(balancer.ExitIdler).ExitIdle()
141 close(exitIdleCh)
142 },
143 Close: func(bd *stub.BalancerData) {
144 bal := bd.Data.(balancer.Balancer)
145 bal.Close()
146 close(closeCh)
147 },
148 })
149 t.Cleanup(func() { balancer.Register(clusterresolverBuilder) })
150
151 return lbCfgCh, resolverErrCh, exitIdleCh, closeCh
152 }
153
154
155
156
157
158
159 func registerWrappedCDSPolicy(t *testing.T) chan balancer.Balancer {
160 cdsBuilder := balancer.Get(cdsName)
161 internal.BalancerUnregister(cdsBuilder.Name())
162 cdsBalancerCh := make(chan balancer.Balancer, 1)
163 stub.Register(cdsBuilder.Name(), stub.BalancerFuncs{
164 Init: func(bd *stub.BalancerData) {
165 bal := cdsBuilder.Build(bd.ClientConn, bd.BuildOptions)
166 bd.Data = bal
167 cdsBalancerCh <- bal
168 },
169 ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
170 return cdsBuilder.(balancer.ConfigParser).ParseConfig(lbCfg)
171 },
172 UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
173 bal := bd.Data.(balancer.Balancer)
174 return bal.UpdateClientConnState(ccs)
175 },
176 Close: func(bd *stub.BalancerData) {
177 bal := bd.Data.(balancer.Balancer)
178 bal.Close()
179 },
180 })
181 t.Cleanup(func() { balancer.Register(cdsBuilder) })
182
183 return cdsBalancerCh
184 }
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202 func setupWithManagementServer(t *testing.T) (*e2e.ManagementServer, string, *grpc.ClientConn, *manual.Resolver, xdsclient.XDSClient, chan []string, chan struct{}) {
203 t.Helper()
204
205 cdsResourceRequestedCh := make(chan []string, 1)
206 cdsResourceCanceledCh := make(chan struct{}, 1)
207 mgmtServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
208 OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
209 if req.GetTypeUrl() == version.V3ClusterURL {
210 switch len(req.GetResourceNames()) {
211 case 0:
212 select {
213 case cdsResourceCanceledCh <- struct{}{}:
214 default:
215 }
216 default:
217 select {
218 case cdsResourceRequestedCh <- req.GetResourceNames():
219 default:
220 }
221 }
222 }
223 return nil
224 },
225
226
227 AllowResourceSubset: true,
228 })
229 t.Cleanup(cleanup)
230
231 xdsC, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
232 if err != nil {
233 t.Fatalf("Failed to create xDS client: %v", err)
234 }
235 t.Cleanup(xdsClose)
236
237 r := manual.NewBuilderWithScheme("whatever")
238 jsonSC := fmt.Sprintf(`{
239 "loadBalancingConfig":[{
240 "cds_experimental":{
241 "cluster": "%s"
242 }
243 }]
244 }`, clusterName)
245 scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
246 r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsC))
247
248 cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
249 if err != nil {
250 t.Fatalf("Failed to dial local test server: %v", err)
251 }
252 t.Cleanup(func() { cc.Close() })
253
254 return mgmtServer, nodeID, cc, r, xdsC, cdsResourceRequestedCh, cdsResourceCanceledCh
255 }
256
257
258
259
260
261
262
263
264 func compareLoadBalancingConfig(ctx context.Context, lbCfgCh chan serviceconfig.LoadBalancingConfig, wantChildCfg serviceconfig.LoadBalancingConfig) error {
265 wantJSON, err := json.Marshal(wantChildCfg)
266 if err != nil {
267 return fmt.Errorf("failed to marshal expected child config to JSON: %v", err)
268 }
269 select {
270 case lbCfg := <-lbCfgCh:
271 gotJSON, err := json.Marshal(lbCfg)
272 if err != nil {
273 return fmt.Errorf("failed to marshal received LB config into JSON: %v", err)
274 }
275 if diff := cmp.Diff(wantJSON, gotJSON); diff != "" {
276 return fmt.Errorf("child policy received unexpected diff in config (-want +got):\n%s", diff)
277 }
278 case <-ctx.Done():
279 return fmt.Errorf("timeout when waiting for child policy to receive its configuration")
280 }
281 return nil
282 }
283
284
285
286
287
288
289
290 func (s) TestConfigurationUpdate_Success(t *testing.T) {
291 _, _, _, r, xdsClient, cdsResourceRequestedCh, _ := setupWithManagementServer(t)
292
293
294 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
295 defer cancel()
296 wantNames := []string{clusterName}
297 if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
298 t.Fatal(err)
299 }
300
301
302 jsonSC := fmt.Sprintf(`{
303 "loadBalancingConfig":[{
304 "cds_experimental":{
305 "cluster": "%s"
306 }
307 }]
308 }`, clusterName)
309 scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
310 r.UpdateState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))
311
312
313 sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
314 defer sCancel()
315 select {
316 case <-sCtx.Done():
317 case gotNames := <-cdsResourceRequestedCh:
318 t.Fatalf("CDS resources %v requested when none expected", gotNames)
319 }
320
321
322 newClusterName := clusterName + "-new"
323 jsonSC = fmt.Sprintf(`{
324 "loadBalancingConfig":[{
325 "cds_experimental":{
326 "cluster": "%s"
327 }
328 }]
329 }`, newClusterName)
330 scpr = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
331 r.UpdateState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))
332
333
334
335 wantNames = []string{newClusterName}
336 if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
337 t.Fatal(err)
338 }
339 }
340
341
342
343 func (s) TestConfigurationUpdate_EmptyCluster(t *testing.T) {
344
345 _, _, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
346 t.Cleanup(cleanup)
347 xdsClient, xdsClose, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents)
348 if err != nil {
349 t.Fatalf("Failed to create xDS client: %v", err)
350 }
351 t.Cleanup(xdsClose)
352
353
354
355
356
357
358 r := manual.NewBuilderWithScheme("whatever")
359 updateStateErrCh := make(chan error, 1)
360 r.UpdateStateCallback = func(err error) { updateStateErrCh <- err }
361 jsonSC := `{
362 "loadBalancingConfig":[{
363 "cds_experimental":{
364 "cluster": ""
365 }
366 }]
367 }`
368 scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
369 r.InitialState(xdsclient.SetClient(resolver.State{ServiceConfig: scpr}, xdsClient))
370
371
372 cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
373 if err != nil {
374 t.Fatalf("Failed to dial: %v", err)
375 }
376 t.Cleanup(func() { cc.Close() })
377
378 select {
379 case <-time.After(defaultTestTimeout):
380 t.Fatalf("Timed out waiting for error from the LB policy")
381 case err := <-updateStateErrCh:
382 if err != balancer.ErrBadResolverState {
383 t.Fatalf("For a configuration update with an empty cluster name, got error %v from the LB policy, want %v", err, balancer.ErrBadResolverState)
384 }
385 }
386 }
387
388
389
390 func (s) TestConfigurationUpdate_MissingXdsClient(t *testing.T) {
391
392
393
394
395 r := manual.NewBuilderWithScheme("whatever")
396 updateStateErrCh := make(chan error, 1)
397 r.UpdateStateCallback = func(err error) { updateStateErrCh <- err }
398 jsonSC := `{
399 "loadBalancingConfig":[{
400 "cds_experimental":{
401 "cluster": "foo"
402 }
403 }]
404 }`
405 scpr := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(jsonSC)
406 r.InitialState(resolver.State{ServiceConfig: scpr})
407
408
409 cc, err := grpc.Dial(r.Scheme()+":///test.service", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
410 if err != nil {
411 t.Fatalf("Failed to dial: %v", err)
412 }
413 t.Cleanup(func() { cc.Close() })
414
415 select {
416 case <-time.After(defaultTestTimeout):
417 t.Fatalf("Timed out waiting for error from the LB policy")
418 case err := <-updateStateErrCh:
419 if err != balancer.ErrBadResolverState {
420 t.Fatalf("For a configuration update missing the xDS client, got error %v from the LB policy, want %v", err, balancer.ErrBadResolverState)
421 }
422 }
423 }
424
425
426
427
// TestClusterUpdate_Success verifies, for several valid cluster resources,
// that the configuration received by the child (cluster_resolver) policy
// matches what is expected for that resource.
//
// Each table entry pairs a cluster resource served by the management server
// with the child policy configuration it should produce. Configurations are
// compared by their JSON encoding (see compareLoadBalancingConfig).
func (s) TestClusterUpdate_Success(t *testing.T) {
	tests := []struct {
		name            string
		clusterResource *v3clusterpb.Cluster                // resource served by the management server
		wantChildCfg    serviceconfig.LoadBalancingConfig // config expected at the child policy
	}{
		{
			// Two circuit-breaker thresholds are configured. The expected
			// config carries the max-requests value of the DEFAULT priority
			// entry (512); the HIGH priority entry has no MaxRequests.
			name: "happy-case-with-circuit-breakers",
			clusterResource: func() *v3clusterpb.Cluster {
				c := e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)
				c.CircuitBreakers = &v3clusterpb.CircuitBreakers{
					Thresholds: []*v3clusterpb.CircuitBreakers_Thresholds{
						{
							Priority:    v3corepb.RoutingPriority_DEFAULT,
							MaxRequests: wrapperspb.UInt32(512),
						},
						{
							Priority:    v3corepb.RoutingPriority_HIGH,
							MaxRequests: nil,
						},
					},
				}
				return c
			}(),
			wantChildCfg: &clusterresolver.LBConfig{
				DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
					Cluster:               clusterName,
					Type:                  clusterresolver.DiscoveryMechanismTypeEDS,
					EDSServiceName:        serviceName,
					MaxConcurrentRequests: newUint32(512),
					OutlierDetection:      json.RawMessage(`{}`),
				}},
				XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
			},
		},
		{
			// Ring hash policy with explicit ring sizes, which must show up
			// unchanged in the expected xDS LB policy.
			name: "happy-case-with-ring-hash-lb-policy",
			clusterResource: func() *v3clusterpb.Cluster {
				c := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
					ClusterName:   clusterName,
					ServiceName:   serviceName,
					SecurityLevel: e2e.SecurityLevelNone,
					Policy:        e2e.LoadBalancingPolicyRingHash,
				})
				c.LbConfig = &v3clusterpb.Cluster_RingHashLbConfig_{
					RingHashLbConfig: &v3clusterpb.Cluster_RingHashLbConfig{
						MinimumRingSize: &wrapperspb.UInt64Value{Value: 100},
						MaximumRingSize: &wrapperspb.UInt64Value{Value: 1000},
					},
				}
				return c
			}(),
			wantChildCfg: &clusterresolver.LBConfig{
				DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
					Cluster:          clusterName,
					Type:             clusterresolver.DiscoveryMechanismTypeEDS,
					EDSServiceName:   serviceName,
					OutlierDetection: json.RawMessage(`{}`),
				}},
				XDSLBPolicy: json.RawMessage(`[{"ring_hash_experimental": {"minRingSize":100, "maxRingSize":1000}}]`),
			},
		},
		{
			// An outlier detection message with no fields set. The expected
			// config contains an empty successRateEjection section.
			name: "happy-case-outlier-detection-xds-defaults",
			clusterResource: func() *v3clusterpb.Cluster {
				c := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
					ClusterName:   clusterName,
					ServiceName:   serviceName,
					SecurityLevel: e2e.SecurityLevelNone,
					Policy:        e2e.LoadBalancingPolicyRingHash,
				})
				c.OutlierDetection = &v3clusterpb.OutlierDetection{}
				return c
			}(),
			wantChildCfg: &clusterresolver.LBConfig{
				DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
					Cluster:          clusterName,
					Type:             clusterresolver.DiscoveryMechanismTypeEDS,
					EDSServiceName:   serviceName,
					OutlierDetection: json.RawMessage(`{"successRateEjection":{}}`),
				}},
				XDSLBPolicy: json.RawMessage(`[{"ring_hash_experimental": {"minRingSize":1024, "maxRingSize":8388608}}]`),
			},
		},
		{
			// An outlier detection message with every field set; each value
			// must appear in the expected JSON configuration.
			name: "happy-case-outlier-detection-all-fields-set",
			clusterResource: func() *v3clusterpb.Cluster {
				c := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
					ClusterName:   clusterName,
					ServiceName:   serviceName,
					SecurityLevel: e2e.SecurityLevelNone,
					Policy:        e2e.LoadBalancingPolicyRingHash,
				})
				c.OutlierDetection = &v3clusterpb.OutlierDetection{
					Interval:                       durationpb.New(10 * time.Second),
					BaseEjectionTime:               durationpb.New(30 * time.Second),
					MaxEjectionTime:                durationpb.New(300 * time.Second),
					MaxEjectionPercent:             wrapperspb.UInt32(10),
					SuccessRateStdevFactor:         wrapperspb.UInt32(1900),
					EnforcingSuccessRate:           wrapperspb.UInt32(100),
					SuccessRateMinimumHosts:        wrapperspb.UInt32(5),
					SuccessRateRequestVolume:       wrapperspb.UInt32(100),
					FailurePercentageThreshold:     wrapperspb.UInt32(85),
					EnforcingFailurePercentage:     wrapperspb.UInt32(5),
					FailurePercentageMinimumHosts:  wrapperspb.UInt32(5),
					FailurePercentageRequestVolume: wrapperspb.UInt32(50),
				}
				return c
			}(),
			wantChildCfg: &clusterresolver.LBConfig{
				DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
					Cluster:        clusterName,
					Type:           clusterresolver.DiscoveryMechanismTypeEDS,
					EDSServiceName: serviceName,
					OutlierDetection: json.RawMessage(`{
"interval": "10s",
"baseEjectionTime": "30s",
"maxEjectionTime": "300s",
"maxEjectionPercent": 10,
"successRateEjection": {
"stdevFactor": 1900,
"enforcementPercentage": 100,
"minimumHosts": 5,
"requestVolume": 100
},
"failurePercentageEjection": {
"threshold": 85,
"enforcementPercentage": 5,
"minimumHosts": 5,
"requestVolume": 50
}
}`),
				}},
				XDSLBPolicy: json.RawMessage(`[{"ring_hash_experimental": {"minRingSize":1024, "maxRingSize":8388608}}]`),
			},
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			// Wrap the child policy so that the configuration pushed to it
			// can be observed, then bring up the management server and the
			// gRPC channel.
			lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
			mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)

			// Serve the cluster resource under test.
			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
			defer cancel()
			if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
				NodeID:         nodeID,
				Clusters:       []*v3clusterpb.Cluster{test.clusterResource},
				SkipValidation: true,
			}); err != nil {
				t.Fatal(err)
			}

			// The child policy must receive the expected configuration.
			if err := compareLoadBalancingConfig(ctx, lbCfgCh, test.wantChildCfg); err != nil {
				t.Fatal(err)
			}
		})
	}
}
587
588
589
590
591 func (s) TestClusterUpdate_SuccessWithLRS(t *testing.T) {
592 lbCfgCh, _, _, _ := registerWrappedClusterResolverPolicy(t)
593 mgmtServer, nodeID, _, _, _, _, _ := setupWithManagementServer(t)
594
595 clusterResource := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
596 ClusterName: clusterName,
597 ServiceName: serviceName,
598 EnableLRS: true,
599 })
600 wantChildCfg := &clusterresolver.LBConfig{
601 DiscoveryMechanisms: []clusterresolver.DiscoveryMechanism{{
602 Cluster: clusterName,
603 Type: clusterresolver.DiscoveryMechanismTypeEDS,
604 EDSServiceName: serviceName,
605 LoadReportingServer: &bootstrap.ServerConfig{
606 ServerURI: mgmtServer.Address,
607 Creds: bootstrap.ChannelCreds{Type: "insecure"},
608 },
609 OutlierDetection: json.RawMessage(`{}`),
610 }},
611 XDSLBPolicy: json.RawMessage(`[{"xds_wrr_locality_experimental": {"childPolicy": [{"round_robin": {}}]}}]`),
612 }
613
614 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
615 defer cancel()
616 if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
617 NodeID: nodeID,
618 Clusters: []*v3clusterpb.Cluster{clusterResource},
619 SkipValidation: true,
620 }); err != nil {
621 t.Fatal(err)
622 }
623
624 if err := compareLoadBalancingConfig(ctx, lbCfgCh, wantChildCfg); err != nil {
625 t.Fatal(err)
626 }
627 }
628
629
630
631
632
633
634
635
636
637
638
639
// TestClusterUpdate_Failure exercises the cds LB policy when the management
// server sends a cluster resource that the client rejects. It verifies that:
//   - with no previously accepted resource, the channel moves to
//     TRANSIENT_FAILURE and RPCs fail with Unavailable and the rejection error
//   - with a previously accepted resource, RPCs keep succeeding and the
//     rejection error is pushed to the child policy as a resolver error
//   - after the cluster resource is removed from the management server, the
//     channel moves to TRANSIENT_FAILURE and RPCs fail with Unavailable
//   - the CDS watch is not cancelled at any of these steps
func (s) TestClusterUpdate_Failure(t *testing.T) {
	_, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t)
	mgmtServer, nodeID, cc, _, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)

	// Verify that the configured cluster resource is requested.
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	wantNames := []string{clusterName}
	if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
		t.Fatal(err)
	}

	// Serve a cluster resource whose lrs_server field uses the ADS config
	// source specifier. The assertions below expect the client to reject it
	// with an error containing wantClusterNACKErr.
	cluster := e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)
	cluster.LrsServer = &v3corepb.ConfigSource{ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{}}
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Clusters:       []*v3clusterpb.Cluster{cluster},
		SkipValidation: true,
	}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Verify that the watch for the cluster resource is not cancelled.
	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	select {
	case <-sCtx.Done():
	case <-cdsResourceCanceledCh:
		t.Fatal("Watch for cluster resource is cancelled when not expected to")
	}

	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

	// Verify that the rejection error is propagated to the RPC caller.
	const wantClusterNACKErr = "unsupported config_source_specifier"
	client := testgrpc.NewTestServiceClient(cc)
	_, err := client.EmptyCall(ctx, &testpb.Empty{})
	if code := status.Code(err); code != codes.Unavailable {
		t.Fatalf("EmptyCall() failed with code: %v, want %v", code, codes.Unavailable)
	}
	if err != nil && !strings.Contains(err.Error(), wantClusterNACKErr) {
		t.Fatalf("EmptyCall() failed with err: %v, want err containing: %v", err, wantClusterNACKErr)
	}

	// Start a test service backend.
	server := stubserver.StartTestService(t, nil)
	t.Cleanup(server.Stop)

	// Serve a good cluster resource along with endpoints pointing at the
	// backend.
	resources = e2e.UpdateOptions{
		NodeID:         nodeID,
		Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
		SkipValidation: true,
	}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Verify that a successful RPC can be made.
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("EmptyCall() failed: %v", err)
	}

	// Serve the bad cluster resource again, keeping the endpoints.
	resources = e2e.UpdateOptions{
		NodeID:         nodeID,
		Clusters:       []*v3clusterpb.Cluster{cluster},
		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
		SkipValidation: true,
	}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Verify that the watch for the cluster resource is not cancelled.
	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	select {
	case <-sCtx.Done():
	case <-cdsResourceCanceledCh:
		t.Fatal("Watch for cluster resource is cancelled when not expected to")
	}

	// Verify that RPCs still succeed; presumably the previously accepted
	// good configuration remains in use.
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("EmptyCall() failed: %v", err)
	}

	// Verify that the rejection error is pushed to the child policy.
	select {
	case err := <-resolverErrCh:
		if !strings.Contains(err.Error(), wantClusterNACKErr) {
			t.Fatalf("Error pushed to child policy is %v, want %v", err, wantClusterNACKErr)
		}
	case <-ctx.Done():
		t.Fatal("Timeout when waiting for resolver error to be pushed to the child policy")
	}

	// Update the management server with no cluster or endpoint resources,
	// i.e. remove the cluster resource.
	resources = e2e.UpdateOptions{
		NodeID:         nodeID,
		SkipValidation: true,
	}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Verify that the watch for the cluster resource is not cancelled.
	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	select {
	case <-sCtx.Done():
	case <-cdsResourceCanceledCh:
		t.Fatal("Watch for cluster resource is cancelled when not expected to")
	}

	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

	// Verify that RPCs now fail with Unavailable. Only the status code is
	// checked here, not the error text.
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
		t.Fatalf("EmptyCall() failed with code: %v, want %v", status.Code(err), codes.Unavailable)
	}
}
772
773
774
775
776
777
778
779
780
781
782
783
784
785
// TestResolverError exercises the cds LB policy when the resolver reports
// errors. It verifies that:
//   - an ordinary resolver error received before any cluster resource puts
//     the channel in TRANSIENT_FAILURE, fails RPCs with Unavailable and the
//     resolver error text, and leaves the CDS watch in place
//   - the same error received after a good cluster resource does not break
//     RPCs, leaves the CDS watch in place, and is pushed to the child policy
//   - a resource-not-found resolver error cancels the CDS watch, is pushed to
//     the child policy, puts the channel in TRANSIENT_FAILURE, and fails RPCs
//     with Unavailable
func (s) TestResolverError(t *testing.T) {
	_, resolverErrCh, _, _ := registerWrappedClusterResolverPolicy(t)
	mgmtServer, nodeID, cc, r, _, cdsResourceRequestedCh, cdsResourceCanceledCh := setupWithManagementServer(t)

	// Verify that the configured cluster resource is requested.
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	wantNames := []string{clusterName}
	if err := waitForResourceNames(ctx, cdsResourceRequestedCh, wantNames); err != nil {
		t.Fatal(err)
	}

	// Report a resolver error that is not a resource-not-found error.
	resolverErr := errors.New("resolver-error-not-a-resource-not-found-error")
	r.ReportError(resolverErr)

	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

	// Drain any resolver error already recorded for the child policy, so
	// that later reads of resolverErrCh observe only later errors.
	select {
	case <-resolverErrCh:
	default:
	}

	// Verify that the resolver error is propagated to the RPC caller.
	client := testgrpc.NewTestServiceClient(cc)
	_, err := client.EmptyCall(ctx, &testpb.Empty{})
	if code := status.Code(err); code != codes.Unavailable {
		t.Fatalf("EmptyCall() failed with code: %v, want %v", code, codes.Unavailable)
	}
	if err != nil && !strings.Contains(err.Error(), resolverErr.Error()) {
		t.Fatalf("EmptyCall() failed with err: %v, want %v", err, resolverErr)
	}

	// Verify that the watch for the cluster resource is not cancelled.
	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	select {
	case <-sCtx.Done():
	case <-cdsResourceCanceledCh:
		t.Fatal("Watch for cluster resource is cancelled when not expected to")
	}

	// Start a test service backend.
	server := stubserver.StartTestService(t, nil)
	t.Cleanup(server.Stop)

	// Serve a good cluster resource along with endpoints pointing at the
	// backend.
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Clusters:       []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
		Endpoints:      []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
		SkipValidation: true,
	}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Verify that a successful RPC can be made.
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("EmptyCall() failed: %v", err)
	}

	// Report the same resolver error again, now that a good cluster
	// resource has been received.
	r.ReportError(resolverErr)

	// Verify that the watch for the cluster resource is not cancelled.
	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	select {
	case <-sCtx.Done():
	case <-cdsResourceCanceledCh:
		t.Fatal("Watch for cluster resource is cancelled when not expected to")
	}

	// Verify that RPCs still succeed despite the resolver error.
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("EmptyCall() failed: %v", err)
	}

	// Verify that the resolver error is pushed to the child policy.
	select {
	case err := <-resolverErrCh:
		if err != resolverErr {
			t.Fatalf("Error pushed to child policy is %v, want %v", err, resolverErr)
		}
	case <-ctx.Done():
		t.Fatal("Timeout when waiting for resolver error to be pushed to the child policy")
	}

	// Report a resolver error of type resource-not-found.
	resolverErr = xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "xds resource not found error")
	r.ReportError(resolverErr)

	// Verify that the watch for the cluster resource is cancelled this time.
	select {
	case <-ctx.Done():
		t.Fatal("Timeout when waiting for CDS resource to be not requested")
	case <-cdsResourceCanceledCh:
	}

	// Verify that the resource-not-found error is pushed to the child policy.
	select {
	case err := <-resolverErrCh:
		if err != resolverErr {
			t.Fatalf("Error pushed to child policy is %v, want %v", err, resolverErr)
		}
	case <-ctx.Done():
		t.Fatal("Timeout when waiting for resolver error to be pushed to the child policy")
	}

	testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)

	// Verify that RPCs now fail with Unavailable. Only the status code is
	// checked here, not the error text.
	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
		t.Fatalf("EmptyCall() failed with code: %v, want %v", status.Code(err), codes.Unavailable)
	}
}
908
909
910
911 func (s) TestClose(t *testing.T) {
912 cdsBalancerCh := registerWrappedCDSPolicy(t)
913 _, _, _, childPolicyCloseCh := registerWrappedClusterResolverPolicy(t)
914 mgmtServer, nodeID, cc, _, _, _, cdsResourceCanceledCh := setupWithManagementServer(t)
915
916
917 server := stubserver.StartTestService(t, nil)
918 t.Cleanup(server.Stop)
919
920
921 resources := e2e.UpdateOptions{
922 NodeID: nodeID,
923 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
924 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
925 SkipValidation: true,
926 }
927 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
928 defer cancel()
929 if err := mgmtServer.Update(ctx, resources); err != nil {
930 t.Fatal(err)
931 }
932
933
934 client := testgrpc.NewTestServiceClient(cc)
935 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
936 t.Fatalf("EmptyCall() failed: %v", err)
937 }
938
939
940 var cdsBal balancer.Balancer
941 select {
942 case cdsBal = <-cdsBalancerCh:
943 case <-ctx.Done():
944 t.Fatal("Timeout when waiting for cds LB policy to be created")
945 }
946 cdsBal.Close()
947
948
949 select {
950 case <-ctx.Done():
951 t.Fatal("Timeout when waiting for CDS resource to be not requested")
952 case <-cdsResourceCanceledCh:
953 }
954
955 select {
956 case <-ctx.Done():
957 t.Fatal("Timeout when waiting for the child policy to be closed")
958 case <-childPolicyCloseCh:
959 }
960 }
961
962
963
964 func (s) TestExitIdle(t *testing.T) {
965 cdsBalancerCh := registerWrappedCDSPolicy(t)
966 _, _, exitIdleCh, _ := registerWrappedClusterResolverPolicy(t)
967 mgmtServer, nodeID, cc, _, _, _, _ := setupWithManagementServer(t)
968
969
970 server := stubserver.StartTestService(t, nil)
971 t.Cleanup(server.Stop)
972
973
974 resources := e2e.UpdateOptions{
975 NodeID: nodeID,
976 Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, serviceName, e2e.SecurityLevelNone)},
977 Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(serviceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
978 SkipValidation: true,
979 }
980 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
981 defer cancel()
982 if err := mgmtServer.Update(ctx, resources); err != nil {
983 t.Fatal(err)
984 }
985
986
987 client := testgrpc.NewTestServiceClient(cc)
988 if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
989 t.Fatalf("EmptyCall() failed: %v", err)
990 }
991
992
993 var cdsBal balancer.Balancer
994 select {
995 case cdsBal = <-cdsBalancerCh:
996 case <-ctx.Done():
997 t.Fatal("Timeout when waiting for cds LB policy to be created")
998 }
999 cdsBal.(balancer.ExitIdler).ExitIdle()
1000
1001
1002 select {
1003 case <-ctx.Done():
1004 t.Fatal("Timeout when waiting for the child policy to be closed")
1005 case <-exitIdleCh:
1006 }
1007 }
1008
1009
1010 func (s) TestParseConfig(t *testing.T) {
1011 bb := balancer.Get(cdsName)
1012 if bb == nil {
1013 t.Fatalf("balancer.Get(%q) returned nil", cdsName)
1014 }
1015 parser, ok := bb.(balancer.ConfigParser)
1016 if !ok {
1017 t.Fatalf("balancer %q does not implement the ConfigParser interface", cdsName)
1018 }
1019
1020 tests := []struct {
1021 name string
1022 input json.RawMessage
1023 wantCfg serviceconfig.LoadBalancingConfig
1024 wantErr bool
1025 }{
1026 {
1027 name: "good-config",
1028 input: json.RawMessage(`{"Cluster": "cluster1"}`),
1029 wantCfg: &lbConfig{ClusterName: "cluster1"},
1030 },
1031 {
1032 name: "unknown-fields-in-config",
1033 input: json.RawMessage(`{"Unknown": "foobar"}`),
1034 wantCfg: &lbConfig{ClusterName: ""},
1035 },
1036 {
1037 name: "empty-config",
1038 input: json.RawMessage(""),
1039 wantErr: true,
1040 },
1041 {
1042 name: "bad-config",
1043 input: json.RawMessage(`{"Cluster": 5}`),
1044 wantErr: true,
1045 },
1046 }
1047
1048 for _, test := range tests {
1049 t.Run(test.name, func(t *testing.T) {
1050 gotCfg, gotErr := parser.ParseConfig(test.input)
1051 if (gotErr != nil) != test.wantErr {
1052 t.Fatalf("ParseConfig(%v) = %v, wantErr %v", string(test.input), gotErr, test.wantErr)
1053 }
1054 if test.wantErr {
1055 return
1056 }
1057 if !cmp.Equal(gotCfg, test.wantCfg) {
1058 t.Fatalf("ParseConfig(%v) = %v, want %v", string(test.input), gotCfg, test.wantCfg)
1059 }
1060 })
1061 }
1062 }
1063
// newUint32 returns a pointer to a freshly allocated uint32 holding i. Each
// call yields a distinct pointer.
func newUint32(i uint32) *uint32 {
	p := new(uint32)
	*p = i
	return p
}
1067
View as plain text