1
16
17 package rest
18
19 import (
20 "context"
21 "fmt"
22 "net/http"
23 "net/url"
24
25 "k8s.io/apimachinery/pkg/runtime"
26 "k8s.io/apimachinery/pkg/util/httpstream/wsstream"
27 "k8s.io/apimachinery/pkg/util/net"
28 "k8s.io/apimachinery/pkg/util/proxy"
29 genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
30 "k8s.io/apiserver/pkg/registry/rest"
31 utilfeature "k8s.io/apiserver/pkg/util/feature"
32 translator "k8s.io/apiserver/pkg/util/proxy"
33 api "k8s.io/kubernetes/pkg/apis/core"
34 "k8s.io/kubernetes/pkg/capabilities"
35 "k8s.io/kubernetes/pkg/features"
36 "k8s.io/kubernetes/pkg/kubelet/client"
37 "k8s.io/kubernetes/pkg/registry/core/pod"
38 )
39
40
41 type ProxyREST struct {
42 Store *genericregistry.Store
43 ProxyTransport http.RoundTripper
44 }
45
46
47 var _ = rest.Connecter(&ProxyREST{})
48
// proxyMethods is the list of HTTP methods reported by ProxyREST.ConnectMethods.
var proxyMethods = []string{
	http.MethodGet,
	http.MethodPost,
	http.MethodPut,
	http.MethodPatch,
	http.MethodDelete,
	http.MethodHead,
	http.MethodOptions,
}
50
51
52 func (r *ProxyREST) New() runtime.Object {
53 return &api.PodProxyOptions{}
54 }
55
56
57 func (r *ProxyREST) Destroy() {
58
59
60 }
61
62
63 func (r *ProxyREST) ConnectMethods() []string {
64 return proxyMethods
65 }
66
67
68 func (r *ProxyREST) NewConnectOptions() (runtime.Object, bool, string) {
69 return &api.PodProxyOptions{}, true, "path"
70 }
71
72
73 func (r *ProxyREST) Connect(ctx context.Context, id string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
74 proxyOpts, ok := opts.(*api.PodProxyOptions)
75 if !ok {
76 return nil, fmt.Errorf("Invalid options object: %#v", opts)
77 }
78 location, transport, err := pod.ResourceLocation(ctx, r.Store, r.ProxyTransport, id)
79 if err != nil {
80 return nil, err
81 }
82 location.Path = net.JoinPreservingTrailingSlash(location.Path, proxyOpts.Path)
83
84 return newThrottledUpgradeAwareProxyHandler(location, transport, true, false, responder), nil
85 }
86
87
// upgradeableMethods is the list of HTTP methods reported by the
// ConnectMethods of AttachREST, ExecREST and PortForwardREST.
var upgradeableMethods = []string{
	http.MethodGet,
	http.MethodPost,
}
89
90
91 type AttachREST struct {
92 Store *genericregistry.Store
93 KubeletConn client.ConnectionInfoGetter
94 }
95
96
97 var _ = rest.Connecter(&AttachREST{})
98
99
100 func (r *AttachREST) New() runtime.Object {
101 return &api.PodAttachOptions{}
102 }
103
104
105 func (r *AttachREST) Destroy() {
106
107
108 }
109
110
111 func (r *AttachREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
112 attachOpts, ok := opts.(*api.PodAttachOptions)
113 if !ok {
114 return nil, fmt.Errorf("Invalid options object: %#v", opts)
115 }
116 location, transport, err := pod.AttachLocation(ctx, r.Store, r.KubeletConn, name, attachOpts)
117 if err != nil {
118 return nil, err
119 }
120 handler := newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder)
121 if utilfeature.DefaultFeatureGate.Enabled(features.TranslateStreamCloseWebsocketRequests) {
122
123
124 streamOptions := translator.Options{
125 Stdin: attachOpts.Stdin,
126 Stdout: attachOpts.Stdout,
127 Stderr: attachOpts.Stderr,
128 Tty: attachOpts.TTY,
129 }
130 maxBytesPerSec := capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
131 streamtranslator := translator.NewStreamTranslatorHandler(location, transport, maxBytesPerSec, streamOptions)
132 handler = translator.NewTranslatingHandler(handler, streamtranslator, wsstream.IsWebSocketRequestWithStreamCloseProtocol)
133 }
134 return handler, nil
135 }
136
137
138 func (r *AttachREST) NewConnectOptions() (runtime.Object, bool, string) {
139 return &api.PodAttachOptions{}, false, ""
140 }
141
142
143 func (r *AttachREST) ConnectMethods() []string {
144 return upgradeableMethods
145 }
146
147
148 type ExecREST struct {
149 Store *genericregistry.Store
150 KubeletConn client.ConnectionInfoGetter
151 }
152
153
154 var _ = rest.Connecter(&ExecREST{})
155
156
157 func (r *ExecREST) New() runtime.Object {
158 return &api.PodExecOptions{}
159 }
160
161
162 func (r *ExecREST) Destroy() {
163
164
165 }
166
167
168 func (r *ExecREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
169 execOpts, ok := opts.(*api.PodExecOptions)
170 if !ok {
171 return nil, fmt.Errorf("invalid options object: %#v", opts)
172 }
173 location, transport, err := pod.ExecLocation(ctx, r.Store, r.KubeletConn, name, execOpts)
174 if err != nil {
175 return nil, err
176 }
177 handler := newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder)
178 if utilfeature.DefaultFeatureGate.Enabled(features.TranslateStreamCloseWebsocketRequests) {
179
180
181 streamOptions := translator.Options{
182 Stdin: execOpts.Stdin,
183 Stdout: execOpts.Stdout,
184 Stderr: execOpts.Stderr,
185 Tty: execOpts.TTY,
186 }
187 maxBytesPerSec := capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
188 streamtranslator := translator.NewStreamTranslatorHandler(location, transport, maxBytesPerSec, streamOptions)
189 handler = translator.NewTranslatingHandler(handler, streamtranslator, wsstream.IsWebSocketRequestWithStreamCloseProtocol)
190 }
191 return handler, nil
192 }
193
194
195 func (r *ExecREST) NewConnectOptions() (runtime.Object, bool, string) {
196 return &api.PodExecOptions{}, false, ""
197 }
198
199
200 func (r *ExecREST) ConnectMethods() []string {
201 return upgradeableMethods
202 }
203
204
205 type PortForwardREST struct {
206 Store *genericregistry.Store
207 KubeletConn client.ConnectionInfoGetter
208 }
209
210
211 var _ = rest.Connecter(&PortForwardREST{})
212
213
214 func (r *PortForwardREST) New() runtime.Object {
215 return &api.PodPortForwardOptions{}
216 }
217
218
219 func (r *PortForwardREST) Destroy() {
220
221
222 }
223
224
225
226 func (r *PortForwardREST) NewConnectOptions() (runtime.Object, bool, string) {
227 return &api.PodPortForwardOptions{}, false, ""
228 }
229
230
231 func (r *PortForwardREST) ConnectMethods() []string {
232 return upgradeableMethods
233 }
234
235
236 func (r *PortForwardREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
237 portForwardOpts, ok := opts.(*api.PodPortForwardOptions)
238 if !ok {
239 return nil, fmt.Errorf("invalid options object: %#v", opts)
240 }
241 location, transport, err := pod.PortForwardLocation(ctx, r.Store, r.KubeletConn, name, portForwardOpts)
242 if err != nil {
243 return nil, err
244 }
245 handler := newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder)
246 if utilfeature.DefaultFeatureGate.Enabled(features.PortForwardWebsockets) {
247 tunnelingHandler := translator.NewTunnelingHandler(handler)
248 handler = translator.NewTranslatingHandler(handler, tunnelingHandler, wsstream.IsWebSocketRequestWithTunnelingProtocol)
249 }
250 return handler, nil
251 }
252
253 func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) http.Handler {
254 handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
255 handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
256 return handler
257 }
258
View as plain text