...

Source file src/google.golang.org/grpc/xds/server_test.go

Documentation: google.golang.org/grpc/xds

     1  /*
     2   *
     3   * Copyright 2020 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package xds
    20  
    21  import (
    22  	"context"
    23  	"encoding/json"
    24  	"errors"
    25  	"fmt"
    26  	"net"
    27  	"reflect"
    28  	"strconv"
    29  	"strings"
    30  	"sync"
    31  	"testing"
    32  	"time"
    33  
    34  	"github.com/google/go-cmp/cmp"
    35  	"github.com/google/uuid"
    36  	"google.golang.org/grpc"
    37  	"google.golang.org/grpc/connectivity"
    38  	"google.golang.org/grpc/credentials/insecure"
    39  	"google.golang.org/grpc/credentials/tls/certprovider"
    40  	"google.golang.org/grpc/credentials/xds"
    41  	"google.golang.org/grpc/internal/grpctest"
    42  	"google.golang.org/grpc/internal/testutils"
    43  	"google.golang.org/grpc/internal/testutils/xds/bootstrap"
    44  	"google.golang.org/grpc/internal/testutils/xds/e2e"
    45  	"google.golang.org/grpc/xds/internal/xdsclient"
    46  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version"
    47  
    48  	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
    49  	v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
    50  
    51  	_ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter
    52  )
    53  
    54  const (
    55  	defaultTestTimeout          = 5 * time.Second
    56  	defaultTestShortTimeout     = 10 * time.Millisecond
    57  	nonExistentManagementServer = "non-existent-management-server"
    58  )
    59  
    60  type s struct {
    61  	grpctest.Tester
    62  }
    63  
    64  func Test(t *testing.T) {
    65  	grpctest.RunSubTests(t, s{})
    66  }
    67  
    68  type fakeGRPCServer struct {
    69  	done              chan struct{}
    70  	registerServiceCh *testutils.Channel
    71  	serveCh           *testutils.Channel
    72  }
    73  
    74  func (f *fakeGRPCServer) RegisterService(*grpc.ServiceDesc, any) {
    75  	f.registerServiceCh.Send(nil)
    76  }
    77  
    78  func (f *fakeGRPCServer) Serve(lis net.Listener) error {
    79  	f.serveCh.Send(nil)
    80  	<-f.done
    81  	lis.Close()
    82  	return nil
    83  }
    84  
    85  func (f *fakeGRPCServer) Stop() {
    86  	close(f.done)
    87  }
    88  func (f *fakeGRPCServer) GracefulStop() {
    89  	close(f.done)
    90  }
    91  
    92  func (f *fakeGRPCServer) GetServiceInfo() map[string]grpc.ServiceInfo {
    93  	panic("implement me")
    94  }
    95  
    96  func newFakeGRPCServer() *fakeGRPCServer {
    97  	return &fakeGRPCServer{
    98  		done:              make(chan struct{}),
    99  		registerServiceCh: testutils.NewChannel(),
   100  		serveCh:           testutils.NewChannel(),
   101  	}
   102  }
   103  
   104  func generateBootstrapContents(t *testing.T, nodeID, serverURI string) []byte {
   105  	t.Helper()
   106  
   107  	bs, err := e2e.DefaultBootstrapContents(nodeID, serverURI)
   108  	if err != nil {
   109  		t.Fatal(err)
   110  	}
   111  	return bs
   112  }
   113  
   114  func (s) TestNewServer_Success(t *testing.T) {
   115  	xdsCreds, err := xds.NewServerCredentials(xds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
   116  	if err != nil {
   117  		t.Fatalf("failed to create xds server credentials: %v", err)
   118  	}
   119  
   120  	tests := []struct {
   121  		desc              string
   122  		serverOpts        []grpc.ServerOption
   123  		wantXDSCredsInUse bool
   124  	}{
   125  		{
   126  			desc: "without_xds_creds",
   127  			serverOpts: []grpc.ServerOption{
   128  				grpc.Creds(insecure.NewCredentials()),
   129  				BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), nonExistentManagementServer)),
   130  			},
   131  		},
   132  		{
   133  			desc: "with_xds_creds",
   134  			serverOpts: []grpc.ServerOption{
   135  				grpc.Creds(xdsCreds),
   136  				BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), nonExistentManagementServer)),
   137  			},
   138  			wantXDSCredsInUse: true,
   139  		},
   140  	}
   141  
   142  	for _, test := range tests {
   143  		t.Run(test.desc, func(t *testing.T) {
   144  			// The xds package adds a couple of server options (unary and stream
   145  			// interceptors) to the server options passed in by the user.
   146  			wantServerOpts := len(test.serverOpts) + 2
   147  
   148  			origNewGRPCServer := newGRPCServer
   149  			newGRPCServer = func(opts ...grpc.ServerOption) grpcServer {
   150  				if got := len(opts); got != wantServerOpts {
   151  					t.Fatalf("%d ServerOptions passed to grpc.Server, want %d", got, wantServerOpts)
   152  				}
   153  				// Verify that the user passed ServerOptions are forwarded as is.
   154  				if !reflect.DeepEqual(opts[2:], test.serverOpts) {
   155  					t.Fatalf("got ServerOptions %v, want %v", opts[2:], test.serverOpts)
   156  				}
   157  				return grpc.NewServer(opts...)
   158  			}
   159  			defer func() {
   160  				newGRPCServer = origNewGRPCServer
   161  			}()
   162  
   163  			s, err := NewGRPCServer(test.serverOpts...)
   164  			if err != nil {
   165  				t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
   166  			}
   167  			defer s.Stop()
   168  		})
   169  	}
   170  }
   171  
   172  func (s) TestNewServer_Failure(t *testing.T) {
   173  	xdsCreds, err := xds.NewServerCredentials(xds.ServerOptions{FallbackCreds: insecure.NewCredentials()})
   174  	if err != nil {
   175  		t.Fatalf("failed to create xds server credentials: %v", err)
   176  	}
   177  
   178  	tests := []struct {
   179  		desc       string
   180  		serverOpts []grpc.ServerOption
   181  		wantErr    string
   182  	}{
   183  		{
   184  			desc:       "bootstrap env var not set",
   185  			serverOpts: []grpc.ServerOption{grpc.Creds(xdsCreds)},
   186  			wantErr:    "bootstrap env vars are unspecified",
   187  		},
   188  		{
   189  			desc: "empty bootstrap config",
   190  			serverOpts: []grpc.ServerOption{
   191  				grpc.Creds(xdsCreds),
   192  				BootstrapContentsForTesting([]byte(`{}`)),
   193  			},
   194  			wantErr: "xDS client creation failed",
   195  		},
   196  		{
   197  			desc: "server_listener_resource_name_template is missing",
   198  			serverOpts: []grpc.ServerOption{
   199  				grpc.Creds(xdsCreds),
   200  				func() grpc.ServerOption {
   201  					bs, err := bootstrap.Contents(bootstrap.Options{
   202  						NodeID:    uuid.New().String(),
   203  						ServerURI: nonExistentManagementServer,
   204  						CertificateProviders: map[string]json.RawMessage{
   205  							"cert-provider-instance": json.RawMessage("{}"),
   206  						},
   207  					})
   208  					if err != nil {
   209  						t.Errorf("Failed to create bootstrap configuration: %v", err)
   210  					}
   211  					return BootstrapContentsForTesting(bs)
   212  				}(),
   213  			},
   214  			wantErr: "missing server_listener_resource_name_template in the bootstrap configuration",
   215  		},
   216  	}
   217  
   218  	for _, test := range tests {
   219  		t.Run(test.desc, func(t *testing.T) {
   220  			s, err := NewGRPCServer(test.serverOpts...)
   221  			if err == nil {
   222  				s.Stop()
   223  				t.Fatal("NewGRPCServer() succeeded when expected to fail")
   224  			}
   225  			if !strings.Contains(err.Error(), test.wantErr) {
   226  				t.Fatalf("NewGRPCServer() failed with error: %v, want: %s", err, test.wantErr)
   227  			}
   228  		})
   229  	}
   230  }
   231  
   232  func (s) TestRegisterService(t *testing.T) {
   233  	fs := newFakeGRPCServer()
   234  
   235  	origNewGRPCServer := newGRPCServer
   236  	newGRPCServer = func(opts ...grpc.ServerOption) grpcServer { return fs }
   237  	defer func() { newGRPCServer = origNewGRPCServer }()
   238  
   239  	s, err := NewGRPCServer(BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), "non-existent-management-server")))
   240  	if err != nil {
   241  		t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
   242  	}
   243  	defer s.Stop()
   244  
   245  	s.RegisterService(&grpc.ServiceDesc{}, nil)
   246  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   247  	defer cancel()
   248  	if _, err := fs.registerServiceCh.Receive(ctx); err != nil {
   249  		t.Fatalf("Timeout when expecting RegisterService() to called on grpc.Server: %v", err)
   250  	}
   251  }
   252  
   253  const (
   254  	fakeProvider1Name = "fake-certificate-provider-1"
   255  	fakeProvider2Name = "fake-certificate-provider-2"
   256  )
   257  
   258  var (
   259  	fpb1, fpb2          *fakeProviderBuilder
   260  	fakeProvider1Config json.RawMessage
   261  	fakeProvider2Config json.RawMessage
   262  )
   263  
   264  func init() {
   265  	fpb1 = &fakeProviderBuilder{
   266  		name:    fakeProvider1Name,
   267  		buildCh: testutils.NewChannel(),
   268  	}
   269  	fpb2 = &fakeProviderBuilder{
   270  		name:    fakeProvider2Name,
   271  		buildCh: testutils.NewChannel(),
   272  	}
   273  	certprovider.Register(fpb1)
   274  	certprovider.Register(fpb2)
   275  
   276  	fakeProvider1Config = json.RawMessage(fmt.Sprintf(`{
   277  		"plugin_name": "%s",
   278  		"config": "my fake config 1"
   279  	}`, fakeProvider1Name))
   280  	fakeProvider2Config = json.RawMessage(fmt.Sprintf(`{
   281  		"plugin_name": "%s",
   282  		"config": "my fake config 2"
   283  	}`, fakeProvider2Name))
   284  }
   285  
   286  // fakeProviderBuilder builds new instances of fakeProvider and interprets the
   287  // config provided to it as a string.
   288  type fakeProviderBuilder struct {
   289  	name    string
   290  	buildCh *testutils.Channel
   291  }
   292  
   293  func (b *fakeProviderBuilder) ParseConfig(cfg any) (*certprovider.BuildableConfig, error) {
   294  	var config string
   295  	if err := json.Unmarshal(cfg.(json.RawMessage), &config); err != nil {
   296  		return nil, fmt.Errorf("providerBuilder %s failed to unmarshal config: %v", b.name, cfg)
   297  	}
   298  	return certprovider.NewBuildableConfig(b.name, []byte(config), func(certprovider.BuildOptions) certprovider.Provider {
   299  		b.buildCh.Send(nil)
   300  		return &fakeProvider{
   301  			Distributor: certprovider.NewDistributor(),
   302  			config:      config,
   303  		}
   304  	}), nil
   305  }
   306  
   307  func (b *fakeProviderBuilder) Name() string {
   308  	return b.name
   309  }
   310  
   311  // fakeProvider is an implementation of the Provider interface which provides a
   312  // method for tests to invoke to push new key materials.
   313  type fakeProvider struct {
   314  	*certprovider.Distributor
   315  	config string
   316  }
   317  
   318  // Close helps implement the Provider interface.
   319  func (p *fakeProvider) Close() {
   320  	p.Distributor.Stop()
   321  }
   322  
   323  func verifyCertProviderNotCreated() error {
   324  	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
   325  	defer sCancel()
   326  	if _, err := fpb1.buildCh.Receive(sCtx); err != context.DeadlineExceeded {
   327  		return errors.New("certificate provider created when no xDS creds were specified")
   328  	}
   329  	sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
   330  	defer sCancel()
   331  	if _, err := fpb2.buildCh.Receive(sCtx); err != context.DeadlineExceeded {
   332  		return errors.New("certificate provider created when no xDS creds were specified")
   333  	}
   334  	return nil
   335  }
   336  
   337  func hostPortFromListener(t *testing.T, lis net.Listener) (string, uint32) {
   338  	t.Helper()
   339  
   340  	host, p, err := net.SplitHostPort(lis.Addr().String())
   341  	if err != nil {
   342  		t.Fatalf("net.SplitHostPort(%s) failed: %v", lis.Addr().String(), err)
   343  	}
   344  	port, err := strconv.ParseInt(p, 10, 32)
   345  	if err != nil {
   346  		t.Fatalf("strconv.ParseInt(%s, 10, 32) failed: %v", p, err)
   347  	}
   348  	return host, uint32(port)
   349  }
   350  
   351  // TestServeSuccess tests the successful case of creating an xDS enabled gRPC
   352  // server and calling Serve() on it. The test verifies that an LDS request is
   353  // sent out for the expected name, and also verifies that the serving mode
   354  // changes appropriately.
   355  func (s) TestServeSuccess(t *testing.T) {
   356  	// Setup an xDS management server that pushes on a channel when an LDS
   357  	// request is received by it.
   358  	ldsRequestCh := make(chan []string, 1)
   359  	mgmtServer, nodeID, bootstrapContents, _, cancel := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
   360  		OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
   361  			if req.GetTypeUrl() == version.V3ListenerURL {
   362  				select {
   363  				case ldsRequestCh <- req.GetResourceNames():
   364  				default:
   365  				}
   366  			}
   367  			return nil
   368  		},
   369  	})
   370  	defer cancel()
   371  
   372  	// Override the function to create the underlying grpc.Server to allow the
   373  	// test to verify that Serve() is called on the underlying server.
   374  	fs := newFakeGRPCServer()
   375  	origNewGRPCServer := newGRPCServer
   376  	newGRPCServer = func(opts ...grpc.ServerOption) grpcServer { return fs }
   377  	defer func() { newGRPCServer = origNewGRPCServer }()
   378  
   379  	// Create a new xDS enabled gRPC server and pass it a server option to get
   380  	// notified about serving mode changes.
   381  	modeChangeCh := testutils.NewChannel()
   382  	modeChangeOption := ServingModeCallback(func(addr net.Addr, args ServingModeChangeArgs) {
   383  		t.Logf("Server mode change callback invoked for listener %q with mode %q and error %v", addr.String(), args.Mode, args.Err)
   384  		modeChangeCh.Send(args.Mode)
   385  	})
   386  	server, err := NewGRPCServer(modeChangeOption, BootstrapContentsForTesting(bootstrapContents))
   387  	if err != nil {
   388  		t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
   389  	}
   390  	defer server.Stop()
   391  
   392  	// Call Serve() in a goroutine.
   393  	lis, err := testutils.LocalTCPListener()
   394  	if err != nil {
   395  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
   396  	}
   397  	go func() {
   398  		if err := server.Serve(lis); err != nil {
   399  			t.Error(err)
   400  		}
   401  	}()
   402  
   403  	// Ensure that the LDS request is sent out for the expected name.
   404  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   405  	defer cancel()
   406  	var gotNames []string
   407  	select {
   408  	case gotNames = <-ldsRequestCh:
   409  	case <-ctx.Done():
   410  		t.Fatalf("Timeout when waiting for an LDS request to be sent out")
   411  	}
   412  	wantNames := []string{strings.Replace(e2e.ServerListenerResourceNameTemplate, "%s", lis.Addr().String(), -1)}
   413  	if !cmp.Equal(gotNames, wantNames) {
   414  		t.Fatalf("LDS watch registered for names %v, want %v", gotNames, wantNames)
   415  	}
   416  
   417  	// Update the management server with a good listener resource.
   418  	host, port := hostPortFromListener(t, lis)
   419  	resources := e2e.UpdateOptions{
   420  		NodeID:    nodeID,
   421  		Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")},
   422  	}
   423  	if err := mgmtServer.Update(ctx, resources); err != nil {
   424  		t.Fatal(err)
   425  	}
   426  
   427  	// Verify the serving mode reports SERVING.
   428  	v, err := modeChangeCh.Receive(ctx)
   429  	if err != nil {
   430  		t.Fatalf("Timeout when waiting for serving mode to change: %v", err)
   431  	}
   432  	if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeServing {
   433  		t.Fatalf("Serving mode is %q, want %q", mode, connectivity.ServingModeServing)
   434  	}
   435  
   436  	// Verify that Serve() is called on the underlying gRPC server.
   437  	if _, err := fs.serveCh.Receive(ctx); err != nil {
   438  		t.Fatalf("Timeout when waiting for Serve() to be invoked on the grpc.Server")
   439  	}
   440  
   441  	// Update the listener resource on the management server in such a way that
   442  	// it will be NACKed by our xDS client. The listener_filters field is
   443  	// unsupported and will be NACKed.
   444  	resources.Listeners[0].ListenerFilters = []*v3listenerpb.ListenerFilter{{Name: "foo"}}
   445  	if err := mgmtServer.Update(ctx, resources); err != nil {
   446  		t.Fatal(err)
   447  	}
   448  
   449  	// Verify that there is no change in the serving mode. The server should
   450  	// continue using the previously received good configuration.
   451  	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
   452  	defer sCancel()
   453  	if v, err := modeChangeCh.Receive(sCtx); err != context.DeadlineExceeded {
   454  		t.Fatalf("Unexpected change in serving mode. New mode is %v", v.(connectivity.ServingMode))
   455  	}
   456  
   457  	// Remove the listener resource from the management server. This should
   458  	// result in a resource-not-found error from the xDS client and should
   459  	// result in the server moving to NOT_SERVING mode.
   460  	resources.Listeners = nil
   461  	if err := mgmtServer.Update(ctx, resources); err != nil {
   462  		t.Fatal(err)
   463  	}
   464  	v, err = modeChangeCh.Receive(ctx)
   465  	if err != nil {
   466  		t.Fatalf("Timeout when waiting for serving mode to change: %v", err)
   467  	}
   468  	if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeNotServing {
   469  		t.Fatalf("Serving mode is %q, want %q", mode, connectivity.ServingModeNotServing)
   470  	}
   471  }
   472  
   473  // TestNewServer_ClientCreationFailure tests the case where the xDS client
   474  // creation fails and verifies that the call to NewGRPCServer() fails.
   475  func (s) TestNewServer_ClientCreationFailure(t *testing.T) {
   476  	origNewXDSClient := newXDSClient
   477  	newXDSClient = func() (xdsclient.XDSClient, func(), error) {
   478  		return nil, nil, errors.New("xdsClient creation failed")
   479  	}
   480  	defer func() { newXDSClient = origNewXDSClient }()
   481  
   482  	if _, err := NewGRPCServer(); err == nil {
   483  		t.Fatal("NewGRPCServer() succeeded when expected to fail")
   484  	}
   485  }
   486  
   487  // TestHandleListenerUpdate_NoXDSCreds tests the case where an xds-enabled gRPC
   488  // server is not configured with xDS credentials. Verifies that the security
   489  // config received as part of a Listener update is not acted upon.
   490  func (s) TestHandleListenerUpdate_NoXDSCreds(t *testing.T) {
   491  	mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{})
   492  	if err != nil {
   493  		t.Fatalf("Failed to start xDS management server: %v", err)
   494  	}
   495  	defer mgmtServer.Stop()
   496  
   497  	// Generate bootstrap configuration pointing to the above management server
   498  	// with certificate provider configuration pointing to fake certificate
   499  	// providers.
   500  	nodeID := uuid.NewString()
   501  	bootstrapContents, err := bootstrap.Contents(bootstrap.Options{
   502  		NodeID:    nodeID,
   503  		ServerURI: mgmtServer.Address,
   504  		CertificateProviders: map[string]json.RawMessage{
   505  			e2e.ServerSideCertProviderInstance: fakeProvider1Config,
   506  			e2e.ClientSideCertProviderInstance: fakeProvider2Config,
   507  		},
   508  		ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate,
   509  	})
   510  	if err != nil {
   511  		t.Fatalf("Failed to create bootstrap configuration: %v", err)
   512  	}
   513  
   514  	// Create a new xDS enabled gRPC server and pass it a server option to get
   515  	// notified about serving mode changes. Also pass the above bootstrap
   516  	// configuration to be used during xDS client creation.
   517  	modeChangeCh := testutils.NewChannel()
   518  	modeChangeOption := ServingModeCallback(func(addr net.Addr, args ServingModeChangeArgs) {
   519  		t.Logf("Server mode change callback invoked for listener %q with mode %q and error %v", addr.String(), args.Mode, args.Err)
   520  		modeChangeCh.Send(args.Mode)
   521  	})
   522  	server, err := NewGRPCServer(modeChangeOption, BootstrapContentsForTesting(bootstrapContents))
   523  	if err != nil {
   524  		t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
   525  	}
   526  	defer server.Stop()
   527  
   528  	// Call Serve() in a goroutine.
   529  	lis, err := testutils.LocalTCPListener()
   530  	if err != nil {
   531  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
   532  	}
   533  	go func() {
   534  		if err := server.Serve(lis); err != nil {
   535  			t.Error(err)
   536  		}
   537  	}()
   538  
   539  	// Update the management server with a good listener resource that contains
   540  	// security configuration.
   541  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   542  	defer cancel()
   543  	host, port := hostPortFromListener(t, lis)
   544  	resources := e2e.UpdateOptions{
   545  		NodeID:    nodeID,
   546  		Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS, "routeName")},
   547  	}
   548  	if err := mgmtServer.Update(ctx, resources); err != nil {
   549  		t.Fatal(err)
   550  	}
   551  
   552  	// Verify the serving mode reports SERVING.
   553  	v, err := modeChangeCh.Receive(ctx)
   554  	if err != nil {
   555  		t.Fatalf("Timeout when waiting for serving mode to change: %v", err)
   556  	}
   557  	if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeServing {
   558  		t.Fatalf("Serving mode is %q, want %q", mode, connectivity.ServingModeServing)
   559  	}
   560  
   561  	// Make sure the security configuration is not acted upon.
   562  	if err := verifyCertProviderNotCreated(); err != nil {
   563  		t.Fatal(err)
   564  	}
   565  }
   566  
   567  // TestHandleListenerUpdate_ErrorUpdate tests the case where an xds-enabled gRPC
   568  // server is configured with xDS credentials, but receives a Listener update
   569  // with an error. Verifies that no certificate providers are created.
   570  func (s) TestHandleListenerUpdate_ErrorUpdate(t *testing.T) {
   571  	// Setup an xDS management server that pushes on a channel when an LDS
   572  	// request is received by it.
   573  	ldsRequestCh := make(chan []string, 1)
   574  	mgmtServer, nodeID, _, _, cancel := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{
   575  		OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error {
   576  			if req.GetTypeUrl() == version.V3ListenerURL {
   577  				select {
   578  				case ldsRequestCh <- req.GetResourceNames():
   579  				default:
   580  				}
   581  			}
   582  			return nil
   583  		},
   584  	})
   585  	defer cancel()
   586  
   587  	// Generate bootstrap configuration pointing to the above management server
   588  	// with certificate provider configuration pointing to fake certificate
   589  	// providers.
   590  	bootstrapContents, err := bootstrap.Contents(bootstrap.Options{
   591  		NodeID:    nodeID,
   592  		ServerURI: mgmtServer.Address,
   593  		CertificateProviders: map[string]json.RawMessage{
   594  			e2e.ServerSideCertProviderInstance: fakeProvider1Config,
   595  			e2e.ClientSideCertProviderInstance: fakeProvider2Config,
   596  		},
   597  		ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate,
   598  	})
   599  	if err != nil {
   600  		t.Fatalf("Failed to create bootstrap configuration: %v", err)
   601  	}
   602  
   603  	// Create a new xDS enabled gRPC server and pass it a server option to get
   604  	// notified about serving mode changes. Also pass the above bootstrap
   605  	// configuration to be used during xDS client creation.
   606  	modeChangeCh := testutils.NewChannel()
   607  	modeChangeOption := ServingModeCallback(func(addr net.Addr, args ServingModeChangeArgs) {
   608  		t.Logf("Server mode change callback invoked for listener %q with mode %q and error %v", addr.String(), args.Mode, args.Err)
   609  		modeChangeCh.Send(args.Mode)
   610  	})
   611  	server, err := NewGRPCServer(modeChangeOption, BootstrapContentsForTesting(bootstrapContents))
   612  	if err != nil {
   613  		t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
   614  	}
   615  	defer server.Stop()
   616  
   617  	// Call Serve() in a goroutine.
   618  	lis, err := testutils.LocalTCPListener()
   619  	if err != nil {
   620  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
   621  	}
   622  	go server.Serve(lis)
   623  
   624  	// Update the listener resource on the management server in such a way that
   625  	// it will be NACKed by our xDS client. The listener_filters field is
   626  	// unsupported and will be NACKed.
   627  	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
   628  	defer cancel()
   629  	host, port := hostPortFromListener(t, lis)
   630  	listener := e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS, "routeName")
   631  	listener.ListenerFilters = []*v3listenerpb.ListenerFilter{{Name: "foo"}}
   632  	resources := e2e.UpdateOptions{
   633  		NodeID:    nodeID,
   634  		Listeners: []*v3listenerpb.Listener{listener},
   635  	}
   636  	if err := mgmtServer.Update(ctx, resources); err != nil {
   637  		t.Fatal(err)
   638  	}
   639  
   640  	// Ensure that the LDS request is sent out for the expected name.
   641  	var gotNames []string
   642  	select {
   643  	case gotNames = <-ldsRequestCh:
   644  	case <-ctx.Done():
   645  		t.Fatalf("Timeout when waiting for an LDS request to be sent out")
   646  	}
   647  	wantNames := []string{strings.Replace(e2e.ServerListenerResourceNameTemplate, "%s", lis.Addr().String(), -1)}
   648  	if !cmp.Equal(gotNames, wantNames) {
   649  		t.Fatalf("LDS watch registered for names %v, want %v", gotNames, wantNames)
   650  	}
   651  
   652  	// Make sure that no certificate providers are created.
   653  	if err := verifyCertProviderNotCreated(); err != nil {
   654  		t.Fatal(err)
   655  	}
   656  
   657  	// Also make sure that no serving mode updates are received. The serving
   658  	// mode does not change until the server comes to the conclusion that the
   659  	// requested resource is not present in the management server. This happens
   660  	// when the watch timer expires or when the resource is explicitly deleted
   661  	// by the management server.
   662  	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
   663  	defer sCancel()
   664  	if _, err := modeChangeCh.Receive(sCtx); err != context.DeadlineExceeded {
   665  		t.Fatal("Serving mode changed received when none expected")
   666  	}
   667  }
   668  
   669  // TestServeReturnsErrorAfterClose tests that the xds Server returns
   670  // grpc.ErrServerStopped if Serve is called after Close on the server.
   671  func (s) TestServeReturnsErrorAfterClose(t *testing.T) {
   672  	server, err := NewGRPCServer(BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), nonExistentManagementServer)))
   673  	if err != nil {
   674  		t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
   675  	}
   676  
   677  	lis, err := testutils.LocalTCPListener()
   678  	if err != nil {
   679  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
   680  	}
   681  	server.Stop()
   682  	err = server.Serve(lis)
   683  	if err == nil || !strings.Contains(err.Error(), grpc.ErrServerStopped.Error()) {
   684  		t.Fatalf("server erorred with wrong error, want: %v, got :%v", grpc.ErrServerStopped, err)
   685  	}
   686  }
   687  
   688  // TestServeAndCloseDoNotRace tests that Serve and Close on the xDS Server do
   689  // not race and leak the xDS Client. A leak would be found by the leak checker.
   690  func (s) TestServeAndCloseDoNotRace(t *testing.T) {
   691  	lis, err := testutils.LocalTCPListener()
   692  	if err != nil {
   693  		t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
   694  	}
   695  
   696  	wg := sync.WaitGroup{}
   697  	for i := 0; i < 100; i++ {
   698  		server, err := NewGRPCServer(BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), nonExistentManagementServer)))
   699  		if err != nil {
   700  			t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err)
   701  		}
   702  		wg.Add(1)
   703  		go func() {
   704  			server.Serve(lis)
   705  			wg.Done()
   706  		}()
   707  		wg.Add(1)
   708  		go func() {
   709  			server.Stop()
   710  			wg.Done()
   711  		}()
   712  	}
   713  	wg.Wait()
   714  }
   715  

View as plain text