1
16
17 package spdy
18
19 import (
20 "fmt"
21 "io"
22 "net"
23 "net/http"
24 "sync"
25 "sync/atomic"
26 "testing"
27 "time"
28
29 "github.com/moby/spdystream"
30 "k8s.io/apimachinery/pkg/util/httpstream"
31 )
32
// runProxy runs a single-connection TCP proxy for use in tests.
//
// It listens on an ephemeral tcp4 port on localhost, publishes the listening
// address on proxyUrl, accepts exactly one client connection, dials
// backendUrl, and then copies bytes in both directions until both copies have
// returned.
//
// A failure to listen is sent on errCh. Failures to accept the client or to
// dial the backend are reported with t.Errorf. proxyDone is signalled only
// after both copy directions have finished, so it is never signalled on an
// error path.
func runProxy(t *testing.T, backendUrl string, proxyUrl chan<- string, proxyDone chan<- struct{}, errCh chan<- error) {
	listener, err := net.Listen("tcp4", "localhost:0")
	if err != nil {
		errCh <- err
		return
	}
	defer listener.Close()

	proxyUrl <- listener.Addr().String()

	clientConn, err := listener.Accept()
	if err != nil {
		t.Errorf("proxy: error accepting client connection: %v", err)
		return
	}
	// Close the accepted connection on every exit path. Without this the
	// socket stays open after the proxy returns, including when dialing the
	// backend fails, which leaves the client blocked on a dead peer.
	defer clientConn.Close()

	backendConn, err := net.Dial("tcp4", backendUrl)
	if err != nil {
		t.Errorf("proxy: error dialing backend: %v", err)
		return
	}
	defer backendConn.Close()

	var wg sync.WaitGroup
	wg.Add(2)

	// Copy errors are deliberately ignored: each direction simply runs until
	// its source reports EOF or either side of the copy fails.
	go func() {
		defer wg.Done()
		io.Copy(backendConn, clientConn)
	}()

	go func() {
		defer wg.Done()
		io.Copy(clientConn, backendConn)
	}()

	wg.Wait()

	// Signal completion before the deferred closes run.
	proxyDone <- struct{}{}
}
73
74 func runServer(t *testing.T, backendUrl chan<- string, serverDone chan<- struct{}, errCh chan<- error) {
75 listener, err := net.Listen("tcp4", "localhost:0")
76 if err != nil {
77 errCh <- err
78 return
79 }
80 defer listener.Close()
81
82 backendUrl <- listener.Addr().String()
83
84 conn, err := listener.Accept()
85 if err != nil {
86 t.Errorf("server: error accepting connection: %v", err)
87 return
88 }
89
90 streamChan := make(chan httpstream.Stream)
91 replySentChan := make(chan (<-chan struct{}))
92 spdyConn, err := NewServerConnection(conn, func(stream httpstream.Stream, replySent <-chan struct{}) error {
93 streamChan <- stream
94 replySentChan <- replySent
95 return nil
96 })
97 if err != nil {
98 t.Errorf("server: error creating spdy connection: %v", err)
99 return
100 }
101
102 stream := <-streamChan
103 replySent := <-replySentChan
104 <-replySent
105
106 buf := make([]byte, 1)
107 _, err = stream.Read(buf)
108 if err != io.EOF {
109 t.Errorf("server: unexpected read error: %v", err)
110 return
111 }
112
113 <-spdyConn.CloseChan()
114 raw := spdyConn.(*connection).conn
115 if err := raw.Wait(15 * time.Second); err != nil {
116 t.Errorf("server: timed out waiting for connection closure: %v", err)
117 }
118
119 serverDone <- struct{}{}
120 }
121
122 func TestConnectionCloseIsImmediateThroughAProxy(t *testing.T) {
123 errCh := make(chan error)
124
125 serverDone := make(chan struct{}, 1)
126 backendUrlChan := make(chan string)
127 go runServer(t, backendUrlChan, serverDone, errCh)
128
129 var backendUrl string
130 select {
131 case err := <-errCh:
132 t.Fatalf("server: error listening: %v", err)
133 case backendUrl = <-backendUrlChan:
134 }
135
136 proxyDone := make(chan struct{}, 1)
137 proxyUrlChan := make(chan string)
138 go runProxy(t, backendUrl, proxyUrlChan, proxyDone, errCh)
139
140 var proxyUrl string
141 select {
142 case err := <-errCh:
143 t.Fatalf("error listening: %v", err)
144 case proxyUrl = <-proxyUrlChan:
145 }
146
147 conn, err := net.Dial("tcp4", proxyUrl)
148 if err != nil {
149 t.Fatalf("client: error connecting to proxy: %v", err)
150 }
151
152 spdyConn, err := NewClientConnection(conn)
153 if err != nil {
154 t.Fatalf("client: error creating spdy connection: %v", err)
155 }
156
157 if _, err := spdyConn.CreateStream(http.Header{}); err != nil {
158 t.Fatalf("client: error creating stream: %v", err)
159 }
160
161 spdyConn.Close()
162 raw := spdyConn.(*connection).conn
163 if err := raw.Wait(15 * time.Second); err != nil {
164 t.Fatalf("client: timed out waiting for connection closure: %v", err)
165 }
166
167 expired := time.NewTimer(15 * time.Second)
168 defer expired.Stop()
169 i := 0
170 for {
171 select {
172 case <-expired.C:
173 t.Fatalf("timed out waiting for proxy and/or server closure")
174 case <-serverDone:
175 i++
176 case <-proxyDone:
177 i++
178 }
179 if i == 2 {
180 break
181 }
182 }
183 }
184
185 func TestConnectionPings(t *testing.T) {
186 const pingPeriod = 10 * time.Millisecond
187 timeout := time.After(10 * time.Second)
188
189
190 listener, err := net.Listen("tcp4", "localhost:0")
191 if err != nil {
192 t.Fatal(err)
193 }
194 defer listener.Close()
195
196 srvErr := make(chan error, 1)
197 go func() {
198 defer close(srvErr)
199
200 srvConn, err := listener.Accept()
201 if err != nil {
202 srvErr <- fmt.Errorf("server: error accepting connection: %v", err)
203 return
204 }
205 defer srvConn.Close()
206
207 spdyConn, err := spdystream.NewConnection(srvConn, true)
208 if err != nil {
209 srvErr <- fmt.Errorf("server: error creating spdy connection: %v", err)
210 return
211 }
212
213 var pingsSent int64
214 srvSPDYConn := newConnection(
215 spdyConn,
216 func(stream httpstream.Stream, replySent <-chan struct{}) error {
217
218 go io.Copy(stream, stream)
219 return nil
220 },
221 pingPeriod,
222 func() (time.Duration, error) {
223 atomic.AddInt64(&pingsSent, 1)
224 return 0, nil
225 })
226 defer srvSPDYConn.Close()
227
228
229
230 select {
231 case <-timeout:
232 srvErr <- fmt.Errorf("server: timeout waiting for connection to close")
233 return
234 case <-srvSPDYConn.CloseChan():
235 }
236
237
238 gotPings := atomic.LoadInt64(&pingsSent)
239 if gotPings < 1 {
240 t.Errorf("server: failed to send any pings (check logs)")
241 }
242 }()
243
244
245 clConn, err := net.Dial("tcp4", listener.Addr().String())
246 if err != nil {
247 t.Fatalf("client: error connecting to proxy: %v", err)
248 }
249 defer clConn.Close()
250 clSPDYConn, err := NewClientConnection(clConn)
251 if err != nil {
252 t.Fatalf("client: error creating spdy connection: %v", err)
253 }
254 defer clSPDYConn.Close()
255 start := time.Now()
256 clSPDYStream, err := clSPDYConn.CreateStream(http.Header{})
257 if err != nil {
258 t.Fatalf("client: error creating stream: %v", err)
259 }
260 defer clSPDYStream.Close()
261
262
263
264 in := "foo"
265 if _, err := fmt.Fprintln(clSPDYStream, in); err != nil {
266 t.Fatalf("client: error writing data to stream: %v", err)
267 }
268 var out string
269 if _, err := fmt.Fscanln(clSPDYStream, &out); err != nil {
270 t.Fatalf("client: error reading data from stream: %v", err)
271 }
272 if in != out {
273 t.Errorf("client: received data doesn't match sent data: got %q, want %q", out, in)
274 }
275
276
277
278 elapsed := time.Since(start)
279 if elapsed < 3*pingPeriod {
280 time.Sleep(3*pingPeriod - elapsed)
281 }
282 clSPDYConn.Close()
283
284 select {
285 case err, ok := <-srvErr:
286 if ok && err != nil {
287 t.Error(err)
288 }
289 case <-timeout:
290 t.Errorf("timed out waiting for server to exit")
291 }
292 }
293
// fakeStream is an inert stream stub for tests. Its only state is the
// identifier reported by Identifier; every other method is a no-op.
type fakeStream struct {
	id uint32
}

