     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  // OWNER = sig/cli
    19  package kubectl
    21  import (
    22  	"bytes"
    23  	"context"
    24  	"encoding/binary"
    25  	"fmt"
    26  	"io"
    27  	"net"
    28  	"os/exec"
    29  	"regexp"
    30  	"strconv"
    31  	"strings"
    32  	"syscall"
    33  	"time"
    35  	"golang.org/x/net/websocket"
    36  	v1 "k8s.io/api/core/v1"
    37  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    38  	"k8s.io/apimachinery/pkg/util/wait"
    39  	"k8s.io/kubernetes/test/e2e/framework"
    40  	e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
    41  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    42  	e2ewebsocket "k8s.io/kubernetes/test/e2e/framework/websocket"
    43  	testutils "k8s.io/kubernetes/test/utils"
    44  	imageutils "k8s.io/kubernetes/test/utils/image"
    45  	admissionapi "k8s.io/pod-security-admission/api"
    47  	"github.com/onsi/ginkgo/v2"
    48  	"github.com/onsi/gomega"
    49  )
    51  const (
    52  	podName = "pfpod"
    53  )
    55  const (
    56  	podCheckInterval     = 1 * time.Second
    57  	postStartWaitTimeout = 2 * time.Minute
    58  )
    60  // TODO support other ports besides 80
    61  var (
    62  	portForwardRegexp = regexp.MustCompile("Forwarding from (|\\[::1\\]):([0-9]+) -> 80")
    63  )
    65  func pfPod(expectedClientData, chunks, chunkSize, chunkIntervalMillis string, bindAddress string) *v1.Pod {
    66  	return &v1.Pod{
    67  		ObjectMeta: metav1.ObjectMeta{
    68  			Name:   podName,
    69  			Labels: map[string]string{"name": podName},
    70  		},
    71  		Spec: v1.PodSpec{
    72  			Containers: []v1.Container{
    73  				{
    74  					Name:  "readiness",
    75  					Image: imageutils.GetE2EImage(imageutils.Agnhost),
    76  					Args:  []string{"netexec"},
    77  					ReadinessProbe: &v1.Probe{
    78  						ProbeHandler: v1.ProbeHandler{
    79  							Exec: &v1.ExecAction{
    80  								Command: []string{
    81  									"sh", "-c", "netstat -na | grep LISTEN | grep -v 8080 | grep 80",
    82  								}},
    83  						},
    84  						InitialDelaySeconds: 5,
    85  						TimeoutSeconds:      60,
    86  						PeriodSeconds:       1,
    87  					},
    88  				},
    89  				{
    90  					Name:  "portforwardtester",
    91  					Image: imageutils.GetE2EImage(imageutils.Agnhost),
    92  					Args:  []string{"port-forward-tester"},
    93  					Env: []v1.EnvVar{
    94  						{
    95  							Name:  "BIND_PORT",
    96  							Value: "80",
    97  						},
    98  						{
    99  							Name:  "EXPECTED_CLIENT_DATA",
   100  							Value: expectedClientData,
   101  						},
   102  						{
   103  							Name:  "CHUNKS",
   104  							Value: chunks,
   105  						},
   106  						{
   107  							Name:  "CHUNK_SIZE",
   108  							Value: chunkSize,
   109  						},
   110  						{
   111  							Name:  "CHUNK_INTERVAL",
   112  							Value: chunkIntervalMillis,
   113  						},
   114  						{
   115  							Name:  "BIND_ADDRESS",
   116  							Value: bindAddress,
   117  						},
   118  					},
   119  				},
   120  			},
   121  			RestartPolicy: v1.RestartPolicyNever,
   122  		},
   123  	}
   124  }
   126  // WaitForTerminatedContainer waits till a given container be terminated for a given pod.
   127  func WaitForTerminatedContainer(ctx context.Context, f *framework.Framework, pod *v1.Pod, containerName string) error {
   128  	return e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "container terminated", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
   129  		if len(testutils.TerminatedContainers(pod)[containerName]) > 0 {
   130  			return true, nil
   131  		}
   132  		return false, nil
   133  	})
   134  }
   136  type portForwardCommand struct {
   137  	cmd  *exec.Cmd
   138  	port int
   139  }
   141  // Stop attempts to gracefully stop `kubectl port-forward`, only killing it if necessary.
   142  // This helps avoid spdy goroutine leaks in the Kubelet.
   143  func (c *portForwardCommand) Stop() {
   144  	// SIGINT signals that kubectl port-forward should gracefully terminate
   145  	if err := c.cmd.Process.Signal(syscall.SIGINT); err != nil {
   146  		framework.Logf("error sending SIGINT to kubectl port-forward: %v", err)
   147  	}
   149  	// try to wait for a clean exit
   150  	done := make(chan error)
   151  	go func() {
   152  		done <- c.cmd.Wait()
   153  	}()
   155  	expired := time.NewTimer(wait.ForeverTestTimeout)
   156  	defer expired.Stop()
   158  	select {
   159  	case err := <-done:
   160  		if err == nil {
   161  			// success
   162  			return
   163  		}
   164  		framework.Logf("error waiting for kubectl port-forward to exit: %v", err)
   165  	case <-expired.C:
   166  		framework.Logf("timed out waiting for kubectl port-forward to exit")
   167  	}
   169  	framework.Logf("trying to forcibly kill kubectl port-forward")
   170  	framework.TryKill(c.cmd)
   171  }
   173  // runPortForward runs port-forward, warning, this may need root functionality on some systems.
   174  func runPortForward(ns, podName string, port int) *portForwardCommand {
   175  	tk := e2ekubectl.NewTestKubeconfig(framework.TestContext.CertDir, framework.TestContext.Host, framework.TestContext.KubeConfig, framework.TestContext.KubeContext, framework.TestContext.KubectlPath, ns)
   176  	cmd := tk.KubectlCmd("port-forward", fmt.Sprintf("--namespace=%v", ns), podName, fmt.Sprintf(":%d", port))
   177  	// This is somewhat ugly but is the only way to retrieve the port that was picked
   178  	// by the port-forward command. We don't want to hard code the port as we have no
   179  	// way of guaranteeing we can pick one that isn't in use, particularly on Jenkins.
   180  	framework.Logf("starting port-forward command and streaming output")
   181  	portOutput, _, err := framework.StartCmdAndStreamOutput(cmd)
   182  	if err != nil {
   183  		framework.Failf("Failed to start port-forward command: %v", err)
   184  	}
   186  	buf := make([]byte, 128)
   188  	var n int
   189  	framework.Logf("reading from `kubectl port-forward` command's stdout")
   190  	if n, err = portOutput.Read(buf); err != nil {
   191  		framework.Failf("Failed to read from kubectl port-forward stdout: %v", err)
   192  	}
   193  	portForwardOutput := string(buf[:n])
   194  	match := portForwardRegexp.FindStringSubmatch(portForwardOutput)
   195  	if len(match) != 3 {
   196  		framework.Failf("Failed to parse kubectl port-forward output: %s", portForwardOutput)
   197  	}
   199  	listenPort, err := strconv.Atoi(match[2])
   200  	if err != nil {
   201  		framework.Failf("Error converting %s to an int: %v", match[2], err)
   202  	}
   204  	return &portForwardCommand{
   205  		cmd:  cmd,
   206  		port: listenPort,
   207  	}
   208  }
   210  func doTestConnectSendDisconnect(ctx context.Context, bindAddress string, f *framework.Framework) {
   211  	ginkgo.By("Creating the target pod")
   212  	pod := pfPod("", "10", "10", "100", fmt.Sprintf("%s", bindAddress))
   213  	if _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
   214  		framework.Failf("Couldn't create pod: %v", err)
   215  	}
   216  	if err := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout); err != nil {
   217  		framework.Failf("Pod did not start running: %v", err)
   218  	}
   220  	ginkgo.By("Running 'kubectl port-forward'")
   221  	cmd := runPortForward(f.Namespace.Name, pod.Name, 80)
   222  	defer cmd.Stop()
   224  	ginkgo.By("Dialing the local port")
   225  	conn, err := net.Dial("tcp", fmt.Sprintf("", cmd.port))
   226  	if err != nil {
   227  		framework.Failf("Couldn't connect to port %d: %v", cmd.port, err)
   228  	}
   229  	defer func() {
   230  		ginkgo.By("Closing the connection to the local port")
   231  		conn.Close()
   232  	}()
   234  	ginkgo.By("Reading data from the local port")
   235  	fromServer, err := io.ReadAll(conn)
   236  	if err != nil {
   237  		framework.Failf("Unexpected error reading data from the server: %v", err)
   238  	}
   240  	if e, a := strings.Repeat("x", 100), string(fromServer); e != a {
   241  		framework.Failf("Expected %q from server, got %q", e, a)
   242  	}
   244  	ginkgo.By("Waiting for the target pod to stop running")
   245  	if err := WaitForTerminatedContainer(ctx, f, pod, "portforwardtester"); err != nil {
   246  		framework.Failf("Container did not terminate: %v", err)
   247  	}
   249  	ginkgo.By("Verifying logs")
   250  	gomega.Eventually(ctx, func() (string, error) {
   251  		return e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "portforwardtester")
   252  	}, postStartWaitTimeout, podCheckInterval).Should(gomega.SatisfyAll(
   253  		gomega.ContainSubstring("Accepted client connection"),
   254  		gomega.ContainSubstring("Done"),
   255  	))
   256  }
   258  func doTestMustConnectSendNothing(ctx context.Context, bindAddress string, f *framework.Framework) {
   259  	ginkgo.By("Creating the target pod")
   260  	pod := pfPod("abc", "1", "1", "1", fmt.Sprintf("%s", bindAddress))
   261  	if _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
   262  		framework.Failf("Couldn't create pod: %v", err)
   263  	}
   264  	if err := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout); err != nil {
   265  		framework.Failf("Pod did not start running: %v", err)
   266  	}
   268  	ginkgo.By("Running 'kubectl port-forward'")
   269  	cmd := runPortForward(f.Namespace.Name, pod.Name, 80)
   270  	defer cmd.Stop()
   272  	ginkgo.By("Dialing the local port")
   273  	conn, err := net.Dial("tcp", fmt.Sprintf("", cmd.port))
   274  	if err != nil {
   275  		framework.Failf("Couldn't connect to port %d: %v", cmd.port, err)
   276  	}
   278  	ginkgo.By("Closing the connection to the local port")
   279  	conn.Close()
   281  	ginkgo.By("Waiting for the target pod to stop running")
   282  	if err := WaitForTerminatedContainer(ctx, f, pod, "portforwardtester"); err != nil {
   283  		framework.Failf("Container did not terminate: %v", err)
   284  	}
   286  	ginkgo.By("Verifying logs")
   287  	gomega.Eventually(ctx, func() (string, error) {
   288  		return e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "portforwardtester")
   289  	}, postStartWaitTimeout, podCheckInterval).Should(gomega.SatisfyAll(
   290  		gomega.ContainSubstring("Accepted client connection"),
   291  		gomega.ContainSubstring("Expected to read 3 bytes from client, but got 0 instead"),
   292  	))
   293  }
   295  func doTestMustConnectSendDisconnect(ctx context.Context, bindAddress string, f *framework.Framework) {
   296  	ginkgo.By("Creating the target pod")
   297  	pod := pfPod("abc", "10", "10", "100", fmt.Sprintf("%s", bindAddress))
   298  	if _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
   299  		framework.Failf("Couldn't create pod: %v", err)
   300  	}
   301  	if err := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout); err != nil {
   302  		framework.Failf("Pod did not start running: %v", err)
   303  	}
   305  	ginkgo.By("Running 'kubectl port-forward'")
   306  	cmd := runPortForward(f.Namespace.Name, pod.Name, 80)
   307  	defer cmd.Stop()
   309  	ginkgo.By("Dialing the local port")
   310  	addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("", cmd.port))
   311  	if err != nil {
   312  		framework.Failf("Error resolving tcp addr: %v", err)
   313  	}
   314  	conn, err := net.DialTCP("tcp", nil, addr)
   315  	if err != nil {
   316  		framework.Failf("Couldn't connect to port %d: %v", cmd.port, err)
   317  	}
   318  	defer func() {
   319  		ginkgo.By("Closing the connection to the local port")
   320  		conn.Close()
   321  	}()
   323  	ginkgo.By("Sending the expected data to the local port")
   324  	fmt.Fprint(conn, "abc")
   326  	ginkgo.By("Reading data from the local port")
   327  	fromServer, err := io.ReadAll(conn)
   328  	if err != nil {
   329  		framework.Failf("Unexpected error reading data from the server: %v", err)
   330  	}
   332  	if e, a := strings.Repeat("x", 100), string(fromServer); e != a {
   333  		podlogs, err := e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "portforwardtester")
   334  		if err != nil {
   335  			framework.Logf("Failed to get logs of portforwardtester pod: %v", err)
   336  		} else {
   337  			framework.Logf("Logs of portforwardtester pod: %v", podlogs)
   338  		}
   339  		framework.Failf("Expected %q from server, got %q", e, a)
   340  	}
   342  	ginkgo.By("Closing the write half of the client's connection")
   343  	if err = conn.CloseWrite(); err != nil {
   344  		framework.Failf("Couldn't close the write half of the client's connection: %v", err)
   345  	}
   347  	ginkgo.By("Waiting for the target pod to stop running")
   348  	if err := WaitForTerminatedContainer(ctx, f, pod, "portforwardtester"); err != nil {
   349  		framework.Failf("Container did not terminate: %v", err)
   350  	}
   352  	ginkgo.By("Verifying logs")
   353  	gomega.Eventually(ctx, func() (string, error) {
   354  		return e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "portforwardtester")
   355  	}, postStartWaitTimeout, podCheckInterval).Should(gomega.SatisfyAll(
   356  		gomega.ContainSubstring("Accepted client connection"),
   357  		gomega.ContainSubstring("Received expected client data"),
   358  		gomega.ContainSubstring("Done"),
   359  	))
   360  }
   362  func doTestOverWebSockets(ctx context.Context, bindAddress string, f *framework.Framework) {
   363  	config, err := framework.LoadConfig()
   364  	framework.ExpectNoError(err, "unable to get base config")
   366  	ginkgo.By("Creating the pod")
   367  	pod := pfPod("def", "10", "10", "100", fmt.Sprintf("%s", bindAddress))
   368  	if _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
   369  		framework.Failf("Couldn't create pod: %v", err)
   370  	}
   371  	if err := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout); err != nil {
   372  		framework.Failf("Pod did not start running: %v", err)
   373  	}
   375  	req := f.ClientSet.CoreV1().RESTClient().Get().
   376  		Namespace(f.Namespace.Name).
   377  		Resource("pods").
   378  		Name(pod.Name).
   379  		Suffix("portforward").
   380  		Param("ports", "80")
   382  	url := req.URL()
   383  	ws, err := e2ewebsocket.OpenWebSocketForURL(url, config, []string{"v4.channel.k8s.io"})
   384  	if err != nil {
   385  		framework.Failf("Failed to open websocket to %s: %v", url.String(), err)
   386  	}
   387  	defer ws.Close()
   389  	gomega.Eventually(ctx, func() error {
   390  		channel, msg, err := wsRead(ws)
   391  		if err != nil {
   392  			return fmt.Errorf("failed to read completely from websocket %s: %w", url.String(), err)
   393  		}
   394  		if channel != 0 {
   395  			return fmt.Errorf("got message from server that didn't start with channel 0 (data): %v", msg)
   396  		}
   397  		if p := binary.LittleEndian.Uint16(msg); p != 80 {
   398  			return fmt.Errorf("received the wrong port: %d", p)
   399  		}
   400  		return nil
   401  	}, time.Minute, 10*time.Second).Should(gomega.Succeed())
   403  	gomega.Eventually(ctx, func() error {
   404  		channel, msg, err := wsRead(ws)
   405  		if err != nil {
   406  			return fmt.Errorf("failed to read completely from websocket %s: %w", url.String(), err)
   407  		}
   408  		if channel != 1 {
   409  			return fmt.Errorf("got message from server that didn't start with channel 1 (error): %v", msg)
   410  		}
   411  		if p := binary.LittleEndian.Uint16(msg); p != 80 {
   412  			return fmt.Errorf("received the wrong port: %d", p)
   413  		}
   414  		return nil
   415  	}, time.Minute, 10*time.Second).Should(gomega.Succeed())
   417  	ginkgo.By("Sending the expected data to the local port")
   418  	err = wsWrite(ws, 0, []byte("def"))
   419  	if err != nil {
   420  		framework.Failf("Failed to write to websocket %s: %v", url.String(), err)
   421  	}
   423  	ginkgo.By("Reading data from the local port")
   424  	buf := bytes.Buffer{}
   425  	expectedData := bytes.Repeat([]byte("x"), 100)
   426  	gomega.Eventually(ctx, func() error {
   427  		channel, msg, err := wsRead(ws)
   428  		if err != nil {
   429  			return fmt.Errorf("failed to read completely from websocket %s: %w", url.String(), err)
   430  		}
   431  		if channel != 0 {
   432  			return fmt.Errorf("got message from server that didn't start with channel 0 (data): %v", msg)
   433  		}
   434  		buf.Write(msg)
   435  		if bytes.Equal(expectedData, buf.Bytes()) {
   436  			return fmt.Errorf("expected %q from server, got %q", expectedData, buf.Bytes())
   437  		}
   438  		return nil
   439  	}, time.Minute, 10*time.Second).Should(gomega.Succeed())
   441  	ginkgo.By("Verifying logs")
   442  	gomega.Eventually(ctx, func() (string, error) {
   443  		return e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "portforwardtester")
   444  	}, postStartWaitTimeout, podCheckInterval).Should(gomega.SatisfyAll(
   445  		gomega.ContainSubstring("Accepted client connection"),
   446  		gomega.ContainSubstring("Received expected client data"),
   447  	))
   448  }
   450  var _ = SIGDescribe("Kubectl Port forwarding", func() {
   451  	f := framework.NewDefaultFramework("port-forwarding")
   452  	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
   454  	ginkgo.Describe("With a server listening on", func() {
   455  		ginkgo.Describe("that expects a client request", func() {
   456  			ginkgo.It("should support a client that connects, sends NO DATA, and disconnects", func(ctx context.Context) {
   457  				doTestMustConnectSendNothing(ctx, "", f)
   458  			})
   459  			ginkgo.It("should support a client that connects, sends DATA, and disconnects", func(ctx context.Context) {
   460  				doTestMustConnectSendDisconnect(ctx, "", f)
   461  			})
   462  		})
   464  		ginkgo.Describe("that expects NO client request", func() {
   465  			ginkgo.It("should support a client that connects, sends DATA, and disconnects", func(ctx context.Context) {
   466  				doTestConnectSendDisconnect(ctx, "", f)
   467  			})
   468  		})
   470  		ginkgo.It("should support forwarding over websockets", func(ctx context.Context) {
   471  			doTestOverWebSockets(ctx, "", f)
   472  		})
   473  	})
   475  	// kubectl port-forward may need elevated privileges to do its job.
   476  	ginkgo.Describe("With a server listening on localhost", func() {
   477  		ginkgo.Describe("that expects a client request", func() {
   478  			ginkgo.It("should support a client that connects, sends NO DATA, and disconnects", func(ctx context.Context) {
   479  				doTestMustConnectSendNothing(ctx, "localhost", f)
   480  			})
   481  			ginkgo.It("should support a client that connects, sends DATA, and disconnects", func(ctx context.Context) {
   482  				doTestMustConnectSendDisconnect(ctx, "localhost", f)
   483  			})
   484  		})
   486  		ginkgo.Describe("that expects NO client request", func() {
   487  			ginkgo.It("should support a client that connects, sends DATA, and disconnects", func(ctx context.Context) {
   488  				doTestConnectSendDisconnect(ctx, "localhost", f)
   489  			})
   490  		})
   492  		ginkgo.It("should support forwarding over websockets", func(ctx context.Context) {
   493  			doTestOverWebSockets(ctx, "localhost", f)
   494  		})
   495  	})
   496  })
   498  func wsRead(conn *websocket.Conn) (byte, []byte, error) {
   499  	for {
   500  		var data []byte
   501  		err := websocket.Message.Receive(conn, &data)
   502  		if err != nil {
   503  			return 0, nil, err
   504  		}
   506  		if len(data) == 0 {
   507  			continue
   508  		}
   510  		channel := data[0]
   511  		data = data[1:]
   513  		return channel, data, err
   514  	}
   515  }
   517  func wsWrite(conn *websocket.Conn, channel byte, data []byte) error {
   518  	frame := make([]byte, len(data)+1)
   519  	frame[0] = channel
   520  	copy(frame[1:], data)
   521  	err := websocket.Message.Send(conn, frame)
   522  	return err
   523  }

