...

Source file src/google.golang.org/grpc/server.go

Documentation: google.golang.org/grpc

     1  /*
     2   *
     3   * Copyright 2014 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package grpc
    20  
    21  import (
    22  	"context"
    23  	"errors"
    24  	"fmt"
    25  	"io"
    26  	"math"
    27  	"net"
    28  	"net/http"
    29  	"reflect"
    30  	"runtime"
    31  	"strings"
    32  	"sync"
    33  	"sync/atomic"
    34  	"time"
    35  
    36  	"google.golang.org/grpc/codes"
    37  	"google.golang.org/grpc/credentials"
    38  	"google.golang.org/grpc/encoding"
    39  	"google.golang.org/grpc/encoding/proto"
    40  	"google.golang.org/grpc/grpclog"
    41  	"google.golang.org/grpc/internal"
    42  	"google.golang.org/grpc/internal/binarylog"
    43  	"google.golang.org/grpc/internal/channelz"
    44  	"google.golang.org/grpc/internal/grpcsync"
    45  	"google.golang.org/grpc/internal/grpcutil"
    46  	"google.golang.org/grpc/internal/transport"
    47  	"google.golang.org/grpc/keepalive"
    48  	"google.golang.org/grpc/metadata"
    49  	"google.golang.org/grpc/peer"
    50  	"google.golang.org/grpc/stats"
    51  	"google.golang.org/grpc/status"
    52  	"google.golang.org/grpc/tap"
    53  )
    54  
    55  const (
    56  	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
    57  	defaultServerMaxSendMessageSize    = math.MaxInt32
    58  
    59  	// Server transports are tracked in a map which is keyed on listener
    60  	// address. For regular gRPC traffic, connections are accepted in Serve()
    61  	// through a call to Accept(), and we use the actual listener address as key
    62  	// when we add it to the map. But for connections received through
    63  	// ServeHTTP(), we do not have a listener and hence use this dummy value.
    64  	listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
    65  )
    66  
    67  func init() {
    68  	internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
    69  		return srv.opts.creds
    70  	}
    71  	internal.IsRegisteredMethod = func(srv *Server, method string) bool {
    72  		return srv.isRegisteredMethod(method)
    73  	}
    74  	internal.ServerFromContext = serverFromContext
    75  	internal.AddGlobalServerOptions = func(opt ...ServerOption) {
    76  		globalServerOptions = append(globalServerOptions, opt...)
    77  	}
    78  	internal.ClearGlobalServerOptions = func() {
    79  		globalServerOptions = nil
    80  	}
    81  	internal.BinaryLogger = binaryLogger
    82  	internal.JoinServerOptions = newJoinServerOption
    83  	internal.RecvBufferPool = recvBufferPool
    84  }
    85  
    86  var statusOK = status.New(codes.OK, "")
    87  var logger = grpclog.Component("core")
    88  
    89  type methodHandler func(srv any, ctx context.Context, dec func(any) error, interceptor UnaryServerInterceptor) (any, error)
    90  
    91  // MethodDesc represents an RPC service's method specification.
    92  type MethodDesc struct {
    93  	MethodName string
    94  	Handler    methodHandler
    95  }
    96  
    97  // ServiceDesc represents an RPC service's specification.
    98  type ServiceDesc struct {
    99  	ServiceName string
   100  	// The pointer to the service interface. Used to check whether the user
   101  	// provided implementation satisfies the interface requirements.
   102  	HandlerType any
   103  	Methods     []MethodDesc
   104  	Streams     []StreamDesc
   105  	Metadata    any
   106  }
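
For orientation, here is a hedged sketch of what IDL-generated code typically builds and hands to RegisterService for a one-method service. Every identifier below (EchoRequest, EchoServer, _EchoService_Echo_Handler, echoServiceDesc) is invented for illustration, and a real service would use protobuf-generated message types rather than a bare struct:

package main

import (
	"context"

	"google.golang.org/grpc"
)

type EchoRequest struct{ Msg string }

// EchoServer is the service interface; RegisterService checks the
// implementation against it via HandlerType.
type EchoServer interface {
	Echo(context.Context, *EchoRequest) (*EchoRequest, error)
}

// _EchoService_Echo_Handler adapts the typed method to grpc's internal
// handler signature: decode into a request value, then invoke the
// implementation, routing through the unary interceptor when one is set.
func _EchoService_Echo_Handler(srv any, ctx context.Context, dec func(any) error, interceptor grpc.UnaryServerInterceptor) (any, error) {
	in := new(EchoRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(EchoServer).Echo(ctx, in)
	}
	info := &grpc.UnaryServerInfo{Server: srv, FullMethod: "/example.EchoService/Echo"}
	return interceptor(ctx, in, info, func(ctx context.Context, req any) (any, error) {
		return srv.(EchoServer).Echo(ctx, req.(*EchoRequest))
	})
}

var echoServiceDesc = grpc.ServiceDesc{
	ServiceName: "example.EchoService",
	HandlerType: (*EchoServer)(nil), // pointer to the service interface
	Methods: []grpc.MethodDesc{
		{MethodName: "Echo", Handler: _EchoService_Echo_Handler},
	},
	Metadata: "echo.proto",
}

type echoServer struct{}

func (echoServer) Echo(_ context.Context, req *EchoRequest) (*EchoRequest, error) {
	return req, nil
}

func main() {
	s := grpc.NewServer()
	s.RegisterService(&echoServiceDesc, echoServer{})
	defer s.Stop()
}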
   107  
   108  // serviceInfo wraps information about a service. It is very similar to
   109  // ServiceDesc and is constructed from it for internal purposes.
   110  type serviceInfo struct {
   111  	// Contains the implementation for the methods in this service.
   112  	serviceImpl any
   113  	methods     map[string]*MethodDesc
   114  	streams     map[string]*StreamDesc
   115  	mdata       any
   116  }
   117  
   118  // Server is a gRPC server to serve RPC requests.
   119  type Server struct {
   120  	opts serverOptions
   121  
   122  	mu  sync.Mutex // guards following
   123  	lis map[net.Listener]bool
   124  	// conns contains all active server transports. It is a map keyed on a
   125  	// listener address with the value being the set of active transports
   126  	// belonging to that listener.
   127  	conns    map[string]map[transport.ServerTransport]bool
   128  	serve    bool
   129  	drain    bool
   130  	cv       *sync.Cond              // signaled when connections close for GracefulStop
   131  	services map[string]*serviceInfo // service name -> service info
   132  	events   traceEventLog
   133  
   134  	quit               *grpcsync.Event
   135  	done               *grpcsync.Event
   136  	channelzRemoveOnce sync.Once
   137  	serveWG            sync.WaitGroup // counts active Serve goroutines for Stop/GracefulStop
   138  	handlersWG         sync.WaitGroup // counts active method handler goroutines
   139  
   140  	channelz *channelz.Server
   141  
   142  	serverWorkerChannel      chan func()
   143  	serverWorkerChannelClose func()
   144  }
   145  
   146  type serverOptions struct {
   147  	creds                 credentials.TransportCredentials
   148  	codec                 baseCodec
   149  	cp                    Compressor
   150  	dc                    Decompressor
   151  	unaryInt              UnaryServerInterceptor
   152  	streamInt             StreamServerInterceptor
   153  	chainUnaryInts        []UnaryServerInterceptor
   154  	chainStreamInts       []StreamServerInterceptor
   155  	binaryLogger          binarylog.Logger
   156  	inTapHandle           tap.ServerInHandle
   157  	statsHandlers         []stats.Handler
   158  	maxConcurrentStreams  uint32
   159  	maxReceiveMessageSize int
   160  	maxSendMessageSize    int
   161  	unknownStreamDesc     *StreamDesc
   162  	keepaliveParams       keepalive.ServerParameters
   163  	keepalivePolicy       keepalive.EnforcementPolicy
   164  	initialWindowSize     int32
   165  	initialConnWindowSize int32
   166  	writeBufferSize       int
   167  	readBufferSize        int
   168  	sharedWriteBuffer     bool
   169  	connectionTimeout     time.Duration
   170  	maxHeaderListSize     *uint32
   171  	headerTableSize       *uint32
   172  	numServerWorkers      uint32
   173  	recvBufferPool        SharedBufferPool
   174  	waitForHandlers       bool
   175  }
   176  
   177  var defaultServerOptions = serverOptions{
   178  	maxConcurrentStreams:  math.MaxUint32,
   179  	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
   180  	maxSendMessageSize:    defaultServerMaxSendMessageSize,
   181  	connectionTimeout:     120 * time.Second,
   182  	writeBufferSize:       defaultWriteBufSize,
   183  	readBufferSize:        defaultReadBufSize,
   184  	recvBufferPool:        nopBufferPool{},
   185  }
   186  var globalServerOptions []ServerOption
   187  
   188  // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
   189  type ServerOption interface {
   190  	apply(*serverOptions)
   191  }
   192  
   193  // EmptyServerOption does not alter the server configuration. It can be embedded
   194  // in another structure to build custom server options.
   195  //
   196  // # Experimental
   197  //
   198  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
   199  // later release.
   200  type EmptyServerOption struct{}
   201  
   202  func (EmptyServerOption) apply(*serverOptions) {}
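
A minimal sketch of the embedding pattern this comment describes; the wrapper type and its use are hypothetical:

package main

import "google.golang.org/grpc"

// taggedOption satisfies grpc.ServerOption via the embedded
// EmptyServerOption (its apply is a no-op), while carrying extra data
// that a wrapping framework can read before handing options to grpc.
type taggedOption struct {
	grpc.EmptyServerOption
	tag string
}

func main() {
	// grpc itself ignores taggedOption; a framework that builds servers
	// could scan its option list for taggedOption values first.
	s := grpc.NewServer(taggedOption{tag: "metrics"}, grpc.MaxRecvMsgSize(1<<20))
	defer s.Stop()
}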
   203  
   204  // funcServerOption wraps a function that modifies serverOptions into an
   205  // implementation of the ServerOption interface.
   206  type funcServerOption struct {
   207  	f func(*serverOptions)
   208  }
   209  
   210  func (fdo *funcServerOption) apply(do *serverOptions) {
   211  	fdo.f(do)
   212  }
   213  
   214  func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
   215  	return &funcServerOption{
   216  		f: f,
   217  	}
   218  }
   219  
   220  // joinServerOption provides a way to combine an arbitrary number of server
   221  // options into one.
   222  type joinServerOption struct {
   223  	opts []ServerOption
   224  }
   225  
   226  func (mdo *joinServerOption) apply(do *serverOptions) {
   227  	for _, opt := range mdo.opts {
   228  		opt.apply(do)
   229  	}
   230  }
   231  
   232  func newJoinServerOption(opts ...ServerOption) ServerOption {
   233  	return &joinServerOption{opts: opts}
   234  }
   235  
   236  // SharedWriteBuffer allows reusing the per-connection transport write buffer.
   237  // If this option is set to true, every connection will release the buffer after
   238  // flushing the data on the wire.
   239  //
   240  // # Experimental
   241  //
   242  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
   243  // later release.
   244  func SharedWriteBuffer(val bool) ServerOption {
   245  	return newFuncServerOption(func(o *serverOptions) {
   246  		o.sharedWriteBuffer = val
   247  	})
   248  }
   249  
   250  // WriteBufferSize determines how much data can be batched before doing a write
   251  // on the wire. The default value for this buffer is 32KB. Zero or negative
   252  // values will disable the write buffer such that each write goes directly to the
   253  // underlying connection. Note: A Send call may not directly translate to a write.
   254  func WriteBufferSize(s int) ServerOption {
   255  	return newFuncServerOption(func(o *serverOptions) {
   256  		o.writeBufferSize = s
   257  	})
   258  }
   259  
   260  // ReadBufferSize lets you set the size of the read buffer; this determines how
   261  // much data can be read at most in one read syscall. The default value for this
   262  // buffer is 32KB. Zero or negative values will disable the read buffer for a
   263  // connection so the data framer can access the underlying conn directly.
   264  func ReadBufferSize(s int) ServerOption {
   265  	return newFuncServerOption(func(o *serverOptions) {
   266  		o.readBufferSize = s
   267  	})
   268  }
   269  
   270  // InitialWindowSize returns a ServerOption that sets the window size for a stream.
   271  // The lower bound for window size is 64K and any value smaller than that will be ignored.
   272  func InitialWindowSize(s int32) ServerOption {
   273  	return newFuncServerOption(func(o *serverOptions) {
   274  		o.initialWindowSize = s
   275  	})
   276  }
   277  
   278  // InitialConnWindowSize returns a ServerOption that sets the window size for a connection.
   279  // The lower bound for window size is 64K and any value smaller than that will be ignored.
   280  func InitialConnWindowSize(s int32) ServerOption {
   281  	return newFuncServerOption(func(o *serverOptions) {
   282  		o.initialConnWindowSize = s
   283  	})
   284  }
   285  
   286  // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
   287  func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
   288  	if kp.Time > 0 && kp.Time < internal.KeepaliveMinServerPingTime {
   289  		logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
   290  		kp.Time = internal.KeepaliveMinServerPingTime
   291  	}
   292  
   293  	return newFuncServerOption(func(o *serverOptions) {
   294  		o.keepaliveParams = kp
   295  	})
   296  }
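
A typical configuration combining KeepaliveParams with the enforcement policy defined below; the durations are illustrative examples, not recommendations:

package main

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

func main() {
	s := grpc.NewServer(
		grpc.KeepaliveParams(keepalive.ServerParameters{
			MaxConnectionIdle: 5 * time.Minute,  // close connections idle this long
			Time:              2 * time.Hour,    // ping an open connection this often
			Timeout:           20 * time.Second, // wait this long for the ping ack
		}),
		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
			MinTime:             time.Minute, // reject clients that ping more often
			PermitWithoutStream: true,        // allow pings when no RPCs are active
		}),
	)
	defer s.Stop()
}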
   297  
   298  // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
   299  func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
   300  	return newFuncServerOption(func(o *serverOptions) {
   301  		o.keepalivePolicy = kep
   302  	})
   303  }
   304  
   305  // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
   306  //
   307  // This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
   308  //
   309  // Deprecated: register codecs using encoding.RegisterCodec. The server will
   310  // automatically use registered codecs based on the incoming requests' headers.
   311  // See also
   312  // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
   313  // Will be supported throughout 1.x.
   314  func CustomCodec(codec Codec) ServerOption {
   315  	return newFuncServerOption(func(o *serverOptions) {
   316  		o.codec = codec
   317  	})
   318  }
   319  
   320  // ForceServerCodec returns a ServerOption that sets a codec for message
   321  // marshaling and unmarshaling.
   322  //
   323  // This will override any lookups by content-subtype for Codecs registered
   324  // with RegisterCodec.
   325  //
   326  // See Content-Type on
   327  // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
   328  // more details. Also see the documentation on RegisterCodec and
   329  // CallContentSubtype for more details on the interaction between encoding.Codec
   330  // and content-subtype.
   331  //
   332  // This function is provided for advanced users; prefer to register codecs
   333  // using encoding.RegisterCodec.
   334  // The server will automatically use registered codecs based on the incoming
   335  // requests' headers. See also
   336  // https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
   337  // Will be supported throughout 1.x.
   338  //
   339  // # Experimental
   340  //
   341  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
   342  // later release.
   343  func ForceServerCodec(codec encoding.Codec) ServerOption {
   344  	return newFuncServerOption(func(o *serverOptions) {
   345  		o.codec = codec
   346  	})
   347  }
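
A minimal sketch of a custom codec, assuming JSON payloads and the `any`-based Codec signature used by recent grpc-go (as in this file). Registering it via encoding.RegisterCodec is the preferred route; ForceServerCodec would force it for every RPC:

package main

import (
	"encoding/json"

	"google.golang.org/grpc"
	"google.golang.org/grpc/encoding"
)

type jsonCodec struct{}

func (jsonCodec) Marshal(v any) ([]byte, error)      { return json.Marshal(v) }
func (jsonCodec) Unmarshal(data []byte, v any) error { return json.Unmarshal(data, v) }
func (jsonCodec) Name() string                       { return "json" }

func main() {
	// Preferred: register and let the content-subtype "json" select it.
	encoding.RegisterCodec(jsonCodec{})
	// Forcing it for every RPC, per the option above:
	s := grpc.NewServer(grpc.ForceServerCodec(jsonCodec{}))
	defer s.Stop()
}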
   348  
   349  // RPCCompressor returns a ServerOption that sets a compressor for outbound
   350  // messages.  For backward compatibility, all outbound messages will be sent
   351  // using this compressor, regardless of incoming message compression.  By
   352  // default, server messages will be sent using the same compressor with which
   353  // request messages were sent.
   354  //
   355  // Deprecated: use encoding.RegisterCompressor instead. Will be supported
   356  // throughout 1.x.
   357  func RPCCompressor(cp Compressor) ServerOption {
   358  	return newFuncServerOption(func(o *serverOptions) {
   359  		o.cp = cp
   360  	})
   361  }
   362  
   363  // RPCDecompressor returns a ServerOption that sets a decompressor for inbound
   364  // messages.  It has higher priority than decompressors registered via
   365  // encoding.RegisterCompressor.
   366  //
   367  // Deprecated: use encoding.RegisterCompressor instead. Will be supported
   368  // throughout 1.x.
   369  func RPCDecompressor(dc Decompressor) ServerOption {
   370  	return newFuncServerOption(func(o *serverOptions) {
   371  		o.dc = dc
   372  	})
   373  }
   374  
   375  // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
   376  // If this is not set, gRPC uses the default limit.
   377  //
   378  // Deprecated: use MaxRecvMsgSize instead. Will be supported throughout 1.x.
   379  func MaxMsgSize(m int) ServerOption {
   380  	return MaxRecvMsgSize(m)
   381  }
   382  
   383  // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
   384  // If this is not set, gRPC uses the default 4MB.
   385  func MaxRecvMsgSize(m int) ServerOption {
   386  	return newFuncServerOption(func(o *serverOptions) {
   387  		o.maxReceiveMessageSize = m
   388  	})
   389  }
   390  
   391  // MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
   392  // If this is not set, gRPC uses the default `math.MaxInt32`.
   393  func MaxSendMsgSize(m int) ServerOption {
   394  	return newFuncServerOption(func(o *serverOptions) {
   395  		o.maxSendMessageSize = m
   396  	})
   397  }
   398  
   399  // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
   400  // of concurrent streams to each ServerTransport.
   401  func MaxConcurrentStreams(n uint32) ServerOption {
   402  	if n == 0 {
   403  		n = math.MaxUint32
   404  	}
   405  	return newFuncServerOption(func(o *serverOptions) {
   406  		o.maxConcurrentStreams = n
   407  	})
   408  }
   409  
   410  // Creds returns a ServerOption that sets credentials for server connections.
   411  func Creds(c credentials.TransportCredentials) ServerOption {
   412  	return newFuncServerOption(func(o *serverOptions) {
   413  		o.creds = c
   414  	})
   415  }
   416  
   417  // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
   418  // server. Only one unary interceptor can be installed. The construction of multiple
   419  // interceptors (e.g., chaining) can be implemented at the caller.
   420  func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
   421  	return newFuncServerOption(func(o *serverOptions) {
   422  		if o.unaryInt != nil {
   423  			panic("The unary server interceptor was already set and may not be reset.")
   424  		}
   425  		o.unaryInt = i
   426  	})
   427  }
   428  
   429  // ChainUnaryInterceptor returns a ServerOption that specifies the chained interceptor
   430  // for unary RPCs. The first interceptor will be the outermost,
   431  // while the last interceptor will be the innermost wrapper around the real call.
   432  // All unary interceptors added by this method will be chained.
   433  func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption {
   434  	return newFuncServerOption(func(o *serverOptions) {
   435  		o.chainUnaryInts = append(o.chainUnaryInts, interceptors...)
   436  	})
   437  }
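
A minimal sketch of two chained unary interceptors; with ChainUnaryInterceptor(logging, recovery), logging is the outermost wrapper and observes recovery's rewritten errors. Both function names are invented:

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// logging times every unary RPC and records its result.
func logging(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
	start := time.Now()
	resp, err := handler(ctx, req)
	log.Printf("%s took %v err=%v", info.FullMethod, time.Since(start), err)
	return resp, err
}

// recovery converts handler panics into Internal status errors.
func recovery(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = status.Errorf(codes.Internal, "panic: %v", r)
		}
	}()
	return handler(ctx, req)
}

func main() {
	s := grpc.NewServer(grpc.ChainUnaryInterceptor(logging, recovery))
	defer s.Stop()
}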
   438  
   439  // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
   440  // server. Only one stream interceptor can be installed.
   441  func StreamInterceptor(i StreamServerInterceptor) ServerOption {
   442  	return newFuncServerOption(func(o *serverOptions) {
   443  		if o.streamInt != nil {
   444  			panic("The stream server interceptor was already set and may not be reset.")
   445  		}
   446  		o.streamInt = i
   447  	})
   448  }
   449  
   450  // ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
   451  // for streaming RPCs. The first interceptor will be the outermost,
   452  // while the last interceptor will be the innermost wrapper around the real call.
   453  // All stream interceptors added by this method will be chained.
   454  func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
   455  	return newFuncServerOption(func(o *serverOptions) {
   456  		o.chainStreamInts = append(o.chainStreamInts, interceptors...)
   457  	})
   458  }
   459  
   460  // InTapHandle returns a ServerOption that sets the tap handle for all server
   461  // transports to be created. Only one handle can be installed.
   462  //
   463  // # Experimental
   464  //
   465  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
   466  // later release.
   467  func InTapHandle(h tap.ServerInHandle) ServerOption {
   468  	return newFuncServerOption(func(o *serverOptions) {
   469  		if o.inTapHandle != nil {
   470  			panic("The tap handle was already set and may not be reset.")
   471  		}
   472  		o.inTapHandle = h
   473  	})
   474  }
   475  
   476  // StatsHandler returns a ServerOption that sets the stats handler for the server.
   477  func StatsHandler(h stats.Handler) ServerOption {
   478  	return newFuncServerOption(func(o *serverOptions) {
   479  		if h == nil {
   480  			logger.Error("ignoring nil parameter in grpc.StatsHandler ServerOption")
   481  			// Do not allow a nil stats handler, which would otherwise cause
   482  			// panics.
   483  			return
   484  		}
   485  		o.statsHandlers = append(o.statsHandlers, h)
   486  	})
   487  }
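
A minimal sketch of a stats handler that counts started RPCs. The four methods are the full stats.Handler interface; the counter itself is illustrative, and atomic.Int64 assumes Go 1.19+:

package main

import (
	"context"
	"log"
	"sync/atomic"

	"google.golang.org/grpc"
	"google.golang.org/grpc/stats"
)

type rpcCounter struct{ started atomic.Int64 }

func (h *rpcCounter) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
	return ctx
}

func (h *rpcCounter) HandleRPC(_ context.Context, s stats.RPCStats) {
	// stats.Begin is delivered once per RPC, when it starts.
	if _, ok := s.(*stats.Begin); ok {
		log.Printf("RPCs started so far: %d", h.started.Add(1))
	}
}

func (h *rpcCounter) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
	return ctx
}

func (h *rpcCounter) HandleConn(context.Context, stats.ConnStats) {}

func main() {
	s := grpc.NewServer(grpc.StatsHandler(&rpcCounter{}))
	defer s.Stop()
}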
   488  
   489  // binaryLogger returns a ServerOption that can set the binary logger for the
   490  // server.
   491  func binaryLogger(bl binarylog.Logger) ServerOption {
   492  	return newFuncServerOption(func(o *serverOptions) {
   493  		o.binaryLogger = bl
   494  	})
   495  }
   496  
   497  // UnknownServiceHandler returns a ServerOption that allows for adding a custom
   498  // unknown service handler. The provided method is a bidi-streaming RPC service
   499  // handler that will be invoked instead of returning the "unimplemented" gRPC
   500  // error whenever a request is received for an unregistered service or method.
   501  // The handling function and stream interceptor (if set) have full access to
   502  // the ServerStream, including its Context.
   503  func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
   504  	return newFuncServerOption(func(o *serverOptions) {
   505  		o.unknownStreamDesc = &StreamDesc{
   506  			StreamName: "unknown_service_handler",
   507  			Handler:    streamHandler,
   508  			// We need to assume that the users of the streamHandler will want to use both.
   509  			ClientStreams: true,
   510  			ServerStreams: true,
   511  		}
   512  	})
   513  }
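
A minimal sketch of an unknown-service fallback; it inspects the method name and returns a custom error in place of the stock "unimplemented" one. The fallback function is hypothetical:

package main

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// fallback handles every RPC whose service or method is not registered.
func fallback(srv any, stream grpc.ServerStream) error {
	method, _ := grpc.MethodFromServerStream(stream)
	return status.Errorf(codes.Unimplemented, "no handler registered for %q", method)
}

func main() {
	s := grpc.NewServer(grpc.UnknownServiceHandler(fallback))
	defer s.Stop()
}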
   514  
   515  // ConnectionTimeout returns a ServerOption that sets the timeout for
   516  // connection establishment (up to and including HTTP/2 handshaking) for all
   517  // new connections.  If this is not set, the default is 120 seconds.  A zero or
   518  // negative value will result in an immediate timeout.
   519  //
   520  // # Experimental
   521  //
   522  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
   523  // later release.
   524  func ConnectionTimeout(d time.Duration) ServerOption {
   525  	return newFuncServerOption(func(o *serverOptions) {
   526  		o.connectionTimeout = d
   527  	})
   528  }
   529  
   530  // MaxHeaderListSizeServerOption is a ServerOption that sets the max
   531  // (uncompressed) size of header list that the server is prepared to accept.
   532  type MaxHeaderListSizeServerOption struct {
   533  	MaxHeaderListSize uint32
   534  }
   535  
   536  func (o MaxHeaderListSizeServerOption) apply(so *serverOptions) {
   537  	so.maxHeaderListSize = &o.MaxHeaderListSize
   538  }
   539  
   540  // MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
   541  // of header list that the server is prepared to accept.
   542  func MaxHeaderListSize(s uint32) ServerOption {
   543  	return MaxHeaderListSizeServerOption{
   544  		MaxHeaderListSize: s,
   545  	}
   546  }
   547  
   548  // HeaderTableSize returns a ServerOption that sets the size of the dynamic
   549  // header table for a stream.
   550  //
   551  // # Experimental
   552  //
   553  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
   554  // later release.
   555  func HeaderTableSize(s uint32) ServerOption {
   556  	return newFuncServerOption(func(o *serverOptions) {
   557  		o.headerTableSize = &s
   558  	})
   559  }
   560  
   561  // NumStreamWorkers returns a ServerOption that sets the number of worker
   562  // goroutines that should be used to process incoming streams. Setting this to
   563  // zero (default) will disable workers and spawn a new goroutine for each
   564  // stream.
   565  //
   566  // # Experimental
   567  //
   568  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
   569  // later release.
   570  func NumStreamWorkers(numServerWorkers uint32) ServerOption {
   571  	// TODO: If/when this API gets stabilized (i.e. stream workers become the
   572  	// only way streams are processed), change the behavior of the zero value to
   573  	// a sane default. Preliminary experiments suggest that a value equal to the
   574  	// number of CPUs available is most performant; requires thorough testing.
   575  	return newFuncServerOption(func(o *serverOptions) {
   576  		o.numServerWorkers = numServerWorkers
   577  	})
   578  }
   579  
   580  // WaitForHandlers causes Stop to wait until all outstanding method handlers have
   581  // exited before returning.  If false, Stop will return as soon as all
   582  // connections have closed, but method handlers may still be running. By
   583  // default, Stop does not wait for method handlers to return.
   584  //
   585  // # Experimental
   586  //
   587  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
   588  // later release.
   589  func WaitForHandlers(w bool) ServerOption {
   590  	return newFuncServerOption(func(o *serverOptions) {
   591  		o.waitForHandlers = w
   592  	})
   593  }
   594  
   595  // RecvBufferPool returns a ServerOption that configures the server
   596  // to use the provided shared buffer pool for parsing incoming messages. Depending
   597  // on the application's workload, this could result in reduced memory allocation.
   598  //
   599  // If you are unsure about how to implement a memory pool but want to utilize one,
   600  // begin with grpc.NewSharedBufferPool.
   601  //
   602  // Note: The shared buffer pool feature will not be active if any of the following
   603  // options are used: StatsHandler, EnableTracing, or binary logging. In such
   604  // cases, the shared buffer pool will be ignored.
   605  //
   606  // Deprecated: use experimental.WithRecvBufferPool instead.  Will be deleted in
   607  // v1.60.0 or later.
   608  func RecvBufferPool(bufferPool SharedBufferPool) ServerOption {
   609  	return recvBufferPool(bufferPool)
   610  }
   611  
   612  func recvBufferPool(bufferPool SharedBufferPool) ServerOption {
   613  	return newFuncServerOption(func(o *serverOptions) {
   614  		o.recvBufferPool = bufferPool
   615  	})
   616  }
   617  
   618  // serverWorkerResetThreshold defines how often a worker goroutine's stack is
   619  // reset: every N requests, a worker resets its stack by spawning a new
   620  // goroutine in its place, so that large stacks don't live in memory forever.
   621  // 2^16 should allow each goroutine stack to live for at least a few seconds in
   622  // a typical workload (assuming a QPS of a few thousand requests/sec).
   623  const serverWorkerResetThreshold = 1 << 16
   624  
   625  // serverWorker blocks on the server's worker channel forever and waits for
   626  // handler closures to be fed by serveStreams. This allows multiple requests to
   627  // be processed by the same goroutine, removing the need for expensive stack
   628  // re-allocations (see the runtime.morestack problem [1]).
   629  //
   630  // [1] https://github.com/golang/go/issues/18138
   631  func (s *Server) serverWorker() {
   632  	for completed := 0; completed < serverWorkerResetThreshold; completed++ {
   633  		f, ok := <-s.serverWorkerChannel
   634  		if !ok {
   635  			return
   636  		}
   637  		f()
   638  	}
   639  	go s.serverWorker()
   640  }
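
A standalone sketch of the same stack-reset pattern outside gRPC: a worker retires after N tasks and respawns itself so that one deep call chain doesn't pin a large goroutine stack forever. All names here are invented:

package main

import "fmt"

const resetThreshold = 1 << 16

// worker drains tasks until it has run resetThreshold of them, then
// retires; the replacement goroutine starts with a small fresh stack.
func worker(tasks <-chan func()) {
	for n := 0; n < resetThreshold; n++ {
		f, ok := <-tasks
		if !ok {
			return
		}
		f()
	}
	go worker(tasks)
}

func main() {
	tasks := make(chan func())
	go worker(tasks)
	done := make(chan struct{})
	tasks <- func() { fmt.Println("ran on a pooled worker"); close(done) }
	<-done
	close(tasks)
}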
   641  
   642  // initServerWorkers creates worker goroutines and a channel to process incoming
   643  // streams, reducing the overall time spent on runtime.morestack.
   644  func (s *Server) initServerWorkers() {
   645  	s.serverWorkerChannel = make(chan func())
   646  	s.serverWorkerChannelClose = grpcsync.OnceFunc(func() {
   647  		close(s.serverWorkerChannel)
   648  	})
   649  	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
   650  		go s.serverWorker()
   651  	}
   652  }
   653  
   654  // NewServer creates a gRPC server which has no service registered and has not
   655  // started to accept requests yet.
   656  func NewServer(opt ...ServerOption) *Server {
   657  	opts := defaultServerOptions
   658  	for _, o := range globalServerOptions {
   659  		o.apply(&opts)
   660  	}
   661  	for _, o := range opt {
   662  		o.apply(&opts)
   663  	}
   664  	s := &Server{
   665  		lis:      make(map[net.Listener]bool),
   666  		opts:     opts,
   667  		conns:    make(map[string]map[transport.ServerTransport]bool),
   668  		services: make(map[string]*serviceInfo),
   669  		quit:     grpcsync.NewEvent(),
   670  		done:     grpcsync.NewEvent(),
   671  		channelz: channelz.RegisterServer(""),
   672  	}
   673  	chainUnaryServerInterceptors(s)
   674  	chainStreamServerInterceptors(s)
   675  	s.cv = sync.NewCond(&s.mu)
   676  	if EnableTracing {
   677  		_, file, line, _ := runtime.Caller(1)
   678  		s.events = newTraceEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
   679  	}
   680  
   681  	if s.opts.numServerWorkers > 0 {
   682  		s.initServerWorkers()
   683  	}
   684  
   685  	channelz.Info(logger, s.channelz, "Server created")
   686  	return s
   687  }
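
A minimal end-to-end sketch of the server lifecycle: create, configure, register, and serve. The commented registration call stands in for a hypothetical generated helper:

package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
)

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer(grpc.MaxRecvMsgSize(8 * 1024 * 1024))
	// pb.RegisterEchoServiceServer(s, &echoServer{}) // hypothetical generated helper
	if err := s.Serve(lis); err != nil {
		log.Fatalf("serve: %v", err)
	}
}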
   688  
   689  // printf records an event in s's event log, unless s has been stopped.
   690  // REQUIRES s.mu is held.
   691  func (s *Server) printf(format string, a ...any) {
   692  	if s.events != nil {
   693  		s.events.Printf(format, a...)
   694  	}
   695  }
   696  
   697  // errorf records an error in s's event log, unless s has been stopped.
   698  // REQUIRES s.mu is held.
   699  func (s *Server) errorf(format string, a ...any) {
   700  	if s.events != nil {
   701  		s.events.Errorf(format, a...)
   702  	}
   703  }
   704  
   705  // ServiceRegistrar wraps a single method that supports service registration. It
   706  // enables users to pass concrete types other than grpc.Server to the service
   707  // registration methods exported by the IDL generated code.
   708  type ServiceRegistrar interface {
   709  	// RegisterService registers a service and its implementation to the
   710  	// concrete type implementing this interface.  It may not be called
   711  	// once the server has started serving.
   712  	// desc describes the service and its methods and handlers. impl is the
   713  	// service implementation which is passed to the method handlers.
   714  	RegisterService(desc *ServiceDesc, impl any)
   715  }
   716  
   717  // RegisterService registers a service and its implementation to the gRPC
   718  // server. It is called from the IDL generated code. This must be called before
   719  // invoking Serve. If ss is non-nil (for legacy code), its type is checked to
   720  // ensure it implements sd.HandlerType.
   721  func (s *Server) RegisterService(sd *ServiceDesc, ss any) {
   722  	if ss != nil {
   723  		ht := reflect.TypeOf(sd.HandlerType).Elem()
   724  		st := reflect.TypeOf(ss)
   725  		if !st.Implements(ht) {
   726  			logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
   727  		}
   728  	}
   729  	s.register(sd, ss)
   730  }
   731  
   732  func (s *Server) register(sd *ServiceDesc, ss any) {
   733  	s.mu.Lock()
   734  	defer s.mu.Unlock()
   735  	s.printf("RegisterService(%q)", sd.ServiceName)
   736  	if s.serve {
   737  		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
   738  	}
   739  	if _, ok := s.services[sd.ServiceName]; ok {
   740  		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
   741  	}
   742  	info := &serviceInfo{
   743  		serviceImpl: ss,
   744  		methods:     make(map[string]*MethodDesc),
   745  		streams:     make(map[string]*StreamDesc),
   746  		mdata:       sd.Metadata,
   747  	}
   748  	for i := range sd.Methods {
   749  		d := &sd.Methods[i]
   750  		info.methods[d.MethodName] = d
   751  	}
   752  	for i := range sd.Streams {
   753  		d := &sd.Streams[i]
   754  		info.streams[d.StreamName] = d
   755  	}
   756  	s.services[sd.ServiceName] = info
   757  }
   758  
   759  // MethodInfo contains the information of an RPC including its method name and type.
   760  type MethodInfo struct {
   761  	// Name is the method name only, without the service name or package name.
   762  	Name string
   763  	// IsClientStream indicates whether the RPC is a client streaming RPC.
   764  	IsClientStream bool
   765  	// IsServerStream indicates whether the RPC is a server streaming RPC.
   766  	IsServerStream bool
   767  }
   768  
   769  // ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
   770  type ServiceInfo struct {
   771  	Methods []MethodInfo
   772  	// Metadata is the metadata specified in ServiceDesc when registering service.
   773  	Metadata any
   774  }
   775  
   776  // GetServiceInfo returns a map from service names to ServiceInfo.
   777  // Service names include the package names, in the form of <package>.<service>.
   778  func (s *Server) GetServiceInfo() map[string]ServiceInfo {
   779  	ret := make(map[string]ServiceInfo)
   780  	for n, srv := range s.services {
   781  		methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
   782  		for m := range srv.methods {
   783  			methods = append(methods, MethodInfo{
   784  				Name:           m,
   785  				IsClientStream: false,
   786  				IsServerStream: false,
   787  			})
   788  		}
   789  		for m, d := range srv.streams {
   790  			methods = append(methods, MethodInfo{
   791  				Name:           m,
   792  				IsClientStream: d.ClientStreams,
   793  				IsServerStream: d.ServerStreams,
   794  			})
   795  		}
   796  
   797  		ret[n] = ServiceInfo{
   798  			Methods:  methods,
   799  			Metadata: srv.mdata,
   800  		}
   801  	}
   802  	return ret
   803  }
   804  
   805  // ErrServerStopped indicates that the operation is now illegal because of
   806  // the server being stopped.
   807  var ErrServerStopped = errors.New("grpc: the server has been stopped")
   808  
   809  type listenSocket struct {
   810  	net.Listener
   811  	channelz *channelz.Socket
   812  }
   813  
   814  func (l *listenSocket) Close() error {
   815  	err := l.Listener.Close()
   816  	channelz.RemoveEntry(l.channelz.ID)
   817  	channelz.Info(logger, l.channelz, "ListenSocket deleted")
   818  	return err
   819  }
   820  
   821  // Serve accepts incoming connections on the listener lis, creating a new
   822  // ServerTransport and service goroutine for each. The service goroutines
   823  // read gRPC requests and then call the registered handlers to reply to them.
   824  // Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
   825  // this method returns.
   826  // Serve will return a non-nil error unless Stop or GracefulStop is called.
   827  //
   828  // Note: All supported releases of Go (as of December 2023) override the OS
   829  // defaults for TCP keepalive time and interval to 15s. To enable TCP keepalive
   830  // with OS defaults for keepalive time and interval, callers need to do the
   831  // following two things:
   832  //   - pass a net.Listener created by calling the Listen method on a
   833  //     net.ListenConfig with the `KeepAlive` field set to a negative value. This
   834  //     will result in the Go standard library not overriding OS defaults for TCP
   835  //     keepalive interval and time. But this will also result in the Go standard
   836  //     library not enabling TCP keepalives by default.
   837  //   - override the Accept method on the passed in net.Listener and set the
   838  //     SO_KEEPALIVE socket option to enable TCP keepalives, with OS defaults.
   839  func (s *Server) Serve(lis net.Listener) error {
   840  	s.mu.Lock()
   841  	s.printf("serving")
   842  	s.serve = true
   843  	if s.lis == nil {
   844  		// Serve called after Stop or GracefulStop.
   845  		s.mu.Unlock()
   846  		lis.Close()
   847  		return ErrServerStopped
   848  	}
   849  
   850  	s.serveWG.Add(1)
   851  	defer func() {
   852  		s.serveWG.Done()
   853  		if s.quit.HasFired() {
   854  			// Stop or GracefulStop called; block until done and return nil.
   855  			<-s.done.Done()
   856  		}
   857  	}()
   858  
   859  	ls := &listenSocket{
   860  		Listener: lis,
   861  		channelz: channelz.RegisterSocket(&channelz.Socket{
   862  			SocketType:    channelz.SocketTypeListen,
   863  			Parent:        s.channelz,
   864  			RefName:       lis.Addr().String(),
   865  			LocalAddr:     lis.Addr(),
   866  			SocketOptions: channelz.GetSocketOption(lis)},
   867  		),
   868  	}
   869  	s.lis[ls] = true
   870  
   871  	defer func() {
   872  		s.mu.Lock()
   873  		if s.lis != nil && s.lis[ls] {
   874  			ls.Close()
   875  			delete(s.lis, ls)
   876  		}
   877  		s.mu.Unlock()
   878  	}()
   879  
   880  	s.mu.Unlock()
   881  	channelz.Info(logger, ls.channelz, "ListenSocket created")
   882  
   883  	var tempDelay time.Duration // how long to sleep on accept failure
   884  	for {
   885  		rawConn, err := lis.Accept()
   886  		if err != nil {
   887  			if ne, ok := err.(interface {
   888  				Temporary() bool
   889  			}); ok && ne.Temporary() {
   890  				if tempDelay == 0 {
   891  					tempDelay = 5 * time.Millisecond
   892  				} else {
   893  					tempDelay *= 2
   894  				}
   895  				if max := 1 * time.Second; tempDelay > max {
   896  					tempDelay = max
   897  				}
   898  				s.mu.Lock()
   899  				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
   900  				s.mu.Unlock()
   901  				timer := time.NewTimer(tempDelay)
   902  				select {
   903  				case <-timer.C:
   904  				case <-s.quit.Done():
   905  					timer.Stop()
   906  					return nil
   907  				}
   908  				continue
   909  			}
   910  			s.mu.Lock()
   911  			s.printf("done serving; Accept = %v", err)
   912  			s.mu.Unlock()
   913  
   914  			if s.quit.HasFired() {
   915  				return nil
   916  			}
   917  			return err
   918  		}
   919  		tempDelay = 0
   920  		// Start a new goroutine to deal with rawConn so we don't stall this Accept
   921  		// loop goroutine.
   922  		//
   923  		// Make sure we account for the goroutine so GracefulStop doesn't nil out
   924  		// s.conns before this conn can be added.
   925  		s.serveWG.Add(1)
   926  		go func() {
   927  			s.handleRawConn(lis.Addr().String(), rawConn)
   928  			s.serveWG.Done()
   929  		}()
   930  	}
   931  }
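
A sketch of the TCP keepalive setup described in the note above Serve: a negative ListenConfig.KeepAlive stops Go from overriding (or enabling) keepalives, and a wrapped Accept re-enables plain SO_KEEPALIVE with OS defaults via TCPConn.SetKeepAlive. The listener type is invented:

package main

import (
	"context"
	"log"
	"net"

	"google.golang.org/grpc"
)

type keepAliveListener struct{ net.Listener }

func (l keepAliveListener) Accept() (net.Conn, error) {
	c, err := l.Listener.Accept()
	if err != nil {
		return nil, err
	}
	if tc, ok := c.(*net.TCPConn); ok {
		// SetKeepAlive(true) sets SO_KEEPALIVE; since SetKeepAlivePeriod
		// is never called, the OS-default time and interval apply.
		if err := tc.SetKeepAlive(true); err != nil {
			c.Close()
			return nil, err
		}
	}
	return c, nil
}

func main() {
	// Negative KeepAlive: don't override OS defaults, don't enable keepalives.
	lc := net.ListenConfig{KeepAlive: -1}
	lis, err := lc.Listen(context.Background(), "tcp", ":50051")
	if err != nil {
		log.Fatalf("listen: %v", err)
	}
	s := grpc.NewServer()
	if err := s.Serve(keepAliveListener{lis}); err != nil {
		log.Fatalf("serve: %v", err)
	}
}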
   932  
   933  // handleRawConn forks a goroutine to handle a just-accepted connection that
   934  // has not had any I/O performed on it yet.
   935  func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
   936  	if s.quit.HasFired() {
   937  		rawConn.Close()
   938  		return
   939  	}
   940  	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
   941  
   942  	// Finish handshaking (HTTP2)
   943  	st := s.newHTTP2Transport(rawConn)
   944  	rawConn.SetDeadline(time.Time{})
   945  	if st == nil {
   946  		return
   947  	}
   948  
   949  	if cc, ok := rawConn.(interface {
   950  		PassServerTransport(transport.ServerTransport)
   951  	}); ok {
   952  		cc.PassServerTransport(st)
   953  	}
   954  
   955  	if !s.addConn(lisAddr, st) {
   956  		return
   957  	}
   958  	go func() {
   959  		s.serveStreams(context.Background(), st, rawConn)
   960  		s.removeConn(lisAddr, st)
   961  	}()
   962  }
   963  
   964  // newHTTP2Transport sets up an HTTP/2 transport (using the
   965  // gRPC http2 server transport in transport/http2_server.go).
   966  func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
   967  	config := &transport.ServerConfig{
   968  		MaxStreams:            s.opts.maxConcurrentStreams,
   969  		ConnectionTimeout:     s.opts.connectionTimeout,
   970  		Credentials:           s.opts.creds,
   971  		InTapHandle:           s.opts.inTapHandle,
   972  		StatsHandlers:         s.opts.statsHandlers,
   973  		KeepaliveParams:       s.opts.keepaliveParams,
   974  		KeepalivePolicy:       s.opts.keepalivePolicy,
   975  		InitialWindowSize:     s.opts.initialWindowSize,
   976  		InitialConnWindowSize: s.opts.initialConnWindowSize,
   977  		WriteBufferSize:       s.opts.writeBufferSize,
   978  		ReadBufferSize:        s.opts.readBufferSize,
   979  		SharedWriteBuffer:     s.opts.sharedWriteBuffer,
   980  		ChannelzParent:        s.channelz,
   981  		MaxHeaderListSize:     s.opts.maxHeaderListSize,
   982  		HeaderTableSize:       s.opts.headerTableSize,
   983  	}
   984  	st, err := transport.NewServerTransport(c, config)
   985  	if err != nil {
   986  		s.mu.Lock()
   987  		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
   988  		s.mu.Unlock()
   989  		// ErrConnDispatched means that the connection was dispatched away from
   990  		// gRPC; those connections should be left open.
   991  		if err != credentials.ErrConnDispatched {
   992  			// Don't log on ErrConnDispatched and io.EOF to prevent log spam.
   993  			if err != io.EOF {
   994  				channelz.Info(logger, s.channelz, "grpc: Server.Serve failed to create ServerTransport: ", err)
   995  			}
   996  			c.Close()
   997  		}
   998  		return nil
   999  	}
  1000  
  1001  	return st
  1002  }
  1003  
  1004  func (s *Server) serveStreams(ctx context.Context, st transport.ServerTransport, rawConn net.Conn) {
  1005  	ctx = transport.SetConnection(ctx, rawConn)
  1006  	ctx = peer.NewContext(ctx, st.Peer())
  1007  	for _, sh := range s.opts.statsHandlers {
  1008  		ctx = sh.TagConn(ctx, &stats.ConnTagInfo{
  1009  			RemoteAddr: st.Peer().Addr,
  1010  			LocalAddr:  st.Peer().LocalAddr,
  1011  		})
  1012  		sh.HandleConn(ctx, &stats.ConnBegin{})
  1013  	}
  1014  
  1015  	defer func() {
  1016  		st.Close(errors.New("finished serving streams for the server transport"))
  1017  		for _, sh := range s.opts.statsHandlers {
  1018  			sh.HandleConn(ctx, &stats.ConnEnd{})
  1019  		}
  1020  	}()
  1021  
  1022  	streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
  1023  	st.HandleStreams(ctx, func(stream *transport.Stream) {
  1024  		s.handlersWG.Add(1)
  1025  		streamQuota.acquire()
  1026  		f := func() {
  1027  			defer streamQuota.release()
  1028  			defer s.handlersWG.Done()
  1029  			s.handleStream(st, stream)
  1030  		}
  1031  
  1032  		if s.opts.numServerWorkers > 0 {
  1033  			select {
  1034  			case s.serverWorkerChannel <- f:
  1035  				return
  1036  			default:
  1037  				// If all stream workers are busy, fallback to the default code path.
  1038  			}
  1039  		}
  1040  		go f()
  1041  	})
  1042  }
  1043  
  1044  var _ http.Handler = (*Server)(nil)
  1045  
  1046  // ServeHTTP implements the Go standard library's http.Handler
  1047  // interface by responding to the gRPC request r: it looks up
  1048  // the requested gRPC method in the gRPC server s.
  1049  //
  1050  // The provided HTTP request must have arrived on an HTTP/2
  1051  // connection. When using the Go standard library's server,
  1052  // practically this means that the Request must also have arrived
  1053  // over TLS.
  1054  //
  1055  // To share one port (such as 443 for https) between gRPC and an
  1056  // existing http.Handler, use a root http.Handler such as:
  1057  //
  1058  //	if r.ProtoMajor == 2 && strings.HasPrefix(
  1059  //		r.Header.Get("Content-Type"), "application/grpc") {
  1060  //		grpcServer.ServeHTTP(w, r)
  1061  //	} else {
  1062  //		yourMux.ServeHTTP(w, r)
  1063  //	}
  1064  //
  1065  // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
  1066  // separate from grpc-go's HTTP/2 server. Performance and features may vary
  1067  // between the two paths. ServeHTTP does not support some gRPC features
  1068  // available through grpc-go's HTTP/2 server.
  1069  //
  1070  // # Experimental
  1071  //
  1072  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  1073  // later release.
  1074  func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  1075  	st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandlers)
  1076  	if err != nil {
  1077  		// Errors returned from transport.NewServerHandlerTransport have
  1078  		// already been written to w.
  1079  		return
  1080  	}
  1081  	if !s.addConn(listenerAddressForServeHTTP, st) {
  1082  		return
  1083  	}
  1084  	defer s.removeConn(listenerAddressForServeHTTP, st)
  1085  	s.serveStreams(r.Context(), st, nil)
  1086  }
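
A fuller sketch of the shared-port setup from the doc comment above, routing by protocol and content type; the certificate and key paths are placeholders:

package main

import (
	"log"
	"net/http"
	"strings"

	"google.golang.org/grpc"
)

func main() {
	grpcServer := grpc.NewServer()
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("hello over plain HTTPS"))
	})

	// Send HTTP/2 requests with a gRPC content type to the gRPC server;
	// everything else goes to the ordinary mux.
	root := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.ProtoMajor == 2 && strings.HasPrefix(r.Header.Get("Content-Type"), "application/grpc") {
			grpcServer.ServeHTTP(w, r)
			return
		}
		mux.ServeHTTP(w, r)
	})

	// The standard library only negotiates HTTP/2 over TLS.
	log.Fatal(http.ListenAndServeTLS(":443", "server.crt", "server.key", root))
}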
  1087  
  1088  func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
  1089  	s.mu.Lock()
  1090  	defer s.mu.Unlock()
  1091  	if s.conns == nil {
  1092  		st.Close(errors.New("Server.addConn called when server has already been stopped"))
  1093  		return false
  1094  	}
  1095  	if s.drain {
  1096  		// Transport added after we drained our existing conns: drain it
  1097  		// immediately.
  1098  		st.Drain("")
  1099  	}
  1100  
  1101  	if s.conns[addr] == nil {
  1102  		// Create a map entry if this is the first connection on this listener.
  1103  		s.conns[addr] = make(map[transport.ServerTransport]bool)
  1104  	}
  1105  	s.conns[addr][st] = true
  1106  	return true
  1107  }
  1108  
  1109  func (s *Server) removeConn(addr string, st transport.ServerTransport) {
  1110  	s.mu.Lock()
  1111  	defer s.mu.Unlock()
  1112  
  1113  	conns := s.conns[addr]
  1114  	if conns != nil {
  1115  		delete(conns, st)
  1116  		if len(conns) == 0 {
  1117  			// If the last connection for this address is being removed, also
  1118  			// remove the map entry corresponding to the address. This is used
  1119  			// in GracefulStop() when waiting for all connections to be closed.
  1120  			delete(s.conns, addr)
  1121  		}
  1122  		s.cv.Broadcast()
  1123  	}
  1124  }
  1125  
  1126  func (s *Server) incrCallsStarted() {
  1127  	s.channelz.ServerMetrics.CallsStarted.Add(1)
  1128  	s.channelz.ServerMetrics.LastCallStartedTimestamp.Store(time.Now().UnixNano())
  1129  }
  1130  
  1131  func (s *Server) incrCallsSucceeded() {
  1132  	s.channelz.ServerMetrics.CallsSucceeded.Add(1)
  1133  }
  1134  
  1135  func (s *Server) incrCallsFailed() {
  1136  	s.channelz.ServerMetrics.CallsFailed.Add(1)
  1137  }
  1138  
  1139  func (s *Server) sendResponse(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, msg any, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
  1140  	data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
  1141  	if err != nil {
  1142  		channelz.Error(logger, s.channelz, "grpc: server failed to encode response: ", err)
  1143  		return err
  1144  	}
  1145  	compData, err := compress(data, cp, comp)
  1146  	if err != nil {
  1147  		channelz.Error(logger, s.channelz, "grpc: server failed to compress response: ", err)
  1148  		return err
  1149  	}
  1150  	hdr, payload := msgHeader(data, compData)
  1151  	// TODO(dfawley): should we be checking len(data) instead?
  1152  	if len(payload) > s.opts.maxSendMessageSize {
  1153  		return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
  1154  	}
  1155  	err = t.Write(stream, hdr, payload, opts)
  1156  	if err == nil {
  1157  		for _, sh := range s.opts.statsHandlers {
  1158  			sh.HandleRPC(ctx, outPayload(false, msg, data, payload, time.Now()))
  1159  		}
  1160  	}
  1161  	return err
  1162  }
  1163  
  1164  // chainUnaryServerInterceptors chains all unary server interceptors into one.
  1165  func chainUnaryServerInterceptors(s *Server) {
  1166  	// Prepend opts.unaryInt to the chaining interceptors if it exists, since unaryInt will
  1167  	// be executed before any other chained interceptors.
  1168  	interceptors := s.opts.chainUnaryInts
  1169  	if s.opts.unaryInt != nil {
  1170  		interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
  1171  	}
  1172  
  1173  	var chainedInt UnaryServerInterceptor
  1174  	if len(interceptors) == 0 {
  1175  		chainedInt = nil
  1176  	} else if len(interceptors) == 1 {
  1177  		chainedInt = interceptors[0]
  1178  	} else {
  1179  		chainedInt = chainUnaryInterceptors(interceptors)
  1180  	}
  1181  
  1182  	s.opts.unaryInt = chainedInt
  1183  }
  1184  
  1185  func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
  1186  	return func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (any, error) {
  1187  		return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
  1188  	}
  1189  }
  1190  
  1191  func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
  1192  	if curr == len(interceptors)-1 {
  1193  		return finalHandler
  1194  	}
  1195  	return func(ctx context.Context, req any) (any, error) {
  1196  		return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
  1197  	}
  1198  }
  1199  
  1200  func (s *Server) processUnaryRPC(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
  1201  	shs := s.opts.statsHandlers
  1202  	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
  1203  		if channelz.IsOn() {
  1204  			s.incrCallsStarted()
  1205  		}
  1206  		var statsBegin *stats.Begin
  1207  		for _, sh := range shs {
  1208  			beginTime := time.Now()
  1209  			statsBegin = &stats.Begin{
  1210  				BeginTime:      beginTime,
  1211  				IsClientStream: false,
  1212  				IsServerStream: false,
  1213  			}
  1214  			sh.HandleRPC(ctx, statsBegin)
  1215  		}
  1216  		if trInfo != nil {
  1217  			trInfo.tr.LazyLog(&trInfo.firstLine, false)
  1218  		}
  1219  		// The deferred error handling for tracing, stats handler and channelz are
  1220  		// combined into one function to reduce stack usage -- a defer takes ~56-64
  1221  		// bytes on the stack, so overflowing the stack will require a stack
  1222  		// re-allocation, which is expensive.
  1223  		//
  1224  		// To maintain behavior similar to separate deferred statements, statements
  1225  		// should be executed in the reverse order. That is, tracing first, stats
  1226  		// handler second, and channelz last. Note that panics *within* defers will
  1227  		// lead to different behavior, but that's an acceptable compromise; that
  1228  		// would be undefined behavior territory anyway.
  1229  		defer func() {
  1230  			if trInfo != nil {
  1231  				if err != nil && err != io.EOF {
  1232  					trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
  1233  					trInfo.tr.SetError()
  1234  				}
  1235  				trInfo.tr.Finish()
  1236  			}
  1237  
  1238  			for _, sh := range shs {
  1239  				end := &stats.End{
  1240  					BeginTime: statsBegin.BeginTime,
  1241  					EndTime:   time.Now(),
  1242  				}
  1243  				if err != nil && err != io.EOF {
  1244  					end.Error = toRPCErr(err)
  1245  				}
  1246  				sh.HandleRPC(ctx, end)
  1247  			}
  1248  
  1249  			if channelz.IsOn() {
  1250  				if err != nil && err != io.EOF {
  1251  					s.incrCallsFailed()
  1252  				} else {
  1253  					s.incrCallsSucceeded()
  1254  				}
  1255  			}
  1256  		}()
  1257  	}
  1258  	var binlogs []binarylog.MethodLogger
  1259  	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
  1260  		binlogs = append(binlogs, ml)
  1261  	}
  1262  	if s.opts.binaryLogger != nil {
  1263  		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
  1264  			binlogs = append(binlogs, ml)
  1265  		}
  1266  	}
  1267  	if len(binlogs) != 0 {
  1268  		md, _ := metadata.FromIncomingContext(ctx)
  1269  		logEntry := &binarylog.ClientHeader{
  1270  			Header:     md,
  1271  			MethodName: stream.Method(),
  1272  			PeerAddr:   nil,
  1273  		}
  1274  		if deadline, ok := ctx.Deadline(); ok {
  1275  			logEntry.Timeout = time.Until(deadline)
  1276  			if logEntry.Timeout < 0 {
  1277  				logEntry.Timeout = 0
  1278  			}
  1279  		}
  1280  		if a := md[":authority"]; len(a) > 0 {
  1281  			logEntry.Authority = a[0]
  1282  		}
  1283  		if peer, ok := peer.FromContext(ctx); ok {
  1284  			logEntry.PeerAddr = peer.Addr
  1285  		}
  1286  		for _, binlog := range binlogs {
  1287  			binlog.Log(ctx, logEntry)
  1288  		}
  1289  	}
  1290  
  1291  	// comp and cp are used for compression.  decomp and dc are used for
  1292  	// decompression.  If comp and decomp are both set, they are the same;
  1293  	// however they are kept separate to ensure that at most one of the
  1294  	// compressor/decompressor variable pairs are set for use later.
  1295  	var comp, decomp encoding.Compressor
  1296  	var cp Compressor
  1297  	var dc Decompressor
  1298  	var sendCompressorName string
  1299  
  1300  	// If dc is set and matches the stream's compression, use it.  Otherwise, try
  1301  	// to find a matching registered compressor for decomp.
  1302  	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
  1303  		dc = s.opts.dc
  1304  	} else if rc != "" && rc != encoding.Identity {
  1305  		decomp = encoding.GetCompressor(rc)
  1306  		if decomp == nil {
  1307  			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
  1308  			t.WriteStatus(stream, st)
  1309  			return st.Err()
  1310  		}
  1311  	}
  1312  
  1313  	// If cp is set, use it.  Otherwise, attempt to compress the response using
  1314  	// the incoming message compression method.
  1315  	//
  1316  	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
  1317  	if s.opts.cp != nil {
  1318  		cp = s.opts.cp
  1319  		sendCompressorName = cp.Type()
  1320  	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
  1321  		// Legacy compressor not specified; attempt to respond with same encoding.
  1322  		comp = encoding.GetCompressor(rc)
  1323  		if comp != nil {
  1324  			sendCompressorName = comp.Name()
  1325  		}
  1326  	}
  1327  
  1328  	if sendCompressorName != "" {
  1329  		if err := stream.SetSendCompress(sendCompressorName); err != nil {
  1330  			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
  1331  		}
  1332  	}
  1333  
  1334  	var payInfo *payloadInfo
  1335  	if len(shs) != 0 || len(binlogs) != 0 {
  1336  		payInfo = &payloadInfo{}
  1337  	}
  1338  
  1339  	d, cancel, err := recvAndDecompress(&parser{r: stream, recvBufferPool: s.opts.recvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
  1340  	if err != nil {
  1341  		if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
  1342  			channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
  1343  		}
  1344  		return err
  1345  	}
  1346  	if channelz.IsOn() {
  1347  		t.IncrMsgRecv()
  1348  	}
  1349  	df := func(v any) error {
  1350  		defer cancel()
  1351  
  1352  		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
  1353  			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
  1354  		}
  1355  		for _, sh := range shs {
  1356  			sh.HandleRPC(ctx, &stats.InPayload{
  1357  				RecvTime:         time.Now(),
  1358  				Payload:          v,
  1359  				Length:           len(d),
  1360  				WireLength:       payInfo.compressedLength + headerLen,
  1361  				CompressedLength: payInfo.compressedLength,
  1362  				Data:             d,
  1363  			})
  1364  		}
  1365  		if len(binlogs) != 0 {
  1366  			cm := &binarylog.ClientMessage{
  1367  				Message: d,
  1368  			}
  1369  			for _, binlog := range binlogs {
  1370  				binlog.Log(ctx, cm)
  1371  			}
  1372  		}
  1373  		if trInfo != nil {
  1374  			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
  1375  		}
  1376  		return nil
  1377  	}
  1378  	ctx = NewContextWithServerTransportStream(ctx, stream)
  1379  	reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
  1380  	if appErr != nil {
  1381  		appStatus, ok := status.FromError(appErr)
  1382  		if !ok {
  1383  			// Convert non-status application error to a status error with code
  1384  			// Unknown, but handle context errors specifically.
  1385  			appStatus = status.FromContextError(appErr)
  1386  			appErr = appStatus.Err()
  1387  		}
  1388  		if trInfo != nil {
  1389  			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
  1390  			trInfo.tr.SetError()
  1391  		}
  1392  		if e := t.WriteStatus(stream, appStatus); e != nil {
  1393  			channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
  1394  		}
  1395  		if len(binlogs) != 0 {
  1396  			if h, _ := stream.Header(); h.Len() > 0 {
  1397  				// Only log serverHeader if a header was actually sent.
  1398  				// Otherwise the response can be trailer-only.
  1399  				sh := &binarylog.ServerHeader{
  1400  					Header: h,
  1401  				}
  1402  				for _, binlog := range binlogs {
  1403  					binlog.Log(ctx, sh)
  1404  				}
  1405  			}
  1406  			st := &binarylog.ServerTrailer{
  1407  				Trailer: stream.Trailer(),
  1408  				Err:     appErr,
  1409  			}
  1410  			for _, binlog := range binlogs {
  1411  				binlog.Log(ctx, st)
  1412  			}
  1413  		}
  1414  		return appErr
  1415  	}
  1416  	if trInfo != nil {
  1417  		trInfo.tr.LazyLog(stringer("OK"), false)
  1418  	}
  1419  	opts := &transport.Options{Last: true}
  1420  
  1421  	// The server handler could have set a new compressor by calling
  1422  	// SetSendCompressor; if so, use it to compress the outbound message.
  1423  	if stream.SendCompress() != sendCompressorName {
  1424  		comp = encoding.GetCompressor(stream.SendCompress())
  1425  	}
  1426  	if err := s.sendResponse(ctx, t, stream, reply, cp, opts, comp); err != nil {
  1427  		if err == io.EOF {
  1428  			// The entire stream is done (for unary RPC only).
  1429  			return err
  1430  		}
  1431  		if sts, ok := status.FromError(err); ok {
  1432  			if e := t.WriteStatus(stream, sts); e != nil {
  1433  				channelz.Warningf(logger, s.channelz, "grpc: Server.processUnaryRPC failed to write status: %v", e)
  1434  			}
  1435  		} else {
  1436  			switch st := err.(type) {
  1437  			case transport.ConnectionError:
  1438  				// Nothing to do here.
  1439  			default:
  1440  				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
  1441  			}
  1442  		}
  1443  		if len(binlogs) != 0 {
  1444  			h, _ := stream.Header()
  1445  			sh := &binarylog.ServerHeader{
  1446  				Header: h,
  1447  			}
  1448  			st := &binarylog.ServerTrailer{
  1449  				Trailer: stream.Trailer(),
  1450  				Err:     appErr,
  1451  			}
  1452  			for _, binlog := range binlogs {
  1453  				binlog.Log(ctx, sh)
  1454  				binlog.Log(ctx, st)
  1455  			}
  1456  		}
  1457  		return err
  1458  	}
  1459  	if len(binlogs) != 0 {
  1460  		h, _ := stream.Header()
  1461  		sh := &binarylog.ServerHeader{
  1462  			Header: h,
  1463  		}
  1464  		sm := &binarylog.ServerMessage{
  1465  			Message: reply,
  1466  		}
  1467  		for _, binlog := range binlogs {
  1468  			binlog.Log(ctx, sh)
  1469  			binlog.Log(ctx, sm)
  1470  		}
  1471  	}
  1472  	if channelz.IsOn() {
  1473  		t.IncrMsgSent()
  1474  	}
  1475  	if trInfo != nil {
  1476  		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
  1477  	}
  1478  	// TODO: Should we be logging if writing status failed here, like above?
  1479  	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
  1480  	// error or allow the stats handler to see it?
  1481  	if len(binlogs) != 0 {
  1482  		st := &binarylog.ServerTrailer{
  1483  			Trailer: stream.Trailer(),
  1484  			Err:     appErr,
  1485  		}
  1486  		for _, binlog := range binlogs {
  1487  			binlog.Log(ctx, st)
  1488  		}
  1489  	}
  1490  	return t.WriteStatus(stream, statusOK)
  1491  }
  1492  
  1493  // chainStreamServerInterceptors chains all stream server interceptors into one.
  1494  func chainStreamServerInterceptors(s *Server) {
  1495  	// Prepend opts.streamInt to the chaining interceptors if it exists, since streamInt will
  1496  	// be executed before any other chained interceptors.
  1497  	interceptors := s.opts.chainStreamInts
  1498  	if s.opts.streamInt != nil {
  1499  		interceptors = append([]StreamServerInterceptor{s.opts.streamInt}, s.opts.chainStreamInts...)
  1500  	}
  1501  
  1502  	var chainedInt StreamServerInterceptor
  1503  	if len(interceptors) == 0 {
  1504  		chainedInt = nil
  1505  	} else if len(interceptors) == 1 {
  1506  		chainedInt = interceptors[0]
  1507  	} else {
  1508  		chainedInt = chainStreamInterceptors(interceptors)
  1509  	}
  1510  
  1511  	s.opts.streamInt = chainedInt
  1512  }
  1513  
  1514  func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
  1515  	return func(srv any, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
  1516  		return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
  1517  	}
  1518  }
  1519  
  1520  func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, info *StreamServerInfo, finalHandler StreamHandler) StreamHandler {
  1521  	if curr == len(interceptors)-1 {
  1522  		return finalHandler
  1523  	}
  1524  	return func(srv any, stream ServerStream) error {
  1525  		return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
  1526  	}
  1527  }
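
// A minimal sketch of how the chaining above composes in practice: with
// grpc.ChainStreamInterceptor(a, b), a is outermost and runs first, and b
// wraps the service's handler. The log statements and interceptor names are
// assumptions for illustration only, not part of this file.
//
//	a := func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, h grpc.StreamHandler) error {
//		log.Println("a: before") // outermost; runs first
//		err := h(srv, ss)        // h is b wrapped around the real handler
//		log.Println("a: after")  // unwinds last
//		return err
//	}
//	b := func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, h grpc.StreamHandler) error {
//		log.Println("b: before") // runs second
//		return h(srv, ss)        // invokes the service's stream handler
//	}
//	s := grpc.NewServer(grpc.ChainStreamInterceptor(a, b))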
  1528  
  1529  func (s *Server) processStreamingRPC(ctx context.Context, t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
  1530  	if channelz.IsOn() {
  1531  		s.incrCallsStarted()
  1532  	}
  1533  	shs := s.opts.statsHandlers
  1534  	var statsBegin *stats.Begin
  1535  	if len(shs) != 0 {
  1536  		beginTime := time.Now()
  1537  		statsBegin = &stats.Begin{
  1538  			BeginTime:      beginTime,
  1539  			IsClientStream: sd.ClientStreams,
  1540  			IsServerStream: sd.ServerStreams,
  1541  		}
  1542  		for _, sh := range shs {
  1543  			sh.HandleRPC(ctx, statsBegin)
  1544  		}
  1545  	}
  1546  	ctx = NewContextWithServerTransportStream(ctx, stream)
  1547  	ss := &serverStream{
  1548  		ctx:                   ctx,
  1549  		t:                     t,
  1550  		s:                     stream,
  1551  		p:                     &parser{r: stream, recvBufferPool: s.opts.recvBufferPool},
  1552  		codec:                 s.getCodec(stream.ContentSubtype()),
  1553  		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
  1554  		maxSendMessageSize:    s.opts.maxSendMessageSize,
  1555  		trInfo:                trInfo,
  1556  		statsHandler:          shs,
  1557  	}
  1558  
  1559  	if len(shs) != 0 || trInfo != nil || channelz.IsOn() {
  1560  		// See comment in processUnaryRPC on defers.
  1561  		defer func() {
  1562  			if trInfo != nil {
  1563  				ss.mu.Lock()
  1564  				if err != nil && err != io.EOF {
  1565  					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
  1566  					ss.trInfo.tr.SetError()
  1567  				}
  1568  				ss.trInfo.tr.Finish()
  1569  				ss.trInfo.tr = nil
  1570  				ss.mu.Unlock()
  1571  			}
  1572  
  1573  			if len(shs) != 0 {
  1574  				end := &stats.End{
  1575  					BeginTime: statsBegin.BeginTime,
  1576  					EndTime:   time.Now(),
  1577  				}
  1578  				if err != nil && err != io.EOF {
  1579  					end.Error = toRPCErr(err)
  1580  				}
  1581  				for _, sh := range shs {
  1582  					sh.HandleRPC(ctx, end)
  1583  				}
  1584  			}
  1585  
  1586  			if channelz.IsOn() {
  1587  				if err != nil && err != io.EOF {
  1588  					s.incrCallsFailed()
  1589  				} else {
  1590  					s.incrCallsSucceeded()
  1591  				}
  1592  			}
  1593  		}()
  1594  	}
  1595  
  1596  	if ml := binarylog.GetMethodLogger(stream.Method()); ml != nil {
  1597  		ss.binlogs = append(ss.binlogs, ml)
  1598  	}
  1599  	if s.opts.binaryLogger != nil {
  1600  		if ml := s.opts.binaryLogger.GetMethodLogger(stream.Method()); ml != nil {
  1601  			ss.binlogs = append(ss.binlogs, ml)
  1602  		}
  1603  	}
  1604  	if len(ss.binlogs) != 0 {
  1605  		md, _ := metadata.FromIncomingContext(ctx)
  1606  		logEntry := &binarylog.ClientHeader{
  1607  			Header:     md,
  1608  			MethodName: stream.Method(),
  1609  			PeerAddr:   nil,
  1610  		}
  1611  		if deadline, ok := ctx.Deadline(); ok {
  1612  			logEntry.Timeout = time.Until(deadline)
  1613  			if logEntry.Timeout < 0 {
  1614  				logEntry.Timeout = 0
  1615  			}
  1616  		}
  1617  		if a := md[":authority"]; len(a) > 0 {
  1618  			logEntry.Authority = a[0]
  1619  		}
  1620  		if peer, ok := peer.FromContext(ss.Context()); ok {
  1621  			logEntry.PeerAddr = peer.Addr
  1622  		}
  1623  		for _, binlog := range ss.binlogs {
  1624  			binlog.Log(ctx, logEntry)
  1625  		}
  1626  	}
  1627  
  1628  	// If dc is set and matches the stream's compression, use it.  Otherwise, try
  1629  	// to find a matching registered compressor for decomp.
  1630  	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
  1631  		ss.dc = s.opts.dc
  1632  	} else if rc != "" && rc != encoding.Identity {
  1633  		ss.decomp = encoding.GetCompressor(rc)
  1634  		if ss.decomp == nil {
  1635  			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
  1636  			t.WriteStatus(ss.s, st)
  1637  			return st.Err()
  1638  		}
  1639  	}
  1640  
  1641  	// If cp is set, use it.  Otherwise, attempt to compress the response using
  1642  	// the incoming message compression method.
  1643  	//
  1644  	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
  1645  	if s.opts.cp != nil {
  1646  		ss.cp = s.opts.cp
  1647  		ss.sendCompressorName = s.opts.cp.Type()
  1648  	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
  1649  		// Legacy compressor not specified; attempt to respond with same encoding.
  1650  		ss.comp = encoding.GetCompressor(rc)
  1651  		if ss.comp != nil {
  1652  			ss.sendCompressorName = rc
  1653  		}
  1654  	}
  1655  
  1656  	if ss.sendCompressorName != "" {
  1657  		if err := stream.SetSendCompress(ss.sendCompressorName); err != nil {
  1658  			return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
  1659  		}
  1660  	}
  1661  
  1662  	ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp)
  1663  
  1664  	if trInfo != nil {
  1665  		trInfo.tr.LazyLog(&trInfo.firstLine, false)
  1666  	}
  1667  	var appErr error
  1668  	var server any
  1669  	if info != nil {
  1670  		server = info.serviceImpl
  1671  	}
  1672  	if s.opts.streamInt == nil {
  1673  		appErr = sd.Handler(server, ss)
  1674  	} else {
  1675  		info := &StreamServerInfo{
  1676  			FullMethod:     stream.Method(),
  1677  			IsClientStream: sd.ClientStreams,
  1678  			IsServerStream: sd.ServerStreams,
  1679  		}
  1680  		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
  1681  	}
  1682  	if appErr != nil {
  1683  		appStatus, ok := status.FromError(appErr)
  1684  		if !ok {
  1685  			// Convert non-status application error to a status error with code
  1686  			// Unknown, but handle context errors specifically.
  1687  			appStatus = status.FromContextError(appErr)
  1688  			appErr = appStatus.Err()
  1689  		}
  1690  		if trInfo != nil {
  1691  			ss.mu.Lock()
  1692  			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
  1693  			ss.trInfo.tr.SetError()
  1694  			ss.mu.Unlock()
  1695  		}
  1696  		if len(ss.binlogs) != 0 {
  1697  			st := &binarylog.ServerTrailer{
  1698  				Trailer: ss.s.Trailer(),
  1699  				Err:     appErr,
  1700  			}
  1701  			for _, binlog := range ss.binlogs {
  1702  				binlog.Log(ctx, st)
  1703  			}
  1704  		}
  1705  		t.WriteStatus(ss.s, appStatus)
  1706  		// TODO: Should we log an error from WriteStatus here and below?
  1707  		return appErr
  1708  	}
  1709  	if trInfo != nil {
  1710  		ss.mu.Lock()
  1711  		ss.trInfo.tr.LazyLog(stringer("OK"), false)
  1712  		ss.mu.Unlock()
  1713  	}
  1714  	if len(ss.binlogs) != 0 {
  1715  		st := &binarylog.ServerTrailer{
  1716  			Trailer: ss.s.Trailer(),
  1717  			Err:     appErr,
  1718  		}
  1719  		for _, binlog := range ss.binlogs {
  1720  			binlog.Log(ctx, st)
  1721  		}
  1722  	}
  1723  	return t.WriteStatus(ss.s, statusOK)
  1724  }
  1725  
  1726  func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {
  1727  	ctx := stream.Context()
  1728  	ctx = contextWithServer(ctx, s)
  1729  	var ti *traceInfo
  1730  	if EnableTracing {
  1731  		tr := newTrace("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
  1732  		ctx = newTraceContext(ctx, tr)
  1733  		ti = &traceInfo{
  1734  			tr: tr,
  1735  			firstLine: firstLine{
  1736  				client:     false,
  1737  				remoteAddr: t.Peer().Addr,
  1738  			},
  1739  		}
  1740  		if dl, ok := ctx.Deadline(); ok {
  1741  			ti.firstLine.deadline = time.Until(dl)
  1742  		}
  1743  	}
  1744  
  1745  	sm := stream.Method()
  1746  	if sm != "" && sm[0] == '/' {
  1747  		sm = sm[1:]
  1748  	}
  1749  	pos := strings.LastIndex(sm, "/")
  1750  	if pos == -1 {
  1751  		if ti != nil {
  1752  			ti.tr.LazyLog(&fmtStringer{"Malformed method name %q", []any{sm}}, true)
  1753  			ti.tr.SetError()
  1754  		}
  1755  		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
  1756  		if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  1757  			if ti != nil {
  1758  				ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
  1759  				ti.tr.SetError()
  1760  			}
  1761  			channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream failed to write status: %v", err)
  1762  		}
  1763  		if ti != nil {
  1764  			ti.tr.Finish()
  1765  		}
  1766  		return
  1767  	}
  1768  	service := sm[:pos]
  1769  	method := sm[pos+1:]
  1770  
  1771  	md, _ := metadata.FromIncomingContext(ctx)
  1772  	for _, sh := range s.opts.statsHandlers {
  1773  		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: stream.Method()})
  1774  		sh.HandleRPC(ctx, &stats.InHeader{
  1775  			FullMethod:  stream.Method(),
  1776  			RemoteAddr:  t.Peer().Addr,
  1777  			LocalAddr:   t.Peer().LocalAddr,
  1778  			Compression: stream.RecvCompress(),
  1779  			WireLength:  stream.HeaderWireLength(),
  1780  			Header:      md,
  1781  		})
  1782  	}
  1783  	// Propagate the tagged context to callouts made through the stream. This
  1784  	// will be deleted once all stats handler calls come from the gRPC layer.
  1785  	stream.SetContext(ctx)
  1786  
  1787  	srv, knownService := s.services[service]
  1788  	if knownService {
  1789  		if md, ok := srv.methods[method]; ok {
  1790  			s.processUnaryRPC(ctx, t, stream, srv, md, ti)
  1791  			return
  1792  		}
  1793  		if sd, ok := srv.streams[method]; ok {
  1794  			s.processStreamingRPC(ctx, t, stream, srv, sd, ti)
  1795  			return
  1796  		}
  1797  	}
  1798  	// Unknown service, or known service but unknown method.
  1799  	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
  1800  		s.processStreamingRPC(ctx, t, stream, nil, unknownDesc, ti)
  1801  		return
  1802  	}
  1803  	var errDesc string
  1804  	if !knownService {
  1805  		errDesc = fmt.Sprintf("unknown service %v", service)
  1806  	} else {
  1807  		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
  1808  	}
  1809  	if ti != nil {
  1810  		ti.tr.LazyPrintf("%s", errDesc)
  1811  		ti.tr.SetError()
  1812  	}
  1813  	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
  1814  		if ti != nil {
  1815  			ti.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
  1816  			ti.tr.SetError()
  1817  		}
  1818  		channelz.Warningf(logger, s.channelz, "grpc: Server.handleStream failed to write status: %v", err)
  1819  	}
  1820  	if ti != nil {
  1821  		ti.tr.Finish()
  1822  	}
  1823  }
  1824  
  1825  // The key to save ServerTransportStream in the context.
  1826  type streamKey struct{}
  1827  
  1828  // NewContextWithServerTransportStream creates a new context from ctx and
  1829  // attaches stream to it.
  1830  //
  1831  // # Experimental
  1832  //
  1833  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  1834  // later release.
  1835  func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
  1836  	return context.WithValue(ctx, streamKey{}, stream)
  1837  }
  1838  
  1839  // ServerTransportStream is a minimal interface that a transport stream must
  1840  // implement. This can be used to mock an actual transport stream for tests of
  1841  // handler code that use, for example, grpc.SetHeader (which requires some
  1842  // stream to be in context).
  1843  //
  1844  // See also NewContextWithServerTransportStream.
  1845  //
  1846  // # Experimental
  1847  //
  1848  // Notice: This type is EXPERIMENTAL and may be changed or removed in a
  1849  // later release.
  1850  type ServerTransportStream interface {
  1851  	Method() string
  1852  	SetHeader(md metadata.MD) error
  1853  	SendHeader(md metadata.MD) error
  1854  	SetTrailer(md metadata.MD) error
  1855  }
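
// A minimal sketch of the mocking use case described above: a test double
// implementing ServerTransportStream, attached with
// NewContextWithServerTransportStream so handler code calling grpc.SetHeader
// can be exercised without a real transport. The mockStream type and test
// body are assumptions for illustration, not part of this file.
//
//	type mockStream struct{ header metadata.MD }
//
//	func (m *mockStream) Method() string                  { return "/test.Service/Method" }
//	func (m *mockStream) SetHeader(md metadata.MD) error  { m.header = metadata.Join(m.header, md); return nil }
//	func (m *mockStream) SendHeader(md metadata.MD) error { return m.SetHeader(md) }
//	func (m *mockStream) SetTrailer(md metadata.MD) error { return nil }
//
//	func TestHandlerSetsHeader(t *testing.T) {
//		ms := &mockStream{}
//		ctx := grpc.NewContextWithServerTransportStream(context.Background(), ms)
//		// Run handler code that calls grpc.SetHeader(ctx, ...), then assert
//		// on ms.header.
//	}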
  1856  
  1857  // ServerTransportStreamFromContext returns the ServerTransportStream saved in
  1858  // ctx. Returns nil if the given context has no stream associated with it
  1859  // (which implies it is not an RPC invocation context).
  1860  //
  1861  // # Experimental
  1862  //
  1863  // Notice: This API is EXPERIMENTAL and may be changed or removed in a
  1864  // later release.
  1865  func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
  1866  	s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
  1867  	return s
  1868  }
  1869  
  1870  // Stop stops the gRPC server. It immediately closes all open
  1871  // connections and listeners.
  1872  // It cancels all active RPCs on the server side, and the corresponding
  1873  // pending RPCs on the client side will be notified by connection
  1874  // errors.
  1875  func (s *Server) Stop() {
  1876  	s.stop(false)
  1877  }
  1878  
  1879  // GracefulStop stops the gRPC server gracefully. It stops the server from
  1880  // accepting new connections and RPCs and blocks until all the pending RPCs are
  1881  // finished.
  1882  func (s *Server) GracefulStop() {
  1883  	s.stop(true)
  1884  }
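
// A sketch of a common shutdown pattern built on these two methods: drain
// gracefully, but bound the drain with a hard Stop. The signal set and the
// 10s budget are assumptions for illustration; assumes imports os, os/signal,
// syscall, and time, and a *grpc.Server named srv already serving.
//
//	sigCh := make(chan os.Signal, 1)
//	signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
//	<-sigCh
//	done := make(chan struct{})
//	go func() { srv.GracefulStop(); close(done) }()
//	select {
//	case <-done: // all in-flight RPCs completed
//	case <-time.After(10 * time.Second):
//		srv.Stop() // give up draining; close connections immediately
//	}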
  1885  
  1886  func (s *Server) stop(graceful bool) {
  1887  	s.quit.Fire()
  1888  	defer s.done.Fire()
  1889  
  1890  	s.channelzRemoveOnce.Do(func() { channelz.RemoveEntry(s.channelz.ID) })
  1891  	s.mu.Lock()
  1892  	s.closeListenersLocked()
  1893  	// Wait for serving threads to be ready to exit.  Only then can we be sure no
  1894  	// new conns will be created.
  1895  	s.mu.Unlock()
  1896  	s.serveWG.Wait()
  1897  
  1898  	s.mu.Lock()
  1899  	defer s.mu.Unlock()
  1900  
  1901  	if graceful {
  1902  		s.drainAllServerTransportsLocked()
  1903  	} else {
  1904  		s.closeServerTransportsLocked()
  1905  	}
  1906  
  1907  	for len(s.conns) != 0 {
  1908  		s.cv.Wait()
  1909  	}
  1910  	s.conns = nil
  1911  
  1912  	if s.opts.numServerWorkers > 0 {
  1913  		// Closing the channel (only once, via grpcsync.OnceFunc) after all the
  1914  		// connections have been closed above ensures that there are no
  1915  		// goroutines executing the callback passed to st.HandleStreams (where
  1916  		// the channel is written to).
  1917  		s.serverWorkerChannelClose()
  1918  	}
  1919  
  1920  	if graceful || s.opts.waitForHandlers {
  1921  		s.handlersWG.Wait()
  1922  	}
  1923  
  1924  	if s.events != nil {
  1925  		s.events.Finish()
  1926  		s.events = nil
  1927  	}
  1928  }
  1929  
  1930  // s.mu must be held by the caller.
  1931  func (s *Server) closeServerTransportsLocked() {
  1932  	for _, conns := range s.conns {
  1933  		for st := range conns {
  1934  			st.Close(errors.New("Server.Stop called"))
  1935  		}
  1936  	}
  1937  }
  1938  
  1939  // s.mu must be held by the caller.
  1940  func (s *Server) drainAllServerTransportsLocked() {
  1941  	if !s.drain {
  1942  		for _, conns := range s.conns {
  1943  			for st := range conns {
  1944  				st.Drain("graceful_stop")
  1945  			}
  1946  		}
  1947  		s.drain = true
  1948  	}
  1949  }
  1950  
  1951  // s.mu must be held by the caller.
  1952  func (s *Server) closeListenersLocked() {
  1953  	for lis := range s.lis {
  1954  		lis.Close()
  1955  	}
  1956  	s.lis = nil
  1957  }
  1958  
  1959  // contentSubtype must be lowercase.
  1960  // This function cannot return nil.
  1961  func (s *Server) getCodec(contentSubtype string) baseCodec {
  1962  	if s.opts.codec != nil {
  1963  		return s.opts.codec
  1964  	}
  1965  	if contentSubtype == "" {
  1966  		return encoding.GetCodec(proto.Name)
  1967  	}
  1968  	codec := encoding.GetCodec(contentSubtype)
  1969  	if codec == nil {
  1970  		logger.Warningf("Unsupported codec %q. Defaulting to %q for now. This will start to fail in future releases.", contentSubtype, proto.Name)
  1971  		return encoding.GetCodec(proto.Name)
  1972  	}
  1973  	return codec
  1974  }
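
// A sketch of how a codec becomes resolvable by getCodec: registering an
// encoding.Codec under a lowercase name makes it available for the matching
// content-subtype. The jsonCodec type is an assumption for illustration (a
// real implementation would live in its own package); assumes encoding/json
// is imported.
//
//	type jsonCodec struct{}
//
//	func (jsonCodec) Marshal(v any) ([]byte, error)   { return json.Marshal(v) }
//	func (jsonCodec) Unmarshal(b []byte, v any) error { return json.Unmarshal(b, v) }
//	func (jsonCodec) Name() string                    { return "json" }
//
//	func init() { encoding.RegisterCodec(jsonCodec{}) }
//
// A client can then select it per call with grpc.CallContentSubtype("json").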
  1975  
  1976  type serverKey struct{}
  1977  
  1978  // serverFromContext gets the Server from the context.
  1979  func serverFromContext(ctx context.Context) *Server {
  1980  	s, _ := ctx.Value(serverKey{}).(*Server)
  1981  	return s
  1982  }
  1983  
  1984  // contextWithServer sets the Server in the context.
  1985  func contextWithServer(ctx context.Context, server *Server) context.Context {
  1986  	return context.WithValue(ctx, serverKey{}, server)
  1987  }
  1988  
  1989  // isRegisteredMethod returns whether the passed-in method is registered as a
  1990  // method on the server. Both /service/method and service/method match if the
  1991  // service and method are registered on the server.
  1992  func (s *Server) isRegisteredMethod(serviceMethod string) bool {
  1993  	if serviceMethod != "" && serviceMethod[0] == '/' {
  1994  		serviceMethod = serviceMethod[1:]
  1995  	}
  1996  	pos := strings.LastIndex(serviceMethod, "/")
  1997  	if pos == -1 { // Invalid method name syntax.
  1998  		return false
  1999  	}
  2000  	service := serviceMethod[:pos]
  2001  	method := serviceMethod[pos+1:]
  2002  	srv, knownService := s.services[service]
  2003  	if knownService {
  2004  		if _, ok := srv.methods[method]; ok {
  2005  			return true
  2006  		}
  2007  		if _, ok := srv.streams[method]; ok {
  2008  			return true
  2009  		}
  2010  	}
  2011  	return false
  2012  }
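
// Illustrative behavior, assuming a registered service "helloworld.Greeter"
// with a unary method "SayHello" (the names are assumptions for
// illustration):
//
//	s.isRegisteredMethod("/helloworld.Greeter/SayHello") // true
//	s.isRegisteredMethod("helloworld.Greeter/SayHello")  // true; leading slash optional
//	s.isRegisteredMethod("SayHello")                     // false; no service component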
  2013  
  2014  // SetHeader sets the header metadata to be sent from the server to the client.
  2015  // The context provided must be the context passed to the server's handler.
  2016  //
  2017  // Streaming RPCs should prefer the SetHeader method of the ServerStream.
  2018  //
  2019  // When called multiple times, all the provided metadata will be merged.  All
  2020  // the metadata will be sent out when one of the following happens:
  2021  //
  2022  //   - grpc.SendHeader is called, or for streaming handlers, stream.SendHeader.
  2023  //   - The first response message is sent.  For unary handlers, this occurs when
  2024  //     the handler returns; for streaming handlers, this can happen when stream's
  2025  //     SendMsg method is called.
  2026  //   - An RPC status is sent out (error or success).  This occurs when the handler
  2027  //     returns.
  2028  //
  2029  // SetHeader will fail if called after any of the events above.
  2030  //
  2031  // The error returned is compatible with the status package.  However, the
  2032  // status code will often not match the RPC status as seen by the client
  2033  // application, and therefore, should not be relied upon for this purpose.
  2034  func SetHeader(ctx context.Context, md metadata.MD) error {
  2035  	if md.Len() == 0 {
  2036  		return nil
  2037  	}
  2038  	stream := ServerTransportStreamFromContext(ctx)
  2039  	if stream == nil {
  2040  		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  2041  	}
  2042  	return stream.SetHeader(md)
  2043  }
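
// A sketch of typical usage from a unary handler; the header key, value, and
// pb message types are assumptions for illustration. The merged headers are
// flushed when the handler returns.
//
//	func (s *server) Get(ctx context.Context, req *pb.Request) (*pb.Response, error) {
//		if err := grpc.SetHeader(ctx, metadata.Pairs("x-request-id", "abc123")); err != nil {
//			return nil, err
//		}
//		return &pb.Response{}, nil
//	}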
  2044  
  2045  // SendHeader sends header metadata. It may be called at most once, and may not
  2046  // be called after any event that causes headers to be sent (see SetHeader for
  2047  // a complete list).  The provided md and headers set by SetHeader() will be
  2048  // sent.
  2049  //
  2050  // The error returned is compatible with the status package.  However, the
  2051  // status code will often not match the RPC status as seen by the client
  2052  // application, and therefore, should not be relied upon for this purpose.
  2053  func SendHeader(ctx context.Context, md metadata.MD) error {
  2054  	stream := ServerTransportStreamFromContext(ctx)
  2055  	if stream == nil {
  2056  		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  2057  	}
  2058  	if err := stream.SendHeader(md); err != nil {
  2059  		return toRPCErr(err)
  2060  	}
  2061  	return nil
  2062  }
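
// A sketch of flushing headers eagerly, before a slow response is computed,
// so the client can observe metadata early; the key and value are assumptions
// for illustration.
//
//	if err := grpc.SendHeader(ctx, metadata.Pairs("x-cache", "miss")); err != nil {
//		return nil, err
//	}
//	// ... compute the (possibly slow) response; SetHeader now fails ...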
  2063  
  2064  // SetSendCompressor sets a compressor for outbound messages from the server.
  2065  // It must not be called after any event that causes headers to be sent
  2066  // (see ServerStream.SetHeader for the complete list). The provided compressor
  2067  // is used when the following conditions are met:
  2068  //
  2069  //   - the compressor is registered via encoding.RegisterCompressor, and
  2070  //   - the compressor name exists in the compressor names advertised by the
  2071  //     client in the grpc-accept-encoding header. Use ClientSupportedCompressors
  2072  //     to get the client-supported compressor names.
  2073  //
  2074  // The context provided must be the context passed to the server's handler.
  2075  // Note that the compressor name encoding.Identity disables outbound
  2076  // compression.
  2077  // By default, server messages are sent using the same compressor with which
  2078  // request messages were sent.
  2079  //
  2080  // It is not safe to call SetSendCompressor concurrently with SendHeader and
  2081  // SendMsg.
  2082  //
  2083  // # Experimental
  2084  //
  2085  // Notice: This function is EXPERIMENTAL and may be changed or removed in a
  2086  // later release.
  2087  func SetSendCompressor(ctx context.Context, name string) error {
  2088  	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
  2089  	if !ok || stream == nil {
  2090  		return fmt.Errorf("failed to fetch the stream from the given context")
  2091  	}
  2092  
  2093  	if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil {
  2094  		return fmt.Errorf("unable to set send compressor: %w", err)
  2095  	}
  2096  
  2097  	return stream.SetSendCompress(name)
  2098  }
  2099  
  2100  // ClientSupportedCompressors returns compressor names advertised by the client
  2101  // via grpc-accept-encoding header.
  2102  //
  2103  // The context provided must be the context passed to the server's handler.
  2104  //
  2105  // # Experimental
  2106  //
  2107  // Notice: This function is EXPERIMENTAL and may be changed or removed in a
  2108  // later release.
  2109  func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
  2110  	stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
  2111  	if !ok || stream == nil {
  2112  		return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
  2113  	}
  2114  
  2115  	return stream.ClientAdvertisedCompressors(), nil
  2116  }
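
// A sketch combining ClientSupportedCompressors with SetSendCompressor:
// enable gzip for responses only when the client advertised it. Assumes the
// gzip compressor is registered, e.g. via a blank import of
// google.golang.org/grpc/encoding/gzip, and that this runs inside a handler.
//
//	names, err := grpc.ClientSupportedCompressors(ctx)
//	if err != nil {
//		return nil, err
//	}
//	for _, name := range names {
//		if name == "gzip" {
//			if err := grpc.SetSendCompressor(ctx, "gzip"); err != nil {
//				return nil, err
//			}
//			break
//		}
//	}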
  2117  
  2118  // SetTrailer sets the trailer metadata that will be sent when an RPC returns.
  2119  // When called more than once, all the provided metadata will be merged.
  2120  //
  2121  // The error returned is compatible with the status package.  However, the
  2122  // status code will often not match the RPC status as seen by the client
  2123  // application, and therefore, should not be relied upon for this purpose.
  2124  func SetTrailer(ctx context.Context, md metadata.MD) error {
  2125  	if md.Len() == 0 {
  2126  		return nil
  2127  	}
  2128  	stream := ServerTransportStreamFromContext(ctx)
  2129  	if stream == nil {
  2130  		return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
  2131  	}
  2132  	return stream.SetTrailer(md)
  2133  }
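
// A sketch of recording a handler-side metric in the trailer; the key and the
// strconv formatting are assumptions for illustration. Trailers are sent with
// the RPC status when the handler returns.
//
//	start := time.Now()
//	defer func() {
//		_ = grpc.SetTrailer(ctx, metadata.Pairs(
//			"x-duration-ms", strconv.FormatInt(time.Since(start).Milliseconds(), 10)))
//	}()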
  2134  
  2135  // Method returns the method string for the server context.  The returned
  2136  // string is in the format of "/service/method".
  2137  func Method(ctx context.Context) (string, bool) {
  2138  	s := ServerTransportStreamFromContext(ctx)
  2139  	if s == nil {
  2140  		return "", false
  2141  	}
  2142  	return s.Method(), true
  2143  }
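
// A sketch of recovering the full method name inside a handler or stats
// callback; the log line is an assumption for illustration.
//
//	if m, ok := grpc.Method(ctx); ok {
//		log.Printf("handling %s", m) // e.g. "/helloworld.Greeter/SayHello"
//	}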
  2144  
  2145  // validateSendCompressor returns an error when the given compressor name
  2146  // cannot be handled by the server or the client per the advertised compressors.
  2147  func validateSendCompressor(name string, clientCompressors []string) error {
  2148  	if name == encoding.Identity {
  2149  		return nil
  2150  	}
  2151  
  2152  	if !grpcutil.IsCompressorNameRegistered(name) {
  2153  		return fmt.Errorf("compressor not registered %q", name)
  2154  	}
  2155  
  2156  	for _, c := range clientCompressors {
  2157  		if c == name {
  2158  			return nil // found match
  2159  		}
  2160  	}
  2161  	return fmt.Errorf("client does not support compressor %q", name)
  2162  }
  2163  
  2164  // atomicSemaphore implements a blocking, counting semaphore. acquire should be
  2165  // called synchronously; release may be called asynchronously.
  2166  type atomicSemaphore struct {
  2167  	n    atomic.Int64
  2168  	wait chan struct{}
  2169  }
  2170  
  2171  func (q *atomicSemaphore) acquire() {
  2172  	if q.n.Add(-1) < 0 {
  2173  		// We ran out of quota.  Block until a release happens.
  2174  		<-q.wait
  2175  	}
  2176  }
  2177  
  2178  func (q *atomicSemaphore) release() {
  2179  	// N.B. the "<= 0" check below should allow for this to work with multiple
  2180  	// concurrent calls to acquire, but also note that with synchronous calls to
  2181  	// acquire, as our system does, n will never be less than -1.  There are
  2182  	// fairness issues (queuing) to consider if this was to be generalized.
  2183  	if q.n.Add(1) <= 0 {
  2184  		// An acquire was waiting on us.  Unblock it.
  2185  		q.wait <- struct{}{}
  2186  	}
  2187  }
  2188  
  2189  func newHandlerQuota(n uint32) *atomicSemaphore {
  2190  	a := &atomicSemaphore{wait: make(chan struct{}, 1)}
  2191  	a.n.Store(int64(n))
  2192  	return a
  2193  }
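
// A sketch of the acquire/release discipline this semaphore supports,
// mirroring how the server bounds concurrent handler goroutines; the quota of
// 2 is an assumption for illustration. Per the contract above, acquire is
// called synchronously while release may run asynchronously.
//
//	sem := newHandlerQuota(2)
//	for i := 0; i < 4; i++ {
//		sem.acquire() // blocks once two handlers are in flight
//		go func() {
//			defer sem.release() // asynchronous release is safe
//			// ... handle one stream ...
//		}()
//	}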
  2194  
