...
1
16
17 package spdystream
18
19 import (
20 "errors"
21 "fmt"
22 "io"
23 "net"
24 "net/http"
25 "sync"
26 "time"
27
28 "github.com/moby/spdystream/spdy"
29 )
30
// ErrUnreadPartialData is returned by ReadData when a previous Read left
// part of a received chunk unconsumed in the stream's unread buffer.
var ErrUnreadPartialData = errors.New("unread partial data")
34
35 type Stream struct {
36 streamId spdy.StreamId
37 parent *Stream
38 conn *Connection
39 startChan chan error
40
41 dataLock sync.RWMutex
42 dataChan chan []byte
43 unread []byte
44
45 priority uint8
46 headers http.Header
47 headerChan chan http.Header
48 finishLock sync.Mutex
49 finished bool
50 replyCond *sync.Cond
51 replied bool
52 closeLock sync.Mutex
53 closeChan chan bool
54 }
55
56
57 func (s *Stream) WriteData(data []byte, fin bool) error {
58 s.waitWriteReply()
59 var flags spdy.DataFlags
60
61 if fin {
62 flags = spdy.DataFlagFin
63 s.finishLock.Lock()
64 if s.finished {
65 s.finishLock.Unlock()
66 return ErrWriteClosedStream
67 }
68 s.finished = true
69 s.finishLock.Unlock()
70 }
71
72 dataFrame := &spdy.DataFrame{
73 StreamId: s.streamId,
74 Flags: flags,
75 Data: data,
76 }
77
78 debugMessage("(%p) (%d) Writing data frame", s, s.streamId)
79 return s.conn.framer.WriteFrame(dataFrame)
80 }
81
82
83 func (s *Stream) Write(data []byte) (n int, err error) {
84 err = s.WriteData(data, false)
85 if err == nil {
86 n = len(data)
87 }
88 return
89 }
90
91
92
93
94 func (s *Stream) Read(p []byte) (n int, err error) {
95 if s.unread == nil {
96 select {
97 case <-s.closeChan:
98 return 0, io.EOF
99 case read, ok := <-s.dataChan:
100 if !ok {
101 return 0, io.EOF
102 }
103 s.unread = read
104 }
105 }
106 n = copy(p, s.unread)
107 if n < len(s.unread) {
108 s.unread = s.unread[n:]
109 } else {
110 s.unread = nil
111 }
112 return
113 }
114
115
116
117
118 func (s *Stream) ReadData() ([]byte, error) {
119 debugMessage("(%p) Reading data from %d", s, s.streamId)
120 if s.unread != nil {
121 return nil, ErrUnreadPartialData
122 }
123 select {
124 case <-s.closeChan:
125 return nil, io.EOF
126 case read, ok := <-s.dataChan:
127 if !ok {
128 return nil, io.EOF
129 }
130 return read, nil
131 }
132 }
133
134 func (s *Stream) waitWriteReply() {
135 if s.replyCond != nil {
136 s.replyCond.L.Lock()
137 for !s.replied {
138 s.replyCond.Wait()
139 }
140 s.replyCond.L.Unlock()
141 }
142 }
143
144
145 func (s *Stream) Wait() error {
146 return s.WaitTimeout(time.Duration(0))
147 }
148
149
150
151 func (s *Stream) WaitTimeout(timeout time.Duration) error {
152 var timeoutChan <-chan time.Time
153 if timeout > time.Duration(0) {
154 timeoutChan = time.After(timeout)
155 }
156
157 select {
158 case err := <-s.startChan:
159 if err != nil {
160 return err
161 }
162 break
163 case <-timeoutChan:
164 return ErrTimeout
165 }
166 return nil
167 }
168
169
170
171 func (s *Stream) Close() error {
172 select {
173 case <-s.closeChan:
174
175 s.conn.removeStream(s)
176 default:
177 break
178 }
179 return s.WriteData([]byte{}, true)
180 }
181
182
183 func (s *Stream) Reset() error {
184 s.conn.removeStream(s)
185 return s.resetStream()
186 }
187
188 func (s *Stream) resetStream() error {
189
190
191
192 s.closeRemoteChannels()
193
194 s.finishLock.Lock()
195 if s.finished {
196 s.finishLock.Unlock()
197 return nil
198 }
199 s.finished = true
200 s.finishLock.Unlock()
201
202 resetFrame := &spdy.RstStreamFrame{
203 StreamId: s.streamId,
204 Status: spdy.Cancel,
205 }
206 return s.conn.framer.WriteFrame(resetFrame)
207 }
208
209
210 func (s *Stream) CreateSubStream(headers http.Header, fin bool) (*Stream, error) {
211 return s.conn.CreateStream(headers, s, fin)
212 }
213
214
215
216
217
218 func (s *Stream) SetPriority(priority uint8) {
219 s.priority = priority
220 }
221
222
223 func (s *Stream) SendHeader(headers http.Header, fin bool) error {
224 return s.conn.sendHeaders(headers, s, fin)
225 }
226
227
228
229 func (s *Stream) SendReply(headers http.Header, fin bool) error {
230 if s.replyCond == nil {
231 return errors.New("cannot reply on initiated stream")
232 }
233 s.replyCond.L.Lock()
234 defer s.replyCond.L.Unlock()
235 if s.replied {
236 return nil
237 }
238
239 err := s.conn.sendReply(headers, s, fin)
240 if err != nil {
241 return err
242 }
243
244 s.replied = true
245 s.replyCond.Broadcast()
246 return nil
247 }
248
249
250
251
252
253 func (s *Stream) Refuse() error {
254 if s.replied {
255 return nil
256 }
257 s.replied = true
258 return s.conn.sendReset(spdy.RefusedStream, s)
259 }
260
261
262
263
264 func (s *Stream) Cancel() error {
265 return s.conn.sendReset(spdy.Cancel, s)
266 }
267
268
269
270
271 func (s *Stream) ReceiveHeader() (http.Header, error) {
272 select {
273 case <-s.closeChan:
274 break
275 case header, ok := <-s.headerChan:
276 if !ok {
277 return nil, fmt.Errorf("header chan closed")
278 }
279 return header, nil
280 }
281 return nil, fmt.Errorf("stream closed")
282 }
283
284
285 func (s *Stream) Parent() *Stream {
286 return s.parent
287 }
288
289
290 func (s *Stream) Headers() http.Header {
291 return s.headers
292 }
293
294
295
296 func (s *Stream) String() string {
297 return fmt.Sprintf("stream:%d", s.streamId)
298 }
299
300
301 func (s *Stream) Identifier() uint32 {
302 return uint32(s.streamId)
303 }
304
305
306
307 func (s *Stream) IsFinished() bool {
308 return s.finished
309 }
310
311
312
313 func (s *Stream) LocalAddr() net.Addr {
314 return s.conn.conn.LocalAddr()
315 }
316
317 func (s *Stream) RemoteAddr() net.Addr {
318 return s.conn.conn.RemoteAddr()
319 }
320
321
322
323 func (s *Stream) SetDeadline(t time.Time) error {
324 return s.conn.conn.SetDeadline(t)
325 }
326
327 func (s *Stream) SetReadDeadline(t time.Time) error {
328 return s.conn.conn.SetReadDeadline(t)
329 }
330
331 func (s *Stream) SetWriteDeadline(t time.Time) error {
332 return s.conn.conn.SetWriteDeadline(t)
333 }
334
335 func (s *Stream) closeRemoteChannels() {
336 s.closeLock.Lock()
337 defer s.closeLock.Unlock()
338 select {
339 case <-s.closeChan:
340 default:
341 close(s.closeChan)
342 }
343 }
344
View as plain text