1
18
19 package resolver_test
20
21 import (
22 "context"
23 "fmt"
24 "strings"
25 "testing"
26 "time"
27
28 xxhash "github.com/cespare/xxhash/v2"
29 "github.com/envoyproxy/go-control-plane/pkg/wellknown"
30 "github.com/google/go-cmp/cmp"
31 "github.com/google/uuid"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/internal"
34 "google.golang.org/grpc/internal/grpcsync"
35 iresolver "google.golang.org/grpc/internal/resolver"
36 "google.golang.org/grpc/internal/testutils"
37 xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap"
38 "google.golang.org/grpc/internal/testutils/xds/e2e"
39 "google.golang.org/grpc/internal/xds/bootstrap"
40 "google.golang.org/grpc/metadata"
41 "google.golang.org/grpc/resolver"
42 "google.golang.org/grpc/serviceconfig"
43 "google.golang.org/grpc/status"
44 "google.golang.org/grpc/xds/internal/balancer/clustermanager"
45 "google.golang.org/grpc/xds/internal/balancer/ringhash"
46 "google.golang.org/grpc/xds/internal/httpfilter"
47 xdsresolver "google.golang.org/grpc/xds/internal/resolver"
48 rinternal "google.golang.org/grpc/xds/internal/resolver/internal"
49 xdstestutils "google.golang.org/grpc/xds/internal/testutils"
50 "google.golang.org/grpc/xds/internal/xdsclient"
51 "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
52 "google.golang.org/protobuf/proto"
53 "google.golang.org/protobuf/types/known/anypb"
54 "google.golang.org/protobuf/types/known/durationpb"
55 "google.golang.org/protobuf/types/known/structpb"
56 "google.golang.org/protobuf/types/known/wrapperspb"
57
58 v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
59 v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
60 v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
61 v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
62 v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
63 v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
64 v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
65
66 _ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer"
67 _ "google.golang.org/grpc/xds/internal/httpfilter/router"
68 )
69
70
71
72
73 func (s) TestResolverBuilder_ClientCreationFails_NoBootstrap(t *testing.T) {
74
75 builder := resolver.Get(xdsresolver.Scheme)
76 if builder == nil {
77 t.Fatalf("Scheme %q is not registered", xdsresolver.Scheme)
78 }
79
80 target := resolver.Target{URL: *testutils.MustParseURL("xds:///target")}
81 if _, err := builder.Build(target, nil, resolver.BuildOptions{}); err == nil {
82 t.Fatalf("xds Resolver Build(%v) succeeded when expected to fail, because there is not bootstrap configuration for the xDS client", target)
83 }
84 }
85
86
87
88
89 func (s) TestResolverBuilder_AuthorityNotDefinedInBootstrap(t *testing.T) {
90 bootstrapCleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
91 NodeID: "node-id",
92 ServerURI: "dummy-management-server",
93 })
94 if err != nil {
95 t.Fatal(err)
96 }
97 defer bootstrapCleanup()
98
99 builder := resolver.Get(xdsresolver.Scheme)
100 if builder == nil {
101 t.Fatalf("Scheme %q is not registered", xdsresolver.Scheme)
102 }
103
104 target := resolver.Target{URL: *testutils.MustParseURL("xds://non-existing-authority/target")}
105 const wantErr = `authority "non-existing-authority" specified in dial target "xds://non-existing-authority/target" is not found in the bootstrap file`
106
107 r, err := builder.Build(target, &testutils.ResolverClientConn{Logger: t}, resolver.BuildOptions{})
108 if r != nil {
109 r.Close()
110 }
111 if err == nil {
112 t.Fatalf("xds Resolver Build(%v) succeeded for target with authority not specified in bootstrap", target)
113 }
114 if !strings.Contains(err.Error(), wantErr) {
115 t.Fatalf("xds Resolver Build(%v) returned err: %v, wantErr: %v", target, err, wantErr)
116 }
117 }
118
119
120
121 func (s) TestResolverResourceName(t *testing.T) {
122 tests := []struct {
123 name string
124 listenerResourceNameTemplate string
125 extraAuthority string
126 dialTarget string
127 wantResourceNames []string
128 }{
129 {
130 name: "default %s old style",
131 listenerResourceNameTemplate: "%s",
132 dialTarget: "xds:///target",
133 wantResourceNames: []string{"target"},
134 },
135 {
136 name: "old style no percent encoding",
137 listenerResourceNameTemplate: "/path/to/%s",
138 dialTarget: "xds:///target",
139 wantResourceNames: []string{"/path/to/target"},
140 },
141 {
142 name: "new style with %s",
143 listenerResourceNameTemplate: "xdstp://authority.com/%s",
144 dialTarget: "xds:///0.0.0.0:8080",
145 wantResourceNames: []string{"xdstp://authority.com/0.0.0.0:8080"},
146 },
147 {
148 name: "new style percent encoding",
149 listenerResourceNameTemplate: "xdstp://authority.com/%s",
150 dialTarget: "xds:///[::1]:8080",
151 wantResourceNames: []string{"xdstp://authority.com/%5B::1%5D:8080"},
152 },
153 {
154 name: "new style different authority",
155 listenerResourceNameTemplate: "xdstp://authority.com/%s",
156 extraAuthority: "test-authority",
157 dialTarget: "xds://test-authority/target",
158 wantResourceNames: []string{"xdstp://test-authority/envoy.config.listener.v3.Listener/target"},
159 },
160 }
161 for _, tt := range tests {
162 t.Run(tt.name, func(t *testing.T) {
163
164 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
165 defer cancel()
166 nodeID := uuid.New().String()
167 mgmtServer, lisCh, _ := setupManagementServerForTest(ctx, t, nodeID)
168
169
170 opts := xdsbootstrap.Options{
171 ServerURI: mgmtServer.Address,
172 ClientDefaultListenerResourceNameTemplate: tt.listenerResourceNameTemplate,
173 }
174 if tt.extraAuthority != "" {
175
176
177
178 opts.Authorities = map[string]string{
179 tt.extraAuthority: mgmtServer.Address,
180 }
181 }
182 cleanup, err := xdsbootstrap.CreateFile(opts)
183 if err != nil {
184 t.Fatal(err)
185 }
186 defer cleanup()
187
188 buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL(tt.dialTarget)})
189 waitForResourceNames(ctx, t, lisCh, tt.wantResourceNames)
190 })
191 }
192 }
193
194
195
196
// TestResolverWatchCallbackAfterClose closes the resolver while the
// management server is still blocked inside its handling of a route
// configuration request, and then verifies that the resolver pushes no state
// update.
func (s) TestResolverWatchCallbackAfterClose(t *testing.T) {
	// routeConfigResourceNamesCh carries the resource names of the most recent
	// route configuration request seen by the management server.
	// waitForResolverCloseCh is closed by the test once the resolver has been
	// closed; the stream callback below blocks on it.
	routeConfigResourceNamesCh := make(chan []string, 1)
	waitForResolverCloseCh := make(chan struct{})
	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{
		OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
			if req.GetTypeUrl() == version.V3RouteConfigURL {
				// Drop any value still sitting in the one-slot buffer so the
				// send below can deliver the latest names without blocking.
				select {
				case <-routeConfigResourceNamesCh:
				default:
				}
				select {
				case routeConfigResourceNamesCh <- req.GetResourceNames():
				default:
				}
				// Hold this callback until the test signals that the resolver
				// has been closed.
				<-waitForResolverCloseCh
			}
			return nil
		},
	})
	if err != nil {
		t.Fatalf("Failed to start xDS management server: %v", err)
	}
	defer mgmtServer.Stop()

	// Create a bootstrap file that points at the management server above.
	nodeID := uuid.New().String()
	cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
		NodeID:    nodeID,
		ServerURI: mgmtServer.Address,
	})
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	// Configure default listener and route configuration resources.
	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
	routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

	// Build the resolver and wait for the route configuration request to reach
	// the management server.
	stateCh, _, r := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
	waitForResourceNames(ctx, t, routeConfigResourceNamesCh, []string{defaultTestRouteConfigName})

	// Close the resolver first, and only then unblock the stream callback.
	r.Close()
	close(waitForResolverCloseCh)

	// No state update is expected on stateCh after the resolver was closed.
	// NOTE(review): presumably verifyNoUpdateFromResolver waits a short period
	// before passing — confirm against its definition.
	verifyNoUpdateFromResolver(ctx, t, stateCh)
}
257
258
259 func (s) TestResolverCloseClosesXDSClient(t *testing.T) {
260 bootstrapCfg := &bootstrap.Config{
261 XDSServer: xdstestutils.ServerConfigForAddress(t, "dummy-management-server-address"),
262 }
263
264
265
266
267 origNewClient := rinternal.NewXDSClient
268 closeCh := make(chan struct{})
269 rinternal.NewXDSClient = func() (xdsclient.XDSClient, func(), error) {
270 c, cancel, err := xdsclient.NewWithConfigForTesting(bootstrapCfg, defaultTestTimeout, defaultTestTimeout)
271 return c, grpcsync.OnceFunc(func() {
272 close(closeCh)
273 cancel()
274 }), err
275 }
276 defer func() { rinternal.NewXDSClient = origNewClient }()
277
278 _, _, r := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///my-service-client-side-xds")})
279 r.Close()
280
281 select {
282 case <-closeCh:
283 case <-time.After(defaultTestTimeout):
284 t.Fatal("Timeout when waiting for xDS client to be closed")
285 }
286 }
287
288
289
290
291
292
293
294 func (s) TestResolverBadServiceUpdate(t *testing.T) {
295
296 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
297 defer cancel()
298 nodeID := uuid.New().String()
299 mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
300
301
302
303 hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
304 HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})},
305 })
306 lis := &v3listenerpb.Listener{
307 Name: defaultTestServiceName,
308 ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
309 FilterChains: []*v3listenerpb.FilterChain{{
310 Name: "filter-chain-name",
311 Filters: []*v3listenerpb.Filter{{
312 Name: wellknown.HTTPConnectionManager,
313 ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
314 }},
315 }},
316 }
317 configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{lis}, nil)
318
319
320 stateCh, errCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
321 wantErr := "no RouteSpecifier"
322 verifyErrorFromResolver(ctx, t, errCh, wantErr)
323
324
325
326 listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
327 routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}
328 configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
329
330
331 verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
332
333
334
335 configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{lis}, nil)
336 verifyErrorFromResolver(ctx, t, errCh, wantErr)
337 }
338
339
340
341
342
343
344
345 func (s) TestResolverGoodServiceUpdate(t *testing.T) {
346 for _, tt := range []struct {
347 name string
348 routeConfig *v3routepb.RouteConfiguration
349 wantServiceConfig string
350 wantClusters map[string]bool
351 }{
352 {
353 name: "single cluster",
354 routeConfig: e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
355 RouteConfigName: defaultTestRouteConfigName,
356 ListenerName: defaultTestServiceName,
357 ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeCluster,
358 ClusterName: defaultTestClusterName,
359 }),
360 wantServiceConfig: wantDefaultServiceConfig,
361 wantClusters: map[string]bool{fmt.Sprintf("cluster:%s", defaultTestClusterName): true},
362 },
363 {
364 name: "two clusters",
365 routeConfig: e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
366 RouteConfigName: defaultTestRouteConfigName,
367 ListenerName: defaultTestServiceName,
368 ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeWeightedCluster,
369 WeightedClusters: map[string]int{"cluster_1": 75, "cluster_2": 25},
370 }),
371
372
373
374 wantServiceConfig: `{
375 "loadBalancingConfig": [{
376 "xds_cluster_manager_experimental": {
377 "children": {
378 "cluster:cluster_1": {
379 "childPolicy": [{
380 "cds_experimental": {
381 "cluster": "cluster_1"
382 }
383 }]
384 },
385 "cluster:cluster_2": {
386 "childPolicy": [{
387 "cds_experimental": {
388 "cluster": "cluster_2"
389 }
390 }]
391 }
392 }
393 }
394 }]}`,
395 wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true},
396 },
397 } {
398 t.Run(tt.name, func(t *testing.T) {
399
400 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
401 defer cancel()
402 nodeID := uuid.New().String()
403 mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
404
405
406
407 listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
408 routes := []*v3routepb.RouteConfiguration{tt.routeConfig}
409 configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
410
411 stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
412
413
414 cs := verifyUpdateFromResolver(ctx, t, stateCh, tt.wantServiceConfig)
415
416 pickedClusters := make(map[string]bool)
417
418
419
420 for i := 0; i < 100; i++ {
421 res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
422 if err != nil {
423 t.Fatalf("cs.SelectConfig(): %v", err)
424 }
425 cluster := clustermanager.GetPickedClusterForTesting(res.Context)
426 pickedClusters[cluster] = true
427 res.OnCommitted()
428 }
429 if !cmp.Equal(pickedClusters, tt.wantClusters) {
430 t.Errorf("Picked clusters: %v; want: %v", pickedClusters, tt.wantClusters)
431 }
432 })
433 }
434 }
435
436
437
438
439 func (s) TestResolverRequestHash(t *testing.T) {
440
441 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
442 defer cancel()
443 nodeID := uuid.New().String()
444 mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
445
446
447
448 listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
449 routes := []*v3routepb.RouteConfiguration{{
450 Name: defaultTestRouteConfigName,
451 VirtualHosts: []*v3routepb.VirtualHost{{
452 Domains: []string{defaultTestServiceName},
453 Routes: []*v3routepb.Route{{
454 Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
455 Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
456 ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
457 Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
458 {
459 Name: defaultTestClusterName,
460 Weight: &wrapperspb.UInt32Value{Value: 100},
461 },
462 },
463 }},
464 HashPolicy: []*v3routepb.RouteAction_HashPolicy{{
465 PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
466 Header: &v3routepb.RouteAction_HashPolicy_Header{
467 HeaderName: ":path",
468 },
469 },
470 Terminal: true,
471 }},
472 }},
473 }},
474 }},
475 }}
476 configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
477
478
479 stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
480 cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
481
482
483
484 res, err := cs.SelectConfig(iresolver.RPCInfo{
485 Context: metadata.NewOutgoingContext(ctx, metadata.Pairs(":path", "/products")),
486 Method: "/service/method",
487 })
488 if err != nil {
489 t.Fatalf("cs.SelectConfig(): %v", err)
490 }
491 gotHash := ringhash.GetRequestHashForTesting(res.Context)
492 wantHash := xxhash.Sum64String("/products")
493 if gotHash != wantHash {
494 t.Fatalf("Got request hash: %v, want: %v", gotHash, wantHash)
495 }
496 }
497
498
499
500
501
// TestResolverRemovedWithRPCs updates the management server with no resources
// while an RPC selected from the previous configuration is still uncommitted.
// It checks the service configs pushed before and after that RPC is committed,
// and that the resolver recovers once the resources are configured again.
func (s) TestResolverRemovedWithRPCs(t *testing.T) {
	// Spin up the management server used by this test.
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	nodeID := uuid.New().String()
	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)

	// Configure default listener and route configuration resources.
	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
	routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}
	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})

	// Wait for the default service config and keep its config selector.
	cs := verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)

	// Start an RPC by selecting a config; it is deliberately not committed
	// yet.
	res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
	if err != nil {
		t.Fatalf("cs.SelectConfig(): %v", err)
	}

	// Push an update that names the node but lists no listeners or routes.
	// NOTE(review): presumably this removes the previously configured
	// resources on the management server — confirm against e2e.UpdateOptions.
	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID}); err != nil {
		t.Fatal(err)
	}

	// While the earlier RPC is still uncommitted, the next update is expected
	// to carry the default service config again, but its config selector must
	// reject new RPCs with UNAVAILABLE.
	cs = verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
	_, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
	if err == nil || status.Code(err) != codes.Unavailable {
		t.Fatalf("cs.SelectConfig() returned: %v, want: %v", err, codes.Unavailable)
	}

	// Commit the RPC that was started before the update with no resources.
	res.OnCommitted()

	// After the commit, the next update is expected to carry a service config
	// equal to the parsed form of "{}".
	var state resolver.State
	select {
	case <-ctx.Done():
		t.Fatalf("Timeout waiting for an update from the resolver: %v", ctx.Err())
	case state = <-stateCh:
		if err := state.ServiceConfig.Err; err != nil {
			t.Fatalf("Received error in service config: %v", state.ServiceConfig.Err)
		}
		wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)("{}")
		if !internal.EqualServiceConfigForTesting(state.ServiceConfig.Config, wantSCParsed.Config) {
			t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, state.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
		}
	}

	// Configure the original resources again.
	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

	// The default service config is expected once more.
	cs = verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)

	// A new RPC must succeed against the restored configuration.
	res, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
	if err != nil {
		t.Fatalf("cs.SelectConfig(): %v", err)
	}
	res.OnCommitted()
}
571
572
573
574
575 func (s) TestResolverRemovedResource(t *testing.T) {
576
577 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
578 defer cancel()
579 nodeID := uuid.New().String()
580 mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
581
582
583 listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
584 routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}
585 configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
586
587 stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
588
589
590 cs := verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
591
592
593 res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
594 if err != nil {
595 t.Fatalf("cs.SelectConfig(): %v", err)
596 }
597
598
599
600 res.OnCommitted()
601
602
603
604 if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID}); err != nil {
605 t.Fatal(err)
606 }
607
608
609
610 cs = verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
611
612
613 res, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
614 if err == nil || status.Code(err) != codes.Unavailable {
615 t.Fatalf("cs.SelectConfig() got %v, %v, expected UNAVAILABLE error", res, err)
616 }
617
618
619 var state resolver.State
620 select {
621 case <-ctx.Done():
622 t.Fatalf("Timeout waiting for an update from the resolver: %v", ctx.Err())
623 case state = <-stateCh:
624 if err := state.ServiceConfig.Err; err != nil {
625 t.Fatalf("Received error in service config: %v", state.ServiceConfig.Err)
626 }
627 wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)("{}")
628 if !internal.EqualServiceConfigForTesting(state.ServiceConfig.Config, wantSCParsed.Config) {
629 t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, state.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
630 }
631 }
632 }
633
634
635
636
637
638
639 func (s) TestResolverMaxStreamDuration(t *testing.T) {
640
641 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
642 defer cancel()
643 nodeID := uuid.New().String()
644 mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
645
646 stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
647
648
649
650
651
652 hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
653 RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{
654 ConfigSource: &v3corepb.ConfigSource{
655 ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
656 },
657 RouteConfigName: defaultTestRouteConfigName,
658 }},
659 HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
660 CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{
661 MaxStreamDuration: durationpb.New(1 * time.Second),
662 },
663 })
664 listeners := []*v3listenerpb.Listener{{
665 Name: defaultTestServiceName,
666 ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
667 FilterChains: []*v3listenerpb.FilterChain{{
668 Name: "filter-chain-name",
669 Filters: []*v3listenerpb.Filter{{
670 Name: wellknown.HTTPConnectionManager,
671 ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
672 }},
673 }},
674 }}
675 routes := []*v3routepb.RouteConfiguration{{
676 Name: defaultTestRouteConfigName,
677 VirtualHosts: []*v3routepb.VirtualHost{{
678 Domains: []string{defaultTestServiceName},
679 Routes: []*v3routepb.Route{
680 {
681 Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/foo"}},
682 Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
683 ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
684 Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
685 {
686 Name: "A",
687 Weight: &wrapperspb.UInt32Value{Value: 100},
688 },
689 }},
690 },
691 MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{
692 MaxStreamDuration: durationpb.New(5 * time.Second),
693 },
694 }},
695 },
696 {
697 Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/bar"}},
698 Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
699 ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
700 Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
701 {
702 Name: "B",
703 Weight: &wrapperspb.UInt32Value{Value: 100},
704 },
705 }},
706 },
707 MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{
708 MaxStreamDuration: durationpb.New(0 * time.Second),
709 },
710 }},
711 },
712 {
713 Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
714 Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
715 ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
716 Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
717 {
718 Name: "C",
719 Weight: &wrapperspb.UInt32Value{Value: 100},
720 },
721 }},
722 },
723 }},
724 },
725 },
726 }},
727 }}
728 configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
729
730
731 cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
732
733 testCases := []struct {
734 name string
735 method string
736 want *time.Duration
737 }{{
738 name: "RDS setting",
739 method: "/foo/method",
740 want: newDurationP(5 * time.Second),
741 }, {
742 name: "explicit zero in RDS; ignore LDS",
743 method: "/bar/method",
744 want: nil,
745 }, {
746 name: "no config in RDS; fallback to LDS",
747 method: "/baz/method",
748 want: newDurationP(time.Second),
749 }}
750
751 for _, tc := range testCases {
752 t.Run(tc.name, func(t *testing.T) {
753 req := iresolver.RPCInfo{
754 Method: tc.method,
755 Context: ctx,
756 }
757 res, err := cs.SelectConfig(req)
758 if err != nil {
759 t.Errorf("cs.SelectConfig(%v): %v", req, err)
760 return
761 }
762 res.OnCommitted()
763 got := res.MethodConfig.Timeout
764 if !cmp.Equal(got, tc.want) {
765 t.Errorf("For method %q: res.MethodConfig.Timeout = %v; want %v", tc.method, got, tc.want)
766 }
767 })
768 }
769 }
770
771
// TestResolverDelayedOnCommitted switches the route configuration to a new
// cluster while an RPC picked against the old cluster is still uncommitted.
// It expects the service config to list both clusters until the old RPC is
// committed, and only the new cluster afterwards.
func (s) TestResolverDelayedOnCommitted(t *testing.T) {
	// Spin up the management server used by this test.
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	nodeID := uuid.New().String()
	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)

	// Configure default listener and route configuration resources.
	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
	routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}
	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})

	// Wait for the default service config and keep its config selector.
	cs := verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)

	// Start an RPC against the original cluster; it stays uncommitted for now.
	resOld, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
	if err != nil {
		t.Fatalf("cs.SelectConfig(): %v", err)
	}
	wantClusterName := fmt.Sprintf("cluster:%s", defaultTestClusterName)
	if cluster := clustermanager.GetPickedClusterForTesting(resOld.Context); cluster != wantClusterName {
		t.Fatalf("Picked cluster is %q, want %q", cluster, wantClusterName)
	}

	// Replace the route configuration with one that points at a new cluster.
	// The RPC picked above against the old cluster has not been committed
	// yet.
	newClusterName := "new-" + defaultTestClusterName
	routes = []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, newClusterName)}
	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)

	// The next service config is expected to list both the old and the new
	// cluster as children of the cluster manager policy.
	wantSC := fmt.Sprintf(`
{
"loadBalancingConfig": [
{
"xds_cluster_manager_experimental": {
"children": {
"cluster:%s": {
"childPolicy": [
{
"cds_experimental": {
"cluster": "%s"
}
}
]
},
"cluster:%s": {
"childPolicy": [
{
"cds_experimental": {
"cluster": "%s"
}
}
]
}
}
}
}
]
}`, defaultTestClusterName, defaultTestClusterName, newClusterName, newClusterName)
	cs = verifyUpdateFromResolver(ctx, t, stateCh, wantSC)

	// A new RPC must be routed to the new cluster.
	resNew, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
	if err != nil {
		t.Fatalf("cs.SelectConfig(): %v", err)
	}
	wantClusterName = fmt.Sprintf("cluster:%s", newClusterName)
	if cluster := clustermanager.GetPickedClusterForTesting(resNew.Context); cluster != wantClusterName {
		t.Fatalf("Picked cluster is %q, want %q", cluster, wantClusterName)
	}

	// Commit the RPC that was picked against the old cluster. The service
	// config expected afterwards lists only the new cluster.
	resOld.OnCommitted()

	wantSC = fmt.Sprintf(`
{
"loadBalancingConfig": [
{
"xds_cluster_manager_experimental": {
"children": {
"cluster:%s": {
"childPolicy": [
{
"cds_experimental": {
"cluster": "%s"
}
}
]
}
}
}
}
]
}`, newClusterName, newClusterName)
	verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
}
878
879
880
881
882 func (s) TestResolverMultipleLDSUpdates(t *testing.T) {
883
884 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
885 defer cancel()
886 nodeID := uuid.New().String()
887 mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
888
889
890
891 listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
892 configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, nil)
893
894 stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
895
896
897 verifyNoUpdateFromResolver(ctx, t, stateCh)
898
899
900
901
902
903 hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
904 RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{
905 ConfigSource: &v3corepb.ConfigSource{
906 ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
907 },
908 RouteConfigName: defaultTestRouteConfigName,
909 }},
910 HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
911 CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{
912 MaxStreamDuration: durationpb.New(1 * time.Second),
913 },
914 })
915 listeners = []*v3listenerpb.Listener{{
916 Name: defaultTestServiceName,
917 ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
918 FilterChains: []*v3listenerpb.FilterChain{{
919 Name: "filter-chain-name",
920 Filters: []*v3listenerpb.Filter{{
921 Name: wellknown.HTTPConnectionManager,
922 ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
923 }},
924 }},
925 }}
926 configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, nil)
927
928
929 verifyNoUpdateFromResolver(ctx, t, stateCh)
930 }
931
932
933
934
935
936 func (s) TestResolverWRR(t *testing.T) {
937 origNewWRR := rinternal.NewWRR
938 rinternal.NewWRR = testutils.NewTestWRR
939 defer func() { rinternal.NewWRR = origNewWRR }()
940
941
942 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
943 defer cancel()
944 nodeID := uuid.New().String()
945 mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
946
947 stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
948
949
950 listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
951 routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
952 RouteConfigName: defaultTestRouteConfigName,
953 ListenerName: defaultTestServiceName,
954 ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeWeightedCluster,
955 WeightedClusters: map[string]int{"A": 75, "B": 25},
956 })}
957 configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
958
959
960 cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
961
962
963 picks := map[string]int{}
964 for i := 0; i < 100; i++ {
965 res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
966 if err != nil {
967 t.Fatalf("cs.SelectConfig(): %v", err)
968 }
969 picks[clustermanager.GetPickedClusterForTesting(res.Context)]++
970 res.OnCommitted()
971 }
972 want := map[string]int{"cluster:A": 75, "cluster:B": 25}
973 if !cmp.Equal(picks, want) {
974 t.Errorf("Picked clusters: %v; want: %v", picks, want)
975 }
976 }
977
// Keys looked up in the TypedStruct value when parsing a test filter
// configuration.
const (
	// filterCfgPathFieldName is the key of the string field holding the path.
	filterCfgPathFieldName = "path"
	// filterCfgErrorFieldName is the key of the string field holding the
	// message of the error to return when a new stream is created.
	filterCfgErrorFieldName = "new_stream_error"
)
980
// filterCfg is the filter configuration type produced by
// filterConfigFromProto and consumed by filterBuilder.BuildClientInterceptor.
type filterCfg struct {
	httpfilter.FilterConfig
	// path is read from the "path" field of the TypedStruct config.
	path string
	// newStreamErr is built from the "new_stream_error" field of the
	// TypedStruct config; it stays nil when that field is absent or empty.
	newStreamErr error
}
986
// filterBuilder is a test HTTP filter builder that records, in paths, the
// events observed by itself and by the interceptors it builds.
type filterBuilder struct {
	// paths accumulates event entries of the form "build:<path>",
	// "override:<path>", "newstream:<path>" and "done:<path>".
	paths []string
	// typeURL is the single type URL reported by TypeURLs.
	typeURL string
}
991
// TypeURLs returns a one-element slice containing the builder's typeURL.
func (fb *filterBuilder) TypeURLs() []string { return []string{fb.typeURL} }
993
994 func filterConfigFromProto(cfg proto.Message) (httpfilter.FilterConfig, error) {
995 ts, ok := cfg.(*v3xdsxdstypepb.TypedStruct)
996 if !ok {
997 return nil, fmt.Errorf("unsupported filter config type: %T, want %T", cfg, &v3xdsxdstypepb.TypedStruct{})
998 }
999
1000 if ts.GetValue() == nil {
1001 return filterCfg{}, nil
1002 }
1003 ret := filterCfg{}
1004 if v := ts.GetValue().GetFields()[filterCfgPathFieldName]; v != nil {
1005 ret.path = v.GetStringValue()
1006 }
1007 if v := ts.GetValue().GetFields()[filterCfgErrorFieldName]; v != nil {
1008 if v.GetStringValue() == "" {
1009 ret.newStreamErr = nil
1010 } else {
1011 ret.newStreamErr = fmt.Errorf("%s", v.GetStringValue())
1012 }
1013 }
1014 return ret, nil
1015 }
1016
1017 func (*filterBuilder) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) {
1018 return filterConfigFromProto(cfg)
1019 }
1020
1021 func (*filterBuilder) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) {
1022 return filterConfigFromProto(override)
1023 }
1024
1025 func (*filterBuilder) IsTerminal() bool { return false }
1026
1027 var _ httpfilter.ClientInterceptorBuilder = &filterBuilder{}
1028
1029 func (fb *filterBuilder) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) {
1030 if config == nil {
1031 panic("unexpected missing config")
1032 }
1033
1034 fi := &filterInterceptor{
1035 parent: fb,
1036 pathCh: make(chan string, 10),
1037 }
1038
1039 fb.paths = append(fb.paths, "build:"+config.(filterCfg).path)
1040 err := config.(filterCfg).newStreamErr
1041 if override != nil {
1042 fb.paths = append(fb.paths, "override:"+override.(filterCfg).path)
1043 err = override.(filterCfg).newStreamErr
1044 }
1045
1046 fi.cfgPath = config.(filterCfg).path
1047 fi.err = err
1048 return fi, nil
1049 }
1050
1051 type filterInterceptor struct {
1052 parent *filterBuilder
1053 pathCh chan string
1054 cfgPath string
1055 err error
1056 }
1057
1058 func (fi *filterInterceptor) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) {
1059 fi.parent.paths = append(fi.parent.paths, "newstream:"+fi.cfgPath)
1060 if fi.err != nil {
1061 return nil, fi.err
1062 }
1063 d := func() {
1064 fi.parent.paths = append(fi.parent.paths, "done:"+fi.cfgPath)
1065 done()
1066 }
1067 cs, err := newStream(ctx, d)
1068 if err != nil {
1069 return nil, err
1070 }
1071 return &clientStream{ClientStream: cs}, nil
1072 }
1073
1074 type clientStream struct {
1075 iresolver.ClientStream
1076 }
1077
1078 func (s) TestConfigSelector_FailureCases(t *testing.T) {
1079 const methodName = "1"
1080
1081 tests := []struct {
1082 name string
1083 listener *v3listenerpb.Listener
1084 wantErr string
1085 }{
1086 {
1087 name: "route type RouteActionUnsupported invalid for client",
1088 listener: &v3listenerpb.Listener{
1089 Name: defaultTestServiceName,
1090 ApiListener: &v3listenerpb.ApiListener{
1091 ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
1092 RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
1093 RouteConfig: &v3routepb.RouteConfiguration{
1094 Name: defaultTestRouteConfigName,
1095 VirtualHosts: []*v3routepb.VirtualHost{{
1096 Domains: []string{defaultTestServiceName},
1097 Routes: []*v3routepb.Route{{
1098 Match: &v3routepb.RouteMatch{
1099 PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName},
1100 },
1101 Action: &v3routepb.Route_FilterAction{},
1102 }},
1103 }},
1104 }},
1105 HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
1106 }),
1107 },
1108 },
1109 wantErr: "matched route does not have a supported route action type",
1110 },
1111 {
1112 name: "route type RouteActionNonForwardingAction invalid for client",
1113 listener: &v3listenerpb.Listener{
1114 Name: defaultTestServiceName,
1115 ApiListener: &v3listenerpb.ApiListener{
1116 ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
1117 RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
1118 RouteConfig: &v3routepb.RouteConfiguration{
1119 Name: defaultTestRouteConfigName,
1120 VirtualHosts: []*v3routepb.VirtualHost{{
1121 Domains: []string{defaultTestServiceName},
1122 Routes: []*v3routepb.Route{{
1123 Match: &v3routepb.RouteMatch{
1124 PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName},
1125 },
1126 Action: &v3routepb.Route_NonForwardingAction{},
1127 }},
1128 }},
1129 }},
1130 HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
1131 }),
1132 },
1133 },
1134 wantErr: "matched route does not have a supported route action type",
1135 },
1136 }
1137
1138 for _, test := range tests {
1139 t.Run(test.name, func(t *testing.T) {
1140
1141 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1142 defer cancel()
1143 nodeID := uuid.New().String()
1144 mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
1145
1146
1147 stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
1148
1149
1150
1151 configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{test.listener}, nil)
1152
1153
1154 cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
1155
1156
1157 _, err := cs.SelectConfig(iresolver.RPCInfo{Method: methodName, Context: ctx})
1158 if err == nil || !strings.Contains(err.Error(), test.wantErr) {
1159 t.Errorf("SelectConfig(_) = _, %v; want _, Contains(%v)", err, test.wantErr)
1160 }
1161 })
1162 }
1163 }
1164
1165 func newHTTPFilter(t *testing.T, name, typeURL, path, err string) *v3httppb.HttpFilter {
1166 return &v3httppb.HttpFilter{
1167 Name: name,
1168 ConfigType: &v3httppb.HttpFilter_TypedConfig{
1169 TypedConfig: testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
1170 TypeUrl: typeURL,
1171 Value: &structpb.Struct{
1172 Fields: map[string]*structpb.Value{
1173 filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: path}},
1174 filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: err}},
1175 },
1176 },
1177 }),
1178 },
1179 }
1180 }
1181
1182 func (s) TestXDSResolverHTTPFilters(t *testing.T) {
1183 const methodName1 = "1"
1184 const methodName2 = "2"
1185 testFilterName := t.Name()
1186
1187 testCases := []struct {
1188 name string
1189 listener *v3listenerpb.Listener
1190 rpcRes map[string][][]string
1191 wantStreamErr string
1192 }{
1193 {
1194 name: "NewStream error - ensure earlier interceptor Done is still called",
1195 listener: &v3listenerpb.Listener{
1196 Name: defaultTestServiceName,
1197 ApiListener: &v3listenerpb.ApiListener{
1198 ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
1199 RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
1200 RouteConfig: &v3routepb.RouteConfiguration{
1201 Name: defaultTestRouteConfigName,
1202 VirtualHosts: []*v3routepb.VirtualHost{{
1203 Domains: []string{defaultTestServiceName},
1204 Routes: []*v3routepb.Route{{
1205 Match: &v3routepb.RouteMatch{
1206 PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1},
1207 },
1208 Action: &v3routepb.Route_Route{
1209 Route: &v3routepb.RouteAction{
1210 ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
1211 WeightedClusters: &v3routepb.WeightedCluster{
1212 Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
1213 {Name: "A", Weight: wrapperspb.UInt32(1)},
1214 {Name: "B", Weight: wrapperspb.UInt32(1)},
1215 },
1216 },
1217 },
1218 },
1219 },
1220 }},
1221 }},
1222 }},
1223 HttpFilters: []*v3httppb.HttpFilter{
1224 newHTTPFilter(t, "foo", testFilterName, "foo1", ""),
1225 newHTTPFilter(t, "bar", testFilterName, "bar1", "bar newstream err"),
1226 e2e.RouterHTTPFilter,
1227 },
1228 }),
1229 },
1230 },
1231 rpcRes: map[string][][]string{
1232 methodName1: {
1233 {"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1", "done:foo1"},
1234 },
1235 },
1236 wantStreamErr: "bar newstream err",
1237 },
1238 {
1239 name: "all overrides",
1240 listener: &v3listenerpb.Listener{
1241 Name: defaultTestServiceName,
1242 ApiListener: &v3listenerpb.ApiListener{
1243 ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
1244 RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
1245 RouteConfig: &v3routepb.RouteConfiguration{
1246 Name: defaultTestRouteConfigName,
1247 VirtualHosts: []*v3routepb.VirtualHost{{
1248 Domains: []string{defaultTestServiceName},
1249 Routes: []*v3routepb.Route{
1250 {
1251 Match: &v3routepb.RouteMatch{
1252 PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1},
1253 },
1254 Action: &v3routepb.Route_Route{
1255 Route: &v3routepb.RouteAction{
1256 ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
1257 WeightedClusters: &v3routepb.WeightedCluster{
1258 Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
1259 {Name: "A", Weight: wrapperspb.UInt32(1)},
1260 {Name: "B", Weight: wrapperspb.UInt32(1)},
1261 },
1262 },
1263 },
1264 },
1265 },
1266 },
1267 {
1268 Match: &v3routepb.RouteMatch{
1269 PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName2},
1270 },
1271 Action: &v3routepb.Route_Route{
1272 Route: &v3routepb.RouteAction{
1273 ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
1274 WeightedClusters: &v3routepb.WeightedCluster{
1275 Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
1276 {Name: "A", Weight: wrapperspb.UInt32(1)},
1277 {
1278 Name: "B",
1279 Weight: wrapperspb.UInt32(1),
1280 TypedPerFilterConfig: map[string]*anypb.Any{
1281 "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
1282 TypeUrl: testFilterName,
1283 Value: &structpb.Struct{
1284 Fields: map[string]*structpb.Value{
1285 filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo4"}},
1286 },
1287 },
1288 }),
1289 "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
1290 TypeUrl: testFilterName,
1291 Value: &structpb.Struct{
1292 Fields: map[string]*structpb.Value{
1293 filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar4"}},
1294 },
1295 },
1296 }),
1297 },
1298 },
1299 },
1300 },
1301 },
1302 },
1303 },
1304 TypedPerFilterConfig: map[string]*anypb.Any{
1305 "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
1306 TypeUrl: testFilterName,
1307 Value: &structpb.Struct{
1308 Fields: map[string]*structpb.Value{
1309 filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo3"}},
1310 filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}},
1311 },
1312 },
1313 }),
1314 "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
1315 TypeUrl: testFilterName,
1316 Value: &structpb.Struct{
1317 Fields: map[string]*structpb.Value{
1318 filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar3"}},
1319 },
1320 },
1321 }),
1322 },
1323 },
1324 },
1325 TypedPerFilterConfig: map[string]*anypb.Any{
1326 "foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
1327 TypeUrl: testFilterName,
1328 Value: &structpb.Struct{
1329 Fields: map[string]*structpb.Value{
1330 filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo2"}},
1331 filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}},
1332 },
1333 },
1334 }),
1335 "bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
1336 TypeUrl: testFilterName,
1337 Value: &structpb.Struct{
1338 Fields: map[string]*structpb.Value{
1339 filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar2"}},
1340 },
1341 },
1342 }),
1343 },
1344 }},
1345 }},
1346 HttpFilters: []*v3httppb.HttpFilter{
1347 newHTTPFilter(t, "foo", testFilterName, "foo1", "this is overridden to nil"),
1348 newHTTPFilter(t, "bar", testFilterName, "bar1", ""),
1349 e2e.RouterHTTPFilter,
1350 },
1351 }),
1352 },
1353 },
1354 rpcRes: map[string][][]string{
1355 methodName1: {
1356 {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
1357 {"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
1358 },
1359 methodName2: {
1360 {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
1361 {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
1362 {"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
1363 {"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
1364 },
1365 },
1366 },
1367 }
1368
1369 for _, tc := range testCases {
1370 t.Run(tc.name, func(t *testing.T) {
1371 origNewWRR := rinternal.NewWRR
1372 rinternal.NewWRR = testutils.NewTestWRR
1373 defer func() { rinternal.NewWRR = origNewWRR }()
1374
1375
1376 fb := &filterBuilder{typeURL: testFilterName}
1377 httpfilter.Register(fb)
1378
1379
1380 ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
1381 defer cancel()
1382 nodeID := uuid.New().String()
1383 mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
1384
1385
1386 stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
1387
1388
1389
1390 configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{tc.listener}, nil)
1391
1392
1393 cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
1394
1395 for method, wants := range tc.rpcRes {
1396
1397 remainingWant := make([][]string, len(wants))
1398 copy(remainingWant, wants)
1399 for n := range wants {
1400 res, err := cs.SelectConfig(iresolver.RPCInfo{Method: method, Context: ctx})
1401 if err != nil {
1402 t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
1403 }
1404
1405 var doneFunc func()
1406 _, err = res.Interceptor.NewStream(context.Background(), iresolver.RPCInfo{}, func() {}, func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
1407 doneFunc = done
1408 return nil, nil
1409 })
1410 if tc.wantStreamErr != "" {
1411 if err == nil || !strings.Contains(err.Error(), tc.wantStreamErr) {
1412 t.Errorf("NewStream(...) = _, %v; want _, Contains(%v)", err, tc.wantStreamErr)
1413 }
1414 if err == nil {
1415 res.OnCommitted()
1416 doneFunc()
1417 }
1418 continue
1419 }
1420 if err != nil {
1421 t.Fatalf("unexpected error from Interceptor.NewStream: %v", err)
1422
1423 }
1424 res.OnCommitted()
1425 doneFunc()
1426
1427 gotPaths := fb.paths
1428 fb.paths = []string{}
1429
1430
1431 pass := false
1432 for i := range remainingWant {
1433 if cmp.Equal(gotPaths, remainingWant[i]) {
1434 remainingWant[i] = remainingWant[len(remainingWant)-1]
1435 remainingWant = remainingWant[:len(remainingWant)-1]
1436 pass = true
1437 break
1438 }
1439 }
1440 if !pass {
1441 t.Errorf("%q:%v - path:\n%v\nwant one of:\n%v", method, n, gotPaths, remainingWant)
1442 }
1443 }
1444 }
1445 })
1446 }
1447 }
1448
// newDurationP returns a pointer to a fresh copy of d.
func newDurationP(d time.Duration) *time.Duration {
	p := new(time.Duration)
	*p = d
	return p
}
1452
View as plain text