// Read reports zero bytes read and a nil error without touching p.
func (f *fakeStream) Read(p []byte) (int, error) {
	return 0, nil
}

// Write reports zero bytes written and a nil error, discarding p.
func (f *fakeStream) Write(p []byte) (int, error) {
	return 0, nil
}

// Close does nothing and returns nil.
func (f *fakeStream) Close() error {
	return nil
}

// Reset does nothing and returns nil.
func (f *fakeStream) Reset() error {
	return nil
}

// Headers always returns a nil header map.
func (f *fakeStream) Headers() http.Header {
	return nil
}

// Identifier returns the id the stub was constructed with.
func (f *fakeStream) Identifier() uint32 {
	return f.id
}
302
303 func TestConnectionRemoveStreams(t *testing.T) {
304 c := &connection{streams: make(map[uint32]httpstream.Stream)}
305 stream0 := &fakeStream{id: 0}
306 stream1 := &fakeStream{id: 1}
307 stream2 := &fakeStream{id: 2}
308
309 c.registerStream(stream0)
310 c.registerStream(stream1)
311
312 if len(c.streams) != 2 {
313 t.Fatalf("should have two streams, has %d", len(c.streams))
314 }
315
316
317 c.RemoveStreams(stream2)
318
319 if len(c.streams) != 2 {
320 t.Fatalf("should have two streams, has %d", len(c.streams))
321 }
322
323
324 c.RemoveStreams(stream0, stream1)
325
326
327 c.RemoveStreams(nil)
328
329 if len(c.streams) != 0 {
330 t.Fatalf("should not have any streams, has %d", len(c.streams))
331 }
332
333 }
334
View as plain text