1
16
17 package portforward
18
19 import (
20 "context"
21 "fmt"
22 "net/http"
23 "net/url"
24 "os"
25 "os/signal"
26 "strconv"
27 "strings"
28 "time"
29
30 "github.com/spf13/cobra"
31
32 corev1 "k8s.io/api/core/v1"
33 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34 "k8s.io/apimachinery/pkg/util/httpstream"
35 "k8s.io/apimachinery/pkg/util/sets"
36 "k8s.io/cli-runtime/pkg/genericiooptions"
37 "k8s.io/client-go/kubernetes/scheme"
38 corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
39 restclient "k8s.io/client-go/rest"
40 "k8s.io/client-go/tools/portforward"
41 "k8s.io/client-go/transport/spdy"
42 cmdutil "k8s.io/kubectl/pkg/cmd/util"
43 "k8s.io/kubectl/pkg/polymorphichelpers"
44 "k8s.io/kubectl/pkg/util"
45 "k8s.io/kubectl/pkg/util/completion"
46 "k8s.io/kubectl/pkg/util/i18n"
47 "k8s.io/kubectl/pkg/util/templates"
48 )
49
50
51 type PortForwardOptions struct {
52 Namespace string
53 PodName string
54 RESTClient restclient.Interface
55 Config *restclient.Config
56 PodClient corev1client.PodsGetter
57 Address []string
58 Ports []string
59 PortForwarder portForwarder
60 StopChannel chan struct{}
61 ReadyChannel chan struct{}
62 }
63
64 var (
65 portforwardLong = templates.LongDesc(i18n.T(`
66 Forward one or more local ports to a pod.
67
68 Use resource type/name such as deployment/mydeployment to select a pod. Resource type defaults to 'pod' if omitted.
69
70 If there are multiple pods matching the criteria, a pod will be selected automatically. The
71 forwarding session ends when the selected pod terminates, and a rerun of the command is needed
72 to resume forwarding.`))
73
74 portforwardExample = templates.Examples(i18n.T(`
75 # Listen on ports 5000 and 6000 locally, forwarding data to/from ports 5000 and 6000 in the pod
76 kubectl port-forward pod/mypod 5000 6000
77
78 # Listen on ports 5000 and 6000 locally, forwarding data to/from ports 5000 and 6000 in a pod selected by the deployment
79 kubectl port-forward deployment/mydeployment 5000 6000
80
81 # Listen on port 8443 locally, forwarding to the targetPort of the service's port named "https" in a pod selected by the service
82 kubectl port-forward service/myservice 8443:https
83
84 # Listen on port 8888 locally, forwarding to 5000 in the pod
85 kubectl port-forward pod/mypod 8888:5000
86
87 # Listen on port 8888 on all addresses, forwarding to 5000 in the pod
88 kubectl port-forward --address 0.0.0.0 pod/mypod 8888:5000
89
90 # Listen on port 8888 on localhost and selected IP, forwarding to 5000 in the pod
91 kubectl port-forward --address localhost,10.19.21.23 pod/mypod 8888:5000
92
93 # Listen on a random port locally, forwarding to 5000 in the pod
94 kubectl port-forward pod/mypod :5000`))
95 )
96
97 const (
98
99 defaultPodPortForwardWaitTimeout = 60 * time.Second
100 )
101
102 func NewCmdPortForward(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command {
103 opts := NewDefaultPortForwardOptions(streams)
104 cmd := &cobra.Command{
105 Use: "port-forward TYPE/NAME [options] [LOCAL_PORT:]REMOTE_PORT [...[LOCAL_PORT_N:]REMOTE_PORT_N]",
106 DisableFlagsInUseLine: true,
107 Short: i18n.T("Forward one or more local ports to a pod"),
108 Long: portforwardLong,
109 Example: portforwardExample,
110 ValidArgsFunction: completion.PodResourceNameCompletionFunc(f),
111 Run: func(cmd *cobra.Command, args []string) {
112 cmdutil.CheckErr(opts.Complete(f, cmd, args))
113 cmdutil.CheckErr(opts.Validate())
114 cmdutil.CheckErr(opts.RunPortForward())
115 },
116 }
117 cmdutil.AddPodRunningTimeoutFlag(cmd, defaultPodPortForwardWaitTimeout)
118 cmd.Flags().StringSliceVar(&opts.Address, "address", []string{"localhost"}, "Addresses to listen on (comma separated). Only accepts IP addresses or localhost as a value. When localhost is supplied, kubectl will try to bind on both 127.0.0.1 and ::1 and will fail if neither of these addresses are available to bind.")
119
120 return cmd
121 }
122
123 func NewDefaultPortForwardOptions(streams genericiooptions.IOStreams) *PortForwardOptions {
124 return &PortForwardOptions{
125 PortForwarder: &defaultPortForwarder{
126 IOStreams: streams,
127 },
128 }
129 }
130
131 type portForwarder interface {
132 ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error
133 }
134
135 type defaultPortForwarder struct {
136 genericiooptions.IOStreams
137 }
138
139 func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
140 transport, upgrader, err := spdy.RoundTripperFor(opts.Config)
141 if err != nil {
142 return err
143 }
144 dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
145 if cmdutil.PortForwardWebsockets.IsEnabled() {
146 tunnelingDialer, err := portforward.NewSPDYOverWebsocketDialer(url, opts.Config)
147 if err != nil {
148 return err
149 }
150
151 dialer = portforward.NewFallbackDialer(tunnelingDialer, dialer, httpstream.IsUpgradeFailure)
152 }
153 fw, err := portforward.NewOnAddresses(dialer, opts.Address, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.Out, f.ErrOut)
154 if err != nil {
155 return err
156 }
157 return fw.ForwardPorts()
158 }
159
160
161
162 func splitPort(port string) (local, remote string) {
163 parts := strings.Split(port, ":")
164 if len(parts) == 2 {
165 return parts[0], parts[1]
166 }
167
168 return parts[0], parts[0]
169 }
170
171
172
173
174
175 func translateServicePortToTargetPort(ports []string, svc corev1.Service, pod corev1.Pod) ([]string, error) {
176 var translated []string
177 for _, port := range ports {
178 localPort, remotePort := splitPort(port)
179
180 portnum, err := strconv.Atoi(remotePort)
181 if err != nil {
182 svcPort, err := util.LookupServicePortNumberByName(svc, remotePort)
183 if err != nil {
184 return nil, err
185 }
186 portnum = int(svcPort)
187
188 if localPort == remotePort {
189 localPort = strconv.Itoa(portnum)
190 }
191 }
192 containerPort, err := util.LookupContainerPortNumberByServicePort(svc, pod, int32(portnum))
193 if err != nil {
194
195 return nil, err
196 }
197
198
199 remotePort = strconv.Itoa(int(containerPort))
200
201 if localPort != remotePort {
202 translated = append(translated, fmt.Sprintf("%s:%s", localPort, remotePort))
203 } else {
204 translated = append(translated, remotePort)
205 }
206 }
207 return translated, nil
208 }
209
210
211
212 func convertPodNamedPortToNumber(ports []string, pod corev1.Pod) ([]string, error) {
213 var converted []string
214 for _, port := range ports {
215 localPort, remotePort := splitPort(port)
216
217 containerPortStr := remotePort
218 _, err := strconv.Atoi(remotePort)
219 if err != nil {
220 containerPort, err := util.LookupContainerPortNumberByName(pod, remotePort)
221 if err != nil {
222 return nil, err
223 }
224
225 containerPortStr = strconv.Itoa(int(containerPort))
226 }
227
228 if localPort != remotePort {
229 converted = append(converted, fmt.Sprintf("%s:%s", localPort, containerPortStr))
230 } else {
231 converted = append(converted, containerPortStr)
232 }
233 }
234
235 return converted, nil
236 }
237
238 func checkUDPPorts(udpOnlyPorts sets.Int, ports []string, obj metav1.Object) error {
239 for _, port := range ports {
240 _, remotePort := splitPort(port)
241 portNum, err := strconv.Atoi(remotePort)
242 if err != nil {
243 switch v := obj.(type) {
244 case *corev1.Service:
245 svcPort, err := util.LookupServicePortNumberByName(*v, remotePort)
246 if err != nil {
247 return err
248 }
249 portNum = int(svcPort)
250
251 case *corev1.Pod:
252 ctPort, err := util.LookupContainerPortNumberByName(*v, remotePort)
253 if err != nil {
254 return err
255 }
256 portNum = int(ctPort)
257
258 default:
259 return fmt.Errorf("unknown object: %v", obj)
260 }
261 }
262 if udpOnlyPorts.Has(portNum) {
263 return fmt.Errorf("UDP protocol is not supported for %s", remotePort)
264 }
265 }
266 return nil
267 }
268
269
270
271 func checkUDPPortInService(ports []string, svc *corev1.Service) error {
272 udpPorts := sets.NewInt()
273 tcpPorts := sets.NewInt()
274 for _, port := range svc.Spec.Ports {
275 portNum := int(port.Port)
276 switch port.Protocol {
277 case corev1.ProtocolUDP:
278 udpPorts.Insert(portNum)
279 case corev1.ProtocolTCP:
280 tcpPorts.Insert(portNum)
281 }
282 }
283 return checkUDPPorts(udpPorts.Difference(tcpPorts), ports, svc)
284 }
285
286
287
288 func checkUDPPortInPod(ports []string, pod *corev1.Pod) error {
289 udpPorts := sets.NewInt()
290 tcpPorts := sets.NewInt()
291 for _, ct := range pod.Spec.Containers {
292 for _, ctPort := range ct.Ports {
293 portNum := int(ctPort.ContainerPort)
294 switch ctPort.Protocol {
295 case corev1.ProtocolUDP:
296 udpPorts.Insert(portNum)
297 case corev1.ProtocolTCP:
298 tcpPorts.Insert(portNum)
299 }
300 }
301 }
302 return checkUDPPorts(udpPorts.Difference(tcpPorts), ports, pod)
303 }
304
305
306 func (o *PortForwardOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
307 var err error
308 if len(args) < 2 {
309 return cmdutil.UsageErrorf(cmd, "TYPE/NAME and list of ports are required for port-forward")
310 }
311
312 o.Namespace, _, err = f.ToRawKubeConfigLoader().Namespace()
313 if err != nil {
314 return err
315 }
316
317 builder := f.NewBuilder().
318 WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
319 ContinueOnError().
320 NamespaceParam(o.Namespace).DefaultNamespace()
321
322 getPodTimeout, err := cmdutil.GetPodRunningTimeoutFlag(cmd)
323 if err != nil {
324 return cmdutil.UsageErrorf(cmd, err.Error())
325 }
326
327 resourceName := args[0]
328 builder.ResourceNames("pods", resourceName)
329
330 obj, err := builder.Do().Object()
331 if err != nil {
332 return err
333 }
334
335 forwardablePod, err := polymorphichelpers.AttachablePodForObjectFn(f, obj, getPodTimeout)
336 if err != nil {
337 return err
338 }
339
340 o.PodName = forwardablePod.Name
341
342
343 switch t := obj.(type) {
344 case *corev1.Service:
345 err = checkUDPPortInService(args[1:], t)
346 if err != nil {
347 return err
348 }
349 o.Ports, err = translateServicePortToTargetPort(args[1:], *t, *forwardablePod)
350 if err != nil {
351 return err
352 }
353 default:
354 err = checkUDPPortInPod(args[1:], forwardablePod)
355 if err != nil {
356 return err
357 }
358 o.Ports, err = convertPodNamedPortToNumber(args[1:], *forwardablePod)
359 if err != nil {
360 return err
361 }
362 }
363
364 clientset, err := f.KubernetesClientSet()
365 if err != nil {
366 return err
367 }
368
369 o.PodClient = clientset.CoreV1()
370
371 o.Config, err = f.ToRESTConfig()
372 if err != nil {
373 return err
374 }
375 o.RESTClient, err = f.RESTClient()
376 if err != nil {
377 return err
378 }
379
380 o.StopChannel = make(chan struct{}, 1)
381 o.ReadyChannel = make(chan struct{})
382 return nil
383 }
384
385
386 func (o PortForwardOptions) Validate() error {
387 if len(o.PodName) == 0 {
388 return fmt.Errorf("pod name or resource type/name must be specified")
389 }
390
391 if len(o.Ports) < 1 {
392 return fmt.Errorf("at least 1 PORT is required for port-forward")
393 }
394
395 if o.PortForwarder == nil || o.PodClient == nil || o.RESTClient == nil || o.Config == nil {
396 return fmt.Errorf("client, client config, restClient, and portforwarder must be provided")
397 }
398 return nil
399 }
400
401
402
403 func (o PortForwardOptions) RunPortForward() error {
404 return o.RunPortForwardContext(context.Background())
405 }
406
407
408
409
410 func (o PortForwardOptions) RunPortForwardContext(ctx context.Context) error {
411 pod, err := o.PodClient.Pods(o.Namespace).Get(ctx, o.PodName, metav1.GetOptions{})
412 if err != nil {
413 return err
414 }
415
416 if pod.Status.Phase != corev1.PodRunning {
417 return fmt.Errorf("unable to forward port because pod is not running. Current status=%v", pod.Status.Phase)
418 }
419
420 signals := make(chan os.Signal, 1)
421 signal.Notify(signals, os.Interrupt)
422 defer signal.Stop(signals)
423
424 returnCtx, returnCtxCancel := context.WithCancel(ctx)
425 defer returnCtxCancel()
426
427 go func() {
428 select {
429 case <-signals:
430 case <-returnCtx.Done():
431 }
432 if o.StopChannel != nil {
433 close(o.StopChannel)
434 }
435 }()
436
437 req := o.RESTClient.Post().
438 Resource("pods").
439 Namespace(o.Namespace).
440 Name(pod.Name).
441 SubResource("portforward")
442
443 return o.PortForwarder.ForwardPorts("POST", req.URL(), o)
444 }
445
View as plain text