1
18
19
20
21
22 package latency
23
24 import (
25 "bytes"
26 "context"
27 "encoding/binary"
28 "fmt"
29 "io"
30 "net"
31 "time"
32 )
33
34
// Function types for the three dialing styles that a Network can wrap.
type (
	// Dialer is a function matching the signature of net.Dial.
	Dialer func(network, address string) (net.Conn, error)

	// TimeoutDialer is a function matching the signature of net.DialTimeout.
	TimeoutDialer func(network, address string, timeout time.Duration) (net.Conn, error)

	// ContextDialer is a function matching the signature of
	// net.Dialer.DialContext.
	ContextDialer func(ctx context.Context, network, address string) (net.Conn, error)
)
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// Network describes the bandwidth, latency and MTU to simulate. It produces
// wrappers for net.Conn, net.Listener and the dialer function types declared
// in this file.
//
// The fields are positional in the preset literals in this file, so their
// order must not change.
type Network struct {
	Kbps    int           // Kilobits per second; if non-positive, bandwidth is not limited.
	Latency time.Duration // Target latency; if non-positive, no delay is computed during the handshake.
	MTU     int           // Maximum bytes per packet; if non-positive, writes are not split.
}
64
65 var (
66
67 Local = Network{0, 0, 0}
68
69 LAN = Network{100 * 1024, 2 * time.Millisecond, 1500}
70
71 WAN = Network{20 * 1024, 30 * time.Millisecond, 1500}
72
73 Longhaul = Network{1000 * 1024, 200 * time.Millisecond, 9000}
74 )
75
76
77
78
79
80 func (n *Network) Conn(c net.Conn) (net.Conn, error) {
81 start := now()
82 nc := &conn{Conn: c, network: n, readBuf: new(bytes.Buffer)}
83 if err := nc.sync(); err != nil {
84 return nil, err
85 }
86 sleep(start.Add(nc.delay).Sub(now()))
87 return nc, nil
88 }
89
// conn is a net.Conn wrapper that frames every write with a header and delays
// reads until the time recorded in that header.
type conn struct {
	net.Conn          // underlying connection; methods not overridden are promoted from it
	network  *Network // settings (Kbps, Latency, MTU) applied to this connection

	readBuf     *bytes.Buffer // payload of the last packet received that Read has not yet handed out
	lastSendEnd time.Time     // time at which the most recently queued packet finishes transmitting
	delay       time.Duration // configured latency minus the latency measured by sync
}
98
99
// header precedes every packet on the wire. Write encodes it and Read decodes
// it with encoding/binary in big-endian order, so the field order and the
// fixed-size field types are part of the wire format.
type header struct {
	ReadTime int64 // UnixNano time before which Read will not return after receiving the packet
	Sz       int32 // number of payload bytes that follow the header
}
104
105 func (c *conn) Write(p []byte) (n int, err error) {
106 tNow := now()
107 if c.lastSendEnd.Before(tNow) {
108 c.lastSendEnd = tNow
109 }
110 for len(p) > 0 {
111 pkt := p
112 if c.network.MTU > 0 && len(pkt) > c.network.MTU {
113 pkt = pkt[:c.network.MTU]
114 p = p[c.network.MTU:]
115 } else {
116 p = nil
117 }
118 if c.network.Kbps > 0 {
119 if congestion := c.lastSendEnd.Sub(tNow) - c.delay; congestion > 0 {
120
121 sleep(congestion)
122 tNow = tNow.Add(congestion)
123 }
124 }
125 c.lastSendEnd = c.lastSendEnd.Add(c.network.pktTime(len(pkt)))
126 hdr := header{ReadTime: c.lastSendEnd.Add(c.delay).UnixNano(), Sz: int32(len(pkt))}
127 if err := binary.Write(c.Conn, binary.BigEndian, hdr); err != nil {
128 return n, err
129 }
130 x, err := c.Conn.Write(pkt)
131 n += x
132 if err != nil {
133 return n, err
134 }
135 }
136 return n, nil
137 }
138
139 func (c *conn) Read(p []byte) (n int, err error) {
140 if c.readBuf.Len() == 0 {
141 var hdr header
142 if err := binary.Read(c.Conn, binary.BigEndian, &hdr); err != nil {
143 return 0, err
144 }
145 defer func() { sleep(time.Unix(0, hdr.ReadTime).Sub(now())) }()
146
147 if _, err := io.CopyN(c.readBuf, c.Conn, int64(hdr.Sz)); err != nil {
148 return 0, err
149 }
150 }
151
152 return c.readBuf.Read(p)
153 }
154
155
156
// sync runs the startup handshake with the peer over c.Conn and sets c.delay
// to the configured latency minus the latency it measures.
//
// Both sides first exchange a fixed ping message. Each side then repeatedly
// sends a timestamped message, which the peer echoes back with its own
// receive timestamp, until the local average has settled (or giveUp attempts
// were made) and the peer has reported that it is done too.
//
// It returns an error if any read or write on the underlying connection
// fails, if the peer's ping message differs from the expected one, or if the
// measured latency is higher than c.network.Latency.
func (c *conn) sync() error {
	const (
		pingMsg  = "syncPing"
		warmup   = 10               // minimum number of measurements before stopping
		giveUp   = 50               // number of measurements after which the average is accepted regardless
		accuracy = time.Millisecond // a sample within this distance of the running average counts as good
		goodRun  = 3                // consecutive good samples required to stop before giveUp
	)

	// syncMsg is the measurement message. The initiator fills SendT and leaves
	// RecvT zero; the echoing side fills RecvT. A message with both fields
	// zero means the sender has finished measuring.
	type syncMsg struct {
		SendT int64 // initiator's now().UnixNano() when the message was sent
		RecvT int64 // echoing side's now().UnixNano() when it was received
	}

	// Both sides write the ping first and then read the peer's, so neither
	// waits for the other before writing.
	if err := binary.Write(c.Conn, binary.BigEndian, []byte(pingMsg)); err != nil {
		return err
	}
	// 8 bytes is exactly len(pingMsg).
	var ping [8]byte
	if err := binary.Read(c.Conn, binary.BigEndian, &ping); err != nil {
		return err
	} else if string(ping[:]) != pingMsg {
		return fmt.Errorf("malformed handshake message: %v (want %q)", ping, pingMsg)
	}

	att := 0                  // number of measurement messages sent so far
	good := 0                 // current run of consecutive good samples
	var latency time.Duration // sum of all samples; replaced by the average once done
	localDone, remoteDone := false, false
	send := true
	for !localDone || !remoteDone {
		if send {
			if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{SendT: now().UnixNano()}); err != nil {
				return err
			}
			att++
			send = false
		}

		// Every iteration reads exactly one message: a request from the peer,
		// the peer's done marker, or the echo of our own request.
		m := syncMsg{}
		if err := binary.Read(c.Conn, binary.BigEndian, &m); err != nil {
			return err
		}

		if m.RecvT == 0 {
			// The message originates from the peer.
			if m.SendT == 0 {
				// All-zero message: the peer has finished measuring.
				remoteDone = true
				continue
			}
			// Stamp the peer's request with our receive time and echo it.
			m.RecvT = now().UnixNano()
			if err := binary.Write(c.Conn, binary.BigEndian, m); err != nil {
				return err
			}
			continue
		}

		// Echo of our own request: the sample is the peer's receive timestamp
		// minus our send timestamp.
		lag := time.Duration(m.RecvT - m.SendT)
		latency += lag
		avgLatency := latency / time.Duration(att)
		if e := lag - avgLatency; e > -accuracy && e < accuracy {
			good++
		} else {
			good = 0
		}
		// Keep measuring until warmed up with a long enough good run, but
		// never beyond giveUp attempts.
		if att < giveUp && (att < warmup || good < goodRun) {
			send = true
			continue
		}
		localDone = true
		latency = avgLatency
		// Tell the peer we are done by sending an all-zero message.
		if err := binary.Write(c.Conn, binary.BigEndian, syncMsg{}); err != nil {
			return err
		}
	}
	// No latency configured: leave c.delay untouched.
	if c.network.Latency <= 0 {
		return nil
	}
	c.delay = c.network.Latency - latency
	if c.delay < 0 {
		return fmt.Errorf("measured network latency (%v) higher than desired latency (%v)", latency, c.network.Latency)
	}
	return nil
}
245
246
247
248 func (n *Network) Listener(l net.Listener) net.Listener {
249 return &listener{Listener: l, network: n}
250 }
251
// listener is a net.Listener whose Accept wraps each accepted connection with
// network.Conn.
type listener struct {
	net.Listener          // underlying listener; Close and Addr are promoted from it
	network      *Network // settings applied to accepted connections
}
256
257 func (l *listener) Accept() (net.Conn, error) {
258 c, err := l.Listener.Accept()
259 if err != nil {
260 return nil, err
261 }
262 return l.network.Conn(c)
263 }
264
265
266
267 func (n *Network) Dialer(d Dialer) Dialer {
268 return func(network, address string) (net.Conn, error) {
269 conn, err := d(network, address)
270 if err != nil {
271 return nil, err
272 }
273 return n.Conn(conn)
274 }
275 }
276
277
278
279
280 func (n *Network) TimeoutDialer(d TimeoutDialer) TimeoutDialer {
281 return func(network, address string, timeout time.Duration) (net.Conn, error) {
282 conn, err := d(network, address, timeout)
283 if err != nil {
284 return nil, err
285 }
286 return n.Conn(conn)
287 }
288 }
289
290
291
292
293 func (n *Network) ContextDialer(d ContextDialer) ContextDialer {
294 return func(ctx context.Context, network, address string) (net.Conn, error) {
295 conn, err := d(ctx, network, address)
296 if err != nil {
297 return nil, err
298 }
299 return n.Conn(conn)
300 }
301 }
302
303
304
305 func (n *Network) pktTime(b int) time.Duration {
306 if n.Kbps <= 0 {
307 return time.Duration(0)
308 }
309 return time.Duration(b) * time.Second / time.Duration(n.Kbps*(1024/8))
310 }
311
312
313
// Clock hooks. The code in this file calls these rather than time.Now and
// time.Sleep directly; presumably so tests can replace them (no test code is
// visible here to confirm).
var (
	now   = time.Now
	sleep = time.Sleep
)
316
View as plain text