...

Source file src/google.golang.org/grpc/test/xds/xds_server_serving_mode_test.go

Documentation: google.golang.org/grpc/test/xds

     1  /*
     2   *
     3   * Copyright 2021 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package xds_test
    20  
    21  import (
    22  	"context"
    23  	"fmt"
    24  	"net"
    25  	"testing"
    26  	"time"
    27  
    28  	"google.golang.org/grpc"
    29  	"google.golang.org/grpc/connectivity"
    30  	"google.golang.org/grpc/credentials/insecure"
    31  	xdscreds "google.golang.org/grpc/credentials/xds"
    32  	"google.golang.org/grpc/internal/testutils"
    33  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    34  	"google.golang.org/grpc/xds"
    35  
    36  	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
    37  	testgrpc "google.golang.org/grpc/interop/grpc_testing"
    38  	testpb "google.golang.org/grpc/interop/grpc_testing"
    39  )
    40  
    41  // TestServerSideXDS_RedundantUpdateSuppression tests the scenario where the
    42  // control plane sends the same resource update. It verifies that the mode
    43  // change callback is not invoked and client connections to the server are not
    44  // recycled.
    45  func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) {
    46  	managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
    47  	defer cleanup()
    48  
    49  	creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
    50  	if err != nil {
    51  		t.Fatal(err)
    52  	}
    53  	lis, err := testutils.LocalTCPListener()
    54  	if err != nil {
    55  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
    56  	}
    57  	updateCh := make(chan connectivity.ServingMode, 1)
    58  
    59  	// Create a server option to get notified about serving mode changes.
    60  	modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
    61  		t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
    62  		updateCh <- args.Mode
    63  	})
    64  
    65  	// Initialize an xDS-enabled gRPC server and register the stubServer on it.
    66  	server, err := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
    67  	if err != nil {
    68  		t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
    69  	}
    70  	defer server.Stop()
    71  	testgrpc.RegisterTestServiceServer(server, &testService{})
    72  
    73  	// Setup the management server to respond with the listener resources.
    74  	host, port, err := hostPortFromListener(lis)
    75  	if err != nil {
    76  		t.Fatalf("failed to retrieve host and port of server: %v", err)
    77  	}
    78  	listener := e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")
    79  	resources := e2e.UpdateOptions{
    80  		NodeID:    nodeID,
    81  		Listeners: []*v3listenerpb.Listener{listener},
    82  	}
    83  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
    84  	defer cancel()
    85  	if err := managementServer.Update(ctx, resources); err != nil {
    86  		t.Fatal(err)
    87  	}
    88  
    89  	go func() {
    90  		if err := server.Serve(lis); err != nil {
    91  			t.Errorf("Serve() failed: %v", err)
    92  		}
    93  	}()
    94  
    95  	// Wait for the listener to move to "serving" mode.
    96  	select {
    97  	case <-ctx.Done():
    98  		t.Fatalf("timed out waiting for a mode change update: %v", err)
    99  	case mode := <-updateCh:
   100  		if mode != connectivity.ServingModeServing {
   101  			t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
   102  		}
   103  	}
   104  
   105  	// Create a ClientConn and make a successful RPCs.
   106  	cc, err := grpc.NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
   107  	if err != nil {
   108  		t.Fatalf("failed to dial local test server: %v", err)
   109  	}
   110  	defer cc.Close()
   111  	waitForSuccessfulRPC(ctx, t, cc)
   112  
   113  	// Start a goroutine to make sure that we do not see any connectivity state
   114  	// changes on the client connection. If redundant updates are not
   115  	// suppressed, server will recycle client connections.
   116  	errCh := make(chan error, 1)
   117  	go func() {
   118  		prev := connectivity.Ready // We know we are READY since we just did an RPC.
   119  		for {
   120  			curr := cc.GetState()
   121  			if !(curr == connectivity.Ready || curr == connectivity.Idle) {
   122  				errCh <- fmt.Errorf("unexpected connectivity state change {%s --> %s} on the client connection", prev, curr)
   123  				return
   124  			}
   125  			if !cc.WaitForStateChange(ctx, curr) {
   126  				// Break out of the for loop when the context has been cancelled.
   127  				break
   128  			}
   129  			prev = curr
   130  		}
   131  		errCh <- nil
   132  	}()
   133  
   134  	// Update the management server with the same listener resource. This will
   135  	// update the resource version though, and should result in a the management
   136  	// server sending the same resource to the xDS-enabled gRPC server.
   137  	if err := managementServer.Update(ctx, e2e.UpdateOptions{
   138  		NodeID:    nodeID,
   139  		Listeners: []*v3listenerpb.Listener{listener},
   140  	}); err != nil {
   141  		t.Fatal(err)
   142  	}
   143  
   144  	// Since redundant resource updates are suppressed, we should not see the
   145  	// mode change callback being invoked.
   146  	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   147  	defer sCancel()
   148  	select {
   149  	case <-sCtx.Done():
   150  	case mode := <-updateCh:
   151  		t.Fatalf("unexpected mode change callback with new mode %v", mode)
   152  	}
   153  
   154  	// Make sure RPCs continue to succeed.
   155  	waitForSuccessfulRPC(ctx, t, cc)
   156  
   157  	// Cancel the context to ensure that the WaitForStateChange call exits early
   158  	// and returns false.
   159  	cancel()
   160  	if err := <-errCh; err != nil {
   161  		t.Fatal(err)
   162  	}
   163  }
   164  
   165  // TestServerSideXDS_ServingModeChanges tests the serving mode functionality in
   166  // xDS enabled gRPC servers. It verifies that appropriate mode changes happen in
   167  // the server, and also verifies behavior of clientConns under these modes.
   168  func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
   169  	managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
   170  	defer cleanup()
   171  
   172  	// Configure xDS credentials to be used on the server-side.
   173  	creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{
   174  		FallbackCreds: insecure.NewCredentials(),
   175  	})
   176  	if err != nil {
   177  		t.Fatal(err)
   178  	}
   179  
   180  	// Create two local listeners and pass it to Serve().
   181  	lis1, err := testutils.LocalTCPListener()
   182  	if err != nil {
   183  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
   184  	}
   185  	lis2, err := testutils.LocalTCPListener()
   186  	if err != nil {
   187  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
   188  	}
   189  
   190  	// Create a couple of channels on which mode updates will be pushed.
   191  	updateCh1 := make(chan connectivity.ServingMode, 1)
   192  	updateCh2 := make(chan connectivity.ServingMode, 1)
   193  
   194  	// Create a server option to get notified about serving mode changes, and
   195  	// push the updated mode on the channels created above.
   196  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   197  	defer cancel()
   198  	modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
   199  		t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
   200  		switch addr.String() {
   201  		case lis1.Addr().String():
   202  			updateCh1 <- args.Mode
   203  		case lis2.Addr().String():
   204  			updateCh2 <- args.Mode
   205  		default:
   206  			t.Errorf("serving mode callback invoked for unknown listener address: %q", addr.String())
   207  		}
   208  	})
   209  
   210  	// Initialize an xDS-enabled gRPC server and register the stubServer on it.
   211  	server, err := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
   212  	if err != nil {
   213  		t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
   214  	}
   215  	defer server.Stop()
   216  	testgrpc.RegisterTestServiceServer(server, &testService{})
   217  
   218  	// Setup the management server to respond with server-side Listener
   219  	// resources for both listeners.
   220  	host1, port1, err := hostPortFromListener(lis1)
   221  	if err != nil {
   222  		t.Fatalf("failed to retrieve host and port of server: %v", err)
   223  	}
   224  	listener1 := e2e.DefaultServerListener(host1, port1, e2e.SecurityLevelNone, "routeName")
   225  	host2, port2, err := hostPortFromListener(lis2)
   226  	if err != nil {
   227  		t.Fatalf("failed to retrieve host and port of server: %v", err)
   228  	}
   229  	listener2 := e2e.DefaultServerListener(host2, port2, e2e.SecurityLevelNone, "routeName")
   230  	resources := e2e.UpdateOptions{
   231  		NodeID:    nodeID,
   232  		Listeners: []*v3listenerpb.Listener{listener1, listener2},
   233  	}
   234  	if err := managementServer.Update(ctx, resources); err != nil {
   235  		t.Fatal(err)
   236  	}
   237  
   238  	go func() {
   239  		if err := server.Serve(lis1); err != nil {
   240  			t.Errorf("Serve() failed: %v", err)
   241  		}
   242  	}()
   243  	go func() {
   244  		if err := server.Serve(lis2); err != nil {
   245  			t.Errorf("Serve() failed: %v", err)
   246  		}
   247  	}()
   248  
   249  	// Wait for both listeners to move to "serving" mode.
   250  	select {
   251  	case <-ctx.Done():
   252  		t.Fatalf("timed out waiting for a mode change update: %v", err)
   253  	case mode := <-updateCh1:
   254  		if mode != connectivity.ServingModeServing {
   255  			t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
   256  		}
   257  	}
   258  	select {
   259  	case <-ctx.Done():
   260  		t.Fatalf("timed out waiting for a mode change update: %v", err)
   261  	case mode := <-updateCh2:
   262  		if mode != connectivity.ServingModeServing {
   263  			t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
   264  		}
   265  	}
   266  
   267  	// Create a ClientConn to the first listener and make a successful RPCs.
   268  	cc1, err := grpc.NewClient(lis1.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
   269  	if err != nil {
   270  		t.Fatalf("failed to dial local test server: %v", err)
   271  	}
   272  	defer cc1.Close()
   273  	waitForSuccessfulRPC(ctx, t, cc1)
   274  
   275  	// Create a ClientConn to the second listener and make a successful RPCs.
   276  	cc2, err := grpc.NewClient(lis2.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
   277  	if err != nil {
   278  		t.Fatalf("failed to dial local test server: %v", err)
   279  	}
   280  	defer cc2.Close()
   281  	waitForSuccessfulRPC(ctx, t, cc2)
   282  
   283  	// Update the management server to remove the second listener resource. This
   284  	// should push only the second listener into "not-serving" mode.
   285  	if err := managementServer.Update(ctx, e2e.UpdateOptions{
   286  		NodeID:    nodeID,
   287  		Listeners: []*v3listenerpb.Listener{listener1},
   288  	}); err != nil {
   289  		t.Fatal(err)
   290  	}
   291  
   292  	// Wait for lis2 to move to "not-serving" mode.
   293  	select {
   294  	case <-ctx.Done():
   295  		t.Fatalf("timed out waiting for a mode change update: %v", err)
   296  	case mode := <-updateCh2:
   297  		if mode != connectivity.ServingModeNotServing {
   298  			t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing)
   299  		}
   300  	}
   301  
   302  	// Make sure RPCs succeed on cc1 and fail on cc2.
   303  	waitForSuccessfulRPC(ctx, t, cc1)
   304  	waitForFailedRPC(ctx, t, cc2)
   305  
   306  	// Update the management server to remove the first listener resource as
   307  	// well. This should push the first listener into "not-serving" mode. Second
   308  	// listener is already in "not-serving" mode.
   309  	if err := managementServer.Update(ctx, e2e.UpdateOptions{
   310  		NodeID:    nodeID,
   311  		Listeners: []*v3listenerpb.Listener{},
   312  	}); err != nil {
   313  		t.Fatal(err)
   314  	}
   315  
   316  	// Wait for lis1 to move to "not-serving" mode. lis2 was already removed
   317  	// from the xdsclient's resource cache. So, lis2's callback will not be
   318  	// invoked this time around.
   319  	select {
   320  	case <-ctx.Done():
   321  		t.Fatalf("timed out waiting for a mode change update: %v", err)
   322  	case mode := <-updateCh1:
   323  		if mode != connectivity.ServingModeNotServing {
   324  			t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing)
   325  		}
   326  	}
   327  
   328  	// Make sure RPCs fail on both.
   329  	waitForFailedRPC(ctx, t, cc1)
   330  	waitForFailedRPC(ctx, t, cc2)
   331  
   332  	// Make sure new connection attempts to "not-serving" servers fail. We use a
   333  	// short timeout since we expect this to fail.
   334  	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
   335  	defer sCancel()
   336  	if _, err := grpc.DialContext(sCtx, lis1.Addr().String(), grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())); err == nil {
   337  		t.Fatal("successfully created clientConn to a server in \"not-serving\" state")
   338  	}
   339  
   340  	// Update the management server with both listener resources.
   341  	if err := managementServer.Update(ctx, e2e.UpdateOptions{
   342  		NodeID:    nodeID,
   343  		Listeners: []*v3listenerpb.Listener{listener1, listener2},
   344  	}); err != nil {
   345  		t.Fatal(err)
   346  	}
   347  
   348  	// Wait for both listeners to move to "serving" mode.
   349  	select {
   350  	case <-ctx.Done():
   351  		t.Fatalf("timed out waiting for a mode change update: %v", err)
   352  	case mode := <-updateCh1:
   353  		if mode != connectivity.ServingModeServing {
   354  			t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
   355  		}
   356  	}
   357  	select {
   358  	case <-ctx.Done():
   359  		t.Fatalf("timed out waiting for a mode change update: %v", err)
   360  	case mode := <-updateCh2:
   361  		if mode != connectivity.ServingModeServing {
   362  			t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
   363  		}
   364  	}
   365  
   366  	// The clientConns created earlier should be able to make RPCs now.
   367  	waitForSuccessfulRPC(ctx, t, cc1)
   368  	waitForSuccessfulRPC(ctx, t, cc2)
   369  }
   370  
   371  func waitForSuccessfulRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
   372  	t.Helper()
   373  
   374  	c := testgrpc.NewTestServiceClient(cc)
   375  	if _, err := c.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
   376  		t.Fatalf("rpc EmptyCall() failed: %v", err)
   377  	}
   378  }
   379  
   380  func waitForFailedRPC(ctx context.Context, t *testing.T, cc *grpc.ClientConn) {
   381  	t.Helper()
   382  
   383  	// Attempt one RPC before waiting for the ticker to expire.
   384  	c := testgrpc.NewTestServiceClient(cc)
   385  	if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   386  		return
   387  	}
   388  
   389  	ticker := time.NewTicker(10 * time.Millisecond)
   390  	defer ticker.Stop()
   391  	for {
   392  		select {
   393  		case <-ctx.Done():
   394  			t.Fatalf("failure when waiting for RPCs to fail: %v", ctx.Err())
   395  		case <-ticker.C:
   396  			if _, err := c.EmptyCall(ctx, &testpb.Empty{}); err != nil {
   397  				return
   398  			}
   399  		}
   400  	}
   401  }
   402  

View as plain text