
Source file src/golang.org/x/net/quic/stream.go

Documentation: golang.org/x/net/quic

     1  // Copyright 2023 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     5  //go:build go1.21
     7  package quic
     9  import (
    10  	"context"
    11  	"errors"
    12  	"fmt"
    13  	"io"
    14  	"math"
    15  )
    17  // A Stream is an ordered byte stream.
    18  //
    19  // Streams may be bidirectional, read-only, or write-only.
    20  // Methods inappropriate for a stream's direction
    21  // (for example, [Write] to a read-only stream)
    22  // return errors.
    23  //
    24  // It is not safe to perform concurrent reads from or writes to a stream.
    25  // It is safe, however, to read and write at the same time.
    26  //
    27  // Reads and writes are buffered.
    28  // It is generally not necessary to wrap a stream in a [bufio.ReadWriter]
    29  // or otherwise apply additional buffering.
    30  //
    31  // To cancel reads or writes, use the [SetReadContext] and [SetWriteContext] methods.
    32  type Stream struct {
    33  	id   streamID
    34  	conn *Conn
    36  	// Contexts used for read/write operations.
    37  	// Intentionally not mutex-guarded, to allow the race detector to catch concurrent access.
    38  	inctx  context.Context
    39  	outctx context.Context
    41  	// ingate's lock guards receive-related state.
    42  	//
    43  	// The gate condition is set if a read from the stream will not block,
    44  	// either because the stream has available data or because the read will fail.
    45  	ingate      gate
    46  	in          pipe            // received data
    47  	inwin       int64           // last MAX_STREAM_DATA sent to the peer
    48  	insendmax   sentVal         // set when we should send MAX_STREAM_DATA to the peer
    49  	inmaxbuf    int64           // maximum amount of data we will buffer
    50  	insize      int64           // stream final size; -1 before this is known
    51  	inset       rangeset[int64] // received ranges
    52  	inclosed    sentVal         // set by CloseRead
    53  	inresetcode int64           // RESET_STREAM code received from the peer; -1 if not reset
    55  	// outgate's lock guards send-related state.
    56  	//
    57  	// The gate condition is set if a write to the stream will not block,
    58  	// either because the stream has available flow control or because
    59  	// the write will fail.
    60  	outgate      gate
    61  	out          pipe            // buffered data to send
    62  	outflushed   int64           // offset of last flush call
    63  	outwin       int64           // maximum MAX_STREAM_DATA received from the peer
    64  	outmaxsent   int64           // maximum data offset we've sent to the peer
    65  	outmaxbuf    int64           // maximum amount of data we will buffer
    66  	outunsent    rangeset[int64] // ranges buffered but not yet sent (only flushed data)
    67  	outacked     rangeset[int64] // ranges sent and acknowledged
    68  	outopened    sentVal         // set if we should open the stream
    69  	outclosed    sentVal         // set by CloseWrite
    70  	outblocked   sentVal         // set when a write to the stream is blocked by flow control
    71  	outreset     sentVal         // set by Reset
    72  	outresetcode uint64          // reset code to send in RESET_STREAM
    73  	outdone      chan struct{}   // closed when all data sent
    75  	// Unsynchronized buffers, used for lock-free fast path.
    76  	inbuf     []byte // received data
    77  	inbufoff  int    // bytes of inbuf which have been consumed
    78  	outbuf    []byte // written data
    79  	outbufoff int    // bytes of outbuf which contain data to write
    81  	// Atomic stream state bits.
    82  	//
    83  	// These bits provide a fast way to coordinate between the
    84  	// send and receive sides of the stream, and the conn's loop.
    85  	//
    86  	// streamIn* bits must be set with ingate held.
    87  	// streamOut* bits must be set with outgate held.
    88  	// streamConn* bits are set by the conn's loop.
    89  	// streamQueue* bits must be set with streamsState.sendMu held.
    90  	state atomicBits[streamState]
    92  	prev, next *Stream // guarded by streamsState.sendMu
    93  }
    95  type streamState uint32
    97  const (
    98  	// streamInSendMeta is set when there are frames to send for the
    99  	// inbound side of the stream. For example, MAX_STREAM_DATA.
   100  	// Inbound frames are never flow-controlled.
   101  	streamInSendMeta = streamState(1 << iota)
   103  	// streamOutSendMeta is set when there are non-flow-controlled frames
   104  	// to send for the outbound side of the stream. For example, STREAM_DATA_BLOCKED.
   105  	// streamOutSendData is set when there are no non-flow-controlled outbound frames
   106  	// and the stream has data to send.
   107  	//
   108  	// At most one of streamOutSendMeta and streamOutSendData is set at any time.
   109  	streamOutSendMeta
   110  	streamOutSendData
   112  	// streamInDone and streamOutDone are set when the inbound or outbound
   113  	// sides of the stream are finished. When both are set, the stream
   114  	// can be removed from the Conn and forgotten.
   115  	streamInDone
   116  	streamOutDone
   118  	// streamConnRemoved is set when the stream has been removed from the conn.
   119  	streamConnRemoved
   121  	// streamQueueMeta and streamQueueData indicate which of the streamsState
   122  	// send queues the conn is currently on.
   123  	streamQueueMeta
   124  	streamQueueData
   125  )
   127  type streamQueue int
   129  const (
   130  	noQueue   = streamQueue(iota)
   131  	metaQueue // streamsState.queueMeta
   132  	dataQueue // streamsState.queueData
   133  )
   135  // streamResetByConnClose is assigned to Stream.inresetcode to indicate that a stream
   136  // was implicitly reset when the connection closed. It's out of the range of
   137  // possible reset codes the peer can send.
   138  const streamResetByConnClose = math.MaxInt64
   140  // wantQueue returns the send queue the stream should be on.
   141  func (s streamState) wantQueue() streamQueue {
   142  	switch {
   143  	case s&(streamInSendMeta|streamOutSendMeta) != 0:
   144  		return metaQueue
   145  	case s&(streamInDone|streamOutDone|streamConnRemoved) == streamInDone|streamOutDone:
   146  		return metaQueue
   147  	case s&streamOutSendData != 0:
   148  		// The stream has no non-flow-controlled frames to send,
   149  		// but does have data. Put it on the data queue, which is only
   150  		// processed when flow control is available.
   151  		return dataQueue
   152  	}
   153  	return noQueue
   154  }
   156  // inQueue returns the send queue the stream is currently on.
   157  func (s streamState) inQueue() streamQueue {
   158  	switch {
   159  	case s&streamQueueMeta != 0:
   160  		return metaQueue
   161  	case s&streamQueueData != 0:
   162  		return dataQueue
   163  	}
   164  	return noQueue
   165  }
   167  // newStream returns a new stream.
   168  //
   169  // The stream's ingate and outgate are locked.
   170  // (We create the stream with locked gates so after the caller
   171  // initializes the flow control window,
   172  // unlocking outgate will set the stream writability state.)
   173  func newStream(c *Conn, id streamID) *Stream {
   174  	s := &Stream{
   175  		conn:        c,
   176  		id:          id,
   177  		insize:      -1, // -1 indicates the stream size is unknown
   178  		inresetcode: -1, // -1 indicates no RESET_STREAM received
   179  		ingate:      newLockedGate(),
   180  		outgate:     newLockedGate(),
   181  		inctx:       context.Background(),
   182  		outctx:      context.Background(),
   183  	}
   184  	if !s.IsReadOnly() {
   185  		s.outdone = make(chan struct{})
   186  	}
   187  	return s
   188  }
   190  // SetReadContext sets the context used for reads from the stream.
   191  //
   192  // It is not safe to call SetReadContext concurrently.
   193  func (s *Stream) SetReadContext(ctx context.Context) {
   194  	s.inctx = ctx
   195  }
   197  // SetWriteContext sets the context used for writes to the stream.
   198  // The write context is also used by Close when waiting for writes to be
   199  // received by the peer.
   200  //
   201  // It is not safe to call SetWriteContext concurrently.
   202  func (s *Stream) SetWriteContext(ctx context.Context) {
   203  	s.outctx = ctx
   204  }
   206  // IsReadOnly reports whether the stream is read-only
   207  // (a unidirectional stream created by the peer).
   208  func (s *Stream) IsReadOnly() bool {
   209  	return s.id.streamType() == uniStream && s.id.initiator() != s.conn.side
   210  }
   212  // IsWriteOnly reports whether the stream is write-only
   213  // (a unidirectional stream created locally).
   214  func (s *Stream) IsWriteOnly() bool {
   215  	return s.id.streamType() == uniStream && s.id.initiator() == s.conn.side
   216  }
   218  // Read reads data from the stream.
   219  //
   220  // Read returns as soon as at least one byte of data is available.
   221  //
   222  // If the peer closes the stream cleanly, Read returns io.EOF after
   223  // returning all data sent by the peer.
   224  // If the peer aborts reads on the stream, Read returns
   225  // an error wrapping StreamResetCode.
   226  //
   227  // It is not safe to call Read concurrently.
   228  func (s *Stream) Read(b []byte) (n int, err error) {
   229  	if s.IsWriteOnly() {
   230  		return 0, errors.New("read from write-only stream")
   231  	}
   232  	if len(s.inbuf) > s.inbufoff {
   233  		// Fast path: If s.inbuf contains unread bytes, return them immediately
   234  		// without taking a lock.
   235  		n = copy(b, s.inbuf[s.inbufoff:])
   236  		s.inbufoff += n
   237  		return n, nil
   238  	}
   239  	if err := s.ingate.waitAndLock(s.inctx, s.conn.testHooks); err != nil {
   240  		return 0, err
   241  	}
   242  	if s.inbufoff > 0 {
   243  		// Discard bytes consumed by the fast path above.
   244  		s.in.discardBefore(s.in.start + int64(s.inbufoff))
   245  		s.inbufoff = 0
   246  		s.inbuf = nil
   247  	}
   248  	// bytesRead contains the number of bytes of connection-level flow control to return.
   249  	// We return flow control for bytes read by this Read call, as well as bytes moved
   250  	// to the fast-path read buffer (s.inbuf).
   251  	var bytesRead int64
   252  	defer func() {
   253  		s.inUnlock()
   254  		s.conn.handleStreamBytesReadOffLoop(bytesRead) // must be done with ingate unlocked
   255  	}()
   256  	if s.inresetcode != -1 {
   257  		return 0, fmt.Errorf("stream reset by peer: %w", StreamErrorCode(s.inresetcode))
   258  	}
   259  	if s.inclosed.isSet() {
   260  		return 0, errors.New("read from closed stream")
   261  	}
   262  	if s.insize == s.in.start {
   263  		return 0, io.EOF
   264  	}
   265  	// Getting here indicates the stream contains data to be read.
   266  	if len(s.inset) < 1 || s.inset[0].start != 0 || s.inset[0].end <= s.in.start {
   267  		panic("BUG: inconsistent input stream state")
   268  	}
   269  	if size := int(s.inset[0].end - s.in.start); size < len(b) {
   270  		b = b[:size]
   271  	}
   272  	bytesRead = int64(len(b))
   273  	start := s.in.start
   274  	end := start + int64(len(b))
   275  	s.in.copy(start, b)
   276  	s.in.discardBefore(end)
   277  	if end == s.insize {
   278  		// We have read up to the end of the stream.
   279  		// No need to update stream flow control.
   280  		return len(b), io.EOF
   281  	}
   282  	if len(s.inset) > 0 && s.inset[0].start <= s.in.start && s.inset[0].end > s.in.start {
   283  		// If we have more readable bytes available, put the next chunk of data
   284  		// in s.inbuf for lock-free reads.
   285  		s.inbuf = s.in.peek(s.inset[0].end - s.in.start)
   286  		bytesRead += int64(len(s.inbuf))
   287  	}
   288  	if s.insize == -1 || s.insize > s.inwin {
   289  		newWindow := s.in.start + int64(len(s.inbuf)) + s.inmaxbuf
   290  		addedWindow := newWindow - s.inwin
   291  		if shouldUpdateFlowControl(s.inmaxbuf, addedWindow) {
   292  			// Update stream flow control with a STREAM_MAX_DATA frame.
   293  			s.insendmax.setUnsent()
   294  		}
   295  	}
   296  	return len(b), nil
   297  }
   299  // ReadByte reads and returns a single byte from the stream.
   300  //
   301  // It is not safe to call ReadByte concurrently.
   302  func (s *Stream) ReadByte() (byte, error) {
   303  	if len(s.inbuf) > s.inbufoff {
   304  		b := s.inbuf[s.inbufoff]
   305  		s.inbufoff++
   306  		return b, nil
   307  	}
   308  	var b [1]byte
   309  	n, err := s.Read(b[:])
   310  	if n > 0 {
   311  		return b[0], err
   312  	}
   313  	return 0, err
   314  }
   316  // shouldUpdateFlowControl determines whether to send a flow control window update.
   317  //
   318  // We want to balance keeping the peer well-supplied with flow control with not sending
   319  // many small updates.
   320  func shouldUpdateFlowControl(maxWindow, addedWindow int64) bool {
   321  	return addedWindow >= maxWindow/8
   322  }
   324  // Write writes data to the stream.
   325  //
   326  // Write writes data to the stream write buffer.
   327  // Buffered data is only sent when the buffer is sufficiently full.
   328  // Call the Flush method to ensure buffered data is sent.
   329  func (s *Stream) Write(b []byte) (n int, err error) {
   330  	if s.IsReadOnly() {
   331  		return 0, errors.New("write to read-only stream")
   332  	}
   333  	if len(b) > 0 && len(s.outbuf)-s.outbufoff >= len(b) {
   334  		// Fast path: The data to write fits in s.outbuf.
   335  		copy(s.outbuf[s.outbufoff:], b)
   336  		s.outbufoff += len(b)
   337  		return len(b), nil
   338  	}
   339  	canWrite := s.outgate.lock()
   340  	s.flushFastOutputBuffer()
   341  	for {
   342  		// The first time through this loop, we may or may not be write blocked.
   343  		// We exit the loop after writing all data, so on subsequent passes through
   344  		// the loop we are always write blocked.
   345  		if len(b) > 0 && !canWrite {
   346  			// Our send buffer is full. Wait for the peer to ack some data.
   347  			s.outUnlock()
   348  			if err := s.outgate.waitAndLock(s.outctx, s.conn.testHooks); err != nil {
   349  				return n, err
   350  			}
   351  			// Successfully returning from waitAndLockGate means we are no longer
   352  			// write blocked. (Unlike traditional condition variables, gates do not
   353  			// have spurious wakeups.)
   354  		}
   355  		if s.outreset.isSet() {
   356  			s.outUnlock()
   357  			return n, errors.New("write to reset stream")
   358  		}
   359  		if s.outclosed.isSet() {
   360  			s.outUnlock()
   361  			return n, errors.New("write to closed stream")
   362  		}
   363  		if len(b) == 0 {
   364  			break
   365  		}
   366  		// Write limit is our send buffer limit.
   367  		// This is a stream offset.
   368  		lim := s.out.start + s.outmaxbuf
   369  		// Amount to write is min(the full buffer, data up to the write limit).
   370  		// This is a number of bytes.
   371  		nn := min(int64(len(b)), lim-s.out.end)
   372  		// Copy the data into the output buffer.
   373  		s.out.writeAt(b[:nn], s.out.end)
   374  		b = b[nn:]
   375  		n += int(nn)
   376  		// Possibly flush the output buffer.
   377  		// We automatically flush if:
   378  		//   - We have enough data to consume the send window.
   379  		//     Sending this data may cause the peer to extend the window.
   380  		//   - We have buffered as much data as we're willing do.
   381  		//     We need to send data to clear out buffer space.
   382  		//   - We have enough data to fill a 1-RTT packet using the smallest
   383  		//     possible maximum datagram size (1200 bytes, less header byte,
   384  		//     connection ID, packet number, and AEAD overhead).
   385  		const autoFlushSize = smallestMaxDatagramSize - 1 - connIDLen - 1 - aeadOverhead
   386  		shouldFlush := s.out.end >= s.outwin || // peer send window is full
   387  			s.out.end >= lim || // local send buffer is full
   388  			(s.out.end-s.outflushed) >= autoFlushSize // enough data buffered
   389  		if shouldFlush {
   390  			s.flushLocked()
   391  		}
   392  		if s.out.end > s.outwin {
   393  			// We're blocked by flow control.
   394  			// Send a STREAM_DATA_BLOCKED frame to let the peer know.
   395  			s.outblocked.set()
   396  		}
   397  		// If we have bytes left to send, we're blocked.
   398  		canWrite = false
   399  	}
   400  	if lim := s.out.start + s.outmaxbuf - s.out.end - 1; lim > 0 {
   401  		// If s.out has space allocated and available to be written into,
   402  		// then reference it in s.outbuf for fast-path writes.
   403  		//
   404  		// It's perhaps a bit pointless to limit s.outbuf to the send buffer limit.
   405  		// We've already allocated this buffer so we aren't saving any memory
   406  		// by not using it.
   407  		// For now, we limit it anyway to make it easier to reason about limits.
   408  		//
   409  		// We set the limit to one less than the send buffer limit (the -1 above)
   410  		// so that a write which completely fills the buffer will overflow
   411  		// s.outbuf and trigger a flush.
   412  		s.outbuf = s.out.availableBuffer()
   413  		if int64(len(s.outbuf)) > lim {
   414  			s.outbuf = s.outbuf[:lim]
   415  		}
   416  	}
   417  	s.outUnlock()
   418  	return n, nil
   419  }
   421  // WriteBytes writes a single byte to the stream.
   422  func (s *Stream) WriteByte(c byte) error {
   423  	if s.outbufoff < len(s.outbuf) {
   424  		s.outbuf[s.outbufoff] = c
   425  		s.outbufoff++
   426  		return nil
   427  	}
   428  	b := [1]byte{c}
   429  	_, err := s.Write(b[:])
   430  	return err
   431  }
   433  func (s *Stream) flushFastOutputBuffer() {
   434  	if s.outbuf == nil {
   435  		return
   436  	}
   437  	// Commit data previously written to s.outbuf.
   438  	// s.outbuf is a reference to a buffer in s.out, so we just need to record
   439  	// that the output buffer has been extended.
   440  	s.out.end += int64(s.outbufoff)
   441  	s.outbuf = nil
   442  	s.outbufoff = 0
   443  }
   445  // Flush flushes data written to the stream.
   446  // It does not wait for the peer to acknowledge receipt of the data.
   447  // Use Close to wait for the peer's acknowledgement.
   448  func (s *Stream) Flush() {
   449  	s.outgate.lock()
   450  	defer s.outUnlock()
   451  	s.flushLocked()
   452  }
   454  func (s *Stream) flushLocked() {
   455  	s.flushFastOutputBuffer()
   456  	s.outopened.set()
   457  	if s.outflushed < s.outwin {
   458  		s.outunsent.add(s.outflushed, min(s.outwin, s.out.end))
   459  	}
   460  	s.outflushed = s.out.end
   461  }
   463  // Close closes the stream.
   464  // Any blocked stream operations will be unblocked and return errors.
   465  //
   466  // Close flushes any data in the stream write buffer and waits for the peer to
   467  // acknowledge receipt of the data.
   468  // If the stream has been reset, it waits for the peer to acknowledge the reset.
   469  // If the context expires before the peer receives the stream's data,
   470  // Close discards the buffer and returns the context error.
   471  func (s *Stream) Close() error {
   472  	s.CloseRead()
   473  	if s.IsReadOnly() {
   474  		return nil
   475  	}
   476  	s.CloseWrite()
   477  	// TODO: Return code from peer's RESET_STREAM frame?
   478  	if err := s.conn.waitOnDone(s.outctx, s.outdone); err != nil {
   479  		return err
   480  	}
   481  	s.outgate.lock()
   482  	defer s.outUnlock()
   483  	if s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) {
   484  		return nil
   485  	}
   486  	return errors.New("stream reset")
   487  }
   489  // CloseRead aborts reads on the stream.
   490  // Any blocked reads will be unblocked and return errors.
   491  //
   492  // CloseRead notifies the peer that the stream has been closed for reading.
   493  // It does not wait for the peer to acknowledge the closure.
   494  // Use Close to wait for the peer's acknowledgement.
   495  func (s *Stream) CloseRead() {
   496  	if s.IsWriteOnly() {
   497  		return
   498  	}
   499  	s.ingate.lock()
   500  	if s.inset.isrange(0, s.insize) || s.inresetcode != -1 {
   501  		// We've already received all data from the peer,
   502  		// so there's no need to send STOP_SENDING.
   503  		// This is the same as saying we sent one and they got it.
   504  		s.inclosed.setReceived()
   505  	} else {
   506  		s.inclosed.set()
   507  	}
   508  	discarded := s.in.end - s.in.start
   509  	s.in.discardBefore(s.in.end)
   510  	s.inUnlock()
   511  	s.conn.handleStreamBytesReadOffLoop(discarded) // must be done with ingate unlocked
   512  }
   514  // CloseWrite aborts writes on the stream.
   515  // Any blocked writes will be unblocked and return errors.
   516  //
   517  // CloseWrite sends any data in the stream write buffer to the peer.
   518  // It does not wait for the peer to acknowledge receipt of the data.
   519  // Use Close to wait for the peer's acknowledgement.
   520  func (s *Stream) CloseWrite() {
   521  	if s.IsReadOnly() {
   522  		return
   523  	}
   524  	s.outgate.lock()
   525  	defer s.outUnlock()
   526  	s.outclosed.set()
   527  	s.flushLocked()
   528  }
   530  // Reset aborts writes on the stream and notifies the peer
   531  // that the stream was terminated abruptly.
   532  // Any blocked writes will be unblocked and return errors.
   533  //
   534  // Reset sends the application protocol error code, which must be
   535  // less than 2^62, to the peer.
   536  // It does not wait for the peer to acknowledge receipt of the error.
   537  // Use Close to wait for the peer's acknowledgement.
   538  //
   539  // Reset does not affect reads.
   540  // Use CloseRead to abort reads on the stream.
   541  func (s *Stream) Reset(code uint64) {
   542  	const userClosed = true
   543  	s.resetInternal(code, userClosed)
   544  }
   546  // resetInternal resets the send side of the stream.
   547  //
   548  // If userClosed is true, this is s.Reset.
   549  // If userClosed is false, this is a reaction to a STOP_SENDING frame.
   550  func (s *Stream) resetInternal(code uint64, userClosed bool) {
   551  	s.outgate.lock()
   552  	defer s.outUnlock()
   553  	if s.IsReadOnly() {
   554  		return
   555  	}
   556  	if userClosed {
   557  		// Mark that the user closed the stream.
   558  		s.outclosed.set()
   559  	}
   560  	if s.outreset.isSet() {
   561  		return
   562  	}
   563  	if code > maxVarint {
   564  		code = maxVarint
   565  	}
   566  	// We could check here to see if the stream is closed and the
   567  	// peer has acked all the data and the FIN, but sending an
   568  	// extra RESET_STREAM in this case is harmless.
   569  	s.outreset.set()
   570  	s.outresetcode = code
   571  	s.outbuf = nil
   572  	s.outbufoff = 0
   573  	s.out.discardBefore(s.out.end)
   574  	s.outunsent = rangeset[int64]{}
   575  	s.outblocked.clear()
   576  }
   578  // connHasClosed indicates the stream's conn has closed.
   579  func (s *Stream) connHasClosed() {
   580  	// If we're in the closing state, the user closed the conn.
   581  	// Otherwise, we the peer initiated the close.
   582  	// This only matters for the error we're going to return from stream operations.
   583  	localClose := s.conn.lifetime.state == connStateClosing
   585  	s.ingate.lock()
   586  	if !s.inset.isrange(0, s.insize) && s.inresetcode == -1 {
   587  		if localClose {
   588  			s.inclosed.set()
   589  		} else {
   590  			s.inresetcode = streamResetByConnClose
   591  		}
   592  	}
   593  	s.inUnlock()
   595  	s.outgate.lock()
   596  	if localClose {
   597  		s.outclosed.set()
   598  	}
   599  	s.outreset.set()
   600  	s.outUnlock()
   601  }
   603  // inUnlock unlocks s.ingate.
   604  // It sets the gate condition if reads from s will not block.
   605  // If s has receive-related frames to write or if both directions
   606  // are done and the stream should be removed, it notifies the Conn.
   607  func (s *Stream) inUnlock() {
   608  	state := s.inUnlockNoQueue()
   609  	s.conn.maybeQueueStreamForSend(s, state)
   610  }
   612  // inUnlockNoQueue is inUnlock,
   613  // but reports whether s has frames to write rather than notifying the Conn.
   614  func (s *Stream) inUnlockNoQueue() streamState {
   615  	nextByte := s.in.start + int64(len(s.inbuf))
   616  	canRead := s.inset.contains(nextByte) || // data available to read
   617  		s.insize == s.in.start+int64(len(s.inbuf)) || // at EOF
   618  		s.inresetcode != -1 || // reset by peer
   619  		s.inclosed.isSet() // closed locally
   620  	defer s.ingate.unlock(canRead)
   621  	var state streamState
   622  	switch {
   623  	case s.IsWriteOnly():
   624  		state = streamInDone
   625  	case s.inresetcode != -1: // reset by peer
   626  		fallthrough
   627  	case s.in.start == s.insize: // all data received and read
   628  		// We don't increase MAX_STREAMS until the user calls ReadClose or Close,
   629  		// so the receive side is not finished until inclosed is set.
   630  		if s.inclosed.isSet() {
   631  			state = streamInDone
   632  		}
   633  	case s.insendmax.shouldSend(): // STREAM_MAX_DATA
   634  		state = streamInSendMeta
   635  	case s.inclosed.shouldSend(): // STOP_SENDING
   636  		state = streamInSendMeta
   637  	}
   638  	const mask = streamInDone | streamInSendMeta
   639  	return s.state.set(state, mask)
   640  }
   642  // outUnlock unlocks s.outgate.
   643  // It sets the gate condition if writes to s will not block.
   644  // If s has send-related frames to write or if both directions
   645  // are done and the stream should be removed, it notifies the Conn.
   646  func (s *Stream) outUnlock() {
   647  	state := s.outUnlockNoQueue()
   648  	s.conn.maybeQueueStreamForSend(s, state)
   649  }
   651  // outUnlockNoQueue is outUnlock,
   652  // but reports whether s has frames to write rather than notifying the Conn.
   653  func (s *Stream) outUnlockNoQueue() streamState {
   654  	isDone := s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) || // all data acked
   655  		s.outreset.isSet() // reset locally
   656  	if isDone {
   657  		select {
   658  		case <-s.outdone:
   659  		default:
   660  			if !s.IsReadOnly() {
   661  				close(s.outdone)
   662  			}
   663  		}
   664  	}
   665  	lim := s.out.start + s.outmaxbuf
   666  	canWrite := lim > s.out.end || // available send buffer
   667  		s.outclosed.isSet() || // closed locally
   668  		s.outreset.isSet() // reset locally
   669  	defer s.outgate.unlock(canWrite)
   670  	var state streamState
   671  	switch {
   672  	case s.IsReadOnly():
   673  		state = streamOutDone
   674  	case s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end): // all data sent and acked
   675  		fallthrough
   676  	case s.outreset.isReceived(): // RESET_STREAM sent and acked
   677  		// We don't increase MAX_STREAMS until the user calls WriteClose or Close,
   678  		// so the send side is not finished until outclosed is set.
   679  		if s.outclosed.isSet() {
   680  			state = streamOutDone
   681  		}
   682  	case s.outreset.shouldSend(): // RESET_STREAM
   683  		state = streamOutSendMeta
   684  	case s.outreset.isSet(): // RESET_STREAM sent but not acknowledged
   685  	case s.outblocked.shouldSend(): // STREAM_DATA_BLOCKED
   686  		state = streamOutSendMeta
   687  	case len(s.outunsent) > 0: // STREAM frame with data
   688  		if s.outunsent.min() < s.outmaxsent {
   689  			state = streamOutSendMeta // resent data, will not consume flow control
   690  		} else {
   691  			state = streamOutSendData // new data, requires flow control
   692  		}
   693  	case s.outclosed.shouldSend() && s.out.end == s.outmaxsent: // empty STREAM frame with FIN bit
   694  		state = streamOutSendMeta
   695  	case s.outopened.shouldSend(): // STREAM frame with no data
   696  		state = streamOutSendMeta
   697  	}
   698  	const mask = streamOutDone | streamOutSendMeta | streamOutSendData
   699  	return s.state.set(state, mask)
   700  }
   702  // handleData handles data received in a STREAM frame.
   703  func (s *Stream) handleData(off int64, b []byte, fin bool) error {
   704  	s.ingate.lock()
   705  	defer s.inUnlock()
   706  	end := off + int64(len(b))
   707  	if err := s.checkStreamBounds(end, fin); err != nil {
   708  		return err
   709  	}
   710  	if s.inclosed.isSet() || s.inresetcode != -1 {
   711  		// The user read-closed the stream, or the peer reset it.
   712  		// Either way, we can discard this frame.
   713  		return nil
   714  	}
   715  	if s.insize == -1 && end > s.in.end {
   716  		added := end - s.in.end
   717  		if err := s.conn.handleStreamBytesReceived(added); err != nil {
   718  			return err
   719  		}
   720  	}
   721  	s.in.writeAt(b, off)
   722  	s.inset.add(off, end)
   723  	if fin {
   724  		s.insize = end
   725  		// The peer has enough flow control window to send the entire stream.
   726  		s.insendmax.clear()
   727  	}
   728  	return nil
   729  }
   731  // handleReset handles a RESET_STREAM frame.
   732  func (s *Stream) handleReset(code uint64, finalSize int64) error {
   733  	s.ingate.lock()
   734  	defer s.inUnlock()
   735  	const fin = true
   736  	if err := s.checkStreamBounds(finalSize, fin); err != nil {
   737  		return err
   738  	}
   739  	if s.inresetcode != -1 {
   740  		// The stream was already reset.
   741  		return nil
   742  	}
   743  	if s.insize == -1 {
   744  		added := finalSize - s.in.end
   745  		if err := s.conn.handleStreamBytesReceived(added); err != nil {
   746  			return err
   747  		}
   748  	}
   749  	s.conn.handleStreamBytesReadOnLoop(finalSize - s.in.start)
   750  	s.in.discardBefore(s.in.end)
   751  	s.inresetcode = int64(code)
   752  	s.insize = finalSize
   753  	return nil
   754  }
   756  // checkStreamBounds validates the stream offset in a STREAM or RESET_STREAM frame.
   757  func (s *Stream) checkStreamBounds(end int64, fin bool) error {
   758  	if end > s.inwin {
   759  		// The peer sent us data past the maximum flow control window we gave them.
   760  		return localTransportError{
   761  			code:   errFlowControl,
   762  			reason: "stream flow control window exceeded",
   763  		}
   764  	}
   765  	if s.insize != -1 && end > s.insize {
   766  		// The peer sent us data past the final size of the stream they previously gave us.
   767  		return localTransportError{
   768  			code:   errFinalSize,
   769  			reason: "data received past end of stream",
   770  		}
   771  	}
   772  	if fin && s.insize != -1 && end != s.insize {
   773  		// The peer changed the final size of the stream.
   774  		return localTransportError{
   775  			code:   errFinalSize,
   776  			reason: "final size of stream changed",
   777  		}
   778  	}
   779  	if fin && end < s.in.end {
   780  		// The peer has previously sent us data past the final size.
   781  		return localTransportError{
   782  			code:   errFinalSize,
   783  			reason: "end of stream occurs before prior data",
   784  		}
   785  	}
   786  	return nil
   787  }
   789  // handleStopSending handles a STOP_SENDING frame.
   790  func (s *Stream) handleStopSending(code uint64) error {
   791  	// Peer requests that we reset this stream.
   792  	// https://www.rfc-editor.org/rfc/rfc9000#section-3.5-4
   793  	const userReset = false
   794  	s.resetInternal(code, userReset)
   795  	return nil
   796  }
   798  // handleMaxStreamData handles an update received in a MAX_STREAM_DATA frame.
   799  func (s *Stream) handleMaxStreamData(maxStreamData int64) error {
   800  	s.outgate.lock()
   801  	defer s.outUnlock()
   802  	if maxStreamData <= s.outwin {
   803  		return nil
   804  	}
   805  	if s.outflushed > s.outwin {
   806  		s.outunsent.add(s.outwin, min(maxStreamData, s.outflushed))
   807  	}
   808  	s.outwin = maxStreamData
   809  	if s.out.end > s.outwin {
   810  		// We've still got more data than flow control window.
   811  		s.outblocked.setUnsent()
   812  	} else {
   813  		s.outblocked.clear()
   814  	}
   815  	return nil
   816  }
   818  // ackOrLoss handles the fate of stream frames other than STREAM.
   819  func (s *Stream) ackOrLoss(pnum packetNumber, ftype byte, fate packetFate) {
   820  	// Frames which carry new information each time they are sent
   821  	// (MAX_STREAM_DATA, STREAM_DATA_BLOCKED) must only be marked
   822  	// as received if the most recent packet carrying this frame is acked.
   823  	//
   824  	// Frames which are always the same (STOP_SENDING, RESET_STREAM)
   825  	// can be marked as received if any packet carrying this frame is acked.
   826  	switch ftype {
   827  	case frameTypeResetStream:
   828  		s.outgate.lock()
   829  		s.outreset.ackOrLoss(pnum, fate)
   830  		s.outUnlock()
   831  	case frameTypeStopSending:
   832  		s.ingate.lock()
   833  		s.inclosed.ackOrLoss(pnum, fate)
   834  		s.inUnlock()
   835  	case frameTypeMaxStreamData:
   836  		s.ingate.lock()
   837  		s.insendmax.ackLatestOrLoss(pnum, fate)
   838  		s.inUnlock()
   839  	case frameTypeStreamDataBlocked:
   840  		s.outgate.lock()
   841  		s.outblocked.ackLatestOrLoss(pnum, fate)
   842  		s.outUnlock()
   843  	default:
   844  		panic("unhandled frame type")
   845  	}
   846  }
   848  // ackOrLossData handles the fate of a STREAM frame.
   849  func (s *Stream) ackOrLossData(pnum packetNumber, start, end int64, fin bool, fate packetFate) {
   850  	s.outgate.lock()
   851  	defer s.outUnlock()
   852  	s.outopened.ackOrLoss(pnum, fate)
   853  	if fin {
   854  		s.outclosed.ackOrLoss(pnum, fate)
   855  	}
   856  	if s.outreset.isSet() {
   857  		// If the stream has been reset, we don't care any more.
   858  		return
   859  	}
   860  	switch fate {
   861  	case packetAcked:
   862  		s.outacked.add(start, end)
   863  		s.outunsent.sub(start, end)
   864  		// If this ack is for data at the start of the send buffer, we can now discard it.
   865  		if s.outacked.contains(s.out.start) {
   866  			s.out.discardBefore(s.outacked[0].end)
   867  		}
   868  	case packetLost:
   869  		// Mark everything lost, but not previously acked, as needing retransmission.
   870  		// We do this by adding all the lost bytes to outunsent, and then
   871  		// removing everything already acked.
   872  		s.outunsent.add(start, end)
   873  		for _, a := range s.outacked {
   874  			s.outunsent.sub(a.start, a.end)
   875  		}
   876  	}
   877  }
   879  // appendInFramesLocked appends STOP_SENDING and MAX_STREAM_DATA frames
   880  // to the current packet.
   881  //
   882  // It returns true if no more frames need appending,
   883  // false if not everything fit in the current packet.
   884  func (s *Stream) appendInFramesLocked(w *packetWriter, pnum packetNumber, pto bool) bool {
   885  	if s.inclosed.shouldSendPTO(pto) {
   886  		// We don't currently have an API for setting the error code.
   887  		// Just send zero.
   888  		code := uint64(0)
   889  		if !w.appendStopSendingFrame(s.id, code) {
   890  			return false
   891  		}
   892  		s.inclosed.setSent(pnum)
   893  	}
   894  	// TODO: STOP_SENDING
   895  	if s.insendmax.shouldSendPTO(pto) {
   896  		// MAX_STREAM_DATA
   897  		maxStreamData := s.in.start + s.inmaxbuf
   898  		if !w.appendMaxStreamDataFrame(s.id, maxStreamData) {
   899  			return false
   900  		}
   901  		s.inwin = maxStreamData
   902  		s.insendmax.setSent(pnum)
   903  	}
   904  	return true
   905  }
