...

Source file src/google.golang.org/grpc/xds/internal/resolver/cluster_specifier_plugin_test.go

Documentation: google.golang.org/grpc/xds/internal/resolver

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package resolver_test
    20  
    21  import (
    22  	"context"
    23  	"encoding/json"
    24  	"fmt"
    25  	"testing"
    26  
    27  	"github.com/google/uuid"
    28  	"google.golang.org/grpc/balancer"
    29  	iresolver "google.golang.org/grpc/internal/resolver"
    30  	"google.golang.org/grpc/internal/testutils"
    31  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    32  	"google.golang.org/grpc/resolver"
    33  	"google.golang.org/grpc/serviceconfig"
    34  	"google.golang.org/grpc/xds/internal/balancer/clustermanager"
    35  	"google.golang.org/grpc/xds/internal/clusterspecifier"
    36  	"google.golang.org/protobuf/proto"
    37  	"google.golang.org/protobuf/types/known/anypb"
    38  	"google.golang.org/protobuf/types/known/wrapperspb"
    39  
    40  	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
    41  	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
    42  )
    43  
    44  func init() {
    45  	balancer.Register(cspBalancerBuilder{})
    46  	clusterspecifier.Register(testClusterSpecifierPlugin{})
    47  }
    48  
    49  // cspBalancerBuilder is a no-op LB policy which is referenced by the
    50  // testClusterSpecifierPlugin.
    51  type cspBalancerBuilder struct{}
    52  
    53  func (cspBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
    54  	return nil
    55  }
    56  
    57  func (cspBalancerBuilder) Name() string {
    58  	return "csp_experimental"
    59  }
    60  
    61  type cspBalancerConfig struct {
    62  	serviceconfig.LoadBalancingConfig
    63  	ArbitraryField string `json:"arbitrary_field"`
    64  }
    65  
    66  func (cspBalancerBuilder) ParseConfig(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
    67  	cfg := &cspBalancerConfig{}
    68  	if err := json.Unmarshal(lbCfg, cfg); err != nil {
    69  		return nil, err
    70  	}
    71  	return cfg, nil
    72  
    73  }
    74  
    75  // testClusterSpecifierPlugin is a test cluster specifier plugin which returns
    76  // an LB policy configuration specifying the cspBalancer.
    77  type testClusterSpecifierPlugin struct {
    78  }
    79  
    80  func (testClusterSpecifierPlugin) TypeURLs() []string {
    81  	// The config for this plugin contains a wrapperspb.StringValue, and since
    82  	// we marshal that proto as an Any proto, the type URL on the latter gets
    83  	// set to "type.googleapis.com/google.protobuf.StringValue". If we wanted a
    84  	// more descriptive type URL for this test plugin, we would have to define a
    85  	// proto package with a message for the configuration. That would be
    86  	// overkill for a test. Therefore, this seems to be an acceptable tradeoff.
    87  	return []string{"type.googleapis.com/google.protobuf.StringValue"}
    88  }
    89  
    90  func (testClusterSpecifierPlugin) ParseClusterSpecifierConfig(cfg proto.Message) (clusterspecifier.BalancerConfig, error) {
    91  	if cfg == nil {
    92  		return nil, fmt.Errorf("testClusterSpecifierPlugin: nil configuration message provided")
    93  	}
    94  	anyp, ok := cfg.(*anypb.Any)
    95  	if !ok {
    96  		return nil, fmt.Errorf("testClusterSpecifierPlugin: error parsing config %v: got type %T, want *anypb.Any", cfg, cfg)
    97  	}
    98  	lbCfg := new(wrapperspb.StringValue)
    99  	if err := anypb.UnmarshalTo(anyp, lbCfg, proto.UnmarshalOptions{}); err != nil {
   100  		return nil, fmt.Errorf("testClusterSpecifierPlugin: error parsing config %v: %v", cfg, err)
   101  	}
   102  	return []map[string]any{{"csp_experimental": cspBalancerConfig{ArbitraryField: lbCfg.GetValue()}}}, nil
   103  }
   104  
   105  // TestResolverClusterSpecifierPlugin tests the case where a route configuration
   106  // containing cluster specifier plugins is sent by the management server. The
   107  // test verifies that the service config output by the resolver contains the LB
   108  // policy specified by the cluster specifier plugin, and the config selector
   109  // returns the cluster associated with the cluster specifier plugin.
   110  //
   111  // The test also verifies that a change in the cluster specifier plugin config
   112  // result in appropriate change in the service config pushed by the resolver.
   113  func (s) TestResolverClusterSpecifierPlugin(t *testing.T) {
   114  	// Spin up an xDS management server for the test.
   115  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   116  	defer cancel()
   117  	nodeID := uuid.New().String()
   118  	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
   119  
   120  	// Configure resources on the management server.
   121  	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
   122  	routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
   123  		RouteConfigName:              defaultTestRouteConfigName,
   124  		ListenerName:                 defaultTestServiceName,
   125  		ClusterSpecifierType:         e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
   126  		ClusterSpecifierPluginName:   "cspA",
   127  		ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anything"}),
   128  	})}
   129  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
   130  
   131  	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
   132  
   133  	// Wait for an update from the resolver, and verify the service config.
   134  	wantSC := `
   135   {
   136  	 "loadBalancingConfig": [
   137  		 {
   138  		   "xds_cluster_manager_experimental": {
   139  			 "children": {
   140  			   "cluster_specifier_plugin:cspA": {
   141  				 "childPolicy": [
   142  				   {
   143  					 "csp_experimental": {
   144  					   "arbitrary_field": "anything"
   145  					 }
   146  				   }
   147  				 ]
   148  			   }
   149  			 }
   150  		   }
   151  		 }
   152  	   ]
   153   }`
   154  	cs := verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
   155  	res, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
   156  	if err != nil {
   157  		t.Fatalf("cs.SelectConfig(): %v", err)
   158  	}
   159  
   160  	gotCluster := clustermanager.GetPickedClusterForTesting(res.Context)
   161  	wantCluster := "cluster_specifier_plugin:cspA"
   162  	if gotCluster != wantCluster {
   163  		t.Fatalf("config selector returned cluster: %v, want: %v", gotCluster, wantCluster)
   164  	}
   165  
   166  	// Change the cluster specifier plugin configuration.
   167  	routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
   168  		RouteConfigName:              defaultTestRouteConfigName,
   169  		ListenerName:                 defaultTestServiceName,
   170  		ClusterSpecifierType:         e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
   171  		ClusterSpecifierPluginName:   "cspA",
   172  		ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "changed"}),
   173  	})}
   174  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
   175  
   176  	// Wait for an update from the resolver, and verify the service config.
   177  	wantSC = `
   178   {
   179  	 "loadBalancingConfig": [
   180  		 {
   181  		   "xds_cluster_manager_experimental": {
   182  			 "children": {
   183  			   "cluster_specifier_plugin:cspA": {
   184  				 "childPolicy": [
   185  				   {
   186  					 "csp_experimental": {
   187  					   "arbitrary_field": "changed"
   188  					 }
   189  				   }
   190  				 ]
   191  			   }
   192  			 }
   193  		   }
   194  		 }
   195  	   ]
   196   }`
   197  	verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
   198  }
   199  
   200  // TestXDSResolverDelayedOnCommittedCSP tests that cluster specifier plugins and
   201  // their corresponding configurations remain in service config if RPCs are in
   202  // flight.
   203  func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) {
   204  	// Spin up an xDS management server for the test.
   205  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   206  	defer cancel()
   207  	nodeID := uuid.New().String()
   208  	mgmtServer, _, _ := setupManagementServerForTest(ctx, t, nodeID)
   209  
   210  	// Configure resources on the management server.
   211  	listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)}
   212  	routes := []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
   213  		RouteConfigName:              defaultTestRouteConfigName,
   214  		ListenerName:                 defaultTestServiceName,
   215  		ClusterSpecifierType:         e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
   216  		ClusterSpecifierPluginName:   "cspA",
   217  		ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingA"}),
   218  	})}
   219  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
   220  
   221  	stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)})
   222  
   223  	// Wait for an update from the resolver, and verify the service config.
   224  	wantSC := `
   225   {
   226  	 "loadBalancingConfig": [
   227  		 {
   228  		   "xds_cluster_manager_experimental": {
   229  			 "children": {
   230  			   "cluster_specifier_plugin:cspA": {
   231  				 "childPolicy": [
   232  				   {
   233  					 "csp_experimental": {
   234  					   "arbitrary_field": "anythingA"
   235  					 }
   236  				   }
   237  				 ]
   238  			   }
   239  			 }
   240  		   }
   241  		 }
   242  	   ]
   243   }`
   244  	cs := verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
   245  
   246  	resOld, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
   247  	if err != nil {
   248  		t.Fatalf("cs.SelectConfig(): %v", err)
   249  	}
   250  
   251  	gotCluster := clustermanager.GetPickedClusterForTesting(resOld.Context)
   252  	wantCluster := "cluster_specifier_plugin:cspA"
   253  	if gotCluster != wantCluster {
   254  		t.Fatalf("config selector returned cluster: %v, want: %v", gotCluster, wantCluster)
   255  	}
   256  
   257  	// Delay resOld.OnCommitted(). As long as there are pending RPCs to removed
   258  	// clusters, they still appear in the service config.
   259  
   260  	// Change the cluster specifier plugin configuration.
   261  	routes = []*v3routepb.RouteConfiguration{e2e.RouteConfigResourceWithOptions(e2e.RouteConfigOptions{
   262  		RouteConfigName:              defaultTestRouteConfigName,
   263  		ListenerName:                 defaultTestServiceName,
   264  		ClusterSpecifierType:         e2e.RouteConfigClusterSpecifierTypeClusterSpecifierPlugin,
   265  		ClusterSpecifierPluginName:   "cspB",
   266  		ClusterSpecifierPluginConfig: testutils.MarshalAny(t, &wrapperspb.StringValue{Value: "anythingB"}),
   267  	})}
   268  	configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes)
   269  
   270  	// Wait for an update from the resolver, and verify the service config.
   271  	wantSC = `
   272   {
   273  	 "loadBalancingConfig": [
   274  		 {
   275  		   "xds_cluster_manager_experimental": {
   276  			 "children": {
   277  			   "cluster_specifier_plugin:cspA": {
   278  				 "childPolicy": [
   279  				   {
   280  					 "csp_experimental": {
   281  					   "arbitrary_field": "anythingA"
   282  					 }
   283  				   }
   284  				 ]
   285  			   },
   286  			   "cluster_specifier_plugin:cspB": {
   287  				 "childPolicy": [
   288  				   {
   289  					 "csp_experimental": {
   290  					   "arbitrary_field": "anythingB"
   291  					 }
   292  				   }
   293  				 ]
   294  			   }
   295  			 }
   296  		   }
   297  		 }
   298  	   ]
   299   }`
   300  	cs = verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
   301  
   302  	// Perform an RPC and ensure that it is routed to the new cluster.
   303  	resNew, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"})
   304  	if err != nil {
   305  		t.Fatalf("cs.SelectConfig(): %v", err)
   306  	}
   307  
   308  	gotCluster = clustermanager.GetPickedClusterForTesting(resNew.Context)
   309  	wantCluster = "cluster_specifier_plugin:cspB"
   310  	if gotCluster != wantCluster {
   311  		t.Fatalf("config selector returned cluster: %v, want: %v", gotCluster, wantCluster)
   312  	}
   313  
   314  	// Invoke resOld.OnCommitted; should lead to a service config update that deletes
   315  	// cspA.
   316  	resOld.OnCommitted()
   317  
   318  	wantSC = `
   319   {
   320  	 "loadBalancingConfig": [
   321  		 {
   322  		   "xds_cluster_manager_experimental": {
   323  			 "children": {
   324  			   "cluster_specifier_plugin:cspB": {
   325  				 "childPolicy": [
   326  				   {
   327  					 "csp_experimental": {
   328  					   "arbitrary_field": "anythingB"
   329  					 }
   330  				   }
   331  				 ]
   332  			   }
   333  			 }
   334  		   }
   335  		 }
   336  	   ]
   337   }`
   338  	verifyUpdateFromResolver(ctx, t, stateCh, wantSC)
   339  }
   340  

View as plain text