package k8s import ( "context" "fmt" "io" "net" "net/http" "net/url" "os" "strconv" log "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/portforward" "k8s.io/client-go/transport/spdy" // Load all the auth plugins for the cloud providers. _ "k8s.io/client-go/plugin/pkg/client/auth" ) // PortForward provides a port-forward connection into a Kubernetes cluster. type PortForward struct { method string url *url.URL host string namespace string podName string localPort int remotePort int emitLogs bool stopCh chan struct{} readyCh chan struct{} config *rest.Config } // NewContainerMetricsForward returns an instance of the PortForward struct that can // be used to establish a port-forward connection to a containers metrics // endpoint, specified by namespace, pod, container and portName. func NewContainerMetricsForward( k8sAPI *KubernetesAPI, pod corev1.Pod, container corev1.Container, emitLogs bool, portName string, ) (*PortForward, error) { var port corev1.ContainerPort for _, p := range container.Ports { if p.Name == portName { port = p break } } if port.Name != portName { return nil, fmt.Errorf("no %s port found for container %s/%s", portName, pod.GetName(), container.Name) } return NewPodPortForward(k8sAPI, pod.GetNamespace(), pod.GetName(), "localhost", 0, int(port.ContainerPort), emitLogs) } // NewPortForward returns an instance of the PortForward struct that can be used // to establish a port-forward connection to a pod in the deployment that's // specified by namespace and deployName. If localPort is 0, it will use a // random ephemeral port. // Note that the connection remains open for the life of the process, as this // function is typically called by the CLI. Care should be taken if called from // control plane code. func NewPortForward( ctx context.Context, k8sAPI *KubernetesAPI, namespace, deployName string, host string, localPort, remotePort int, emitLogs bool, ) (*PortForward, error) { timeoutSeconds := int64(30) podList, err := k8sAPI.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{TimeoutSeconds: &timeoutSeconds}) if err != nil { return nil, err } podName := "" for _, pod := range podList.Items { if pod.Status.Phase == corev1.PodRunning { grandparent, err := getDeploymentForPod(ctx, k8sAPI, pod) if err != nil { log.Warnf("Failed to get deploy for pod [%s]: %s", pod.Name, err) continue } if grandparent == deployName { podName = pod.Name break } } } if podName == "" { return nil, fmt.Errorf("no running pods found for %s", deployName) } return NewPodPortForward(k8sAPI, namespace, podName, host, localPort, remotePort, emitLogs) } func getDeploymentForPod(ctx context.Context, k8sAPI *KubernetesAPI, pod corev1.Pod) (string, error) { parents := pod.GetOwnerReferences() if len(parents) != 1 { return "", nil } rs, err := k8sAPI.AppsV1().ReplicaSets(pod.Namespace).Get(ctx, parents[0].Name, metav1.GetOptions{}) if err != nil { return "", err } grandparents := rs.GetOwnerReferences() if len(grandparents) != 1 { return "", nil } return grandparents[0].Name, nil } // NewPodPortForward returns an instance of the PortForward struct that can be // used to establish a port-forward connection to a specific Pod. func NewPodPortForward( k8sAPI *KubernetesAPI, namespace, podName string, host string, localPort, remotePort int, emitLogs bool, ) (*PortForward, error) { restClient := k8sAPI.CoreV1().RESTClient() // This early return is for testing purposes. If the k8sAPI is a fake // client, attempting to create a request will result in a nil-pointer // panic. Instead, we return with no port-forward and no error. if fakeRest, ok := restClient.(*rest.RESTClient); ok { if fakeRest == nil { return nil, nil } } req := restClient.Post(). Resource("pods"). Namespace(namespace). Name(podName). SubResource("portforward") var err error if localPort == 0 { if host != "localhost" { return nil, fmt.Errorf("local port must be specified when host is not localhost") } localPort, err = getEphemeralPort() if err != nil { return nil, err } } return &PortForward{ method: "POST", url: req.URL(), host: host, namespace: namespace, podName: podName, localPort: localPort, remotePort: remotePort, emitLogs: emitLogs, stopCh: make(chan struct{}, 1), readyCh: make(chan struct{}), config: k8sAPI.Config, }, nil } // run creates and runs the port-forward connection. // When the connection is established it blocks until Stop() is called. func (pf *PortForward) run() error { transport, upgrader, err := spdy.RoundTripperFor(pf.config) if err != nil { return err } out := io.Discard errOut := io.Discard if pf.emitLogs { out = os.Stdout errOut = os.Stderr } ports := []string{fmt.Sprintf("%d:%d", pf.localPort, pf.remotePort)} dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, pf.method, pf.url) fw, err := portforward.NewOnAddresses(dialer, []string{pf.host}, ports, pf.stopCh, pf.readyCh, out, errOut) if err != nil { return err } err = fw.ForwardPorts() if err != nil { err = fmt.Errorf("%w for %s/%s", err, pf.namespace, pf.podName) return err } return nil } // Init creates and runs a port-forward connection. // This function blocks until the connection is established, in which case it returns nil. // It's the caller's responsibility to call Stop() to finish the connection. func (pf *PortForward) Init() error { log.Debugf("Starting port forward to %s %d:%d", pf.url, pf.localPort, pf.remotePort) failure := make(chan error, 1) go func() { if err := pf.run(); err != nil { failure <- err } }() // The `select` statement below depends on one of two outcomes from `pf.run()`: // 1) Succeed and block, causing a receive on `<-pf.readyCh` // 2) Return an err, causing a receive `<-failure` select { case <-pf.readyCh: log.Debug("Port forward initialised") case err := <-failure: log.Debugf("Port forward failed: %v", err) return err } return nil } // Stop terminates the port-forward connection. // It is the caller's responsibility to call Stop even in case of errors func (pf *PortForward) Stop() { close(pf.stopCh) } // GetStop returns the stopCh. // Receiving on stopCh will block until the port forwarding stops. func (pf *PortForward) GetStop() <-chan struct{} { return pf.stopCh } // URLFor returns the URL for the port-forward connection. func (pf *PortForward) URLFor(path string) string { strPort := strconv.Itoa(pf.localPort) urlAddress := net.JoinHostPort(pf.host, strPort) return fmt.Sprintf("http://%s%s", urlAddress, path) } // AddressAndPort returns the address and port for the port-forward connection. func (pf *PortForward) AddressAndPort() string { strPort := strconv.Itoa(pf.localPort) return net.JoinHostPort(pf.host, strPort) } // getEphemeralPort selects a port for the port-forwarding. It binds to a free // ephemeral port and returns the port number. func getEphemeralPort() (int, error) { ln, err := net.Listen("tcp", "localhost:0") if err != nil { return 0, err } defer ln.Close() // get port tcpAddr, ok := ln.Addr().(*net.TCPAddr) if !ok { return 0, fmt.Errorf("invalid listen address: %s", ln.Addr()) } return tcpAddr.Port, nil }