...

Source file src/google.golang.org/grpc/xds/server.go

Documentation: google.golang.org/grpc/xds

     1  /*
     2   *
     3   * Copyright 2020 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package xds
    20  
    21  import (
    22  	"context"
    23  	"errors"
    24  	"fmt"
    25  	"net"
    26  
    27  	"google.golang.org/grpc"
    28  	"google.golang.org/grpc/codes"
    29  	"google.golang.org/grpc/connectivity"
    30  	internalgrpclog "google.golang.org/grpc/internal/grpclog"
    31  	"google.golang.org/grpc/internal/grpcsync"
    32  	iresolver "google.golang.org/grpc/internal/resolver"
    33  	"google.golang.org/grpc/internal/transport"
    34  	"google.golang.org/grpc/internal/xds/bootstrap"
    35  	"google.golang.org/grpc/metadata"
    36  	"google.golang.org/grpc/status"
    37  	"google.golang.org/grpc/xds/internal/server"
    38  	"google.golang.org/grpc/xds/internal/xdsclient"
    39  	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
    40  )
    41  
// serverPrefix is the format string used to build the log prefix for an
// xDS-enabled server. NewGRPCServer fills the %p verb with the *GRPCServer
// pointer, so log lines from different server instances can be told apart.
const serverPrefix = "[xds-server %p] "
    43  
    44  var (
    45  	// These new functions will be overridden in unit tests.
    46  	newXDSClient = func() (xdsclient.XDSClient, func(), error) {
    47  		return xdsclient.New()
    48  	}
    49  	newGRPCServer = func(opts ...grpc.ServerOption) grpcServer {
    50  		return grpc.NewServer(opts...)
    51  	}
    52  )
    53  
// grpcServer contains methods from grpc.Server which are used by the
// GRPCServer type here. This is useful for overriding in unit tests.
//
// The production implementation is the *grpc.Server returned by
// grpc.NewServer (see newGRPCServer).
type grpcServer interface {
	// RegisterService is what GRPCServer.RegisterService delegates to.
	RegisterService(*grpc.ServiceDesc, any)
	// Serve is invoked by GRPCServer.Serve with the xDS listener wrapper.
	Serve(net.Listener) error
	// Stop is invoked by GRPCServer.Stop.
	Stop()
	// GracefulStop is invoked by GRPCServer.GracefulStop.
	GracefulStop()
	// GetServiceInfo is what GRPCServer.GetServiceInfo delegates to.
	GetServiceInfo() map[string]grpc.ServiceInfo
}
    63  
// GRPCServer wraps a gRPC server and provides server-side xDS functionality, by
// communication with a management server using xDS APIs. It implements the
// grpc.ServiceRegistrar interface and can be passed to service registration
// functions in IDL generated code.
type GRPCServer struct {
	// gs is the wrapped gRPC server that service registration, serving and
	// stopping are delegated to.
	gs grpcServer
	// quit is fired by Stop and GracefulStop; Serve checks it and refuses to
	// serve once it has fired.
	quit *grpcsync.Event
	// logger prefixes every message with this server's pointer (see
	// serverPrefix).
	logger *internalgrpclog.PrefixLogger
	// opts holds the xDS-specific server options, populated by
	// handleServerOptions. Its modeCallback is never nil.
	opts *serverOptions
	// xdsC is the xDS client created in NewGRPCServer.
	xdsC xdsclient.XDSClient
	// xdsClientClose closes xdsC; it is invoked from Stop and GracefulStop.
	xdsClientClose func()
}
    76  
    77  // NewGRPCServer creates an xDS-enabled gRPC server using the passed in opts.
    78  // The underlying gRPC server has no service registered and has not started to
    79  // accept requests yet.
    80  func NewGRPCServer(opts ...grpc.ServerOption) (*GRPCServer, error) {
    81  	newOpts := []grpc.ServerOption{
    82  		grpc.ChainUnaryInterceptor(xdsUnaryInterceptor),
    83  		grpc.ChainStreamInterceptor(xdsStreamInterceptor),
    84  	}
    85  	newOpts = append(newOpts, opts...)
    86  	s := &GRPCServer{
    87  		gs:   newGRPCServer(newOpts...),
    88  		quit: grpcsync.NewEvent(),
    89  	}
    90  	s.handleServerOptions(opts)
    91  
    92  	// Initializing the xDS client upfront (instead of at serving time)
    93  	// simplifies the code by eliminating the need for a mutex to protect the
    94  	// xdsC and xdsClientClose fields.
    95  	newXDSClient := newXDSClient
    96  	if s.opts.bootstrapContentsForTesting != nil {
    97  		// Bootstrap file contents may be specified as a server option for tests.
    98  		newXDSClient = func() (xdsclient.XDSClient, func(), error) {
    99  			return xdsclient.NewWithBootstrapContentsForTesting(s.opts.bootstrapContentsForTesting)
   100  		}
   101  	}
   102  	xdsClient, xdsClientClose, err := newXDSClient()
   103  	if err != nil {
   104  		return nil, fmt.Errorf("xDS client creation failed: %v", err)
   105  	}
   106  
   107  	// Validate the bootstrap configuration for server specific fields.
   108  
   109  	// Listener resource name template is mandatory on the server side.
   110  	cfg := xdsClient.BootstrapConfig()
   111  	if cfg.ServerListenerResourceNameTemplate == "" {
   112  		xdsClientClose()
   113  		return nil, errors.New("missing server_listener_resource_name_template in the bootstrap configuration")
   114  	}
   115  
   116  	s.xdsC = xdsClient
   117  	s.xdsClientClose = xdsClientClose
   118  
   119  	s.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(serverPrefix, s))
   120  	s.logger.Infof("Created xds.GRPCServer")
   121  
   122  	return s, nil
   123  }
   124  
   125  // handleServerOptions iterates through the list of server options passed in by
   126  // the user, and handles the xDS server specific options.
   127  func (s *GRPCServer) handleServerOptions(opts []grpc.ServerOption) {
   128  	so := s.defaultServerOptions()
   129  	for _, opt := range opts {
   130  		if o, ok := opt.(*serverOption); ok {
   131  			o.apply(so)
   132  		}
   133  	}
   134  	s.opts = so
   135  }
   136  
   137  func (s *GRPCServer) defaultServerOptions() *serverOptions {
   138  	return &serverOptions{
   139  		// A default serving mode change callback which simply logs at the
   140  		// default-visible log level. This will be used if the application does not
   141  		// register a mode change callback.
   142  		//
   143  		// Note that this means that `s.opts.modeCallback` will never be nil and can
   144  		// safely be invoked directly from `handleServingModeChanges`.
   145  		modeCallback: s.loggingServerModeChangeCallback,
   146  	}
   147  }
   148  
   149  func (s *GRPCServer) loggingServerModeChangeCallback(addr net.Addr, args ServingModeChangeArgs) {
   150  	switch args.Mode {
   151  	case connectivity.ServingModeServing:
   152  		s.logger.Errorf("Listener %q entering mode: %q", addr.String(), args.Mode)
   153  	case connectivity.ServingModeNotServing:
   154  		s.logger.Errorf("Listener %q entering mode: %q due to error: %v", addr.String(), args.Mode, args.Err)
   155  	}
   156  }
   157  
   158  // RegisterService registers a service and its implementation to the underlying
   159  // gRPC server. It is called from the IDL generated code. This must be called
   160  // before invoking Serve.
   161  func (s *GRPCServer) RegisterService(sd *grpc.ServiceDesc, ss any) {
   162  	s.gs.RegisterService(sd, ss)
   163  }
   164  
   165  // GetServiceInfo returns a map from service names to ServiceInfo.
   166  // Service names include the package names, in the form of <package>.<service>.
   167  func (s *GRPCServer) GetServiceInfo() map[string]grpc.ServiceInfo {
   168  	return s.gs.GetServiceInfo()
   169  }
   170  
   171  // Serve gets the underlying gRPC server to accept incoming connections on the
   172  // listener lis, which is expected to be listening on a TCP port.
   173  //
   174  // A connection to the management server, to receive xDS configuration, is
   175  // initiated here.
   176  //
   177  // Serve will return a non-nil error unless Stop or GracefulStop is called.
   178  func (s *GRPCServer) Serve(lis net.Listener) error {
   179  	s.logger.Infof("Serve() passed a net.Listener on %s", lis.Addr().String())
   180  	if _, ok := lis.Addr().(*net.TCPAddr); !ok {
   181  		return fmt.Errorf("xds: GRPCServer expects listener to return a net.TCPAddr. Got %T", lis.Addr())
   182  	}
   183  
   184  	if s.quit.HasFired() {
   185  		return grpc.ErrServerStopped
   186  	}
   187  
   188  	// The server listener resource name template from the bootstrap
   189  	// configuration contains a template for the name of the Listener resource
   190  	// to subscribe to for a gRPC server. If the token `%s` is present in the
   191  	// string, it will be replaced with the server's listening "IP:port" (e.g.,
   192  	// "0.0.0.0:8080", "[::]:8080").
   193  	cfg := s.xdsC.BootstrapConfig()
   194  	name := bootstrap.PopulateResourceTemplate(cfg.ServerListenerResourceNameTemplate, lis.Addr().String())
   195  
   196  	// Create a listenerWrapper which handles all functionality required by
   197  	// this particular instance of Serve().
   198  	lw := server.NewListenerWrapper(server.ListenerWrapperParams{
   199  		Listener:             lis,
   200  		ListenerResourceName: name,
   201  		XDSClient:            s.xdsC,
   202  		ModeCallback: func(addr net.Addr, mode connectivity.ServingMode, err error) {
   203  			s.opts.modeCallback(addr, ServingModeChangeArgs{
   204  				Mode: mode,
   205  				Err:  err,
   206  			})
   207  		},
   208  	})
   209  	return s.gs.Serve(lw)
   210  }
   211  
   212  // Stop stops the underlying gRPC server. It immediately closes all open
   213  // connections. It cancels all active RPCs on the server side and the
   214  // corresponding pending RPCs on the client side will get notified by connection
   215  // errors.
   216  func (s *GRPCServer) Stop() {
   217  	s.quit.Fire()
   218  	s.gs.Stop()
   219  	if s.xdsC != nil {
   220  		s.xdsClientClose()
   221  	}
   222  }
   223  
   224  // GracefulStop stops the underlying gRPC server gracefully. It stops the server
   225  // from accepting new connections and RPCs and blocks until all the pending RPCs
   226  // are finished.
   227  func (s *GRPCServer) GracefulStop() {
   228  	s.quit.Fire()
   229  	s.gs.GracefulStop()
   230  	if s.xdsC != nil {
   231  		s.xdsClientClose()
   232  	}
   233  }
   234  
   235  // routeAndProcess routes the incoming RPC to a configured route in the route
   236  // table and also processes the RPC by running the incoming RPC through any HTTP
   237  // Filters configured.
   238  func routeAndProcess(ctx context.Context) error {
   239  	conn := transport.GetConnection(ctx)
   240  	cw, ok := conn.(interface {
   241  		UsableRouteConfiguration() xdsresource.UsableRouteConfiguration
   242  	})
   243  	if !ok {
   244  		return errors.New("missing virtual hosts in incoming context")
   245  	}
   246  
   247  	rc := cw.UsableRouteConfiguration()
   248  	// Error out at routing l7 level with a status code UNAVAILABLE, represents
   249  	// an nack before usable route configuration or resource not found for RDS
   250  	// or error combining LDS + RDS (Shouldn't happen).
   251  	if rc.Err != nil {
   252  		if logger.V(2) {
   253  			logger.Infof("RPC on connection with xDS Configuration error: %v", rc.Err)
   254  		}
   255  		return status.Error(codes.Unavailable, "error from xDS configuration for matched route configuration")
   256  	}
   257  
   258  	mn, ok := grpc.Method(ctx)
   259  	if !ok {
   260  		return errors.New("missing method name in incoming context")
   261  	}
   262  	md, ok := metadata.FromIncomingContext(ctx)
   263  	if !ok {
   264  		return errors.New("missing metadata in incoming context")
   265  	}
   266  	// A41 added logic to the core grpc implementation to guarantee that once
   267  	// the RPC gets to this point, there will be a single, unambiguous authority
   268  	// present in the header map.
   269  	authority := md.Get(":authority")
   270  	vh := xdsresource.FindBestMatchingVirtualHostServer(authority[0], rc.VHS)
   271  	if vh == nil {
   272  		return status.Error(codes.Unavailable, "the incoming RPC did not match a configured Virtual Host")
   273  	}
   274  
   275  	var rwi *xdsresource.RouteWithInterceptors
   276  	rpcInfo := iresolver.RPCInfo{
   277  		Context: ctx,
   278  		Method:  mn,
   279  	}
   280  	for _, r := range vh.Routes {
   281  		if r.M.Match(rpcInfo) {
   282  			// "NonForwardingAction is expected for all Routes used on server-side; a route with an inappropriate action causes
   283  			// RPCs matching that route to fail with UNAVAILABLE." - A36
   284  			if r.ActionType != xdsresource.RouteActionNonForwardingAction {
   285  				return status.Error(codes.Unavailable, "the incoming RPC matched to a route that was not of action type non forwarding")
   286  			}
   287  			rwi = &r
   288  			break
   289  		}
   290  	}
   291  	if rwi == nil {
   292  		return status.Error(codes.Unavailable, "the incoming RPC did not match a configured Route")
   293  	}
   294  	for _, interceptor := range rwi.Interceptors {
   295  		if err := interceptor.AllowRPC(ctx); err != nil {
   296  			return status.Errorf(codes.PermissionDenied, "Incoming RPC is not allowed: %v", err)
   297  		}
   298  	}
   299  	return nil
   300  }
   301  
   302  // xdsUnaryInterceptor is the unary interceptor added to the gRPC server to
   303  // perform any xDS specific functionality on unary RPCs.
   304  func xdsUnaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
   305  	if err := routeAndProcess(ctx); err != nil {
   306  		return nil, err
   307  	}
   308  	return handler(ctx, req)
   309  }
   310  
   311  // xdsStreamInterceptor is the stream interceptor added to the gRPC server to
   312  // perform any xDS specific functionality on streaming RPCs.
   313  func xdsStreamInterceptor(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
   314  	if err := routeAndProcess(ss.Context()); err != nil {
   315  		return err
   316  	}
   317  	return handler(srv, ss)
   318  }
   319  

View as plain text