1
18
19 package latency
20
21 import (
22 "bytes"
23 "fmt"
24 "net"
25 "reflect"
26 "sync"
27 "testing"
28 "time"
29
30 "google.golang.org/grpc/internal/grpctest"
31 )
32
33 type s struct {
34 grpctest.Tester
35 }
36
37 func Test(t *testing.T) {
38 grpctest.RunSubTests(t, s{})
39 }
40
41
// bufConn gives a *bytes.Buffer the method set of net.Conn so the tests can
// use an in-memory buffer as a connection. Read and Write are promoted from
// the embedded buffer, so bytes written to a bufConn are read back from the
// same bufConn.
type bufConn struct {
	*bytes.Buffer
}

// The remaining net.Conn methods are stubs: the tests never expect them to be
// called, so each one panics with the same "unimplemented" message.
func (bufConn) Close() error                       { panic("unimplemented") }
func (bufConn) LocalAddr() net.Addr                { panic("unimplemented") }
func (bufConn) RemoteAddr() net.Addr               { panic("unimplemented") }
func (bufConn) SetDeadline(t time.Time) error      { panic("unimplemented") }
func (bufConn) SetReadDeadline(t time.Time) error  { panic("unimplemented") }
func (bufConn) SetWriteDeadline(t time.Time) error { panic("unimplemented") }
52
53 func restoreHooks() func() {
54 s := sleep
55 n := now
56 return func() {
57 sleep = s
58 now = n
59 }
60 }
61
62 func (s) TestConn(t *testing.T) {
63 defer restoreHooks()()
64
65
66 now = func() time.Time { return time.Unix(123, 456) }
67
68
69 var sleepTimes []time.Duration
70 sleep = func(t time.Duration) { sleepTimes = append(sleepTimes, t) }
71
72 wantSleeps := func(want ...time.Duration) {
73 if !reflect.DeepEqual(want, sleepTimes) {
74 t.Fatalf("sleepTimes = %v; want %v", sleepTimes, want)
75 }
76 sleepTimes = nil
77 }
78
79
80
81 latency := 1 * time.Second
82 c, err := (&Network{Kbps: 1, Latency: latency, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
83 if err != nil {
84 t.Fatalf("Unexpected error creating connection: %v", err)
85 }
86 wantSleeps(latency)
87
88
89 byteLatency := time.Second / 128
90
91 write := func(b []byte) {
92 n, err := c.Write(b)
93 if n != len(b) || err != nil {
94 t.Fatalf("c.Write(%v) = %v, %v; want %v, nil", b, n, err, len(b))
95 }
96 }
97
98 write([]byte{1, 2, 3, 4, 5})
99 pkt1Time := latency + byteLatency*5
100 write([]byte{6})
101 pkt2Time := pkt1Time + byteLatency
102 write([]byte{7, 8, 9, 10, 11, 12, 13})
103 pkt3Time := pkt2Time + byteLatency*5
104 pkt4Time := pkt3Time + byteLatency*2
105
106
107 wantSleeps()
108
109 read := func(n int, want []byte) {
110 b := make([]byte, n)
111 if rd, err := c.Read(b); err != nil || rd != len(want) {
112 t.Fatalf("c.Read(<%v bytes>) = %v, %v; want %v, nil", n, rd, err, len(want))
113 }
114 if !reflect.DeepEqual(b[:len(want)], want) {
115 t.Fatalf("read %v; want %v", b, want)
116 }
117 }
118
119 read(1, []byte{1})
120 wantSleeps(pkt1Time)
121 read(1, []byte{2})
122 wantSleeps()
123 read(3, []byte{3, 4, 5})
124 wantSleeps()
125 read(2, []byte{6})
126 wantSleeps(pkt2Time)
127 read(2, []byte{7, 8})
128 wantSleeps(pkt3Time)
129 read(10, []byte{9, 10, 11})
130 wantSleeps()
131 read(10, []byte{12, 13})
132 wantSleeps(pkt4Time)
133 }
134
135 func (s) TestSync(t *testing.T) {
136 defer restoreHooks()()
137
138
139 tn := time.Unix(123, 0)
140 now = func() time.Time { return tn }
141 sleep = func(d time.Duration) { tn = tn.Add(d) }
142
143
144
145 slowConn, err := (&Network{Kbps: 0, Latency: 20 * time.Millisecond, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
146 if err != nil {
147 t.Fatalf("Unexpected error creating connection: %v", err)
148 }
149 c, err := (&Network{Latency: 30 * time.Millisecond}).Conn(slowConn)
150 if err != nil {
151 t.Fatalf("Unexpected error creating connection: %v", err)
152 }
153 if c.(*conn).delay != 10*time.Millisecond {
154 t.Fatalf("c.delay = %v; want 10ms", c.(*conn).delay)
155 }
156 }
157
158 func (s) TestSyncTooSlow(t *testing.T) {
159 defer restoreHooks()()
160
161
162 tn := time.Unix(123, 0)
163 now = func() time.Time { return tn }
164 sleep = func(d time.Duration) { tn = tn.Add(d) }
165
166
167
168 slowConn, err := (&Network{Kbps: 0, Latency: 10 * time.Millisecond, MTU: 5}).Conn(bufConn{&bytes.Buffer{}})
169 if err != nil {
170 t.Fatalf("Unexpected error creating connection: %v", err)
171 }
172
173 errWant := "measured network latency (10ms) higher than desired latency (5ms)"
174 if _, err := (&Network{Latency: 5 * time.Millisecond}).Conn(slowConn); err == nil || err.Error() != errWant {
175 t.Fatalf("Conn() = _, %q; want _, %q", err, errWant)
176 }
177 }
178
179 func (s) TestListenerAndDialer(t *testing.T) {
180 defer restoreHooks()()
181
182 tn := time.Unix(123, 0)
183 startTime := tn
184 mu := &sync.Mutex{}
185 now = func() time.Time {
186 mu.Lock()
187 defer mu.Unlock()
188 return tn
189 }
190
191
192
193 n := &Network{Kbps: 2, Latency: 1 * time.Second, MTU: 10}
194
195 byteLatency := func(n int) time.Duration {
196 return time.Duration(n) * time.Second / 256
197 }
198
199
200 l, err := net.Listen("tcp", "localhost:0")
201 if err != nil {
202 t.Fatalf("Unexpected error creating listener: %v", err)
203 }
204 defer l.Close()
205 l = n.Listener(l)
206
207 var serverConn net.Conn
208 var scErr error
209 scDone := make(chan struct{})
210 go func() {
211 serverConn, scErr = l.Accept()
212 close(scDone)
213 }()
214
215
216 clientConn, err := n.TimeoutDialer(net.DialTimeout)("tcp", l.Addr().String(), 2*time.Second)
217 if err != nil {
218 t.Fatalf("Unexpected error dialing: %v", err)
219 }
220 defer clientConn.Close()
221
222
223 <-scDone
224 if scErr != nil {
225 t.Fatalf("Unexpected error listening: %v", scErr)
226 }
227 defer serverConn.Close()
228
229
230 sleep = func(d time.Duration) {
231 mu.Lock()
232 defer mu.Unlock()
233 if d > 0 {
234 tn = tn.Add(d)
235 }
236 }
237
238 seq := func(a, b int) []byte {
239 buf := make([]byte, b-a)
240 for i := 0; i < b-a; i++ {
241 buf[i] = byte(i + a)
242 }
243 return buf
244 }
245
246 pkt1 := seq(0, 10)
247 pkt2 := seq(10, 30)
248 pkt3 := seq(30, 35)
249
250 write := func(c net.Conn, b []byte) {
251 n, err := c.Write(b)
252 if n != len(b) || err != nil {
253 t.Fatalf("c.Write(%v) = %v, %v; want %v, nil", b, n, err, len(b))
254 }
255 }
256
257 write(serverConn, pkt1)
258 write(serverConn, pkt2)
259 write(serverConn, pkt3)
260 write(clientConn, pkt3)
261 write(clientConn, pkt1)
262 write(clientConn, pkt2)
263
264 if tn != startTime {
265 t.Fatalf("unexpected sleep in write; tn = %v; want %v", tn, startTime)
266 }
267
268 read := func(c net.Conn, n int, want []byte, timeWant time.Time) {
269 b := make([]byte, n)
270 if rd, err := c.Read(b); err != nil || rd != len(want) {
271 t.Fatalf("c.Read(<%v bytes>) = %v, %v; want %v, nil (read: %v)", n, rd, err, len(want), b[:rd])
272 }
273 if !reflect.DeepEqual(b[:len(want)], want) {
274 t.Fatalf("read %v; want %v", b, want)
275 }
276 if !tn.Equal(timeWant) {
277 t.Errorf("tn after read(%v) = %v; want %v", want, tn, timeWant)
278 }
279 }
280
281 read(clientConn, len(pkt1)+1, pkt1, startTime.Add(n.Latency+byteLatency(len(pkt1))))
282 read(serverConn, len(pkt3)+1, pkt3, tn)
283
284 read(clientConn, len(pkt2), pkt2[:10], startTime.Add(n.Latency+byteLatency(len(pkt1)+10)))
285 read(clientConn, len(pkt2), pkt2[10:], startTime.Add(n.Latency+byteLatency(len(pkt1)+len(pkt2))))
286 read(clientConn, len(pkt3), pkt3, startTime.Add(n.Latency+byteLatency(len(pkt1)+len(pkt2)+len(pkt3))))
287
288 read(serverConn, len(pkt1), pkt1, tn)
289 read(serverConn, len(pkt2), pkt2[:10], tn)
290 read(serverConn, len(pkt2), pkt2[10:], tn)
291
292
293
294 sleep(10 * time.Second)
295 write(clientConn, pkt1)
296 read(serverConn, len(pkt1), pkt1, tn.Add(n.Latency+byteLatency(len(pkt1))))
297
298
299
300 write(serverConn, pkt1)
301 sleep(10 * time.Second)
302 read(clientConn, len(pkt1), pkt1, tn)
303 }
304
305 func (s) TestBufferBloat(t *testing.T) {
306 defer restoreHooks()()
307
308
309 tn := time.Unix(123, 0)
310 now = func() time.Time { return tn }
311
312 var sleepTimes []time.Duration
313 sleep = func(d time.Duration) {
314 sleepTimes = append(sleepTimes, d)
315 tn = tn.Add(d)
316 }
317
318 wantSleeps := func(want ...time.Duration) error {
319 if !reflect.DeepEqual(want, sleepTimes) {
320 return fmt.Errorf("sleepTimes = %v; want %v", sleepTimes, want)
321 }
322 sleepTimes = nil
323 return nil
324 }
325
326 n := &Network{Kbps: 8 , Latency: time.Second, MTU: 8}
327 bdpBytes := (n.Kbps * 1024 / 8) * int(n.Latency/time.Second)
328 c, err := n.Conn(bufConn{&bytes.Buffer{}})
329 if err != nil {
330 t.Fatalf("Unexpected error creating connection: %v", err)
331 }
332 wantSleeps(n.Latency)
333
334 write := func(n int, sleeps ...time.Duration) {
335 if wt, err := c.Write(make([]byte, n)); err != nil || wt != n {
336 t.Fatalf("c.Write(<%v bytes>) = %v, %v; want %v, nil", n, wt, err, n)
337 }
338 if err := wantSleeps(sleeps...); err != nil {
339 t.Fatalf("After writing %v bytes: %v", n, err)
340 }
341 }
342
343 read := func(n int, sleeps ...time.Duration) {
344 if rd, err := c.Read(make([]byte, n)); err != nil || rd != n {
345 t.Fatalf("c.Read(_) = %v, %v; want %v, nil", rd, err, n)
346 }
347 if err := wantSleeps(sleeps...); err != nil {
348 t.Fatalf("After reading %v bytes: %v", n, err)
349 }
350 }
351
352 write(8)
353 read(8, time.Second+n.pktTime(8))
354
355 write(bdpBytes)
356 write(1)
357 write(n.MTU, n.pktTime(1))
358 write(1, n.pktTime(n.MTU))
359 write(n.MTU+1, n.pktTime(1), n.pktTime(n.MTU))
360
361 tn = tn.Add(10 * time.Second)
362 write(bdpBytes)
363 }
364
View as plain text