1 /* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19 // Package transport defines and implements message oriented communication 20 // channel to complete various transactions (e.g., an RPC). It is meant for 21 // grpc-internal usage and is not intended to be imported directly by users. 22 package transport 23 24 import ( 25 "bytes" 26 "context" 27 "errors" 28 "fmt" 29 "io" 30 "net" 31 "strings" 32 "sync" 33 "sync/atomic" 34 "time" 35 36 "google.golang.org/grpc/codes" 37 "google.golang.org/grpc/credentials" 38 "google.golang.org/grpc/internal/channelz" 39 "google.golang.org/grpc/keepalive" 40 "google.golang.org/grpc/metadata" 41 "google.golang.org/grpc/peer" 42 "google.golang.org/grpc/resolver" 43 "google.golang.org/grpc/stats" 44 "google.golang.org/grpc/status" 45 "google.golang.org/grpc/tap" 46 ) 47 48 const logLevel = 2 49 50 type bufferPool struct { 51 pool sync.Pool 52 } 53 54 func newBufferPool() *bufferPool { 55 return &bufferPool{ 56 pool: sync.Pool{ 57 New: func() any { 58 return new(bytes.Buffer) 59 }, 60 }, 61 } 62 } 63 64 func (p *bufferPool) get() *bytes.Buffer { 65 return p.pool.Get().(*bytes.Buffer) 66 } 67 68 func (p *bufferPool) put(b *bytes.Buffer) { 69 p.pool.Put(b) 70 } 71 72 // recvMsg represents the received msg from the transport. All transport 73 // protocol specific info has been removed. 74 type recvMsg struct { 75 buffer *bytes.Buffer 76 // nil: received some data 77 // io.EOF: stream is completed. data is nil. 78 // other non-nil error: transport failure. data is nil. 79 err error 80 } 81 82 // recvBuffer is an unbounded channel of recvMsg structs. 83 // 84 // Note: recvBuffer differs from buffer.Unbounded only in the fact that it 85 // holds a channel of recvMsg structs instead of objects implementing "item" 86 // interface. recvBuffer is written to much more often and using strict recvMsg 87 // structs helps avoid allocation in "recvBuffer.put" 88 type recvBuffer struct { 89 c chan recvMsg 90 mu sync.Mutex 91 backlog []recvMsg 92 err error 93 } 94 95 func newRecvBuffer() *recvBuffer { 96 b := &recvBuffer{ 97 c: make(chan recvMsg, 1), 98 } 99 return b 100 } 101 102 func (b *recvBuffer) put(r recvMsg) { 103 b.mu.Lock() 104 if b.err != nil { 105 b.mu.Unlock() 106 // An error had occurred earlier, don't accept more 107 // data or errors. 108 return 109 } 110 b.err = r.err 111 if len(b.backlog) == 0 { 112 select { 113 case b.c <- r: 114 b.mu.Unlock() 115 return 116 default: 117 } 118 } 119 b.backlog = append(b.backlog, r) 120 b.mu.Unlock() 121 } 122 123 func (b *recvBuffer) load() { 124 b.mu.Lock() 125 if len(b.backlog) > 0 { 126 select { 127 case b.c <- b.backlog[0]: 128 b.backlog[0] = recvMsg{} 129 b.backlog = b.backlog[1:] 130 default: 131 } 132 } 133 b.mu.Unlock() 134 } 135 136 // get returns the channel that receives a recvMsg in the buffer. 137 // 138 // Upon receipt of a recvMsg, the caller should call load to send another 139 // recvMsg onto the channel if there is any. 140 func (b *recvBuffer) get() <-chan recvMsg { 141 return b.c 142 } 143 144 // recvBufferReader implements io.Reader interface to read the data from 145 // recvBuffer. 146 type recvBufferReader struct { 147 closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata. 148 ctx context.Context 149 ctxDone <-chan struct{} // cache of ctx.Done() (for performance). 150 recv *recvBuffer 151 last *bytes.Buffer // Stores the remaining data in the previous calls. 152 err error 153 freeBuffer func(*bytes.Buffer) 154 } 155 156 // Read reads the next len(p) bytes from last. If last is drained, it tries to 157 // read additional data from recv. It blocks if there no additional data available 158 // in recv. If Read returns any non-nil error, it will continue to return that error. 159 func (r *recvBufferReader) Read(p []byte) (n int, err error) { 160 if r.err != nil { 161 return 0, r.err 162 } 163 if r.last != nil { 164 // Read remaining data left in last call. 165 copied, _ := r.last.Read(p) 166 if r.last.Len() == 0 { 167 r.freeBuffer(r.last) 168 r.last = nil 169 } 170 return copied, nil 171 } 172 if r.closeStream != nil { 173 n, r.err = r.readClient(p) 174 } else { 175 n, r.err = r.read(p) 176 } 177 return n, r.err 178 } 179 180 func (r *recvBufferReader) read(p []byte) (n int, err error) { 181 select { 182 case <-r.ctxDone: 183 return 0, ContextErr(r.ctx.Err()) 184 case m := <-r.recv.get(): 185 return r.readAdditional(m, p) 186 } 187 } 188 189 func (r *recvBufferReader) readClient(p []byte) (n int, err error) { 190 // If the context is canceled, then closes the stream with nil metadata. 191 // closeStream writes its error parameter to r.recv as a recvMsg. 192 // r.readAdditional acts on that message and returns the necessary error. 193 select { 194 case <-r.ctxDone: 195 // Note that this adds the ctx error to the end of recv buffer, and 196 // reads from the head. This will delay the error until recv buffer is 197 // empty, thus will delay ctx cancellation in Recv(). 198 // 199 // It's done this way to fix a race between ctx cancel and trailer. The 200 // race was, stream.Recv() may return ctx error if ctxDone wins the 201 // race, but stream.Trailer() may return a non-nil md because the stream 202 // was not marked as done when trailer is received. This closeStream 203 // call will mark stream as done, thus fix the race. 204 // 205 // TODO: delaying ctx error seems like a unnecessary side effect. What 206 // we really want is to mark the stream as done, and return ctx error 207 // faster. 208 r.closeStream(ContextErr(r.ctx.Err())) 209 m := <-r.recv.get() 210 return r.readAdditional(m, p) 211 case m := <-r.recv.get(): 212 return r.readAdditional(m, p) 213 } 214 } 215 216 func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) { 217 r.recv.load() 218 if m.err != nil { 219 return 0, m.err 220 } 221 copied, _ := m.buffer.Read(p) 222 if m.buffer.Len() == 0 { 223 r.freeBuffer(m.buffer) 224 r.last = nil 225 } else { 226 r.last = m.buffer 227 } 228 return copied, nil 229 } 230 231 type streamState uint32 232 233 const ( 234 streamActive streamState = iota 235 streamWriteDone // EndStream sent 236 streamReadDone // EndStream received 237 streamDone // the entire stream is finished. 238 ) 239 240 // Stream represents an RPC in the transport layer. 241 type Stream struct { 242 id uint32 243 st ServerTransport // nil for client side Stream 244 ct *http2Client // nil for server side Stream 245 ctx context.Context // the associated context of the stream 246 cancel context.CancelFunc // always nil for client side Stream 247 done chan struct{} // closed at the end of stream to unblock writers. On the client side. 248 doneFunc func() // invoked at the end of stream on client side. 249 ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance) 250 method string // the associated RPC method of the stream 251 recvCompress string 252 sendCompress string 253 buf *recvBuffer 254 trReader io.Reader 255 fc *inFlow 256 wq *writeQuota 257 258 // Holds compressor names passed in grpc-accept-encoding metadata from the 259 // client. This is empty for the client side stream. 260 clientAdvertisedCompressors string 261 // Callback to state application's intentions to read data. This 262 // is used to adjust flow control, if needed. 263 requestRead func(int) 264 265 headerChan chan struct{} // closed to indicate the end of header metadata. 266 headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. 267 // headerValid indicates whether a valid header was received. Only 268 // meaningful after headerChan is closed (always call waitOnHeader() before 269 // reading its value). Not valid on server side. 270 headerValid bool 271 headerWireLength int // Only set on server side. 272 273 // hdrMu protects header and trailer metadata on the server-side. 274 hdrMu sync.Mutex 275 // On client side, header keeps the received header metadata. 276 // 277 // On server side, header keeps the header set by SetHeader(). The complete 278 // header will merged into this after t.WriteHeader() is called. 279 header metadata.MD 280 trailer metadata.MD // the key-value map of trailer metadata. 281 282 noHeaders bool // set if the client never received headers (set only after the stream is done). 283 284 // On the server-side, headerSent is atomically set to 1 when the headers are sent out. 285 headerSent uint32 286 287 state streamState 288 289 // On client-side it is the status error received from the server. 290 // On server-side it is unused. 291 status *status.Status 292 293 bytesReceived uint32 // indicates whether any bytes have been received on this stream 294 unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream 295 296 // contentSubtype is the content-subtype for requests. 297 // this must be lowercase or the behavior is undefined. 298 contentSubtype string 299 } 300 301 // isHeaderSent is only valid on the server-side. 302 func (s *Stream) isHeaderSent() bool { 303 return atomic.LoadUint32(&s.headerSent) == 1 304 } 305 306 // updateHeaderSent updates headerSent and returns true 307 // if it was already set. It is valid only on server-side. 308 func (s *Stream) updateHeaderSent() bool { 309 return atomic.SwapUint32(&s.headerSent, 1) == 1 310 } 311 312 func (s *Stream) swapState(st streamState) streamState { 313 return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st))) 314 } 315 316 func (s *Stream) compareAndSwapState(oldState, newState streamState) bool { 317 return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState)) 318 } 319 320 func (s *Stream) getState() streamState { 321 return streamState(atomic.LoadUint32((*uint32)(&s.state))) 322 } 323 324 func (s *Stream) waitOnHeader() { 325 if s.headerChan == nil { 326 // On the server headerChan is always nil since a stream originates 327 // only after having received headers. 328 return 329 } 330 select { 331 case <-s.ctx.Done(): 332 // Close the stream to prevent headers/trailers from changing after 333 // this function returns. 334 s.ct.CloseStream(s, ContextErr(s.ctx.Err())) 335 // headerChan could possibly not be closed yet if closeStream raced 336 // with operateHeaders; wait until it is closed explicitly here. 337 <-s.headerChan 338 case <-s.headerChan: 339 } 340 } 341 342 // RecvCompress returns the compression algorithm applied to the inbound 343 // message. It is empty string if there is no compression applied. 344 func (s *Stream) RecvCompress() string { 345 s.waitOnHeader() 346 return s.recvCompress 347 } 348 349 // SetSendCompress sets the compression algorithm to the stream. 350 func (s *Stream) SetSendCompress(name string) error { 351 if s.isHeaderSent() || s.getState() == streamDone { 352 return errors.New("transport: set send compressor called after headers sent or stream done") 353 } 354 355 s.sendCompress = name 356 return nil 357 } 358 359 // SendCompress returns the send compressor name. 360 func (s *Stream) SendCompress() string { 361 return s.sendCompress 362 } 363 364 // ClientAdvertisedCompressors returns the compressor names advertised by the 365 // client via grpc-accept-encoding header. 366 func (s *Stream) ClientAdvertisedCompressors() []string { 367 values := strings.Split(s.clientAdvertisedCompressors, ",") 368 for i, v := range values { 369 values[i] = strings.TrimSpace(v) 370 } 371 return values 372 } 373 374 // Done returns a channel which is closed when it receives the final status 375 // from the server. 376 func (s *Stream) Done() <-chan struct{} { 377 return s.done 378 } 379 380 // Header returns the header metadata of the stream. 381 // 382 // On client side, it acquires the key-value pairs of header metadata once it is 383 // available. It blocks until i) the metadata is ready or ii) there is no header 384 // metadata or iii) the stream is canceled/expired. 385 // 386 // On server side, it returns the out header after t.WriteHeader is called. It 387 // does not block and must not be called until after WriteHeader. 388 func (s *Stream) Header() (metadata.MD, error) { 389 if s.headerChan == nil { 390 // On server side, return the header in stream. It will be the out 391 // header after t.WriteHeader is called. 392 return s.header.Copy(), nil 393 } 394 s.waitOnHeader() 395 396 if !s.headerValid || s.noHeaders { 397 return nil, s.status.Err() 398 } 399 400 return s.header.Copy(), nil 401 } 402 403 // TrailersOnly blocks until a header or trailers-only frame is received and 404 // then returns true if the stream was trailers-only. If the stream ends 405 // before headers are received, returns true, nil. Client-side only. 406 func (s *Stream) TrailersOnly() bool { 407 s.waitOnHeader() 408 return s.noHeaders 409 } 410 411 // Trailer returns the cached trailer metedata. Note that if it is not called 412 // after the entire stream is done, it could return an empty MD. Client 413 // side only. 414 // It can be safely read only after stream has ended that is either read 415 // or write have returned io.EOF. 416 func (s *Stream) Trailer() metadata.MD { 417 c := s.trailer.Copy() 418 return c 419 } 420 421 // ContentSubtype returns the content-subtype for a request. For example, a 422 // content-subtype of "proto" will result in a content-type of 423 // "application/grpc+proto". This will always be lowercase. See 424 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for 425 // more details. 426 func (s *Stream) ContentSubtype() string { 427 return s.contentSubtype 428 } 429 430 // Context returns the context of the stream. 431 func (s *Stream) Context() context.Context { 432 return s.ctx 433 } 434 435 // SetContext sets the context of the stream. This will be deleted once the 436 // stats handler callouts all move to gRPC layer. 437 func (s *Stream) SetContext(ctx context.Context) { 438 s.ctx = ctx 439 } 440 441 // Method returns the method for the stream. 442 func (s *Stream) Method() string { 443 return s.method 444 } 445 446 // Status returns the status received from the server. 447 // Status can be read safely only after the stream has ended, 448 // that is, after Done() is closed. 449 func (s *Stream) Status() *status.Status { 450 return s.status 451 } 452 453 // HeaderWireLength returns the size of the headers of the stream as received 454 // from the wire. Valid only on the server. 455 func (s *Stream) HeaderWireLength() int { 456 return s.headerWireLength 457 } 458 459 // SetHeader sets the header metadata. This can be called multiple times. 460 // Server side only. 461 // This should not be called in parallel to other data writes. 462 func (s *Stream) SetHeader(md metadata.MD) error { 463 if md.Len() == 0 { 464 return nil 465 } 466 if s.isHeaderSent() || s.getState() == streamDone { 467 return ErrIllegalHeaderWrite 468 } 469 s.hdrMu.Lock() 470 s.header = metadata.Join(s.header, md) 471 s.hdrMu.Unlock() 472 return nil 473 } 474 475 // SendHeader sends the given header metadata. The given metadata is 476 // combined with any metadata set by previous calls to SetHeader and 477 // then written to the transport stream. 478 func (s *Stream) SendHeader(md metadata.MD) error { 479 return s.st.WriteHeader(s, md) 480 } 481 482 // SetTrailer sets the trailer metadata which will be sent with the RPC status 483 // by the server. This can be called multiple times. Server side only. 484 // This should not be called parallel to other data writes. 485 func (s *Stream) SetTrailer(md metadata.MD) error { 486 if md.Len() == 0 { 487 return nil 488 } 489 if s.getState() == streamDone { 490 return ErrIllegalHeaderWrite 491 } 492 s.hdrMu.Lock() 493 s.trailer = metadata.Join(s.trailer, md) 494 s.hdrMu.Unlock() 495 return nil 496 } 497 498 func (s *Stream) write(m recvMsg) { 499 s.buf.put(m) 500 } 501 502 // Read reads all p bytes from the wire for this stream. 503 func (s *Stream) Read(p []byte) (n int, err error) { 504 // Don't request a read if there was an error earlier 505 if er := s.trReader.(*transportReader).er; er != nil { 506 return 0, er 507 } 508 s.requestRead(len(p)) 509 return io.ReadFull(s.trReader, p) 510 } 511 512 // tranportReader reads all the data available for this Stream from the transport and 513 // passes them into the decoder, which converts them into a gRPC message stream. 514 // The error is io.EOF when the stream is done or another non-nil error if 515 // the stream broke. 516 type transportReader struct { 517 reader io.Reader 518 // The handler to control the window update procedure for both this 519 // particular stream and the associated transport. 520 windowHandler func(int) 521 er error 522 } 523 524 func (t *transportReader) Read(p []byte) (n int, err error) { 525 n, err = t.reader.Read(p) 526 if err != nil { 527 t.er = err 528 return 529 } 530 t.windowHandler(n) 531 return 532 } 533 534 // BytesReceived indicates whether any bytes have been received on this stream. 535 func (s *Stream) BytesReceived() bool { 536 return atomic.LoadUint32(&s.bytesReceived) == 1 537 } 538 539 // Unprocessed indicates whether the server did not process this stream -- 540 // i.e. it sent a refused stream or GOAWAY including this stream ID. 541 func (s *Stream) Unprocessed() bool { 542 return atomic.LoadUint32(&s.unprocessed) == 1 543 } 544 545 // GoString is implemented by Stream so context.String() won't 546 // race when printing %#v. 547 func (s *Stream) GoString() string { 548 return fmt.Sprintf("<stream: %p, %v>", s, s.method) 549 } 550 551 // state of transport 552 type transportState int 553 554 const ( 555 reachable transportState = iota 556 closing 557 draining 558 ) 559 560 // ServerConfig consists of all the configurations to establish a server transport. 561 type ServerConfig struct { 562 MaxStreams uint32 563 ConnectionTimeout time.Duration 564 Credentials credentials.TransportCredentials 565 InTapHandle tap.ServerInHandle 566 StatsHandlers []stats.Handler 567 KeepaliveParams keepalive.ServerParameters 568 KeepalivePolicy keepalive.EnforcementPolicy 569 InitialWindowSize int32 570 InitialConnWindowSize int32 571 WriteBufferSize int 572 ReadBufferSize int 573 SharedWriteBuffer bool 574 ChannelzParent *channelz.Server 575 MaxHeaderListSize *uint32 576 HeaderTableSize *uint32 577 } 578 579 // ConnectOptions covers all relevant options for communicating with the server. 580 type ConnectOptions struct { 581 // UserAgent is the application user agent. 582 UserAgent string 583 // Dialer specifies how to dial a network address. 584 Dialer func(context.Context, string) (net.Conn, error) 585 // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors. 586 FailOnNonTempDialError bool 587 // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs. 588 PerRPCCredentials []credentials.PerRPCCredentials 589 // TransportCredentials stores the Authenticator required to setup a client 590 // connection. Only one of TransportCredentials and CredsBundle is non-nil. 591 TransportCredentials credentials.TransportCredentials 592 // CredsBundle is the credentials bundle to be used. Only one of 593 // TransportCredentials and CredsBundle is non-nil. 594 CredsBundle credentials.Bundle 595 // KeepaliveParams stores the keepalive parameters. 596 KeepaliveParams keepalive.ClientParameters 597 // StatsHandlers stores the handler for stats. 598 StatsHandlers []stats.Handler 599 // InitialWindowSize sets the initial window size for a stream. 600 InitialWindowSize int32 601 // InitialConnWindowSize sets the initial window size for a connection. 602 InitialConnWindowSize int32 603 // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire. 604 WriteBufferSize int 605 // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall. 606 ReadBufferSize int 607 // SharedWriteBuffer indicates whether connections should reuse write buffer 608 SharedWriteBuffer bool 609 // ChannelzParent sets the addrConn id which initiated the creation of this client transport. 610 ChannelzParent *channelz.SubChannel 611 // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received. 612 MaxHeaderListSize *uint32 613 // UseProxy specifies if a proxy should be used. 614 UseProxy bool 615 } 616 617 // NewClientTransport establishes the transport with the required ConnectOptions 618 // and returns it to the caller. 619 func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error) { 620 return newHTTP2Client(connectCtx, ctx, addr, opts, onClose) 621 } 622 623 // Options provides additional hints and information for message 624 // transmission. 625 type Options struct { 626 // Last indicates whether this write is the last piece for 627 // this stream. 628 Last bool 629 } 630 631 // CallHdr carries the information of a particular RPC. 632 type CallHdr struct { 633 // Host specifies the peer's host. 634 Host string 635 636 // Method specifies the operation to perform. 637 Method string 638 639 // SendCompress specifies the compression algorithm applied on 640 // outbound message. 641 SendCompress string 642 643 // Creds specifies credentials.PerRPCCredentials for a call. 644 Creds credentials.PerRPCCredentials 645 646 // ContentSubtype specifies the content-subtype for a request. For example, a 647 // content-subtype of "proto" will result in a content-type of 648 // "application/grpc+proto". The value of ContentSubtype must be all 649 // lowercase, otherwise the behavior is undefined. See 650 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests 651 // for more details. 652 ContentSubtype string 653 654 PreviousAttempts int // value of grpc-previous-rpc-attempts header to set 655 656 DoneFunc func() // called when the stream is finished 657 } 658 659 // ClientTransport is the common interface for all gRPC client-side transport 660 // implementations. 661 type ClientTransport interface { 662 // Close tears down this transport. Once it returns, the transport 663 // should not be accessed any more. The caller must make sure this 664 // is called only once. 665 Close(err error) 666 667 // GracefulClose starts to tear down the transport: the transport will stop 668 // accepting new RPCs and NewStream will return error. Once all streams are 669 // finished, the transport will close. 670 // 671 // It does not block. 672 GracefulClose() 673 674 // Write sends the data for the given stream. A nil stream indicates 675 // the write is to be performed on the transport as a whole. 676 Write(s *Stream, hdr []byte, data []byte, opts *Options) error 677 678 // NewStream creates a Stream for an RPC. 679 NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) 680 681 // CloseStream clears the footprint of a stream when the stream is 682 // not needed any more. The err indicates the error incurred when 683 // CloseStream is called. Must be called when a stream is finished 684 // unless the associated transport is closing. 685 CloseStream(stream *Stream, err error) 686 687 // Error returns a channel that is closed when some I/O error 688 // happens. Typically the caller should have a goroutine to monitor 689 // this in order to take action (e.g., close the current transport 690 // and create a new one) in error case. It should not return nil 691 // once the transport is initiated. 692 Error() <-chan struct{} 693 694 // GoAway returns a channel that is closed when ClientTransport 695 // receives the draining signal from the server (e.g., GOAWAY frame in 696 // HTTP/2). 697 GoAway() <-chan struct{} 698 699 // GetGoAwayReason returns the reason why GoAway frame was received, along 700 // with a human readable string with debug info. 701 GetGoAwayReason() (GoAwayReason, string) 702 703 // RemoteAddr returns the remote network address. 704 RemoteAddr() net.Addr 705 706 // IncrMsgSent increments the number of message sent through this transport. 707 IncrMsgSent() 708 709 // IncrMsgRecv increments the number of message received through this transport. 710 IncrMsgRecv() 711 } 712 713 // ServerTransport is the common interface for all gRPC server-side transport 714 // implementations. 715 // 716 // Methods may be called concurrently from multiple goroutines, but 717 // Write methods for a given Stream will be called serially. 718 type ServerTransport interface { 719 // HandleStreams receives incoming streams using the given handler. 720 HandleStreams(context.Context, func(*Stream)) 721 722 // WriteHeader sends the header metadata for the given stream. 723 // WriteHeader may not be called on all streams. 724 WriteHeader(s *Stream, md metadata.MD) error 725 726 // Write sends the data for the given stream. 727 // Write may not be called on all streams. 728 Write(s *Stream, hdr []byte, data []byte, opts *Options) error 729 730 // WriteStatus sends the status of a stream to the client. WriteStatus is 731 // the final call made on a stream and always occurs. 732 WriteStatus(s *Stream, st *status.Status) error 733 734 // Close tears down the transport. Once it is called, the transport 735 // should not be accessed any more. All the pending streams and their 736 // handlers will be terminated asynchronously. 737 Close(err error) 738 739 // Peer returns the peer of the server transport. 740 Peer() *peer.Peer 741 742 // Drain notifies the client this ServerTransport stops accepting new RPCs. 743 Drain(debugData string) 744 745 // IncrMsgSent increments the number of message sent through this transport. 746 IncrMsgSent() 747 748 // IncrMsgRecv increments the number of message received through this transport. 749 IncrMsgRecv() 750 } 751 752 // connectionErrorf creates an ConnectionError with the specified error description. 753 func connectionErrorf(temp bool, e error, format string, a ...any) ConnectionError { 754 return ConnectionError{ 755 Desc: fmt.Sprintf(format, a...), 756 temp: temp, 757 err: e, 758 } 759 } 760 761 // ConnectionError is an error that results in the termination of the 762 // entire connection and the retry of all the active streams. 763 type ConnectionError struct { 764 Desc string 765 temp bool 766 err error 767 } 768 769 func (e ConnectionError) Error() string { 770 return fmt.Sprintf("connection error: desc = %q", e.Desc) 771 } 772 773 // Temporary indicates if this connection error is temporary or fatal. 774 func (e ConnectionError) Temporary() bool { 775 return e.temp 776 } 777 778 // Origin returns the original error of this connection error. 779 func (e ConnectionError) Origin() error { 780 // Never return nil error here. 781 // If the original error is nil, return itself. 782 if e.err == nil { 783 return e 784 } 785 return e.err 786 } 787 788 // Unwrap returns the original error of this connection error or nil when the 789 // origin is nil. 790 func (e ConnectionError) Unwrap() error { 791 return e.err 792 } 793 794 var ( 795 // ErrConnClosing indicates that the transport is closing. 796 ErrConnClosing = connectionErrorf(true, nil, "transport is closing") 797 // errStreamDrain indicates that the stream is rejected because the 798 // connection is draining. This could be caused by goaway or balancer 799 // removing the address. 800 errStreamDrain = status.Error(codes.Unavailable, "the connection is draining") 801 // errStreamDone is returned from write at the client side to indiacte application 802 // layer of an error. 803 errStreamDone = errors.New("the stream is done") 804 // StatusGoAway indicates that the server sent a GOAWAY that included this 805 // stream's ID in unprocessed RPCs. 806 statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection") 807 ) 808 809 // GoAwayReason contains the reason for the GoAway frame received. 810 type GoAwayReason uint8 811 812 const ( 813 // GoAwayInvalid indicates that no GoAway frame is received. 814 GoAwayInvalid GoAwayReason = 0 815 // GoAwayNoReason is the default value when GoAway frame is received. 816 GoAwayNoReason GoAwayReason = 1 817 // GoAwayTooManyPings indicates that a GoAway frame with 818 // ErrCodeEnhanceYourCalm was received and that the debug data said 819 // "too_many_pings". 820 GoAwayTooManyPings GoAwayReason = 2 821 ) 822 823 // ContextErr converts the error from context package into a status error. 824 func ContextErr(err error) error { 825 switch err { 826 case context.DeadlineExceeded: 827 return status.Error(codes.DeadlineExceeded, err.Error()) 828 case context.Canceled: 829 return status.Error(codes.Canceled, err.Error()) 830 } 831 return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err) 832 } 833