1
2
3
4
5
6
7 package quic
8
9 import (
10 "context"
11 "errors"
12 "fmt"
13 "io"
14 "math"
15 )
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// A Stream is one QUIC stream belonging to a Conn.
//
// A stream is bidirectional, read-only, or write-only; see IsReadOnly and
// IsWriteOnly. Read on a write-only stream and Write on a read-only stream
// return errors.
//
// Read and ReadByte use an unlocked fast-path buffer, as do Write and
// WriteByte, so concurrent reads (or concurrent writes) on one stream are
// not safe.
type Stream struct {
	id   streamID
	conn *Conn

	// Contexts handed to blocking waits: inctx by Read, outctx by Write and Close.
	// SetReadContext and SetWriteContext assign them without taking a lock.
	inctx  context.Context
	outctx context.Context

	// Receive-side state. The methods in this file touch these fields with
	// ingate locked. The gate's condition is computed in inUnlockNoQueue and
	// is true when a Read has something to return (data, EOF, or an error).
	ingate      gate
	in          pipe            // received data that has not been discarded yet
	inwin       int64           // MAX_STREAM_DATA limit most recently written to a packet
	insendmax   sentVal         // set when a MAX_STREAM_DATA frame should be sent
	inmaxbuf    int64           // receive window granted beyond in.start
	insize      int64           // final size of the stream; -1 until known
	inset       rangeset[int64] // byte ranges received so far
	inclosed    sentVal         // set by CloseRead; drives the STOP_SENDING frame
	inresetcode int64           // code from a received RESET_STREAM; -1 if not reset

	// Send-side state. The methods in this file touch these fields with
	// outgate locked. The gate's condition is computed in outUnlockNoQueue and
	// is true when a Write can make progress or will fail immediately.
	outgate      gate
	out          pipe            // buffered data to send
	outflushed   int64           // value of out.end at the last flush
	outwin       int64           // largest MAX_STREAM_DATA received from the peer
	outmaxsent   int64           // highest stream offset written to a packet so far
	outmaxbuf    int64           // maximum amount of data buffered past out.start
	outunsent    rangeset[int64] // flushed ranges waiting to be (re)sent
	outacked     rangeset[int64] // ranges acknowledged by the peer
	outopened    sentVal         // set by flushLocked; tracks the frame that opens the stream
	outclosed    sentVal         // set by CloseWrite and Reset; tracks the FIN
	outblocked   sentVal         // set when buffered data exceeds outwin (STREAM_DATA_BLOCKED)
	outreset     sentVal         // set when the send side is reset (RESET_STREAM)
	outresetcode uint64          // code to send in RESET_STREAM
	outdone      chan struct{}   // closed once the send side is fully acked or reset; nil for read-only streams

	// Buffers used by the lock-free fast paths of Read/ReadByte and Write/WriteByte.
	inbuf     []byte // readable data peeked from in
	inbufoff  int    // number of bytes of inbuf already consumed
	outbuf    []byte // spare capacity of out available for direct writes
	outbufoff int    // number of bytes of outbuf already filled

	// state holds the streamState bits for this stream.
	// The streamIn* bits are updated by inUnlockNoQueue and the streamOut* bits
	// by outUnlockNoQueue, each just before the corresponding gate is unlocked.
	state atomicBits[streamState]

	// Linked-list pointers to neighbouring streams.
	// NOTE(review): presumably the conn's send-queue links — confirm against the queue code.
	prev, next *Stream
}
94
// streamState is a set of bit flags summarising what a stream needs from its
// conn. A stream's flags are kept in Stream.state.
type streamState uint32

const (
	// streamInSendMeta: the receive side of the stream has a frame to send
	// (inUnlockNoQueue sets it for MAX_STREAM_DATA and STOP_SENDING).
	streamInSendMeta streamState = 1 << iota

	// streamOutSendMeta and streamOutSendData: the send side of the stream has
	// something to send. outUnlockNoQueue chooses at most one of the two;
	// streamOutSendData is used only for stream data at or beyond outmaxsent.
	streamOutSendMeta
	streamOutSendData

	// streamInDone and streamOutDone: the receive or send side of the stream
	// has finished.
	streamInDone
	streamOutDone

	// streamConnRemoved: consulted by wantQueue together with the Done bits.
	// NOTE(review): presumably set once the conn has dropped the stream — confirm.
	streamConnRemoved

	// streamQueueMeta and streamQueueData: which send queue the stream is
	// currently on, as decoded by inQueue.
	streamQueueMeta
	streamQueueData
)
126
// streamQueue names a send queue that a stream can be on.
type streamQueue int

