...

Source file src/k8s.io/apimachinery/pkg/util/httpstream/wsstream/conn.go

Documentation: k8s.io/apimachinery/pkg/util/httpstream/wsstream

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package wsstream
    18  
    19  import (
    20  	"encoding/base64"
    21  	"fmt"
    22  	"io"
    23  	"net/http"
    24  	"strings"
    25  	"time"
    26  
    27  	"golang.org/x/net/websocket"
    28  
    29  	"k8s.io/apimachinery/pkg/util/httpstream"
    30  	"k8s.io/apimachinery/pkg/util/portforward"
    31  	"k8s.io/apimachinery/pkg/util/remotecommand"
    32  	"k8s.io/apimachinery/pkg/util/runtime"
    33  	"k8s.io/klog/v2"
    34  )
    35  
    36  const WebSocketProtocolHeader = "Sec-Websocket-Protocol"
    37  
    38  // The Websocket subprotocol "channel.k8s.io" prepends each binary message with a byte indicating
    39  // the channel number (zero indexed) the message was sent on. Messages in both directions should
    40  // prefix their messages with this channel byte. When used for remote execution, the channel numbers
    41  // are by convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT, and STDERR
    42  // (0, 1, and 2). No other conversion is performed on the raw subprotocol - writes are sent as they
    43  // are received by the server.
    44  //
    45  // Example client session:
    46  //
    47  //	CONNECT http://server.com with subprotocol "channel.k8s.io"
    48  //	WRITE []byte{0, 102, 111, 111, 10} # send "foo\n" on channel 0 (STDIN)
    49  //	READ  []byte{1, 10}                # receive "\n" on channel 1 (STDOUT)
    50  //	CLOSE
    51  const ChannelWebSocketProtocol = "channel.k8s.io"
    52  
    53  // The Websocket subprotocol "base64.channel.k8s.io" base64 encodes each message with a character
    54  // indicating the channel number (zero indexed) the message was sent on. Messages in both directions
    55  // should prefix their messages with this channel char. When used for remote execution, the channel
    56  // numbers are by convention defined to match the POSIX file-descriptors assigned to STDIN, STDOUT,
    57  // and STDERR ('0', '1', and '2'). The data received on the server is base64 decoded (and must be
    58  // be valid) and data written by the server to the client is base64 encoded.
    59  //
    60  // Example client session:
    61  //
    62  //	CONNECT http://server.com with subprotocol "base64.channel.k8s.io"
    63  //	WRITE []byte{48, 90, 109, 57, 118, 67, 103, 111, 61} # send "foo\n" (base64: "Zm9vCgo=") on channel '0' (STDIN)
    64  //	READ  []byte{49, 67, 103, 61, 61} # receive "\n" (base64: "Cg==") on channel '1' (STDOUT)
    65  //	CLOSE
    66  const Base64ChannelWebSocketProtocol = "base64.channel.k8s.io"
    67  
    68  type codecType int
    69  
    70  const (
    71  	rawCodec codecType = iota
    72  	base64Codec
    73  )
    74  
    75  type ChannelType int
    76  
    77  const (
    78  	IgnoreChannel ChannelType = iota
    79  	ReadChannel
    80  	WriteChannel
    81  	ReadWriteChannel
    82  )
    83  
    84  // IsWebSocketRequest returns true if the incoming request contains connection upgrade headers
    85  // for WebSockets.
    86  func IsWebSocketRequest(req *http.Request) bool {
    87  	if !strings.EqualFold(req.Header.Get("Upgrade"), "websocket") {
    88  		return false
    89  	}
    90  	return httpstream.IsUpgradeRequest(req)
    91  }
    92  
    93  // IsWebSocketRequestWithStreamCloseProtocol returns true if the request contains headers
    94  // identifying that it is requesting a websocket upgrade with a remotecommand protocol
    95  // version that supports the "CLOSE" signal; false otherwise.
    96  func IsWebSocketRequestWithStreamCloseProtocol(req *http.Request) bool {
    97  	if !IsWebSocketRequest(req) {
    98  		return false
    99  	}
   100  	requestedProtocols := strings.TrimSpace(req.Header.Get(WebSocketProtocolHeader))
   101  	for _, requestedProtocol := range strings.Split(requestedProtocols, ",") {
   102  		if protocolSupportsStreamClose(strings.TrimSpace(requestedProtocol)) {
   103  			return true
   104  		}
   105  	}
   106  
   107  	return false
   108  }
   109  
   110  // IsWebSocketRequestWithTunnelingProtocol returns true if the request contains headers
   111  // identifying that it is requesting a websocket upgrade with a tunneling protocol;
   112  // false otherwise.
   113  func IsWebSocketRequestWithTunnelingProtocol(req *http.Request) bool {
   114  	if !IsWebSocketRequest(req) {
   115  		return false
   116  	}
   117  	requestedProtocols := strings.TrimSpace(req.Header.Get(WebSocketProtocolHeader))
   118  	for _, requestedProtocol := range strings.Split(requestedProtocols, ",") {
   119  		if protocolSupportsWebsocketTunneling(strings.TrimSpace(requestedProtocol)) {
   120  			return true
   121  		}
   122  	}
   123  
   124  	return false
   125  }
   126  
   127  // IgnoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the
   128  // read and write deadlines are pushed every time a new message is received.
   129  func IgnoreReceives(ws *websocket.Conn, timeout time.Duration) {
   130  	defer runtime.HandleCrash()
   131  	var data []byte
   132  	for {
   133  		resetTimeout(ws, timeout)
   134  		if err := websocket.Message.Receive(ws, &data); err != nil {
   135  			return
   136  		}
   137  	}
   138  }
   139  
   140  // handshake ensures the provided user protocol matches one of the allowed protocols. It returns
   141  // no error if no protocol is specified.
   142  func handshake(config *websocket.Config, req *http.Request, allowed []string) error {
   143  	protocols := config.Protocol
   144  	if len(protocols) == 0 {
   145  		protocols = []string{""}
   146  	}
   147  
   148  	for _, protocol := range protocols {
   149  		for _, allow := range allowed {
   150  			if allow == protocol {
   151  				config.Protocol = []string{protocol}
   152  				return nil
   153  			}
   154  		}
   155  	}
   156  
   157  	return fmt.Errorf("requested protocol(s) are not supported: %v; supports %v", config.Protocol, allowed)
   158  }
   159  
   160  // ChannelProtocolConfig describes a websocket subprotocol with channels.
   161  type ChannelProtocolConfig struct {
   162  	Binary   bool
   163  	Channels []ChannelType
   164  }
   165  
   166  // NewDefaultChannelProtocols returns a channel protocol map with the
   167  // subprotocols "", "channel.k8s.io", "base64.channel.k8s.io" and the given
   168  // channels.
   169  func NewDefaultChannelProtocols(channels []ChannelType) map[string]ChannelProtocolConfig {
   170  	return map[string]ChannelProtocolConfig{
   171  		"":                             {Binary: true, Channels: channels},
   172  		ChannelWebSocketProtocol:       {Binary: true, Channels: channels},
   173  		Base64ChannelWebSocketProtocol: {Binary: false, Channels: channels},
   174  	}
   175  }
   176  
   177  // Conn supports sending multiple binary channels over a websocket connection.
   178  type Conn struct {
   179  	protocols        map[string]ChannelProtocolConfig
   180  	selectedProtocol string
   181  	channels         []*websocketChannel
   182  	codec            codecType
   183  	ready            chan struct{}
   184  	ws               *websocket.Conn
   185  	timeout          time.Duration
   186  }
   187  
   188  // NewConn creates a WebSocket connection that supports a set of channels. Channels begin each
   189  // web socket message with a single byte indicating the channel number (0-N). 255 is reserved for
   190  // future use. The channel types for each channel are passed as an array, supporting the different
   191  // duplex modes. Read and Write refer to whether the channel can be used as a Reader or Writer.
   192  //
   193  // The protocols parameter maps subprotocol names to ChannelProtocols. The empty string subprotocol
   194  // name is used if websocket.Config.Protocol is empty.
   195  func NewConn(protocols map[string]ChannelProtocolConfig) *Conn {
   196  	return &Conn{
   197  		ready:     make(chan struct{}),
   198  		protocols: protocols,
   199  	}
   200  }
   201  
   202  // SetIdleTimeout sets the interval for both reads and writes before timeout. If not specified,
   203  // there is no timeout on the connection.
   204  func (conn *Conn) SetIdleTimeout(duration time.Duration) {
   205  	conn.timeout = duration
   206  }
   207  
   208  // SetWriteDeadline sets a timeout on writing to the websocket connection. The
   209  // passed "duration" identifies how far into the future the write must complete
   210  // by before the timeout fires.
   211  func (conn *Conn) SetWriteDeadline(duration time.Duration) {
   212  	conn.ws.SetWriteDeadline(time.Now().Add(duration)) //nolint:errcheck
   213  }
   214  
   215  // Open the connection and create channels for reading and writing. It returns
   216  // the selected subprotocol, a slice of channels and an error.
   217  func (conn *Conn) Open(w http.ResponseWriter, req *http.Request) (string, []io.ReadWriteCloser, error) {
   218  	// serveHTTPComplete is channel that is closed/selected when "websocket#ServeHTTP" finishes.
   219  	serveHTTPComplete := make(chan struct{})
   220  	// Ensure panic in spawned goroutine is propagated into the parent goroutine.
   221  	panicChan := make(chan any, 1)
   222  	go func() {
   223  		// If websocket server returns, propagate panic if necessary. Otherwise,
   224  		// signal HTTPServe finished by closing "serveHTTPComplete".
   225  		defer func() {
   226  			if p := recover(); p != nil {
   227  				panicChan <- p
   228  			} else {
   229  				close(serveHTTPComplete)
   230  			}
   231  		}()
   232  		websocket.Server{Handshake: conn.handshake, Handler: conn.handle}.ServeHTTP(w, req)
   233  	}()
   234  
   235  	// In normal circumstances, "websocket.Server#ServeHTTP" calls "initialize" which closes
   236  	// "conn.ready" and then blocks until serving is complete.
   237  	select {
   238  	case <-conn.ready:
   239  		klog.V(8).Infof("websocket server initialized--serving")
   240  	case <-serveHTTPComplete:
   241  		// websocket server returned before completing initialization; cleanup and return error.
   242  		conn.closeNonThreadSafe() //nolint:errcheck
   243  		return "", nil, fmt.Errorf("websocket server finished before becoming ready")
   244  	case p := <-panicChan:
   245  		panic(p)
   246  	}
   247  
   248  	rwc := make([]io.ReadWriteCloser, len(conn.channels))
   249  	for i := range conn.channels {
   250  		rwc[i] = conn.channels[i]
   251  	}
   252  	return conn.selectedProtocol, rwc, nil
   253  }
   254  
   255  func (conn *Conn) initialize(ws *websocket.Conn) {
   256  	negotiated := ws.Config().Protocol
   257  	conn.selectedProtocol = negotiated[0]
   258  	p := conn.protocols[conn.selectedProtocol]
   259  	if p.Binary {
   260  		conn.codec = rawCodec
   261  	} else {
   262  		conn.codec = base64Codec
   263  	}
   264  	conn.ws = ws
   265  	conn.channels = make([]*websocketChannel, len(p.Channels))
   266  	for i, t := range p.Channels {
   267  		switch t {
   268  		case ReadChannel:
   269  			conn.channels[i] = newWebsocketChannel(conn, byte(i), true, false)
   270  		case WriteChannel:
   271  			conn.channels[i] = newWebsocketChannel(conn, byte(i), false, true)
   272  		case ReadWriteChannel:
   273  			conn.channels[i] = newWebsocketChannel(conn, byte(i), true, true)
   274  		case IgnoreChannel:
   275  			conn.channels[i] = newWebsocketChannel(conn, byte(i), false, false)
   276  		}
   277  	}
   278  
   279  	close(conn.ready)
   280  }
   281  
   282  func (conn *Conn) handshake(config *websocket.Config, req *http.Request) error {
   283  	supportedProtocols := make([]string, 0, len(conn.protocols))
   284  	for p := range conn.protocols {
   285  		supportedProtocols = append(supportedProtocols, p)
   286  	}
   287  	return handshake(config, req, supportedProtocols)
   288  }
   289  
   290  func (conn *Conn) resetTimeout() {
   291  	if conn.timeout > 0 {
   292  		conn.ws.SetDeadline(time.Now().Add(conn.timeout))
   293  	}
   294  }
   295  
   296  // closeNonThreadSafe cleans up by closing streams and the websocket
   297  // connection *without* waiting for the "ready" channel.
   298  func (conn *Conn) closeNonThreadSafe() error {
   299  	for _, s := range conn.channels {
   300  		s.Close()
   301  	}
   302  	var err error
   303  	if conn.ws != nil {
   304  		err = conn.ws.Close()
   305  	}
   306  	return err
   307  }
   308  
   309  // Close is only valid after Open has been called
   310  func (conn *Conn) Close() error {
   311  	<-conn.ready
   312  	return conn.closeNonThreadSafe()
   313  }
   314  
   315  // protocolSupportsStreamClose returns true if the passed protocol
   316  // supports the stream close signal (currently only V5 remotecommand);
   317  // false otherwise.
   318  func protocolSupportsStreamClose(protocol string) bool {
   319  	return protocol == remotecommand.StreamProtocolV5Name
   320  }
   321  
   322  // protocolSupportsWebsocketTunneling returns true if the passed protocol
   323  // is a tunneled Kubernetes spdy protocol; false otherwise.
   324  func protocolSupportsWebsocketTunneling(protocol string) bool {
   325  	return strings.HasPrefix(protocol, portforward.WebsocketsSPDYTunnelingPrefix) && strings.HasSuffix(protocol, portforward.KubernetesSuffix)
   326  }
   327  
   328  // handle implements a websocket handler.
   329  func (conn *Conn) handle(ws *websocket.Conn) {
   330  	conn.initialize(ws)
   331  	defer conn.Close()
   332  	supportsStreamClose := protocolSupportsStreamClose(conn.selectedProtocol)
   333  
   334  	for {
   335  		conn.resetTimeout()
   336  		var data []byte
   337  		if err := websocket.Message.Receive(ws, &data); err != nil {
   338  			if err != io.EOF {
   339  				klog.Errorf("Error on socket receive: %v", err)
   340  			}
   341  			break
   342  		}
   343  		if len(data) == 0 {
   344  			continue
   345  		}
   346  		if supportsStreamClose && data[0] == remotecommand.StreamClose {
   347  			if len(data) != 2 {
   348  				klog.Errorf("Single channel byte should follow stream close signal. Got %d bytes", len(data)-1)
   349  				break
   350  			} else {
   351  				channel := data[1]
   352  				if int(channel) >= len(conn.channels) {
   353  					klog.Errorf("Close is targeted for a channel %d that is not valid, possible protocol error", channel)
   354  					break
   355  				}
   356  				klog.V(4).Infof("Received half-close signal from client; close %d stream", channel)
   357  				conn.channels[channel].Close() // After first Close, other closes are noop.
   358  			}
   359  			continue
   360  		}
   361  		channel := data[0]
   362  		if conn.codec == base64Codec {
   363  			channel = channel - '0'
   364  		}
   365  		data = data[1:]
   366  		if int(channel) >= len(conn.channels) {
   367  			klog.V(6).Infof("Frame is targeted for a reader %d that is not valid, possible protocol error", channel)
   368  			continue
   369  		}
   370  		if _, err := conn.channels[channel].DataFromSocket(data); err != nil {
   371  			klog.Errorf("Unable to write frame (%d bytes) to %d: %v", len(data), channel, err)
   372  			continue
   373  		}
   374  	}
   375  }
   376  
   377  // write multiplexes the specified channel onto the websocket
   378  func (conn *Conn) write(num byte, data []byte) (int, error) {
   379  	conn.resetTimeout()
   380  	switch conn.codec {
   381  	case rawCodec:
   382  		frame := make([]byte, len(data)+1)
   383  		frame[0] = num
   384  		copy(frame[1:], data)
   385  		if err := websocket.Message.Send(conn.ws, frame); err != nil {
   386  			return 0, err
   387  		}
   388  	case base64Codec:
   389  		frame := string('0'+num) + base64.StdEncoding.EncodeToString(data)
   390  		if err := websocket.Message.Send(conn.ws, frame); err != nil {
   391  			return 0, err
   392  		}
   393  	}
   394  	return len(data), nil
   395  }
   396  
   397  // websocketChannel represents a channel in a connection
   398  type websocketChannel struct {
   399  	conn *Conn
   400  	num  byte
   401  	r    io.Reader
   402  	w    io.WriteCloser
   403  
   404  	read, write bool
   405  }
   406  
   407  // newWebsocketChannel creates a pipe for writing to a websocket. Do not write to this pipe
   408  // prior to the connection being opened. It may be no, half, or full duplex depending on
   409  // read and write.
   410  func newWebsocketChannel(conn *Conn, num byte, read, write bool) *websocketChannel {
   411  	r, w := io.Pipe()
   412  	return &websocketChannel{conn, num, r, w, read, write}
   413  }
   414  
   415  func (p *websocketChannel) Write(data []byte) (int, error) {
   416  	if !p.write {
   417  		return len(data), nil
   418  	}
   419  	return p.conn.write(p.num, data)
   420  }
   421  
   422  // DataFromSocket is invoked by the connection receiver to move data from the connection
   423  // into a specific channel.
   424  func (p *websocketChannel) DataFromSocket(data []byte) (int, error) {
   425  	if !p.read {
   426  		return len(data), nil
   427  	}
   428  
   429  	switch p.conn.codec {
   430  	case rawCodec:
   431  		return p.w.Write(data)
   432  	case base64Codec:
   433  		dst := make([]byte, len(data))
   434  		n, err := base64.StdEncoding.Decode(dst, data)
   435  		if err != nil {
   436  			return 0, err
   437  		}
   438  		return p.w.Write(dst[:n])
   439  	}
   440  	return 0, nil
   441  }
   442  
   443  func (p *websocketChannel) Read(data []byte) (int, error) {
   444  	if !p.read {
   445  		return 0, io.EOF
   446  	}
   447  	return p.r.Read(data)
   448  }
   449  
   450  func (p *websocketChannel) Close() error {
   451  	return p.w.Close()
   452  }
   453  

View as plain text