...

Source file src/k8s.io/client-go/tools/portforward/tunneling_connection.go

Documentation: k8s.io/client-go/tools/portforward

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package portforward
    18  
    19  import (
    20  	"errors"
    21  	"fmt"
    22  	"io"
    23  	"net"
    24  	"sync"
    25  	"time"
    26  
    27  	gwebsocket "github.com/gorilla/websocket"
    28  
    29  	"k8s.io/klog/v2"
    30  )
    31  
    32  var _ net.Conn = &TunnelingConnection{}
    33  
    34  // TunnelingConnection implements the "httpstream.Connection" interface, wrapping
    35  // a websocket connection that tunnels SPDY.
    36  type TunnelingConnection struct {
    37  	name              string
    38  	conn              *gwebsocket.Conn
    39  	inProgressMessage io.Reader
    40  	closeOnce         sync.Once
    41  }
    42  
    43  // NewTunnelingConnection wraps the passed gorilla/websockets connection
    44  // with the TunnelingConnection struct (implementing net.Conn).
    45  func NewTunnelingConnection(name string, conn *gwebsocket.Conn) *TunnelingConnection {
    46  	return &TunnelingConnection{
    47  		name: name,
    48  		conn: conn,
    49  	}
    50  }
    51  
    52  // Read implements "io.Reader" interface, reading from the stored connection
    53  // into the passed buffer "p". Returns the number of bytes read and an error.
    54  // Can keep track of the "inProgress" messsage from the tunneled connection.
    55  func (c *TunnelingConnection) Read(p []byte) (int, error) {
    56  	klog.V(7).Infof("%s: tunneling connection read...", c.name)
    57  	defer klog.V(7).Infof("%s: tunneling connection read...complete", c.name)
    58  	for {
    59  		if c.inProgressMessage == nil {
    60  			klog.V(8).Infof("%s: tunneling connection read before NextReader()...", c.name)
    61  			messageType, nextReader, err := c.conn.NextReader()
    62  			if err != nil {
    63  				closeError := &gwebsocket.CloseError{}
    64  				if errors.As(err, &closeError) && closeError.Code == gwebsocket.CloseNormalClosure {
    65  					return 0, io.EOF
    66  				}
    67  				klog.V(4).Infof("%s:tunneling connection NextReader() error: %v", c.name, err)
    68  				return 0, err
    69  			}
    70  			if messageType != gwebsocket.BinaryMessage {
    71  				return 0, fmt.Errorf("invalid message type received")
    72  			}
    73  			c.inProgressMessage = nextReader
    74  		}
    75  		klog.V(8).Infof("%s: tunneling connection read in progress message...", c.name)
    76  		i, err := c.inProgressMessage.Read(p)
    77  		if i == 0 && err == io.EOF {
    78  			c.inProgressMessage = nil
    79  		} else {
    80  			klog.V(8).Infof("%s: read %d bytes, error=%v, bytes=% X", c.name, i, err, p[:i])
    81  			return i, err
    82  		}
    83  	}
    84  }
    85  
    86  // Write implements "io.Writer" interface, copying the data in the passed
    87  // byte array "p" into the stored tunneled connection. Returns the number
    88  // of bytes written and an error.
    89  func (c *TunnelingConnection) Write(p []byte) (n int, err error) {
    90  	klog.V(7).Infof("%s: write: %d bytes, bytes=% X", c.name, len(p), p)
    91  	defer klog.V(7).Infof("%s: tunneling connection write...complete", c.name)
    92  	w, err := c.conn.NextWriter(gwebsocket.BinaryMessage)
    93  	if err != nil {
    94  		return 0, err
    95  	}
    96  	defer func() {
    97  		// close, which flushes the message
    98  		closeErr := w.Close()
    99  		if closeErr != nil && err == nil {
   100  			// if closing/flushing errored and we weren't already returning an error, return the close error
   101  			err = closeErr
   102  		}
   103  	}()
   104  
   105  	n, err = w.Write(p)
   106  	return
   107  }
   108  
   109  // Close implements "io.Closer" interface, signaling the other tunneled connection
   110  // endpoint, and closing the tunneled connection only once.
   111  func (c *TunnelingConnection) Close() error {
   112  	var err error
   113  	c.closeOnce.Do(func() {
   114  		klog.V(7).Infof("%s: tunneling connection Close()...", c.name)
   115  		// Signal other endpoint that websocket connection is closing; ignore error.
   116  		normalCloseMsg := gwebsocket.FormatCloseMessage(gwebsocket.CloseNormalClosure, "")
   117  		writeControlErr := c.conn.WriteControl(gwebsocket.CloseMessage, normalCloseMsg, time.Now().Add(time.Second))
   118  		closeErr := c.conn.Close()
   119  		if closeErr != nil {
   120  			err = closeErr
   121  		} else if writeControlErr != nil {
   122  			err = writeControlErr
   123  		}
   124  	})
   125  	return err
   126  }
   127  
   128  // LocalAddr implements part of the "net.Conn" interface, returning the local
   129  // endpoint network address of the tunneled connection.
   130  func (c *TunnelingConnection) LocalAddr() net.Addr {
   131  	return c.conn.LocalAddr()
   132  }
   133  
   134  // LocalAddr implements part of the "net.Conn" interface, returning the remote
   135  // endpoint network address of the tunneled connection.
   136  func (c *TunnelingConnection) RemoteAddr() net.Addr {
   137  	return c.conn.RemoteAddr()
   138  }
   139  
   140  // SetDeadline sets the *absolute* time in the future for both
   141  // read and write deadlines. Returns an error if one occurs.
   142  func (c *TunnelingConnection) SetDeadline(t time.Time) error {
   143  	rerr := c.SetReadDeadline(t)
   144  	werr := c.SetWriteDeadline(t)
   145  	return errors.Join(rerr, werr)
   146  }
   147  
   148  // SetDeadline sets the *absolute* time in the future for the
   149  // read deadlines. Returns an error if one occurs.
   150  func (c *TunnelingConnection) SetReadDeadline(t time.Time) error {
   151  	return c.conn.SetReadDeadline(t)
   152  }
   153  
   154  // SetDeadline sets the *absolute* time in the future for the
   155  // write deadlines. Returns an error if one occurs.
   156  func (c *TunnelingConnection) SetWriteDeadline(t time.Time) error {
   157  	return c.conn.SetWriteDeadline(t)
   158  }
   159  

View as plain text