...

Source file src/k8s.io/client-go/transport/websocket/roundtripper.go

Documentation: k8s.io/client-go/transport/websocket

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package websocket
    18  
    19  import (
    20  	"crypto/tls"
    21  	"errors"
    22  	"fmt"
    23  	"net/http"
    24  	"net/url"
    25  
    26  	gwebsocket "github.com/gorilla/websocket"
    27  
    28  	"k8s.io/apimachinery/pkg/util/httpstream"
    29  	"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
    30  	utilnet "k8s.io/apimachinery/pkg/util/net"
    31  	restclient "k8s.io/client-go/rest"
    32  	"k8s.io/client-go/transport"
    33  )
    34  
    35  var (
    36  	_ utilnet.TLSClientConfigHolder = &RoundTripper{}
    37  	_ http.RoundTripper             = &RoundTripper{}
    38  )
    39  
    40  // ConnectionHolder defines functions for structure providing
    41  // access to the websocket connection.
    42  type ConnectionHolder interface {
    43  	DataBufferSize() int
    44  	Connection() *gwebsocket.Conn
    45  }
    46  
    47  // RoundTripper knows how to establish a connection to a remote WebSocket endpoint and make it available for use.
    48  // RoundTripper must not be reused.
    49  type RoundTripper struct {
    50  	// TLSConfig holds the TLS configuration settings to use when connecting
    51  	// to the remote server.
    52  	TLSConfig *tls.Config
    53  
    54  	// Proxier specifies a function to return a proxy for a given
    55  	// Request. If the function returns a non-nil error, the
    56  	// request is aborted with the provided error.
    57  	// If Proxy is nil or returns a nil *URL, no proxy is used.
    58  	Proxier func(req *http.Request) (*url.URL, error)
    59  
    60  	// Conn holds the WebSocket connection after a round trip.
    61  	Conn *gwebsocket.Conn
    62  }
    63  
    64  // Connection returns the stored websocket connection.
    65  func (rt *RoundTripper) Connection() *gwebsocket.Conn {
    66  	return rt.Conn
    67  }
    68  
    69  // DataBufferSize returns the size of buffers for the
    70  // websocket connection.
    71  func (rt *RoundTripper) DataBufferSize() int {
    72  	return 32 * 1024
    73  }
    74  
    75  // TLSClientConfig implements pkg/util/net.TLSClientConfigHolder.
    76  func (rt *RoundTripper) TLSClientConfig() *tls.Config {
    77  	return rt.TLSConfig
    78  }
    79  
    80  // RoundTrip connects to the remote websocket using the headers in the request and the TLS
    81  // configuration from the config
    82  func (rt *RoundTripper) RoundTrip(request *http.Request) (retResp *http.Response, retErr error) {
    83  	defer func() {
    84  		if request.Body != nil {
    85  			err := request.Body.Close()
    86  			if retErr == nil {
    87  				retErr = err
    88  			}
    89  		}
    90  	}()
    91  
    92  	// set the protocol version directly on the dialer from the header
    93  	protocolVersions := request.Header[wsstream.WebSocketProtocolHeader]
    94  	delete(request.Header, wsstream.WebSocketProtocolHeader)
    95  
    96  	dialer := gwebsocket.Dialer{
    97  		Proxy:           rt.Proxier,
    98  		TLSClientConfig: rt.TLSConfig,
    99  		Subprotocols:    protocolVersions,
   100  		ReadBufferSize:  rt.DataBufferSize() + 1024, // add space for the protocol byte indicating which channel the data is for
   101  		WriteBufferSize: rt.DataBufferSize() + 1024, // add space for the protocol byte indicating which channel the data is for
   102  	}
   103  	switch request.URL.Scheme {
   104  	case "https":
   105  		request.URL.Scheme = "wss"
   106  	case "http":
   107  		request.URL.Scheme = "ws"
   108  	default:
   109  		return nil, fmt.Errorf("unknown url scheme: %s", request.URL.Scheme)
   110  	}
   111  	wsConn, resp, err := dialer.DialContext(request.Context(), request.URL.String(), request.Header)
   112  	if err != nil {
   113  		if errors.Is(err, gwebsocket.ErrBadHandshake) {
   114  			return nil, &httpstream.UpgradeFailureError{Cause: err}
   115  		}
   116  		return nil, err
   117  	}
   118  
   119  	// Ensure we got back a protocol we understand
   120  	foundProtocol := false
   121  	for _, protocolVersion := range protocolVersions {
   122  		if protocolVersion == wsConn.Subprotocol() {
   123  			foundProtocol = true
   124  			break
   125  		}
   126  	}
   127  	if !foundProtocol {
   128  		wsConn.Close() // nolint:errcheck
   129  		return nil, &httpstream.UpgradeFailureError{Cause: fmt.Errorf("invalid protocol, expected one of %q, got %q", protocolVersions, wsConn.Subprotocol())}
   130  	}
   131  
   132  	rt.Conn = wsConn
   133  
   134  	return resp, nil
   135  }
   136  
   137  // RoundTripperFor transforms the passed rest config into a wrapped roundtripper, as well
   138  // as a pointer to the websocket RoundTripper. The websocket RoundTripper contains the
   139  // websocket connection after RoundTrip() on the wrapper. Returns an error if there is
   140  // a problem creating the round trippers.
   141  func RoundTripperFor(config *restclient.Config) (http.RoundTripper, ConnectionHolder, error) {
   142  	transportCfg, err := config.TransportConfig()
   143  	if err != nil {
   144  		return nil, nil, err
   145  	}
   146  	tlsConfig, err := transport.TLSConfigFor(transportCfg)
   147  	if err != nil {
   148  		return nil, nil, err
   149  	}
   150  	proxy := config.Proxy
   151  	if proxy == nil {
   152  		proxy = utilnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment)
   153  	}
   154  
   155  	upgradeRoundTripper := &RoundTripper{
   156  		TLSConfig: tlsConfig,
   157  		Proxier:   proxy,
   158  	}
   159  	wrapper, err := transport.HTTPWrappersForConfig(transportCfg, upgradeRoundTripper)
   160  	if err != nil {
   161  		return nil, nil, err
   162  	}
   163  	return wrapper, upgradeRoundTripper, nil
   164  }
   165  
   166  // Negotiate opens a connection to a remote server and attempts to negotiate
   167  // a WebSocket connection. Upon success, it returns the negotiated connection.
   168  // The round tripper rt must use the WebSocket round tripper wsRt - see RoundTripperFor.
   169  func Negotiate(rt http.RoundTripper, connectionInfo ConnectionHolder, req *http.Request, protocols ...string) (*gwebsocket.Conn, error) {
   170  	// Plumb protocols to RoundTripper#RoundTrip
   171  	req.Header[wsstream.WebSocketProtocolHeader] = protocols
   172  	resp, err := rt.RoundTrip(req)
   173  	if err != nil {
   174  		return nil, err
   175  	}
   176  	err = resp.Body.Close()
   177  	if err != nil {
   178  		connectionInfo.Connection().Close()
   179  		return nil, fmt.Errorf("error closing response body: %v", err)
   180  	}
   181  	return connectionInfo.Connection(), nil
   182  }
   183  

View as plain text