     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package portforward
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net/http"
    23  	"net/url"
    24  	"os"
    25  	"os/signal"
    26  	"strconv"
    27  	"strings"
    28  	"time"
    30  	"github.com/spf13/cobra"
    32  	corev1 "k8s.io/api/core/v1"
    33  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    34  	"k8s.io/apimachinery/pkg/util/httpstream"
    35  	"k8s.io/apimachinery/pkg/util/sets"
    36  	"k8s.io/cli-runtime/pkg/genericiooptions"
    37  	"k8s.io/client-go/kubernetes/scheme"
    38  	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
    39  	restclient "k8s.io/client-go/rest"
    40  	"k8s.io/client-go/tools/portforward"
    41  	"k8s.io/client-go/transport/spdy"
    42  	cmdutil "k8s.io/kubectl/pkg/cmd/util"
    43  	"k8s.io/kubectl/pkg/polymorphichelpers"
    44  	"k8s.io/kubectl/pkg/util"
    45  	"k8s.io/kubectl/pkg/util/completion"
    46  	"k8s.io/kubectl/pkg/util/i18n"
    47  	"k8s.io/kubectl/pkg/util/templates"
    48  )
// PortForwardOptions contains all the options for running the port-forward cli command.
type PortForwardOptions struct {
	// Namespace is the namespace in which the target pod is looked up and
	// against which the portforward request is built.
	Namespace     string
	// PodName is the name of the pod that ports are forwarded to.
	PodName       string
	// RESTClient is used to build the POST request for the pod's
	// "portforward" subresource.
	RESTClient    restclient.Interface
	// Config is the client configuration handed to the stream dialers.
	Config        *restclient.Config
	// PodClient is used to fetch the pod before forwarding starts.
	PodClient     corev1client.PodsGetter
	// Address lists the local addresses to listen on (the --address flag).
	Address       []string
	// Ports holds the port specs in [LOCAL_PORT:]REMOTE_PORT form.
	Ports         []string
	// PortForwarder performs the actual forwarding.
	PortForwarder portForwarder
	// StopChannel is closed by RunPortForwardContext when an interrupt signal
	// arrives or its context is done; it is passed on to the forwarder.
	StopChannel   chan struct{}
	// ReadyChannel is passed on to the forwarder.
	// NOTE(review): presumably signalled once the forwarder is listening — confirm.
	ReadyChannel  chan struct{}
}
    64  var (
    65  	portforwardLong = templates.LongDesc(i18n.T(`
    66                  Forward one or more local ports to a pod.
    68                  Use resource type/name such as deployment/mydeployment to select a pod. Resource type defaults to 'pod' if omitted.
    70                  If there are multiple pods matching the criteria, a pod will be selected automatically. The
    71                  forwarding session ends when the selected pod terminates, and a rerun of the command is needed
    72                  to resume forwarding.`))
    74  	portforwardExample = templates.Examples(i18n.T(`
    75  		# Listen on ports 5000 and 6000 locally, forwarding data to/from ports 5000 and 6000 in the pod
    76  		kubectl port-forward pod/mypod 5000 6000
    78  		# Listen on ports 5000 and 6000 locally, forwarding data to/from ports 5000 and 6000 in a pod selected by the deployment
    79  		kubectl port-forward deployment/mydeployment 5000 6000
    81  		# Listen on port 8443 locally, forwarding to the targetPort of the service's port named "https" in a pod selected by the service
    82  		kubectl port-forward service/myservice 8443:https
    84  		# Listen on port 8888 locally, forwarding to 5000 in the pod
    85  		kubectl port-forward pod/mypod 8888:5000
    87  		# Listen on port 8888 on all addresses, forwarding to 5000 in the pod
    88  		kubectl port-forward --address pod/mypod 8888:5000
    90  		# Listen on port 8888 on localhost and selected IP, forwarding to 5000 in the pod
    91  		kubectl port-forward --address localhost, pod/mypod 8888:5000
    93  		# Listen on a random port locally, forwarding to 5000 in the pod
    94  		kubectl port-forward pod/mypod :5000`))
    95  )
    97  const (
    98  	// Amount of time to wait until at least one pod is running
    99  	defaultPodPortForwardWaitTimeout = 60 * time.Second
   100  )
   102  func NewCmdPortForward(f cmdutil.Factory, streams genericiooptions.IOStreams) *cobra.Command {
   103  	opts := NewDefaultPortForwardOptions(streams)
   104  	cmd := &cobra.Command{
   105  		Use:                   "port-forward TYPE/NAME [options] [LOCAL_PORT:]REMOTE_PORT [...[LOCAL_PORT_N:]REMOTE_PORT_N]",
   106  		DisableFlagsInUseLine: true,
   107  		Short:                 i18n.T("Forward one or more local ports to a pod"),
   108  		Long:                  portforwardLong,
   109  		Example:               portforwardExample,
   110  		ValidArgsFunction:     completion.PodResourceNameCompletionFunc(f),
   111  		Run: func(cmd *cobra.Command, args []string) {
   112  			cmdutil.CheckErr(opts.Complete(f, cmd, args))
   113  			cmdutil.CheckErr(opts.Validate())
   114  			cmdutil.CheckErr(opts.RunPortForward())
   115  		},
   116  	}
   117  	cmdutil.AddPodRunningTimeoutFlag(cmd, defaultPodPortForwardWaitTimeout)
   118  	cmd.Flags().StringSliceVar(&opts.Address, "address", []string{"localhost"}, "Addresses to listen on (comma separated). Only accepts IP addresses or localhost as a value. When localhost is supplied, kubectl will try to bind on both and ::1 and will fail if neither of these addresses are available to bind.")
   119  	// TODO support UID
   120  	return cmd
   121  }
   123  func NewDefaultPortForwardOptions(streams genericiooptions.IOStreams) *PortForwardOptions {
   124  	return &PortForwardOptions{
   125  		PortForwarder: &defaultPortForwarder{
   126  			IOStreams: streams,
   127  		},
   128  	}
   129  }
