/* * * Copyright 2020 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ package xds import ( "context" "encoding/json" "errors" "fmt" "net" "reflect" "strconv" "strings" "sync" "testing" "time" "github.com/google/go-cmp/cmp" "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/tls/certprovider" "google.golang.org/grpc/credentials/xds" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/internal/testutils/xds/bootstrap" "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" _ "google.golang.org/grpc/xds/internal/httpfilter/router" // Register the router filter ) const ( defaultTestTimeout = 5 * time.Second defaultTestShortTimeout = 10 * time.Millisecond nonExistentManagementServer = "non-existent-management-server" ) type s struct { grpctest.Tester } func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } type fakeGRPCServer struct { done chan struct{} registerServiceCh *testutils.Channel serveCh *testutils.Channel } func (f *fakeGRPCServer) RegisterService(*grpc.ServiceDesc, any) { f.registerServiceCh.Send(nil) } func (f *fakeGRPCServer) Serve(lis net.Listener) error { f.serveCh.Send(nil) <-f.done lis.Close() return nil } func (f *fakeGRPCServer) Stop() { close(f.done) } func (f *fakeGRPCServer) GracefulStop() { close(f.done) } func (f *fakeGRPCServer) GetServiceInfo() map[string]grpc.ServiceInfo { panic("implement me") } func newFakeGRPCServer() *fakeGRPCServer { return &fakeGRPCServer{ done: make(chan struct{}), registerServiceCh: testutils.NewChannel(), serveCh: testutils.NewChannel(), } } func generateBootstrapContents(t *testing.T, nodeID, serverURI string) []byte { t.Helper() bs, err := e2e.DefaultBootstrapContents(nodeID, serverURI) if err != nil { t.Fatal(err) } return bs } func (s) TestNewServer_Success(t *testing.T) { xdsCreds, err := xds.NewServerCredentials(xds.ServerOptions{FallbackCreds: insecure.NewCredentials()}) if err != nil { t.Fatalf("failed to create xds server credentials: %v", err) } tests := []struct { desc string serverOpts []grpc.ServerOption wantXDSCredsInUse bool }{ { desc: "without_xds_creds", serverOpts: []grpc.ServerOption{ grpc.Creds(insecure.NewCredentials()), BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), nonExistentManagementServer)), }, }, { desc: "with_xds_creds", serverOpts: []grpc.ServerOption{ grpc.Creds(xdsCreds), BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), nonExistentManagementServer)), }, wantXDSCredsInUse: true, }, } for _, test := range tests { t.Run(test.desc, func(t *testing.T) { // The xds package adds a couple of server options (unary and stream // interceptors) to the server options passed in by the user. wantServerOpts := len(test.serverOpts) + 2 origNewGRPCServer := newGRPCServer newGRPCServer = func(opts ...grpc.ServerOption) grpcServer { if got := len(opts); got != wantServerOpts { t.Fatalf("%d ServerOptions passed to grpc.Server, want %d", got, wantServerOpts) } // Verify that the user passed ServerOptions are forwarded as is. if !reflect.DeepEqual(opts[2:], test.serverOpts) { t.Fatalf("got ServerOptions %v, want %v", opts[2:], test.serverOpts) } return grpc.NewServer(opts...) } defer func() { newGRPCServer = origNewGRPCServer }() s, err := NewGRPCServer(test.serverOpts...) if err != nil { t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err) } defer s.Stop() }) } } func (s) TestNewServer_Failure(t *testing.T) { xdsCreds, err := xds.NewServerCredentials(xds.ServerOptions{FallbackCreds: insecure.NewCredentials()}) if err != nil { t.Fatalf("failed to create xds server credentials: %v", err) } tests := []struct { desc string serverOpts []grpc.ServerOption wantErr string }{ { desc: "bootstrap env var not set", serverOpts: []grpc.ServerOption{grpc.Creds(xdsCreds)}, wantErr: "bootstrap env vars are unspecified", }, { desc: "empty bootstrap config", serverOpts: []grpc.ServerOption{ grpc.Creds(xdsCreds), BootstrapContentsForTesting([]byte(`{}`)), }, wantErr: "xDS client creation failed", }, { desc: "server_listener_resource_name_template is missing", serverOpts: []grpc.ServerOption{ grpc.Creds(xdsCreds), func() grpc.ServerOption { bs, err := bootstrap.Contents(bootstrap.Options{ NodeID: uuid.New().String(), ServerURI: nonExistentManagementServer, CertificateProviders: map[string]json.RawMessage{ "cert-provider-instance": json.RawMessage("{}"), }, }) if err != nil { t.Errorf("Failed to create bootstrap configuration: %v", err) } return BootstrapContentsForTesting(bs) }(), }, wantErr: "missing server_listener_resource_name_template in the bootstrap configuration", }, } for _, test := range tests { t.Run(test.desc, func(t *testing.T) { s, err := NewGRPCServer(test.serverOpts...) if err == nil { s.Stop() t.Fatal("NewGRPCServer() succeeded when expected to fail") } if !strings.Contains(err.Error(), test.wantErr) { t.Fatalf("NewGRPCServer() failed with error: %v, want: %s", err, test.wantErr) } }) } } func (s) TestRegisterService(t *testing.T) { fs := newFakeGRPCServer() origNewGRPCServer := newGRPCServer newGRPCServer = func(opts ...grpc.ServerOption) grpcServer { return fs } defer func() { newGRPCServer = origNewGRPCServer }() s, err := NewGRPCServer(BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), "non-existent-management-server"))) if err != nil { t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err) } defer s.Stop() s.RegisterService(&grpc.ServiceDesc{}, nil) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if _, err := fs.registerServiceCh.Receive(ctx); err != nil { t.Fatalf("Timeout when expecting RegisterService() to called on grpc.Server: %v", err) } } const ( fakeProvider1Name = "fake-certificate-provider-1" fakeProvider2Name = "fake-certificate-provider-2" ) var ( fpb1, fpb2 *fakeProviderBuilder fakeProvider1Config json.RawMessage fakeProvider2Config json.RawMessage ) func init() { fpb1 = &fakeProviderBuilder{ name: fakeProvider1Name, buildCh: testutils.NewChannel(), } fpb2 = &fakeProviderBuilder{ name: fakeProvider2Name, buildCh: testutils.NewChannel(), } certprovider.Register(fpb1) certprovider.Register(fpb2) fakeProvider1Config = json.RawMessage(fmt.Sprintf(`{ "plugin_name": "%s", "config": "my fake config 1" }`, fakeProvider1Name)) fakeProvider2Config = json.RawMessage(fmt.Sprintf(`{ "plugin_name": "%s", "config": "my fake config 2" }`, fakeProvider2Name)) } // fakeProviderBuilder builds new instances of fakeProvider and interprets the // config provided to it as a string. type fakeProviderBuilder struct { name string buildCh *testutils.Channel } func (b *fakeProviderBuilder) ParseConfig(cfg any) (*certprovider.BuildableConfig, error) { var config string if err := json.Unmarshal(cfg.(json.RawMessage), &config); err != nil { return nil, fmt.Errorf("providerBuilder %s failed to unmarshal config: %v", b.name, cfg) } return certprovider.NewBuildableConfig(b.name, []byte(config), func(certprovider.BuildOptions) certprovider.Provider { b.buildCh.Send(nil) return &fakeProvider{ Distributor: certprovider.NewDistributor(), config: config, } }), nil } func (b *fakeProviderBuilder) Name() string { return b.name } // fakeProvider is an implementation of the Provider interface which provides a // method for tests to invoke to push new key materials. type fakeProvider struct { *certprovider.Distributor config string } // Close helps implement the Provider interface. func (p *fakeProvider) Close() { p.Distributor.Stop() } func verifyCertProviderNotCreated() error { sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) defer sCancel() if _, err := fpb1.buildCh.Receive(sCtx); err != context.DeadlineExceeded { return errors.New("certificate provider created when no xDS creds were specified") } sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) defer sCancel() if _, err := fpb2.buildCh.Receive(sCtx); err != context.DeadlineExceeded { return errors.New("certificate provider created when no xDS creds were specified") } return nil } func hostPortFromListener(t *testing.T, lis net.Listener) (string, uint32) { t.Helper() host, p, err := net.SplitHostPort(lis.Addr().String()) if err != nil { t.Fatalf("net.SplitHostPort(%s) failed: %v", lis.Addr().String(), err) } port, err := strconv.ParseInt(p, 10, 32) if err != nil { t.Fatalf("strconv.ParseInt(%s, 10, 32) failed: %v", p, err) } return host, uint32(port) } // TestServeSuccess tests the successful case of creating an xDS enabled gRPC // server and calling Serve() on it. The test verifies that an LDS request is // sent out for the expected name, and also verifies that the serving mode // changes appropriately. func (s) TestServeSuccess(t *testing.T) { // Setup an xDS management server that pushes on a channel when an LDS // request is received by it. ldsRequestCh := make(chan []string, 1) mgmtServer, nodeID, bootstrapContents, _, cancel := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{ OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { if req.GetTypeUrl() == version.V3ListenerURL { select { case ldsRequestCh <- req.GetResourceNames(): default: } } return nil }, }) defer cancel() // Override the function to create the underlying grpc.Server to allow the // test to verify that Serve() is called on the underlying server. fs := newFakeGRPCServer() origNewGRPCServer := newGRPCServer newGRPCServer = func(opts ...grpc.ServerOption) grpcServer { return fs } defer func() { newGRPCServer = origNewGRPCServer }() // Create a new xDS enabled gRPC server and pass it a server option to get // notified about serving mode changes. modeChangeCh := testutils.NewChannel() modeChangeOption := ServingModeCallback(func(addr net.Addr, args ServingModeChangeArgs) { t.Logf("Server mode change callback invoked for listener %q with mode %q and error %v", addr.String(), args.Mode, args.Err) modeChangeCh.Send(args.Mode) }) server, err := NewGRPCServer(modeChangeOption, BootstrapContentsForTesting(bootstrapContents)) if err != nil { t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err) } defer server.Stop() // Call Serve() in a goroutine. lis, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("testutils.LocalTCPListener() failed: %v", err) } go func() { if err := server.Serve(lis); err != nil { t.Error(err) } }() // Ensure that the LDS request is sent out for the expected name. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() var gotNames []string select { case gotNames = <-ldsRequestCh: case <-ctx.Done(): t.Fatalf("Timeout when waiting for an LDS request to be sent out") } wantNames := []string{strings.Replace(e2e.ServerListenerResourceNameTemplate, "%s", lis.Addr().String(), -1)} if !cmp.Equal(gotNames, wantNames) { t.Fatalf("LDS watch registered for names %v, want %v", gotNames, wantNames) } // Update the management server with a good listener resource. host, port := hostPortFromListener(t, lis) resources := e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port, e2e.SecurityLevelNone, "routeName")}, } if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } // Verify the serving mode reports SERVING. v, err := modeChangeCh.Receive(ctx) if err != nil { t.Fatalf("Timeout when waiting for serving mode to change: %v", err) } if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeServing { t.Fatalf("Serving mode is %q, want %q", mode, connectivity.ServingModeServing) } // Verify that Serve() is called on the underlying gRPC server. if _, err := fs.serveCh.Receive(ctx); err != nil { t.Fatalf("Timeout when waiting for Serve() to be invoked on the grpc.Server") } // Update the listener resource on the management server in such a way that // it will be NACKed by our xDS client. The listener_filters field is // unsupported and will be NACKed. resources.Listeners[0].ListenerFilters = []*v3listenerpb.ListenerFilter{{Name: "foo"}} if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } // Verify that there is no change in the serving mode. The server should // continue using the previously received good configuration. sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) defer sCancel() if v, err := modeChangeCh.Receive(sCtx); err != context.DeadlineExceeded { t.Fatalf("Unexpected change in serving mode. New mode is %v", v.(connectivity.ServingMode)) } // Remove the listener resource from the management server. This should // result in a resource-not-found error from the xDS client and should // result in the server moving to NOT_SERVING mode. resources.Listeners = nil if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } v, err = modeChangeCh.Receive(ctx) if err != nil { t.Fatalf("Timeout when waiting for serving mode to change: %v", err) } if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeNotServing { t.Fatalf("Serving mode is %q, want %q", mode, connectivity.ServingModeNotServing) } } // TestNewServer_ClientCreationFailure tests the case where the xDS client // creation fails and verifies that the call to NewGRPCServer() fails. func (s) TestNewServer_ClientCreationFailure(t *testing.T) { origNewXDSClient := newXDSClient newXDSClient = func() (xdsclient.XDSClient, func(), error) { return nil, nil, errors.New("xdsClient creation failed") } defer func() { newXDSClient = origNewXDSClient }() if _, err := NewGRPCServer(); err == nil { t.Fatal("NewGRPCServer() succeeded when expected to fail") } } // TestHandleListenerUpdate_NoXDSCreds tests the case where an xds-enabled gRPC // server is not configured with xDS credentials. Verifies that the security // config received as part of a Listener update is not acted upon. func (s) TestHandleListenerUpdate_NoXDSCreds(t *testing.T) { mgmtServer, err := e2e.StartManagementServer(e2e.ManagementServerOptions{}) if err != nil { t.Fatalf("Failed to start xDS management server: %v", err) } defer mgmtServer.Stop() // Generate bootstrap configuration pointing to the above management server // with certificate provider configuration pointing to fake certificate // providers. nodeID := uuid.NewString() bootstrapContents, err := bootstrap.Contents(bootstrap.Options{ NodeID: nodeID, ServerURI: mgmtServer.Address, CertificateProviders: map[string]json.RawMessage{ e2e.ServerSideCertProviderInstance: fakeProvider1Config, e2e.ClientSideCertProviderInstance: fakeProvider2Config, }, ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate, }) if err != nil { t.Fatalf("Failed to create bootstrap configuration: %v", err) } // Create a new xDS enabled gRPC server and pass it a server option to get // notified about serving mode changes. Also pass the above bootstrap // configuration to be used during xDS client creation. modeChangeCh := testutils.NewChannel() modeChangeOption := ServingModeCallback(func(addr net.Addr, args ServingModeChangeArgs) { t.Logf("Server mode change callback invoked for listener %q with mode %q and error %v", addr.String(), args.Mode, args.Err) modeChangeCh.Send(args.Mode) }) server, err := NewGRPCServer(modeChangeOption, BootstrapContentsForTesting(bootstrapContents)) if err != nil { t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err) } defer server.Stop() // Call Serve() in a goroutine. lis, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("testutils.LocalTCPListener() failed: %v", err) } go func() { if err := server.Serve(lis); err != nil { t.Error(err) } }() // Update the management server with a good listener resource that contains // security configuration. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() host, port := hostPortFromListener(t, lis) resources := e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS, "routeName")}, } if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } // Verify the serving mode reports SERVING. v, err := modeChangeCh.Receive(ctx) if err != nil { t.Fatalf("Timeout when waiting for serving mode to change: %v", err) } if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeServing { t.Fatalf("Serving mode is %q, want %q", mode, connectivity.ServingModeServing) } // Make sure the security configuration is not acted upon. if err := verifyCertProviderNotCreated(); err != nil { t.Fatal(err) } } // TestHandleListenerUpdate_ErrorUpdate tests the case where an xds-enabled gRPC // server is configured with xDS credentials, but receives a Listener update // with an error. Verifies that no certificate providers are created. func (s) TestHandleListenerUpdate_ErrorUpdate(t *testing.T) { // Setup an xDS management server that pushes on a channel when an LDS // request is received by it. ldsRequestCh := make(chan []string, 1) mgmtServer, nodeID, _, _, cancel := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{ OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { if req.GetTypeUrl() == version.V3ListenerURL { select { case ldsRequestCh <- req.GetResourceNames(): default: } } return nil }, }) defer cancel() // Generate bootstrap configuration pointing to the above management server // with certificate provider configuration pointing to fake certificate // providers. bootstrapContents, err := bootstrap.Contents(bootstrap.Options{ NodeID: nodeID, ServerURI: mgmtServer.Address, CertificateProviders: map[string]json.RawMessage{ e2e.ServerSideCertProviderInstance: fakeProvider1Config, e2e.ClientSideCertProviderInstance: fakeProvider2Config, }, ServerListenerResourceNameTemplate: e2e.ServerListenerResourceNameTemplate, }) if err != nil { t.Fatalf("Failed to create bootstrap configuration: %v", err) } // Create a new xDS enabled gRPC server and pass it a server option to get // notified about serving mode changes. Also pass the above bootstrap // configuration to be used during xDS client creation. modeChangeCh := testutils.NewChannel() modeChangeOption := ServingModeCallback(func(addr net.Addr, args ServingModeChangeArgs) { t.Logf("Server mode change callback invoked for listener %q with mode %q and error %v", addr.String(), args.Mode, args.Err) modeChangeCh.Send(args.Mode) }) server, err := NewGRPCServer(modeChangeOption, BootstrapContentsForTesting(bootstrapContents)) if err != nil { t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err) } defer server.Stop() // Call Serve() in a goroutine. lis, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("testutils.LocalTCPListener() failed: %v", err) } go server.Serve(lis) // Update the listener resource on the management server in such a way that // it will be NACKed by our xDS client. The listener_filters field is // unsupported and will be NACKed. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() host, port := hostPortFromListener(t, lis) listener := e2e.DefaultServerListener(host, port, e2e.SecurityLevelMTLS, "routeName") listener.ListenerFilters = []*v3listenerpb.ListenerFilter{{Name: "foo"}} resources := e2e.UpdateOptions{ NodeID: nodeID, Listeners: []*v3listenerpb.Listener{listener}, } if err := mgmtServer.Update(ctx, resources); err != nil { t.Fatal(err) } // Ensure that the LDS request is sent out for the expected name. var gotNames []string select { case gotNames = <-ldsRequestCh: case <-ctx.Done(): t.Fatalf("Timeout when waiting for an LDS request to be sent out") } wantNames := []string{strings.Replace(e2e.ServerListenerResourceNameTemplate, "%s", lis.Addr().String(), -1)} if !cmp.Equal(gotNames, wantNames) { t.Fatalf("LDS watch registered for names %v, want %v", gotNames, wantNames) } // Make sure that no certificate providers are created. if err := verifyCertProviderNotCreated(); err != nil { t.Fatal(err) } // Also make sure that no serving mode updates are received. The serving // mode does not change until the server comes to the conclusion that the // requested resource is not present in the management server. This happens // when the watch timer expires or when the resource is explicitly deleted // by the management server. sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) defer sCancel() if _, err := modeChangeCh.Receive(sCtx); err != context.DeadlineExceeded { t.Fatal("Serving mode changed received when none expected") } } // TestServeReturnsErrorAfterClose tests that the xds Server returns // grpc.ErrServerStopped if Serve is called after Close on the server. func (s) TestServeReturnsErrorAfterClose(t *testing.T) { server, err := NewGRPCServer(BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), nonExistentManagementServer))) if err != nil { t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err) } lis, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("testutils.LocalTCPListener() failed: %v", err) } server.Stop() err = server.Serve(lis) if err == nil || !strings.Contains(err.Error(), grpc.ErrServerStopped.Error()) { t.Fatalf("server erorred with wrong error, want: %v, got :%v", grpc.ErrServerStopped, err) } } // TestServeAndCloseDoNotRace tests that Serve and Close on the xDS Server do // not race and leak the xDS Client. A leak would be found by the leak checker. func (s) TestServeAndCloseDoNotRace(t *testing.T) { lis, err := testutils.LocalTCPListener() if err != nil { t.Fatalf("testutils.LocalTCPListener() failed: %v", err) } wg := sync.WaitGroup{} for i := 0; i < 100; i++ { server, err := NewGRPCServer(BootstrapContentsForTesting(generateBootstrapContents(t, uuid.NewString(), nonExistentManagementServer))) if err != nil { t.Fatalf("Failed to create an xDS enabled gRPC server: %v", err) } wg.Add(1) go func() { server.Serve(lis) wg.Done() }() wg.Add(1) go func() { server.Stop() wg.Done() }() } wg.Wait() }