...

Source file src/k8s.io/client-go/tools/remotecommand/websocket.go

Documentation: k8s.io/client-go/tools/remotecommand

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package remotecommand
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"io"
    24  	"net"
    25  	"net/http"
    26  	"sync"
    27  	"time"
    28  
    29  	gwebsocket "github.com/gorilla/websocket"
    30  
    31  	v1 "k8s.io/api/core/v1"
    32  	"k8s.io/apimachinery/pkg/util/httpstream"
    33  	"k8s.io/apimachinery/pkg/util/remotecommand"
    34  	restclient "k8s.io/client-go/rest"
    35  	"k8s.io/client-go/transport/websocket"
    36  	"k8s.io/klog/v2"
    37  )
    38  
    39  // writeDeadline defines the time that a client-side write to the websocket
    40  // connection must complete before an i/o timeout occurs.
    41  const writeDeadline = 60 * time.Second
    42  
    43  var (
    44  	_ Executor          = &wsStreamExecutor{}
    45  	_ streamCreator     = &wsStreamCreator{}
    46  	_ httpstream.Stream = &stream{}
    47  
    48  	streamType2streamID = map[string]byte{
    49  		v1.StreamTypeStdin:  remotecommand.StreamStdIn,
    50  		v1.StreamTypeStdout: remotecommand.StreamStdOut,
    51  		v1.StreamTypeStderr: remotecommand.StreamStdErr,
    52  		v1.StreamTypeError:  remotecommand.StreamErr,
    53  		v1.StreamTypeResize: remotecommand.StreamResize,
    54  	}
    55  )
    56  
    57  const (
    58  	// pingPeriod defines how often a heartbeat "ping" message is sent.
    59  	pingPeriod = 5 * time.Second
    60  	// pingReadDeadline defines the time waiting for a response heartbeat
    61  	// "pong" message before a timeout error occurs for websocket reading.
    62  	// This duration must always be greater than the "pingPeriod". By defining
    63  	// this deadline in terms of the ping period, we are essentially saying
    64  	// we can drop "X" (e.g. 12) pings before firing the timeout.
    65  	pingReadDeadline = (pingPeriod * 12) + (1 * time.Second)
    66  )
    67  
    68  // wsStreamExecutor handles transporting standard shell streams over an httpstream connection.
    69  type wsStreamExecutor struct {
    70  	transport http.RoundTripper
    71  	upgrader  websocket.ConnectionHolder
    72  	method    string
    73  	url       string
    74  	// requested protocols in priority order (e.g. v5.channel.k8s.io before v4.channel.k8s.io).
    75  	protocols []string
    76  	// selected protocol from the handshake process; could be empty string if handshake fails.
    77  	negotiated string
    78  	// period defines how often a "ping" heartbeat message is sent to the other endpoint.
    79  	heartbeatPeriod time.Duration
    80  	// deadline defines the amount of time before "pong" response must be received.
    81  	heartbeatDeadline time.Duration
    82  }
    83  
    84  func NewWebSocketExecutor(config *restclient.Config, method, url string) (Executor, error) {
    85  	// Only supports V5 protocol for correct version skew functionality.
    86  	// Previous api servers will proxy upgrade requests to legacy websocket
    87  	// servers on container runtimes which support V1-V4. These legacy
    88  	// websocket servers will not handle the new CLOSE signal.
    89  	return NewWebSocketExecutorForProtocols(config, method, url, remotecommand.StreamProtocolV5Name)
    90  }
    91  
    92  // NewWebSocketExecutorForProtocols allows to execute commands via a WebSocket connection.
    93  func NewWebSocketExecutorForProtocols(config *restclient.Config, method, url string, protocols ...string) (Executor, error) {
    94  	transport, upgrader, err := websocket.RoundTripperFor(config)
    95  	if err != nil {
    96  		return nil, fmt.Errorf("error creating websocket transports: %v", err)
    97  	}
    98  	return &wsStreamExecutor{
    99  		transport:         transport,
   100  		upgrader:          upgrader,
   101  		method:            method,
   102  		url:               url,
   103  		protocols:         protocols,
   104  		heartbeatPeriod:   pingPeriod,
   105  		heartbeatDeadline: pingReadDeadline,
   106  	}, nil
   107  }
   108  
   109  // Deprecated: use StreamWithContext instead to avoid possible resource leaks.
   110  // See https://github.com/kubernetes/kubernetes/pull/103177 for details.
   111  func (e *wsStreamExecutor) Stream(options StreamOptions) error {
   112  	return e.StreamWithContext(context.Background(), options)
   113  }
   114  
   115  // StreamWithContext upgrades an HTTPRequest to a WebSocket connection, and starts the various
   116  // goroutines to implement the necessary streams over the connection. The "options" parameter
   117  // defines which streams are requested. Returns an error if one occurred. This method is NOT
   118  // safe to run concurrently with the same executor (because of the state stored in the upgrader).
   119  func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
   120  	req, err := http.NewRequestWithContext(ctx, e.method, e.url, nil)
   121  	if err != nil {
   122  		return err
   123  	}
   124  	conn, err := websocket.Negotiate(e.transport, e.upgrader, req, e.protocols...)
   125  	if err != nil {
   126  		return err
   127  	}
   128  	if conn == nil {
   129  		panic(fmt.Errorf("websocket connection is nil"))
   130  	}
   131  	defer conn.Close()
   132  	e.negotiated = conn.Subprotocol()
   133  	klog.V(4).Infof("The subprotocol is %s", e.negotiated)
   134  
   135  	var streamer streamProtocolHandler
   136  	switch e.negotiated {
   137  	case remotecommand.StreamProtocolV5Name:
   138  		streamer = newStreamProtocolV5(options)
   139  	case remotecommand.StreamProtocolV4Name:
   140  		streamer = newStreamProtocolV4(options)
   141  	case remotecommand.StreamProtocolV3Name:
   142  		streamer = newStreamProtocolV3(options)
   143  	case remotecommand.StreamProtocolV2Name:
   144  		streamer = newStreamProtocolV2(options)
   145  	case "":
   146  		klog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
   147  		fallthrough
   148  	case remotecommand.StreamProtocolV1Name:
   149  		streamer = newStreamProtocolV1(options)
   150  	}
   151  
   152  	panicChan := make(chan any, 1)
   153  	errorChan := make(chan error, 1)
   154  	go func() {
   155  		defer func() {
   156  			if p := recover(); p != nil {
   157  				panicChan <- p
   158  			}
   159  		}()
   160  		creator := newWSStreamCreator(conn)
   161  		go creator.readDemuxLoop(
   162  			e.upgrader.DataBufferSize(),
   163  			e.heartbeatPeriod,
   164  			e.heartbeatDeadline,
   165  		)
   166  		errorChan <- streamer.stream(creator)
   167  	}()
   168  
   169  	select {
   170  	case p := <-panicChan:
   171  		panic(p)
   172  	case err := <-errorChan:
   173  		return err
   174  	case <-ctx.Done():
   175  		return ctx.Err()
   176  	}
   177  }
   178  
   179  type wsStreamCreator struct {
   180  	conn *gwebsocket.Conn
   181  	// Protects writing to websocket connection; reading is lock-free
   182  	connWriteLock sync.Mutex
   183  	// map of stream id to stream; multiple streams read/write the connection
   184  	streams   map[byte]*stream
   185  	streamsMu sync.Mutex
   186  	// setStreamErr holds the error to return to anyone calling setStreams.
   187  	// this is populated in closeAllStreamReaders
   188  	setStreamErr error
   189  }
   190  
   191  func newWSStreamCreator(conn *gwebsocket.Conn) *wsStreamCreator {
   192  	return &wsStreamCreator{
   193  		conn:    conn,
   194  		streams: map[byte]*stream{},
   195  	}
   196  }
   197  
   198  func (c *wsStreamCreator) getStream(id byte) *stream {
   199  	c.streamsMu.Lock()
   200  	defer c.streamsMu.Unlock()
   201  	return c.streams[id]
   202  }
   203  
   204  func (c *wsStreamCreator) setStream(id byte, s *stream) error {
   205  	c.streamsMu.Lock()
   206  	defer c.streamsMu.Unlock()
   207  	if c.setStreamErr != nil {
   208  		return c.setStreamErr
   209  	}
   210  	c.streams[id] = s
   211  	return nil
   212  }
   213  
   214  // CreateStream uses id from passed headers to create a stream over "c.conn" connection.
   215  // Returns a Stream structure or nil and an error if one occurred.
   216  func (c *wsStreamCreator) CreateStream(headers http.Header) (httpstream.Stream, error) {
   217  	streamType := headers.Get(v1.StreamType)
   218  	id, ok := streamType2streamID[streamType]
   219  	if !ok {
   220  		return nil, fmt.Errorf("unknown stream type: %s", streamType)
   221  	}
   222  	if s := c.getStream(id); s != nil {
   223  		return nil, fmt.Errorf("duplicate stream for type %s", streamType)
   224  	}
   225  	reader, writer := io.Pipe()
   226  	s := &stream{
   227  		headers:       headers,
   228  		readPipe:      reader,
   229  		writePipe:     writer,
   230  		conn:          c.conn,
   231  		connWriteLock: &c.connWriteLock,
   232  		id:            id,
   233  	}
   234  	if err := c.setStream(id, s); err != nil {
   235  		_ = s.writePipe.Close()
   236  		_ = s.readPipe.Close()
   237  		return nil, err
   238  	}
   239  	return s, nil
   240  }
   241  
   242  // readDemuxLoop is the lock-free reading processor for this endpoint of the websocket
   243  // connection. This loop reads the connection, and demultiplexes the data
   244  // into one of the individual stream pipes (by checking the stream id). This
   245  // loop can *not* be run concurrently, because there can only be one websocket
   246  // connection reader at a time (a read mutex would provide no benefit).
   247  func (c *wsStreamCreator) readDemuxLoop(bufferSize int, period time.Duration, deadline time.Duration) {
   248  	// Initialize and start the ping/pong heartbeat.
   249  	h := newHeartbeat(c.conn, period, deadline)
   250  	// Set initial timeout for websocket connection reading.
   251  	if err := c.conn.SetReadDeadline(time.Now().Add(deadline)); err != nil {
   252  		klog.Errorf("Websocket initial setting read deadline failed %v", err)
   253  		return
   254  	}
   255  	go h.start()
   256  	// Buffer size must correspond to the same size allocated
   257  	// for the read buffer during websocket client creation. A
   258  	// difference can cause incomplete connection reads.
   259  	readBuffer := make([]byte, bufferSize)
   260  	for {
   261  		// NextReader() only returns data messages (BinaryMessage or Text
   262  		// Message). Even though this call will never return control frames
   263  		// such as ping, pong, or close, this call is necessary for these
   264  		// message types to be processed. There can only be one reader
   265  		// at a time, so this reader loop must *not* be run concurrently;
   266  		// there is no lock for reading. Calling "NextReader()" before the
   267  		// current reader has been processed will close the current reader.
   268  		// If the heartbeat read deadline times out, this "NextReader()" will
   269  		// return an i/o error, and error handling will clean up.
   270  		messageType, r, err := c.conn.NextReader()
   271  		if err != nil {
   272  			websocketErr, ok := err.(*gwebsocket.CloseError)
   273  			if ok && websocketErr.Code == gwebsocket.CloseNormalClosure {
   274  				err = nil // readers will get io.EOF as it's a normal closure
   275  			} else {
   276  				err = fmt.Errorf("next reader: %w", err)
   277  			}
   278  			c.closeAllStreamReaders(err)
   279  			return
   280  		}
   281  		// All remote command protocols send/receive only binary data messages.
   282  		if messageType != gwebsocket.BinaryMessage {
   283  			c.closeAllStreamReaders(fmt.Errorf("unexpected message type: %d", messageType))
   284  			return
   285  		}
   286  		// It's ok to read just a single byte because the underlying library wraps the actual
   287  		// connection with a buffered reader anyway.
   288  		_, err = io.ReadFull(r, readBuffer[:1])
   289  		if err != nil {
   290  			c.closeAllStreamReaders(fmt.Errorf("read stream id: %w", err))
   291  			return
   292  		}
   293  		streamID := readBuffer[0]
   294  		s := c.getStream(streamID)
   295  		if s == nil {
   296  			klog.Errorf("Unknown stream id %d, discarding message", streamID)
   297  			continue
   298  		}
   299  		for {
   300  			nr, errRead := r.Read(readBuffer)
   301  			if nr > 0 {
   302  				// Write the data to the stream's pipe. This can block.
   303  				_, errWrite := s.writePipe.Write(readBuffer[:nr])
   304  				if errWrite != nil {
   305  					// Pipe must have been closed by the stream user.
   306  					// Nothing to do, discard the message.
   307  					break
   308  				}
   309  			}
   310  			if errRead != nil {
   311  				if errRead == io.EOF {
   312  					break
   313  				}
   314  				c.closeAllStreamReaders(fmt.Errorf("read message: %w", err))
   315  				return
   316  			}
   317  		}
   318  	}
   319  }
   320  
   321  // closeAllStreamReaders closes readers in all streams.
   322  // This unblocks all stream.Read() calls, and keeps any future streams from being created.
   323  func (c *wsStreamCreator) closeAllStreamReaders(err error) {
   324  	c.streamsMu.Lock()
   325  	defer c.streamsMu.Unlock()
   326  	for _, s := range c.streams {
   327  		// Closing writePipe unblocks all readPipe.Read() callers and prevents any future writes.
   328  		_ = s.writePipe.CloseWithError(err)
   329  	}
   330  	// ensure callers to setStreams receive an error after this point
   331  	if err != nil {
   332  		c.setStreamErr = err
   333  	} else {
   334  		c.setStreamErr = fmt.Errorf("closed all streams")
   335  	}
   336  }
   337  
   338  type stream struct {
   339  	headers   http.Header
   340  	readPipe  *io.PipeReader
   341  	writePipe *io.PipeWriter
   342  	// conn is used for writing directly into the connection.
   343  	// Is nil after Close() / Reset() to prevent future writes.
   344  	conn *gwebsocket.Conn
   345  	// connWriteLock protects conn against concurrent write operations. There must be a single writer and a single reader only.
   346  	// The mutex is shared across all streams because the underlying connection is shared.
   347  	connWriteLock *sync.Mutex
   348  	id            byte
   349  }
   350  
   351  func (s *stream) Read(p []byte) (n int, err error) {
   352  	return s.readPipe.Read(p)
   353  }
   354  
   355  // Write writes directly to the underlying WebSocket connection.
   356  func (s *stream) Write(p []byte) (n int, err error) {
   357  	klog.V(4).Infof("Write() on stream %d", s.id)
   358  	defer klog.V(4).Infof("Write() done on stream %d", s.id)
   359  	s.connWriteLock.Lock()
   360  	defer s.connWriteLock.Unlock()
   361  	if s.conn == nil {
   362  		return 0, fmt.Errorf("write on closed stream %d", s.id)
   363  	}
   364  	err = s.conn.SetWriteDeadline(time.Now().Add(writeDeadline))
   365  	if err != nil {
   366  		klog.V(7).Infof("Websocket setting write deadline failed %v", err)
   367  		return 0, err
   368  	}
   369  	// Message writer buffers the message data, so we don't need to do that ourselves.
   370  	// Just write id and the data as two separate writes to avoid allocating an intermediate buffer.
   371  	w, err := s.conn.NextWriter(gwebsocket.BinaryMessage)
   372  	if err != nil {
   373  		return 0, err
   374  	}
   375  	defer func() {
   376  		if w != nil {
   377  			w.Close()
   378  		}
   379  	}()
   380  	_, err = w.Write([]byte{s.id})
   381  	if err != nil {
   382  		return 0, err
   383  	}
   384  	n, err = w.Write(p)
   385  	if err != nil {
   386  		return n, err
   387  	}
   388  	err = w.Close()
   389  	w = nil
   390  	return n, err
   391  }
   392  
   393  // Close half-closes the stream, indicating this side is finished with the stream.
   394  func (s *stream) Close() error {
   395  	klog.V(4).Infof("Close() on stream %d", s.id)
   396  	defer klog.V(4).Infof("Close() done on stream %d", s.id)
   397  	s.connWriteLock.Lock()
   398  	defer s.connWriteLock.Unlock()
   399  	if s.conn == nil {
   400  		return fmt.Errorf("Close() on already closed stream %d", s.id)
   401  	}
   402  	// Communicate the CLOSE stream signal to the other websocket endpoint.
   403  	err := s.conn.WriteMessage(gwebsocket.BinaryMessage, []byte{remotecommand.StreamClose, s.id})
   404  	s.conn = nil
   405  	return err
   406  }
   407  
   408  func (s *stream) Reset() error {
   409  	klog.V(4).Infof("Reset() on stream %d", s.id)
   410  	defer klog.V(4).Infof("Reset() done on stream %d", s.id)
   411  	s.Close()
   412  	return s.writePipe.Close()
   413  }
   414  
   415  func (s *stream) Headers() http.Header {
   416  	return s.headers
   417  }
   418  
   419  func (s *stream) Identifier() uint32 {
   420  	return uint32(s.id)
   421  }
   422  
   423  // heartbeat encasulates data necessary for the websocket ping/pong heartbeat. This
   424  // heartbeat works by setting a read deadline on the websocket connection, then
   425  // pushing this deadline into the future for every successful heartbeat. If the
   426  // heartbeat "pong" fails to respond within the deadline, then the "NextReader()" call
   427  // inside the "readDemuxLoop" will return an i/o error prompting a connection close
   428  // and cleanup.
   429  type heartbeat struct {
   430  	conn *gwebsocket.Conn
   431  	// period defines how often a "ping" heartbeat message is sent to the other endpoint
   432  	period time.Duration
   433  	// closing the "closer" channel will clean up the heartbeat timers
   434  	closer chan struct{}
   435  	// optional data to send with "ping" message
   436  	message []byte
   437  	// optionally received data message with "pong" message, same as sent with ping
   438  	pongMessage []byte
   439  }
   440  
   441  // newHeartbeat creates heartbeat structure encapsulating fields necessary to
   442  // run the websocket connection ping/pong mechanism and sets up handlers on
   443  // the websocket connection.
   444  func newHeartbeat(conn *gwebsocket.Conn, period time.Duration, deadline time.Duration) *heartbeat {
   445  	h := &heartbeat{
   446  		conn:   conn,
   447  		period: period,
   448  		closer: make(chan struct{}),
   449  	}
   450  	// Set up handler for receiving returned "pong" message from other endpoint
   451  	// by pushing the read deadline into the future. The "msg" received could
   452  	// be empty.
   453  	h.conn.SetPongHandler(func(msg string) error {
   454  		// Push the read deadline into the future.
   455  		klog.V(8).Infof("Pong message received (%s)--resetting read deadline", msg)
   456  		err := h.conn.SetReadDeadline(time.Now().Add(deadline))
   457  		if err != nil {
   458  			klog.Errorf("Websocket setting read deadline failed %v", err)
   459  			return err
   460  		}
   461  		if len(msg) > 0 {
   462  			h.pongMessage = []byte(msg)
   463  		}
   464  		return nil
   465  	})
   466  	// Set up handler to cleanup timers when this endpoint receives "Close" message.
   467  	closeHandler := h.conn.CloseHandler()
   468  	h.conn.SetCloseHandler(func(code int, text string) error {
   469  		close(h.closer)
   470  		return closeHandler(code, text)
   471  	})
   472  	return h
   473  }
   474  
   475  // setMessage is optional data sent with "ping" heartbeat. According to the websocket RFC
   476  // this data sent with "ping" message should be returned in "pong" message.
   477  func (h *heartbeat) setMessage(msg string) {
   478  	h.message = []byte(msg)
   479  }
   480  
   481  // start the heartbeat by setting up necesssary handlers and looping by sending "ping"
   482  // message every "period" until the "closer" channel is closed.
   483  func (h *heartbeat) start() {
   484  	// Loop to continually send "ping" message through websocket connection every "period".
   485  	t := time.NewTicker(h.period)
   486  	defer t.Stop()
   487  	for {
   488  		select {
   489  		case <-h.closer:
   490  			klog.V(8).Infof("closed channel--returning")
   491  			return
   492  		case <-t.C:
   493  			// "WriteControl" does not need to be protected by a mutex. According to
   494  			// gorilla/websockets library docs: "The Close and WriteControl methods can
   495  			// be called concurrently with all other methods."
   496  			if err := h.conn.WriteControl(gwebsocket.PingMessage, h.message, time.Now().Add(pingReadDeadline)); err == nil {
   497  				klog.V(8).Infof("Websocket Ping succeeeded")
   498  			} else {
   499  				klog.Errorf("Websocket Ping failed: %v", err)
   500  				if errors.Is(err, gwebsocket.ErrCloseSent) {
   501  					// we continue because c.conn.CloseChan will manage closing the connection already
   502  					continue
   503  				} else if e, ok := err.(net.Error); ok && e.Timeout() {
   504  					// Continue, in case this is a transient failure.
   505  					// c.conn.CloseChan above will tell us when the connection is
   506  					// actually closed.
   507  					// If Temporary function hadn't been deprecated, we would have used it.
   508  					// But most of temporary errors are timeout errors anyway.
   509  					continue
   510  				}
   511  				return
   512  			}
   513  		}
   514  	}
   515  }
   516  

View as plain text