...

Source file src/k8s.io/kubernetes/test/integration/apiserver/portforward/portforward_test.go

Documentation: k8s.io/kubernetes/test/integration/apiserver/portforward

     1  /*
     2  Copyright 2024 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package portforward
    18  
    19  import (
    20  	"bufio"
    21  	"bytes"
    22  	"context"
    23  	"fmt"
    24  	"io"
    25  	"net"
    26  	"net/http"
    27  	"net/http/httptest"
    28  	"net/url"
    29  	"strconv"
    30  	"strings"
    31  	"sync"
    32  	"testing"
    33  	"time"
    34  
    35  	"github.com/stretchr/testify/require"
    36  
    37  	corev1 "k8s.io/api/core/v1"
    38  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    39  	"k8s.io/apimachinery/pkg/types"
    40  	"k8s.io/apimachinery/pkg/util/remotecommand"
    41  	"k8s.io/apimachinery/pkg/util/wait"
    42  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    43  	"k8s.io/cli-runtime/pkg/genericiooptions"
    44  	"k8s.io/client-go/kubernetes"
    45  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    46  	"k8s.io/kubectl/pkg/cmd/portforward"
    47  	kubeletportforward "k8s.io/kubelet/pkg/cri/streaming/portforward"
    48  	kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    49  	kubefeatures "k8s.io/kubernetes/pkg/features"
    50  
    51  	"k8s.io/kubernetes/test/integration/framework"
    52  )
    53  
    54  const remotePort = "8765"
    55  
    56  func TestPortforward(t *testing.T) {
    57  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.PortForwardWebsockets, true)()
    58  	t.Setenv("KUBECTL_PORT_FORWARD_WEBSOCKETS", "true")
    59  
    60  	var podName string
    61  	var podUID types.UID
    62  	backendServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
    63  		t.Logf("backend saw request: %v", req.URL.String())
    64  		kubeletportforward.ServePortForward(
    65  			w,
    66  			req,
    67  			&dummyPortForwarder{t: t},
    68  			podName,
    69  			podUID,
    70  			&kubeletportforward.V4Options{},
    71  			wait.ForeverTestTimeout, // idle timeout
    72  			remotecommand.DefaultStreamCreationTimeout, // stream creation timeout
    73  			[]string{kubeletportforward.ProtocolV1Name},
    74  		)
    75  	}))
    76  	defer backendServer.Close()
    77  	backendURL, _ := url.Parse(backendServer.URL)
    78  	backendHost := backendURL.Hostname()
    79  	backendPort, _ := strconv.Atoi(backendURL.Port())
    80  
    81  	etcd := framework.SharedEtcd()
    82  	server := kastesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, etcd)
    83  	defer server.TearDownFn()
    84  
    85  	adminClient, err := kubernetes.NewForConfig(server.ClientConfig)
    86  	require.NoError(t, err)
    87  
    88  	node := &corev1.Node{
    89  		ObjectMeta: metav1.ObjectMeta{Name: "mynode"},
    90  		Status: corev1.NodeStatus{
    91  			DaemonEndpoints: corev1.NodeDaemonEndpoints{KubeletEndpoint: corev1.DaemonEndpoint{Port: int32(backendPort)}},
    92  			Addresses:       []corev1.NodeAddress{{Type: corev1.NodeInternalIP, Address: backendHost}},
    93  		},
    94  	}
    95  	if _, err := adminClient.CoreV1().Nodes().Create(context.Background(), node, metav1.CreateOptions{}); err != nil {
    96  		t.Fatal(err)
    97  	}
    98  
    99  	pod := &corev1.Pod{
   100  		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "mypod"},
   101  		Spec: corev1.PodSpec{
   102  			NodeName:   "mynode",
   103  			Containers: []corev1.Container{{Name: "test", Image: "test"}},
   104  		},
   105  	}
   106  	if _, err := adminClient.CoreV1().Pods("default").Create(context.Background(), pod, metav1.CreateOptions{}); err != nil {
   107  		t.Fatal(err)
   108  	}
   109  	if _, err := adminClient.CoreV1().Pods("default").Patch(context.Background(), "mypod", types.MergePatchType, []byte(`{"status":{"phase":"Running"}}`), metav1.PatchOptions{}, "status"); err != nil {
   110  		t.Fatal(err)
   111  	}
   112  
   113  	// local port missing asks os to find random open port.
   114  	// Example: ":8000" (local = random, remote = 8000)
   115  	localRemotePort := fmt.Sprintf(":%s", remotePort)
   116  	streams, _, out, errOut := genericiooptions.NewTestIOStreams()
   117  	portForwardOptions := portforward.NewDefaultPortForwardOptions(streams)
   118  	portForwardOptions.Namespace = "default"
   119  	portForwardOptions.PodName = "mypod"
   120  	portForwardOptions.RESTClient = adminClient.CoreV1().RESTClient()
   121  	portForwardOptions.Config = server.ClientConfig
   122  	portForwardOptions.PodClient = adminClient.CoreV1()
   123  	portForwardOptions.Address = []string{"127.0.0.1"}
   124  	portForwardOptions.Ports = []string{localRemotePort}
   125  	portForwardOptions.StopChannel = make(chan struct{}, 1)
   126  	portForwardOptions.ReadyChannel = make(chan struct{})
   127  
   128  	if err := portForwardOptions.Validate(); err != nil {
   129  		t.Fatal(err)
   130  	}
   131  
   132  	ctx, cancel := context.WithCancel(context.Background())
   133  	defer cancel()
   134  
   135  	wg := sync.WaitGroup{}
   136  	wg.Add(1)
   137  	go func() {
   138  		defer wg.Done()
   139  		if err := portForwardOptions.RunPortForwardContext(ctx); err != nil {
   140  			t.Error(err)
   141  		}
   142  	}()
   143  
   144  	t.Log("waiting for port forward to be ready")
   145  	select {
   146  	case <-portForwardOptions.ReadyChannel:
   147  		t.Log("port forward was ready")
   148  	case <-time.After(wait.ForeverTestTimeout):
   149  		t.Error("port forward was never ready")
   150  	}
   151  
   152  	// Parse out the randomly selected local port from "out" stream.
   153  	localPort, err := parsePort(out.String())
   154  	require.NoError(t, err)
   155  	t.Logf("Local Port: %s", localPort)
   156  
   157  	timeoutContext, cleanupTimeoutContext := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
   158  	defer cleanupTimeoutContext()
   159  	testReq, _ := http.NewRequest("GET", fmt.Sprintf("http://127.0.0.1:%s/test", localPort), nil)
   160  	testReq = testReq.WithContext(timeoutContext)
   161  	testResp, err := http.DefaultClient.Do(testReq)
   162  	if err != nil {
   163  		t.Error(err)
   164  	} else {
   165  		t.Log(testResp.StatusCode)
   166  		data, err := io.ReadAll(testResp.Body)
   167  		if err != nil {
   168  			t.Error(err)
   169  		} else {
   170  			t.Log("client saw response:", string(data))
   171  		}
   172  		if string(data) != fmt.Sprintf("request to %s was ok", remotePort) {
   173  			t.Errorf("unexpected data")
   174  		}
   175  		if testResp.StatusCode != 200 {
   176  			t.Error("expected success")
   177  		}
   178  	}
   179  
   180  	cancel()
   181  
   182  	wg.Wait()
   183  	t.Logf("stdout: %s", out.String())
   184  	t.Logf("stderr: %s", errOut.String())
   185  }
   186  
   187  // parsePort parses out the local port from the port-forward output string.
   188  // This should work for both IP4 and IP6 addresses.
   189  //
   190  //	Example: "Forwarding from 127.0.0.1:8000 -> 4000", returns "8000".
   191  func parsePort(forwardAddr string) (string, error) {
   192  	parts := strings.Split(forwardAddr, " ")
   193  	if len(parts) != 5 {
   194  		return "", fmt.Errorf("unable to parse local port from stdout: %s", forwardAddr)
   195  	}
   196  	// parts[2] = "127.0.0.1:<LOCAL_PORT>"
   197  	_, localPort, err := net.SplitHostPort(parts[2])
   198  	if err != nil {
   199  		return "", fmt.Errorf("unable to parse local port: %w", err)
   200  	}
   201  	return localPort, nil
   202  }
   203  
   204  type dummyPortForwarder struct {
   205  	t *testing.T
   206  }
   207  
   208  func (d *dummyPortForwarder) PortForward(ctx context.Context, name string, uid types.UID, port int32, stream io.ReadWriteCloser) error {
   209  	d.t.Logf("handling port forward request for %d", port)
   210  
   211  	req, err := http.ReadRequest(bufio.NewReader(stream))
   212  	if err != nil {
   213  		d.t.Logf("error reading request: %v", err)
   214  		return err
   215  	}
   216  	d.t.Log(req.URL.String())
   217  	defer req.Body.Close() //nolint:errcheck
   218  
   219  	resp := &http.Response{
   220  		StatusCode: 200,
   221  		Proto:      "HTTP/1.1",
   222  		ProtoMajor: 1,
   223  		ProtoMinor: 1,
   224  		Body:       io.NopCloser(bytes.NewBufferString(fmt.Sprintf("request to %d was ok", port))),
   225  	}
   226  	resp.Write(stream) //nolint:errcheck
   227  	return stream.Close()
   228  }
   229  

View as plain text