...
1
16
17 package portforward
18
19 import (
20 "fmt"
21 "net/http"
22 "net/url"
23 "strings"
24 "time"
25
26 "k8s.io/apimachinery/pkg/util/httpstream"
27 "k8s.io/apimachinery/pkg/util/httpstream/spdy"
28 constants "k8s.io/apimachinery/pkg/util/portforward"
29 restclient "k8s.io/client-go/rest"
30 "k8s.io/client-go/transport/websocket"
31 "k8s.io/klog/v2"
32 )
33
34 const PingPeriod = 10 * time.Second
35
36
37 type tunnelingDialer struct {
38 url *url.URL
39 transport http.RoundTripper
40 holder websocket.ConnectionHolder
41 }
42
43
44
45
46 func NewSPDYOverWebsocketDialer(url *url.URL, config *restclient.Config) (httpstream.Dialer, error) {
47 transport, holder, err := websocket.RoundTripperFor(config)
48 if err != nil {
49 return nil, err
50 }
51 return &tunnelingDialer{
52 url: url,
53 transport: transport,
54 holder: holder,
55 }, nil
56 }
57
58
59
60
61 func (d *tunnelingDialer) Dial(protocols ...string) (httpstream.Connection, string, error) {
62
63
64 req, err := http.NewRequest("GET", d.url.String(), nil)
65 if err != nil {
66 return nil, "", err
67 }
68
69
70 tunnelingProtocols := []string{}
71 for _, protocol := range protocols {
72 tunnelingProtocol := constants.WebsocketsSPDYTunnelingPrefix + protocol
73 tunnelingProtocols = append(tunnelingProtocols, tunnelingProtocol)
74 }
75 klog.V(4).Infoln("Before WebSocket Upgrade Connection...")
76 conn, err := websocket.Negotiate(d.transport, d.holder, req, tunnelingProtocols...)
77 if err != nil {
78 return nil, "", err
79 }
80 if conn == nil {
81 return nil, "", fmt.Errorf("negotiated websocket connection is nil")
82 }
83 protocol := conn.Subprotocol()
84 protocol = strings.TrimPrefix(protocol, constants.WebsocketsSPDYTunnelingPrefix)
85 klog.V(4).Infof("negotiated protocol: %s", protocol)
86
87
88 tConn := NewTunnelingConnection("client", conn)
89
90 spdyConn, err := spdy.NewClientConnectionWithPings(tConn, PingPeriod)
91
92 return spdyConn, protocol, err
93 }
94
View as plain text