...

Source file src/google.golang.org/grpc/test/xds/xds_server_test.go

Documentation: google.golang.org/grpc/test/xds

     1  /*
     2   *
     3   * Copyright 2023 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package xds_test
    20  
    21  import (
    22  	"context"
    23  	"io"
    24  	"net"
    25  	"strings"
    26  	"testing"
    27  	"time"
    28  
    29  	"google.golang.org/grpc"
    30  	"google.golang.org/grpc/codes"
    31  	"google.golang.org/grpc/connectivity"
    32  	"google.golang.org/grpc/credentials/insecure"
    33  	"google.golang.org/grpc/internal"
    34  	"google.golang.org/grpc/internal/grpcsync"
    35  	"google.golang.org/grpc/internal/testutils"
    36  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    37  	"google.golang.org/grpc/status"
    38  	"google.golang.org/grpc/xds"
    39  
    40  	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
    41  	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
    42  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    43  	testpb "google.golang.org/grpc/interop/grpc_testing"
    44  )
    45  
    46  var (
    47  	errAcceptAndClose = status.New(codes.Unavailable, "")
    48  )
    49  
    50  // TestServeLDSRDS tests the case where a server receives LDS resource which
    51  // specifies RDS. LDS and RDS resources are configured on the management server,
    52  // which the server should pick up. The server should successfully accept
    53  // connections and RPCs should work on these accepted connections. It then
    54  // switches the RDS resource to match incoming RPC's to a route type of type
    55  // that isn't non forwarding action. This should get picked up by the connection
    56  // dynamically, and subsequent RPC's on that connection should start failing
    57  // with status code UNAVAILABLE.
    58  func (s) TestServeLDSRDS(t *testing.T) {
    59  	managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
    60  	defer cleanup()
    61  	lis, err := testutils.LocalTCPListener()
    62  	if err != nil {
    63  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
    64  	}
    65  	// Setup the management server to respond with a listener resource that
    66  	// specifies a route name to watch, and a RDS resource corresponding to this
    67  	// route name.
    68  	host, port, err := hostPortFromListener(lis)
    69  	if err != nil {
    70  		t.Fatalf("failed to retrieve host and port of server: %v", err)
    71  	}
    72  
    73  	listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
    74  	routeConfig := e2e.RouteConfigNonForwardingAction("routeName")
    75  
    76  	resources := e2e.UpdateOptions{
    77  		NodeID:    nodeID,
    78  		Listeners: []*v3listenerpb.Listener{listener},
    79  		Routes:    []*v3routepb.RouteConfiguration{routeConfig},
    80  	}
    81  
    82  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    83  	defer cancel()
    84  	if err := managementServer.Update(ctx, resources); err != nil {
    85  		t.Fatal(err)
    86  	}
    87  
    88  	serving := grpcsync.NewEvent()
    89  	modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
    90  		t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
    91  		if args.Mode == connectivity.ServingModeServing {
    92  			serving.Fire()
    93  		}
    94  	})
    95  
    96  	server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
    97  	if err != nil {
    98  		t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
    99  	}
   100  	defer server.Stop()
   101  	testgrpc.RegisterTestServiceServer(server, &testService{})
   102  	go func() {
   103  		if err := server.Serve(lis); err != nil {
   104  			t.Errorf("Serve() failed: %v", err)
   105  		}
   106  	}()
   107  	select {
   108  	case <-ctx.Done():
   109  		t.Fatal("timeout waiting for the xDS Server to go Serving")
   110  	case <-serving.Done():
   111  	}
   112  
   113  	cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
   114  	if err != nil {
   115  		t.Fatalf("failed to dial local test server: %v", err)
   116  	}
   117  	defer cc.Close()
   118  
   119  	waitForSuccessfulRPC(ctx, t, cc) // Eventually, the LDS and dynamic RDS get processed, work, and RPC's should work as usual.
   120  
   121  	// Set the route config to be of type route action route, which the rpc will
   122  	// match to. This should eventually reflect in the Conn's routing
   123  	// configuration and fail the rpc with a status code UNAVAILABLE.
   124  	routeConfig = e2e.RouteConfigFilterAction("routeName")
   125  	resources = e2e.UpdateOptions{
   126  		NodeID:    nodeID,
   127  		Listeners: []*v3listenerpb.Listener{listener}, // Same lis, so will get eaten by the xDS Client.
   128  		Routes:    []*v3routepb.RouteConfiguration{routeConfig},
   129  	}
   130  	if err := managementServer.Update(ctx, resources); err != nil {
   131  		t.Fatal(err)
   132  	}
   133  
   134  	// "NonForwardingAction is expected for all Routes used on server-side; a
   135  	// route with an inappropriate action causes RPCs matching that route to
   136  	// fail with UNAVAILABLE." - A36
   137  	waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding"))
   138  }
   139  
   140  // waitForFailedRPCWithStatus makes unary RPC's until it receives the expected
   141  // status in a polling manner. Fails if the RPC made does not return the
   142  // expected status before the context expires.
   143  func waitForFailedRPCWithStatus(ctx context.Context, t *testing.T, cc *grpc.ClientConn, st *status.Status) {
   144  	t.Helper()
   145  
   146  	c := testgrpc.NewTestServiceClient(cc)
   147  	ticker := time.NewTicker(10 * time.Millisecond)
   148  	defer ticker.Stop()
   149  	var err error
   150  	for {
   151  		select {
   152  		case <-ctx.Done():
   153  			t.Fatalf("failure when waiting for RPCs to fail with certain status %v: %v. most recent error received from RPC: %v", st, ctx.Err(), err)
   154  		case <-ticker.C:
   155  			_, err = c.EmptyCall(ctx, &testpb.Empty{})
   156  			if status.Code(err) == st.Code() && strings.Contains(err.Error(), st.Message()) {
   157  				t.Logf("most recent error happy case: %v", err.Error())
   158  				return
   159  			}
   160  		}
   161  	}
   162  }
   163  
   164  // TestResourceNack tests the case where an LDS points to an RDS which returns
   165  // an RDS Resource which is NACKed. This should trigger server should move to
   166  // serving, successfully Accept Connections, and fail at the L7 level with a
   167  // certain error message.
   168  func (s) TestRDSNack(t *testing.T) {
   169  	managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
   170  	defer cleanup()
   171  	lis, err := testutils.LocalTCPListener()
   172  	if err != nil {
   173  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
   174  	}
   175  	// Setup the management server to respond with a listener resource that
   176  	// specifies a route name to watch, and no RDS resource corresponding to
   177  	// this route name.
   178  	host, port, err := hostPortFromListener(lis)
   179  	if err != nil {
   180  		t.Fatalf("failed to retrieve host and port of server: %v", err)
   181  	}
   182  
   183  	listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
   184  	routeConfig := e2e.RouteConfigNoRouteMatch("routeName")
   185  	resources := e2e.UpdateOptions{
   186  		NodeID:         nodeID,
   187  		Listeners:      []*v3listenerpb.Listener{listener},
   188  		Routes:         []*v3routepb.RouteConfiguration{routeConfig},
   189  		SkipValidation: true,
   190  	}
   191  
   192  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   193  	defer cancel()
   194  	if err := managementServer.Update(ctx, resources); err != nil {
   195  		t.Fatal(err)
   196  	}
   197  	serving := grpcsync.NewEvent()
   198  	modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
   199  		t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
   200  		if args.Mode == connectivity.ServingModeServing {
   201  			serving.Fire()
   202  		}
   203  	})
   204  
   205  	server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
   206  	if err != nil {
   207  		t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
   208  	}
   209  	defer server.Stop()
   210  	testgrpc.RegisterTestServiceServer(server, &testService{})
   211  	go func() {
   212  		if err := server.Serve(lis); err != nil {
   213  			t.Errorf("Serve() failed: %v", err)
   214  		}
   215  	}()
   216  
   217  	cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
   218  	if err != nil {
   219  		t.Fatalf("failed to dial local test server: %v", err)
   220  	}
   221  	defer cc.Close()
   222  
   223  	<-serving.Done()
   224  	waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
   225  }
   226  
   227  // TestResourceNotFoundRDS tests the case where an LDS points to an RDS which
   228  // returns resource not found. Before getting the resource not found, the xDS
   229  // Server has not received all configuration needed, so it should Accept and
   230  // Close any new connections. After it has received the resource not found
   231  // error, the server should move to serving, successfully Accept Connections,
   232  // and fail at the L7 level with resource not found specified.
   233  func (s) TestResourceNotFoundRDS(t *testing.T) {
   234  	managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
   235  	defer cleanup()
   236  	lis, err := testutils.LocalTCPListener()
   237  	if err != nil {
   238  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
   239  	}
   240  	// Setup the management server to respond with a listener resource that
   241  	// specifies a route name to watch, and no RDS resource corresponding to
   242  	// this route name.
   243  	host, port, err := hostPortFromListener(lis)
   244  	if err != nil {
   245  		t.Fatalf("failed to retrieve host and port of server: %v", err)
   246  	}
   247  
   248  	listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
   249  	resources := e2e.UpdateOptions{
   250  		NodeID:         nodeID,
   251  		Listeners:      []*v3listenerpb.Listener{listener},
   252  		SkipValidation: true,
   253  	}
   254  
   255  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   256  	defer cancel()
   257  	if err := managementServer.Update(ctx, resources); err != nil {
   258  		t.Fatal(err)
   259  	}
   260  	serving := grpcsync.NewEvent()
   261  	modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
   262  		t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
   263  		if args.Mode == connectivity.ServingModeServing {
   264  			serving.Fire()
   265  		}
   266  	})
   267  
   268  	server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
   269  	if err != nil {
   270  		t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
   271  	}
   272  	defer server.Stop()
   273  	testgrpc.RegisterTestServiceServer(server, &testService{})
   274  	go func() {
   275  		if err := server.Serve(lis); err != nil {
   276  			t.Errorf("Serve() failed: %v", err)
   277  		}
   278  	}()
   279  
   280  	cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
   281  	if err != nil {
   282  		t.Fatalf("failed to dial local test server: %v", err)
   283  	}
   284  	defer cc.Close()
   285  
   286  	waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
   287  
   288  	// Invoke resource not found - this should result in L7 RPC error with
   289  	// unavailable receive on serving as a result, should trigger it to go
   290  	// serving. Poll as watch might not be started yet to trigger resource not
   291  	// found.
   292  loop:
   293  	for {
   294  		if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("RouteConfigResource", "routeName"); err != nil {
   295  			t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
   296  		}
   297  		select {
   298  		case <-serving.Done():
   299  			break loop
   300  		case <-ctx.Done():
   301  			t.Fatalf("timed out waiting for serving mode to go serving")
   302  		case <-time.After(time.Millisecond):
   303  		}
   304  	}
   305  	waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "error from xDS configuration for matched route configuration"))
   306  }
   307  
   308  // TestServingModeChanges tests the Server's logic as it transitions from Not
   309  // Ready to Ready, then to Not Ready. Before it goes Ready, connections should
   310  // be accepted and closed. After it goes ready, RPC's should proceed as normal
   311  // according to matched route configuration. After it transitions back into not
   312  // ready (through an explicit LDS Resource Not Found), previously running RPC's
   313  // should be gracefully closed and still work, and new RPC's should fail.
   314  func (s) TestServingModeChanges(t *testing.T) {
   315  	managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
   316  	defer cleanup()
   317  	lis, err := testutils.LocalTCPListener()
   318  	if err != nil {
   319  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
   320  	}
   321  	// Setup the management server to respond with a listener resource that
   322  	// specifies a route name to watch. Due to not having received the full
   323  	// configuration, this should cause the server to be in mode Serving.
   324  	host, port, err := hostPortFromListener(lis)
   325  	if err != nil {
   326  		t.Fatalf("failed to retrieve host and port of server: %v", err)
   327  	}
   328  
   329  	listener := e2e.DefaultServerListenerWithRouteConfigName(host, port, e2e.SecurityLevelNone, "routeName")
   330  	resources := e2e.UpdateOptions{
   331  		NodeID:         nodeID,
   332  		Listeners:      []*v3listenerpb.Listener{listener},
   333  		SkipValidation: true,
   334  	}
   335  
   336  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   337  	defer cancel()
   338  	if err := managementServer.Update(ctx, resources); err != nil {
   339  		t.Fatal(err)
   340  	}
   341  
   342  	serving := grpcsync.NewEvent()
   343  	modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
   344  		t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
   345  		if args.Mode == connectivity.ServingModeServing {
   346  			serving.Fire()
   347  		}
   348  	})
   349  
   350  	server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
   351  	if err != nil {
   352  		t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
   353  	}
   354  	defer server.Stop()
   355  	testgrpc.RegisterTestServiceServer(server, &testService{})
   356  	go func() {
   357  		if err := server.Serve(lis); err != nil {
   358  			t.Errorf("Serve() failed: %v", err)
   359  		}
   360  	}()
   361  	cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
   362  	if err != nil {
   363  		t.Fatalf("failed to dial local test server: %v", err)
   364  	}
   365  	defer cc.Close()
   366  
   367  	waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
   368  	routeConfig := e2e.RouteConfigNonForwardingAction("routeName")
   369  	resources = e2e.UpdateOptions{
   370  		NodeID:    nodeID,
   371  		Listeners: []*v3listenerpb.Listener{listener},
   372  		Routes:    []*v3routepb.RouteConfiguration{routeConfig},
   373  	}
   374  	defer cancel()
   375  	if err := managementServer.Update(ctx, resources); err != nil {
   376  		t.Fatal(err)
   377  	}
   378  
   379  	select {
   380  	case <-ctx.Done():
   381  		t.Fatal("timeout waiting for the xDS Server to go Serving")
   382  	case <-serving.Done():
   383  	}
   384  
   385  	// A unary RPC should work once it transitions into serving. (need this same
   386  	// assertion from LDS resource not found triggering it).
   387  	waitForSuccessfulRPC(ctx, t, cc)
   388  
   389  	// Start a stream before switching the server to not serving. Due to the
   390  	// stream being created before the graceful stop of the underlying
   391  	// connection, it should be able to continue even after the server switches
   392  	// to not serving.
   393  	c := testgrpc.NewTestServiceClient(cc)
   394  	stream, err := c.FullDuplexCall(ctx)
   395  	if err != nil {
   396  		t.Fatalf("cc.FullDuplexCall failed: %f", err)
   397  	}
   398  
   399  	// Invoke the lds resource not found - this should cause the server to
   400  	// switch to not serving. This should gracefully drain connections, and fail
   401  	// RPC's after. (how to assert accepted + closed) does this make it's way to
   402  	// application layer? (should work outside of resource not found...
   403  
   404  	// Invoke LDS Resource not found here (tests graceful close)
   405  	if err := internal.TriggerXDSResourceNameNotFoundClient.(func(string, string) error)("ListenerResource", listener.GetName()); err != nil {
   406  		t.Fatalf("Failed to trigger resource name not found for testing: %v", err)
   407  	}
   408  
   409  	// New RPCs on that connection should eventually start failing. Due to
   410  	// Graceful Stop any started streams continue to work.
   411  	if err = stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
   412  		t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err)
   413  	}
   414  	if err = stream.CloseSend(); err != nil {
   415  		t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err)
   416  	}
   417  	if _, err = stream.Recv(); err != io.EOF {
   418  		t.Fatalf("unexpected error: %v, expected an EOF error", err)
   419  	}
   420  
   421  	// New RPCs on that connection should eventually start failing.
   422  	waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
   423  }
   424  
   425  // TestMultipleUpdatesImmediatelySwitch tests the case where you get an LDS
   426  // specifying RDS A, B, and C (with A being matched to). The Server should be in
   427  // not serving until it receives all 3 RDS Configurations, and then transition
   428  // into serving. RPCs will match to RDS A and work properly. Afterward, it
   429  // receives an LDS specifying RDS A, B. The Filter Chain pointing to RDS A
   430  // doesn't get matched, and the Default Filter Chain pointing to RDS B does get
   431  // matched. RDS B is of the wrong route type for server side, so RPC's are
   432  // expected to eventually fail with that information. However, any RPC's on the
   433  // old configuration should be allowed to complete due to the transition being
   434  // graceful stop.After, it receives an LDS specifying RDS A (which incoming
   435  // RPC's will match to). This configuration should eventually be represented in
   436  // the Server's state, and RPCs should proceed successfully.
   437  func (s) TestMultipleUpdatesImmediatelySwitch(t *testing.T) {
   438  	managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
   439  	defer cleanup()
   440  	lis, err := testutils.LocalTCPListener()
   441  	if err != nil {
   442  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
   443  	}
   444  	host, port, err := hostPortFromListener(lis)
   445  	if err != nil {
   446  		t.Fatalf("failed to retrieve host and port of server: %v", err)
   447  	}
   448  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   449  	defer cancel()
   450  
   451  	// Setup the management server to respond with a listener resource that
   452  	// specifies three route names to watch.
   453  	ldsResource := e2e.ListenerResourceThreeRouteResources(host, port, e2e.SecurityLevelNone, "routeName")
   454  	resources := e2e.UpdateOptions{
   455  		NodeID:         nodeID,
   456  		Listeners:      []*v3listenerpb.Listener{ldsResource},
   457  		SkipValidation: true,
   458  	}
   459  	if err := managementServer.Update(ctx, resources); err != nil {
   460  		t.Fatal(err)
   461  	}
   462  	server, err := xds.NewGRPCServer(grpc.Creds(insecure.NewCredentials()), testModeChangeServerOption(t), xds.BootstrapContentsForTesting(bootstrapContents))
   463  	if err != nil {
   464  		t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
   465  	}
   466  	defer server.Stop()
   467  	testgrpc.RegisterTestServiceServer(server, &testService{})
   468  	go func() {
   469  		if err := server.Serve(lis); err != nil {
   470  			t.Errorf("Serve() failed: %v", err)
   471  		}
   472  	}()
   473  
   474  	cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
   475  	if err != nil {
   476  		t.Fatalf("failed to dial local test server: %v", err)
   477  	}
   478  	defer cc.Close()
   479  
   480  	waitForFailedRPCWithStatus(ctx, t, cc, errAcceptAndClose)
   481  
   482  	routeConfig1 := e2e.RouteConfigNonForwardingAction("routeName")
   483  	routeConfig2 := e2e.RouteConfigFilterAction("routeName2")
   484  	routeConfig3 := e2e.RouteConfigFilterAction("routeName3")
   485  	resources = e2e.UpdateOptions{
   486  		NodeID:         nodeID,
   487  		Listeners:      []*v3listenerpb.Listener{ldsResource},
   488  		Routes:         []*v3routepb.RouteConfiguration{routeConfig1, routeConfig2, routeConfig3},
   489  		SkipValidation: true,
   490  	}
   491  	if err := managementServer.Update(ctx, resources); err != nil {
   492  		t.Fatal(err)
   493  	}
   494  	pollForSuccessfulRPC(ctx, t, cc)
   495  
   496  	c := testgrpc.NewTestServiceClient(cc)
   497  	stream, err := c.FullDuplexCall(ctx)
   498  	if err != nil {
   499  		t.Fatalf("cc.FullDuplexCall failed: %f", err)
   500  	}
   501  	if err = stream.Send(&testpb.StreamingOutputCallRequest{}); err != nil {
   502  		t.Fatalf("stream.Send() failed: %v, should continue to work due to graceful stop", err)
   503  	}
   504  
   505  	// Configure with LDS with a filter chain that doesn't get matched to and a
   506  	// default filter chain that matches to RDS A.
   507  	ldsResource = e2e.ListenerResourceFallbackToDefault(host, port, e2e.SecurityLevelNone)
   508  	resources = e2e.UpdateOptions{
   509  		NodeID:         nodeID,
   510  		Listeners:      []*v3listenerpb.Listener{ldsResource},
   511  		Routes:         []*v3routepb.RouteConfiguration{routeConfig1, routeConfig2, routeConfig3},
   512  		SkipValidation: true,
   513  	}
   514  	if err := managementServer.Update(ctx, resources); err != nil {
   515  		t.Fatalf("error updating management server: %v", err)
   516  	}
   517  
   518  	// xDS is eventually consistent. So simply poll for the new change to be
   519  	// reflected.
   520  	// "NonForwardingAction is expected for all Routes used on server-side; a
   521  	// route with an inappropriate action causes RPCs matching that route to
   522  	// fail with UNAVAILABLE." - A36
   523  	waitForFailedRPCWithStatus(ctx, t, cc, status.New(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding"))
   524  
   525  	// Stream should be allowed to continue on the old working configuration -
   526  	// as it on a connection that is gracefully closed (old FCM/LDS
   527  	// Configuration which is allowed to continue).
   528  	if err = stream.CloseSend(); err != nil {
   529  		t.Fatalf("stream.CloseSend() failed: %v, should continue to work due to graceful stop", err)
   530  	}
   531  	if _, err = stream.Recv(); err != io.EOF {
   532  		t.Fatalf("unexpected error: %v, expected an EOF error", err)
   533  	}
   534  
   535  	ldsResource = e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")
   536  	resources = e2e.UpdateOptions{
   537  		NodeID:         nodeID,
   538  		Listeners:      []*v3listenerpb.Listener{ldsResource},
   539  		Routes:         []*v3routepb.RouteConfiguration{routeConfig1, routeConfig2, routeConfig3},
   540  		SkipValidation: true,
   541  	}
   542  	if err := managementServer.Update(ctx, resources); err != nil {
   543  		t.Fatal(err)
   544  	}
   545  
   546  	pollForSuccessfulRPC(ctx, t, cc)
   547  }
   548  
   549  func pollForSuccessfulRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
   550  	t.Helper()
   551  	c := testgrpc.NewTestServiceClient(cc)
   552  	ticker := time.NewTicker(10 * time.Millisecond)
   553  	defer ticker.Stop()
   554  	for {
   555  		select {
   556  		case <-ctx.Done():
   557  			t.Fatalf("timeout waiting for RPCs to succeed")
   558  		case <-ticker.C:
   559  			if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err == nil {
   560  				return
   561  			}
   562  		}
   563  	}
   564  }
   565  

View as plain text