1
16
17 package remotecommand
18
19 import (
20 "context"
21 "fmt"
22 "net/http"
23 "net/url"
24
25 "k8s.io/apimachinery/pkg/util/httpstream"
26 "k8s.io/apimachinery/pkg/util/remotecommand"
27 restclient "k8s.io/client-go/rest"
28 "k8s.io/client-go/transport/spdy"
29 "k8s.io/klog/v2"
30 )
31
32
33 type spdyStreamExecutor struct {
34 upgrader spdy.Upgrader
35 transport http.RoundTripper
36
37 method string
38 url *url.URL
39 protocols []string
40 rejectRedirects bool
41 }
42
43
44
45 func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) {
46 wrapper, upgradeRoundTripper, err := spdy.RoundTripperFor(config)
47 if err != nil {
48 return nil, err
49 }
50 return NewSPDYExecutorForTransports(wrapper, upgradeRoundTripper, method, url)
51 }
52
53
54
55
56
57 func NewSPDYExecutorRejectRedirects(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL) (Executor, error) {
58 executor, err := NewSPDYExecutorForTransports(transport, upgrader, method, url)
59 if err != nil {
60 return nil, err
61 }
62 spdyExecutor := executor.(*spdyStreamExecutor)
63 spdyExecutor.rejectRedirects = true
64 return spdyExecutor, nil
65 }
66
67
68
69 func NewSPDYExecutorForTransports(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL) (Executor, error) {
70 return NewSPDYExecutorForProtocols(
71 transport, upgrader, method, url,
72 remotecommand.StreamProtocolV5Name,
73 remotecommand.StreamProtocolV4Name,
74 remotecommand.StreamProtocolV3Name,
75 remotecommand.StreamProtocolV2Name,
76 remotecommand.StreamProtocolV1Name,
77 )
78 }
79
80
81
82
83 func NewSPDYExecutorForProtocols(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL, protocols ...string) (Executor, error) {
84 return &spdyStreamExecutor{
85 upgrader: upgrader,
86 transport: transport,
87 method: method,
88 url: url,
89 protocols: protocols,
90 }, nil
91 }
92
93
94
95 func (e *spdyStreamExecutor) Stream(options StreamOptions) error {
96 return e.StreamWithContext(context.Background(), options)
97 }
98
99
100 func (e *spdyStreamExecutor) newConnectionAndStream(ctx context.Context, options StreamOptions) (httpstream.Connection, streamProtocolHandler, error) {
101 req, err := http.NewRequestWithContext(ctx, e.method, e.url.String(), nil)
102 if err != nil {
103 return nil, nil, fmt.Errorf("error creating request: %v", err)
104 }
105
106 client := http.Client{Transport: e.transport}
107 if e.rejectRedirects {
108 client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
109 return fmt.Errorf("redirect not allowed")
110 }
111 }
112 conn, protocol, err := spdy.Negotiate(
113 e.upgrader,
114 &client,
115 req,
116 e.protocols...,
117 )
118 if err != nil {
119 return nil, nil, err
120 }
121
122 var streamer streamProtocolHandler
123
124 switch protocol {
125 case remotecommand.StreamProtocolV5Name:
126 streamer = newStreamProtocolV5(options)
127 case remotecommand.StreamProtocolV4Name:
128 streamer = newStreamProtocolV4(options)
129 case remotecommand.StreamProtocolV3Name:
130 streamer = newStreamProtocolV3(options)
131 case remotecommand.StreamProtocolV2Name:
132 streamer = newStreamProtocolV2(options)
133 case "":
134 klog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
135 fallthrough
136 case remotecommand.StreamProtocolV1Name:
137 streamer = newStreamProtocolV1(options)
138 }
139
140 return conn, streamer, nil
141 }
142
143
144
145 func (e *spdyStreamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
146 conn, streamer, err := e.newConnectionAndStream(ctx, options)
147 if err != nil {
148 return err
149 }
150 defer conn.Close()
151
152 panicChan := make(chan any, 1)
153 errorChan := make(chan error, 1)
154 go func() {
155 defer func() {
156 if p := recover(); p != nil {
157 panicChan <- p
158 }
159 }()
160 errorChan <- streamer.stream(conn)
161 }()
162
163 select {
164 case p := <-panicChan:
165 panic(p)
166 case err := <-errorChan:
167 return err
168 case <-ctx.Done():
169 return ctx.Err()
170 }
171 }
172
View as plain text