...

Source file src/google.golang.org/grpc/xds/internal/resolver/xds_resolver_test.go

Documentation: google.golang.org/grpc/xds/internal/resolver

     1  /*
     2   *
     3   * Copyright 2019 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package resolver_test
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"strings"
    25  	"testing"
    26  	"time"
    27  
    28  	xxhash "github.com/cespare/xxhash/v2"
    29  	"github.com/envoyproxy/go-control-plane/pkg/wellknown"
    30  	"github.com/google/go-cmp/cmp"
    31  	"github.com/google/uuid"
    32  	"google.golang.org/grpc/codes"
    33  	"google.golang.org/grpc/internal"
    34  	"google.golang.org/grpc/internal/grpcsync"
    35  	iresolver "google.golang.org/grpc/internal/resolver"
    36  	"google.golang.org/grpc/internal/testutils"
    37  	xdsbootstrap "google.golang.org/grpc/internal/testutils/xds/bootstrap"
    38  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    39  	"google.golang.org/grpc/internal/xds/bootstrap"
    40  	"google.golang.org/grpc/metadata"
    41  	"google.golang.org/grpc/resolver"
    42  	"google.golang.org/grpc/serviceconfig"
    43  	"google.golang.org/grpc/status"
    44  	"google.golang.org/grpc/xds/internal/balancer/clustermanager"
    45  	"google.golang.org/grpc/xds/internal/balancer/ringhash"
    46  	"google.golang.org/grpc/xds/internal/httpfilter"
    47  	xdsresolver "google.golang.org/grpc/xds/internal/resolver"
    48  	rinternal "google.golang.org/grpc/xds/internal/resolver/internal"
    49  	xdstestutils "google.golang.org/grpc/xds/internal/testutils"
    50  	"google.golang.org/grpc/xds/internal/xdsclient"
    51  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
    52  	"google.golang.org/protobuf/proto"
    53  	"google.golang.org/protobuf/types/known/anypb"
    54  	"google.golang.org/protobuf/types/known/durationpb"
    55  	"google.golang.org/protobuf/types/known/structpb"
    56  	"google.golang.org/protobuf/types/known/wrapperspb"
    57  
    58  	v3xdsxdstypepb "github.com/cncf/xds/go/xds/type/v3"
    59  	v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    60  	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
    61  	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
    62  	v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
    63  	v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
    64  	v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
    65  
    66  	_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // Register the cds LB policy
    67  	_ "google.golang.org/grpc/xds/internal/httpfilter/router"    // Register the router filter
    68  )
    69  
    70  // Tests the case where xDS client creation is expected to fail because the
    71  // bootstrap configuration is not specified. The test verifies that xDS resolver
    72  // build fails as well.
    73  func (s) TestResolverBuilder_ClientCreationFails_NoBootstrap(t *testing.T) {
    74  	// Build an xDS resolver without specifying bootstrap env vars.
    75  	builder := resolver.Get(xdsresolver.Scheme)
    76  	if builder == nil {
    77  		t.Fatalf("Scheme %q is not registered", xdsresolver.Scheme)
    78  	}
    79  
    80  	target := resolver.Target{URL: *testutils.MustParseURL("xds:///target")}
    81  	if _, err := builder.Build(target, nil, resolver.BuildOptions{}); err == nil {
    82  		t.Fatalf("xds Resolver Build(%v) succeeded when expected to fail, because there is not bootstrap configuration for the xDS client", target)
    83  	}
    84  }
    85  
    86  // Tests the case where the specified dial target contains an authority that is
    87  // not specified in the bootstrap file. Verifies that the resolver.Build method
    88  // fails with the expected error string.
    89  func (s) TestResolverBuilder_AuthorityNotDefinedInBootstrap(t *testing.T) {
    90  	bootstrapCleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
    91  		NodeID:    "node-id",
    92  		ServerURI: "dummy-management-server",
    93  	})
    94  	if err != nil {
    95  		t.Fatal(err)
    96  	}
    97  	defer bootstrapCleanup()
    98  
    99  	builder := resolver.Get(xdsresolver.Scheme)
   100  	if builder == nil {
   101  		t.Fatalf("Scheme %q is not registered", xdsresolver.Scheme)
   102  	}
   103  
   104  	target := resolver.Target{URL: *testutils.MustParseURL("xds://non-existing-authority/target")}
   105  	const wantErr = `authority "non-existing-authority" specified in dial target "xds://non-existing-authority/target" is not found in the bootstrap file`
   106  
   107  	r, err := builder.Build(target, &testutils.ResolverClientConn{Logger: t}, resolver.BuildOptions{})
   108  	if r != nil {
   109  		r.Close()
   110  	}
   111  	if err == nil {
   112  		t.Fatalf("xds Resolver Build(%v) succeeded for target with authority not specified in bootstrap", target)
   113  	}
   114  	if !strings.Contains(err.Error(), wantErr) {
   115  		t.Fatalf("xds Resolver Build(%v) returned err: %v, wantErr: %v", target, err, wantErr)
   116  	}
   117  }
   118  
   119  // Test builds an xDS resolver and verifies that the resource name specified in
   120  // the discovery request matches expectations.
   121  func (s) TestResolverResourceName(t *testing.T) {
   122  	tests := []struct {
   123  		name                         string
   124  		listenerResourceNameTemplate string
   125  		extraAuthority               string
   126  		dialTarget                   string
   127  		wantResourceNames            []string
   128  	}{
   129  		{
   130  			name:                         "default %s old style",
   131  			listenerResourceNameTemplate: "%s",
   132  			dialTarget:                   "xds:///target",
   133  			wantResourceNames:            []string{"target"},
   134  		},
   135  		{
   136  			name:                         "old style no percent encoding",
   137  			listenerResourceNameTemplate: "/path/to/%s",
   138  			dialTarget:                   "xds:///target",
   139  			wantResourceNames:            []string{"/path/to/target"},
   140  		},
   141  		{
   142  			name:                         "new style with %s",
   143  			listenerResourceNameTemplate: "xdstp://authority.com/%s",
   144  			dialTarget:                   "xds:///0.0.0.0:8080",
   145  			wantResourceNames:            []string{"xdstp://authority.com/0.0.0.0:8080"},
   146  		},
   147  		{
   148  			name:                         "new style percent encoding",
   149  			listenerResourceNameTemplate: "xdstp://authority.com/%s",
   150  			dialTarget:                   "xds:///[::1]:8080",
   151  			wantResourceNames:            []string{"xdstp://authority.com/%5B::1%5D:8080"},
   152  		},
   153  		{
   154  			name:                         "new style different authority",
   155  			listenerResourceNameTemplate: "xdstp://authority.com/%s",
   156  			extraAuthority:               "test-authority",
   157  			dialTarget:                   "xds://test-authority/target",
   158  			wantResourceNames:            []string{"xdstp://test-authority/envoy.config.listener.v3.Listener/target"},
   159  		},
   160  	}
   161  	for _, tt := range tests {
   162  		t.Run(tt.name, func(t *testing.T) {
   163  			// Spin up an xDS management server for the test.
   164  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   165  			defer cancel()
   166  			nodeID := uuid.New().String()
   167  			mgmtServer, lisCh, _ := setupManagementServerForTest(ctx, t, nodeID)
   168  
   169  			// Create a bootstrap configuration with test options.
   170  			opts := xdsbootstrap.Options{
   171  				ServerURI: mgmtServer.Address,
   172  				ClientDefaultListenerResourceNameTemplate: tt.listenerResourceNameTemplate,
   173  			}
   174  			if tt.extraAuthority != "" {
   175  				// In this test, we really don't care about having multiple
   176  				// management servers. All we need to verify is whether the
   177  				// resource name matches expectation.
   178  				opts.Authorities = map[string]string{
   179  					tt.extraAuthority: mgmtServer.Address,
   180  				}
   181  			}
   182  			cleanup, err := xdsbootstrap.CreateFile(opts)
   183  			if err != nil {
   184  				t.Fatal(err)
   185  			}
   186  			defer cleanup()
   187  
   188  			buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL(tt.dialTarget)})
   189  			waitForResourceNames(ctx, t, lisCh, tt.wantResourceNames)
   190  		})
   191  	}
   192  }
   193  
   194  // Tests the case where a service update from the underlying xDS client is
   195  // received after the resolver is closed, and verifies that the update is not
   196  // propagated to the ClientConn.
   197  func (s) TestResolverWatchCallbackAfterClose(t *testing.T) {
   198  	// Setup the management server that synchronizes with the test goroutine
   199  	// using two channels. The management server signals the test goroutine when
   200  	// it receives a discovery request for a route configuration resource. And
   201  	// the test goroutine signals the management server when the resolver is
   202  	// closed.
   203  	routeConfigResourceNamesCh := make(chan []string, 1)
   204  	waitForResolverCloseCh := make(chan struct{})
   205  	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{
   206  		OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
   207  			if req.GetTypeUrl() == version.V3RouteConfigURL {
   208  				select {
   209  				case <-routeConfigResourceNamesCh:
   210  				default:
   211  				}
   212  				select {
   213  				case routeConfigResourceNamesCh <- req.GetResourceNames():
   214  				default:
   215  				}
   216  				<-waitForResolverCloseCh
   217  			}
   218  			return nil
   219  		},
   220  	})
   221  	if err != nil {
   222  		t.Fatalf("Failed to start xDS management server: %v", err)
   223  	}
   224  	defer mgmtServer.Stop()
   225  
   226  	// Create a bootstrap configuration specifying the above management server.
   227  	nodeID := uuid.New().String()
   228  	cleanup, err := xdsbootstrap.CreateFile(xdsbootstrap.Options{
   229  		NodeID:    nodeID,
   230  		ServerURI: mgmtServer.Address,
   231  	})
   232  	if err != nil {
   233  		t.Fatal(err)
   234  	}
   235  	defer cleanup()
   236  
   237  	// Configure resources on the management server.
   238  	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
   239  	routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}
   240  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   241  	defer cancel()
   242  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
   243  
   244  	// Wait for a discovery request for a route configuration resource.
   245  	stateCh, _, r := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
   246  	waitForResourceNames(ctx, t, routeConfigResourceNamesCh, []string{defaultTestRouteConfigName})
   247  
   248  	// Close the resolver and unblock the management server.
   249  	r.Close()
   250  	close(waitForResolverCloseCh)
   251  
   252  	// Verify that the update from the management server is not propagated to
   253  	// the ClientConn. The xDS resolver, once closed, is expected to drop
   254  	// updates from the xDS client.
   255  	verifyNoUpdateFromResolver(ctx, t, stateCh)
   256  }
   257  
   258  // Tests that the xDS resolver's Close method closes the xDS client.
   259  func (s) TestResolverCloseClosesXDSClient(t *testing.T) {
   260  	bootstrapCfg := &bootstrap.Config{
   261  		XDSServer: xdstestutils.ServerConfigForAddress(t, "dummy-management-server-address"),
   262  	}
   263  
   264  	// Override xDS client creation to use bootstrap configuration pointing to a
   265  	// dummy management server. Also close a channel when the returned xDS
   266  	// client is closed.
   267  	origNewClient := rinternal.NewXDSClient
   268  	closeCh := make(chan struct{})
   269  	rinternal.NewXDSClient = func() (xdsclient.XDSClient, func(), error) {
   270  		c, cancel, err := xdsclient.NewWithConfigForTesting(bootstrapCfg, defaultTestTimeout, defaultTestTimeout)
   271  		return c, grpcsync.OnceFunc(func() {
   272  			close(closeCh)
   273  			cancel()
   274  		}), err
   275  	}
   276  	defer func() { rinternal.NewXDSClient = origNewClient }()
   277  
   278  	_, _, r := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///my-service-client-side-xds")})
   279  	r.Close()
   280  
   281  	select {
   282  	case <-closeCh:
   283  	case <-time.After(defaultTestTimeout):
   284  		t.Fatal("Timeout when waiting for xDS client to be closed")
   285  	}
   286  }
   287  
   288  // Tests the case where a resource returned by the management server is NACKed
   289  // by the xDS client, which then returns an update containing an error to the
   290  // resolver. Verifies that the update is propagated to the ClientConn by the
   291  // resolver. It also tests the cases where the resolver gets a good update
   292  // subsequently, and another error after the good update. The test also verifies
   293  // that these are propagated to the ClientConn.
   294  func (s) TestResolverBadServiceUpdate(t *testing.T) {
   295  	// Spin up an xDS management server for the test.
   296  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   297  	defer cancel()
   298  	nodeID := uuid.New().String()
   299  	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
   300  
   301  	// Configure a listener resource that is expected to be NACKed because it
   302  	// does not contain the `RouteSpecifier` field in the HTTPConnectionManager.
   303  	hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
   304  		HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})},
   305  	})
   306  	lis := &v3listenerpb.Listener{
   307  		Name:        defaultTestServiceName,
   308  		ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
   309  		FilterChains: []*v3listenerpb.FilterChain{{
   310  			Name: "filter-chain-name",
   311  			Filters: []*v3listenerpb.Filter{{
   312  				Name:       wellknown.HTTPConnectionManager,
   313  				ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
   314  			}},
   315  		}},
   316  	}
   317  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{lis}, nil)
   318  
   319  	// Build the resolver and expect an error update from it.
   320  	stateCh, errCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
   321  	wantErr := "no RouteSpecifier"
   322  	verifyErrorFromResolver(ctx, t, errCh, wantErr)
   323  
   324  	// Configure good listener and route configuration resources on the
   325  	// management server.
   326  	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
   327  	routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}
   328  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
   329  
   330  	// Expect a good update from the resolver.
   331  	verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
   332  
   333  	// Configure another bad resource on the management server and expect an
   334  	// error update from the resolver.
   335  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{lis}, nil)
   336  	verifyErrorFromResolver(ctx, t, errCh, wantErr)
   337  }
   338  
   339  // TestResolverGoodServiceUpdate tests the case where the resource returned by
   340  // the management server is ACKed by the xDS client, which then returns a good
   341  // service update to the resolver. The test verifies that the service config
   342  // returned by the resolver matches expectations, and that the config selector
   343  // returned by the resolver picks clusters based on the route configuration
   344  // received from the management server.
   345  func (s) TestResolverGoodServiceUpdate(t *testing.T) {
   346  	for _, tt := range []struct {
   347  		name              string
   348  		routeConfig       *v3routepb.RouteConfiguration
   349  		wantServiceConfig string
   350  		wantClusters      map[string]bool
   351  	}{
   352  		{
   353  			name: "single cluster",
   354  			routeConfig: e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
   355  				RouteConfigName:      defaultTestRouteConfigName,
   356  				ListenerName:         defaultTestServiceName,
   357  				ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeCluster,
   358  				ClusterName:          defaultTestClusterName,
   359  			}),
   360  			wantServiceConfig: wantDefaultServiceConfig,
   361  			wantClusters:      map[string]bool{fmt.Sprintf("cluster:%s", defaultTestClusterName): true},
   362  		},
   363  		{
   364  			name: "two clusters",
   365  			routeConfig: e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
   366  				RouteConfigName:      defaultTestRouteConfigName,
   367  				ListenerName:         defaultTestServiceName,
   368  				ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeWeightedCluster,
   369  				WeightedClusters:     map[string]int{"cluster_1": 75, "cluster_2": 25},
   370  			}),
   371  			// This update contains the cluster from the previous update as well
   372  			// as this update, as the previous config selector still references
   373  			// the old cluster when the new one is pushed.
   374  			wantServiceConfig: `{
   375    "loadBalancingConfig": [{
   376      "xds_cluster_manager_experimental": {
   377        "children": {
   378          "cluster:cluster_1": {
   379            "childPolicy": [{
   380  			"cds_experimental": {
   381  			  "cluster": "cluster_1"
   382  			}
   383  		  }]
   384          },
   385          "cluster:cluster_2": {
   386            "childPolicy": [{
   387  			"cds_experimental": {
   388  			  "cluster": "cluster_2"
   389  			}
   390  		  }]
   391          }
   392        }
   393      }
   394    }]}`,
   395  			wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true},
   396  		},
   397  	} {
   398  		t.Run(tt.name, func(t *testing.T) {
   399  			// Spin up an xDS management server for the test.
   400  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   401  			defer cancel()
   402  			nodeID := uuid.New().String()
   403  			mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
   404  
   405  			// Configure the management server with a good listener resource and a
   406  			// route configuration resource, as specified by the test case.
   407  			listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
   408  			routes := []*v3routepb.RouteConfiguration{tt.routeConfig}
   409  			configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
   410  
   411  			stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
   412  
   413  			// Read the update pushed by the resolver to the ClientConn.
   414  			cs := verifyUpdateFromResolver(ctx, t, stateCh, tt.wantServiceConfig)
   415  
   416  			pickedClusters := make(map[string]bool)
   417  			// Odds of picking 75% cluster 100 times in a row: 1 in 3E-13.  And
   418  			// with the random number generator stubbed out, we can rely on this
   419  			// to be 100% reproducible.
   420  			for i := 0; i < 100; i++ {
   421  				res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
   422  				if err != nil {
   423  					t.Fatalf("cs.SelectConfig(): %v", err)
   424  				}
   425  				cluster := clustermanager.GetPickedClusterForTesting(res.Context)
   426  				pickedClusters[cluster] = true
   427  				res.OnCommitted()
   428  			}
   429  			if !cmp.Equal(pickedClusters, tt.wantClusters) {
   430  				t.Errorf("Picked clusters: %v; want: %v", pickedClusters, tt.wantClusters)
   431  			}
   432  		})
   433  	}
   434  }
   435  
   436  // Tests a case where a resolver receives a RouteConfig update with a HashPolicy
   437  // specifying to generate a hash. The configSelector generated should
   438  // successfully generate a Hash.
   439  func (s) TestResolverRequestHash(t *testing.T) {
   440  	// Spin up an xDS management server for the test.
   441  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   442  	defer cancel()
   443  	nodeID := uuid.New().String()
   444  	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
   445  
   446  	// Configure the management server with a good listener resource and a
   447  	// route configuration resource that specifies a hash policy.
   448  	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
   449  	routes := []*v3routepb.RouteConfiguration{{
   450  		Name: defaultTestRouteConfigName,
   451  		VirtualHosts: []*v3routepb.VirtualHost{{
   452  			Domains: []string{defaultTestServiceName},
   453  			Routes: []*v3routepb.Route{{
   454  				Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
   455  				Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
   456  					ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
   457  						Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
   458  							{
   459  								Name:   defaultTestClusterName,
   460  								Weight: &wrapperspb.UInt32Value{Value: 100},
   461  							},
   462  						},
   463  					}},
   464  					HashPolicy: []*v3routepb.RouteAction_HashPolicy{{
   465  						PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
   466  							Header: &v3routepb.RouteAction_HashPolicy_Header{
   467  								HeaderName: ":path",
   468  							},
   469  						},
   470  						Terminal: true,
   471  					}},
   472  				}},
   473  			}},
   474  		}},
   475  	}}
   476  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
   477  
   478  	// Build the resolver and read the config selector out of it.
   479  	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
   480  	cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
   481  
   482  	// Selecting a config when there was a hash policy specified in the route
   483  	// that will be selected should put a request hash in the config's context.
   484  	res, err := cs.SelectConfig(iresolver.RPCInfo{
   485  		Context: metadata.NewOutgoingContext(ctx, metadata.Pairs(":path", "/products")),
   486  		Method:  "/service/method",
   487  	})
   488  	if err != nil {
   489  		t.Fatalf("cs.SelectConfig(): %v", err)
   490  	}
   491  	gotHash := ringhash.GetRequestHashForTesting(res.Context)
   492  	wantHash := xxhash.Sum64String("/products")
   493  	if gotHash != wantHash {
   494  		t.Fatalf("Got request hash: %v, want: %v", gotHash, wantHash)
   495  	}
   496  }
   497  
   498  // Tests the case where resources are removed from the management server,
   499  // causing it to send an empty update to the xDS client, which returns a
   500  // resource-not-found error to the xDS resolver. The test verifies that an
   501  // ongoing RPC is handled to completion when this happens.
   502  func (s) TestResolverRemovedWithRPCs(t *testing.T) {
   503  	// Spin up an xDS management server for the test.
   504  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   505  	defer cancel()
   506  	nodeID := uuid.New().String()
   507  	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
   508  
   509  	// Configure resources on the management server.
   510  	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
   511  	routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}
   512  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
   513  
   514  	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
   515  
   516  	// Read the update pushed by the resolver to the ClientConn.
   517  	cs := verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
   518  
   519  	res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
   520  	if err != nil {
   521  		t.Fatalf("cs.SelectConfig(): %v", err)
   522  	}
   523  
   524  	// Delete the resources on the management server. This should result in a
   525  	// resource-not-found error from the xDS client.
   526  	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID}); err != nil {
   527  		t.Fatal(err)
   528  	}
   529  
   530  	// The RPC started earlier is still in progress. So, the xDS resolver will
   531  	// not produce an empty service config at this point. Instead it will retain
   532  	// the cluster to which the RPC is ongoing in the service config, but will
   533  	// return an erroring config selector which will fail new RPCs.
   534  	cs = verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
   535  	_, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
   536  	if err == nil || status.Code(err) != codes.Unavailable {
   537  		t.Fatalf("cs.SelectConfig() returned: %v, want: %v", err, codes.Unavailable)
   538  	}
   539  
   540  	// "Finish the RPC"; this could cause a panic if the resolver doesn't
   541  	// handle it correctly.
   542  	res.OnCommitted()
   543  
   544  	// Now that the RPC is committed, the xDS resolver is expected to send an
   545  	// update with an empty service config.
   546  	var state resolver.State
   547  	select {
   548  	case <-ctx.Done():
   549  		t.Fatalf("Timeout waiting for an update from the resolver: %v", ctx.Err())
   550  	case state = <-stateCh:
   551  		if err := state.ServiceConfig.Err; err != nil {
   552  			t.Fatalf("Received error in service config: %v", state.ServiceConfig.Err)
   553  		}
   554  		wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)("{}")
   555  		if !internal.EqualServiceConfigForTesting(state.ServiceConfig.Config, wantSCParsed.Config) {
   556  			t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, state.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
   557  		}
   558  	}
   559  
   560  	// Re-add the listener and expect everything to work again.
   561  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
   562  	// Read the update pushed by the resolver to the ClientConn.
   563  	cs = verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
   564  
   565  	res, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
   566  	if err != nil {
   567  		t.Fatalf("cs.SelectConfig(): %v", err)
   568  	}
   569  	res.OnCommitted()
   570  }
   571  
   572  // Tests the case where resources returned by the management server are removed.
   573  // The test verifies that the resolver pushes the expected config selector and
   574  // service config in this case.
   575  func (s) TestResolverRemovedResource(t *testing.T) {
   576  	// Spin up an xDS management server for the test.
   577  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   578  	defer cancel()
   579  	nodeID := uuid.New().String()
   580  	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
   581  
   582  	// Configure resources on the management server.
   583  	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
   584  	routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}
   585  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
   586  
   587  	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
   588  
   589  	// Read the update pushed by the resolver to the ClientConn.
   590  	cs := verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
   591  
   592  	// "Make an RPC" by invoking the config selector.
   593  	res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
   594  	if err != nil {
   595  		t.Fatalf("cs.SelectConfig(): %v", err)
   596  	}
   597  
   598  	// "Finish the RPC"; this could cause a panic if the resolver doesn't
   599  	// handle it correctly.
   600  	res.OnCommitted()
   601  
   602  	// Delete the resources on the management server, resulting in a
   603  	// resource-not-found error from the xDS client.
   604  	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID}); err != nil {
   605  		t.Fatal(err)
   606  	}
   607  
   608  	// The channel should receive the existing service config with the original
   609  	// cluster but with an erroring config selector.
   610  	cs = verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
   611  
   612  	// "Make another RPC" by invoking the config selector.
   613  	res, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
   614  	if err == nil || status.Code(err) != codes.Unavailable {
   615  		t.Fatalf("cs.SelectConfig() got %v, %v, expected UNAVAILABLE error", res, err)
   616  	}
   617  
   618  	// In the meantime, an empty ServiceConfig update should have been sent.
   619  	var state resolver.State
   620  	select {
   621  	case <-ctx.Done():
   622  		t.Fatalf("Timeout waiting for an update from the resolver: %v", ctx.Err())
   623  	case state = <-stateCh:
   624  		if err := state.ServiceConfig.Err; err != nil {
   625  			t.Fatalf("Received error in service config: %v", state.ServiceConfig.Err)
   626  		}
   627  		wantSCParsed := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)("{}")
   628  		if !internal.EqualServiceConfigForTesting(state.ServiceConfig.Config, wantSCParsed.Config) {
   629  			t.Fatalf("Got service config:\n%s \nWant service config:\n%s", cmp.Diff(nil, state.ServiceConfig.Config), cmp.Diff(nil, wantSCParsed.Config))
   630  		}
   631  	}
   632  }
   633  
   634  // Tests the case where the resolver receives max stream duration as part of the
   635  // listener and route configuration resources.  The test verifies that the RPC
   636  // timeout returned by the config selector matches expectations. A non-nil max
   637  // stream duration (this includes an explicit zero value) in a matching route
   638  // overrides the value specified in the listener resource.
   639  func (s) TestResolverMaxStreamDuration(t *testing.T) {
   640  	// Spin up an xDS management server for the test.
   641  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   642  	defer cancel()
   643  	nodeID := uuid.New().String()
   644  	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
   645  
   646  	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
   647  
   648  	// Configure the management server with a listener resource that specifies a
   649  	// max stream duration as part of its HTTP connection manager. Also
   650  	// configure a route configuration resource, which has multiple routes with
   651  	// different values of max stream duration.
   652  	hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
   653  		RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{
   654  			ConfigSource: &v3corepb.ConfigSource{
   655  				ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
   656  			},
   657  			RouteConfigName: defaultTestRouteConfigName,
   658  		}},
   659  		HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
   660  		CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{
   661  			MaxStreamDuration: durationpb.New(1 * time.Second),
   662  		},
   663  	})
   664  	listeners := []*v3listenerpb.Listener{{
   665  		Name:        defaultTestServiceName,
   666  		ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
   667  		FilterChains: []*v3listenerpb.FilterChain{{
   668  			Name: "filter-chain-name",
   669  			Filters: []*v3listenerpb.Filter{{
   670  				Name:       wellknown.HTTPConnectionManager,
   671  				ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
   672  			}},
   673  		}},
   674  	}}
   675  	routes := []*v3routepb.RouteConfiguration{{
   676  		Name: defaultTestRouteConfigName,
   677  		VirtualHosts: []*v3routepb.VirtualHost{{
   678  			Domains: []string{defaultTestServiceName},
   679  			Routes: []*v3routepb.Route{
   680  				{
   681  					Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/foo"}},
   682  					Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
   683  						ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
   684  							Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
   685  								{
   686  									Name:   "A",
   687  									Weight: &wrapperspb.UInt32Value{Value: 100},
   688  								},
   689  							}},
   690  						},
   691  						MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{
   692  							MaxStreamDuration: durationpb.New(5 * time.Second),
   693  						},
   694  					}},
   695  				},
   696  				{
   697  					Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/bar"}},
   698  					Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
   699  						ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
   700  							Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
   701  								{
   702  									Name:   "B",
   703  									Weight: &wrapperspb.UInt32Value{Value: 100},
   704  								},
   705  							}},
   706  						},
   707  						MaxStreamDuration: &v3routepb.RouteAction_MaxStreamDuration{
   708  							MaxStreamDuration: durationpb.New(0 * time.Second),
   709  						},
   710  					}},
   711  				},
   712  				{
   713  					Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
   714  					Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
   715  						ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{WeightedClusters: &v3routepb.WeightedCluster{
   716  							Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
   717  								{
   718  									Name:   "C",
   719  									Weight: &wrapperspb.UInt32Value{Value: 100},
   720  								},
   721  							}},
   722  						},
   723  					}},
   724  				},
   725  			},
   726  		}},
   727  	}}
   728  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
   729  
   730  	// Read the update pushed by the resolver to the ClientConn.
   731  	cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
   732  
   733  	testCases := []struct {
   734  		name   string
   735  		method string
   736  		want   *time.Duration
   737  	}{{
   738  		name:   "RDS setting",
   739  		method: "/foo/method",
   740  		want:   newDurationP(5 * time.Second),
   741  	}, {
   742  		name:   "explicit zero in RDS; ignore LDS",
   743  		method: "/bar/method",
   744  		want:   nil,
   745  	}, {
   746  		name:   "no config in RDS; fallback to LDS",
   747  		method: "/baz/method",
   748  		want:   newDurationP(time.Second),
   749  	}}
   750  
   751  	for _, tc := range testCases {
   752  		t.Run(tc.name, func(t *testing.T) {
   753  			req := iresolver.RPCInfo{
   754  				Method:  tc.method,
   755  				Context: ctx,
   756  			}
   757  			res, err := cs.SelectConfig(req)
   758  			if err != nil {
   759  				t.Errorf("cs.SelectConfig(%v): %v", req, err)
   760  				return
   761  			}
   762  			res.OnCommitted()
   763  			got := res.MethodConfig.Timeout
   764  			if !cmp.Equal(got, tc.want) {
   765  				t.Errorf("For method %q: res.MethodConfig.Timeout = %v; want %v", tc.method, got, tc.want)
   766  			}
   767  		})
   768  	}
   769  }
   770  
   771  // Tests that clusters remain in service config if RPCs are in flight.
   772  func (s) TestResolverDelayedOnCommitted(t *testing.T) {
   773  	// Spin up an xDS management server for the test.
   774  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   775  	defer cancel()
   776  	nodeID := uuid.New().String()
   777  	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
   778  
   779  	// Configure resources on the management server.
   780  	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
   781  	routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)}
   782  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
   783  
   784  	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
   785  
   786  	// Read the update pushed by the resolver to the ClientConn.
   787  	cs := verifyUpdateFromResolver(ctx, t, stateCh, wantDefaultServiceConfig)
   788  
   789  	// Make an RPC, but do not commit it yet.
   790  	resOld, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
   791  	if err != nil {
   792  		t.Fatalf("cs.SelectConfig(): %v", err)
   793  	}
   794  	wantClusterName := fmt.Sprintf("cluster:%s", defaultTestClusterName)
   795  	if cluster := clustermanager.GetPickedClusterForTesting(resOld.Context); cluster != wantClusterName {
   796  		t.Fatalf("Picked cluster is %q, want %q", cluster, wantClusterName)
   797  	}
   798  
   799  	// Delay resOld.OnCommitted(). As long as there are pending RPCs to removed
   800  	// clusters, they still appear in the service config.
   801  
   802  	// Update the route configuration resource on the management server to
   803  	// return a new cluster.
   804  	newClusterName := "new-" + defaultTestClusterName
   805  	routes = []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, newClusterName)}
   806  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
   807  
   808  	// Read the update pushed by the resolver to the ClientConn and ensure the
   809  	// old cluster is present in the service config. Also ensure that the newly
   810  	// returned config selector does not hold a reference to the old cluster.
   811  	wantSC := fmt.Sprintf(`
   812  {
   813  	"loadBalancingConfig": [
   814  		{
   815  		  "xds_cluster_manager_experimental": {
   816  			"children": {
   817  			  "cluster:%s": {
   818  				"childPolicy": [
   819  				  {
   820  					"cds_experimental": {
   821  					  "cluster": "%s"
   822  					}
   823  				  }
   824  				]
   825  			  },
   826  			  "cluster:%s": {
   827  				"childPolicy": [
   828  				  {
   829  					"cds_experimental": {
   830  					  "cluster": "%s"
   831  					}
   832  				  }
   833  				]
   834  			  }
   835  			}
   836  		  }
   837  		}
   838  	  ]
   839  }`, defaultTestClusterName, defaultTestClusterName, newClusterName, newClusterName)
   840  	cs = verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
   841  
   842  	resNew, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
   843  	if err != nil {
   844  		t.Fatalf("cs.SelectConfig(): %v", err)
   845  	}
   846  	wantClusterName = fmt.Sprintf("cluster:%s", newClusterName)
   847  	if cluster := clustermanager.GetPickedClusterForTesting(resNew.Context); cluster != wantClusterName {
   848  		t.Fatalf("Picked cluster is %q, want %q", cluster, wantClusterName)
   849  	}
   850  
   851  	// Invoke OnCommitted on the old RPC; should lead to a service config update
   852  	// that deletes the old cluster, as the old cluster no longer has any
   853  	// pending RPCs.
   854  	resOld.OnCommitted()
   855  
   856  	wantSC = fmt.Sprintf(`
   857  {
   858  	"loadBalancingConfig": [
   859  		{
   860  		  "xds_cluster_manager_experimental": {
   861  			"children": {
   862  			  "cluster:%s": {
   863  				"childPolicy": [
   864  				  {
   865  					"cds_experimental": {
   866  					  "cluster": "%s"
   867  					}
   868  				  }
   869  				]
   870  			  }
   871  			}
   872  		  }
   873  		}
   874  	  ]
   875  }`, newClusterName, newClusterName)
   876  	verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
   877  }
   878  
   879  // Tests the case where two LDS updates with the same RDS name to watch are
   880  // received without an RDS in between. Those LDS updates shouldn't trigger a
   881  // service config update.
   882  func (s) TestResolverMultipleLDSUpdates(t *testing.T) {
   883  	// Spin up an xDS management server for the test.
   884  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   885  	defer cancel()
   886  	nodeID := uuid.New().String()
   887  	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
   888  
   889  	// Configure the management server with a listener resource, but no route
   890  	// configuration resource.
   891  	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
   892  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, nil)
   893  
   894  	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
   895  
   896  	// Ensure there is no update from the resolver.
   897  	verifyNoUpdateFromResolver(ctx, t, stateCh)
   898  
   899  	// Configure the management server with a listener resource that points to
   900  	// the same route configuration resource but has different values for max
   901  	// stream duration field. There is still no route configuration resource on
   902  	// the management server.
   903  	hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
   904  		RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{
   905  			ConfigSource: &v3corepb.ConfigSource{
   906  				ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
   907  			},
   908  			RouteConfigName: defaultTestRouteConfigName,
   909  		}},
   910  		HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
   911  		CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{
   912  			MaxStreamDuration: durationpb.New(1 * time.Second),
   913  		},
   914  	})
   915  	listeners = []*v3listenerpb.Listener{{
   916  		Name:        defaultTestServiceName,
   917  		ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
   918  		FilterChains: []*v3listenerpb.FilterChain{{
   919  			Name: "filter-chain-name",
   920  			Filters: []*v3listenerpb.Filter{{
   921  				Name:       wellknown.HTTPConnectionManager,
   922  				ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
   923  			}},
   924  		}},
   925  	}}
   926  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, nil)
   927  
   928  	// Ensure that there is no update from the resolver.
   929  	verifyNoUpdateFromResolver(ctx, t, stateCh)
   930  }
   931  
   932  // TestResolverWRR tests the case where the route configuration returned by the
   933  // management server contains a set of weighted clusters. The test performs a
   934  // bunch of RPCs using the cluster specifier returned by the resolver, and
   935  // verifies the cluster distribution.
   936  func (s) TestResolverWRR(t *testing.T) {
   937  	origNewWRR := rinternal.NewWRR
   938  	rinternal.NewWRR = testutils.NewTestWRR
   939  	defer func() { rinternal.NewWRR = origNewWRR }()
   940  
   941  	// Spin up an xDS management server for the test.
   942  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   943  	defer cancel()
   944  	nodeID := uuid.New().String()
   945  	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
   946  
   947  	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
   948  
   949  	// Configure resources on the management server.
   950  	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
   951  	routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
   952  		RouteConfigName:      defaultTestRouteConfigName,
   953  		ListenerName:         defaultTestServiceName,
   954  		ClusterSpecifierType: e2e.RouteConfigClusterSpecifierTypeWeightedCluster,
   955  		WeightedClusters:     map[string]int{"A": 75, "B": 25},
   956  	})}
   957  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
   958  
   959  	// Read the update pushed by the resolver to the ClientConn.
   960  	cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
   961  
   962  	// Make RPCs to verify WRR behavior in the cluster specifier.
   963  	picks := map[string]int{}
   964  	for i := 0; i < 100; i++ {
   965  		res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
   966  		if err != nil {
   967  			t.Fatalf("cs.SelectConfig(): %v", err)
   968  		}
   969  		picks[clustermanager.GetPickedClusterForTesting(res.Context)]++
   970  		res.OnCommitted()
   971  	}
   972  	want := map[string]int{"cluster:A": 75, "cluster:B": 25}
   973  	if !cmp.Equal(picks, want) {
   974  		t.Errorf("Picked clusters: %v; want: %v", picks, want)
   975  	}
   976  }
   977  
   978  const filterCfgPathFieldName = "path"
   979  const filterCfgErrorFieldName = "new_stream_error"
   980  
// filterCfg is the parsed configuration of the test HTTP filter. It embeds
// httpfilter.FilterConfig so that it can be returned wherever that interface
// is expected.
type filterCfg struct {
	httpfilter.FilterConfig
	// path is the value of the "path" field of the filter's TypedStruct
	// configuration.
	path string
	// newStreamErr is built from the "new_stream_error" field of the
	// configuration; it is nil when that field is absent or empty.
	newStreamErr error
}
   986  
// filterBuilder is a test HTTP filter builder that records, in order, the
// events observed by itself and by the interceptors it builds.
type filterBuilder struct {
	// paths is the event log. Entries are appended with the prefixes "build:",
	// "override:", "newstream:" and "done:" followed by a config path.
	paths []string
	// typeURL is the single type URL reported by TypeURLs.
	typeURL string
}
   991  
   992  func (fb *filterBuilder) TypeURLs() []string { return []string{fb.typeURL} }
   993  
   994  func filterConfigFromProto(cfg proto.Message) (httpfilter.FilterConfig, error) {
   995  	ts, ok := cfg.(*v3xdsxdstypepb.TypedStruct)
   996  	if !ok {
   997  		return nil, fmt.Errorf("unsupported filter config type: %T, want %T", cfg, &v3xdsxdstypepb.TypedStruct{})
   998  	}
   999  
  1000  	if ts.GetValue() == nil {
  1001  		return filterCfg{}, nil
  1002  	}
  1003  	ret := filterCfg{}
  1004  	if v := ts.GetValue().GetFields()[filterCfgPathFieldName]; v != nil {
  1005  		ret.path = v.GetStringValue()
  1006  	}
  1007  	if v := ts.GetValue().GetFields()[filterCfgErrorFieldName]; v != nil {
  1008  		if v.GetStringValue() == "" {
  1009  			ret.newStreamErr = nil
  1010  		} else {
  1011  			ret.newStreamErr = fmt.Errorf("%s", v.GetStringValue())
  1012  		}
  1013  	}
  1014  	return ret, nil
  1015  }
  1016  
  1017  func (*filterBuilder) ParseFilterConfig(cfg proto.Message) (httpfilter.FilterConfig, error) {
  1018  	return filterConfigFromProto(cfg)
  1019  }
  1020  
  1021  func (*filterBuilder) ParseFilterConfigOverride(override proto.Message) (httpfilter.FilterConfig, error) {
  1022  	return filterConfigFromProto(override)
  1023  }
  1024  
  1025  func (*filterBuilder) IsTerminal() bool { return false }
  1026  
  1027  var _ httpfilter.ClientInterceptorBuilder = &filterBuilder{}
  1028  
  1029  func (fb *filterBuilder) BuildClientInterceptor(config, override httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) {
  1030  	if config == nil {
  1031  		panic("unexpected missing config")
  1032  	}
  1033  
  1034  	fi := &filterInterceptor{
  1035  		parent: fb,
  1036  		pathCh: make(chan string, 10),
  1037  	}
  1038  
  1039  	fb.paths = append(fb.paths, "build:"+config.(filterCfg).path)
  1040  	err := config.(filterCfg).newStreamErr
  1041  	if override != nil {
  1042  		fb.paths = append(fb.paths, "override:"+override.(filterCfg).path)
  1043  		err = override.(filterCfg).newStreamErr
  1044  	}
  1045  
  1046  	fi.cfgPath = config.(filterCfg).path
  1047  	fi.err = err
  1048  	return fi, nil
  1049  }
  1050  
// filterInterceptor is the client interceptor produced by
// filterBuilder.BuildClientInterceptor.
type filterInterceptor struct {
	// parent is the builder that created this interceptor; NewStream appends
	// its events to parent.paths.
	parent *filterBuilder
	// pathCh is created as a buffered channel of capacity 10 by
	// BuildClientInterceptor.
	// NOTE(review): no reads or writes of this channel are visible in this
	// part of the file — confirm whether it is still needed.
	pathCh chan string
	// cfgPath is the path from the filter's base config, used to label
	// recorded events.
	cfgPath string
	// err, when non-nil, is returned from NewStream instead of creating a
	// stream.
	err error
}
  1057  
  1058  func (fi *filterInterceptor) NewStream(ctx context.Context, ri iresolver.RPCInfo, done func(), newStream func(ctx context.Context, done func()) (iresolver.ClientStream, error)) (iresolver.ClientStream, error) {
  1059  	fi.parent.paths = append(fi.parent.paths, "newstream:"+fi.cfgPath)
  1060  	if fi.err != nil {
  1061  		return nil, fi.err
  1062  	}
  1063  	d := func() {
  1064  		fi.parent.paths = append(fi.parent.paths, "done:"+fi.cfgPath)
  1065  		done()
  1066  	}
  1067  	cs, err := newStream(ctx, d)
  1068  	if err != nil {
  1069  		return nil, err
  1070  	}
  1071  	return &clientStream{ClientStream: cs}, nil
  1072  }
  1073  
// clientStream wraps an iresolver.ClientStream by embedding it, adding no
// fields of its own. It is the stream type returned by
// filterInterceptor.NewStream.
type clientStream struct {
	iresolver.ClientStream
}
  1077  
  1078  func (s) TestConfigSelector_FailureCases(t *testing.T) {
  1079  	const methodName = "1"
  1080  
  1081  	tests := []struct {
  1082  		name     string
  1083  		listener *v3listenerpb.Listener
  1084  		wantErr  string
  1085  	}{
  1086  		{
  1087  			name: "route type RouteActionUnsupported invalid for client",
  1088  			listener: &v3listenerpb.Listener{
  1089  				Name: defaultTestServiceName,
  1090  				ApiListener: &v3listenerpb.ApiListener{
  1091  					ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
  1092  						RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
  1093  							RouteConfig: &v3routepb.RouteConfiguration{
  1094  								Name: defaultTestRouteConfigName,
  1095  								VirtualHosts: []*v3routepb.VirtualHost{{
  1096  									Domains: []string{defaultTestServiceName},
  1097  									Routes: []*v3routepb.Route{{
  1098  										Match: &v3routepb.RouteMatch{
  1099  											PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName},
  1100  										},
  1101  										Action: &v3routepb.Route_FilterAction{},
  1102  									}},
  1103  								}},
  1104  							}},
  1105  						HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
  1106  					}),
  1107  				},
  1108  			},
  1109  			wantErr: "matched route does not have a supported route action type",
  1110  		},
  1111  		{
  1112  			name: "route type RouteActionNonForwardingAction invalid for client",
  1113  			listener: &v3listenerpb.Listener{
  1114  				Name: defaultTestServiceName,
  1115  				ApiListener: &v3listenerpb.ApiListener{
  1116  					ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
  1117  						RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
  1118  							RouteConfig: &v3routepb.RouteConfiguration{
  1119  								Name: defaultTestRouteConfigName,
  1120  								VirtualHosts: []*v3routepb.VirtualHost{{
  1121  									Domains: []string{defaultTestServiceName},
  1122  									Routes: []*v3routepb.Route{{
  1123  										Match: &v3routepb.RouteMatch{
  1124  											PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName},
  1125  										},
  1126  										Action: &v3routepb.Route_NonForwardingAction{},
  1127  									}},
  1128  								}},
  1129  							}},
  1130  						HttpFilters: []*v3httppb.HttpFilter{e2e.RouterHTTPFilter},
  1131  					}),
  1132  				},
  1133  			},
  1134  			wantErr: "matched route does not have a supported route action type",
  1135  		},
  1136  	}
  1137  
  1138  	for _, test := range tests {
  1139  		t.Run(test.name, func(t *testing.T) {
  1140  			// Spin up an xDS management server.
  1141  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1142  			defer cancel()
  1143  			nodeID := uuid.New().String()
  1144  			mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
  1145  
  1146  			// Build an xDS resolver.
  1147  			stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
  1148  
  1149  			// Update the management server with a listener resource that
  1150  			// contains inline route configuration.
  1151  			configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{test.listener}, nil)
  1152  
  1153  			// Ensure that the resolver pushes a state update to the channel.
  1154  			cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
  1155  
  1156  			// Ensure that it returns the expected error.
  1157  			_, err := cs.SelectConfig(iresolver.RPCInfo{Method: methodName, Context: ctx})
  1158  			if err == nil || !strings.Contains(err.Error(), test.wantErr) {
  1159  				t.Errorf("SelectConfig(_) = _, %v; want _, Contains(%v)", err, test.wantErr)
  1160  			}
  1161  		})
  1162  	}
  1163  }
  1164  
  1165  func newHTTPFilter(t *testing.T, name, typeURL, path, err string) *v3httppb.HttpFilter {
  1166  	return &v3httppb.HttpFilter{
  1167  		Name: name,
  1168  		ConfigType: &v3httppb.HttpFilter_TypedConfig{
  1169  			TypedConfig: testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
  1170  				TypeUrl: typeURL,
  1171  				Value: &structpb.Struct{
  1172  					Fields: map[string]*structpb.Value{
  1173  						filterCfgPathFieldName:  {Kind: &structpb.Value_StringValue{StringValue: path}},
  1174  						filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: err}},
  1175  					},
  1176  				},
  1177  			}),
  1178  		},
  1179  	}
  1180  }
  1181  
  1182  func (s) TestXDSResolverHTTPFilters(t *testing.T) {
  1183  	const methodName1 = "1"
  1184  	const methodName2 = "2"
  1185  	testFilterName := t.Name()
  1186  
  1187  	testCases := []struct {
  1188  		name          string
  1189  		listener      *v3listenerpb.Listener
  1190  		rpcRes        map[string][][]string
  1191  		wantStreamErr string
  1192  	}{
  1193  		{
  1194  			name: "NewStream error - ensure earlier interceptor Done is still called",
  1195  			listener: &v3listenerpb.Listener{
  1196  				Name: defaultTestServiceName,
  1197  				ApiListener: &v3listenerpb.ApiListener{
  1198  					ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
  1199  						RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
  1200  							RouteConfig: &v3routepb.RouteConfiguration{
  1201  								Name: defaultTestRouteConfigName,
  1202  								VirtualHosts: []*v3routepb.VirtualHost{{
  1203  									Domains: []string{defaultTestServiceName},
  1204  									Routes: []*v3routepb.Route{{
  1205  										Match: &v3routepb.RouteMatch{
  1206  											PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1},
  1207  										},
  1208  										Action: &v3routepb.Route_Route{
  1209  											Route: &v3routepb.RouteAction{
  1210  												ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
  1211  													WeightedClusters: &v3routepb.WeightedCluster{
  1212  														Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
  1213  															{Name: "A", Weight: wrapperspb.UInt32(1)},
  1214  															{Name: "B", Weight: wrapperspb.UInt32(1)},
  1215  														},
  1216  													},
  1217  												},
  1218  											},
  1219  										},
  1220  									}},
  1221  								}},
  1222  							}},
  1223  						HttpFilters: []*v3httppb.HttpFilter{
  1224  							newHTTPFilter(t, "foo", testFilterName, "foo1", ""),
  1225  							newHTTPFilter(t, "bar", testFilterName, "bar1", "bar newstream err"),
  1226  							e2e.RouterHTTPFilter,
  1227  						},
  1228  					}),
  1229  				},
  1230  			},
  1231  			rpcRes: map[string][][]string{
  1232  				methodName1: {
  1233  					{"build:foo1", "build:bar1", "newstream:foo1", "newstream:bar1", "done:foo1"}, // err in bar1 NewStream()
  1234  				},
  1235  			},
  1236  			wantStreamErr: "bar newstream err",
  1237  		},
  1238  		{
  1239  			name: "all overrides",
  1240  			listener: &v3listenerpb.Listener{
  1241  				Name: defaultTestServiceName,
  1242  				ApiListener: &v3listenerpb.ApiListener{
  1243  					ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
  1244  						RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
  1245  							RouteConfig: &v3routepb.RouteConfiguration{
  1246  								Name: defaultTestRouteConfigName,
  1247  								VirtualHosts: []*v3routepb.VirtualHost{{
  1248  									Domains: []string{defaultTestServiceName},
  1249  									Routes: []*v3routepb.Route{
  1250  										{
  1251  											Match: &v3routepb.RouteMatch{
  1252  												PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName1},
  1253  											},
  1254  											Action: &v3routepb.Route_Route{
  1255  												Route: &v3routepb.RouteAction{
  1256  													ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
  1257  														WeightedClusters: &v3routepb.WeightedCluster{
  1258  															Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
  1259  																{Name: "A", Weight: wrapperspb.UInt32(1)},
  1260  																{Name: "B", Weight: wrapperspb.UInt32(1)},
  1261  															},
  1262  														},
  1263  													},
  1264  												},
  1265  											},
  1266  										},
  1267  										{
  1268  											Match: &v3routepb.RouteMatch{
  1269  												PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: methodName2},
  1270  											},
  1271  											Action: &v3routepb.Route_Route{
  1272  												Route: &v3routepb.RouteAction{
  1273  													ClusterSpecifier: &v3routepb.RouteAction_WeightedClusters{
  1274  														WeightedClusters: &v3routepb.WeightedCluster{
  1275  															Clusters: []*v3routepb.WeightedCluster_ClusterWeight{
  1276  																{Name: "A", Weight: wrapperspb.UInt32(1)},
  1277  																{
  1278  																	Name:   "B",
  1279  																	Weight: wrapperspb.UInt32(1),
  1280  																	TypedPerFilterConfig: map[string]*anypb.Any{
  1281  																		"foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
  1282  																			TypeUrl: testFilterName,
  1283  																			Value: &structpb.Struct{
  1284  																				Fields: map[string]*structpb.Value{
  1285  																					filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "foo4"}},
  1286  																				},
  1287  																			},
  1288  																		}),
  1289  																		"bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
  1290  																			TypeUrl: testFilterName,
  1291  																			Value: &structpb.Struct{
  1292  																				Fields: map[string]*structpb.Value{
  1293  																					filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar4"}},
  1294  																				},
  1295  																			},
  1296  																		}),
  1297  																	},
  1298  																},
  1299  															},
  1300  														},
  1301  													},
  1302  												},
  1303  											},
  1304  											TypedPerFilterConfig: map[string]*anypb.Any{
  1305  												"foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
  1306  													TypeUrl: testFilterName,
  1307  													Value: &structpb.Struct{
  1308  														Fields: map[string]*structpb.Value{
  1309  															filterCfgPathFieldName:  {Kind: &structpb.Value_StringValue{StringValue: "foo3"}},
  1310  															filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}},
  1311  														},
  1312  													},
  1313  												}),
  1314  												"bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
  1315  													TypeUrl: testFilterName,
  1316  													Value: &structpb.Struct{
  1317  														Fields: map[string]*structpb.Value{
  1318  															filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar3"}},
  1319  														},
  1320  													},
  1321  												}),
  1322  											},
  1323  										},
  1324  									},
  1325  									TypedPerFilterConfig: map[string]*anypb.Any{
  1326  										"foo": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
  1327  											TypeUrl: testFilterName,
  1328  											Value: &structpb.Struct{
  1329  												Fields: map[string]*structpb.Value{
  1330  													filterCfgPathFieldName:  {Kind: &structpb.Value_StringValue{StringValue: "foo2"}},
  1331  													filterCfgErrorFieldName: {Kind: &structpb.Value_StringValue{StringValue: ""}},
  1332  												},
  1333  											},
  1334  										}),
  1335  										"bar": testutils.MarshalAny(t, &v3xdsxdstypepb.TypedStruct{
  1336  											TypeUrl: testFilterName,
  1337  											Value: &structpb.Struct{
  1338  												Fields: map[string]*structpb.Value{
  1339  													filterCfgPathFieldName: {Kind: &structpb.Value_StringValue{StringValue: "bar2"}},
  1340  												},
  1341  											},
  1342  										}),
  1343  									},
  1344  								}},
  1345  							}},
  1346  						HttpFilters: []*v3httppb.HttpFilter{
  1347  							newHTTPFilter(t, "foo", testFilterName, "foo1", "this is overridden to nil"),
  1348  							newHTTPFilter(t, "bar", testFilterName, "bar1", ""),
  1349  							e2e.RouterHTTPFilter,
  1350  						},
  1351  					}),
  1352  				},
  1353  			},
  1354  			rpcRes: map[string][][]string{
  1355  				methodName1: {
  1356  					{"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
  1357  					{"build:foo1", "override:foo2", "build:bar1", "override:bar2", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
  1358  				},
  1359  				methodName2: {
  1360  					{"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
  1361  					{"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
  1362  					{"build:foo1", "override:foo3", "build:bar1", "override:bar3", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
  1363  					{"build:foo1", "override:foo4", "build:bar1", "override:bar4", "newstream:foo1", "newstream:bar1", "done:bar1", "done:foo1"},
  1364  				},
  1365  			},
  1366  		},
  1367  	}
  1368  
  1369  	for _, tc := range testCases {
  1370  		t.Run(tc.name, func(t *testing.T) {
  1371  			origNewWRR := rinternal.NewWRR
  1372  			rinternal.NewWRR = testutils.NewTestWRR
  1373  			defer func() { rinternal.NewWRR = origNewWRR }()
  1374  
  1375  			// Register a custom httpFilter builder for the test.
  1376  			fb := &filterBuilder{typeURL: testFilterName}
  1377  			httpfilter.Register(fb)
  1378  
  1379  			// Spin up an xDS management server.
  1380  			ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
  1381  			defer cancel()
  1382  			nodeID := uuid.New().String()
  1383  			mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
  1384  
  1385  			// Build an xDS resolver.
  1386  			stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
  1387  
  1388  			// Update the management server with a listener resource that
  1389  			// contains an inline route configuration.
  1390  			configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{tc.listener}, nil)
  1391  
  1392  			// Ensure that the resolver pushes a state update to the channel.
  1393  			cs := verifyUpdateFromResolver(ctx, t, stateCh, "")
  1394  
  1395  			for method, wants := range tc.rpcRes {
  1396  				// Order of wants is non-deterministic.
  1397  				remainingWant := make([][]string, len(wants))
  1398  				copy(remainingWant, wants)
  1399  				for n := range wants {
  1400  					res, err := cs.SelectConfig(iresolver.RPCInfo{Method: method, Context: ctx})
  1401  					if err != nil {
  1402  						t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
  1403  					}
  1404  
  1405  					var doneFunc func()
  1406  					_, err = res.Interceptor.NewStream(context.Background(), iresolver.RPCInfo{}, func() {}, func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
  1407  						doneFunc = done
  1408  						return nil, nil
  1409  					})
  1410  					if tc.wantStreamErr != "" {
  1411  						if err == nil || !strings.Contains(err.Error(), tc.wantStreamErr) {
  1412  							t.Errorf("NewStream(...) = _, %v; want _, Contains(%v)", err, tc.wantStreamErr)
  1413  						}
  1414  						if err == nil {
  1415  							res.OnCommitted()
  1416  							doneFunc()
  1417  						}
  1418  						continue
  1419  					}
  1420  					if err != nil {
  1421  						t.Fatalf("unexpected error from Interceptor.NewStream: %v", err)
  1422  
  1423  					}
  1424  					res.OnCommitted()
  1425  					doneFunc()
  1426  
  1427  					gotPaths := fb.paths
  1428  					fb.paths = []string{}
  1429  
  1430  					// Confirm the desired path is found in remainingWant, and remove it.
  1431  					pass := false
  1432  					for i := range remainingWant {
  1433  						if cmp.Equal(gotPaths, remainingWant[i]) {
  1434  							remainingWant[i] = remainingWant[len(remainingWant)-1]
  1435  							remainingWant = remainingWant[:len(remainingWant)-1]
  1436  							pass = true
  1437  							break
  1438  						}
  1439  					}
  1440  					if !pass {
  1441  						t.Errorf("%q:%v - path:\n%v\nwant one of:\n%v", method, n, gotPaths, remainingWant)
  1442  					}
  1443  				}
  1444  			}
  1445  		})
  1446  	}
  1447  }
  1448  
  1449  func newDurationP(d time.Duration) *time.Duration {
  1450  	return &d
  1451  }
  1452  

View as plain text