...

Source file src/github.com/moby/spdystream/stream.go

Documentation: github.com/moby/spdystream

     1  /*
     2     Copyright 2014-2021 Docker Inc.
     3  
     4     Licensed under the Apache License, Version 2.0 (the "License");
     5     you may not use this file except in compliance with the License.
     6     You may obtain a copy of the License at
     7  
     8         http://www.apache.org/licenses/LICENSE-2.0
     9  
    10     Unless required by applicable law or agreed to in writing, software
    11     distributed under the License is distributed on an "AS IS" BASIS,
    12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13     See the License for the specific language governing permissions and
    14     limitations under the License.
    15  */
    16  
    17  package spdystream
    18  
    19  import (
    20  	"errors"
    21  	"fmt"
    22  	"io"
    23  	"net"
    24  	"net/http"
    25  	"sync"
    26  	"time"
    27  
    28  	"github.com/moby/spdystream/spdy"
    29  )
    30  
    31  var (
    32  	ErrUnreadPartialData = errors.New("unread partial data")
    33  )
    34  
    35  type Stream struct {
    36  	streamId  spdy.StreamId
    37  	parent    *Stream
    38  	conn      *Connection
    39  	startChan chan error
    40  
    41  	dataLock sync.RWMutex
    42  	dataChan chan []byte
    43  	unread   []byte
    44  
    45  	priority   uint8
    46  	headers    http.Header
    47  	headerChan chan http.Header
    48  	finishLock sync.Mutex
    49  	finished   bool
    50  	replyCond  *sync.Cond
    51  	replied    bool
    52  	closeLock  sync.Mutex
    53  	closeChan  chan bool
    54  }
    55  
    56  // WriteData writes data to stream, sending a dataframe per call
    57  func (s *Stream) WriteData(data []byte, fin bool) error {
    58  	s.waitWriteReply()
    59  	var flags spdy.DataFlags
    60  
    61  	if fin {
    62  		flags = spdy.DataFlagFin
    63  		s.finishLock.Lock()
    64  		if s.finished {
    65  			s.finishLock.Unlock()
    66  			return ErrWriteClosedStream
    67  		}
    68  		s.finished = true
    69  		s.finishLock.Unlock()
    70  	}
    71  
    72  	dataFrame := &spdy.DataFrame{
    73  		StreamId: s.streamId,
    74  		Flags:    flags,
    75  		Data:     data,
    76  	}
    77  
    78  	debugMessage("(%p) (%d) Writing data frame", s, s.streamId)
    79  	return s.conn.framer.WriteFrame(dataFrame)
    80  }
    81  
    82  // Write writes bytes to a stream, calling write data for each call.
    83  func (s *Stream) Write(data []byte) (n int, err error) {
    84  	err = s.WriteData(data, false)
    85  	if err == nil {
    86  		n = len(data)
    87  	}
    88  	return
    89  }
    90  
    91  // Read reads bytes from a stream, a single read will never get more
    92  // than what is sent on a single data frame, but a multiple calls to
    93  // read may get data from the same data frame.
    94  func (s *Stream) Read(p []byte) (n int, err error) {
    95  	if s.unread == nil {
    96  		select {
    97  		case <-s.closeChan:
    98  			return 0, io.EOF
    99  		case read, ok := <-s.dataChan:
   100  			if !ok {
   101  				return 0, io.EOF
   102  			}
   103  			s.unread = read
   104  		}
   105  	}
   106  	n = copy(p, s.unread)
   107  	if n < len(s.unread) {
   108  		s.unread = s.unread[n:]
   109  	} else {
   110  		s.unread = nil
   111  	}
   112  	return
   113  }
   114  
   115  // ReadData reads an entire data frame and returns the byte array
   116  // from the data frame.  If there is unread data from the result
   117  // of a Read call, this function will return an ErrUnreadPartialData.
   118  func (s *Stream) ReadData() ([]byte, error) {
   119  	debugMessage("(%p) Reading data from %d", s, s.streamId)
   120  	if s.unread != nil {
   121  		return nil, ErrUnreadPartialData
   122  	}
   123  	select {
   124  	case <-s.closeChan:
   125  		return nil, io.EOF
   126  	case read, ok := <-s.dataChan:
   127  		if !ok {
   128  			return nil, io.EOF
   129  		}
   130  		return read, nil
   131  	}
   132  }
   133  
   134  func (s *Stream) waitWriteReply() {
   135  	if s.replyCond != nil {
   136  		s.replyCond.L.Lock()
   137  		for !s.replied {
   138  			s.replyCond.Wait()
   139  		}
   140  		s.replyCond.L.Unlock()
   141  	}
   142  }
   143  
   144  // Wait waits for the stream to receive a reply.
   145  func (s *Stream) Wait() error {
   146  	return s.WaitTimeout(time.Duration(0))
   147  }
   148  
   149  // WaitTimeout waits for the stream to receive a reply or for timeout.
   150  // When the timeout is reached, ErrTimeout will be returned.
   151  func (s *Stream) WaitTimeout(timeout time.Duration) error {
   152  	var timeoutChan <-chan time.Time
   153  	if timeout > time.Duration(0) {
   154  		timeoutChan = time.After(timeout)
   155  	}
   156  
   157  	select {
   158  	case err := <-s.startChan:
   159  		if err != nil {
   160  			return err
   161  		}
   162  		break
   163  	case <-timeoutChan:
   164  		return ErrTimeout
   165  	}
   166  	return nil
   167  }
   168  
   169  // Close closes the stream by sending an empty data frame with the
   170  // finish flag set, indicating this side is finished with the stream.
   171  func (s *Stream) Close() error {
   172  	select {
   173  	case <-s.closeChan:
   174  		// Stream is now fully closed
   175  		s.conn.removeStream(s)
   176  	default:
   177  		break
   178  	}
   179  	return s.WriteData([]byte{}, true)
   180  }
   181  
   182  // Reset sends a reset frame, putting the stream into the fully closed state.
   183  func (s *Stream) Reset() error {
   184  	s.conn.removeStream(s)
   185  	return s.resetStream()
   186  }
   187  
   188  func (s *Stream) resetStream() error {
   189  	// Always call closeRemoteChannels, even if s.finished is already true.
   190  	// This makes it so that stream.Close() followed by stream.Reset() allows
   191  	// stream.Read() to unblock.
   192  	s.closeRemoteChannels()
   193  
   194  	s.finishLock.Lock()
   195  	if s.finished {
   196  		s.finishLock.Unlock()
   197  		return nil
   198  	}
   199  	s.finished = true
   200  	s.finishLock.Unlock()
   201  
   202  	resetFrame := &spdy.RstStreamFrame{
   203  		StreamId: s.streamId,
   204  		Status:   spdy.Cancel,
   205  	}
   206  	return s.conn.framer.WriteFrame(resetFrame)
   207  }
   208  
   209  // CreateSubStream creates a stream using the current as the parent
   210  func (s *Stream) CreateSubStream(headers http.Header, fin bool) (*Stream, error) {
   211  	return s.conn.CreateStream(headers, s, fin)
   212  }
   213  
   214  // SetPriority sets the stream priority, does not affect the
   215  // remote priority of this stream after Open has been called.
   216  // Valid values are 0 through 7, 0 being the highest priority
   217  // and 7 the lowest.
   218  func (s *Stream) SetPriority(priority uint8) {
   219  	s.priority = priority
   220  }
   221  
   222  // SendHeader sends a header frame across the stream
   223  func (s *Stream) SendHeader(headers http.Header, fin bool) error {
   224  	return s.conn.sendHeaders(headers, s, fin)
   225  }
   226  
   227  // SendReply sends a reply on a stream, only valid to be called once
   228  // when handling a new stream
   229  func (s *Stream) SendReply(headers http.Header, fin bool) error {
   230  	if s.replyCond == nil {
   231  		return errors.New("cannot reply on initiated stream")
   232  	}
   233  	s.replyCond.L.Lock()
   234  	defer s.replyCond.L.Unlock()
   235  	if s.replied {
   236  		return nil
   237  	}
   238  
   239  	err := s.conn.sendReply(headers, s, fin)
   240  	if err != nil {
   241  		return err
   242  	}
   243  
   244  	s.replied = true
   245  	s.replyCond.Broadcast()
   246  	return nil
   247  }
   248  
   249  // Refuse sends a reset frame with the status refuse, only
   250  // valid to be called once when handling a new stream.  This
   251  // may be used to indicate that a stream is not allowed
   252  // when http status codes are not being used.
   253  func (s *Stream) Refuse() error {
   254  	if s.replied {
   255  		return nil
   256  	}
   257  	s.replied = true
   258  	return s.conn.sendReset(spdy.RefusedStream, s)
   259  }
   260  
   261  // Cancel sends a reset frame with the status canceled. This
   262  // can be used at any time by the creator of the Stream to
   263  // indicate the stream is no longer needed.
   264  func (s *Stream) Cancel() error {
   265  	return s.conn.sendReset(spdy.Cancel, s)
   266  }
   267  
   268  // ReceiveHeader receives a header sent on the other side
   269  // of the stream.  This function will block until a header
   270  // is received or stream is closed.
   271  func (s *Stream) ReceiveHeader() (http.Header, error) {
   272  	select {
   273  	case <-s.closeChan:
   274  		break
   275  	case header, ok := <-s.headerChan:
   276  		if !ok {
   277  			return nil, fmt.Errorf("header chan closed")
   278  		}
   279  		return header, nil
   280  	}
   281  	return nil, fmt.Errorf("stream closed")
   282  }
   283  
   284  // Parent returns the parent stream
   285  func (s *Stream) Parent() *Stream {
   286  	return s.parent
   287  }
   288  
   289  // Headers returns the headers used to create the stream
   290  func (s *Stream) Headers() http.Header {
   291  	return s.headers
   292  }
   293  
   294  // String returns the string version of stream using the
   295  // streamId to uniquely identify the stream
   296  func (s *Stream) String() string {
   297  	return fmt.Sprintf("stream:%d", s.streamId)
   298  }
   299  
   300  // Identifier returns a 32 bit identifier for the stream
   301  func (s *Stream) Identifier() uint32 {
   302  	return uint32(s.streamId)
   303  }
   304  
   305  // IsFinished returns whether the stream has finished
   306  // sending data
   307  func (s *Stream) IsFinished() bool {
   308  	return s.finished
   309  }
   310  
   311  // Implement net.Conn interface
   312  
   313  func (s *Stream) LocalAddr() net.Addr {
   314  	return s.conn.conn.LocalAddr()
   315  }
   316  
   317  func (s *Stream) RemoteAddr() net.Addr {
   318  	return s.conn.conn.RemoteAddr()
   319  }
   320  
   321  // TODO set per stream values instead of connection-wide
   322  
   323  func (s *Stream) SetDeadline(t time.Time) error {
   324  	return s.conn.conn.SetDeadline(t)
   325  }
   326  
   327  func (s *Stream) SetReadDeadline(t time.Time) error {
   328  	return s.conn.conn.SetReadDeadline(t)
   329  }
   330  
   331  func (s *Stream) SetWriteDeadline(t time.Time) error {
   332  	return s.conn.conn.SetWriteDeadline(t)
   333  }
   334  
   335  func (s *Stream) closeRemoteChannels() {
   336  	s.closeLock.Lock()
   337  	defer s.closeLock.Unlock()
   338  	select {
   339  	case <-s.closeChan:
   340  	default:
   341  		close(s.closeChan)
   342  	}
   343  }
   344  

View as plain text