1
16
17 package spdy
18
19 import (
20 "net"
21 "net/http"
22 "sync"
23 "time"
24
25 "github.com/moby/spdystream"
26 "k8s.io/apimachinery/pkg/util/httpstream"
27 "k8s.io/klog/v2"
28 )
29
30
31
32 type connection struct {
33 conn *spdystream.Connection
34 streams map[uint32]httpstream.Stream
35 streamLock sync.Mutex
36 newStreamHandler httpstream.NewStreamHandler
37 ping func() (time.Duration, error)
38 }
39
40
41 func NewClientConnection(conn net.Conn) (httpstream.Connection, error) {
42 return NewClientConnectionWithPings(conn, 0)
43 }
44
45
46
47
48
49
50 func NewClientConnectionWithPings(conn net.Conn, pingPeriod time.Duration) (httpstream.Connection, error) {
51 spdyConn, err := spdystream.NewConnection(conn, false)
52 if err != nil {
53 defer conn.Close()
54 return nil, err
55 }
56
57 return newConnection(spdyConn, httpstream.NoOpNewStreamHandler, pingPeriod, spdyConn.Ping), nil
58 }
59
60
61
62
63 func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHandler) (httpstream.Connection, error) {
64 return NewServerConnectionWithPings(conn, newStreamHandler, 0)
65 }
66
67
68
69
70
71
72
73
74 func NewServerConnectionWithPings(conn net.Conn, newStreamHandler httpstream.NewStreamHandler, pingPeriod time.Duration) (httpstream.Connection, error) {
75 spdyConn, err := spdystream.NewConnection(conn, true)
76 if err != nil {
77 defer conn.Close()
78 return nil, err
79 }
80
81 return newConnection(spdyConn, newStreamHandler, pingPeriod, spdyConn.Ping), nil
82 }
83
84
85
86
87 func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler, pingPeriod time.Duration, pingFn func() (time.Duration, error)) httpstream.Connection {
88 c := &connection{
89 conn: conn,
90 newStreamHandler: newStreamHandler,
91 ping: pingFn,
92 streams: make(map[uint32]httpstream.Stream),
93 }
94 go conn.Serve(c.newSpdyStream)
95 if pingPeriod > 0 && pingFn != nil {
96 go c.sendPings(pingPeriod)
97 }
98 return c
99 }
100
101
102
103 const createStreamResponseTimeout = 30 * time.Second
104
105
106
107 func (c *connection) Close() error {
108 c.streamLock.Lock()
109 for _, s := range c.streams {
110
111 s.Reset()
112 }
113 c.streams = make(map[uint32]httpstream.Stream, 0)
114 c.streamLock.Unlock()
115
116
117
118
119 return c.conn.Close()
120 }
121
122
123 func (c *connection) RemoveStreams(streams ...httpstream.Stream) {
124 c.streamLock.Lock()
125 for _, stream := range streams {
126
127 if stream != nil {
128 delete(c.streams, stream.Identifier())
129 }
130 }
131 c.streamLock.Unlock()
132 }
133
134
135
136 func (c *connection) CreateStream(headers http.Header) (httpstream.Stream, error) {
137 stream, err := c.conn.CreateStream(headers, nil, false)
138 if err != nil {
139 return nil, err
140 }
141 if err = stream.WaitTimeout(createStreamResponseTimeout); err != nil {
142 return nil, err
143 }
144
145 c.registerStream(stream)
146 return stream, nil
147 }
148
149
150
151 func (c *connection) registerStream(s httpstream.Stream) {
152 c.streamLock.Lock()
153 c.streams[s.Identifier()] = s
154 c.streamLock.Unlock()
155 }
156
157
158
159 func (c *connection) CloseChan() <-chan bool {
160 return c.conn.CloseChan()
161 }
162
163
164
165
166
167 func (c *connection) newSpdyStream(stream *spdystream.Stream) {
168 replySent := make(chan struct{})
169 err := c.newStreamHandler(stream, replySent)
170 rejectStream := (err != nil)
171 if rejectStream {
172 klog.Warningf("Stream rejected: %v", err)
173 stream.Reset()
174 return
175 }
176
177 c.registerStream(stream)
178 stream.SendReply(http.Header{}, rejectStream)
179 close(replySent)
180 }
181
182
183
184 func (c *connection) SetIdleTimeout(timeout time.Duration) {
185 c.conn.SetIdleTimeout(timeout)
186 }
187
188 func (c *connection) sendPings(period time.Duration) {
189 t := time.NewTicker(period)
190 defer t.Stop()
191 for {
192 select {
193 case <-c.conn.CloseChan():
194 return
195 case <-t.C:
196 }
197 if _, err := c.ping(); err != nil {
198 klog.V(3).Infof("SPDY Ping failed: %v", err)
199
200
201
202 }
203 }
204 }
205
View as plain text