// appendOutFramesLocked appends RESET_STREAM, STREAM_DATA_BLOCKED, and STREAM frames
// to the current packet.
//
// pnum is the number of the packet being built; it is recorded against each
// piece of send state whose frame is written. pto is passed to the
// shouldSendPTO checks and to dataToSend, and limits the call to at most
// one STREAM frame.
//
// NOTE(review): the "Locked" suffix suggests the caller holds outgate's lock;
// confirm against the call sites.
//
// It returns true if no more frames need appending,
// false if not everything fit in the current packet.
func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto bool) bool {
	if s.outreset.isSet() {
		// RESET_STREAM
		//
		// Once the stream has been reset this is the only frame considered:
		// the function returns without looking at STREAM_DATA_BLOCKED or STREAM.
		if s.outreset.shouldSendPTO(pto) {
			// The offset passed is the end of the buffered data,
			// capped at the stream flow control window.
			if !w.appendResetStreamFrame(s.id, s.outresetcode, min(s.outwin, s.out.end)) {
				return false
			}
			s.outreset.setSent(pnum)
			s.frameOpensStream(pnum)
		}
		return true
	}
	if s.outblocked.shouldSendPTO(pto) {
		// STREAM_DATA_BLOCKED
		if !w.appendStreamDataBlockedFrame(s.id, s.outwin) {
			return false
		}
		s.outblocked.setSent(pnum)
		s.frameOpensStream(pnum)
	}
	for {
		// STREAM
		//
		// Both bounds given to dataToSend are clamped to the stream
		// flow control window, so it never selects data past s.outwin.
		off, size := dataToSend(min(s.out.start, s.outwin), min(s.outflushed, s.outwin), s.outunsent, s.outacked, pto)
		if end := off + size; end > s.outmaxsent {
			// This will require connection-level flow control to send.
			// Trim the range to what the connection allows; max(end, off)
			// keeps size from going negative.
			end = min(end, s.outmaxsent+s.conn.streams.outflow.avail())
			end = max(end, off)
			size = end - off
		}
		// FIN is set only when the stream is closed and this range
		// reaches the end of the buffered data.
		fin := s.outclosed.isSet() && off+size == s.out.end
		shouldSend := size > 0 || // have data to send
			s.outopened.shouldSendPTO(pto) || // should open the stream
			(fin && s.outclosed.shouldSendPTO(pto)) // should close the stream
		if !shouldSend {
			return true
		}
		// b is the frame's data buffer; it may be shorter than size
		// (see the length check at the bottom of the loop).
		b, added := w.appendStreamFrame(s.id, off, int(size), fin)
		if !added {
			return false
		}
		// Presumably fills b with the buffered stream data starting at off.
		s.out.copy(off, b)
		end := off + int64(len(b))
		if end > s.outmaxsent {
			// Only bytes beyond the previous high-water mark consume
			// connection-level flow control.
			s.conn.streams.outflow.consume(end - s.outmaxsent)
			s.outmaxsent = end
		}
		s.outunsent.sub(off, end)
		s.frameOpensStream(pnum)
		if fin {
			s.outclosed.setSent(pnum)
		}
		if pto {
			// On PTO, stop after a single STREAM frame.
			return true
		}
		if int64(len(b)) < size {
			// The frame held less than we wanted to send:
			// report that not everything fit in this packet.
			return false
		}
	}
}
   972  // frameOpensStream records that we're sending a frame that will open the stream.
   973  //
   974  // If we don't have an acknowledgement from the peer for a previous frame opening the stream,
   975  // record this packet as being the latest one to open it.
   976  func (s *Stream) frameOpensStream(pnum packetNumber) {
   977  	if !s.outopened.isReceived() {
   978  		s.outopened.setSent(pnum)
   979  	}
   980  }
   982  // dataToSend returns the next range of data to send in a STREAM or CRYPTO_STREAM.
   983  func dataToSend(start, end int64, outunsent, outacked rangeset[int64], pto bool) (sendStart, size int64) {
   984  	switch {
   985  	case pto:
   986  		// On PTO, resend unacked data that fits in the probe packet.
   987  		// For simplicity, we send the range starting at s.out.start
   988  		// (which is definitely unacked, or else we would have discarded it)
   989  		// up to the next acked byte (if any).
   990  		//
   991  		// This may miss unacked data starting after that acked byte,
   992  		// but avoids resending data the peer has acked.
   993  		for _, r := range outacked {
   994  			if r.start > start {
   995  				return start, r.start - start
   996  			}
   997  		}
   998  		return start, end - start
   999  	case outunsent.numRanges() > 0:
  1000  		return outunsent.min(), outunsent[0].size()
  1001  	default:
  1002  		return end, 0
  1003  	}
  1004  }

View as plain text