1
16
17 package spdy
18
19 import (
20 "bufio"
21 "context"
22 "crypto/tls"
23 "encoding/base64"
24 "errors"
25 "fmt"
26 "io"
27 "net"
28 "net/http"
29 "net/http/httputil"
30 "net/url"
31 "strings"
32 "time"
33
34 "golang.org/x/net/proxy"
35 apierrors "k8s.io/apimachinery/pkg/api/errors"
36 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37 "k8s.io/apimachinery/pkg/runtime"
38 "k8s.io/apimachinery/pkg/runtime/serializer"
39 "k8s.io/apimachinery/pkg/util/httpstream"
40 utilnet "k8s.io/apimachinery/pkg/util/net"
41 apiproxy "k8s.io/apimachinery/pkg/util/proxy"
42 "k8s.io/apimachinery/third_party/forked/golang/netutil"
43 )
44
45
46
47
// SpdyRoundTripper performs the HTTP request that asks a server to upgrade
// the connection to SPDY/3.1 and keeps hold of the resulting connection so
// that NewConnection can turn it into an httpstream.Connection.
type SpdyRoundTripper struct {
	// tlsConfig is the TLS configuration used when dialing TLS endpoints
	// (see dialWithoutProxy and tlsConn). It is what TLSClientConfig returns
	// and may be nil.
	tlsConfig *tls.Config

	// conn is the connection opened by the most recent successful RoundTrip.
	// RoundTrip stores it here and NewConnection wraps it.
	// NOTE(review): the field is written without any synchronization, so a
	// single SpdyRoundTripper does not look safe for concurrent RoundTrip
	// calls - confirm against callers.
	conn net.Conn

	// Dialer is the dialer used to open the underlying TCP connection, both
	// for direct dials and for dials to a proxy. When nil, a default
	// net.Dialer is created at dial time.
	Dialer *net.Dialer

	// proxier is called for every request dialed through dial; it returns
	// the proxy URL to use, or a nil URL to connect to the target directly.
	proxier func(req *http.Request) (*url.URL, error)

	// pingPeriod is handed to NewClientConnectionWithPings when the upgraded
	// connection is created in NewConnection.
	pingPeriod time.Duration

	// upgradeTransport, when non-nil, makes Dial open the connection with
	// apiproxy.DialURL through this transport instead of using the
	// proxier/Dialer based dial path.
	upgradeTransport http.RoundTripper
}
77
78 var _ utilnet.TLSClientConfigHolder = &SpdyRoundTripper{}
79 var _ httpstream.UpgradeRoundTripper = &SpdyRoundTripper{}
80 var _ utilnet.Dialer = &SpdyRoundTripper{}
81
82
83
84 func NewRoundTripper(tlsConfig *tls.Config) (*SpdyRoundTripper, error) {
85 return NewRoundTripperWithConfig(RoundTripperConfig{
86 TLS: tlsConfig,
87 UpgradeTransport: nil,
88 })
89 }
90
91
92
93 func NewRoundTripperWithProxy(tlsConfig *tls.Config, proxier func(*http.Request) (*url.URL, error)) (*SpdyRoundTripper, error) {
94 return NewRoundTripperWithConfig(RoundTripperConfig{
95 TLS: tlsConfig,
96 Proxier: proxier,
97 UpgradeTransport: nil,
98 })
99 }
100
101
102
103 func NewRoundTripperWithConfig(cfg RoundTripperConfig) (*SpdyRoundTripper, error) {
104
105 if cfg.UpgradeTransport != nil {
106 if cfg.TLS != nil || cfg.Proxier != nil {
107 return nil, fmt.Errorf("SpdyRoundTripper: UpgradeTransport is mutually exclusive to TLSConfig or Proxier")
108 }
109 tlsConfig, err := utilnet.TLSClientConfig(cfg.UpgradeTransport)
110 if err != nil {
111 return nil, fmt.Errorf("SpdyRoundTripper: Unable to retrieve TLSConfig from UpgradeTransport: %v", err)
112 }
113 cfg.TLS = tlsConfig
114 }
115 if cfg.Proxier == nil {
116 cfg.Proxier = utilnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment)
117 }
118 return &SpdyRoundTripper{
119 tlsConfig: cfg.TLS,
120 proxier: cfg.Proxier,
121 pingPeriod: cfg.PingPeriod,
122 upgradeTransport: cfg.UpgradeTransport,
123 }, nil
124 }
125
126
// RoundTripperConfig holds the options accepted by NewRoundTripperWithConfig.
type RoundTripperConfig struct {
	// TLS is the TLS configuration used by the round tripper. It must be nil
	// when UpgradeTransport is set.
	TLS *tls.Config

	// Proxier selects the proxy URL for a request. When nil,
	// NewRoundTripperWithConfig falls back to http.ProxyFromEnvironment
	// wrapped by utilnet.NewProxierWithNoProxyCIDR. It must be nil when
	// UpgradeTransport is set.
	Proxier func(*http.Request) (*url.URL, error)

	// PingPeriod is copied into the round tripper and later passed to
	// NewClientConnectionWithPings.
	// NOTE(review): presumably a zero value disables pings - confirm against
	// NewClientConnectionWithPings.
	PingPeriod time.Duration

	// UpgradeTransport is an optional transport used to dial the upgrade
	// connection. It is mutually exclusive with TLS and Proxier; when set,
	// the TLS configuration is derived from it.
	UpgradeTransport http.RoundTripper
}
140
141
142
// TLSClientConfig returns the TLS configuration held by the round tripper,
// which may be nil. It implements utilnet.TLSClientConfigHolder.
func (s *SpdyRoundTripper) TLSClientConfig() *tls.Config {
	return s.tlsConfig
}
146
147
148 func (s *SpdyRoundTripper) Dial(req *http.Request) (net.Conn, error) {
149 var conn net.Conn
150 var err error
151 if s.upgradeTransport != nil {
152 conn, err = apiproxy.DialURL(req.Context(), req.URL, s.upgradeTransport)
153 } else {
154 conn, err = s.dial(req)
155 }
156 if err != nil {
157 return nil, err
158 }
159
160 if err := req.Write(conn); err != nil {
161 conn.Close()
162 return nil, err
163 }
164
165 return conn, nil
166 }
167
168
169
170 func (s *SpdyRoundTripper) dial(req *http.Request) (net.Conn, error) {
171 proxyURL, err := s.proxier(req)
172 if err != nil {
173 return nil, err
174 }
175
176 if proxyURL == nil {
177 return s.dialWithoutProxy(req.Context(), req.URL)
178 }
179
180 switch proxyURL.Scheme {
181 case "socks5":
182 return s.dialWithSocks5Proxy(req, proxyURL)
183 case "https", "http", "":
184 return s.dialWithHttpProxy(req, proxyURL)
185 }
186
187 return nil, fmt.Errorf("proxy URL scheme not supported: %s", proxyURL.Scheme)
188 }
189
190
191 func (s *SpdyRoundTripper) dialWithHttpProxy(req *http.Request, proxyURL *url.URL) (net.Conn, error) {
192
193 targetHost := netutil.CanonicalAddr(req.URL)
194
195
196 proxyReq := http.Request{
197 Method: "CONNECT",
198 URL: &url.URL{},
199 Host: targetHost,
200 }
201
202 proxyReq = *proxyReq.WithContext(req.Context())
203
204 if pa := s.proxyAuth(proxyURL); pa != "" {
205 proxyReq.Header = http.Header{}
206 proxyReq.Header.Set("Proxy-Authorization", pa)
207 }
208
209 proxyDialConn, err := s.dialWithoutProxy(proxyReq.Context(), proxyURL)
210 if err != nil {
211 return nil, err
212 }
213
214
215 proxyClientConn := httputil.NewProxyClientConn(proxyDialConn, nil)
216 response, err := proxyClientConn.Do(&proxyReq)
217
218
219 if err != nil && err != httputil.ErrPersistEOF {
220 return nil, err
221 }
222 if response != nil && response.StatusCode >= 300 || response.StatusCode < 200 {
223 return nil, fmt.Errorf("CONNECT request to %s returned response: %s", proxyURL.Redacted(), response.Status)
224 }
225
226 rwc, _ := proxyClientConn.Hijack()
227
228 if req.URL.Scheme == "https" {
229 return s.tlsConn(proxyReq.Context(), rwc, targetHost)
230 }
231 return rwc, nil
232 }
233
234
235 func (s *SpdyRoundTripper) dialWithSocks5Proxy(req *http.Request, proxyURL *url.URL) (net.Conn, error) {
236
237 targetHost := netutil.CanonicalAddr(req.URL)
238 proxyDialAddr := netutil.CanonicalAddr(proxyURL)
239
240 var auth *proxy.Auth
241 if proxyURL.User != nil {
242 pass, _ := proxyURL.User.Password()
243 auth = &proxy.Auth{
244 User: proxyURL.User.Username(),
245 Password: pass,
246 }
247 }
248
249 dialer := s.Dialer
250 if dialer == nil {
251 dialer = &net.Dialer{
252 Timeout: 30 * time.Second,
253 }
254 }
255
256 proxyDialer, err := proxy.SOCKS5("tcp", proxyDialAddr, auth, dialer)
257 if err != nil {
258 return nil, err
259 }
260
261
262 contextDialer, ok := proxyDialer.(proxy.ContextDialer)
263 if !ok {
264 return nil, errors.New("SOCKS5 Dialer must implement ContextDialer")
265 }
266
267 proxyDialConn, err := contextDialer.DialContext(req.Context(), "tcp", targetHost)
268 if err != nil {
269 return nil, err
270 }
271
272 if req.URL.Scheme == "https" {
273 return s.tlsConn(req.Context(), proxyDialConn, targetHost)
274 }
275 return proxyDialConn, nil
276 }
277
278
279 func (s *SpdyRoundTripper) tlsConn(ctx context.Context, rwc net.Conn, targetHost string) (net.Conn, error) {
280
281 host, _, err := net.SplitHostPort(targetHost)
282 if err != nil {
283 return nil, err
284 }
285
286 tlsConfig := s.tlsConfig
287 switch {
288 case tlsConfig == nil:
289 tlsConfig = &tls.Config{ServerName: host}
290 case len(tlsConfig.ServerName) == 0:
291 tlsConfig = tlsConfig.Clone()
292 tlsConfig.ServerName = host
293 }
294
295 tlsConn := tls.Client(rwc, tlsConfig)
296
297 if err := tlsConn.HandshakeContext(ctx); err != nil {
298 tlsConn.Close()
299 return nil, err
300 }
301
302 return tlsConn, nil
303 }
304
305
306 func (s *SpdyRoundTripper) dialWithoutProxy(ctx context.Context, url *url.URL) (net.Conn, error) {
307 dialAddr := netutil.CanonicalAddr(url)
308 dialer := s.Dialer
309 if dialer == nil {
310 dialer = &net.Dialer{}
311 }
312
313 if url.Scheme == "http" {
314 return dialer.DialContext(ctx, "tcp", dialAddr)
315 }
316
317 tlsDialer := tls.Dialer{
318 NetDialer: dialer,
319 Config: s.tlsConfig,
320 }
321 return tlsDialer.DialContext(ctx, "tcp", dialAddr)
322 }
323
324
325 func (s *SpdyRoundTripper) proxyAuth(proxyURL *url.URL) string {
326 if proxyURL == nil || proxyURL.User == nil {
327 return ""
328 }
329 username := proxyURL.User.Username()
330 password, _ := proxyURL.User.Password()
331 auth := username + ":" + password
332 return "Basic " + base64.StdEncoding.EncodeToString([]byte(auth))
333 }
334
335
336
337
338 func (s *SpdyRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
339 req = utilnet.CloneRequest(req)
340 req.Header.Add(httpstream.HeaderConnection, httpstream.HeaderUpgrade)
341 req.Header.Add(httpstream.HeaderUpgrade, HeaderSpdy31)
342
343 conn, err := s.Dial(req)
344 if err != nil {
345 return nil, err
346 }
347
348 responseReader := bufio.NewReader(conn)
349
350 resp, err := http.ReadResponse(responseReader, nil)
351 if err != nil {
352 conn.Close()
353 return nil, err
354 }
355
356 s.conn = conn
357
358 return resp, nil
359 }
360
361
362
363 func (s *SpdyRoundTripper) NewConnection(resp *http.Response) (httpstream.Connection, error) {
364 connectionHeader := strings.ToLower(resp.Header.Get(httpstream.HeaderConnection))
365 upgradeHeader := strings.ToLower(resp.Header.Get(httpstream.HeaderUpgrade))
366 if (resp.StatusCode != http.StatusSwitchingProtocols) || !strings.Contains(connectionHeader, strings.ToLower(httpstream.HeaderUpgrade)) || !strings.Contains(upgradeHeader, strings.ToLower(HeaderSpdy31)) {
367 defer resp.Body.Close()
368 responseError := ""
369 responseErrorBytes, err := io.ReadAll(resp.Body)
370 if err != nil {
371 responseError = "unable to read error from server response"
372 } else {
373
374 if obj, _, err := statusCodecs.UniversalDecoder().Decode(responseErrorBytes, nil, &metav1.Status{}); err == nil {
375 if status, ok := obj.(*metav1.Status); ok {
376 return nil, &apierrors.StatusError{ErrStatus: *status}
377 }
378 }
379 responseError = string(responseErrorBytes)
380 responseError = strings.TrimSpace(responseError)
381 }
382
383 return nil, fmt.Errorf("unable to upgrade connection: %s", responseError)
384 }
385
386 return NewClientConnectionWithPings(s.conn, s.pingPeriod)
387 }
388
389
390 var statusScheme = runtime.NewScheme()
391
392
393 var statusCodecs = serializer.NewCodecFactory(statusScheme)
394
395 func init() {
396 statusScheme.AddUnversionedTypes(metav1.SchemeGroupVersion,
397 &metav1.Status{},
398 )
399 }
400
View as plain text