const (
	noQueue   streamQueue = iota // the stream is on no send queue
	metaQueue                    // queue selected by streamQueueMeta / wantQueue's meta cases
	dataQueue                    // queue selected by streamQueueData / streamOutSendData
)
134
135
136
137
const (
	// streamResetByConnClose is stored in Stream.inresetcode by connHasClosed
	// when the conn closes without being in the local closing state, so that
	// subsequent reads fail as if the stream had been reset.
	// NOTE(review): presumably chosen to lie outside the range of reset codes
	// a peer can encode on the wire — confirm against the varint limit.
	streamResetByConnClose = math.MaxInt64
)
139
140
141 func (s streamState) wantQueue() streamQueue {
142 switch {
143 case s&(streamInSendMeta|streamOutSendMeta) != 0:
144 return metaQueue
145 case s&(streamInDone|streamOutDone|streamConnRemoved) == streamInDone|streamOutDone:
146 return metaQueue
147 case s&streamOutSendData != 0:
148
149
150
151 return dataQueue
152 }
153 return noQueue
154 }
155
156
157 func (s streamState) inQueue() streamQueue {
158 switch {
159 case s&streamQueueMeta != 0:
160 return metaQueue
161 case s&streamQueueData != 0:
162 return dataQueue
163 }
164 return noQueue
165 }
166
167
168
169
170
171
172
173 func newStream(c *Conn, id streamID) *Stream {
174 s := &Stream{
175 conn: c,
176 id: id,
177 insize: -1,
178 inresetcode: -1,
179 ingate: newLockedGate(),
180 outgate: newLockedGate(),
181 inctx: context.Background(),
182 outctx: context.Background(),
183 }
184 if !s.IsReadOnly() {
185 s.outdone = make(chan struct{})
186 }
187 return s
188 }
189
190
191
192
// SetReadContext sets the context used by Read when it waits for data.
//
// The context is stored without locking, so SetReadContext must not be called
// concurrently with itself or with reads from the stream.
func (s *Stream) SetReadContext(ctx context.Context) {
	s.inctx = ctx
}
196
197
198
199
200
201
// SetWriteContext sets the context used by Write when it waits for buffer
// space, and by Close when it waits for the send side to finish.
//
// The context is stored without locking, so SetWriteContext must not be called
// concurrently with itself or with writes to the stream.
func (s *Stream) SetWriteContext(ctx context.Context) {
	s.outctx = ctx
}
205
206
207
208 func (s *Stream) IsReadOnly() bool {
209 return s.id.streamType() == uniStream && s.id.initiator() != s.conn.side
210 }
211
212
213
214 func (s *Stream) IsWriteOnly() bool {
215 return s.id.streamType() == uniStream && s.id.initiator() == s.conn.side
216 }
217
218
219
220
221
222
223
224
225
226
227
// Read reads data from the stream into b.
//
// Read waits, subject to the read context, until the read gate is signalled,
// then returns the contiguous data available at the read position, up to
// len(b) bytes. It does not wait to fill b.
//
// When the data returned reaches the stream's final size, Read returns it
// together with io.EOF; once everything has been read it returns 0, io.EOF.
// If the stream was reset, Read returns an error wrapping a StreamErrorCode.
// Reading from a write-only stream or after CloseRead returns an error.
//
// Read uses an unlocked fast-path buffer, so it is not safe to call Read
// concurrently with other reads on the same stream.
func (s *Stream) Read(b []byte) (n int, err error) {
	if s.IsWriteOnly() {
		return 0, errors.New("read from write-only stream")
	}
	if len(s.inbuf) > s.inbufoff {
		// Fast path: s.inbuf still holds unread bytes.
		// Return them without taking the gate lock.
		n = copy(b, s.inbuf[s.inbufoff:])
		s.inbufoff += n
		return n, nil
	}
	if err := s.ingate.waitAndLock(s.inctx, s.conn.testHooks); err != nil {
		return 0, err
	}
	if s.inbufoff > 0 {
		// Drop the bytes that the fast path handed out since the last locked read.
		s.in.discardBefore(s.in.start + int64(s.inbufoff))
		s.inbufoff = 0
		s.inbuf = nil
	}
	// bytesRead is the amount reported to the conn once the gate is released.
	// It counts the bytes copied into b by this call plus the bytes moved into
	// the fast-path buffer below.
	var bytesRead int64
	defer func() {
		s.inUnlock()
		s.conn.handleStreamBytesReadOffLoop(bytesRead) // called after the gate is unlocked
	}()
	if s.inresetcode != -1 {
		return 0, fmt.Errorf("stream reset by peer: %w", StreamErrorCode(s.inresetcode))
	}
	if s.inclosed.isSet() {
		return 0, errors.New("read from closed stream")
	}
	if s.insize == s.in.start {
		return 0, io.EOF
	}
	// Past this point the gate promised readable data: the first received
	// range must start at offset 0 and extend beyond the read position.
	if len(s.inset) < 1 || s.inset[0].start != 0 || s.inset[0].end <= s.in.start {
		panic("BUG: inconsistent input stream state")
	}
	if size := int(s.inset[0].end - s.in.start); size < len(b) {
		// Fewer contiguous bytes are available than b can hold.
		b = b[:size]
	}
	bytesRead = int64(len(b))
	start := s.in.start
	end := start + int64(len(b))
	s.in.copy(start, b)
	s.in.discardBefore(end)
	if end == s.insize {
		// This read reached the final size of the stream.
		// There is nothing further to buffer or to grant window for.
		return len(b), io.EOF
	}
	if len(s.inset) > 0 && s.inset[0].start <= s.in.start && s.inset[0].end > s.in.start {
		// More contiguous data is already available: expose it through
		// s.inbuf so later reads can take the lock-free fast path.
		s.inbuf = s.in.peek(s.inset[0].end - s.in.start)
		bytesRead += int64(len(s.inbuf))
	}
	if s.insize == -1 || s.insize > s.inwin {
		// The peer may still need more window than it was last granted.
		newWindow := s.in.start + int64(len(s.inbuf)) + s.inmaxbuf
		addedWindow := newWindow - s.inwin
		if shouldUpdateFlowControl(s.inmaxbuf, addedWindow) {
			// Schedule a MAX_STREAM_DATA frame.
			s.insendmax.setUnsent()
		}
	}
	return len(b), nil
}
298
299
300
301
302 func (s *Stream) ReadByte() (byte, error) {
303 if len(s.inbuf) > s.inbufoff {
304 b := s.inbuf[s.inbufoff]
305 s.inbufoff++
306 return b, nil
307 }
308 var b [1]byte
309 n, err := s.Read(b[:])
310 if n > 0 {
311 return b[0], err
312 }
313 return 0, err
314 }
315
316
317
318
319
// shouldUpdateFlowControl reports whether a flow control update is worth
// sending: the window that would be added must be at least one eighth of the
// maximum window (integer division), which avoids many tiny updates.
func shouldUpdateFlowControl(maxWindow, addedWindow int64) bool {
	threshold := maxWindow / 8
	return addedWindow >= threshold
}
323
324
325
326
327
328
// Write writes b to the stream's send buffer.
//
// Data is buffered and is only queued for sending when one of the automatic
// flush conditions below is met, or when Flush, CloseWrite, or Close is
// called. When the send buffer is full, Write waits, subject to the write
// context, for space to become available.
//
// Write returns the number of bytes buffered. It returns an error if the
// stream is read-only, has been reset, or has been closed for writing, or if
// the wait is interrupted.
//
// Write uses an unlocked fast-path buffer, so it is not safe to call Write
// concurrently with other writes on the same stream.
func (s *Stream) Write(b []byte) (n int, err error) {
	if s.IsReadOnly() {
		return 0, errors.New("write to read-only stream")
	}
	if len(b) > 0 && len(s.outbuf)-s.outbufoff >= len(b) {
		// Fast path: b fits in the remaining space of s.outbuf.
		// Copy it in without taking the gate lock.
		copy(s.outbuf[s.outbufoff:], b)
		s.outbufoff += len(b)
		return len(b), nil
	}
	canWrite := s.outgate.lock()
	s.flushFastOutputBuffer()
	for {
		// On the first pass canWrite is whatever the gate reported.
		// Every later pass follows a write with bytes possibly left over,
		// so canWrite has been forced to false and we wait on the gate.
		if len(b) > 0 && !canWrite {
			// No room right now. Release the gate (publishing any state
			// change) and wait until it reports that a write can proceed.
			s.outUnlock()
			if err := s.outgate.waitAndLock(s.outctx, s.conn.testHooks); err != nil {
				return n, err
			}
			// waitAndLock returned with the gate held and its condition set:
			// either there is buffer space, or the stream is closed or reset
			// (see the canWrite computation in outUnlockNoQueue).
		}
		if s.outreset.isSet() {
			s.outUnlock()
			return n, errors.New("write to reset stream")
		}
		if s.outclosed.isSet() {
			s.outUnlock()
			return n, errors.New("write to closed stream")
		}
		if len(b) == 0 {
			break
		}
		// lim is the stream offset up to which we are willing to buffer.
		lim := s.out.start + s.outmaxbuf
		// nn is the number of bytes to take from b on this pass:
		// all of it, or as much as fits below lim.
		nn := min(int64(len(b)), lim-s.out.end)
		// Append the data to the output buffer.
		s.out.writeAt(b[:nn], s.out.end)
		b = b[nn:]
		n += int(nn)
		// Flush automatically when:
		//   - the buffered data reaches the peer's flow control window,
		//   - the local send buffer is full, or
		//   - at least autoFlushSize bytes have accumulated since the last flush.
		//
		// NOTE(review): autoFlushSize looks like the payload room of a
		// minimum-size short-header packet (header byte, connection ID,
		// one-byte packet number, AEAD overhead) — confirm against the
		// packet encoder.
		const autoFlushSize = smallestMaxDatagramSize - 1 - connIDLen - 1 - aeadOverhead
		shouldFlush := s.out.end >= s.outwin || // peer's window is consumed
			s.out.end >= lim || // local send buffer is full
			(s.out.end-s.outflushed) >= autoFlushSize // enough unflushed data
		if shouldFlush {
			s.flushLocked()
		}
		if s.out.end > s.outwin {
			// More data is buffered than the peer's window allows.
			// Schedule a STREAM_DATA_BLOCKED frame.
			s.outblocked.set()
		}
		// Any bytes still left in b did not fit; the next pass must wait.
		canWrite = false
	}
	if lim := s.out.start + s.outmaxbuf - s.out.end - 1; lim > 0 {
		// Expose spare capacity of s.out through s.outbuf so that later small
		// writes can use the lock-free fast path.
		//
		// The slice is capped at one byte less than the remaining send buffer
		// (the -1 above), so a write that would fill the buffer completely
		// does not fit in s.outbuf and takes the locked path, where the
		// buffer-full flush condition applies.
		s.outbuf = s.out.availableBuffer()
		if int64(len(s.outbuf)) > lim {
			s.outbuf = s.outbuf[:lim]
		}
	}
	s.outUnlock()
	return n, nil
}
420
421
422 func (s *Stream) WriteByte(c byte) error {
423 if s.outbufoff < len(s.outbuf) {
424 s.outbuf[s.outbufoff] = c
425 s.outbufoff++
426 return nil
427 }
428 b := [1]byte{c}
429 _, err := s.Write(b[:])
430 return err
431 }
432
433 func (s *Stream) flushFastOutputBuffer() {
434 if s.outbuf == nil {
435 return
436 }
437
438
439
440 s.out.end += int64(s.outbufoff)
441 s.outbuf = nil
442 s.outbufoff = 0
443 }
444
445
446
447
// Flush marks all data written to the stream so far as ready to send.
//
// Flush only updates the stream's send state under the write gate; it does
// not wait for the data to be sent or acknowledged. Close waits for the send
// side to finish.
func (s *Stream) Flush() {
	s.outgate.lock()
	defer s.outUnlock()
	s.flushLocked()
}
453
454 func (s *Stream) flushLocked() {
455 s.flushFastOutputBuffer()
456 s.outopened.set()
457 if s.outflushed < s.outwin {
458 s.outunsent.add(s.outflushed, min(s.outwin, s.out.end))
459 }
460 s.outflushed = s.out.end
461 }
462
463
464
465
466
467
468
469
470
471 func (s *Stream) Close() error {
472 s.CloseRead()
473 if s.IsReadOnly() {
474 return nil
475 }
476 s.CloseWrite()
477
478 if err := s.conn.waitOnDone(s.outctx, s.outdone); err != nil {
479 return err
480 }
481 s.outgate.lock()
482 defer s.outUnlock()
483 if s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) {
484 return nil
485 }
486 return errors.New("stream reset")
487 }
488
489
490
491
492
493
494
495 func (s *Stream) CloseRead() {
496 if s.IsWriteOnly() {
497 return
498 }
499 s.ingate.lock()
500 if s.inset.isrange(0, s.insize) || s.inresetcode != -1 {
501
502
503
504 s.inclosed.setReceived()
505 } else {
506 s.inclosed.set()
507 }
508 discarded := s.in.end - s.in.start
509 s.in.discardBefore(s.in.end)
510 s.inUnlock()
511 s.conn.handleStreamBytesReadOffLoop(discarded)
512 }
513
514
515
516
517
518
519
520 func (s *Stream) CloseWrite() {
521 if s.IsReadOnly() {
522 return
523 }
524 s.outgate.lock()
525 defer s.outUnlock()
526 s.outclosed.set()
527 s.flushLocked()
528 }
529
530
531
532
533
534
535
536
537
538
539
540
541 func (s *Stream) Reset(code uint64) {
542 const userClosed = true
543 s.resetInternal(code, userClosed)
544 }
545
546
547
548
549
550 func (s *Stream) resetInternal(code uint64, userClosed bool) {
551 s.outgate.lock()
552 defer s.outUnlock()
553 if s.IsReadOnly() {
554 return
555 }
556 if userClosed {
557
558 s.outclosed.set()
559 }
560 if s.outreset.isSet() {
561 return
562 }
563 if code > maxVarint {
564 code = maxVarint
565 }
566
567
568
569 s.outreset.set()
570 s.outresetcode = code
571 s.outbuf = nil
572 s.outbufoff = 0
573 s.out.discardBefore(s.out.end)
574 s.outunsent = rangeset[int64]{}
575 s.outblocked.clear()
576 }
577
578
579 func (s *Stream) connHasClosed() {
580
581
582
583 localClose := s.conn.lifetime.state == connStateClosing
584
585 s.ingate.lock()
586 if !s.inset.isrange(0, s.insize) && s.inresetcode == -1 {
587 if localClose {
588 s.inclosed.set()
589 } else {
590 s.inresetcode = streamResetByConnClose
591 }
592 }
593 s.inUnlock()
594
595 s.outgate.lock()
596 if localClose {
597 s.outclosed.set()
598 }
599 s.outreset.set()
600 s.outUnlock()
601 }
602
603
604
605
606
607 func (s *Stream) inUnlock() {
608 state := s.inUnlockNoQueue()
609 s.conn.maybeQueueStreamForSend(s, state)
610 }
611
612
613
614 func (s *Stream) inUnlockNoQueue() streamState {
615 nextByte := s.in.start + int64(len(s.inbuf))
616 canRead := s.inset.contains(nextByte) ||
617 s.insize == s.in.start+int64(len(s.inbuf)) ||
618 s.inresetcode != -1 ||
619 s.inclosed.isSet()
620 defer s.ingate.unlock(canRead)
621 var state streamState
622 switch {
623 case s.IsWriteOnly():
624 state = streamInDone
625 case s.inresetcode != -1:
626 fallthrough
627 case s.in.start == s.insize:
628
629
630 if s.inclosed.isSet() {
631 state = streamInDone
632 }
633 case s.insendmax.shouldSend():
634 state = streamInSendMeta
635 case s.inclosed.shouldSend():
636 state = streamInSendMeta
637 }
638 const mask = streamInDone | streamInSendMeta
639 return s.state.set(state, mask)
640 }
641
642
643
644
645
646 func (s *Stream) outUnlock() {
647 state := s.outUnlockNoQueue()
648 s.conn.maybeQueueStreamForSend(s, state)
649 }
650
651
652
// outUnlockNoQueue unlocks s.outgate and returns the updated stream state,
// without notifying the conn.
//
// Before releasing the gate it:
//   - closes s.outdone (once) if the send side has finished,
//   - computes the gate condition, which is set when a write will not block,
//   - recomputes the streamOutDone, streamOutSendMeta, and streamOutSendData
//     bits of s.state. The switch below is ordered by priority; at most one
//     bit is chosen.
func (s *Stream) outUnlockNoQueue() streamState {
	isDone := s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) || // close and all data acked
		s.outreset.isSet() // reset
	if isDone {
		select {
		case <-s.outdone:
			// Already closed.
		default:
			// outdone is nil for read-only streams (see newStream),
			// and closing a nil channel would panic.
			if !s.IsReadOnly() {
				close(s.outdone)
			}
		}
	}
	lim := s.out.start + s.outmaxbuf
	canWrite := lim > s.out.end || // room in the send buffer
		s.outclosed.isSet() || // closed locally: writes fail immediately
		s.outreset.isSet() // reset: writes fail immediately
	defer s.outgate.unlock(canWrite)
	var state streamState
	switch {
	case s.IsReadOnly():
		state = streamOutDone
	case s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end): // close and all data acked
		fallthrough
	case s.outreset.isReceived(): // RESET_STREAM acked
		// The send side only counts as done once outclosed is set,
		// which a reset caused by STOP_SENDING does not do by itself
		// (see resetInternal).
		if s.outclosed.isSet() {
			state = streamOutDone
		}
	case s.outreset.shouldSend(): // RESET_STREAM to send
		state = streamOutSendMeta
	case s.outreset.isSet(): // reset, but nothing to send right now
	case s.outblocked.shouldSend(): // STREAM_DATA_BLOCKED to send
		state = streamOutSendMeta
	case len(s.outunsent) > 0: // stream data to send
		if s.outunsent.min() < s.outmaxsent {
			// Data below outmaxsent has been sent before, so it is a resend.
			state = streamOutSendMeta
		} else {
			// New data: appendOutFramesLocked charges it against the
			// conn-level flow control.
			state = streamOutSendData
		}
	case s.outclosed.shouldSend() && s.out.end == s.outmaxsent: // FIN with no data left to send
		state = streamOutSendMeta
	case s.outopened.shouldSend(): // frame needed just to open the stream
		state = streamOutSendMeta
	}
	const mask = streamOutDone | streamOutSendMeta | streamOutSendData
	return s.state.set(state, mask)
}
701
702
703 func (s *Stream) handleData(off int64, b []byte, fin bool) error {
704 s.ingate.lock()
705 defer s.inUnlock()
706 end := off + int64(len(b))
707 if err := s.checkStreamBounds(end, fin); err != nil {
708 return err
709 }
710 if s.inclosed.isSet() || s.inresetcode != -1 {
711
712
713 return nil
714 }
715 if s.insize == -1 && end > s.in.end {
716 added := end - s.in.end
717 if err := s.conn.handleStreamBytesReceived(added); err != nil {
718 return err
719 }
720 }
721 s.in.writeAt(b, off)
722 s.inset.add(off, end)
723 if fin {
724 s.insize = end
725
726 s.insendmax.clear()
727 }
728 return nil
729 }
730
731
732 func (s *Stream) handleReset(code uint64, finalSize int64) error {
733 s.ingate.lock()
734 defer s.inUnlock()
735 const fin = true
736 if err := s.checkStreamBounds(finalSize, fin); err != nil {
737 return err
738 }
739 if s.inresetcode != -1 {
740
741 return nil
742 }
743 if s.insize == -1 {
744 added := finalSize - s.in.end
745 if err := s.conn.handleStreamBytesReceived(added); err != nil {
746 return err
747 }
748 }
749 s.conn.handleStreamBytesReadOnLoop(finalSize - s.in.start)
750 s.in.discardBefore(s.in.end)
751 s.inresetcode = int64(code)
752 s.insize = finalSize
753 return nil
754 }
755
756
757 func (s *Stream) checkStreamBounds(end int64, fin bool) error {
758 if end > s.inwin {
759
760 return localTransportError{
761 code: errFlowControl,
762 reason: "stream flow control window exceeded",
763 }
764 }
765 if s.insize != -1 && end > s.insize {
766
767 return localTransportError{
768 code: errFinalSize,
769 reason: "data received past end of stream",
770 }
771 }
772 if fin && s.insize != -1 && end != s.insize {
773
774 return localTransportError{
775 code: errFinalSize,
776 reason: "final size of stream changed",
777 }
778 }
779 if fin && end < s.in.end {
780
781 return localTransportError{
782 code: errFinalSize,
783 reason: "end of stream occurs before prior data",
784 }
785 }
786 return nil
787 }
788
789
790 func (s *Stream) handleStopSending(code uint64) error {
791
792
793 const userReset = false
794 s.resetInternal(code, userReset)
795 return nil
796 }
797
798
799 func (s *Stream) handleMaxStreamData(maxStreamData int64) error {
800 s.outgate.lock()
801 defer s.outUnlock()
802 if maxStreamData <= s.outwin {
803 return nil
804 }
805 if s.outflushed > s.outwin {
806 s.outunsent.add(s.outwin, min(maxStreamData, s.outflushed))
807 }
808 s.outwin = maxStreamData
809 if s.out.end > s.outwin {
810
811 s.outblocked.setUnsent()
812 } else {
813 s.outblocked.clear()
814 }
815 return nil
816 }
817
818
819 func (s *Stream) ackOrLoss(pnum packetNumber, ftype byte, fate packetFate) {
820
821
822
823
824
825
826 switch ftype {
827 case frameTypeResetStream:
828 s.outgate.lock()
829 s.outreset.ackOrLoss(pnum, fate)
830 s.outUnlock()
831 case frameTypeStopSending:
832 s.ingate.lock()
833 s.inclosed.ackOrLoss(pnum, fate)
834 s.inUnlock()
835 case frameTypeMaxStreamData:
836 s.ingate.lock()
837 s.insendmax.ackLatestOrLoss(pnum, fate)
838 s.inUnlock()
839 case frameTypeStreamDataBlocked:
840 s.outgate.lock()
841 s.outblocked.ackLatestOrLoss(pnum, fate)
842 s.outUnlock()
843 default:
844 panic("unhandled frame type")
845 }
846 }
847
848
849 func (s *Stream) ackOrLossData(pnum packetNumber, start, end int64, fin bool, fate packetFate) {
850 s.outgate.lock()
851 defer s.outUnlock()
852 s.outopened.ackOrLoss(pnum, fate)
853 if fin {
854 s.outclosed.ackOrLoss(pnum, fate)
855 }
856 if s.outreset.isSet() {
857
858 return
859 }
860 switch fate {
861 case packetAcked:
862 s.outacked.add(start, end)
863 s.outunsent.sub(start, end)
864
865 if s.outacked.contains(s.out.start) {
866 s.out.discardBefore(s.outacked[0].end)
867 }
868 case packetLost:
869
870
871
872 s.outunsent.add(start, end)
873 for _, a := range s.outacked {
874 s.outunsent.sub(a.start, a.end)
875 }
876 }
877 }
878
879
880
881
882
883
884 func (s *Stream) appendInFramesLocked(w *packetWriter, pnum packetNumber, pto bool) bool {
885 if s.inclosed.shouldSendPTO(pto) {
886
887
888 code := uint64(0)
889 if !w.appendStopSendingFrame(s.id, code) {
890 return false
891 }
892 s.inclosed.setSent(pnum)
893 }
894
895 if s.insendmax.shouldSendPTO(pto) {
896
897 maxStreamData := s.in.start + s.inmaxbuf
898 if !w.appendMaxStreamDataFrame(s.id, maxStreamData) {
899 return false
900 }
901 s.inwin = maxStreamData
902 s.insendmax.setSent(pnum)
903 }
904 return true
905 }
906
907
908
909
910
911
// appendOutFramesLocked appends the send-side frames that are due
// (RESET_STREAM, or STREAM_DATA_BLOCKED followed by STREAM frames) to the
// packet being built in w, recording pnum as the packet that carries them.
//
// It returns false if something did not fit in the packet, and true once
// nothing more needs appending. When pto is set, at most one STREAM frame is
// appended.
// NOTE(review): the "Locked" suffix suggests the caller holds outgate —
// confirm at the call site.
func (s *Stream) appendOutFramesLocked(w *packetWriter, pnum packetNumber, pto bool) bool {
	if s.outreset.isSet() {
		// The stream is reset: the only frame that may be sent is RESET_STREAM.
		if s.outreset.shouldSendPTO(pto) {
			if !w.appendResetStreamFrame(s.id, s.outresetcode, min(s.outwin, s.out.end)) {
				return false
			}
			s.outreset.setSent(pnum)
			s.frameOpensStream(pnum)
		}
		return true
	}
	if s.outblocked.shouldSendPTO(pto) {
		// STREAM_DATA_BLOCKED, reporting the window we are blocked at.
		if !w.appendStreamDataBlockedFrame(s.id, s.outwin) {
			return false
		}
		s.outblocked.setSent(pnum)
		s.frameOpensStream(pnum)
	}
	for {
		// Pick the next range of stream data to send. Both bounds are capped
		// at the peer's stream flow control window.
		off, size := dataToSend(min(s.out.start, s.outwin), min(s.outflushed, s.outwin), s.outunsent, s.outacked, pto)
		if end := off + size; end > s.outmaxsent {
			// Bytes past outmaxsent have never been sent and need conn-level
			// flow control. Trim the range to what is available, without
			// letting its size go negative.
			end = min(end, s.outmaxsent+s.conn.streams.outflow.avail())
			end = max(end, off)
			size = end - off
		}
		// The frame carries FIN if the write side is closed and this range
		// reaches the end of the buffered data.
		fin := s.outclosed.isSet() && off+size == s.out.end
		shouldSend := size > 0 || // data to send
			s.outopened.shouldSendPTO(pto) || // a frame is needed to open the stream
			(fin && s.outclosed.shouldSendPTO(pto)) // a frame is needed to carry the FIN
		if !shouldSend {
			return true
		}
		// The writer returns the frame's data buffer, which may be shorter
		// than size if the packet is nearly full.
		b, added := w.appendStreamFrame(s.id, off, int(size), fin)
		if !added {
			return false
		}
		s.out.copy(off, b)
		end := off + int64(len(b))
		if end > s.outmaxsent {
			// Charge the newly sent bytes against conn-level flow control.
			s.conn.streams.outflow.consume(end - s.outmaxsent)
			s.outmaxsent = end
		}
		s.outunsent.sub(off, end)
		s.frameOpensStream(pnum)
		if fin {
			s.outclosed.setSent(pnum)
		}
		if pto {
			// One STREAM frame per probe packet.
			return true
		}
		if int64(len(b)) < size {
			// Only part of the range fit; the rest must wait for another packet.
			return false
		}
	}
}
971
972
973
974
975
976 func (s *Stream) frameOpensStream(pnum packetNumber) {
977 if !s.outopened.isReceived() {
978 s.outopened.setSent(pnum)
979 }
980 }
981
982
983 func dataToSend(start, end int64, outunsent, outacked rangeset[int64], pto bool) (sendStart, size int64) {
984 switch {
985 case pto:
986
987
988
989
990
991
992
993 for _, r := range outacked {
994 if r.start > start {
995 return start, r.start - start
996 }
997 }
998 return start, end - start
999 case outunsent.numRanges() > 0:
1000 return outunsent.min(), outunsent[0].size()
1001 default:
1002 return end, 0
1003 }
1004 }
1005
View as plain text