1
2
3
4
5
6
7 package quic
8
9 import (
10 "context"
11 "log/slog"
12 "math"
13 "time"
14 )
15
16 type lossState struct {
17 side connSide
18
19
20
21 handshakeConfirmed bool
22
23
24
25 maxAckDelay time.Duration
26
27
28
29
30 timer time.Time
31
32
33 ptoTimerArmed bool
34
35
36 ptoExpired bool
37
38
39
40 ptoBackoffCount int
41
42
43
44
45
46
47
48
49
50
51
52
53 antiAmplificationLimit int
54
55
56 consecutiveNonAckElicitingPackets int
57
58 rtt rttState
59 pacer pacerState
60 cc *ccReno
61
62
63 spaces [numberSpaceCount]struct {
64 sentPacketList
65 maxAcked packetNumber
66 lastAckEliciting packetNumber
67 }
68
69
70 ackFrameRTT time.Duration
71 ackFrameContainsAckEliciting bool
72 }
73
// antiAmplificationUnlimited is the sentinel value of
// lossState.antiAmplificationLimit meaning that no limit applies.
const antiAmplificationUnlimited = math.MaxInt
75
76 func (c *lossState) init(side connSide, maxDatagramSize int, now time.Time) {
77 c.side = side
78 if side == clientSide {
79
80 c.antiAmplificationLimit = antiAmplificationUnlimited
81 }
82 c.rtt.init()
83 c.cc = newReno(maxDatagramSize)
84 c.pacer.init(now, c.cc.congestionWindow, timerGranularity)
85
86
87
88 c.maxAckDelay = 25 * time.Millisecond
89
90 for space := range c.spaces {
91 c.spaces[space].maxAcked = -1
92 c.spaces[space].lastAckEliciting = -1
93 }
94 }
95
96
97 func (c *lossState) setMaxAckDelay(d time.Duration) {
98 if d >= (1<<14)*time.Millisecond {
99
100
101 return
102 }
103 c.maxAckDelay = d
104 }
105
106
107 func (c *lossState) confirmHandshake() {
108 c.handshakeConfirmed = true
109 }
110
111
112
113 func (c *lossState) validateClientAddress() {
114 c.antiAmplificationLimit = antiAmplificationUnlimited
115 }
116
117
118
119
120
121
122
// minPacketSize is the smallest anti-amplification budget, in bytes, with
// which sending is attempted. When lossState.antiAmplificationLimit falls
// below it, sendLimit reports ccBlocked and no PTO timer is armed.
const minPacketSize = 128
124
// ccLimit is the result of lossState.sendLimit: whether a packet may be
// sent now and, if not, what is preventing it.
type ccLimit int

