1
2
3
4
5
6
7 package quic
8
9 import (
10 "context"
11 "crypto/tls"
12 "errors"
13 "fmt"
14 "log/slog"
15 "net/netip"
16 "time"
17 )
18
19
20
21
22 type Conn struct {
23 side connSide
24 endpoint *Endpoint
25 config *Config
26 testHooks connTestHooks
27 peerAddr netip.AddrPort
28 localAddr netip.AddrPort
29
30 msgc chan any
31 donec chan struct{}
32
33 w packetWriter
34 acks [numberSpaceCount]ackState
35 lifetime lifetimeState
36 idle idleState
37 connIDState connIDState
38 loss lossState
39 streams streamsState
40 path pathState
41
42
43 keysInitial fixedKeyPair
44 keysHandshake fixedKeyPair
45 keysAppData updatingKeyPair
46 crypto [numberSpaceCount]cryptoStream
47 tls *tls.QUICConn
48
49
50 retryToken []byte
51
52
53
54 handshakeConfirmed sentVal
55
56 peerAckDelayExponent int8
57
58
59 testSendPingSpace numberSpace
60 testSendPing sentVal
61
62 log *slog.Logger
63 }
64
65
66 type connTestHooks interface {
67
68 init()
69
70
71
72 nextMessage(msgc chan any, nextTimeout time.Time) (now time.Time, message any)
73
74
75 handleTLSEvent(tls.QUICEvent)
76
77
78
79 newConnID(seq int64) ([]byte, error)
80
81
82
83 waitUntil(ctx context.Context, until func() bool) error
84
85
86 timeNow() time.Time
87 }
88
89
90 type newServerConnIDs struct {
91 srcConnID []byte
92 dstConnID []byte
93 originalDstConnID []byte
94 retrySrcConnID []byte
95 }
96
97 func newConn(now time.Time, side connSide, cids newServerConnIDs, peerHostname string, peerAddr netip.AddrPort, config *Config, e *Endpoint) (conn *Conn, _ error) {
98 c := &Conn{
99 side: side,
100 endpoint: e,
101 config: config,
102 peerAddr: unmapAddrPort(peerAddr),
103 msgc: make(chan any, 1),
104 donec: make(chan struct{}),
105 peerAckDelayExponent: -1,
106 }
107 defer func() {
108
109
110
111 if conn == nil {
112 close(c.donec)
113 }
114 }()
115
116
117
118 c.msgc = make(chan any, 1)
119
120 if e.testHooks != nil {
121 e.testHooks.newConn(c)
122 }
123
124
125 var initialConnID []byte
126 if c.side == clientSide {
127 if err := c.connIDState.initClient(c); err != nil {
128 return nil, err
129 }
130 initialConnID, _ = c.connIDState.dstConnID()
131 } else {
132 initialConnID = cids.originalDstConnID
133 if cids.retrySrcConnID != nil {
134 initialConnID = cids.retrySrcConnID
135 }
136 if err := c.connIDState.initServer(c, cids); err != nil {
137 return nil, err
138 }
139 }
140
141
142 c.logConnectionStarted(cids.originalDstConnID, peerAddr)
143 c.keysAppData.init()
144 c.loss.init(c.side, smallestMaxDatagramSize, now)
145 c.streamsInit()
146 c.lifetimeInit()
147 c.restartIdleTimer(now)
148
149 if err := c.startTLS(now, initialConnID, peerHostname, transportParameters{
150 initialSrcConnID: c.connIDState.srcConnID(),
151 originalDstConnID: cids.originalDstConnID,
152 retrySrcConnID: cids.retrySrcConnID,
153 ackDelayExponent: ackDelayExponent,
154 maxUDPPayloadSize: maxUDPPayloadSize,
155 maxAckDelay: maxAckDelay,
156 disableActiveMigration: true,
157 initialMaxData: config.maxConnReadBufferSize(),
158 initialMaxStreamDataBidiLocal: config.maxStreamReadBufferSize(),
159 initialMaxStreamDataBidiRemote: config.maxStreamReadBufferSize(),
160 initialMaxStreamDataUni: config.maxStreamReadBufferSize(),
161 initialMaxStreamsBidi: c.streams.remoteLimit[bidiStream].max,
162 initialMaxStreamsUni: c.streams.remoteLimit[uniStream].max,
163 activeConnIDLimit: activeConnIDLimit,
164 }); err != nil {
165 return nil, err
166 }
167
168 if c.testHooks != nil {
169 c.testHooks.init()
170 }
171 go c.loop(now)
172 return c, nil
173 }
174
175 func (c *Conn) String() string {
176 return fmt.Sprintf("quic.Conn(%v,->%v)", c.side, c.peerAddr)
177 }
178
179
180 func (c *Conn) LocalAddr() netip.AddrPort {
181 return c.localAddr
182 }
183
184
185 func (c *Conn) RemoteAddr() netip.AddrPort {
186 return c.peerAddr
187 }
188
189
190
191 func (c *Conn) confirmHandshake(now time.Time) {
192
193
194
195
196 if c.handshakeConfirmed.isSet() {
197 return
198 }
199 if c.side == serverSide {
200
201 c.handshakeConfirmed.setUnsent()
202 c.endpoint.serverConnEstablished(c)
203 } else {
204
205
206
207 c.handshakeConfirmed.setReceived()
208 }
209 c.restartIdleTimer(now)
210 c.loss.confirmHandshake()
211
212
213 c.discardKeys(now, handshakeSpace)
214 }
215
216
217
218 func (c *Conn) discardKeys(now time.Time, space numberSpace) {
219 if err := c.crypto[space].discardKeys(); err != nil {
220 c.abort(now, err)
221 }
222 switch space {
223 case initialSpace:
224 c.keysInitial.discard()
225 case handshakeSpace:
226 c.keysHandshake.discard()
227 }
228 c.loss.discardKeys(now, c.log, space)
229 }
230
231
232 func (c *Conn) receiveTransportParameters(p transportParameters) error {
233 isRetry := c.retryToken != nil
234 if err := c.connIDState.validateTransportParameters(c, isRetry, p); err != nil {
235 return err
236 }
237 c.streams.outflow.setMaxData(p.initialMaxData)
238 c.streams.localLimit[bidiStream].setMax(p.initialMaxStreamsBidi)
239 c.streams.localLimit[uniStream].setMax(p.initialMaxStreamsUni)
240 c.streams.peerInitialMaxStreamDataBidiLocal = p.initialMaxStreamDataBidiLocal
241 c.streams.peerInitialMaxStreamDataRemote[bidiStream] = p.initialMaxStreamDataBidiRemote
242 c.streams.peerInitialMaxStreamDataRemote[uniStream] = p.initialMaxStreamDataUni
243 c.receivePeerMaxIdleTimeout(p.maxIdleTimeout)
244 c.peerAckDelayExponent = p.ackDelayExponent
245 c.loss.setMaxAckDelay(p.maxAckDelay)
246 if err := c.connIDState.setPeerActiveConnIDLimit(c, p.activeConnIDLimit); err != nil {
247 return err
248 }
249 if p.preferredAddrConnID != nil {
250 var (
251 seq int64 = 1
252 retirePriorTo int64 = 0
253 resetToken [16]byte
254 )
255 copy(resetToken[:], p.preferredAddrResetToken)
256 if err := c.connIDState.handleNewConnID(c, seq, retirePriorTo, p.preferredAddrConnID, resetToken); err != nil {
257 return err
258 }
259 }
260
261
262
263
264 return nil
265 }
266
267 type (
268 timerEvent struct{}
269 wakeEvent struct{}
270 )
271
272 var errIdleTimeout = errors.New("idle timeout")
273
274
275
276
277
278
279
280 func (c *Conn) loop(now time.Time) {
281 defer c.cleanup()
282
283
284
285
286
287
288 var timer *time.Timer
289 var lastTimeout time.Time
290 hooks := c.testHooks
291 if hooks == nil {
292 timer = time.AfterFunc(1*time.Hour, func() {
293 c.sendMsg(timerEvent{})
294 })
295 defer timer.Stop()
296 }
297
298 for c.lifetime.state != connStateDone {
299 sendTimeout := c.maybeSend(now)
300
301
302
303 nextTimeout := sendTimeout
304 nextTimeout = firstTime(nextTimeout, c.idle.nextTimeout)
305 if c.isAlive() {
306 nextTimeout = firstTime(nextTimeout, c.loss.timer)
307 nextTimeout = firstTime(nextTimeout, c.acks[appDataSpace].nextAck)
308 } else {
309 nextTimeout = firstTime(nextTimeout, c.lifetime.drainEndTime)
310 }
311
312 var m any
313 if hooks != nil {
314
315 now, m = hooks.nextMessage(c.msgc, nextTimeout)
316 } else if !nextTimeout.IsZero() && nextTimeout.Before(now) {
317
318 now = time.Now()
319 m = timerEvent{}
320 } else {
321
322
323 if !nextTimeout.Equal(lastTimeout) && !nextTimeout.IsZero() {
324
325
326
327 timer.Reset(nextTimeout.Sub(now))
328 lastTimeout = nextTimeout
329 }
330 m = <-c.msgc
331 now = time.Now()
332 }
333 switch m := m.(type) {
334 case *datagram:
335 if !c.handleDatagram(now, m) {
336 if c.logEnabled(QLogLevelPacket) {
337 c.logPacketDropped(m)
338 }
339 }
340 m.recycle()
341 case timerEvent:
342
343 if c.idleAdvance(now) {
344
345 c.abortImmediately(now, errIdleTimeout)
346 return
347 }
348 c.loss.advance(now, c.handleAckOrLoss)
349 if c.lifetimeAdvance(now) {
350
351
352 return
353 }
354 case wakeEvent:
355
356 case func(time.Time, *Conn):
357
358 m(now, c)
359 default:
360 panic(fmt.Sprintf("quic: unrecognized conn message %T", m))
361 }
362 }
363 }
364
365 func (c *Conn) cleanup() {
366 c.logConnectionClosed()
367 c.endpoint.connDrained(c)
368 c.tls.Close()
369 close(c.donec)
370 }
371
372
373
374
375 func (c *Conn) sendMsg(m any) {
376 select {
377 case c.msgc <- m:
378 case <-c.donec:
379 }
380 }
381
382
383 func (c *Conn) wake() {
384 select {
385 case c.msgc <- wakeEvent{}:
386 default:
387 }
388 }
389
390
391 func (c *Conn) runOnLoop(ctx context.Context, f func(now time.Time, c *Conn)) error {
392 donec := make(chan struct{})
393 msg := func(now time.Time, c *Conn) {
394 defer close(donec)
395 f(now, c)
396 }
397 if c.testHooks != nil {
398
399
400
401
402
403
404 msgc := c.msgc
405 c.testHooks.waitUntil(ctx, func() bool {
406 for {
407 select {
408 case msgc <- msg:
409 msgc = nil
410 case <-donec:
411 return true
412 case <-c.donec:
413 return true
414 default:
415 return false
416 }
417 }
418 })
419 } else {
420 c.sendMsg(msg)
421 }
422 select {
423 case <-donec:
424 case <-c.donec:
425 return errors.New("quic: connection closed")
426 }
427 return nil
428 }
429
430 func (c *Conn) waitOnDone(ctx context.Context, ch <-chan struct{}) error {
431 if c.testHooks != nil {
432 return c.testHooks.waitUntil(ctx, func() bool {
433 select {
434 case <-ch:
435 return true
436 default:
437 }
438 return false
439 })
440 }
441
442
443
444 select {
445 case <-ch:
446 return nil
447 default:
448 }
449 select {
450 case <-ch:
451 case <-ctx.Done():
452 return ctx.Err()
453 }
454 return nil
455 }
456
457
458 func firstTime(a, b time.Time) time.Time {
459 switch {
460 case a.IsZero():
461 return b
462 case b.IsZero():
463 return a
464 case a.Before(b):
465 return a
466 default:
467 return b
468 }
469 }
470
View as plain text