...
1
2
3
4
5 package http2
6
7 import (
8 "bytes"
9 "context"
10 "errors"
11 "io"
12 "math"
13 "net"
14 "net/netip"
15 "os"
16 "sync"
17 "time"
18 )
19
20
21
22
23
24
25
26 func synctestNetPipe(group *synctestGroup) (r, w *synctestNetConn) {
27 s1addr := net.TCPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:8000"))
28 s2addr := net.TCPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:8001"))
29 s1 := newSynctestNetConnHalf(s1addr)
30 s2 := newSynctestNetConnHalf(s2addr)
31 r = &synctestNetConn{group: group, loc: s1, rem: s2}
32 w = &synctestNetConn{group: group, loc: s2, rem: s1}
33 r.peer = w
34 w.peer = r
35 return r, w
36 }
37
38
39 type synctestNetConn struct {
40 group *synctestGroup
41
42
43
44
45 loc, rem *synctestNetConnHalf
46
47
48 autoWait bool
49
50
51 peer *synctestNetConn
52 }
53
54
55 func (c *synctestNetConn) Read(b []byte) (n int, err error) {
56 if c.autoWait {
57 c.group.Wait()
58 }
59 return c.loc.read(b)
60 }
61
62
63
64 func (c *synctestNetConn) Peek() []byte {
65 if c.autoWait {
66 c.group.Wait()
67 }
68 return c.loc.peek()
69 }
70
71
72 func (c *synctestNetConn) Write(b []byte) (n int, err error) {
73 if c.autoWait {
74 defer c.group.Wait()
75 }
76 return c.rem.write(b)
77 }
78
79
80 func (c *synctestNetConn) IsClosedByPeer() bool {
81 if c.autoWait {
82 c.group.Wait()
83 }
84 return c.loc.isClosedByPeer()
85 }
86
87
88 func (c *synctestNetConn) Close() error {
89 c.loc.setWriteError(errors.New("connection closed by peer"))
90 c.rem.setReadError(io.EOF)
91 if c.autoWait {
92 c.group.Wait()
93 }
94 return nil
95 }
96
97
98 func (c *synctestNetConn) LocalAddr() net.Addr {
99 return c.loc.addr
100 }
101
102
103 func (c *synctestNetConn) RemoteAddr() net.Addr {
104 return c.rem.addr
105 }
106
107
108 func (c *synctestNetConn) SetDeadline(t time.Time) error {
109 c.SetReadDeadline(t)
110 c.SetWriteDeadline(t)
111 return nil
112 }
113
114
115 func (c *synctestNetConn) SetReadDeadline(t time.Time) error {
116 c.loc.rctx.setDeadline(c.group, t)
117 return nil
118 }
119
120
121 func (c *synctestNetConn) SetWriteDeadline(t time.Time) error {
122 c.rem.wctx.setDeadline(c.group, t)
123 return nil
124 }
125
126
127
128 func (c *synctestNetConn) SetReadBufferSize(size int) {
129 c.loc.setReadBufferSize(size)
130 }
131
132
133
134 type synctestNetConnHalf struct {
135 addr net.Addr
136
137
138 rctx, wctx deadlineContext
139
140
141
142
143
144
145
146 lockr chan struct{}
147 lockw chan struct{}
148 lockrw chan struct{}
149 lockc chan struct{}
150
151 bufMax int
152 buf bytes.Buffer
153 readErr error
154 writeErr error
155 }
156
157 func newSynctestNetConnHalf(addr net.Addr) *synctestNetConnHalf {
158 h := &synctestNetConnHalf{
159 addr: addr,
160 lockw: make(chan struct{}, 1),
161 lockr: make(chan struct{}, 1),
162 lockrw: make(chan struct{}, 1),
163 lockc: make(chan struct{}, 1),
164 bufMax: math.MaxInt,
165 }
166 h.unlock()
167 return h
168 }
169
170 func (h *synctestNetConnHalf) lock() {
171 select {
172 case <-h.lockw:
173 case <-h.lockr:
174 case <-h.lockrw:
175 case <-h.lockc:
176 }
177 }
178
179 func (h *synctestNetConnHalf) unlock() {
180 canRead := h.readErr != nil || h.buf.Len() > 0
181 canWrite := h.writeErr != nil || h.bufMax > h.buf.Len()
182 switch {
183 case canRead && canWrite:
184 h.lockrw <- struct{}{}
185 case canRead:
186 h.lockr <- struct{}{}
187 case canWrite:
188 h.lockw <- struct{}{}
189 default:
190 h.lockc <- struct{}{}
191 }
192 }
193
194 func (h *synctestNetConnHalf) readWaitAndLock() error {
195 select {
196 case <-h.lockr:
197 return nil
198 case <-h.lockrw:
199 return nil
200 default:
201 }
202 ctx := h.rctx.context()
203 select {
204 case <-h.lockr:
205 return nil
206 case <-h.lockrw:
207 return nil
208 case <-ctx.Done():
209 return context.Cause(ctx)
210 }
211 }
212
213 func (h *synctestNetConnHalf) writeWaitAndLock() error {
214 select {
215 case <-h.lockw:
216 return nil
217 case <-h.lockrw:
218 return nil
219 default:
220 }
221 ctx := h.wctx.context()
222 select {
223 case <-h.lockw:
224 return nil
225 case <-h.lockrw:
226 return nil
227 case <-ctx.Done():
228 return context.Cause(ctx)
229 }
230 }
231
232 func (h *synctestNetConnHalf) peek() []byte {
233 h.lock()
234 defer h.unlock()
235 return h.buf.Bytes()
236 }
237
238 func (h *synctestNetConnHalf) isClosedByPeer() bool {
239 h.lock()
240 defer h.unlock()
241 return h.readErr != nil
242 }
243
244 func (h *synctestNetConnHalf) read(b []byte) (n int, err error) {
245 if err := h.readWaitAndLock(); err != nil {
246 return 0, err
247 }
248 defer h.unlock()
249 if h.buf.Len() == 0 && h.readErr != nil {
250 return 0, h.readErr
251 }
252 return h.buf.Read(b)
253 }
254
255 func (h *synctestNetConnHalf) setReadBufferSize(size int) {
256 h.lock()
257 defer h.unlock()
258 h.bufMax = size
259 }
260
261 func (h *synctestNetConnHalf) write(b []byte) (n int, err error) {
262 for n < len(b) {
263 nn, err := h.writePartial(b[n:])
264 n += nn
265 if err != nil {
266 return n, err
267 }
268 }
269 return n, nil
270 }
271
272 func (h *synctestNetConnHalf) writePartial(b []byte) (n int, err error) {
273 if err := h.writeWaitAndLock(); err != nil {
274 return 0, err
275 }
276 defer h.unlock()
277 if h.writeErr != nil {
278 return 0, h.writeErr
279 }
280 writeMax := h.bufMax - h.buf.Len()
281 if writeMax < len(b) {
282 b = b[:writeMax]
283 }
284 return h.buf.Write(b)
285 }
286
287 func (h *synctestNetConnHalf) setReadError(err error) {
288 h.lock()
289 defer h.unlock()
290 if h.readErr == nil {
291 h.readErr = err
292 }
293 }
294
295 func (h *synctestNetConnHalf) setWriteError(err error) {
296 h.lock()
297 defer h.unlock()
298 if h.writeErr == nil {
299 h.writeErr = err
300 }
301 }
302
303
304 type deadlineContext struct {
305 mu sync.Mutex
306 ctx context.Context
307 cancel context.CancelCauseFunc
308 timer timer
309 }
310
311
312 func (t *deadlineContext) context() context.Context {
313 t.mu.Lock()
314 defer t.mu.Unlock()
315 if t.ctx == nil {
316 t.ctx, t.cancel = context.WithCancelCause(context.Background())
317 }
318 return t.ctx
319 }
320
321
322 func (t *deadlineContext) setDeadline(group *synctestGroup, deadline time.Time) {
323 t.mu.Lock()
324 defer t.mu.Unlock()
325
326
327 if t.ctx == nil || t.cancel == nil {
328 t.ctx, t.cancel = context.WithCancelCause(context.Background())
329 }
330
331 if t.timer != nil {
332 t.timer.Stop()
333 }
334 if deadline.IsZero() {
335
336 return
337 }
338 if !deadline.After(group.Now()) {
339
340 t.cancel(os.ErrDeadlineExceeded)
341 t.cancel = nil
342 return
343 }
344 if t.timer != nil {
345
346 t.timer.Reset(deadline.Sub(group.Now()))
347 return
348 }
349
350 t.timer = group.AfterFunc(deadline.Sub(group.Now()), func() {
351 t.mu.Lock()
352 defer t.mu.Unlock()
353 t.cancel(os.ErrDeadlineExceeded)
354 t.cancel = nil
355 })
356 }
357
View as plain text