...

Source file src/google.golang.org/grpc/test/xds/xds_client_ignore_resource_deletion_test.go

Documentation: google.golang.org/grpc/test/xds

     1  /*
     2   *
     3   * Copyright 2023 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package xds_test
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"net"
    25  	"sync"
    26  	"testing"
    27  	"time"
    28  
    29  	"github.com/google/uuid"
    30  	"google.golang.org/grpc"
    31  	"google.golang.org/grpc/connectivity"
    32  	"google.golang.org/grpc/credentials/insecure"
    33  	"google.golang.org/grpc/internal"
    34  	"google.golang.org/grpc/internal/stubserver"
    35  	"google.golang.org/grpc/internal/testutils"
    36  	"google.golang.org/grpc/internal/testutils/xds/bootstrap"
    37  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    38  	"google.golang.org/grpc/resolver"
    39  	"google.golang.org/grpc/xds"
    40  
    41  	clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
    42  	endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
    43  	listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
    44  	routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
    45  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    46  	testpb "google.golang.org/grpc/interop/grpc_testing"
    47  )
    48  
    49  const (
    50  	serviceName = "my-service-xds"
    51  	rdsName     = "route-" + serviceName
    52  	cdsName1    = "cluster1-" + serviceName
    53  	cdsName2    = "cluster2-" + serviceName
    54  	edsName1    = "eds1-" + serviceName
    55  	edsName2    = "eds2-" + serviceName
    56  )
    57  
    58  var (
    59  	// This route configuration resource contains two routes:
    60  	// - a route for the EmptyCall rpc, to be sent to cluster1
    61  	// - a route for the UnaryCall rpc, to be sent to cluster2
    62  	defaultRouteConfigWithTwoRoutes = &routepb.RouteConfiguration{
    63  		Name: rdsName,
    64  		VirtualHosts: []*routepb.VirtualHost{{
    65  			Domains: []string{serviceName},
    66  			Routes: []*routepb.Route{
    67  				{
    68  					Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/EmptyCall"}},
    69  					Action: &routepb.Route_Route{Route: &routepb.RouteAction{
    70  						ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: cdsName1},
    71  					}},
    72  				},
    73  				{
    74  					Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: "/grpc.testing.TestService/UnaryCall"}},
    75  					Action: &routepb.Route_Route{Route: &routepb.RouteAction{
    76  						ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: cdsName2},
    77  					}},
    78  				},
    79  			},
    80  		}},
    81  	}
    82  )
    83  
    84  // This test runs subtest each for a Listener resource and a Cluster resource deletion
    85  // in the response from the server for the following cases:
    86  //   - testResourceDeletionIgnored: When ignore_resource_deletion is set, the
    87  //     xDSClient should not delete the resource.
    88  //   - testResourceDeletionNotIgnored: When ignore_resource_deletion is unset,
    89  //     the xDSClient should delete the resource.
    90  //
    91  // Resource deletion is only applicable to Listener and Cluster resources.
    92  func (s) TestIgnoreResourceDeletionOnClient(t *testing.T) {
    93  	server1 := stubserver.StartTestService(t, nil)
    94  	t.Cleanup(server1.Stop)
    95  
    96  	server2 := stubserver.StartTestService(t, nil)
    97  	t.Cleanup(server2.Stop)
    98  
    99  	initialResourceOnServer := func(nodeID string) e2e.UpdateOptions {
   100  		return e2e.UpdateOptions{
   101  			NodeID:    nodeID,
   102  			Listeners: []*listenerpb.Listener{e2e.DefaultClientListener(serviceName, rdsName)},
   103  			Routes:    []*routepb.RouteConfiguration{defaultRouteConfigWithTwoRoutes},
   104  			Clusters: []*clusterpb.Cluster{
   105  				e2e.DefaultCluster(cdsName1, edsName1, e2e.SecurityLevelNone),
   106  				e2e.DefaultCluster(cdsName2, edsName2, e2e.SecurityLevelNone),
   107  			},
   108  			Endpoints: []*endpointpb.ClusterLoadAssignment{
   109  				e2e.DefaultEndpoint(edsName1, "localhost", []uint32{testutils.ParsePort(t, server1.Address)}),
   110  				e2e.DefaultEndpoint(edsName2, "localhost", []uint32{testutils.ParsePort(t, server2.Address)}),
   111  			},
   112  			SkipValidation: true,
   113  		}
   114  	}
   115  
   116  	tests := []struct {
   117  		name           string
   118  		updateResource func(r *e2e.UpdateOptions)
   119  	}{
   120  		{
   121  			name: "listener",
   122  			updateResource: func(r *e2e.UpdateOptions) {
   123  				r.Listeners = nil
   124  			},
   125  		},
   126  		{
   127  			name: "cluster",
   128  			updateResource: func(r *e2e.UpdateOptions) {
   129  				r.Clusters = nil
   130  			},
   131  		},
   132  	}
   133  	for _, test := range tests {
   134  		t.Run(fmt.Sprintf("%s resource deletion ignored", test.name), func(t *testing.T) {
   135  			testResourceDeletionIgnored(t, initialResourceOnServer, test.updateResource)
   136  		})
   137  		t.Run(fmt.Sprintf("%s resource deletion not ignored", test.name), func(t *testing.T) {
   138  			testResourceDeletionNotIgnored(t, initialResourceOnServer, test.updateResource)
   139  		})
   140  	}
   141  }
   142  
   143  // This subtest tests the scenario where the bootstrap config has "ignore_resource_deletion"
   144  // set in "server_features" field. This subtest verifies that the resource was
   145  // not deleted by the xDSClient when a resource is missing the xDS response and
   146  // RPCs continue to succeed.
   147  func testResourceDeletionIgnored(t *testing.T, initialResource func(string) e2e.UpdateOptions, updateResource func(r *e2e.UpdateOptions)) {
   148  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   149  	t.Cleanup(cancel)
   150  	mgmtServer := startManagementServer(t)
   151  	nodeID := uuid.New().String()
   152  	bs := generateBootstrapContents(t, mgmtServer.Address, true, nodeID)
   153  	xdsR := xdsResolverBuilder(t, bs)
   154  	resources := initialResource(nodeID)
   155  
   156  	// Update the management server with initial resources setup.
   157  	if err := mgmtServer.Update(ctx, resources); err != nil {
   158  		t.Fatal(err)
   159  	}
   160  
   161  	cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsR))
   162  	if err != nil {
   163  		t.Fatalf("Failed to dial local test server: %v.", err)
   164  	}
   165  	t.Cleanup(func() { cc.Close() })
   166  
   167  	if err := verifyRPCtoAllEndpoints(cc); err != nil {
   168  		t.Fatal(err)
   169  	}
   170  
   171  	// Mutate resource and update on the server.
   172  	updateResource(&resources)
   173  	if err := mgmtServer.Update(ctx, resources); err != nil {
   174  		t.Fatal(err)
   175  	}
   176  
   177  	// Make an RPC every 50ms for the next 500ms. This is to ensure that the
   178  	// updated resource is received from the management server and is processed by
   179  	// gRPC. Since resource deletions are ignored by the xDS client, we expect RPCs
   180  	// to all endpoints to keep succeeding.
   181  	timer := time.NewTimer(500 * time.Millisecond)
   182  	ticker := time.NewTicker(50 * time.Millisecond)
   183  	t.Cleanup(ticker.Stop)
   184  	for {
   185  		if err := verifyRPCtoAllEndpoints(cc); err != nil {
   186  			t.Fatal(err)
   187  		}
   188  		select {
   189  		case <-ctx.Done():
   190  			return
   191  		case <-timer.C:
   192  			return
   193  		case <-ticker.C:
   194  		}
   195  	}
   196  }
   197  
   198  // This subtest tests the scenario where the bootstrap config has "ignore_resource_deletion"
   199  // not set in "server_features" field. This subtest verifies that the resource was
   200  // deleted by the xDSClient when a resource is missing the xDS response and subsequent
   201  // RPCs fail.
   202  func testResourceDeletionNotIgnored(t *testing.T, initialResource func(string) e2e.UpdateOptions, updateResource func(r *e2e.UpdateOptions)) {
   203  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout*1000)
   204  	t.Cleanup(cancel)
   205  	mgmtServer := startManagementServer(t)
   206  	nodeID := uuid.New().String()
   207  	bs := generateBootstrapContents(t, mgmtServer.Address, false, nodeID)
   208  	xdsR := xdsResolverBuilder(t, bs)
   209  	resources := initialResource(nodeID)
   210  
   211  	// Update the management server with initial resources setup.
   212  	if err := mgmtServer.Update(ctx, resources); err != nil {
   213  		t.Fatal(err)
   214  	}
   215  
   216  	cc, err := grpc.NewClient(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsR))
   217  	if err != nil {
   218  		t.Fatalf("failed to dial local test server: %v", err)
   219  	}
   220  	t.Cleanup(func() { cc.Close() })
   221  
   222  	if err := verifyRPCtoAllEndpoints(cc); err != nil {
   223  		t.Fatal(err)
   224  	}
   225  
   226  	// Mutate resource and update on the server.
   227  	updateResource(&resources)
   228  	if err := mgmtServer.Update(ctx, resources); err != nil {
   229  		t.Fatal(err)
   230  	}
   231  
   232  	// Spin up go routines to verify RPCs fail after the update.
   233  	client := testgrpc.NewTestServiceClient(cc)
   234  	wg := sync.WaitGroup{}
   235  	wg.Add(2)
   236  	go func() {
   237  		defer wg.Done()
   238  		for ; ctx.Err() == nil; <-time.After(10 * time.Millisecond) {
   239  			if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   240  				return
   241  			}
   242  		}
   243  	}()
   244  	go func() {
   245  		defer wg.Done()
   246  		for ; ctx.Err() == nil; <-time.After(10 * time.Millisecond) {
   247  			if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
   248  				return
   249  			}
   250  		}
   251  	}()
   252  
   253  	wg.Wait()
   254  	if ctx.Err() != nil {
   255  		t.Fatal("Context expired before RPCs failed.")
   256  	}
   257  }
   258  
   259  // This helper creates a management server for the test.
   260  func startManagementServer(t *testing.T) *e2e.ManagementServer {
   261  	t.Helper()
   262  	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
   263  	if err != nil {
   264  		t.Fatalf("Failed to start management server: %v", err)
   265  	}
   266  	t.Cleanup(mgmtServer.Stop)
   267  	return mgmtServer
   268  }
   269  
   270  // This helper generates a custom bootstrap config for the test.
   271  func generateBootstrapContents(t *testing.T, serverURI string, ignoreResourceDeletion bool, nodeID string) []byte {
   272  	t.Helper()
   273  	bootstrapContents, err := bootstrap.Contents(bootstrap.Options{
   274  		NodeID:                             nodeID,
   275  		ServerURI:                          serverURI,
   276  		ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate,
   277  		IgnoreResourceDeletion:             ignoreResourceDeletion,
   278  	})
   279  	if err != nil {
   280  		t.Fatal(err)
   281  	}
   282  	return bootstrapContents
   283  }
   284  
   285  // This helper creates an XDS resolver Builder from the bootstrap config passed
   286  // as parameter.
   287  func xdsResolverBuilder(t *testing.T, bs []byte) resolver.Builder {
   288  	t.Helper()
   289  	resolverBuilder := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))
   290  	xdsR, err := resolverBuilder(bs)
   291  	if err != nil {
   292  		t.Fatalf("Creating xDS resolver for testing failed for config %q: %v", string(bs), err)
   293  	}
   294  	return xdsR
   295  }
   296  
   297  // This helper creates an xDS-enabled gRPC server using the listener and the
   298  // bootstrap config passed. It then registers the test service on the newly
   299  // created gRPC server and starts serving.
   300  func setupGRPCServerWithModeChangeChannelAndServe(t *testing.T, bootstrapContents []byte, lis net.Listener) chan connectivity.ServingMode {
   301  	t.Helper()
   302  	updateCh := make(chan connectivity.ServingMode, 1)
   303  
   304  	// Create a server option to get notified about serving mode changes.
   305  	modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
   306  		t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
   307  		updateCh <- args.Mode
   308  	})
   309  	server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
   310  	if err != nil {
   311  		t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
   312  	}
   313  	t.Cleanup(server.Stop)
   314  	testgrpc.RegisterTestServiceServer(server, &testService{})
   315  
   316  	// Serve.
   317  	go func() {
   318  		if err := server.Serve(lis); err != nil {
   319  			t.Errorf("Serve() failed: %v", err)
   320  		}
   321  	}()
   322  
   323  	return updateCh
   324  }
   325  
   326  // This helper creates a new TCP listener. This helper also uses this listener to
   327  // create a resource update with a listener resource. This helper returns the
   328  // resource update and the TCP listener.
   329  func resourceWithListenerForGRPCServer(t *testing.T, nodeID string) (e2e.UpdateOptions, net.Listener) {
   330  	t.Helper()
   331  	lis, err := testutils.LocalTCPListener()
   332  	if err != nil {
   333  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
   334  	}
   335  	t.Cleanup(func() { lis.Close() })
   336  	host, port, err := hostPortFromListener(lis)
   337  	if err != nil {
   338  		t.Fatalf("Failed to retrieve host and port of listener at %q: %v", lis.Addr(), err)
   339  	}
   340  	listener := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")
   341  	resources := e2e.UpdateOptions{
   342  		NodeID:    nodeID,
   343  		Listeners: []*listenerpb.Listener{listener},
   344  	}
   345  	return resources, lis
   346  }
   347  
   348  // This test creates a gRPC server which provides server-side xDS functionality
   349  // by talking to a custom management server. This tests the scenario where bootstrap
   350  // config with "server_features" includes "ignore_resource_deletion". In which
   351  // case, when the listener resource is deleted on the management server, the gRPC
   352  // server should continue to serve RPCs.
   353  func (s) TestListenerResourceDeletionOnServerIgnored(t *testing.T) {
   354  	mgmtServer := startManagementServer(t)
   355  	nodeID := uuid.New().String()
   356  	bs := generateBootstrapContents(t, mgmtServer.Address, true, nodeID)
   357  	xdsR := xdsResolverBuilder(t, bs)
   358  	resources, lis := resourceWithListenerForGRPCServer(t, nodeID)
   359  	modeChangeCh := setupGRPCServerWithModeChangeChannelAndServe(t, bs, lis)
   360  
   361  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   362  	defer cancel()
   363  	if err := mgmtServer.Update(ctx, resources); err != nil {
   364  		t.Fatal(err)
   365  	}
   366  
   367  	// Wait for the server to update to ServingModeServing mode.
   368  	select {
   369  	case <-ctx.Done():
   370  		t.Fatal("Test timed out waiting for a server to change to ServingModeServing.")
   371  	case mode := <-modeChangeCh:
   372  		if mode != connectivity.ServingModeServing {
   373  			t.Fatalf("Server switched to mode %v, want %v", mode, connectivity.ServingModeServing)
   374  		}
   375  	}
   376  
   377  	// Create a ClientConn and make a successful RPCs.
   378  	cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsR))
   379  	if err != nil {
   380  		t.Fatalf("failed to dial local test server: %v", err)
   381  	}
   382  	defer cc.Close()
   383  
   384  	if err := verifyRPCtoAllEndpoints(cc); err != nil {
   385  		t.Fatal(err)
   386  	}
   387  
   388  	// Update without a listener resource.
   389  	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
   390  		NodeID:    nodeID,
   391  		Listeners: []*listenerpb.Listener{},
   392  	}); err != nil {
   393  		t.Fatal(err)
   394  	}
   395  
   396  	// Perform RPCs every 100 ms for 1s and verify that the serving mode does not
   397  	// change on gRPC server.
   398  	timer := time.NewTimer(500 * time.Millisecond)
   399  	ticker := time.NewTicker(50 * time.Millisecond)
   400  	t.Cleanup(ticker.Stop)
   401  	for {
   402  		if err := verifyRPCtoAllEndpoints(cc); err != nil {
   403  			t.Fatal(err)
   404  		}
   405  		select {
   406  		case <-timer.C:
   407  			return
   408  		case mode := <-modeChangeCh:
   409  			t.Fatalf("Server switched to mode: %v when no switch was expected", mode)
   410  		case <-ticker.C:
   411  		}
   412  	}
   413  }
   414  
   415  // This test creates a gRPC server which provides server-side xDS functionality
   416  // by talking to a custom management server. This tests the scenario where bootstrap
   417  // config with "server_features" does not include "ignore_resource_deletion". In
   418  // which case, when the listener resource is deleted on the management server, the
   419  // gRPC server should stop serving RPCs and switch mode to ServingModeNotServing.
   420  func (s) TestListenerResourceDeletionOnServerNotIgnored(t *testing.T) {
   421  	mgmtServer := startManagementServer(t)
   422  	nodeID := uuid.New().String()
   423  	bs := generateBootstrapContents(t, mgmtServer.Address, false, nodeID)
   424  	xdsR := xdsResolverBuilder(t, bs)
   425  	resources, lis := resourceWithListenerForGRPCServer(t, nodeID)
   426  	updateCh := setupGRPCServerWithModeChangeChannelAndServe(t, bs, lis)
   427  
   428  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   429  	defer cancel()
   430  	if err := mgmtServer.Update(ctx, resources); err != nil {
   431  		t.Fatal(err)
   432  	}
   433  
   434  	// Wait for the listener to move to "serving" mode.
   435  	select {
   436  	case <-ctx.Done():
   437  		t.Fatal("Test timed out waiting for a mode change update.")
   438  	case mode := <-updateCh:
   439  		if mode != connectivity.ServingModeServing {
   440  			t.Fatalf("Listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
   441  		}
   442  	}
   443  
   444  	// Create a ClientConn and make a successful RPCs.
   445  	cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsR))
   446  	if err != nil {
   447  		t.Fatalf("failed to dial local test server: %v", err)
   448  	}
   449  	defer cc.Close()
   450  	if err := verifyRPCtoAllEndpoints(cc); err != nil {
   451  		t.Fatal(err)
   452  	}
   453  
   454  	if err := mgmtServer.Update(ctx, e2e.UpdateOptions{
   455  		NodeID:    nodeID,
   456  		Listeners: []*listenerpb.Listener{}, // empty listener resource
   457  	}); err != nil {
   458  		t.Fatal(err)
   459  	}
   460  
   461  	select {
   462  	case <-ctx.Done():
   463  		t.Fatalf("timed out waiting for a mode change update: %v", err)
   464  	case mode := <-updateCh:
   465  		if mode != connectivity.ServingModeNotServing {
   466  			t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing)
   467  		}
   468  	}
   469  }
   470  
   471  // This helper makes both UnaryCall and EmptyCall RPCs using the ClientConn that
   472  // is passed to this function. This helper panics for any failed RPCs.
   473  func verifyRPCtoAllEndpoints(cc grpc.ClientConnInterface) error {
   474  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   475  	defer cancel()
   476  	client := testgrpc.NewTestServiceClient(cc)
   477  	if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   478  		return fmt.Errorf("rpc EmptyCall() failed: %v", err)
   479  	}
   480  	if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
   481  		return fmt.Errorf("rpc UnaryCall() failed: %v", err)
   482  	}
   483  	return nil
   484  }
   485  

View as plain text