1
16
17 package portforward
18
19 import (
20 "bufio"
21 "bytes"
22 "context"
23 "fmt"
24 "io"
25 "net"
26 "net/http"
27 "net/http/httptest"
28 "net/url"
29 "strconv"
30 "strings"
31 "sync"
32 "testing"
33 "time"
34
35 "github.com/stretchr/testify/require"
36
37 corev1 "k8s.io/api/core/v1"
38 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39 "k8s.io/apimachinery/pkg/types"
40 "k8s.io/apimachinery/pkg/util/remotecommand"
41 "k8s.io/apimachinery/pkg/util/wait"
42 utilfeature "k8s.io/apiserver/pkg/util/feature"
43 "k8s.io/cli-runtime/pkg/genericiooptions"
44 "k8s.io/client-go/kubernetes"
45 featuregatetesting "k8s.io/component-base/featuregate/testing"
46 "k8s.io/kubectl/pkg/cmd/portforward"
47 kubeletportforward "k8s.io/kubelet/pkg/cri/streaming/portforward"
48 kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
49 kubefeatures "k8s.io/kubernetes/pkg/features"
50
51 "k8s.io/kubernetes/test/integration/framework"
52 )
53
// remotePort is the remote (pod-side) port used by TestPortforward. It is
// passed to port-forward as ":<remotePort>" (local side left empty) and is the
// port number the test expects to see echoed in the response body.
const remotePort = "8765"
55
56 func TestPortforward(t *testing.T) {
57 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.PortForwardWebsockets, true)()
58 t.Setenv("KUBECTL_PORT_FORWARD_WEBSOCKETS", "true")
59
60 var podName string
61 var podUID types.UID
62 backendServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
63 t.Logf("backend saw request: %v", req.URL.String())
64 kubeletportforward.ServePortForward(
65 w,
66 req,
67 &dummyPortForwarder{t: t},
68 podName,
69 podUID,
70 &kubeletportforward.V4Options{},
71 wait.ForeverTestTimeout,
72 remotecommand.DefaultStreamCreationTimeout,
73 []string{kubeletportforward.ProtocolV1Name},
74 )
75 }))
76 defer backendServer.Close()
77 backendURL, _ := url.Parse(backendServer.URL)
78 backendHost := backendURL.Hostname()
79 backendPort, _ := strconv.Atoi(backendURL.Port())
80
81 etcd := framework.SharedEtcd()
82 server := kastesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, etcd)
83 defer server.TearDownFn()
84
85 adminClient, err := kubernetes.NewForConfig(server.ClientConfig)
86 require.NoError(t, err)
87
88 node := &corev1.Node{
89 ObjectMeta: metav1.ObjectMeta{Name: "mynode"},
90 Status: corev1.NodeStatus{
91 DaemonEndpoints: corev1.NodeDaemonEndpoints{KubeletEndpoint: corev1.DaemonEndpoint{Port: int32(backendPort)}},
92 Addresses: []corev1.NodeAddress{{Type: corev1.NodeInternalIP, Address: backendHost}},
93 },
94 }
95 if _, err := adminClient.CoreV1().Nodes().Create(context.Background(), node, metav1.CreateOptions{}); err != nil {
96 t.Fatal(err)
97 }
98
99 pod := &corev1.Pod{
100 ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "mypod"},
101 Spec: corev1.PodSpec{
102 NodeName: "mynode",
103 Containers: []corev1.Container{{Name: "test", Image: "test"}},
104 },
105 }
106 if _, err := adminClient.CoreV1().Pods("default").Create(context.Background(), pod, metav1.CreateOptions{}); err != nil {
107 t.Fatal(err)
108 }
109 if _, err := adminClient.CoreV1().Pods("default").Patch(context.Background(), "mypod", types.MergePatchType, []byte(`{"status":{"phase":"Running"}}`), metav1.PatchOptions{}, "status"); err != nil {
110 t.Fatal(err)
111 }
112
113
114
115 localRemotePort := fmt.Sprintf(":%s", remotePort)
116 streams, _, out, errOut := genericiooptions.NewTestIOStreams()
117 portForwardOptions := portforward.NewDefaultPortForwardOptions(streams)
118 portForwardOptions.Namespace = "default"
119 portForwardOptions.PodName = "mypod"
120 portForwardOptions.RESTClient = adminClient.CoreV1().RESTClient()
121 portForwardOptions.Config = server.ClientConfig
122 portForwardOptions.PodClient = adminClient.CoreV1()
123 portForwardOptions.Address = []string{"127.0.0.1"}
124 portForwardOptions.Ports = []string{localRemotePort}
125 portForwardOptions.StopChannel = make(chan struct{}, 1)
126 portForwardOptions.ReadyChannel = make(chan struct{})
127
128 if err := portForwardOptions.Validate(); err != nil {
129 t.Fatal(err)
130 }
131
132 ctx, cancel := context.WithCancel(context.Background())
133 defer cancel()
134
135 wg := sync.WaitGroup{}
136 wg.Add(1)
137 go func() {
138 defer wg.Done()
139 if err := portForwardOptions.RunPortForwardContext(ctx); err != nil {
140 t.Error(err)
141 }
142 }()
143
144 t.Log("waiting for port forward to be ready")
145 select {
146 case <-portForwardOptions.ReadyChannel:
147 t.Log("port forward was ready")
148 case <-time.After(wait.ForeverTestTimeout):
149 t.Error("port forward was never ready")
150 }
151
152
153 localPort, err := parsePort(out.String())
154 require.NoError(t, err)
155 t.Logf("Local Port: %s", localPort)
156
157 timeoutContext, cleanupTimeoutContext := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
158 defer cleanupTimeoutContext()
159 testReq, _ := http.NewRequest("GET", fmt.Sprintf("http://127.0.0.1:%s/test", localPort), nil)
160 testReq = testReq.WithContext(timeoutContext)
161 testResp, err := http.DefaultClient.Do(testReq)
162 if err != nil {
163 t.Error(err)
164 } else {
165 t.Log(testResp.StatusCode)
166 data, err := io.ReadAll(testResp.Body)
167 if err != nil {
168 t.Error(err)
169 } else {
170 t.Log("client saw response:", string(data))
171 }
172 if string(data) != fmt.Sprintf("request to %s was ok", remotePort) {
173 t.Errorf("unexpected data")
174 }
175 if testResp.StatusCode != 200 {
176 t.Error("expected success")
177 }
178 }
179
180 cancel()
181
182 wg.Wait()
183 t.Logf("stdout: %s", out.String())
184 t.Logf("stderr: %s", errOut.String())
185 }
186
187
188
189
190
// parsePort pulls the local port out of the text port-forward wrote to stdout.
//
// The input is split on single spaces and must yield exactly five tokens; the
// third token is treated as a host:port pair and its port part is returned.
// An error is returned when the token count differs or when the third token
// is not a valid host:port.
func parsePort(forwardAddr string) (string, error) {
	const (
		expectedTokens = 5
		addressIndex   = 2
	)

	tokens := strings.Split(forwardAddr, " ")
	if got := len(tokens); got != expectedTokens {
		return "", fmt.Errorf("unable to parse local port from stdout: %s", forwardAddr)
	}

	_, port, splitErr := net.SplitHostPort(tokens[addressIndex])
	if splitErr != nil {
		return "", fmt.Errorf("unable to parse local port: %w", splitErr)
	}
	return port, nil
}
203
// dummyPortForwarder is the port forwarder that the test's backend server
// hands to kubeletportforward.ServePortForward. Its PortForward method answers
// each forwarded stream itself with a canned HTTP response.
type dummyPortForwarder struct {
	// t is the running test; it is used for logging.
	t *testing.T
}
207
208 func (d *dummyPortForwarder) PortForward(ctx context.Context, name string, uid types.UID, port int32, stream io.ReadWriteCloser) error {
209 d.t.Logf("handling port forward request for %d", port)
210
211 req, err := http.ReadRequest(bufio.NewReader(stream))
212 if err != nil {
213 d.t.Logf("error reading request: %v", err)
214 return err
215 }
216 d.t.Log(req.URL.String())
217 defer req.Body.Close()
218
219 resp := &http.Response{
220 StatusCode: 200,
221 Proto: "HTTP/1.1",
222 ProtoMajor: 1,
223 ProtoMinor: 1,
224 Body: io.NopCloser(bytes.NewBufferString(fmt.Sprintf("request to %d was ok", port))),
225 }
226 resp.Write(stream)
227 return stream.Close()
228 }
229
View as plain text