...

Source file src/k8s.io/kubernetes/pkg/registry/core/pod/rest/subresources.go

Documentation: k8s.io/kubernetes/pkg/registry/core/pod/rest

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package rest
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net/http"
    23  	"net/url"
    24  
    25  	"k8s.io/apimachinery/pkg/runtime"
    26  	"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
    27  	"k8s.io/apimachinery/pkg/util/net"
    28  	"k8s.io/apimachinery/pkg/util/proxy"
    29  	genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
    30  	"k8s.io/apiserver/pkg/registry/rest"
    31  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    32  	translator "k8s.io/apiserver/pkg/util/proxy"
    33  	api "k8s.io/kubernetes/pkg/apis/core"
    34  	"k8s.io/kubernetes/pkg/capabilities"
    35  	"k8s.io/kubernetes/pkg/features"
    36  	"k8s.io/kubernetes/pkg/kubelet/client"
    37  	"k8s.io/kubernetes/pkg/registry/core/pod"
    38  )
    39  
    40  // ProxyREST implements the proxy subresource for a Pod
    41  type ProxyREST struct {
    42  	Store          *genericregistry.Store
    43  	ProxyTransport http.RoundTripper
    44  }
    45  
    46  // Implement Connecter
    47  var _ = rest.Connecter(&ProxyREST{})
    48  
    49  var proxyMethods = []string{"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"}
    50  
    51  // New returns an empty podProxyOptions object.
    52  func (r *ProxyREST) New() runtime.Object {
    53  	return &api.PodProxyOptions{}
    54  }
    55  
    56  // Destroy cleans up resources on shutdown.
    57  func (r *ProxyREST) Destroy() {
    58  	// Given that underlying store is shared with REST,
    59  	// we don't destroy it here explicitly.
    60  }
    61  
    62  // ConnectMethods returns the list of HTTP methods that can be proxied
    63  func (r *ProxyREST) ConnectMethods() []string {
    64  	return proxyMethods
    65  }
    66  
    67  // NewConnectOptions returns versioned resource that represents proxy parameters
    68  func (r *ProxyREST) NewConnectOptions() (runtime.Object, bool, string) {
    69  	return &api.PodProxyOptions{}, true, "path"
    70  }
    71  
    72  // Connect returns a handler for the pod proxy
    73  func (r *ProxyREST) Connect(ctx context.Context, id string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
    74  	proxyOpts, ok := opts.(*api.PodProxyOptions)
    75  	if !ok {
    76  		return nil, fmt.Errorf("Invalid options object: %#v", opts)
    77  	}
    78  	location, transport, err := pod.ResourceLocation(ctx, r.Store, r.ProxyTransport, id)
    79  	if err != nil {
    80  		return nil, err
    81  	}
    82  	location.Path = net.JoinPreservingTrailingSlash(location.Path, proxyOpts.Path)
    83  	// Return a proxy handler that uses the desired transport, wrapped with additional proxy handling (to get URL rewriting, X-Forwarded-* headers, etc)
    84  	return newThrottledUpgradeAwareProxyHandler(location, transport, true, false, responder), nil
    85  }
    86  
    87  // Support both GET and POST methods. We must support GET for browsers that want to use WebSockets.
    88  var upgradeableMethods = []string{"GET", "POST"}
    89  
    90  // AttachREST implements the attach subresource for a Pod
    91  type AttachREST struct {
    92  	Store       *genericregistry.Store
    93  	KubeletConn client.ConnectionInfoGetter
    94  }
    95  
    96  // Implement Connecter
    97  var _ = rest.Connecter(&AttachREST{})
    98  
    99  // New creates a new podAttachOptions object.
   100  func (r *AttachREST) New() runtime.Object {
   101  	return &api.PodAttachOptions{}
   102  }
   103  
   104  // Destroy cleans up resources on shutdown.
   105  func (r *AttachREST) Destroy() {
   106  	// Given that underlying store is shared with REST,
   107  	// we don't destroy it here explicitly.
   108  }
   109  
   110  // Connect returns a handler for the pod exec proxy
   111  func (r *AttachREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
   112  	attachOpts, ok := opts.(*api.PodAttachOptions)
   113  	if !ok {
   114  		return nil, fmt.Errorf("Invalid options object: %#v", opts)
   115  	}
   116  	location, transport, err := pod.AttachLocation(ctx, r.Store, r.KubeletConn, name, attachOpts)
   117  	if err != nil {
   118  		return nil, err
   119  	}
   120  	handler := newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder)
   121  	if utilfeature.DefaultFeatureGate.Enabled(features.TranslateStreamCloseWebsocketRequests) {
   122  		// Wrap the upgrade aware handler to implement stream translation
   123  		// for WebSocket/V5 upgrade requests.
   124  		streamOptions := translator.Options{
   125  			Stdin:  attachOpts.Stdin,
   126  			Stdout: attachOpts.Stdout,
   127  			Stderr: attachOpts.Stderr,
   128  			Tty:    attachOpts.TTY,
   129  		}
   130  		maxBytesPerSec := capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
   131  		streamtranslator := translator.NewStreamTranslatorHandler(location, transport, maxBytesPerSec, streamOptions)
   132  		handler = translator.NewTranslatingHandler(handler, streamtranslator, wsstream.IsWebSocketRequestWithStreamCloseProtocol)
   133  	}
   134  	return handler, nil
   135  }
   136  
   137  // NewConnectOptions returns the versioned object that represents exec parameters
   138  func (r *AttachREST) NewConnectOptions() (runtime.Object, bool, string) {
   139  	return &api.PodAttachOptions{}, false, ""
   140  }
   141  
   142  // ConnectMethods returns the methods supported by exec
   143  func (r *AttachREST) ConnectMethods() []string {
   144  	return upgradeableMethods
   145  }
   146  
   147  // ExecREST implements the exec subresource for a Pod
   148  type ExecREST struct {
   149  	Store       *genericregistry.Store
   150  	KubeletConn client.ConnectionInfoGetter
   151  }
   152  
   153  // Implement Connecter
   154  var _ = rest.Connecter(&ExecREST{})
   155  
   156  // New creates a new podExecOptions object.
   157  func (r *ExecREST) New() runtime.Object {
   158  	return &api.PodExecOptions{}
   159  }
   160  
   161  // Destroy cleans up resources on shutdown.
   162  func (r *ExecREST) Destroy() {
   163  	// Given that underlying store is shared with REST,
   164  	// we don't destroy it here explicitly.
   165  }
   166  
   167  // Connect returns a handler for the pod exec proxy
   168  func (r *ExecREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
   169  	execOpts, ok := opts.(*api.PodExecOptions)
   170  	if !ok {
   171  		return nil, fmt.Errorf("invalid options object: %#v", opts)
   172  	}
   173  	location, transport, err := pod.ExecLocation(ctx, r.Store, r.KubeletConn, name, execOpts)
   174  	if err != nil {
   175  		return nil, err
   176  	}
   177  	handler := newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder)
   178  	if utilfeature.DefaultFeatureGate.Enabled(features.TranslateStreamCloseWebsocketRequests) {
   179  		// Wrap the upgrade aware handler to implement stream translation
   180  		// for WebSocket/V5 upgrade requests.
   181  		streamOptions := translator.Options{
   182  			Stdin:  execOpts.Stdin,
   183  			Stdout: execOpts.Stdout,
   184  			Stderr: execOpts.Stderr,
   185  			Tty:    execOpts.TTY,
   186  		}
   187  		maxBytesPerSec := capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
   188  		streamtranslator := translator.NewStreamTranslatorHandler(location, transport, maxBytesPerSec, streamOptions)
   189  		handler = translator.NewTranslatingHandler(handler, streamtranslator, wsstream.IsWebSocketRequestWithStreamCloseProtocol)
   190  	}
   191  	return handler, nil
   192  }
   193  
   194  // NewConnectOptions returns the versioned object that represents exec parameters
   195  func (r *ExecREST) NewConnectOptions() (runtime.Object, bool, string) {
   196  	return &api.PodExecOptions{}, false, ""
   197  }
   198  
   199  // ConnectMethods returns the methods supported by exec
   200  func (r *ExecREST) ConnectMethods() []string {
   201  	return upgradeableMethods
   202  }
   203  
   204  // PortForwardREST implements the portforward subresource for a Pod
   205  type PortForwardREST struct {
   206  	Store       *genericregistry.Store
   207  	KubeletConn client.ConnectionInfoGetter
   208  }
   209  
   210  // Implement Connecter
   211  var _ = rest.Connecter(&PortForwardREST{})
   212  
   213  // New returns an empty podPortForwardOptions object
   214  func (r *PortForwardREST) New() runtime.Object {
   215  	return &api.PodPortForwardOptions{}
   216  }
   217  
   218  // Destroy cleans up resources on shutdown.
   219  func (r *PortForwardREST) Destroy() {
   220  	// Given that underlying store is shared with REST,
   221  	// we don't destroy it here explicitly.
   222  }
   223  
   224  // NewConnectOptions returns the versioned object that represents the
   225  // portforward parameters
   226  func (r *PortForwardREST) NewConnectOptions() (runtime.Object, bool, string) {
   227  	return &api.PodPortForwardOptions{}, false, ""
   228  }
   229  
   230  // ConnectMethods returns the methods supported by portforward
   231  func (r *PortForwardREST) ConnectMethods() []string {
   232  	return upgradeableMethods
   233  }
   234  
   235  // Connect returns a handler for the pod portforward proxy
   236  func (r *PortForwardREST) Connect(ctx context.Context, name string, opts runtime.Object, responder rest.Responder) (http.Handler, error) {
   237  	portForwardOpts, ok := opts.(*api.PodPortForwardOptions)
   238  	if !ok {
   239  		return nil, fmt.Errorf("invalid options object: %#v", opts)
   240  	}
   241  	location, transport, err := pod.PortForwardLocation(ctx, r.Store, r.KubeletConn, name, portForwardOpts)
   242  	if err != nil {
   243  		return nil, err
   244  	}
   245  	handler := newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder)
   246  	if utilfeature.DefaultFeatureGate.Enabled(features.PortForwardWebsockets) {
   247  		tunnelingHandler := translator.NewTunnelingHandler(handler)
   248  		handler = translator.NewTranslatingHandler(handler, tunnelingHandler, wsstream.IsWebSocketRequestWithTunnelingProtocol)
   249  	}
   250  	return handler, nil
   251  }
   252  
   253  func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) http.Handler {
   254  	handler := proxy.NewUpgradeAwareHandler(location, transport, wrapTransport, upgradeRequired, proxy.NewErrorResponder(responder))
   255  	handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
   256  	return handler
   257  }
   258  

View as plain text