...

Source file src/google.golang.org/grpc/internal/transport/transport.go

Documentation: google.golang.org/grpc/internal/transport

     1  /*
     2   *
     3   * Copyright 2014 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  // Package transport defines and implements message oriented communication
    20  // channel to complete various transactions (e.g., an RPC).  It is meant for
    21  // grpc-internal usage and is not intended to be imported directly by users.
    22  package transport
    23  
    24  import (
    25  	"bytes"
    26  	"context"
    27  	"errors"
    28  	"fmt"
    29  	"io"
    30  	"net"
    31  	"strings"
    32  	"sync"
    33  	"sync/atomic"
    34  	"time"
    35  
    36  	"google.golang.org/grpc/codes"
    37  	"google.golang.org/grpc/credentials"
    38  	"google.golang.org/grpc/internal/channelz"
    39  	"google.golang.org/grpc/keepalive"
    40  	"google.golang.org/grpc/metadata"
    41  	"google.golang.org/grpc/peer"
    42  	"google.golang.org/grpc/resolver"
    43  	"google.golang.org/grpc/stats"
    44  	"google.golang.org/grpc/status"
    45  	"google.golang.org/grpc/tap"
    46  )
    47  
// logLevel is a package-level verbosity constant. Its consumers are not
// visible in this part of the file — presumably it gates verbose logging;
// confirm against the callers before changing the value.
const logLevel = 2
    49  
    50  type bufferPool struct {
    51  	pool sync.Pool
    52  }
    53  
    54  func newBufferPool() *bufferPool {
    55  	return &bufferPool{
    56  		pool: sync.Pool{
    57  			New: func() any {
    58  				return new(bytes.Buffer)
    59  			},
    60  		},
    61  	}
    62  }
    63  
    64  func (p *bufferPool) get() *bytes.Buffer {
    65  	return p.pool.Get().(*bytes.Buffer)
    66  }
    67  
    68  func (p *bufferPool) put(b *bytes.Buffer) {
    69  	p.pool.Put(b)
    70  }
    71  
    72  // recvMsg represents the received msg from the transport. All transport
    73  // protocol specific info has been removed.
    74  type recvMsg struct {
    75  	buffer *bytes.Buffer
    76  	// nil: received some data
    77  	// io.EOF: stream is completed. data is nil.
    78  	// other non-nil error: transport failure. data is nil.
    79  	err error
    80  }
    81  
    82  // recvBuffer is an unbounded channel of recvMsg structs.
    83  //
    84  // Note: recvBuffer differs from buffer.Unbounded only in the fact that it
    85  // holds a channel of recvMsg structs instead of objects implementing "item"
    86  // interface. recvBuffer is written to much more often and using strict recvMsg
    87  // structs helps avoid allocation in "recvBuffer.put"
    88  type recvBuffer struct {
    89  	c       chan recvMsg
    90  	mu      sync.Mutex
    91  	backlog []recvMsg
    92  	err     error
    93  }
    94  
    95  func newRecvBuffer() *recvBuffer {
    96  	b := &recvBuffer{
    97  		c: make(chan recvMsg, 1),
    98  	}
    99  	return b
   100  }
   101  
   102  func (b *recvBuffer) put(r recvMsg) {
   103  	b.mu.Lock()
   104  	if b.err != nil {
   105  		b.mu.Unlock()
   106  		// An error had occurred earlier, don't accept more
   107  		// data or errors.
   108  		return
   109  	}
   110  	b.err = r.err
   111  	if len(b.backlog) == 0 {
   112  		select {
   113  		case b.c <- r:
   114  			b.mu.Unlock()
   115  			return
   116  		default:
   117  		}
   118  	}
   119  	b.backlog = append(b.backlog, r)
   120  	b.mu.Unlock()
   121  }
   122  
   123  func (b *recvBuffer) load() {
   124  	b.mu.Lock()
   125  	if len(b.backlog) > 0 {
   126  		select {
   127  		case b.c <- b.backlog[0]:
   128  			b.backlog[0] = recvMsg{}
   129  			b.backlog = b.backlog[1:]
   130  		default:
   131  		}
   132  	}
   133  	b.mu.Unlock()
   134  }
   135  
   136  // get returns the channel that receives a recvMsg in the buffer.
   137  //
   138  // Upon receipt of a recvMsg, the caller should call load to send another
   139  // recvMsg onto the channel if there is any.
   140  func (b *recvBuffer) get() <-chan recvMsg {
   141  	return b.c
   142  }
   143  
// recvBufferReader implements io.Reader interface to read the data from
// recvBuffer.
//
// Read dispatches on closeStream: when it is non-nil the client path
// (readClient) is used, otherwise the plain path (read) is used.
type recvBufferReader struct {
	closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
	ctx         context.Context
	ctxDone     <-chan struct{} // cache of ctx.Done() (for performance).
	recv        *recvBuffer
	last        *bytes.Buffer // Stores the remaining data in the previous calls.
	err         error         // First error returned by Read; once set, every later Read returns it.
	freeBuffer  func(*bytes.Buffer) // Called with a buffer once it has been fully drained.
}
   155  
   156  // Read reads the next len(p) bytes from last. If last is drained, it tries to
   157  // read additional data from recv. It blocks if there no additional data available
   158  // in recv. If Read returns any non-nil error, it will continue to return that error.
   159  func (r *recvBufferReader) Read(p []byte) (n int, err error) {
   160  	if r.err != nil {
   161  		return 0, r.err
   162  	}
   163  	if r.last != nil {
   164  		// Read remaining data left in last call.
   165  		copied, _ := r.last.Read(p)
   166  		if r.last.Len() == 0 {
   167  			r.freeBuffer(r.last)
   168  			r.last = nil
   169  		}
   170  		return copied, nil
   171  	}
   172  	if r.closeStream != nil {
   173  		n, r.err = r.readClient(p)
   174  	} else {
   175  		n, r.err = r.read(p)
   176  	}
   177  	return n, r.err
   178  }
   179  
// read waits for whichever happens first: the cached context Done channel
// firing, or a message arriving on recv. On context completion it returns the
// context error converted by ContextErr; otherwise the message is handed to
// readAdditional.
func (r *recvBufferReader) read(p []byte) (n int, err error) {
	select {
	case <-r.ctxDone:
		return 0, ContextErr(r.ctx.Err())
	case m := <-r.recv.get():
		return r.readAdditional(m, p)
	}
}
   188  
// readClient is the variant of read used when a closeStream callback is set.
// Instead of returning the context error directly, it closes the stream with
// that error and then consumes the next message from recv.
func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
	// If the context is canceled, then closes the stream with nil metadata.
	// closeStream writes its error parameter to r.recv as a recvMsg.
	// r.readAdditional acts on that message and returns the necessary error.
	select {
	case <-r.ctxDone:
		// Note that this adds the ctx error to the end of recv buffer, and
		// reads from the head. This will delay the error until recv buffer is
		// empty, thus will delay ctx cancellation in Recv().
		//
		// It's done this way to fix a race between ctx cancel and trailer. The
		// race was, stream.Recv() may return ctx error if ctxDone wins the
		// race, but stream.Trailer() may return a non-nil md because the stream
		// was not marked as done when trailer is received. This closeStream
		// call will mark stream as done, thus fix the race.
		//
		// TODO: delaying ctx error seems like an unnecessary side effect. What
		// we really want is to mark the stream as done, and return ctx error
		// faster.
		r.closeStream(ContextErr(r.ctx.Err()))
		m := <-r.recv.get()
		return r.readAdditional(m, p)
	case m := <-r.recv.get():
		return r.readAdditional(m, p)
	}
}
   215  
   216  func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
   217  	r.recv.load()
   218  	if m.err != nil {
   219  		return 0, m.err
   220  	}
   221  	copied, _ := m.buffer.Read(p)
   222  	if m.buffer.Len() == 0 {
   223  		r.freeBuffer(m.buffer)
   224  		r.last = nil
   225  	} else {
   226  		r.last = m.buffer
   227  	}
   228  	return copied, nil
   229  }
   230  
// streamState is the lifecycle state of a Stream. It is stored in
// Stream.state and accessed through the atomic helpers swapState,
// compareAndSwapState and getState.
type streamState uint32

const (
	streamActive    streamState = iota
	streamWriteDone             // EndStream sent
	streamReadDone              // EndStream received
	streamDone                  // the entire stream is finished.
)
   239  
// Stream represents an RPC in the transport layer.
//
// The same struct is used on both the client and the server side; several
// fields are meaningful on only one side, as noted on each field.
type Stream struct {
	id           uint32
	st           ServerTransport    // nil for client side Stream
	ct           *http2Client       // nil for server side Stream
	ctx          context.Context    // the associated context of the stream
	cancel       context.CancelFunc // always nil for client side Stream
	done         chan struct{}      // closed at the end of stream to unblock writers. On the client side.
	doneFunc     func()             // invoked at the end of stream on client side.
	ctxDone      <-chan struct{}    // same as done chan but for server side. Cache of ctx.Done() (for performance)
	method       string             // the associated RPC method of the stream
	recvCompress string             // returned by RecvCompress once the header has been waited on
	sendCompress string             // set by SetSendCompress, returned by SendCompress
	buf          *recvBuffer        // queue of received messages; filled by write
	trReader     io.Reader          // Read requires this to be a *transportReader
	fc           *inFlow
	wq           *writeQuota

	// Holds compressor names passed in grpc-accept-encoding metadata from the
	// client. This is empty for the client side stream.
	clientAdvertisedCompressors string
	// Callback to state application's intentions to read data. This
	// is used to adjust flow control, if needed.
	requestRead func(int)

	headerChan       chan struct{} // closed to indicate the end of header metadata.
	headerChanClosed uint32        // set when headerChan is closed. Used to avoid closing headerChan multiple times.
	// headerValid indicates whether a valid header was received.  Only
	// meaningful after headerChan is closed (always call waitOnHeader() before
	// reading its value).  Not valid on server side.
	headerValid      bool
	headerWireLength int // Only set on server side.

	// hdrMu protects header and trailer metadata on the server-side.
	hdrMu sync.Mutex
	// On client side, header keeps the received header metadata.
	//
	// On server side, header keeps the header set by SetHeader(). The complete
	// header will be merged into this after t.WriteHeader() is called.
	header  metadata.MD
	trailer metadata.MD // the key-value map of trailer metadata.

	noHeaders bool // set if the client never received headers (set only after the stream is done).

	// On the server-side, headerSent is atomically set to 1 when the headers are sent out.
	headerSent uint32

	// state holds the current streamState; it is read and written atomically
	// via swapState, compareAndSwapState and getState.
	state streamState

	// On client-side it is the status error received from the server.
	// On server-side it is unused.
	status *status.Status

	bytesReceived uint32 // indicates whether any bytes have been received on this stream
	unprocessed   uint32 // set if the server sends a refused stream or GOAWAY including this stream

	// contentSubtype is the content-subtype for requests.
	// this must be lowercase or the behavior is undefined.
	contentSubtype string
}
   300  
// isHeaderSent reports whether headerSent has been set to 1. The flag is read
// atomically. It is only valid on the server-side.
func (s *Stream) isHeaderSent() bool {
	return atomic.LoadUint32(&s.headerSent) == 1
}
   305  
// updateHeaderSent atomically sets headerSent to 1 and returns true if it was
// already set before this call. Because the swap is atomic, at most one
// caller observes false. It is valid only on server-side.
func (s *Stream) updateHeaderSent() bool {
	return atomic.SwapUint32(&s.headerSent, 1) == 1
}
   311  
// swapState atomically stores st as the stream's state and returns the state
// that was stored before the call.
func (s *Stream) swapState(st streamState) streamState {
	return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
}
   315  
// compareAndSwapState atomically replaces the stream's state with newState,
// but only if the current state equals oldState. It reports whether the swap
// took place.
func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
	return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
}
   319  
// getState atomically loads and returns the stream's current state.
func (s *Stream) getState() streamState {
	return streamState(atomic.LoadUint32((*uint32)(&s.state)))
}
   323  
// waitOnHeader blocks until headerChan is closed. If the stream's context
// finishes first, the stream is closed with the context error and the
// function then still waits for headerChan to be closed before returning.
// It returns immediately when headerChan is nil.
func (s *Stream) waitOnHeader() {
	if s.headerChan == nil {
		// On the server headerChan is always nil since a stream originates
		// only after having received headers.
		return
	}
	select {
	case <-s.ctx.Done():
		// Close the stream to prevent headers/trailers from changing after
		// this function returns.
		s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
		// headerChan could possibly not be closed yet if closeStream raced
		// with operateHeaders; wait until it is closed explicitly here.
		<-s.headerChan
	case <-s.headerChan:
	}
}
   341  
// RecvCompress returns the compression algorithm applied to the inbound
// message. It is empty string if there is no compression applied.
//
// It calls waitOnHeader first, so on a stream with a non-nil headerChan it
// blocks until the header has been received or the stream has ended.
func (s *Stream) RecvCompress() string {
	s.waitOnHeader()
	return s.recvCompress
}
   348  
   349  // SetSendCompress sets the compression algorithm to the stream.
   350  func (s *Stream) SetSendCompress(name string) error {
   351  	if s.isHeaderSent() || s.getState() == streamDone {
   352  		return errors.New("transport: set send compressor called after headers sent or stream done")
   353  	}
   354  
   355  	s.sendCompress = name
   356  	return nil
   357  }
   358  
// SendCompress returns the send compressor name, i.e. the value most recently
// stored in the stream's sendCompress field (empty if none was set).
func (s *Stream) SendCompress() string {
	return s.sendCompress
}
   363  
   364  // ClientAdvertisedCompressors returns the compressor names advertised by the
   365  // client via grpc-accept-encoding header.
   366  func (s *Stream) ClientAdvertisedCompressors() []string {
   367  	values := strings.Split(s.clientAdvertisedCompressors, ",")
   368  	for i, v := range values {
   369  		values[i] = strings.TrimSpace(v)
   370  	}
   371  	return values
   372  }
   373  
// Done returns a channel which is closed when it receives the final status
// from the server. The returned channel is the stream's done field, which is
// only used on the client side.
func (s *Stream) Done() <-chan struct{} {
	return s.done
}
   379  
   380  // Header returns the header metadata of the stream.
   381  //
   382  // On client side, it acquires the key-value pairs of header metadata once it is
   383  // available. It blocks until i) the metadata is ready or ii) there is no header
   384  // metadata or iii) the stream is canceled/expired.
   385  //
   386  // On server side, it returns the out header after t.WriteHeader is called.  It
   387  // does not block and must not be called until after WriteHeader.
   388  func (s *Stream) Header() (metadata.MD, error) {
   389  	if s.headerChan == nil {
   390  		// On server side, return the header in stream. It will be the out
   391  		// header after t.WriteHeader is called.
   392  		return s.header.Copy(), nil
   393  	}
   394  	s.waitOnHeader()
   395  
   396  	if !s.headerValid || s.noHeaders {
   397  		return nil, s.status.Err()
   398  	}
   399  
   400  	return s.header.Copy(), nil
   401  }
   402  
// TrailersOnly blocks until a header or trailers-only frame is received and
// then returns true if the stream was trailers-only.  If the stream ends
// before headers are received, it returns true.  Client-side only.
func (s *Stream) TrailersOnly() bool {
	s.waitOnHeader()
	return s.noHeaders
}
   410  
   411  // Trailer returns the cached trailer metedata. Note that if it is not called
   412  // after the entire stream is done, it could return an empty MD. Client
   413  // side only.
   414  // It can be safely read only after stream has ended that is either read
   415  // or write have returned io.EOF.
   416  func (s *Stream) Trailer() metadata.MD {
   417  	c := s.trailer.Copy()
   418  	return c
   419  }
   420  
// ContentSubtype returns the content-subtype for a request. For example, a
// content-subtype of "proto" will result in a content-type of
// "application/grpc+proto". The value is returned as stored and is expected
// to be lowercase.  See
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details.
func (s *Stream) ContentSubtype() string {
	return s.contentSubtype
}
   429  
// Context returns the context currently associated with the stream.
func (s *Stream) Context() context.Context {
	return s.ctx
}
   434  
// SetContext sets the context of the stream. This will be deleted once the
// stats handler callouts all move to gRPC layer.
//
// Only the ctx field is replaced; the cached ctxDone channel is not touched
// by this method.
func (s *Stream) SetContext(ctx context.Context) {
	s.ctx = ctx
}
   440  
// Method returns the RPC method associated with the stream.
func (s *Stream) Method() string {
	return s.method
}
   445  
// Status returns the status received from the server. The status field is
// unused on the server side.
// Status can be read safely only after the stream has ended,
// that is, after Done() is closed.
func (s *Stream) Status() *status.Status {
	return s.status
}
   452  
// HeaderWireLength returns the size of the headers of the stream as received
// from the wire. Valid only on the server, where the underlying field is set.
func (s *Stream) HeaderWireLength() int {
	return s.headerWireLength
}
   458  
   459  // SetHeader sets the header metadata. This can be called multiple times.
   460  // Server side only.
   461  // This should not be called in parallel to other data writes.
   462  func (s *Stream) SetHeader(md metadata.MD) error {
   463  	if md.Len() == 0 {
   464  		return nil
   465  	}
   466  	if s.isHeaderSent() || s.getState() == streamDone {
   467  		return ErrIllegalHeaderWrite
   468  	}
   469  	s.hdrMu.Lock()
   470  	s.header = metadata.Join(s.header, md)
   471  	s.hdrMu.Unlock()
   472  	return nil
   473  }
   474  
// SendHeader sends the given header metadata. The given metadata is
// combined with any metadata set by previous calls to SetHeader and
// then written to the transport stream.
//
// It delegates to WriteHeader on the stream's ServerTransport (st), which is
// nil on client-side streams, so this must only be used on the server side.
func (s *Stream) SendHeader(md metadata.MD) error {
	return s.st.WriteHeader(s, md)
}
   481  
   482  // SetTrailer sets the trailer metadata which will be sent with the RPC status
   483  // by the server. This can be called multiple times. Server side only.
   484  // This should not be called parallel to other data writes.
   485  func (s *Stream) SetTrailer(md metadata.MD) error {
   486  	if md.Len() == 0 {
   487  		return nil
   488  	}
   489  	if s.getState() == streamDone {
   490  		return ErrIllegalHeaderWrite
   491  	}
   492  	s.hdrMu.Lock()
   493  	s.trailer = metadata.Join(s.trailer, md)
   494  	s.hdrMu.Unlock()
   495  	return nil
   496  }
   497  
// write queues m on the stream's receive buffer, making it available to the
// reader of that buffer.
func (s *Stream) write(m recvMsg) {
	s.buf.put(m)
}
   501  
   502  // Read reads all p bytes from the wire for this stream.
   503  func (s *Stream) Read(p []byte) (n int, err error) {
   504  	// Don't request a read if there was an error earlier
   505  	if er := s.trReader.(*transportReader).er; er != nil {
   506  		return 0, er
   507  	}
   508  	s.requestRead(len(p))
   509  	return io.ReadFull(s.trReader, p)
   510  }
   511  
   512  // tranportReader reads all the data available for this Stream from the transport and
   513  // passes them into the decoder, which converts them into a gRPC message stream.
   514  // The error is io.EOF when the stream is done or another non-nil error if
   515  // the stream broke.
   516  type transportReader struct {
   517  	reader io.Reader
   518  	// The handler to control the window update procedure for both this
   519  	// particular stream and the associated transport.
   520  	windowHandler func(int)
   521  	er            error
   522  }
   523  
   524  func (t *transportReader) Read(p []byte) (n int, err error) {
   525  	n, err = t.reader.Read(p)
   526  	if err != nil {
   527  		t.er = err
   528  		return
   529  	}
   530  	t.windowHandler(n)
   531  	return
   532  }
   533  
// BytesReceived indicates whether any bytes have been received on this stream.
// The underlying flag is read atomically.
func (s *Stream) BytesReceived() bool {
	return atomic.LoadUint32(&s.bytesReceived) == 1
}
   538  
// Unprocessed indicates whether the server did not process this stream --
// i.e. it sent a refused stream or GOAWAY including this stream ID.
// The underlying flag is read atomically.
func (s *Stream) Unprocessed() bool {
	return atomic.LoadUint32(&s.unprocessed) == 1
}
   544  
// GoString is implemented by Stream so context.String() won't
// race when printing %#v. The output contains only the stream's address and
// method, so none of the stream's other fields are read while formatting.
func (s *Stream) GoString() string {
	return fmt.Sprintf("<stream: %p, %v>", s, s.method)
}
   550  
// transportState is the state of a transport. The zero value is reachable.
type transportState int

const (
	reachable transportState = iota
	closing
	draining
)
   559  
// ServerConfig consists of all the configurations to establish a server transport.
//
// NOTE(review): the code that consumes these fields is not in this file, so
// the exact semantics of each option (units, defaults, meaning of zero or nil)
// should be confirmed against the server transport implementation.
type ServerConfig struct {
	MaxStreams            uint32
	ConnectionTimeout     time.Duration
	Credentials           credentials.TransportCredentials
	InTapHandle           tap.ServerInHandle
	StatsHandlers         []stats.Handler
	KeepaliveParams       keepalive.ServerParameters
	KeepalivePolicy       keepalive.EnforcementPolicy
	InitialWindowSize     int32
	InitialConnWindowSize int32
	WriteBufferSize       int
	ReadBufferSize        int
	SharedWriteBuffer     bool
	ChannelzParent        *channelz.Server
	MaxHeaderListSize     *uint32
	HeaderTableSize       *uint32
}
   578  
// ConnectOptions covers all relevant options for communicating with the server.
// It is passed to NewClientTransport when a client transport is created.
type ConnectOptions struct {
	// UserAgent is the application user agent.
	UserAgent string
	// Dialer specifies how to dial a network address.
	Dialer func(context.Context, string) (net.Conn, error)
	// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
	FailOnNonTempDialError bool
	// PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
	PerRPCCredentials []credentials.PerRPCCredentials
	// TransportCredentials stores the Authenticator required to setup a client
	// connection. Only one of TransportCredentials and CredsBundle is non-nil.
	TransportCredentials credentials.TransportCredentials
	// CredsBundle is the credentials bundle to be used. Only one of
	// TransportCredentials and CredsBundle is non-nil.
	CredsBundle credentials.Bundle
	// KeepaliveParams stores the keepalive parameters.
	KeepaliveParams keepalive.ClientParameters
	// StatsHandlers stores the handlers for stats.
	StatsHandlers []stats.Handler
	// InitialWindowSize sets the initial window size for a stream.
	InitialWindowSize int32
	// InitialConnWindowSize sets the initial window size for a connection.
	InitialConnWindowSize int32
	// WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
	WriteBufferSize int
	// ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
	ReadBufferSize int
	// SharedWriteBuffer indicates whether connections should reuse write buffers.
	SharedWriteBuffer bool
	// ChannelzParent sets the addrConn id which initiated the creation of this client transport.
	ChannelzParent *channelz.SubChannel
	// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
	MaxHeaderListSize *uint32
	// UseProxy specifies if a proxy should be used.
	UseProxy bool
}
   616  
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
//
// It is a thin wrapper that forwards every argument unchanged to
// newHTTP2Client and returns its result.
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error) {
	return newHTTP2Client(connectCtx, ctx, addr, opts, onClose)
}
   622  
// Options provides additional hints and information for message
// transmission. It is passed to the Write methods of ClientTransport and
// ServerTransport.
type Options struct {
	// Last indicates whether this write is the last piece for
	// this stream.
	Last bool
}
   630  
// CallHdr carries the information of a particular RPC. It is passed to
// ClientTransport.NewStream when a stream is created.
type CallHdr struct {
	// Host specifies the peer's host.
	Host string

	// Method specifies the operation to perform.
	Method string

	// SendCompress specifies the compression algorithm applied on
	// outbound message.
	SendCompress string

	// Creds specifies credentials.PerRPCCredentials for a call.
	Creds credentials.PerRPCCredentials

	// ContentSubtype specifies the content-subtype for a request. For example, a
	// content-subtype of "proto" will result in a content-type of
	// "application/grpc+proto". The value of ContentSubtype must be all
	// lowercase, otherwise the behavior is undefined. See
	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
	// for more details.
	ContentSubtype string

	PreviousAttempts int // value of grpc-previous-rpc-attempts header to set

	DoneFunc func() // called when the stream is finished
}
   658  
// ClientTransport is the common interface for all gRPC client-side transport
// implementations. Instances are obtained from NewClientTransport.
type ClientTransport interface {
	// Close tears down this transport. Once it returns, the transport
	// should not be accessed any more. The caller must make sure this
	// is called only once.
	Close(err error)

	// GracefulClose starts to tear down the transport: the transport will stop
	// accepting new RPCs and NewStream will return error. Once all streams are
	// finished, the transport will close.
	//
	// It does not block.
	GracefulClose()

	// Write sends the data for the given stream. A nil stream indicates
	// the write is to be performed on the transport as a whole.
	Write(s *Stream, hdr []byte, data []byte, opts *Options) error

	// NewStream creates a Stream for an RPC.
	NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)

	// CloseStream clears the footprint of a stream when the stream is
	// not needed any more. The err indicates the error incurred when
	// CloseStream is called. Must be called when a stream is finished
	// unless the associated transport is closing.
	CloseStream(stream *Stream, err error)

	// Error returns a channel that is closed when some I/O error
	// happens. Typically the caller should have a goroutine to monitor
	// this in order to take action (e.g., close the current transport
	// and create a new one) in error case. It should not return nil
	// once the transport is initiated.
	Error() <-chan struct{}

	// GoAway returns a channel that is closed when ClientTransport
	// receives the draining signal from the server (e.g., GOAWAY frame in
	// HTTP/2).
	GoAway() <-chan struct{}

	// GetGoAwayReason returns the reason why GoAway frame was received, along
	// with a human readable string with debug info.
	GetGoAwayReason() (GoAwayReason, string)

	// RemoteAddr returns the remote network address.
	RemoteAddr() net.Addr

	// IncrMsgSent increments the number of messages sent through this transport.
	IncrMsgSent()

	// IncrMsgRecv increments the number of messages received through this transport.
	IncrMsgRecv()
}
   712  
// ServerTransport is the common interface for all gRPC server-side transport
// implementations.
//
// Methods may be called concurrently from multiple goroutines, but
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
	// HandleStreams receives incoming streams using the given handler.
	HandleStreams(context.Context, func(*Stream))

	// WriteHeader sends the header metadata for the given stream.
	// WriteHeader may not be called on all streams.
	WriteHeader(s *Stream, md metadata.MD) error

	// Write sends the data for the given stream.
	// Write may not be called on all streams.
	Write(s *Stream, hdr []byte, data []byte, opts *Options) error

	// WriteStatus sends the status of a stream to the client.  WriteStatus is
	// the final call made on a stream and always occurs.
	WriteStatus(s *Stream, st *status.Status) error

	// Close tears down the transport. Once it is called, the transport
	// should not be accessed any more. All the pending streams and their
	// handlers will be terminated asynchronously.
	Close(err error)

	// Peer returns the peer of the server transport.
	Peer() *peer.Peer

	// Drain notifies the client that this ServerTransport stops accepting new
	// RPCs.
	Drain(debugData string)

	// IncrMsgSent increments the number of messages sent through this transport.
	IncrMsgSent()

	// IncrMsgRecv increments the number of messages received through this transport.
	IncrMsgRecv()
}
   751  
   752  // connectionErrorf creates an ConnectionError with the specified error description.
   753  func connectionErrorf(temp bool, e error, format string, a ...any) ConnectionError {
   754  	return ConnectionError{
   755  		Desc: fmt.Sprintf(format, a...),
   756  		temp: temp,
   757  		err:  e,
   758  	}
   759  }
   760  
   761  // ConnectionError is an error that results in the termination of the
   762  // entire connection and the retry of all the active streams.
   763  type ConnectionError struct {
   764  	Desc string
   765  	temp bool
   766  	err  error
   767  }
   768  
   769  func (e ConnectionError) Error() string {
   770  	return fmt.Sprintf("connection error: desc = %q", e.Desc)
   771  }
   772  
   773  // Temporary indicates if this connection error is temporary or fatal.
   774  func (e ConnectionError) Temporary() bool {
   775  	return e.temp
   776  }
   777  
   778  // Origin returns the original error of this connection error.
   779  func (e ConnectionError) Origin() error {
   780  	// Never return nil error here.
   781  	// If the original error is nil, return itself.
   782  	if e.err == nil {
   783  		return e
   784  	}
   785  	return e.err
   786  }
   787  
   788  // Unwrap returns the original error of this connection error or nil when the
   789  // origin is nil.
   790  func (e ConnectionError) Unwrap() error {
   791  	return e.err
   792  }
   793  
var (
	// ErrConnClosing indicates that the transport is closing. It is a
	// temporary ConnectionError with no underlying cause.
	ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
	// errStreamDrain indicates that the stream is rejected because the
	// connection is draining. This could be caused by goaway or balancer
	// removing the address.
	errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
	// errStreamDone is returned from write at the client side to indicate an
	// error to the application layer.
	errStreamDone = errors.New("the stream is done")
	// statusGoAway indicates that the server sent a GOAWAY that included this
	// stream's ID in unprocessed RPCs.
	statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
)
   808  
// GoAwayReason contains the reason for the GoAway frame received. The zero
// value is GoAwayInvalid.
type GoAwayReason uint8

const (
	// GoAwayInvalid indicates that no GoAway frame has been received.
	GoAwayInvalid GoAwayReason = 0
	// GoAwayNoReason is the default value when a GoAway frame is received.
	GoAwayNoReason GoAwayReason = 1
	// GoAwayTooManyPings indicates that a GoAway frame with
	// ErrCodeEnhanceYourCalm was received and that the debug data said
	// "too_many_pings".
	GoAwayTooManyPings GoAwayReason = 2
)
   822  
   823  // ContextErr converts the error from context package into a status error.
   824  func ContextErr(err error) error {
   825  	switch err {
   826  	case context.DeadlineExceeded:
   827  		return status.Error(codes.DeadlineExceeded, err.Error())
   828  	case context.Canceled:
   829  		return status.Error(codes.Canceled, err.Error())
   830  	}
   831  	return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
   832  }
   833  

View as plain text