// portForwarder is the component that performs the forwarding on behalf of
// PortForwardOptions; it is stored in PortForwardOptions.PortForwarder.
type portForwarder interface {
	// ForwardPorts forwards the ports described by opts using a request with
	// the given HTTP method against url, and returns any resulting error.
	ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error
}
// defaultPortForwarder is the portForwarder used by default. Its embedded
// IOStreams supply the Out and ErrOut writers handed to the underlying
// forwarder.
type defaultPortForwarder struct {
	genericiooptions.IOStreams
}
   139  func (f *defaultPortForwarder) ForwardPorts(method string, url *url.URL, opts PortForwardOptions) error {
   140  	transport, upgrader, err := spdy.RoundTripperFor(opts.Config)
   141  	if err != nil {
   142  		return err
   143  	}
   144  	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, method, url)
   145  	if cmdutil.PortForwardWebsockets.IsEnabled() {
   146  		tunnelingDialer, err := portforward.NewSPDYOverWebsocketDialer(url, opts.Config)
   147  		if err != nil {
   148  			return err
   149  		}
   150  		// First attempt tunneling (websocket) dialer, then fallback to spdy dialer.
   151  		dialer = portforward.NewFallbackDialer(tunnelingDialer, dialer, httpstream.IsUpgradeFailure)
   152  	}
   153  	fw, err := portforward.NewOnAddresses(dialer, opts.Address, opts.Ports, opts.StopChannel, opts.ReadyChannel, f.Out, f.ErrOut)
   154  	if err != nil {
   155  		return err
   156  	}
   157  	return fw.ForwardPorts()
   158  }
   160  // splitPort splits port string which is in form of [LOCAL PORT]:REMOTE PORT
   161  // and returns local and remote ports separately
   162  func splitPort(port string) (local, remote string) {
   163  	parts := strings.Split(port, ":")
   164  	if len(parts) == 2 {
   165  		return parts[0], parts[1]
   166  	}
   168  	return parts[0], parts[0]
   169  }
   171  // Translates service port to target port
   172  // It rewrites ports as needed if the Service port declares targetPort.
   173  // It returns an error when a named targetPort can't find a match in the pod, or the Service did not declare
   174  // the port.
   175  func translateServicePortToTargetPort(ports []string, svc corev1.Service, pod corev1.Pod) ([]string, error) {
   176  	var translated []string
   177  	for _, port := range ports {
   178  		localPort, remotePort := splitPort(port)
   180  		portnum, err := strconv.Atoi(remotePort)
   181  		if err != nil {
   182  			svcPort, err := util.LookupServicePortNumberByName(svc, remotePort)
   183  			if err != nil {
   184  				return nil, err
   185  			}
   186  			portnum = int(svcPort)
   188  			if localPort == remotePort {
   189  				localPort = strconv.Itoa(portnum)
   190  			}
   191  		}
   192  		containerPort, err := util.LookupContainerPortNumberByServicePort(svc, pod, int32(portnum))
   193  		if err != nil {
   194  			// can't resolve a named port, or Service did not declare this port, return an error
   195  			return nil, err
   196  		}
   198  		// convert the resolved target port back to a string
   199  		remotePort = strconv.Itoa(int(containerPort))
   201  		if localPort != remotePort {
   202  			translated = append(translated, fmt.Sprintf("%s:%s", localPort, remotePort))
   203  		} else {
   204  			translated = append(translated, remotePort)
   205  		}
   206  	}
   207  	return translated, nil
   208  }
   210  // convertPodNamedPortToNumber converts named ports into port numbers
   211  // It returns an error when a named port can't be found in the pod containers
   212  func convertPodNamedPortToNumber(ports []string, pod corev1.Pod) ([]string, error) {
   213  	var converted []string
   214  	for _, port := range ports {
   215  		localPort, remotePort := splitPort(port)
   217  		containerPortStr := remotePort
   218  		_, err := strconv.Atoi(remotePort)
   219  		if err != nil {
   220  			containerPort, err := util.LookupContainerPortNumberByName(pod, remotePort)
   221  			if err != nil {
   222  				return nil, err
   223  			}
   225  			containerPortStr = strconv.Itoa(int(containerPort))
   226  		}
   228  		if localPort != remotePort {
   229  			converted = append(converted, fmt.Sprintf("%s:%s", localPort, containerPortStr))
   230  		} else {
   231  			converted = append(converted, containerPortStr)
   232  		}
   233  	}
   235  	return converted, nil
   236  }
   238  func checkUDPPorts(udpOnlyPorts sets.Int, ports []string, obj metav1.Object) error {
   239  	for _, port := range ports {
   240  		_, remotePort := splitPort(port)
   241  		portNum, err := strconv.Atoi(remotePort)
   242  		if err != nil {
   243  			switch v := obj.(type) {
   244  			case *corev1.Service:
   245  				svcPort, err := util.LookupServicePortNumberByName(*v, remotePort)
   246  				if err != nil {
   247  					return err
   248  				}
   249  				portNum = int(svcPort)
   251  			case *corev1.Pod:
   252  				ctPort, err := util.LookupContainerPortNumberByName(*v, remotePort)
   253  				if err != nil {
   254  					return err
   255  				}
   256  				portNum = int(ctPort)
   258  			default:
   259  				return fmt.Errorf("unknown object: %v", obj)
   260  			}
   261  		}
   262  		if udpOnlyPorts.Has(portNum) {
   263  			return fmt.Errorf("UDP protocol is not supported for %s", remotePort)
   264  		}
   265  	}
   266  	return nil
   267  }
   269  // checkUDPPortInService returns an error if remote port in Service is a UDP port
   270  // TODO: remove this check after #47862 is solved
   271  func checkUDPPortInService(ports []string, svc *corev1.Service) error {
   272  	udpPorts := sets.NewInt()
   273  	tcpPorts := sets.NewInt()
   274  	for _, port := range svc.Spec.Ports {
   275  		portNum := int(port.Port)
   276  		switch port.Protocol {
   277  		case corev1.ProtocolUDP:
   278  			udpPorts.Insert(portNum)
   279  		case corev1.ProtocolTCP:
   280  			tcpPorts.Insert(portNum)
   281  		}
   282  	}
   283  	return checkUDPPorts(udpPorts.Difference(tcpPorts), ports, svc)
   284  }
   286  // checkUDPPortInPod returns an error if remote port in Pod is a UDP port
   287  // TODO: remove this check after #47862 is solved
   288  func checkUDPPortInPod(ports []string, pod *corev1.Pod) error {
   289  	udpPorts := sets.NewInt()
   290  	tcpPorts := sets.NewInt()
   291  	for _, ct := range pod.Spec.Containers {
   292  		for _, ctPort := range ct.Ports {
   293  			portNum := int(ctPort.ContainerPort)
   294  			switch ctPort.Protocol {
   295  			case corev1.ProtocolUDP:
   296  				udpPorts.Insert(portNum)
   297  			case corev1.ProtocolTCP:
   298  				tcpPorts.Insert(portNum)
   299  			}
   300  		}
   301  	}
   302  	return checkUDPPorts(udpPorts.Difference(tcpPorts), ports, pod)
   303  }
   305  // Complete completes all the required options for port-forward cmd.
   306  func (o *PortForwardOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
   307  	var err error
   308  	if len(args) < 2 {
   309  		return cmdutil.UsageErrorf(cmd, "TYPE/NAME and list of ports are required for port-forward")
   310  	}
   312  	o.Namespace, _, err = f.ToRawKubeConfigLoader().Namespace()
   313  	if err != nil {
   314  		return err
   315  	}
   317  	builder := f.NewBuilder().
   318  		WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
   319  		ContinueOnError().
   320  		NamespaceParam(o.Namespace).DefaultNamespace()
   322  	getPodTimeout, err := cmdutil.GetPodRunningTimeoutFlag(cmd)
   323  	if err != nil {
   324  		return cmdutil.UsageErrorf(cmd, err.Error())
   325  	}
   327  	resourceName := args[0]
   328  	builder.ResourceNames("pods", resourceName)
   330  	obj, err := builder.Do().Object()
   331  	if err != nil {
   332  		return err
   333  	}
   335  	forwardablePod, err := polymorphichelpers.AttachablePodForObjectFn(f, obj, getPodTimeout)
   336  	if err != nil {
   337  		return err
   338  	}
   340  	o.PodName = forwardablePod.Name
   342  	// handle service port mapping to target port if needed
   343  	switch t := obj.(type) {
   344  	case *corev1.Service:
   345  		err = checkUDPPortInService(args[1:], t)
   346  		if err != nil {
   347  			return err
   348  		}
   349  		o.Ports, err = translateServicePortToTargetPort(args[1:], *t, *forwardablePod)
   350  		if err != nil {
   351  			return err
   352  		}
   353  	default:
   354  		err = checkUDPPortInPod(args[1:], forwardablePod)
   355  		if err != nil {
   356  			return err
   357  		}
   358  		o.Ports, err = convertPodNamedPortToNumber(args[1:], *forwardablePod)
   359  		if err != nil {
   360  			return err
   361  		}
   362  	}
   364  	clientset, err := f.KubernetesClientSet()
   365  	if err != nil {
   366  		return err
   367  	}
   369  	o.PodClient = clientset.CoreV1()
   371  	o.Config, err = f.ToRESTConfig()
   372  	if err != nil {
   373  		return err
   374  	}
   375  	o.RESTClient, err = f.RESTClient()
   376  	if err != nil {
   377  		return err
   378  	}
   380  	o.StopChannel = make(chan struct{}, 1)
   381  	o.ReadyChannel = make(chan struct{})
   382  	return nil
   383  }
   385  // Validate validates all the required options for port-forward cmd.
   386  func (o PortForwardOptions) Validate() error {
   387  	if len(o.PodName) == 0 {
   388  		return fmt.Errorf("pod name or resource type/name must be specified")
   389  	}
   391  	if len(o.Ports) < 1 {
   392  		return fmt.Errorf("at least 1 PORT is required for port-forward")
   393  	}
   395  	if o.PortForwarder == nil || o.PodClient == nil || o.RESTClient == nil || o.Config == nil {
   396  		return fmt.Errorf("client, client config, restClient, and portforwarder must be provided")
   397  	}
   398  	return nil
   399  }
   401  // Deprecated: Use RunPortForwardContext instead, which allows canceling.
   402  // RunPortForward implements all the necessary functionality for port-forward cmd.
   403  func (o PortForwardOptions) RunPortForward() error {
   404  	return o.RunPortForwardContext(context.Background())
   405  }
   407  // RunPortForwardContext implements all the necessary functionality for port-forward cmd.
   408  // It ends portforwarding when an error is received from the backend, or an os.Interrupt
   409  // signal is received, or the provided context is done.
   410  func (o PortForwardOptions) RunPortForwardContext(ctx context.Context) error {
   411  	pod, err := o.PodClient.Pods(o.Namespace).Get(ctx, o.PodName, metav1.GetOptions{})
   412  	if err != nil {
   413  		return err
   414  	}
   416  	if pod.Status.Phase != corev1.PodRunning {
   417  		return fmt.Errorf("unable to forward port because pod is not running. Current status=%v", pod.Status.Phase)
   418  	}
   420  	signals := make(chan os.Signal, 1)
   421  	signal.Notify(signals, os.Interrupt)
   422  	defer signal.Stop(signals)
   424  	returnCtx, returnCtxCancel := context.WithCancel(ctx)
   425  	defer returnCtxCancel()
   427  	go func() {
   428  		select {
   429  		case <-signals:
   430  		case <-returnCtx.Done():
   431  		}
   432  		if o.StopChannel != nil {
   433  			close(o.StopChannel)
   434  		}
   435  	}()
   437  	req := o.RESTClient.Post().
   438  		Resource("pods").
   439  		Namespace(o.Namespace).
   440  		Name(pod.Name).
   441  		SubResource("portforward")
   443  	return o.PortForwarder.ForwardPorts("POST", req.URL(), o)
   444  }

