...

Source file src/google.golang.org/grpc/internal/transport/controlbuf.go

Documentation: google.golang.org/grpc/internal/transport

     1  /*
     2   *
     3   * Copyright 2014 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package transport
    20  
    21  import (
    22  	"bytes"
    23  	"errors"
    24  	"fmt"
    25  	"net"
    26  	"runtime"
    27  	"strconv"
    28  	"sync"
    29  	"sync/atomic"
    30  
    31  	"golang.org/x/net/http2"
    32  	"golang.org/x/net/http2/hpack"
    33  	"google.golang.org/grpc/internal/grpclog"
    34  	"google.golang.org/grpc/internal/grpcutil"
    35  	"google.golang.org/grpc/status"
    36  )
    37  
// updateHeaderTblSize applies v as the new maximum dynamic table size limit of
// the HPACK encoder e. applySettings calls it when the peer sends a
// SettingHeaderTableSize setting.
// NOTE(review): declared as a variable rather than a function, presumably so
// that tests can substitute it; confirm against the package's tests.
var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
	e.SetMaxDynamicTableSizeLimit(v)
}
    41  
    42  type itemNode struct {
    43  	it   any
    44  	next *itemNode
    45  }
    46  
    47  type itemList struct {
    48  	head *itemNode
    49  	tail *itemNode
    50  }
    51  
    52  func (il *itemList) enqueue(i any) {
    53  	n := &itemNode{it: i}
    54  	if il.tail == nil {
    55  		il.head, il.tail = n, n
    56  		return
    57  	}
    58  	il.tail.next = n
    59  	il.tail = n
    60  }
    61  
    62  // peek returns the first item in the list without removing it from the
    63  // list.
    64  func (il *itemList) peek() any {
    65  	return il.head.it
    66  }
    67  
    68  func (il *itemList) dequeue() any {
    69  	if il.head == nil {
    70  		return nil
    71  	}
    72  	i := il.head.it
    73  	il.head = il.head.next
    74  	if il.head == nil {
    75  		il.tail = nil
    76  	}
    77  	return i
    78  }
    79  
    80  func (il *itemList) dequeueAll() *itemNode {
    81  	h := il.head
    82  	il.head, il.tail = nil, nil
    83  	return h
    84  }
    85  
    86  func (il *itemList) isEmpty() bool {
    87  	return il.head == nil
    88  }
    89  
    90  // The following defines various control items which could flow through
    91  // the control buffer of transport. They represent different aspects of
    92  // control tasks, e.g., flow control, settings, streaming resetting, etc.
    93  
// maxQueuedTransportResponseFrames is the most queued "transport response"
// frames we will buffer before preventing new reads from occurring on the
// transport.  These are control frames sent in response to client requests,
// such as RST_STREAM due to bad headers or settings acks.
//
// controlBuffer creates its throttling channel when the number of queued
// items whose isTransportResponseFrame method returns true reaches this value.
const maxQueuedTransportResponseFrames = 50
    99  
// cbItem is the interface implemented by every item that is put on the
// controlBuffer.
type cbItem interface {
	// isTransportResponseFrame reports whether the item counts towards the
	// maxQueuedTransportResponseFrames limit tracked by controlBuffer.
	isTransportResponseFrame() bool
}
   103  
// registerStream is used to register an incoming stream with loopy writer.
// registerStreamHandler turns it into an outStream in the empty state and
// records it in estdStreams under streamID.
type registerStream struct {
	streamID uint32
	wq       *writeQuota // stored as the write quota of the new outStream
}

// isTransportResponseFrame always returns false for registerStream.
func (*registerStream) isTransportResponseFrame() bool { return false }
   111  
// headerFrame is also used to register stream on the client-side.
//
// headerHandler consumes it: on the server side it writes headers or trailers
// for a stream already present in estdStreams; on the client side it
// originates a new stream through originateStream.
type headerFrame struct {
	streamID   uint32
	hf         []hpack.HeaderField
	endStream  bool               // Valid on server side.
	initStream func(uint32) error // Used only on the client side.
	onWrite    func()
	wq         *writeQuota    // write quota for the stream created.
	cleanup    *cleanupStream // Valid on the server side.
	onOrphaned func(error)    // Valid on client-side
}

// isTransportResponseFrame reports whether a cleanup item is attached and that
// cleanup item asks for a RST_STREAM.
func (h *headerFrame) isTransportResponseFrame() bool {
	return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
}
   127  
// cleanupStream instructs loopy to forget a stream. cleanupStreamHandler calls
// onWrite, removes the stream from estdStreams and unlinks it from the list it
// is in, and, when rst is set, writes a RST_STREAM frame carrying rstCode.
type cleanupStream struct {
	streamID uint32
	rst      bool          // whether a RST_STREAM frame is written
	rstCode  http2.ErrCode // error code of the RST_STREAM frame; read only when rst is set
	onWrite  func()        // called first by cleanupStreamHandler
}

// isTransportResponseFrame reports whether this cleanup results in a
// RST_STREAM being written.
func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
   136  
// earlyAbortStream asks loopy to answer a stream with a headers frame that
// ends the stream. earlyAbortStreamHandler rejects it on the client side; on
// the server side it writes ":status", "content-type", "grpc-status" and
// "grpc-message" header fields built from the values below.
type earlyAbortStream struct {
	httpStatus     uint32 // value of ":status"; the handler substitutes 200 when it is 0
	streamID       uint32
	contentSubtype string         // passed to grpcutil.ContentType for "content-type"
	status         *status.Status // source of "grpc-status" and "grpc-message"
	rst            bool           // when set, a RST_STREAM with http2.ErrCodeNo follows the headers
}

// isTransportResponseFrame always returns false for earlyAbortStream.
func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
   146  
// dataFrame is one data item queued for a stream. processData may write it out
// as several HTTP2 data frames, trimming h and d as their bytes are written.
type dataFrame struct {
	streamID  uint32
	endStream bool
	h         []byte // grpc-message header; written ahead of d
	d         []byte // the actual data
	// onEachWrite is called every time
	// a part of d is written out.
	onEachWrite func()
}

// isTransportResponseFrame always returns false for dataFrame.
func (*dataFrame) isTransportResponseFrame() bool { return false }
   158  
// incomingWindowUpdate tells loopy that more outbound quota is available.
// incomingWindowUpdateHandler adds increment to the connection-level
// sendQuota when streamID is 0, and otherwise credits the named established
// stream.
type incomingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

// isTransportResponseFrame always returns false for incomingWindowUpdate.
func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
   165  
// outgoingWindowUpdate asks loopy to write a window update for streamID with
// the given increment (see outgoingWindowUpdateHandler).
type outgoingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

// isTransportResponseFrame always returns false for outgoingWindowUpdate.
func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
	return false // window updates are throttled by thresholds
}
   174  
// incomingSettings carries settings to be applied by loopy.
// incomingSettingsHandler applies them through applySettings and then writes a
// settings ACK.
type incomingSettings struct {
	ss []http2.Setting
}

// isTransportResponseFrame always returns true for incomingSettings.
func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
   180  
// outgoingSettings asks loopy to write a settings frame containing ss (see
// outgoingSettingsHandler).
type outgoingSettings struct {
	ss []http2.Setting
}

// isTransportResponseFrame always returns false for outgoingSettings.
func (*outgoingSettings) isTransportResponseFrame() bool { return false }
   186  
// incomingGoAway is handled by incomingGoAwayHandler, which on the client side
// puts loopy into draining mode and makes it exit if no established streams
// remain. It carries no data.
type incomingGoAway struct {
}

// isTransportResponseFrame always returns false for incomingGoAway.
func (*incomingGoAway) isTransportResponseFrame() bool { return false }
   191  
// goAway is handled by goAwayHandler, which forwards it to the side-specific
// ssGoAwayHandler when one is configured and stores the draining flag that
// handler returns. The fields are interpreted by that handler, not in this
// file.
type goAway struct {
	code      http2.ErrCode
	debugData []byte
	headsUp   bool
	closeConn error // if set, loopyWriter will exit with this error
}

// isTransportResponseFrame always returns false for goAway.
func (*goAway) isTransportResponseFrame() bool { return false }
   200  
// ping asks loopy to write a ping frame with the given ack flag and payload.
// For a ping that is not an ack, pingHandler first passes data to
// bdpEst.timesnap.
type ping struct {
	ack  bool
	data [8]byte
}

// isTransportResponseFrame always returns true for ping, whether or not it is
// an ack.
func (*ping) isTransportResponseFrame() bool { return true }
   207  
// outFlowControlSizeRequest asks loopy for its current connection-level send
// quota. outFlowControlSizeRequestHandler sends l.sendQuota on resp.
type outFlowControlSizeRequest struct {
	resp chan uint32
}

// isTransportResponseFrame always returns false for outFlowControlSizeRequest.
func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
   213  
// closeConnection is an instruction to tell the loopy writer to flush the
// framer and exit, which will cause the transport's connection to be closed
// (by the client or server).  The transport itself will close after the reader
// encounters the EOF caused by the connection closure.
//
// handle matches it by value (not by pointer) and returns ErrConnClosing.
type closeConnection struct{}

// isTransportResponseFrame always returns false for closeConnection.
func (closeConnection) isTransportResponseFrame() bool { return false }
   221  
// outStreamState describes where an outStream stands with respect to loopy's
// list of active streams.
type outStreamState int

const (
	// active: the stream has been placed on activeStreams.
	active outStreamState = iota
	// empty: the stream's item list has nothing queued; this is the state a
	// stream is created in.
	empty
	// waitingOnStreamQuota: processData found the stream out of stream-level
	// flow control quota; a window update or a larger initial window size
	// makes it active again.
	waitingOnStreamQuota
)
   229  
// outStream is loopy's per-stream bookkeeping: the items still to be written
// and the flow-control accounting for one stream. It doubles as a node of the
// doubly linked outStreamList.
type outStream struct {
	id               uint32
	state            outStreamState
	itl              *itemList   // queued dataFrames, possibly followed by a trailers headerFrame
	bytesOutStanding int         // grows by bytes written in processData, shrinks on incoming window updates
	wq               *writeQuota // replenished by processData with the number of bytes written

	next *outStream
	prev *outStream
}
   240  
   241  func (s *outStream) deleteSelf() {
   242  	if s.prev != nil {
   243  		s.prev.next = s.next
   244  	}
   245  	if s.next != nil {
   246  		s.next.prev = s.prev
   247  	}
   248  	s.next, s.prev = nil, nil
   249  }
   250  
// outStreamList is a doubly linked FIFO list of outStreams bracketed by two
// sentinel nodes.
type outStreamList struct {
	// Following are sentinel objects that mark the
	// beginning and end of the list. They do not
	// contain any item lists. All valid objects are
	// inserted in between them.
	// This is needed so that an outStream object can
	// deleteSelf() in O(1) time without knowing which
	// list it belongs to.
	head *outStream
	tail *outStream
}
   262  
   263  func newOutStreamList() *outStreamList {
   264  	head, tail := new(outStream), new(outStream)
   265  	head.next = tail
   266  	tail.prev = head
   267  	return &outStreamList{
   268  		head: head,
   269  		tail: tail,
   270  	}
   271  }
   272  
   273  func (l *outStreamList) enqueue(s *outStream) {
   274  	e := l.tail.prev
   275  	e.next = s
   276  	s.prev = e
   277  	s.next = l.tail
   278  	l.tail.prev = s
   279  }
   280  
   281  // remove from the beginning of the list.
   282  func (l *outStreamList) dequeue() *outStream {
   283  	b := l.head.next
   284  	if b == l.tail {
   285  		return nil
   286  	}
   287  	b.deleteSelf()
   288  	return b
   289  }
   290  
// controlBuffer is a way to pass information to loopy.
// Information is passed as specific struct types called control frames.
// A control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state.
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
type controlBuffer struct {
	ch              chan struct{}   // created with capacity 1; executeAndPut signals it to wake a blocked get
	done            <-chan struct{} // unblocks get and throttle when it becomes readable
	mu              sync.Mutex
	consumerWaiting bool      // set by get before it blocks; cleared by executeAndPut
	list            *itemList // queued control items
	err             error     // set to ErrConnClosing by finish; then returned by executeAndPut, execute and get

	// transportResponseFrames counts the number of queued items that represent
	// the response of an action initiated by the peer.  trfChan is created
	// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
	// closed and nilled when transportResponseFrames drops below the
	// threshold.  Both fields are protected by mu.
	// (throttle additionally reads trfChan through atomic.Value.Load without
	// holding mu.)
	transportResponseFrames int
	trfChan                 atomic.Value // chan struct{}
}
   313  
   314  func newControlBuffer(done <-chan struct{}) *controlBuffer {
   315  	return &controlBuffer{
   316  		ch:   make(chan struct{}, 1),
   317  		list: &itemList{},
   318  		done: done,
   319  	}
   320  }
   321  
   322  // throttle blocks if there are too many incomingSettings/cleanupStreams in the
   323  // controlbuf.
   324  func (c *controlBuffer) throttle() {
   325  	ch, _ := c.trfChan.Load().(chan struct{})
   326  	if ch != nil {
   327  		select {
   328  		case <-ch:
   329  		case <-c.done:
   330  		}
   331  	}
   332  }
   333  
   334  func (c *controlBuffer) put(it cbItem) error {
   335  	_, err := c.executeAndPut(nil, it)
   336  	return err
   337  }
   338  
   339  func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
   340  	var wakeUp bool
   341  	c.mu.Lock()
   342  	if c.err != nil {
   343  		c.mu.Unlock()
   344  		return false, c.err
   345  	}
   346  	if f != nil {
   347  		if !f() { // f wasn't successful
   348  			c.mu.Unlock()
   349  			return false, nil
   350  		}
   351  	}
   352  	if c.consumerWaiting {
   353  		wakeUp = true
   354  		c.consumerWaiting = false
   355  	}
   356  	c.list.enqueue(it)
   357  	if it.isTransportResponseFrame() {
   358  		c.transportResponseFrames++
   359  		if c.transportResponseFrames == maxQueuedTransportResponseFrames {
   360  			// We are adding the frame that puts us over the threshold; create
   361  			// a throttling channel.
   362  			c.trfChan.Store(make(chan struct{}))
   363  		}
   364  	}
   365  	c.mu.Unlock()
   366  	if wakeUp {
   367  		select {
   368  		case c.ch <- struct{}{}:
   369  		default:
   370  		}
   371  	}
   372  	return true, nil
   373  }
   374  
   375  // Note argument f should never be nil.
   376  func (c *controlBuffer) execute(f func(it any) bool, it any) (bool, error) {
   377  	c.mu.Lock()
   378  	if c.err != nil {
   379  		c.mu.Unlock()
   380  		return false, c.err
   381  	}
   382  	if !f(it) { // f wasn't successful
   383  		c.mu.Unlock()
   384  		return false, nil
   385  	}
   386  	c.mu.Unlock()
   387  	return true, nil
   388  }
   389  
   390  func (c *controlBuffer) get(block bool) (any, error) {
   391  	for {
   392  		c.mu.Lock()
   393  		if c.err != nil {
   394  			c.mu.Unlock()
   395  			return nil, c.err
   396  		}
   397  		if !c.list.isEmpty() {
   398  			h := c.list.dequeue().(cbItem)
   399  			if h.isTransportResponseFrame() {
   400  				if c.transportResponseFrames == maxQueuedTransportResponseFrames {
   401  					// We are removing the frame that put us over the
   402  					// threshold; close and clear the throttling channel.
   403  					ch := c.trfChan.Load().(chan struct{})
   404  					close(ch)
   405  					c.trfChan.Store((chan struct{})(nil))
   406  				}
   407  				c.transportResponseFrames--
   408  			}
   409  			c.mu.Unlock()
   410  			return h, nil
   411  		}
   412  		if !block {
   413  			c.mu.Unlock()
   414  			return nil, nil
   415  		}
   416  		c.consumerWaiting = true
   417  		c.mu.Unlock()
   418  		select {
   419  		case <-c.ch:
   420  		case <-c.done:
   421  			return nil, errors.New("transport closed by client")
   422  		}
   423  	}
   424  }
   425  
   426  func (c *controlBuffer) finish() {
   427  	c.mu.Lock()
   428  	if c.err != nil {
   429  		c.mu.Unlock()
   430  		return
   431  	}
   432  	c.err = ErrConnClosing
   433  	// There may be headers for streams in the control buffer.
   434  	// These streams need to be cleaned out since the transport
   435  	// is still not aware of these yet.
   436  	for head := c.list.dequeueAll(); head != nil; head = head.next {
   437  		hdr, ok := head.it.(*headerFrame)
   438  		if !ok {
   439  			continue
   440  		}
   441  		if hdr.onOrphaned != nil { // It will be nil on the server-side.
   442  			hdr.onOrphaned(ErrConnClosing)
   443  		}
   444  	}
   445  	// In case throttle() is currently in flight, it needs to be unblocked.
   446  	// Otherwise, the transport may not close, since the transport is closed by
   447  	// the reader encountering the connection error.
   448  	ch, _ := c.trfChan.Load().(chan struct{})
   449  	if ch != nil {
   450  		close(ch)
   451  	}
   452  	c.trfChan.Store((chan struct{})(nil))
   453  	c.mu.Unlock()
   454  }
   455  
// side tells a loopyWriter whether it is running for a client or for a
// server; several handlers behave differently depending on it.
type side int

const (
	clientSide side = iota
	serverSide
)
   462  
// Loopy receives frames from the control buffer.
// Each frame is handled individually; most of the work done by loopy goes
// into handling data frames. Loopy maintains a queue of active streams, and each
// stream maintains a queue of data frames; as loopy receives data frames
// it gets added to the queue of the relevant stream.
// Loopy goes over this list of active streams by processing one node every iteration,
// thereby closely resembling a round-robin scheduling over all streams. While
// processing a stream, loopy writes out data bytes from this stream capped by the min
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
type loopyWriter struct {
	side      side
	cbuf      *controlBuffer
	sendQuota uint32 // connection-level quota: raised by window updates for stream 0, lowered by bytes written
	oiws      uint32 // outbound initial window size.
	// estdStreams is map of all established streams that are not cleaned-up yet.
	// On client-side, this is all streams whose headers were sent out.
	// On server-side, this is all streams whose headers were received.
	estdStreams map[uint32]*outStream // Established streams.
	// activeStreams is a linked-list of all streams that have data to send and some
	// stream-level flow control quota.
	// Each of these streams internally have a list of data items(and perhaps trailers
	// on the server-side) to be sent out.
	activeStreams *outStreamList
	framer        *framer
	hBuf          *bytes.Buffer  // The buffer for HPACK encoding.
	hEnc          *hpack.Encoder // HPACK encoder.
	bdpEst        *bdpEstimator
	draining      bool // set by the GOAWAY handlers; while set, originateStream refuses new streams
	conn          net.Conn
	logger        *grpclog.PrefixLogger

	// Side-specific handlers
	ssGoAwayHandler func(*goAway) (bool, error)
}
   497  
   498  func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error)) *loopyWriter {
   499  	var buf bytes.Buffer
   500  	l := &loopyWriter{
   501  		side:            s,
   502  		cbuf:            cbuf,
   503  		sendQuota:       defaultWindowSize,
   504  		oiws:            defaultWindowSize,
   505  		estdStreams:     make(map[uint32]*outStream),
   506  		activeStreams:   newOutStreamList(),
   507  		framer:          fr,
   508  		hBuf:            &buf,
   509  		hEnc:            hpack.NewEncoder(&buf),
   510  		bdpEst:          bdpEst,
   511  		conn:            conn,
   512  		logger:          logger,
   513  		ssGoAwayHandler: goAwayHandler,
   514  	}
   515  	return l
   516  }
   517  
// minBatchSize is the write-buffer offset below which run yields the processor
// once before flushing, to give the batch a chance to grow.
const minBatchSize = 1000
   519  
// run should be run in a separate goroutine.
// It reads control frames from controlBuf and processes them by:
// 1. Updating loopy's internal state, or/and
// 2. Writing out HTTP2 frames on the wire.
//
// Loopy keeps all active streams with data to send in a linked-list.
// All streams in the activeStreams linked-list must have both:
// 1. Data to send, and
// 2. Stream level flow control quota available.
//
// In each iteration of run loop, other than processing the incoming control
// frame, loopy calls processData, which processes one node from the
// activeStreams linked-list.  This results in writing of HTTP2 frames into an
// underlying write buffer.  When there's no more control frames to read from
// controlBuf, loopy flushes the write buffer.  As an optimization, to increase
// the batch size for each flush, loopy yields the processor, once if the batch
// size is too low to give stream goroutines a chance to fill it up.
//
// Upon exiting, if the error causing the exit is not an I/O error, run()
// flushes the underlying connection.  The connection is always left open to
// allow different closing behavior on the client and server.
//
// run only returns with a non-nil error: the first error reported by the
// control buffer, by a handler or by processData.
func (l *loopyWriter) run() (err error) {
	defer func() {
		// err is the named result, so it holds whatever value the loop
		// returned even though the loop declares its own err variables.
		if l.logger.V(logLevel) {
			l.logger.Infof("loopyWriter exiting with error: %v", err)
		}
		if !isIOError(err) {
			l.framer.writer.Flush()
		}
		// Mark the control buffer closed and drop whatever is still queued.
		l.cbuf.finish()
	}()
	for {
		// Block until a control item is available.
		it, err := l.cbuf.get(true)
		if err != nil {
			return err
		}
		if err = l.handle(it); err != nil {
			return err
		}
		if _, err = l.processData(); err != nil {
			return err
		}
		// gosched allows a single yield per batch before flushing.
		gosched := true
	hasdata:
		for {
			// Drain further control items without blocking.
			it, err := l.cbuf.get(false)
			if err != nil {
				return err
			}
			if it != nil {
				if err = l.handle(it); err != nil {
					return err
				}
				if _, err = l.processData(); err != nil {
					return err
				}
				continue hasdata
			}
			// No control items are pending; keep writing stream data until
			// processData reports that there is nothing it can send.
			isEmpty, err := l.processData()
			if err != nil {
				return err
			}
			if !isEmpty {
				continue hasdata
			}
			if gosched {
				gosched = false
				if l.framer.writer.offset < minBatchSize {
					// Small batch: yield once so other goroutines get a
					// chance to queue more work, then look again.
					runtime.Gosched()
					continue hasdata
				}
			}
			l.framer.writer.Flush()
			break hasdata
		}
	}
}
   597  
   598  func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
   599  	return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
   600  }
   601  
   602  func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) {
   603  	// Otherwise update the quota.
   604  	if w.streamID == 0 {
   605  		l.sendQuota += w.increment
   606  		return
   607  	}
   608  	// Find the stream and update it.
   609  	if str, ok := l.estdStreams[w.streamID]; ok {
   610  		str.bytesOutStanding -= int(w.increment)
   611  		if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
   612  			str.state = active
   613  			l.activeStreams.enqueue(str)
   614  			return
   615  		}
   616  	}
   617  }
   618  
   619  func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
   620  	return l.framer.fr.WriteSettings(s.ss...)
   621  }
   622  
   623  func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
   624  	l.applySettings(s.ss)
   625  	return l.framer.fr.WriteSettingsAck()
   626  }
   627  
   628  func (l *loopyWriter) registerStreamHandler(h *registerStream) {
   629  	str := &outStream{
   630  		id:    h.streamID,
   631  		state: empty,
   632  		itl:   &itemList{},
   633  		wq:    h.wq,
   634  	}
   635  	l.estdStreams[h.streamID] = str
   636  }
   637  
   638  func (l *loopyWriter) headerHandler(h *headerFrame) error {
   639  	if l.side == serverSide {
   640  		str, ok := l.estdStreams[h.streamID]
   641  		if !ok {
   642  			if l.logger.V(logLevel) {
   643  				l.logger.Infof("Unrecognized streamID %d in loopyWriter", h.streamID)
   644  			}
   645  			return nil
   646  		}
   647  		// Case 1.A: Server is responding back with headers.
   648  		if !h.endStream {
   649  			return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
   650  		}
   651  		// else:  Case 1.B: Server wants to close stream.
   652  
   653  		if str.state != empty { // either active or waiting on stream quota.
   654  			// add it str's list of items.
   655  			str.itl.enqueue(h)
   656  			return nil
   657  		}
   658  		if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
   659  			return err
   660  		}
   661  		return l.cleanupStreamHandler(h.cleanup)
   662  	}
   663  	// Case 2: Client wants to originate stream.
   664  	str := &outStream{
   665  		id:    h.streamID,
   666  		state: empty,
   667  		itl:   &itemList{},
   668  		wq:    h.wq,
   669  	}
   670  	return l.originateStream(str, h)
   671  }
   672  
   673  func (l *loopyWriter) originateStream(str *outStream, hdr *headerFrame) error {
   674  	// l.draining is set when handling GoAway. In which case, we want to avoid
   675  	// creating new streams.
   676  	if l.draining {
   677  		// TODO: provide a better error with the reason we are in draining.
   678  		hdr.onOrphaned(errStreamDrain)
   679  		return nil
   680  	}
   681  	if err := hdr.initStream(str.id); err != nil {
   682  		return err
   683  	}
   684  	if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
   685  		return err
   686  	}
   687  	l.estdStreams[str.id] = str
   688  	return nil
   689  }
   690  
   691  func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
   692  	if onWrite != nil {
   693  		onWrite()
   694  	}
   695  	l.hBuf.Reset()
   696  	for _, f := range hf {
   697  		if err := l.hEnc.WriteField(f); err != nil {
   698  			if l.logger.V(logLevel) {
   699  				l.logger.Warningf("Encountered error while encoding headers: %v", err)
   700  			}
   701  		}
   702  	}
   703  	var (
   704  		err               error
   705  		endHeaders, first bool
   706  	)
   707  	first = true
   708  	for !endHeaders {
   709  		size := l.hBuf.Len()
   710  		if size > http2MaxFrameLen {
   711  			size = http2MaxFrameLen
   712  		} else {
   713  			endHeaders = true
   714  		}
   715  		if first {
   716  			first = false
   717  			err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
   718  				StreamID:      streamID,
   719  				BlockFragment: l.hBuf.Next(size),
   720  				EndStream:     endStream,
   721  				EndHeaders:    endHeaders,
   722  			})
   723  		} else {
   724  			err = l.framer.fr.WriteContinuation(
   725  				streamID,
   726  				endHeaders,
   727  				l.hBuf.Next(size),
   728  			)
   729  		}
   730  		if err != nil {
   731  			return err
   732  		}
   733  	}
   734  	return nil
   735  }
   736  
   737  func (l *loopyWriter) preprocessData(df *dataFrame) {
   738  	str, ok := l.estdStreams[df.streamID]
   739  	if !ok {
   740  		return
   741  	}
   742  	// If we got data for a stream it means that
   743  	// stream was originated and the headers were sent out.
   744  	str.itl.enqueue(df)
   745  	if str.state == empty {
   746  		str.state = active
   747  		l.activeStreams.enqueue(str)
   748  	}
   749  }
   750  
   751  func (l *loopyWriter) pingHandler(p *ping) error {
   752  	if !p.ack {
   753  		l.bdpEst.timesnap(p.data)
   754  	}
   755  	return l.framer.fr.WritePing(p.ack, p.data)
   756  
   757  }
   758  
   759  func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) {
   760  	o.resp <- l.sendQuota
   761  }
   762  
   763  func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
   764  	c.onWrite()
   765  	if str, ok := l.estdStreams[c.streamID]; ok {
   766  		// On the server side it could be a trailers-only response or
   767  		// a RST_STREAM before stream initialization thus the stream might
   768  		// not be established yet.
   769  		delete(l.estdStreams, c.streamID)
   770  		str.deleteSelf()
   771  	}
   772  	if c.rst { // If RST_STREAM needs to be sent.
   773  		if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
   774  			return err
   775  		}
   776  	}
   777  	if l.draining && len(l.estdStreams) == 0 {
   778  		// Flush and close the connection; we are done with it.
   779  		return errors.New("finished processing active streams while in draining mode")
   780  	}
   781  	return nil
   782  }
   783  
   784  func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
   785  	if l.side == clientSide {
   786  		return errors.New("earlyAbortStream not handled on client")
   787  	}
   788  	// In case the caller forgets to set the http status, default to 200.
   789  	if eas.httpStatus == 0 {
   790  		eas.httpStatus = 200
   791  	}
   792  	headerFields := []hpack.HeaderField{
   793  		{Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))},
   794  		{Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
   795  		{Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
   796  		{Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
   797  	}
   798  
   799  	if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
   800  		return err
   801  	}
   802  	if eas.rst {
   803  		if err := l.framer.fr.WriteRSTStream(eas.streamID, http2.ErrCodeNo); err != nil {
   804  			return err
   805  		}
   806  	}
   807  	return nil
   808  }
   809  
   810  func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
   811  	if l.side == clientSide {
   812  		l.draining = true
   813  		if len(l.estdStreams) == 0 {
   814  			// Flush and close the connection; we are done with it.
   815  			return errors.New("received GOAWAY with no active streams")
   816  		}
   817  	}
   818  	return nil
   819  }
   820  
   821  func (l *loopyWriter) goAwayHandler(g *goAway) error {
   822  	// Handling of outgoing GoAway is very specific to side.
   823  	if l.ssGoAwayHandler != nil {
   824  		draining, err := l.ssGoAwayHandler(g)
   825  		if err != nil {
   826  			return err
   827  		}
   828  		l.draining = draining
   829  	}
   830  	return nil
   831  }
   832  
   833  func (l *loopyWriter) handle(i any) error {
   834  	switch i := i.(type) {
   835  	case *incomingWindowUpdate:
   836  		l.incomingWindowUpdateHandler(i)
   837  	case *outgoingWindowUpdate:
   838  		return l.outgoingWindowUpdateHandler(i)
   839  	case *incomingSettings:
   840  		return l.incomingSettingsHandler(i)
   841  	case *outgoingSettings:
   842  		return l.outgoingSettingsHandler(i)
   843  	case *headerFrame:
   844  		return l.headerHandler(i)
   845  	case *registerStream:
   846  		l.registerStreamHandler(i)
   847  	case *cleanupStream:
   848  		return l.cleanupStreamHandler(i)
   849  	case *earlyAbortStream:
   850  		return l.earlyAbortStreamHandler(i)
   851  	case *incomingGoAway:
   852  		return l.incomingGoAwayHandler(i)
   853  	case *dataFrame:
   854  		l.preprocessData(i)
   855  	case *ping:
   856  		return l.pingHandler(i)
   857  	case *goAway:
   858  		return l.goAwayHandler(i)
   859  	case *outFlowControlSizeRequest:
   860  		l.outFlowControlSizeRequestHandler(i)
   861  	case closeConnection:
   862  		// Just return a non-I/O error and run() will flush and close the
   863  		// connection.
   864  		return ErrConnClosing
   865  	default:
   866  		return fmt.Errorf("transport: unknown control message type %T", i)
   867  	}
   868  	return nil
   869  }
   870  
   871  func (l *loopyWriter) applySettings(ss []http2.Setting) {
   872  	for _, s := range ss {
   873  		switch s.ID {
   874  		case http2.SettingInitialWindowSize:
   875  			o := l.oiws
   876  			l.oiws = s.Val
   877  			if o < l.oiws {
   878  				// If the new limit is greater make all depleted streams active.
   879  				for _, stream := range l.estdStreams {
   880  					if stream.state == waitingOnStreamQuota {
   881  						stream.state = active
   882  						l.activeStreams.enqueue(stream)
   883  					}
   884  				}
   885  			}
   886  		case http2.SettingHeaderTableSize:
   887  			updateHeaderTblSize(l.hEnc, s.Val)
   888  		}
   889  	}
   890  }
   891  
// processData removes the first stream from active streams, writes out at most 16KB
// of its data and then puts it at the end of activeStreams if there's still more data
// to be sent and stream has some stream-level flow control.
//
// The returned bool is true when there was nothing to process: either the
// connection-level send quota is zero or no stream is active. In every other
// case it is false. A non-nil error comes from writing a frame or from the
// stream cleanup that follows trailers.
func (l *loopyWriter) processData() (bool, error) {
	if l.sendQuota == 0 {
		return true, nil
	}
	str := l.activeStreams.dequeue() // Remove the first stream.
	if str == nil {
		return true, nil
	}
	dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
	// A data item is represented by a dataFrame, since it later translates into
	// multiple HTTP2 data frames.
	// Every dataFrame has two buffers; h that keeps grpc-message header and d that is actual data.
	// As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
	// maximum possible HTTP2 frame size.

	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
		// Client sends out empty data frame with endStream = true
		if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
			return false, err
		}
		str.itl.dequeue() // remove the empty data item from stream
		if str.itl.isEmpty() {
			str.state = empty
		} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
			if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
				return false, err
			}
			if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
				return false, err
			}
		} else {
			// More data items are queued; the stream goes to the back of the
			// active list.
			l.activeStreams.enqueue(str)
		}
		return false, nil
	}
	var (
		buf []byte
	)
	// Figure out the maximum size we can send
	maxSize := http2MaxFrameLen
	if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
		// The stream was already dequeued above and is not re-enqueued here;
		// it is made active again when its quota grows.
		str.state = waitingOnStreamQuota
		return false, nil
	} else if maxSize > strQuota {
		maxSize = strQuota
	}
	if maxSize > int(l.sendQuota) { // connection-level flow control.
		maxSize = int(l.sendQuota)
	}
	// Compute how much of the header and data we can send within quota and max frame length
	hSize := min(maxSize, len(dataItem.h))
	dSize := min(maxSize-hSize, len(dataItem.d))
	if hSize != 0 {
		if dSize == 0 {
			buf = dataItem.h
		} else {
			// We can add some data to grpc message header to distribute bytes more equally across frames.
			// Copy on the stack to avoid generating garbage
			var localBuf [http2MaxFrameLen]byte
			copy(localBuf[:hSize], dataItem.h)
			copy(localBuf[hSize:], dataItem.d[:dSize])
			buf = localBuf[:hSize+dSize]
		}
	} else {
		buf = dataItem.d
	}

	size := hSize + dSize

	// Now that outgoing flow controls are checked we can replenish str's write quota
	str.wq.replenish(size)
	var endStream bool
	// If this is the last data message on this stream and all of it can be written in this iteration.
	if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
		endStream = true
	}
	if dataItem.onEachWrite != nil {
		dataItem.onEachWrite()
	}
	if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
		return false, err
	}
	// Account for the bytes just written against both flow-control levels and
	// drop them from the data item.
	str.bytesOutStanding += size
	l.sendQuota -= uint32(size)
	dataItem.h = dataItem.h[hSize:]
	dataItem.d = dataItem.d[dSize:]

	if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
		str.itl.dequeue()
	}
	if str.itl.isEmpty() {
		str.state = empty
	} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
		if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
			return false, err
		}
		if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
			return false, err
		}
	} else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
		str.state = waitingOnStreamQuota
	} else { // Otherwise add it back to the list of active streams.
		l.activeStreams.enqueue(str)
	}
	return false, nil
}
  1001  
  1002  func min(a, b int) int {
  1003  	if a < b {
  1004  		return a
  1005  	}
  1006  	return b
  1007  }
  1008  

View as plain text