1
16
17
18
19 package kubectl
20
21 import (
22 "bytes"
23 "context"
24 "encoding/binary"
25 "fmt"
26 "io"
27 "net"
28 "os/exec"
29 "regexp"
30 "strconv"
31 "strings"
32 "syscall"
33 "time"
34
35 "golang.org/x/net/websocket"
36 v1 "k8s.io/api/core/v1"
37 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38 "k8s.io/apimachinery/pkg/util/wait"
39 "k8s.io/kubernetes/test/e2e/framework"
40 e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
41 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
42 e2ewebsocket "k8s.io/kubernetes/test/e2e/framework/websocket"
43 testutils "k8s.io/kubernetes/test/utils"
44 imageutils "k8s.io/kubernetes/test/utils/image"
45 admissionapi "k8s.io/pod-security-admission/api"
46
47 "github.com/onsi/ginkgo/v2"
48 "github.com/onsi/gomega"
49 )
50
// podName is used both as the name of the pod built by pfPod and as the value
// of its "name" label.
const podName = "pfpod"

// Polling parameters used when fetching the logs of the port-forward tester
// container.
const (
	// podCheckInterval is the pause between two consecutive log fetches.
	podCheckInterval = time.Second
	// postStartWaitTimeout bounds the total time spent polling the logs.
	postStartWaitTimeout = 2 * time.Minute
)
59
60
// portForwardRegexp matches the "Forwarding from <local address>:<port> -> 80"
// line printed by kubectl port-forward. Submatch 1 is the local address
// (127.0.0.1 or [::1]) and submatch 2 is the local port that was picked.
//
// The dots of the IPv4 address are escaped so that they only match a literal
// '.', not an arbitrary character.
var portForwardRegexp = regexp.MustCompile(`Forwarding from (127\.0\.0\.1|\[::1\]):([0-9]+) -> 80`)
64
65 func pfPod(expectedClientData, chunks, chunkSize, chunkIntervalMillis string, bindAddress string) *v1.Pod {
66 return &v1.Pod{
67 ObjectMeta: metav1.ObjectMeta{
68 Name: podName,
69 Labels: map[string]string{"name": podName},
70 },
71 Spec: v1.PodSpec{
72 Containers: []v1.Container{
73 {
74 Name: "readiness",
75 Image: imageutils.GetE2EImage(imageutils.Agnhost),
76 Args: []string{"netexec"},
77 ReadinessProbe: &v1.Probe{
78 ProbeHandler: v1.ProbeHandler{
79 Exec: &v1.ExecAction{
80 Command: []string{
81 "sh", "-c", "netstat -na | grep LISTEN | grep -v 8080 | grep 80",
82 }},
83 },
84 InitialDelaySeconds: 5,
85 TimeoutSeconds: 60,
86 PeriodSeconds: 1,
87 },
88 },
89 {
90 Name: "portforwardtester",
91 Image: imageutils.GetE2EImage(imageutils.Agnhost),
92 Args: []string{"port-forward-tester"},
93 Env: []v1.EnvVar{
94 {
95 Name: "BIND_PORT",
96 Value: "80",
97 },
98 {
99 Name: "EXPECTED_CLIENT_DATA",
100 Value: expectedClientData,
101 },
102 {
103 Name: "CHUNKS",
104 Value: chunks,
105 },
106 {
107 Name: "CHUNK_SIZE",
108 Value: chunkSize,
109 },
110 {
111 Name: "CHUNK_INTERVAL",
112 Value: chunkIntervalMillis,
113 },
114 {
115 Name: "BIND_ADDRESS",
116 Value: bindAddress,
117 },
118 },
119 },
120 },
121 RestartPolicy: v1.RestartPolicyNever,
122 },
123 }
124 }
125
126
127 func WaitForTerminatedContainer(ctx context.Context, f *framework.Framework, pod *v1.Pod, containerName string) error {
128 return e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "container terminated", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
129 if len(testutils.TerminatedContainers(pod)[containerName]) > 0 {
130 return true, nil
131 }
132 return false, nil
133 })
134 }
135
136 type portForwardCommand struct {
137 cmd *exec.Cmd
138 port int
139 }
140
141
142
143 func (c *portForwardCommand) Stop() {
144
145 if err := c.cmd.Process.Signal(syscall.SIGINT); err != nil {
146 framework.Logf("error sending SIGINT to kubectl port-forward: %v", err)
147 }
148
149
150 done := make(chan error)
151 go func() {
152 done <- c.cmd.Wait()
153 }()
154
155 expired := time.NewTimer(wait.ForeverTestTimeout)
156 defer expired.Stop()
157
158 select {
159 case err := <-done:
160 if err == nil {
161
162 return
163 }
164 framework.Logf("error waiting for kubectl port-forward to exit: %v", err)
165 case <-expired.C:
166 framework.Logf("timed out waiting for kubectl port-forward to exit")
167 }
168
169 framework.Logf("trying to forcibly kill kubectl port-forward")
170 framework.TryKill(c.cmd)
171 }
172
173
174 func runPortForward(ns, podName string, port int) *portForwardCommand {
175 tk := e2ekubectl.NewTestKubeconfig(framework.TestContext.CertDir, framework.TestContext.Host, framework.TestContext.KubeConfig, framework.TestContext.KubeContext, framework.TestContext.KubectlPath, ns)
176 cmd := tk.KubectlCmd("port-forward", fmt.Sprintf("--namespace=%v", ns), podName, fmt.Sprintf(":%d", port))
177
178
179
180 framework.Logf("starting port-forward command and streaming output")
181 portOutput, _, err := framework.StartCmdAndStreamOutput(cmd)
182 if err != nil {
183 framework.Failf("Failed to start port-forward command: %v", err)
184 }
185
186 buf := make([]byte, 128)
187
188 var n int
189 framework.Logf("reading from `kubectl port-forward` command's stdout")
190 if n, err = portOutput.Read(buf); err != nil {
191 framework.Failf("Failed to read from kubectl port-forward stdout: %v", err)
192 }
193 portForwardOutput := string(buf[:n])
194 match := portForwardRegexp.FindStringSubmatch(portForwardOutput)
195 if len(match) != 3 {
196 framework.Failf("Failed to parse kubectl port-forward output: %s", portForwardOutput)
197 }
198
199 listenPort, err := strconv.Atoi(match[2])
200 if err != nil {
201 framework.Failf("Error converting %s to an int: %v", match[2], err)
202 }
203
204 return &portForwardCommand{
205 cmd: cmd,
206 port: listenPort,
207 }
208 }
209
210 func doTestConnectSendDisconnect(ctx context.Context, bindAddress string, f *framework.Framework) {
211 ginkgo.By("Creating the target pod")
212 pod := pfPod("", "10", "10", "100", fmt.Sprintf("%s", bindAddress))
213 if _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
214 framework.Failf("Couldn't create pod: %v", err)
215 }
216 if err := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout); err != nil {
217 framework.Failf("Pod did not start running: %v", err)
218 }
219
220 ginkgo.By("Running 'kubectl port-forward'")
221 cmd := runPortForward(f.Namespace.Name, pod.Name, 80)
222 defer cmd.Stop()
223
224 ginkgo.By("Dialing the local port")
225 conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port))
226 if err != nil {
227 framework.Failf("Couldn't connect to port %d: %v", cmd.port, err)
228 }
229 defer func() {
230 ginkgo.By("Closing the connection to the local port")
231 conn.Close()
232 }()
233
234 ginkgo.By("Reading data from the local port")
235 fromServer, err := io.ReadAll(conn)
236 if err != nil {
237 framework.Failf("Unexpected error reading data from the server: %v", err)
238 }
239
240 if e, a := strings.Repeat("x", 100), string(fromServer); e != a {
241 framework.Failf("Expected %q from server, got %q", e, a)
242 }
243
244 ginkgo.By("Waiting for the target pod to stop running")
245 if err := WaitForTerminatedContainer(ctx, f, pod, "portforwardtester"); err != nil {
246 framework.Failf("Container did not terminate: %v", err)
247 }
248
249 ginkgo.By("Verifying logs")
250 gomega.Eventually(ctx, func() (string, error) {
251 return e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "portforwardtester")
252 }, postStartWaitTimeout, podCheckInterval).Should(gomega.SatisfyAll(
253 gomega.ContainSubstring("Accepted client connection"),
254 gomega.ContainSubstring("Done"),
255 ))
256 }
257
258 func doTestMustConnectSendNothing(ctx context.Context, bindAddress string, f *framework.Framework) {
259 ginkgo.By("Creating the target pod")
260 pod := pfPod("abc", "1", "1", "1", fmt.Sprintf("%s", bindAddress))
261 if _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
262 framework.Failf("Couldn't create pod: %v", err)
263 }
264 if err := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout); err != nil {
265 framework.Failf("Pod did not start running: %v", err)
266 }
267
268 ginkgo.By("Running 'kubectl port-forward'")
269 cmd := runPortForward(f.Namespace.Name, pod.Name, 80)
270 defer cmd.Stop()
271
272 ginkgo.By("Dialing the local port")
273 conn, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port))
274 if err != nil {
275 framework.Failf("Couldn't connect to port %d: %v", cmd.port, err)
276 }
277
278 ginkgo.By("Closing the connection to the local port")
279 conn.Close()
280
281 ginkgo.By("Waiting for the target pod to stop running")
282 if err := WaitForTerminatedContainer(ctx, f, pod, "portforwardtester"); err != nil {
283 framework.Failf("Container did not terminate: %v", err)
284 }
285
286 ginkgo.By("Verifying logs")
287 gomega.Eventually(ctx, func() (string, error) {
288 return e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "portforwardtester")
289 }, postStartWaitTimeout, podCheckInterval).Should(gomega.SatisfyAll(
290 gomega.ContainSubstring("Accepted client connection"),
291 gomega.ContainSubstring("Expected to read 3 bytes from client, but got 0 instead"),
292 ))
293 }
294
295 func doTestMustConnectSendDisconnect(ctx context.Context, bindAddress string, f *framework.Framework) {
296 ginkgo.By("Creating the target pod")
297 pod := pfPod("abc", "10", "10", "100", fmt.Sprintf("%s", bindAddress))
298 if _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
299 framework.Failf("Couldn't create pod: %v", err)
300 }
301 if err := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout); err != nil {
302 framework.Failf("Pod did not start running: %v", err)
303 }
304
305 ginkgo.By("Running 'kubectl port-forward'")
306 cmd := runPortForward(f.Namespace.Name, pod.Name, 80)
307 defer cmd.Stop()
308
309 ginkgo.By("Dialing the local port")
310 addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("127.0.0.1:%d", cmd.port))
311 if err != nil {
312 framework.Failf("Error resolving tcp addr: %v", err)
313 }
314 conn, err := net.DialTCP("tcp", nil, addr)
315 if err != nil {
316 framework.Failf("Couldn't connect to port %d: %v", cmd.port, err)
317 }
318 defer func() {
319 ginkgo.By("Closing the connection to the local port")
320 conn.Close()
321 }()
322
323 ginkgo.By("Sending the expected data to the local port")
324 fmt.Fprint(conn, "abc")
325
326 ginkgo.By("Reading data from the local port")
327 fromServer, err := io.ReadAll(conn)
328 if err != nil {
329 framework.Failf("Unexpected error reading data from the server: %v", err)
330 }
331
332 if e, a := strings.Repeat("x", 100), string(fromServer); e != a {
333 podlogs, err := e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "portforwardtester")
334 if err != nil {
335 framework.Logf("Failed to get logs of portforwardtester pod: %v", err)
336 } else {
337 framework.Logf("Logs of portforwardtester pod: %v", podlogs)
338 }
339 framework.Failf("Expected %q from server, got %q", e, a)
340 }
341
342 ginkgo.By("Closing the write half of the client's connection")
343 if err = conn.CloseWrite(); err != nil {
344 framework.Failf("Couldn't close the write half of the client's connection: %v", err)
345 }
346
347 ginkgo.By("Waiting for the target pod to stop running")
348 if err := WaitForTerminatedContainer(ctx, f, pod, "portforwardtester"); err != nil {
349 framework.Failf("Container did not terminate: %v", err)
350 }
351
352 ginkgo.By("Verifying logs")
353 gomega.Eventually(ctx, func() (string, error) {
354 return e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "portforwardtester")
355 }, postStartWaitTimeout, podCheckInterval).Should(gomega.SatisfyAll(
356 gomega.ContainSubstring("Accepted client connection"),
357 gomega.ContainSubstring("Received expected client data"),
358 gomega.ContainSubstring("Done"),
359 ))
360 }
361
362 func doTestOverWebSockets(ctx context.Context, bindAddress string, f *framework.Framework) {
363 config, err := framework.LoadConfig()
364 framework.ExpectNoError(err, "unable to get base config")
365
366 ginkgo.By("Creating the pod")
367 pod := pfPod("def", "10", "10", "100", fmt.Sprintf("%s", bindAddress))
368 if _, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
369 framework.Failf("Couldn't create pod: %v", err)
370 }
371 if err := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout); err != nil {
372 framework.Failf("Pod did not start running: %v", err)
373 }
374
375 req := f.ClientSet.CoreV1().RESTClient().Get().
376 Namespace(f.Namespace.Name).
377 Resource("pods").
378 Name(pod.Name).
379 Suffix("portforward").
380 Param("ports", "80")
381
382 url := req.URL()
383 ws, err := e2ewebsocket.OpenWebSocketForURL(url, config, []string{"v4.channel.k8s.io"})
384 if err != nil {
385 framework.Failf("Failed to open websocket to %s: %v", url.String(), err)
386 }
387 defer ws.Close()
388
389 gomega.Eventually(ctx, func() error {
390 channel, msg, err := wsRead(ws)
391 if err != nil {
392 return fmt.Errorf("failed to read completely from websocket %s: %w", url.String(), err)
393 }
394 if channel != 0 {
395 return fmt.Errorf("got message from server that didn't start with channel 0 (data): %v", msg)
396 }
397 if p := binary.LittleEndian.Uint16(msg); p != 80 {
398 return fmt.Errorf("received the wrong port: %d", p)
399 }
400 return nil
401 }, time.Minute, 10*time.Second).Should(gomega.Succeed())
402
403 gomega.Eventually(ctx, func() error {
404 channel, msg, err := wsRead(ws)
405 if err != nil {
406 return fmt.Errorf("failed to read completely from websocket %s: %w", url.String(), err)
407 }
408 if channel != 1 {
409 return fmt.Errorf("got message from server that didn't start with channel 1 (error): %v", msg)
410 }
411 if p := binary.LittleEndian.Uint16(msg); p != 80 {
412 return fmt.Errorf("received the wrong port: %d", p)
413 }
414 return nil
415 }, time.Minute, 10*time.Second).Should(gomega.Succeed())
416
417 ginkgo.By("Sending the expected data to the local port")
418 err = wsWrite(ws, 0, []byte("def"))
419 if err != nil {
420 framework.Failf("Failed to write to websocket %s: %v", url.String(), err)
421 }
422
423 ginkgo.By("Reading data from the local port")
424 buf := bytes.Buffer{}
425 expectedData := bytes.Repeat([]byte("x"), 100)
426 gomega.Eventually(ctx, func() error {
427 channel, msg, err := wsRead(ws)
428 if err != nil {
429 return fmt.Errorf("failed to read completely from websocket %s: %w", url.String(), err)
430 }
431 if channel != 0 {
432 return fmt.Errorf("got message from server that didn't start with channel 0 (data): %v", msg)
433 }
434 buf.Write(msg)
435 if bytes.Equal(expectedData, buf.Bytes()) {
436 return fmt.Errorf("expected %q from server, got %q", expectedData, buf.Bytes())
437 }
438 return nil
439 }, time.Minute, 10*time.Second).Should(gomega.Succeed())
440
441 ginkgo.By("Verifying logs")
442 gomega.Eventually(ctx, func() (string, error) {
443 return e2epod.GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "portforwardtester")
444 }, postStartWaitTimeout, podCheckInterval).Should(gomega.SatisfyAll(
445 gomega.ContainSubstring("Accepted client connection"),
446 gomega.ContainSubstring("Received expected client data"),
447 ))
448 }
449
450 var _ = SIGDescribe("Kubectl Port forwarding", func() {
451 f := framework.NewDefaultFramework("port-forwarding")
452 f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
453
454 ginkgo.Describe("With a server listening on 0.0.0.0", func() {
455 ginkgo.Describe("that expects a client request", func() {
456 ginkgo.It("should support a client that connects, sends NO DATA, and disconnects", func(ctx context.Context) {
457 doTestMustConnectSendNothing(ctx, "0.0.0.0", f)
458 })
459 ginkgo.It("should support a client that connects, sends DATA, and disconnects", func(ctx context.Context) {
460 doTestMustConnectSendDisconnect(ctx, "0.0.0.0", f)
461 })
462 })
463
464 ginkgo.Describe("that expects NO client request", func() {
465 ginkgo.It("should support a client that connects, sends DATA, and disconnects", func(ctx context.Context) {
466 doTestConnectSendDisconnect(ctx, "0.0.0.0", f)
467 })
468 })
469
470 ginkgo.It("should support forwarding over websockets", func(ctx context.Context) {
471 doTestOverWebSockets(ctx, "0.0.0.0", f)
472 })
473 })
474
475
476 ginkgo.Describe("With a server listening on localhost", func() {
477 ginkgo.Describe("that expects a client request", func() {
478 ginkgo.It("should support a client that connects, sends NO DATA, and disconnects", func(ctx context.Context) {
479 doTestMustConnectSendNothing(ctx, "localhost", f)
480 })
481 ginkgo.It("should support a client that connects, sends DATA, and disconnects", func(ctx context.Context) {
482 doTestMustConnectSendDisconnect(ctx, "localhost", f)
483 })
484 })
485
486 ginkgo.Describe("that expects NO client request", func() {
487 ginkgo.It("should support a client that connects, sends DATA, and disconnects", func(ctx context.Context) {
488 doTestConnectSendDisconnect(ctx, "localhost", f)
489 })
490 })
491
492 ginkgo.It("should support forwarding over websockets", func(ctx context.Context) {
493 doTestOverWebSockets(ctx, "localhost", f)
494 })
495 })
496 })
497
498 func wsRead(conn *websocket.Conn) (byte, []byte, error) {
499 for {
500 var data []byte
501 err := websocket.Message.Receive(conn, &data)
502 if err != nil {
503 return 0, nil, err
504 }
505
506 if len(data) == 0 {
507 continue
508 }
509
510 channel := data[0]
511 data = data[1:]
512
513 return channel, data, err
514 }
515 }
516
517 func wsWrite(conn *websocket.Conn, channel byte, data []byte) error {
518 frame := make([]byte, len(data)+1)
519 frame[0] = channel
520 copy(frame[1:], data)
521 err := websocket.Message.Send(conn, frame)
522 return err
523 }
524
View as plain text