const (
	ccOK      ccLimit = iota // sending is permitted
	ccBlocked                // blocked by the anti-amplification limit
	ccLimited                // blocked by congestion control
	ccPaced                  // permitted by congestion control, delayed by the pacer
)
133
134
135
136 func (c *lossState) sendLimit(now time.Time) (limit ccLimit, next time.Time) {
137 if c.antiAmplificationLimit < minPacketSize {
138
139 return ccBlocked, time.Time{}
140 }
141 if c.ptoExpired {
142
143 return ccOK, time.Time{}
144 }
145 if !c.cc.canSend() {
146
147 return ccLimited, time.Time{}
148 }
149 if c.cc.bytesInFlight == 0 {
150
151 return ccOK, time.Time{}
152 }
153 canSend, next := c.pacer.canSend(now)
154 if !canSend {
155
156 return ccPaced, next
157 }
158 return ccOK, time.Time{}
159 }
160
161
162 func (c *lossState) maxSendSize() int {
163 return min(c.antiAmplificationLimit, c.cc.maxDatagramSize)
164 }
165
166
167
168 func (c *lossState) advance(now time.Time, lossf func(numberSpace, *sentPacket, packetFate)) {
169 c.pacer.advance(now, c.cc.congestionWindow, c.rtt.smoothedRTT)
170 if c.ptoTimerArmed && !c.timer.IsZero() && !c.timer.After(now) {
171 c.ptoExpired = true
172 c.timer = time.Time{}
173 c.ptoBackoffCount++
174 }
175 c.detectLoss(now, lossf)
176 }
177
178
179 func (c *lossState) nextNumber(space numberSpace) packetNumber {
180 return c.spaces[space].nextNum
181 }
182
183
184 func (c *lossState) packetSent(now time.Time, log *slog.Logger, space numberSpace, sent *sentPacket) {
185 sent.time = now
186 c.spaces[space].add(sent)
187 size := sent.size
188 if c.antiAmplificationLimit != antiAmplificationUnlimited {
189 c.antiAmplificationLimit = max(0, c.antiAmplificationLimit-size)
190 }
191 if sent.inFlight {
192 c.cc.packetSent(now, log, space, sent)
193 c.pacer.packetSent(now, size, c.cc.congestionWindow, c.rtt.smoothedRTT)
194 if sent.ackEliciting {
195 c.spaces[space].lastAckEliciting = sent.num
196 c.ptoExpired = false
197 }
198 c.scheduleTimer(now)
199 if logEnabled(log, QLogLevelPacket) {
200 logBytesInFlight(log, c.cc.bytesInFlight)
201 }
202 }
203 if sent.ackEliciting {
204 c.consecutiveNonAckElicitingPackets = 0
205 } else {
206 c.consecutiveNonAckElicitingPackets++
207 }
208 }
209
210
211 func (c *lossState) datagramReceived(now time.Time, size int) {
212 if c.antiAmplificationLimit != antiAmplificationUnlimited {
213 c.antiAmplificationLimit += 3 * size
214
215
216
217 c.scheduleTimer(now)
218 if c.ptoTimerArmed && !c.timer.IsZero() && !c.timer.After(now) {
219 c.ptoExpired = true
220 c.timer = time.Time{}
221 }
222 }
223 }
224
225
226
227
228 func (c *lossState) receiveAckStart() {
229 c.ackFrameContainsAckEliciting = false
230 c.ackFrameRTT = -1
231 }
232
233
234
235 func (c *lossState) receiveAckRange(now time.Time, space numberSpace, rangeIndex int, start, end packetNumber, ackf func(numberSpace, *sentPacket, packetFate)) {
236
237
238 if s := c.spaces[space].start(); start < s {
239 start = s
240 }
241 if e := c.spaces[space].end(); end > e {
242 end = e
243 }
244 if start >= end {
245 return
246 }
247 if rangeIndex == 0 {
248
249
250 sent := c.spaces[space].num(end - 1)
251 if !sent.acked {
252 c.ackFrameRTT = max(0, now.Sub(sent.time))
253 }
254 }
255 for pnum := start; pnum < end; pnum++ {
256 sent := c.spaces[space].num(pnum)
257 if sent.acked || sent.lost {
258 continue
259 }
260
261 if pnum > c.spaces[space].maxAcked {
262 c.spaces[space].maxAcked = pnum
263 }
264 sent.acked = true
265 c.cc.packetAcked(now, sent)
266 ackf(space, sent, packetAcked)
267 if sent.ackEliciting {
268 c.ackFrameContainsAckEliciting = true
269 }
270 }
271 }
272
273
274
275 func (c *lossState) receiveAckEnd(now time.Time, log *slog.Logger, space numberSpace, ackDelay time.Duration, lossf func(numberSpace, *sentPacket, packetFate)) {
276 c.spaces[space].sentPacketList.clean()
277
278
279
280 if c.ackFrameRTT >= 0 && c.ackFrameContainsAckEliciting {
281 c.rtt.updateSample(now, c.handshakeConfirmed, space, c.ackFrameRTT, ackDelay, c.maxAckDelay)
282 }
283
284
285
286 if !(c.side == clientSide && space == initialSpace) {
287 c.ptoBackoffCount = 0
288 }
289
290
291
292 c.timer = time.Time{}
293 c.detectLoss(now, lossf)
294 c.cc.packetBatchEnd(now, log, space, &c.rtt, c.maxAckDelay)
295
296 if logEnabled(log, QLogLevelPacket) {
297 var ssthresh slog.Attr
298 if c.cc.slowStartThreshold != math.MaxInt {
299 ssthresh = slog.Int("ssthresh", c.cc.slowStartThreshold)
300 }
301 log.LogAttrs(context.Background(), QLogLevelPacket,
302 "recovery:metrics_updated",
303 slog.Duration("min_rtt", c.rtt.minRTT),
304 slog.Duration("smoothed_rtt", c.rtt.smoothedRTT),
305 slog.Duration("latest_rtt", c.rtt.latestRTT),
306 slog.Duration("rtt_variance", c.rtt.rttvar),
307 slog.Int("congestion_window", c.cc.congestionWindow),
308 slog.Int("bytes_in_flight", c.cc.bytesInFlight),
309 ssthresh,
310 )
311 }
312 }
313
314
315
316
317 func (c *lossState) discardPackets(space numberSpace, log *slog.Logger, lossf func(numberSpace, *sentPacket, packetFate)) {
318 for i := 0; i < c.spaces[space].size; i++ {
319 sent := c.spaces[space].nth(i)
320 sent.lost = true
321 c.cc.packetDiscarded(sent)
322 lossf(numberSpace(space), sent, packetLost)
323 }
324 c.spaces[space].clean()
325 if logEnabled(log, QLogLevelPacket) {
326 logBytesInFlight(log, c.cc.bytesInFlight)
327 }
328 }
329
330
331 func (c *lossState) discardKeys(now time.Time, log *slog.Logger, space numberSpace) {
332
333 for i := 0; i < c.spaces[space].size; i++ {
334 sent := c.spaces[space].nth(i)
335 c.cc.packetDiscarded(sent)
336 }
337 c.spaces[space].discard()
338 c.spaces[space].maxAcked = -1
339 c.spaces[space].lastAckEliciting = -1
340 c.scheduleTimer(now)
341 if logEnabled(log, QLogLevelPacket) {
342 logBytesInFlight(log, c.cc.bytesInFlight)
343 }
344 }
345
346 func (c *lossState) lossDuration() time.Duration {
347
348 return max((9*max(c.rtt.smoothedRTT, c.rtt.latestRTT))/8, timerGranularity)
349 }
350
351 func (c *lossState) detectLoss(now time.Time, lossf func(numberSpace, *sentPacket, packetFate)) {
352
353 const lossThreshold = 3
354
355 lossTime := now.Add(-c.lossDuration())
356 for space := numberSpace(0); space < numberSpaceCount; space++ {
357 for i := 0; i < c.spaces[space].size; i++ {
358 sent := c.spaces[space].nth(i)
359 if sent.lost || sent.acked {
360 continue
361 }
362
363
364
365
366
367 switch {
368 case c.spaces[space].maxAcked-sent.num >= lossThreshold:
369
370
371 fallthrough
372 case sent.num <= c.spaces[space].maxAcked && !sent.time.After(lossTime):
373
374
375 sent.lost = true
376 lossf(space, sent, packetLost)
377 if sent.inFlight {
378 c.cc.packetLost(now, space, sent, &c.rtt)
379 }
380 }
381 if !sent.lost {
382 break
383 }
384 }
385 c.spaces[space].clean()
386 }
387 c.scheduleTimer(now)
388 }
389
390
391
392
393
394
395
396
397
398
399 func (c *lossState) scheduleTimer(now time.Time) {
400 c.ptoTimerArmed = false
401
402
403
404
405
406 var oldestPotentiallyLost time.Time
407 for space := numberSpace(0); space < numberSpaceCount; space++ {
408 if c.spaces[space].size > 0 && c.spaces[space].start() <= c.spaces[space].maxAcked {
409 firstTime := c.spaces[space].nth(0).time
410 if oldestPotentiallyLost.IsZero() || firstTime.Before(oldestPotentiallyLost) {
411 oldestPotentiallyLost = firstTime
412 }
413 }
414 }
415 if !oldestPotentiallyLost.IsZero() {
416 c.timer = oldestPotentiallyLost.Add(c.lossDuration())
417 return
418 }
419
420
421 if c.ptoExpired {
422
423 c.timer = time.Time{}
424 return
425 }
426 if c.antiAmplificationLimit >= 0 && c.antiAmplificationLimit < minPacketSize {
427
428
429 c.timer = time.Time{}
430 return
431 }
432
433
434
435 var last time.Time
436 if !c.handshakeConfirmed {
437 for space := initialSpace; space <= handshakeSpace; space++ {
438 sent := c.spaces[space].num(c.spaces[space].lastAckEliciting)
439 if sent == nil {
440 continue
441 }
442 if last.IsZero() || last.After(sent.time) {
443 last = sent.time
444 }
445 }
446 } else {
447 sent := c.spaces[appDataSpace].num(c.spaces[appDataSpace].lastAckEliciting)
448 if sent != nil {
449 last = sent.time
450 }
451 }
452 if last.IsZero() &&
453 c.side == clientSide &&
454 c.spaces[handshakeSpace].maxAcked < 0 &&
455 !c.handshakeConfirmed {
456
457
458
459 if !c.timer.IsZero() {
460
461
462 c.ptoTimerArmed = true
463 return
464 }
465 last = now
466 } else if last.IsZero() {
467 c.timer = time.Time{}
468 return
469 }
470 c.timer = last.Add(c.ptoPeriod())
471 c.ptoTimerArmed = true
472 }
473
474 func (c *lossState) ptoPeriod() time.Duration {
475
476 return c.ptoBasePeriod() << c.ptoBackoffCount
477 }
478
479 func (c *lossState) ptoBasePeriod() time.Duration {
480
481 pto := c.rtt.smoothedRTT + max(4*c.rtt.rttvar, timerGranularity)
482 if c.handshakeConfirmed {
483
484
485
486 pto += c.maxAckDelay
487 }
488 return pto
489 }
490
491 func logBytesInFlight(log *slog.Logger, bytesInFlight int) {
492 log.LogAttrs(context.Background(), QLogLevelPacket,
493 "recovery:metrics_updated",
494 slog.Int("bytes_in_flight", bytesInFlight),
495 )
496 }
497
View as plain text