1
2
3
4
5
6
7 package quic
8
9 import (
10 "bytes"
11 "context"
12 "crypto/rand"
13 "errors"
14 "fmt"
15 "io"
16 "strings"
17 "testing"
18 )
19
20 func TestStreamWriteBlockedByOutputBuffer(t *testing.T) {
21 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
22 want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
23 const writeBufferSize = 4
24 tc := newTestConn(t, clientSide, permissiveTransportParameters, func(c *Config) {
25 c.MaxStreamWriteBufferSize = writeBufferSize
26 })
27 tc.handshake()
28 tc.ignoreFrame(frameTypeAck)
29
30 s := newLocalStream(t, tc, styp)
31
32
33 n, err := s.Write(want)
34 if n != writeBufferSize || err != context.Canceled {
35 t.Fatalf("s.Write() = %v, %v; want %v, context.Canceled", n, err, writeBufferSize)
36 }
37 s.Flush()
38 tc.wantFrame("first write buffer of data sent",
39 packetType1RTT, debugFrameStream{
40 id: s.id,
41 data: want[:writeBufferSize],
42 })
43 off := int64(writeBufferSize)
44
45
46 w := runAsync(tc, func(ctx context.Context) (int, error) {
47 s.SetWriteContext(ctx)
48 n, err := s.Write(want[writeBufferSize:])
49 s.Flush()
50 return n, err
51 })
52 tc.wantIdle("write buffer is full, no more data can be sent")
53
54
55 tc.writeAckForAll()
56 tc.wantFrame("second write buffer of data sent",
57 packetType1RTT, debugFrameStream{
58 id: s.id,
59 off: off,
60 data: want[off:][:writeBufferSize],
61 })
62 off += writeBufferSize
63 tc.wantIdle("write buffer is full, no more data can be sent")
64
65
66 tc.writeAckForAll()
67 tc.wantFrame("remaining data sent",
68 packetType1RTT, debugFrameStream{
69 id: s.id,
70 off: off,
71 data: want[off:],
72 })
73
74 if n, err := w.result(); n != len(want)-writeBufferSize || err != nil {
75 t.Fatalf("s.Write() = %v, %v; want %v, nil",
76 len(want)-writeBufferSize, err, writeBufferSize)
77 }
78 })
79 }
80
81 func TestStreamWriteBlockedByStreamFlowControl(t *testing.T) {
82 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
83 ctx := canceledContext()
84 want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
85 tc := newTestConn(t, clientSide, func(p *transportParameters) {
86 p.initialMaxStreamsBidi = 100
87 p.initialMaxStreamsUni = 100
88 p.initialMaxData = 1 << 20
89 })
90 tc.handshake()
91 tc.ignoreFrame(frameTypeAck)
92
93 s, err := tc.conn.newLocalStream(ctx, styp)
94 if err != nil {
95 t.Fatal(err)
96 }
97
98
99 _, err = s.Write(want[:1])
100 if err != nil {
101 t.Fatalf("write with available output buffer: unexpected error: %v", err)
102 }
103 s.Flush()
104 tc.wantFrame("write blocked by flow control triggers a STREAM_DATA_BLOCKED frame",
105 packetType1RTT, debugFrameStreamDataBlocked{
106 id: s.id,
107 max: 0,
108 })
109
110
111 _, err = s.Write(want[1:])
112 if err != nil {
113 t.Fatalf("write with available output buffer: unexpected error: %v", err)
114 }
115 s.Flush()
116 tc.wantIdle("adding more blocked data does not trigger another STREAM_DATA_BLOCKED")
117
118
119 tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
120 id: s.id,
121 max: 4,
122 })
123 tc.wantFrame("stream window extended, but still more data to write",
124 packetType1RTT, debugFrameStreamDataBlocked{
125 id: s.id,
126 max: 4,
127 })
128 tc.wantFrame("stream window extended to 4, expect blocked write to progress",
129 packetType1RTT, debugFrameStream{
130 id: s.id,
131 data: want[:4],
132 })
133
134
135 tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
136 id: s.id,
137 max: int64(len(want)),
138 })
139 tc.wantFrame("stream window extended further, expect blocked write to finish",
140 packetType1RTT, debugFrameStream{
141 id: s.id,
142 off: 4,
143 data: want[4:],
144 })
145 })
146 }
147
148 func TestStreamIgnoresMaxStreamDataReduction(t *testing.T) {
149
150
151
152 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
153 ctx := canceledContext()
154 want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
155 tc := newTestConn(t, clientSide, func(p *transportParameters) {
156 if styp == uniStream {
157 p.initialMaxStreamsUni = 1
158 p.initialMaxStreamDataUni = 4
159 } else {
160 p.initialMaxStreamsBidi = 1
161 p.initialMaxStreamDataBidiRemote = 4
162 }
163 p.initialMaxData = 1 << 20
164 })
165 tc.handshake()
166 tc.ignoreFrame(frameTypeAck)
167 tc.ignoreFrame(frameTypeStreamDataBlocked)
168
169
170 s, err := tc.conn.newLocalStream(ctx, styp)
171 if err != nil {
172 t.Fatal(err)
173 }
174 s.Write(want[:1])
175 s.Flush()
176 tc.wantFrame("sent data (1 byte) fits within flow control limit",
177 packetType1RTT, debugFrameStream{
178 id: s.id,
179 off: 0,
180 data: want[:1],
181 })
182
183
184 tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
185 id: s.id,
186 max: 2,
187 })
188
189
190 s.Write(want[1:])
191 tc.wantFrame("stream limit is 4 bytes, ignoring decrease in MAX_STREAM_DATA",
192 packetType1RTT, debugFrameStream{
193 id: s.id,
194 off: 1,
195 data: want[1:4],
196 })
197
198
199
200 tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
201 id: s.id,
202 max: 8,
203 })
204 tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
205 id: s.id,
206 max: 6,
207 })
208
209
210 s.Write(want[4:])
211 tc.wantFrame("stream limit is 8 bytes, ignoring decrease in MAX_STREAM_DATA",
212 packetType1RTT, debugFrameStream{
213 id: s.id,
214 off: 4,
215 data: want[4:8],
216 })
217 })
218 }
219
220 func TestStreamWriteBlockedByWriteBufferLimit(t *testing.T) {
221 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
222 want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
223 const maxWriteBuffer = 4
224 tc := newTestConn(t, clientSide, func(p *transportParameters) {
225 p.initialMaxStreamsBidi = 100
226 p.initialMaxStreamsUni = 100
227 p.initialMaxData = 1 << 20
228 p.initialMaxStreamDataBidiRemote = 1 << 20
229 p.initialMaxStreamDataUni = 1 << 20
230 }, func(c *Config) {
231 c.MaxStreamWriteBufferSize = maxWriteBuffer
232 })
233 tc.handshake()
234 tc.ignoreFrame(frameTypeAck)
235
236
237
238
239 s := newLocalStream(t, tc, styp)
240 w := runAsync(tc, func(ctx context.Context) (int, error) {
241 s.SetWriteContext(ctx)
242 return s.Write(want)
243 })
244 tc.wantFrame("stream write should send as much data as write buffer allows",
245 packetType1RTT, debugFrameStream{
246 id: s.id,
247 off: 0,
248 data: want[:maxWriteBuffer],
249 })
250 tc.wantIdle("no STREAM_DATA_BLOCKED, we're blocked locally not by flow control")
251
252
253 tc.writeAckForAll()
254 tc.wantFrame("ACK for previous data allows making progress",
255 packetType1RTT, debugFrameStream{
256 id: s.id,
257 off: maxWriteBuffer,
258 data: want[maxWriteBuffer:][:maxWriteBuffer],
259 })
260
261
262 w.cancel()
263 n, err := w.result()
264 if n != 2*maxWriteBuffer || err == nil {
265 t.Fatalf("Write() = %v, %v; want %v bytes, error", n, err, 2*maxWriteBuffer)
266 }
267 })
268 }
269
270 func TestStreamReceive(t *testing.T) {
271
272
273
274 want := make([]byte, 5000)
275 for i := range want {
276 want[i] = byte(i)
277 }
278 type frame struct {
279 start int64
280 end int64
281 fin bool
282 want int
283 wantEOF bool
284 }
285 for _, test := range []struct {
286 name string
287 frames []frame
288 }{{
289 name: "linear",
290 frames: []frame{{
291 start: 0,
292 end: 1000,
293 want: 1000,
294 }, {
295 start: 1000,
296 end: 2000,
297 want: 2000,
298 }, {
299 start: 2000,
300 end: 3000,
301 want: 3000,
302 fin: true,
303 wantEOF: true,
304 }},
305 }, {
306 name: "out of order",
307 frames: []frame{{
308 start: 1000,
309 end: 2000,
310 }, {
311 start: 2000,
312 end: 3000,
313 }, {
314 start: 0,
315 end: 1000,
316 want: 3000,
317 }},
318 }, {
319 name: "resent",
320 frames: []frame{{
321 start: 0,
322 end: 1000,
323 want: 1000,
324 }, {
325 start: 0,
326 end: 1000,
327 want: 1000,
328 }, {
329 start: 1000,
330 end: 2000,
331 want: 2000,
332 }, {
333 start: 0,
334 end: 1000,
335 want: 2000,
336 }, {
337 start: 1000,
338 end: 2000,
339 want: 2000,
340 }},
341 }, {
342 name: "overlapping",
343 frames: []frame{{
344 start: 0,
345 end: 1000,
346 want: 1000,
347 }, {
348 start: 3000,
349 end: 4000,
350 want: 1000,
351 }, {
352 start: 2000,
353 end: 3000,
354 want: 1000,
355 }, {
356 start: 1000,
357 end: 3000,
358 want: 4000,
359 }},
360 }, {
361 name: "early eof",
362 frames: []frame{{
363 start: 3000,
364 end: 3000,
365 fin: true,
366 want: 0,
367 }, {
368 start: 1000,
369 end: 2000,
370 want: 0,
371 }, {
372 start: 0,
373 end: 1000,
374 want: 2000,
375 }, {
376 start: 2000,
377 end: 3000,
378 want: 3000,
379 wantEOF: true,
380 }},
381 }, {
382 name: "empty eof",
383 frames: []frame{{
384 start: 0,
385 end: 1000,
386 want: 1000,
387 }, {
388 start: 1000,
389 end: 1000,
390 fin: true,
391 want: 1000,
392 wantEOF: true,
393 }},
394 }} {
395 testStreamTypes(t, test.name, func(t *testing.T, styp streamType) {
396 tc := newTestConn(t, serverSide)
397 tc.handshake()
398 sid := newStreamID(clientSide, styp, 0)
399 var s *Stream
400 got := make([]byte, len(want))
401 var total int
402 for _, f := range test.frames {
403 t.Logf("receive [%v,%v)", f.start, f.end)
404 tc.writeFrames(packetType1RTT, debugFrameStream{
405 id: sid,
406 off: f.start,
407 data: want[f.start:f.end],
408 fin: f.fin,
409 })
410 if s == nil {
411 s = tc.acceptStream()
412 }
413 for {
414 n, err := s.Read(got[total:])
415 t.Logf("s.Read() = %v, %v", n, err)
416 total += n
417 if f.wantEOF && err != io.EOF {
418 t.Fatalf("Read() error = %v; want io.EOF", err)
419 }
420 if !f.wantEOF && err == io.EOF {
421 t.Fatalf("Read() error = io.EOF, want something else")
422 }
423 if err != nil {
424 break
425 }
426 }
427 if total != f.want {
428 t.Fatalf("total bytes read = %v, want %v", total, f.want)
429 }
430 for i := 0; i < total; i++ {
431 if got[i] != want[i] {
432 t.Fatalf("byte %v differs: got %v, want %v", i, got[i], want[i])
433 }
434 }
435 }
436 })
437 }
438
439 }
440
441 func TestStreamReceiveExtendsStreamWindow(t *testing.T) {
442 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
443 const maxWindowSize = 20
444 ctx := canceledContext()
445 tc := newTestConn(t, serverSide, func(c *Config) {
446 c.MaxStreamReadBufferSize = maxWindowSize
447 })
448 tc.handshake()
449 tc.ignoreFrame(frameTypeAck)
450 sid := newStreamID(clientSide, styp, 0)
451 tc.writeFrames(packetType1RTT, debugFrameStream{
452 id: sid,
453 off: 0,
454 data: make([]byte, maxWindowSize),
455 })
456 s, err := tc.conn.AcceptStream(ctx)
457 if err != nil {
458 t.Fatalf("AcceptStream: %v", err)
459 }
460 tc.wantIdle("stream window is not extended before data is read")
461 buf := make([]byte, maxWindowSize+1)
462 if n, err := s.Read(buf); n != maxWindowSize || err != nil {
463 t.Fatalf("s.Read() = %v, %v; want %v, nil", n, err, maxWindowSize)
464 }
465 tc.wantFrame("stream window is extended after reading data",
466 packetType1RTT, debugFrameMaxStreamData{
467 id: sid,
468 max: maxWindowSize * 2,
469 })
470 tc.writeFrames(packetType1RTT, debugFrameStream{
471 id: sid,
472 off: maxWindowSize,
473 data: make([]byte, maxWindowSize),
474 fin: true,
475 })
476 if n, err := s.Read(buf); n != maxWindowSize || err != io.EOF {
477 t.Fatalf("s.Read() = %v, %v; want %v, io.EOF", n, err, maxWindowSize)
478 }
479 tc.wantIdle("stream window is not extended after FIN")
480 })
481 }
482
483 func TestStreamReceiveViolatesStreamDataLimit(t *testing.T) {
484
485
486
487 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
488 const maxStreamData = 10
489 for _, test := range []struct {
490 off int64
491 size int64
492 }{{
493 off: maxStreamData,
494 size: 1,
495 }, {
496 off: 0,
497 size: maxStreamData + 1,
498 }, {
499 off: maxStreamData - 1,
500 size: 2,
501 }} {
502 tc := newTestConn(t, serverSide, func(c *Config) {
503 c.MaxStreamReadBufferSize = maxStreamData
504 })
505 tc.handshake()
506 tc.ignoreFrame(frameTypeAck)
507 tc.writeFrames(packetType1RTT, debugFrameStream{
508 id: newStreamID(clientSide, styp, 0),
509 off: test.off,
510 data: make([]byte, test.size),
511 })
512 tc.wantFrame(
513 fmt.Sprintf("data [%v,%v) violates stream data limit and closes connection",
514 test.off, test.off+test.size),
515 packetType1RTT, debugFrameConnectionCloseTransport{
516 code: errFlowControl,
517 },
518 )
519 }
520 })
521 }
522
523 func TestStreamReceiveDuplicateDataDoesNotViolateLimits(t *testing.T) {
524 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
525 const maxData = 10
526 tc := newTestConn(t, serverSide, func(c *Config) {
527
528 c.MaxStreamReadBufferSize = maxData
529 })
530 tc.handshake()
531 tc.ignoreFrame(frameTypeAck)
532 for i := 0; i < 3; i++ {
533 tc.writeFrames(packetType1RTT, debugFrameStream{
534 id: newStreamID(clientSide, styp, 0),
535 off: 0,
536 data: make([]byte, maxData),
537 })
538 tc.wantIdle(fmt.Sprintf("conn sends no frames after receiving data frame %v", i))
539 }
540 })
541 }
542
543 func TestStreamReceiveEmptyEOF(t *testing.T) {
544
545
546
547 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
548 tc, s := newTestConnAndRemoteStream(t, serverSide, styp, permissiveTransportParameters)
549 want := []byte{1, 2, 3}
550 tc.writeFrames(packetType1RTT, debugFrameStream{
551 id: s.id,
552 data: want,
553 })
554 if got, err := s.ReadByte(); got != want[0] || err != nil {
555 t.Fatalf("s.ReadByte() = %v, %v; want %v, nil", got, err, want[0])
556 }
557
558 tc.writeFrames(packetType1RTT, debugFrameStream{
559 id: s.id,
560 off: 3,
561 fin: true,
562 })
563 if got, err := io.ReadAll(s); !bytes.Equal(got, want[1:]) || err != nil {
564 t.Fatalf("io.ReadAll(s) = {%x}, %v; want {%x}, nil", got, err, want[1:])
565 }
566 })
567 }
568
569 func finalSizeTest(t *testing.T, wantErr transportError, f func(tc *testConn, sid streamID) (finalSize int64), opts ...any) {
570 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
571 for _, test := range []struct {
572 name string
573 finalFrame func(tc *testConn, sid streamID, finalSize int64)
574 }{{
575 name: "FIN",
576 finalFrame: func(tc *testConn, sid streamID, finalSize int64) {
577 tc.writeFrames(packetType1RTT, debugFrameStream{
578 id: sid,
579 off: finalSize,
580 fin: true,
581 })
582 },
583 }, {
584 name: "RESET_STREAM",
585 finalFrame: func(tc *testConn, sid streamID, finalSize int64) {
586 tc.writeFrames(packetType1RTT, debugFrameResetStream{
587 id: sid,
588 finalSize: finalSize,
589 })
590 },
591 }} {
592 t.Run(test.name, func(t *testing.T) {
593 tc := newTestConn(t, serverSide, opts...)
594 tc.handshake()
595 sid := newStreamID(clientSide, styp, 0)
596 finalSize := f(tc, sid)
597 test.finalFrame(tc, sid, finalSize)
598 tc.wantFrame("change in final size of stream is an error",
599 packetType1RTT, debugFrameConnectionCloseTransport{
600 code: wantErr,
601 },
602 )
603 })
604 }
605 })
606 }
607
608 func TestStreamFinalSizeChangedAfterFin(t *testing.T) {
609
610
611
612
613 finalSizeTest(t, errFinalSize, func(tc *testConn, sid streamID) (finalSize int64) {
614 tc.writeFrames(packetType1RTT, debugFrameStream{
615 id: sid,
616 off: 10,
617 fin: true,
618 })
619 return 9
620 })
621 }
622
623 func TestStreamFinalSizeBeforePreviousData(t *testing.T) {
624 finalSizeTest(t, errFinalSize, func(tc *testConn, sid streamID) (finalSize int64) {
625 tc.writeFrames(packetType1RTT, debugFrameStream{
626 id: sid,
627 off: 10,
628 data: []byte{0},
629 })
630 return 9
631 })
632 }
633
634 func TestStreamFinalSizePastMaxStreamData(t *testing.T) {
635 finalSizeTest(t, errFlowControl, func(tc *testConn, sid streamID) (finalSize int64) {
636 return 11
637 }, func(c *Config) {
638 c.MaxStreamReadBufferSize = 10
639 })
640 }
641
642 func TestStreamDataBeyondFinalSize(t *testing.T) {
643
644
645
646 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
647 tc := newTestConn(t, serverSide)
648 tc.handshake()
649 sid := newStreamID(clientSide, styp, 0)
650
651 const write1size = 4
652 tc.writeFrames(packetType1RTT, debugFrameStream{
653 id: sid,
654 off: 0,
655 data: make([]byte, 16),
656 fin: true,
657 })
658 tc.writeFrames(packetType1RTT, debugFrameStream{
659 id: sid,
660 off: 16,
661 data: []byte{0},
662 })
663 tc.wantFrame("received data past final size of stream",
664 packetType1RTT, debugFrameConnectionCloseTransport{
665 code: errFinalSize,
666 },
667 )
668 })
669 }
670
671 func TestStreamReceiveUnblocksReader(t *testing.T) {
672 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
673 tc := newTestConn(t, serverSide)
674 tc.handshake()
675 want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
676 sid := newStreamID(clientSide, styp, 0)
677
678
679 accept := runAsync(tc, func(ctx context.Context) (*Stream, error) {
680 return tc.conn.AcceptStream(ctx)
681 })
682 const write1size = 4
683 tc.writeFrames(packetType1RTT, debugFrameStream{
684 id: sid,
685 off: 0,
686 data: want[:write1size],
687 })
688 s, err := accept.result()
689 if err != nil {
690 t.Fatalf("AcceptStream() = %v", err)
691 }
692
693
694 got := make([]byte, len(want))
695 read := runAsync(tc, func(ctx context.Context) (int, error) {
696 return s.Read(got)
697 })
698 if n, err := read.result(); n != write1size || err != nil {
699 t.Fatalf("Read = %v, %v; want %v, nil", n, err, write1size)
700 }
701
702
703 read = runAsync(tc, func(ctx context.Context) (int, error) {
704 s.SetReadContext(ctx)
705 return s.Read(got[write1size:])
706 })
707 tc.writeFrames(packetType1RTT, debugFrameStream{
708 id: sid,
709 off: write1size,
710 data: want[write1size:],
711 fin: true,
712 })
713 if n, err := read.result(); n != len(want)-write1size || err != io.EOF {
714 t.Fatalf("Read = %v, %v; want %v, io.EOF", n, err, len(want)-write1size)
715 }
716 if !bytes.Equal(got, want) {
717 t.Fatalf("read bytes %x, want %x", got, want)
718 }
719 })
720 }
721
722
723
724
725
726
727
728
729 func testStreamSendFrameInvalidState(t *testing.T, f func(sid streamID) debugFrame) {
730 testSides(t, "stream_not_created", func(t *testing.T, side connSide) {
731 tc := newTestConn(t, side, permissiveTransportParameters)
732 tc.handshake()
733 tc.writeFrames(packetType1RTT, f(newStreamID(side, bidiStream, 0)))
734 tc.wantFrame("frame for local stream which has not been created",
735 packetType1RTT, debugFrameConnectionCloseTransport{
736 code: errStreamState,
737 })
738 })
739 testSides(t, "uni_stream", func(t *testing.T, side connSide) {
740 ctx := canceledContext()
741 tc := newTestConn(t, side, permissiveTransportParameters)
742 tc.handshake()
743 sid := newStreamID(side, uniStream, 0)
744 s, err := tc.conn.NewSendOnlyStream(ctx)
745 if err != nil {
746 t.Fatal(err)
747 }
748 s.Flush()
749 tc.wantFrame("new stream is opened",
750 packetType1RTT, debugFrameStream{
751 id: sid,
752 data: []byte{},
753 })
754 tc.writeFrames(packetType1RTT, f(sid))
755 tc.wantFrame("send-oriented frame for send-only stream",
756 packetType1RTT, debugFrameConnectionCloseTransport{
757 code: errStreamState,
758 })
759 })
760 }
761
762 func TestStreamResetStreamInvalidState(t *testing.T) {
763
764
765
766 testStreamSendFrameInvalidState(t, func(sid streamID) debugFrame {
767 return debugFrameResetStream{
768 id: sid,
769 code: 0,
770 finalSize: 0,
771 }
772 })
773 }
774
775 func TestStreamStreamFrameInvalidState(t *testing.T) {
776
777
778
779
780 testStreamSendFrameInvalidState(t, func(sid streamID) debugFrame {
781 return debugFrameStream{
782 id: sid,
783 }
784 })
785 }
786
787 func TestStreamDataBlockedInvalidState(t *testing.T) {
788
789
790
791
792 testStreamSendFrameInvalidState(t, func(sid streamID) debugFrame {
793 return debugFrameStream{
794 id: sid,
795 }
796 })
797 }
798
799
800
801
802
803
804
805
806 func testStreamReceiveFrameInvalidState(t *testing.T, f func(sid streamID) debugFrame) {
807 testSides(t, "stream_not_created", func(t *testing.T, side connSide) {
808 tc := newTestConn(t, side)
809 tc.handshake()
810 tc.writeFrames(packetType1RTT, f(newStreamID(side, bidiStream, 0)))
811 tc.wantFrame("frame for local stream which has not been created",
812 packetType1RTT, debugFrameConnectionCloseTransport{
813 code: errStreamState,
814 })
815 })
816 testSides(t, "uni_stream", func(t *testing.T, side connSide) {
817 tc := newTestConn(t, side)
818 tc.handshake()
819 tc.writeFrames(packetType1RTT, f(newStreamID(side.peer(), uniStream, 0)))
820 tc.wantFrame("receive-oriented frame for receive-only stream",
821 packetType1RTT, debugFrameConnectionCloseTransport{
822 code: errStreamState,
823 })
824 })
825 }
826
827 func TestStreamStopSendingInvalidState(t *testing.T) {
828
829
830
831
832
833
834 testStreamReceiveFrameInvalidState(t, func(sid streamID) debugFrame {
835 return debugFrameStopSending{
836 id: sid,
837 }
838 })
839 }
840
841 func TestStreamMaxStreamDataInvalidState(t *testing.T) {
842
843
844
845
846
847
848 testStreamReceiveFrameInvalidState(t, func(sid streamID) debugFrame {
849 return debugFrameMaxStreamData{
850 id: sid,
851 max: 1000,
852 }
853 })
854 }
855
856 func TestStreamOffsetTooLarge(t *testing.T) {
857
858
859
860 tc := newTestConn(t, serverSide)
861 tc.handshake()
862
863 tc.writeFrames(packetType1RTT,
864 debugFrameStream{
865 id: newStreamID(clientSide, bidiStream, 0),
866 off: 1<<62 - 1,
867 data: []byte{0},
868 })
869 got, _ := tc.readFrame()
870 want1 := debugFrameConnectionCloseTransport{code: errFrameEncoding}
871 want2 := debugFrameConnectionCloseTransport{code: errFlowControl}
872 if !frameEqual(got, want1) && !frameEqual(got, want2) {
873 t.Fatalf("STREAM offset exceeds 2^62-1\ngot: %v\nwant: %v\n or: %v", got, want1, want2)
874 }
875 }
876
877 func TestStreamReadFromWriteOnlyStream(t *testing.T) {
878 _, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
879 buf := make([]byte, 10)
880 wantErr := "read from write-only stream"
881 if n, err := s.Read(buf); err == nil || !strings.Contains(err.Error(), wantErr) {
882 t.Errorf("s.Read() = %v, %v; want error %q", n, err, wantErr)
883 }
884 }
885
886 func TestStreamWriteToReadOnlyStream(t *testing.T) {
887 _, s := newTestConnAndRemoteStream(t, serverSide, uniStream)
888 buf := make([]byte, 10)
889 wantErr := "write to read-only stream"
890 if n, err := s.Write(buf); err == nil || !strings.Contains(err.Error(), wantErr) {
891 t.Errorf("s.Write() = %v, %v; want error %q", n, err, wantErr)
892 }
893 }
894
895 func TestStreamReadFromClosedStream(t *testing.T) {
896 tc, s := newTestConnAndRemoteStream(t, serverSide, bidiStream, permissiveTransportParameters)
897 s.CloseRead()
898 tc.wantFrame("CloseRead sends a STOP_SENDING frame",
899 packetType1RTT, debugFrameStopSending{
900 id: s.id,
901 })
902 wantErr := "read from closed stream"
903 if n, err := s.Read(make([]byte, 16)); err == nil || !strings.Contains(err.Error(), wantErr) {
904 t.Errorf("s.Read() = %v, %v; want error %q", n, err, wantErr)
905 }
906
907 tc.writeFrames(packetType1RTT, debugFrameStream{
908 id: s.id,
909 data: []byte{1, 2, 3},
910 fin: true,
911 })
912 if n, err := s.Read(make([]byte, 16)); err == nil || !strings.Contains(err.Error(), wantErr) {
913 t.Errorf("s.Read() = %v, %v; want error %q", n, err, wantErr)
914 }
915 }
916
917 func TestStreamCloseReadWithAllDataReceived(t *testing.T) {
918 tc, s := newTestConnAndRemoteStream(t, serverSide, bidiStream, permissiveTransportParameters)
919 tc.writeFrames(packetType1RTT, debugFrameStream{
920 id: s.id,
921 data: []byte{1, 2, 3},
922 fin: true,
923 })
924 s.CloseRead()
925 tc.wantIdle("CloseRead in Data Recvd state doesn't need to send STOP_SENDING")
926
927 wantErr := "read from closed stream"
928 if n, err := s.Read(make([]byte, 16)); err == nil || !strings.Contains(err.Error(), wantErr) {
929 t.Errorf("s.Read() = %v, %v; want error %q", n, err, wantErr)
930 }
931 }
932
933 func TestStreamWriteToClosedStream(t *testing.T) {
934 tc, s := newTestConnAndLocalStream(t, serverSide, bidiStream, permissiveTransportParameters)
935 s.CloseWrite()
936 tc.wantFrame("stream is opened after being closed",
937 packetType1RTT, debugFrameStream{
938 id: s.id,
939 off: 0,
940 fin: true,
941 data: []byte{},
942 })
943 wantErr := "write to closed stream"
944 if n, err := s.Write([]byte{}); err == nil || !strings.Contains(err.Error(), wantErr) {
945 t.Errorf("s.Write() = %v, %v; want error %q", n, err, wantErr)
946 }
947 }
948
949 func TestStreamResetBlockedStream(t *testing.T) {
950 tc, s := newTestConnAndLocalStream(t, serverSide, bidiStream, permissiveTransportParameters,
951 func(c *Config) {
952 c.MaxStreamWriteBufferSize = 4
953 })
954 tc.ignoreFrame(frameTypeStreamDataBlocked)
955 writing := runAsync(tc, func(ctx context.Context) (int, error) {
956 s.SetWriteContext(ctx)
957 return s.Write([]byte{0, 1, 2, 3, 4, 5, 6, 7})
958 })
959 tc.wantFrame("stream writes data until write buffer fills",
960 packetType1RTT, debugFrameStream{
961 id: s.id,
962 off: 0,
963 data: []byte{0, 1, 2, 3},
964 })
965 s.Reset(42)
966 tc.wantFrame("stream is reset",
967 packetType1RTT, debugFrameResetStream{
968 id: s.id,
969 code: 42,
970 finalSize: 4,
971 })
972 wantErr := "write to reset stream"
973 if n, err := writing.result(); n != 4 || !strings.Contains(err.Error(), wantErr) {
974 t.Errorf("s.Write() interrupted by Reset: %v, %q; want 4, %q", n, err, wantErr)
975 }
976 tc.writeAckForAll()
977 tc.wantIdle("buffer space is available, but stream has been reset")
978 s.Reset(100)
979 tc.wantIdle("resetting stream a second time has no effect")
980 if n, err := s.Write([]byte{}); err == nil || !strings.Contains(err.Error(), wantErr) {
981 t.Errorf("s.Write() = %v, %v; want error %q", n, err, wantErr)
982 }
983 }
984
985 func TestStreamWriteMoreThanOnePacketOfData(t *testing.T) {
986 tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, func(p *transportParameters) {
987 p.initialMaxStreamsUni = 1
988 p.initialMaxData = 1 << 20
989 p.initialMaxStreamDataUni = 1 << 20
990 })
991 want := make([]byte, 4096)
992 rand.Read(want)
993 w := runAsync(tc, func(ctx context.Context) (int, error) {
994 n, err := s.Write(want)
995 s.Flush()
996 return n, err
997 })
998 got := make([]byte, 0, len(want))
999 for {
1000 f, _ := tc.readFrame()
1001 if f == nil {
1002 break
1003 }
1004 sf, ok := f.(debugFrameStream)
1005 if !ok {
1006 t.Fatalf("unexpected frame: %v", sf)
1007 }
1008 if len(got) != int(sf.off) {
1009 t.Fatalf("got frame: %v\nwant offset %v", sf, len(got))
1010 }
1011 got = append(got, sf.data...)
1012 }
1013 if n, err := w.result(); n != len(want) || err != nil {
1014 t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(want))
1015 }
1016 if !bytes.Equal(got, want) {
1017 t.Fatalf("mismatch in received stream data")
1018 }
1019 }
1020
1021 func TestStreamCloseWaitsForAcks(t *testing.T) {
1022 tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
1023 data := make([]byte, 100)
1024 s.Write(data)
1025 s.Flush()
1026 tc.wantFrame("conn sends data for the stream",
1027 packetType1RTT, debugFrameStream{
1028 id: s.id,
1029 data: data,
1030 })
1031 if err := s.Close(); err != context.Canceled {
1032 t.Fatalf("s.Close() = %v, want context.Canceled (data not acked yet)", err)
1033 }
1034 tc.wantFrame("conn sends FIN for closed stream",
1035 packetType1RTT, debugFrameStream{
1036 id: s.id,
1037 off: int64(len(data)),
1038 fin: true,
1039 data: []byte{},
1040 })
1041 closing := runAsync(tc, func(ctx context.Context) (struct{}, error) {
1042 s.SetWriteContext(ctx)
1043 return struct{}{}, s.Close()
1044 })
1045 if _, err := closing.result(); err != errNotDone {
1046 t.Fatalf("s.Close() = %v, want it to block waiting for acks", err)
1047 }
1048 tc.writeAckForAll()
1049 if _, err := closing.result(); err != nil {
1050 t.Fatalf("s.Close() = %v, want nil (all data acked)", err)
1051 }
1052 }
1053
1054 func TestStreamCloseReadOnly(t *testing.T) {
1055 tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, permissiveTransportParameters)
1056 if err := s.Close(); err != nil {
1057 t.Errorf("s.Close() = %v, want nil", err)
1058 }
1059 tc.wantFrame("closed stream sends STOP_SENDING",
1060 packetType1RTT, debugFrameStopSending{
1061 id: s.id,
1062 })
1063 }
1064
1065 func TestStreamCloseUnblocked(t *testing.T) {
1066 for _, test := range []struct {
1067 name string
1068 unblock func(tc *testConn, s *Stream)
1069 success bool
1070 }{{
1071 name: "data received",
1072 unblock: func(tc *testConn, s *Stream) {
1073 tc.writeAckForAll()
1074 },
1075 success: true,
1076 }, {
1077 name: "stop sending received",
1078 unblock: func(tc *testConn, s *Stream) {
1079 tc.writeFrames(packetType1RTT, debugFrameStopSending{
1080 id: s.id,
1081 })
1082 },
1083 }, {
1084 name: "stream reset",
1085 unblock: func(tc *testConn, s *Stream) {
1086 s.Reset(0)
1087 tc.wait()
1088 },
1089 }} {
1090 t.Run(test.name, func(t *testing.T) {
1091 tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
1092 data := make([]byte, 100)
1093 s.Write(data)
1094 s.Flush()
1095 tc.wantFrame("conn sends data for the stream",
1096 packetType1RTT, debugFrameStream{
1097 id: s.id,
1098 data: data,
1099 })
1100 if err := s.Close(); err != context.Canceled {
1101 t.Fatalf("s.Close() = %v, want context.Canceled (data not acked yet)", err)
1102 }
1103 tc.wantFrame("conn sends FIN for closed stream",
1104 packetType1RTT, debugFrameStream{
1105 id: s.id,
1106 off: int64(len(data)),
1107 fin: true,
1108 data: []byte{},
1109 })
1110 closing := runAsync(tc, func(ctx context.Context) (struct{}, error) {
1111 s.SetWriteContext(ctx)
1112 return struct{}{}, s.Close()
1113 })
1114 if _, err := closing.result(); err != errNotDone {
1115 t.Fatalf("s.Close() = %v, want it to block waiting for acks", err)
1116 }
1117 test.unblock(tc, s)
1118 _, err := closing.result()
1119 switch {
1120 case err == errNotDone:
1121 t.Fatalf("s.Close() still blocking; want it to have returned")
1122 case err == nil && !test.success:
1123 t.Fatalf("s.Close() = nil, want error")
1124 case err != nil && test.success:
1125 t.Fatalf("s.Close() = %v, want nil (all data acked)", err)
1126 }
1127 })
1128 }
1129 }
1130
1131 func TestStreamCloseWriteWhenBlockedByStreamFlowControl(t *testing.T) {
1132 tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters,
1133 func(p *transportParameters) {
1134
1135 p.initialMaxStreamDataUni = 0
1136 })
1137 tc.ignoreFrame(frameTypeStreamDataBlocked)
1138 if _, err := s.Write([]byte{0, 1}); err != nil {
1139 t.Fatalf("s.Write = %v", err)
1140 }
1141 s.CloseWrite()
1142 tc.wantIdle("stream write is blocked by flow control")
1143
1144 tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
1145 id: s.id,
1146 max: 1,
1147 })
1148 tc.wantFrame("send data up to flow control limit",
1149 packetType1RTT, debugFrameStream{
1150 id: s.id,
1151 data: []byte{0},
1152 })
1153 tc.wantIdle("stream write is again blocked by flow control")
1154
1155 tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
1156 id: s.id,
1157 max: 2,
1158 })
1159 tc.wantFrame("send remaining data and FIN",
1160 packetType1RTT, debugFrameStream{
1161 id: s.id,
1162 off: 1,
1163 data: []byte{1},
1164 fin: true,
1165 })
1166 }
1167
1168 func TestStreamPeerResetsWithUnreadAndUnsentData(t *testing.T) {
1169 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
1170 tc, s := newTestConnAndRemoteStream(t, serverSide, styp)
1171 data := []byte{0, 1, 2, 3, 4, 5, 6, 7}
1172 tc.writeFrames(packetType1RTT, debugFrameStream{
1173 id: s.id,
1174 data: data,
1175 })
1176 got := make([]byte, 4)
1177 if n, err := s.Read(got); n != len(got) || err != nil {
1178 t.Fatalf("Read start of stream: got %v, %v; want %v, nil", n, err, len(got))
1179 }
1180 const sentCode = 42
1181 tc.writeFrames(packetType1RTT, debugFrameResetStream{
1182 id: s.id,
1183 finalSize: 20,
1184 code: sentCode,
1185 })
1186 wantErr := StreamErrorCode(sentCode)
1187 if _, err := io.ReadAll(s); !errors.Is(err, wantErr) {
1188 t.Fatalf("Read reset stream: ReadAll got error %v; want %v", err, wantErr)
1189 }
1190 })
1191 }
1192
1193 func TestStreamPeerResetWakesBlockedRead(t *testing.T) {
1194 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
1195 tc, s := newTestConnAndRemoteStream(t, serverSide, styp)
1196 reader := runAsync(tc, func(ctx context.Context) (int, error) {
1197 s.SetReadContext(ctx)
1198 got := make([]byte, 4)
1199 return s.Read(got)
1200 })
1201 const sentCode = 42
1202 tc.writeFrames(packetType1RTT, debugFrameResetStream{
1203 id: s.id,
1204 finalSize: 20,
1205 code: sentCode,
1206 })
1207 wantErr := StreamErrorCode(sentCode)
1208 if n, err := reader.result(); n != 0 || !errors.Is(err, wantErr) {
1209 t.Fatalf("Read reset stream: got %v, %v; want 0, %v", n, err, wantErr)
1210 }
1211 })
1212 }
1213
1214 func TestStreamPeerResetFollowedByData(t *testing.T) {
1215 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
1216 tc, s := newTestConnAndRemoteStream(t, serverSide, styp)
1217 tc.writeFrames(packetType1RTT, debugFrameResetStream{
1218 id: s.id,
1219 finalSize: 4,
1220 code: 1,
1221 })
1222 tc.writeFrames(packetType1RTT, debugFrameStream{
1223 id: s.id,
1224 data: []byte{0, 1, 2, 3},
1225 })
1226
1227 tc.writeFrames(packetType1RTT, debugFrameResetStream{
1228 id: s.id,
1229 finalSize: 4,
1230 code: 2,
1231 })
1232 wantErr := StreamErrorCode(1)
1233 if n, err := s.Read(make([]byte, 16)); n != 0 || !errors.Is(err, wantErr) {
1234 t.Fatalf("Read from reset stream: got %v, %v; want 0, %v", n, err, wantErr)
1235 }
1236 })
1237 }
1238
1239 func TestStreamResetInvalidCode(t *testing.T) {
1240 tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
1241 s.Reset(1 << 62)
1242 tc.wantFrame("reset with invalid code sends a RESET_STREAM anyway",
1243 packetType1RTT, debugFrameResetStream{
1244 id: s.id,
1245
1246
1247 code: (1 << 62) - 1,
1248 })
1249 }
1250
1251 func TestStreamResetReceiveOnly(t *testing.T) {
1252 tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream)
1253 s.Reset(0)
1254 tc.wantIdle("resetting a receive-only stream has no effect")
1255 }
1256
1257 func TestStreamPeerStopSendingForActiveStream(t *testing.T) {
1258
1259
1260
1261 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
1262 tc, s := newTestConnAndLocalStream(t, serverSide, styp, permissiveTransportParameters)
1263 for i := 0; i < 4; i++ {
1264 s.Write([]byte{byte(i)})
1265 s.Flush()
1266 tc.wantFrame("write sends a STREAM frame to peer",
1267 packetType1RTT, debugFrameStream{
1268 id: s.id,
1269 off: int64(i),
1270 data: []byte{byte(i)},
1271 })
1272 }
1273 tc.writeFrames(packetType1RTT, debugFrameStopSending{
1274 id: s.id,
1275 code: 42,
1276 })
1277 tc.wantFrame("receiving STOP_SENDING causes stream reset",
1278 packetType1RTT, debugFrameResetStream{
1279 id: s.id,
1280 code: 42,
1281 finalSize: 4,
1282 })
1283 if n, err := s.Write([]byte{0}); err == nil {
1284 t.Errorf("s.Write() after STOP_SENDING = %v, %v; want error", n, err)
1285 }
1286
1287 tc.writeAckForLatest()
1288 tc.wantIdle("lost STREAM frames for reset stream are not resent")
1289 })
1290 }
1291
1292 func TestStreamReceiveDataBlocked(t *testing.T) {
1293 tc := newTestConn(t, serverSide, permissiveTransportParameters)
1294 tc.handshake()
1295 tc.ignoreFrame(frameTypeAck)
1296
1297
1298
1299 tc.writeFrames(packetType1RTT, debugFrameStreamDataBlocked{
1300 id: newStreamID(clientSide, bidiStream, 0),
1301 max: 100,
1302 })
1303 tc.writeFrames(packetType1RTT, debugFrameDataBlocked{
1304 max: 100,
1305 })
1306 tc.wantIdle("no response to STREAM_DATA_BLOCKED and DATA_BLOCKED")
1307 }
1308
1309 func TestStreamFlushExplicit(t *testing.T) {
1310 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
1311 tc, s := newTestConnAndLocalStream(t, clientSide, styp, permissiveTransportParameters)
1312 want := []byte{0, 1, 2, 3}
1313 n, err := s.Write(want)
1314 if n != len(want) || err != nil {
1315 t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(want))
1316 }
1317 tc.wantIdle("unflushed data is not sent")
1318 s.Flush()
1319 tc.wantFrame("data is sent after flush",
1320 packetType1RTT, debugFrameStream{
1321 id: s.id,
1322 data: want,
1323 })
1324 })
1325 }
1326
1327 func TestStreamFlushImplicitExact(t *testing.T) {
1328 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
1329 const writeBufferSize = 4
1330 tc, s := newTestConnAndLocalStream(t, clientSide, styp,
1331 permissiveTransportParameters,
1332 func(c *Config) {
1333 c.MaxStreamWriteBufferSize = writeBufferSize
1334 })
1335 want := []byte{0, 1, 2, 3, 4, 5, 6}
1336
1337
1338 n, err := s.Write(want[:3])
1339 if n != 3 || err != nil {
1340 t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(want))
1341 }
1342 tc.wantIdle("unflushed data is not sent")
1343
1344
1345 n, err = s.Write(want[3:4])
1346 if n != 1 || err != nil {
1347 t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(want))
1348 }
1349 tc.wantFrame("data is sent after write buffer fills",
1350 packetType1RTT, debugFrameStream{
1351 id: s.id,
1352 data: want[0:4],
1353 })
1354 })
1355 }
1356
1357 func TestStreamFlushImplicitLargerThanBuffer(t *testing.T) {
1358 testStreamTypes(t, "", func(t *testing.T, styp streamType) {
1359 const writeBufferSize = 4
1360 tc, s := newTestConnAndLocalStream(t, clientSide, styp,
1361 permissiveTransportParameters,
1362 func(c *Config) {
1363 c.MaxStreamWriteBufferSize = writeBufferSize
1364 })
1365 want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
1366
1367 w := runAsync(tc, func(ctx context.Context) (int, error) {
1368 s.SetWriteContext(ctx)
1369 n, err := s.Write(want)
1370 return n, err
1371 })
1372
1373 tc.wantFrame("data is sent after write buffer fills",
1374 packetType1RTT, debugFrameStream{
1375 id: s.id,
1376 data: want[0:4],
1377 })
1378 tc.writeAckForAll()
1379 tc.wantFrame("ack permits sending more data",
1380 packetType1RTT, debugFrameStream{
1381 id: s.id,
1382 off: 4,
1383 data: want[4:8],
1384 })
1385 tc.writeAckForAll()
1386
1387 tc.wantIdle("write buffer is not full")
1388 if n, err := w.result(); n != len(want) || err != nil {
1389 t.Fatalf("Write() = %v, %v; want %v, nil", n, err, len(want))
1390 }
1391
1392 s.Flush()
1393 tc.wantFrame("flush sends last buffer of data",
1394 packetType1RTT, debugFrameStream{
1395 id: s.id,
1396 off: 8,
1397 data: want[8:],
1398 })
1399 })
1400 }
1401
// streamSide names which endpoint a test stream is created by. It is used by
// newTestConnAndStream to choose between a locally created stream and one
// opened by the peer.
type streamSide string

// The two possible stream sides.
const (
	localStream  streamSide = "local"
	remoteStream streamSide = "remote"
)
1408
1409 func newTestConnAndStream(t *testing.T, side connSide, sside streamSide, styp streamType, opts ...any) (*testConn, *Stream) {
1410 if sside == localStream {
1411 return newTestConnAndLocalStream(t, side, styp, opts...)
1412 } else {
1413 return newTestConnAndRemoteStream(t, side, styp, opts...)
1414 }
1415 }
1416
1417 func newTestConnAndLocalStream(t *testing.T, side connSide, styp streamType, opts ...any) (*testConn, *Stream) {
1418 t.Helper()
1419 tc := newTestConn(t, side, opts...)
1420 tc.handshake()
1421 tc.ignoreFrame(frameTypeAck)
1422 s := newLocalStream(t, tc, styp)
1423 s.SetReadContext(canceledContext())
1424 s.SetWriteContext(canceledContext())
1425 return tc, s
1426 }
1427
1428 func newLocalStream(t *testing.T, tc *testConn, styp streamType) *Stream {
1429 t.Helper()
1430 ctx := canceledContext()
1431 s, err := tc.conn.newLocalStream(ctx, styp)
1432 if err != nil {
1433 t.Fatalf("conn.newLocalStream(%v) = %v", styp, err)
1434 }
1435 s.SetReadContext(canceledContext())
1436 s.SetWriteContext(canceledContext())
1437 return s
1438 }
1439
1440 func newTestConnAndRemoteStream(t *testing.T, side connSide, styp streamType, opts ...any) (*testConn, *Stream) {
1441 t.Helper()
1442 tc := newTestConn(t, side, opts...)
1443 tc.handshake()
1444 tc.ignoreFrame(frameTypeAck)
1445 s := newRemoteStream(t, tc, styp)
1446 s.SetReadContext(canceledContext())
1447 s.SetWriteContext(canceledContext())
1448 return tc, s
1449 }
1450
1451 func newRemoteStream(t *testing.T, tc *testConn, styp streamType) *Stream {
1452 t.Helper()
1453 ctx := canceledContext()
1454 tc.writeFrames(packetType1RTT, debugFrameStream{
1455 id: newStreamID(tc.conn.side.peer(), styp, 0),
1456 })
1457 s, err := tc.conn.AcceptStream(ctx)
1458 if err != nil {
1459 t.Fatalf("conn.AcceptStream() = %v", err)
1460 }
1461 s.SetReadContext(canceledContext())
1462 s.SetWriteContext(canceledContext())
1463 return s
1464 }
1465
1466
1467 func permissiveTransportParameters(p *transportParameters) {
1468 p.initialMaxStreamsBidi = maxStreamsLimit
1469 p.initialMaxStreamsUni = maxStreamsLimit
1470 p.initialMaxData = maxVarint
1471 p.initialMaxStreamDataBidiRemote = maxVarint
1472 p.initialMaxStreamDataBidiLocal = maxVarint
1473 p.initialMaxStreamDataUni = maxVarint
1474 }
1475
// makeTestData returns an n-byte slice in which the byte at index i holds
// byte(i), so the values count up from 0 and wrap around after 255.
func makeTestData(n int) []byte {
	data := make([]byte, n)
	for i := range data {
		data[i] = byte(i)
	}
	return data
}
1483
View as plain text