...

Source file src/k8s.io/kubernetes/pkg/kubelet/server/server_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/server

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package server
    18  
    19  import (
    20  	"context"
    21  	"crypto/tls"
    22  	"errors"
    23  	"fmt"
    24  	"io"
    25  	"net"
    26  	"net/http"
    27  	"net/http/httptest"
    28  	"net/http/httputil"
    29  	"net/url"
    30  	"reflect"
    31  	"strconv"
    32  	"strings"
    33  	"testing"
    34  	"time"
    35  
    36  	cadvisorapi "github.com/google/cadvisor/info/v1"
    37  	cadvisorapiv2 "github.com/google/cadvisor/info/v2"
    38  	"github.com/stretchr/testify/assert"
    39  	"github.com/stretchr/testify/require"
    40  	oteltrace "go.opentelemetry.io/otel/trace"
    41  	v1 "k8s.io/api/core/v1"
    42  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    43  	"k8s.io/apimachinery/pkg/types"
    44  	"k8s.io/apimachinery/pkg/util/httpstream"
    45  	"k8s.io/apimachinery/pkg/util/httpstream/spdy"
    46  	"k8s.io/apiserver/pkg/authentication/authenticator"
    47  	"k8s.io/apiserver/pkg/authentication/user"
    48  	"k8s.io/apiserver/pkg/authorization/authorizer"
    49  	"k8s.io/client-go/tools/record"
    50  	"k8s.io/client-go/tools/remotecommand"
    51  	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
    52  	statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
    53  	api "k8s.io/kubernetes/pkg/apis/core"
    54  	"k8s.io/utils/pointer"
    55  
    56  	// Do some initialization to decode the query parameters correctly.
    57  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    58  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    59  	"k8s.io/kubelet/pkg/cri/streaming"
    60  	"k8s.io/kubelet/pkg/cri/streaming/portforward"
    61  	remotecommandserver "k8s.io/kubelet/pkg/cri/streaming/remotecommand"
    62  	_ "k8s.io/kubernetes/pkg/apis/core/install"
    63  	"k8s.io/kubernetes/pkg/features"
    64  	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
    65  	"k8s.io/kubernetes/pkg/kubelet/cm"
    66  	"k8s.io/kubernetes/pkg/kubelet/server/stats"
    67  	"k8s.io/kubernetes/pkg/volume"
    68  )
    69  
// Fixed identifiers shared by the fakes and tests in this file.
const (
	// testUID is the pod UID reported by the default podByNameFunc fake.
	testUID          = "9b01b80f-8fb4-11e4-95ab-4200af06647"
	// testContainerID is the container ID the fake kubelet always sends in
	// exec and attach requests.
	testContainerID  = "container789"
	// testPodSandboxID is the sandbox ID the fake kubelet always sends in
	// port-forward requests.
	testPodSandboxID = "pod0987"
)
    75  
// fakeKubelet is a test double for the kubelet-facing interface served by
// Server. Most methods delegate to the function fields below, so a test
// overrides a field to control or observe a particular call.
type fakeKubelet struct {
	// Hooks invoked by the correspondingly named methods. A hook that is
	// called while nil panics, so a test must set every hook it exercises.
	podByNameFunc       func(namespace, name string) (*v1.Pod, bool)
	machineInfoFunc     func() (*cadvisorapi.MachineInfo, error)
	podsFunc            func() []*v1.Pod
	runningPodsFunc     func(ctx context.Context) ([]*v1.Pod, error)
	logFunc             func(w http.ResponseWriter, req *http.Request)
	runFunc             func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
	// Optional argument checks; GetExec, GetAttach and GetPortForward call
	// them only when they are non-nil.
	getExecCheck        func(string, types.UID, string, []string, remotecommandserver.Options)
	getAttachCheck      func(string, types.UID, string, remotecommandserver.Options)
	getPortForwardCheck func(string, string, types.UID, portforward.V4Options)

	// containerLogsFunc backs GetKubeletContainerLogs; hostnameFunc backs GetHostname.
	containerLogsFunc func(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
	hostnameFunc      func() string
	// resyncInterval and loopEntryTime are returned verbatim by
	// ResyncInterval and LatestLoopEntryTime.
	resyncInterval    time.Duration
	loopEntryTime     time.Time
	plegHealth        bool
	// streamingRuntime produces the URLs returned by GetExec, GetAttach and
	// GetPortForward.
	streamingRuntime  streaming.Server
}
    94  
// ResyncInterval returns the resync interval configured on the fake.
func (fk *fakeKubelet) ResyncInterval() time.Duration {
	return fk.resyncInterval
}
    98  
// LatestLoopEntryTime returns the loop entry time configured on the fake.
func (fk *fakeKubelet) LatestLoopEntryTime() time.Time {
	return fk.loopEntryTime
}
   102  
// GetPodByName delegates the lookup to podByNameFunc.
func (fk *fakeKubelet) GetPodByName(namespace, name string) (*v1.Pod, bool) {
	return fk.podByNameFunc(namespace, name)
}
   106  
// GetRequestedContainersInfo ignores its arguments and always returns an
// empty, non-nil map with a nil error.
func (fk *fakeKubelet) GetRequestedContainersInfo(containerName string, options cadvisorapiv2.RequestOptions) (map[string]*cadvisorapi.ContainerInfo, error) {
	return map[string]*cadvisorapi.ContainerInfo{}, nil
}
   110  
// GetCachedMachineInfo delegates to machineInfoFunc.
func (fk *fakeKubelet) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) {
	return fk.machineInfoFunc()
}
   114  
// GetVersionInfo always returns an empty, non-nil VersionInfo and a nil error.
func (*fakeKubelet) GetVersionInfo() (*cadvisorapi.VersionInfo, error) {
	return &cadvisorapi.VersionInfo{}, nil
}
   118  
// GetPods delegates to podsFunc.
func (fk *fakeKubelet) GetPods() []*v1.Pod {
	return fk.podsFunc()
}
   122  
// GetRunningPods delegates to runningPodsFunc, passing ctx through.
func (fk *fakeKubelet) GetRunningPods(ctx context.Context) ([]*v1.Pod, error) {
	return fk.runningPodsFunc(ctx)
}
   126  
// ServeLogs hands the request and response writer to logFunc.
func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
	fk.logFunc(w, req)
}
   130  
// GetKubeletContainerLogs delegates to containerLogsFunc with all arguments
// passed through unchanged.
func (fk *fakeKubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
	return fk.containerLogsFunc(ctx, podFullName, containerName, logOptions, stdout, stderr)
}
   134  
// GetHostname delegates to hostnameFunc.
func (fk *fakeKubelet) GetHostname() string {
	return fk.hostnameFunc()
}
   138  
// RunInContainer delegates to runFunc. The context is dropped because
// runFunc does not accept one.
func (fk *fakeKubelet) RunInContainer(_ context.Context, podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
	return fk.runFunc(podFullName, uid, containerName, cmd)
}
   142  
   143  func (fk *fakeKubelet) CheckpointContainer(_ context.Context, podUID types.UID, podFullName, containerName string, options *runtimeapi.CheckpointContainerRequest) error {
   144  	if containerName == "checkpointingFailure" {
   145  		return fmt.Errorf("Returning error for test")
   146  	}
   147  	return nil
   148  }
   149  
// ListMetricDescriptors is a stub that always returns nil, nil.
func (fk *fakeKubelet) ListMetricDescriptors(ctx context.Context) ([]*runtimeapi.MetricDescriptor, error) {
	return nil, nil
}
   153  
// ListPodSandboxMetrics is a stub that always returns nil, nil.
func (fk *fakeKubelet) ListPodSandboxMetrics(ctx context.Context) ([]*runtimeapi.PodSandboxMetrics, error) {
	return nil, nil
}
   157  
// fakeRuntime is the runtime handed to the streaming server in tests. Each
// method forwards to the matching function field, which a test must set
// before the corresponding call is made.
type fakeRuntime struct {
	execFunc        func(string, []string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
	attachFunc      func(string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
	portForwardFunc func(string, int32, io.ReadWriteCloser) error
}
   163  
// Exec forwards to execFunc; the context is not passed on.
func (f *fakeRuntime) Exec(_ context.Context, containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
	return f.execFunc(containerID, cmd, stdin, stdout, stderr, tty, resize)
}
   167  
// Attach forwards to attachFunc; the context is not passed on.
func (f *fakeRuntime) Attach(_ context.Context, containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
	return f.attachFunc(containerID, stdin, stdout, stderr, tty, resize)
}
   171  
// PortForward forwards to portForwardFunc; the context is not passed on.
func (f *fakeRuntime) PortForward(_ context.Context, podSandboxID string, port int32, stream io.ReadWriteCloser) error {
	return f.portForwardFunc(podSandboxID, port, stream)
}
   175  
// testStreamingServer bundles a streaming.Server with the fake runtime that
// backs it and the HTTP test server that exposes it.
type testStreamingServer struct {
	streaming.Server
	fakeRuntime    *fakeRuntime
	testHTTPServer *httptest.Server
}
   181  
   182  func newTestStreamingServer(streamIdleTimeout time.Duration) (s *testStreamingServer, err error) {
   183  	s = &testStreamingServer{}
   184  	s.testHTTPServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   185  		s.ServeHTTP(w, r)
   186  	}))
   187  	defer func() {
   188  		if err != nil {
   189  			s.testHTTPServer.Close()
   190  		}
   191  	}()
   192  
   193  	testURL, err := url.Parse(s.testHTTPServer.URL)
   194  	if err != nil {
   195  		return nil, err
   196  	}
   197  
   198  	s.fakeRuntime = &fakeRuntime{}
   199  	config := streaming.DefaultConfig
   200  	config.BaseURL = testURL
   201  	if streamIdleTimeout != 0 {
   202  		config.StreamIdleTimeout = streamIdleTimeout
   203  	}
   204  	s.Server, err = streaming.NewServer(config, s.fakeRuntime)
   205  	if err != nil {
   206  		return nil, err
   207  	}
   208  	return s, nil
   209  }
   210  
   211  func (fk *fakeKubelet) GetExec(_ context.Context, podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error) {
   212  	if fk.getExecCheck != nil {
   213  		fk.getExecCheck(podFullName, podUID, containerName, cmd, streamOpts)
   214  	}
   215  	// Always use testContainerID
   216  	resp, err := fk.streamingRuntime.GetExec(&runtimeapi.ExecRequest{
   217  		ContainerId: testContainerID,
   218  		Cmd:         cmd,
   219  		Tty:         streamOpts.TTY,
   220  		Stdin:       streamOpts.Stdin,
   221  		Stdout:      streamOpts.Stdout,
   222  		Stderr:      streamOpts.Stderr,
   223  	})
   224  	if err != nil {
   225  		return nil, err
   226  	}
   227  	return url.Parse(resp.GetUrl())
   228  }
   229  
   230  func (fk *fakeKubelet) GetAttach(_ context.Context, podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error) {
   231  	if fk.getAttachCheck != nil {
   232  		fk.getAttachCheck(podFullName, podUID, containerName, streamOpts)
   233  	}
   234  	// Always use testContainerID
   235  	resp, err := fk.streamingRuntime.GetAttach(&runtimeapi.AttachRequest{
   236  		ContainerId: testContainerID,
   237  		Tty:         streamOpts.TTY,
   238  		Stdin:       streamOpts.Stdin,
   239  		Stdout:      streamOpts.Stdout,
   240  		Stderr:      streamOpts.Stderr,
   241  	})
   242  	if err != nil {
   243  		return nil, err
   244  	}
   245  	return url.Parse(resp.GetUrl())
   246  }
   247  
   248  func (fk *fakeKubelet) GetPortForward(ctx context.Context, podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) {
   249  	if fk.getPortForwardCheck != nil {
   250  		fk.getPortForwardCheck(podName, podNamespace, podUID, portForwardOpts)
   251  	}
   252  	// Always use testPodSandboxID
   253  	resp, err := fk.streamingRuntime.GetPortForward(&runtimeapi.PortForwardRequest{
   254  		PodSandboxId: testPodSandboxID,
   255  		Port:         portForwardOpts.Ports,
   256  	})
   257  	if err != nil {
   258  		return nil, err
   259  	}
   260  	return url.Parse(resp.GetUrl())
   261  }
   262  
// Unused functions
//
// The methods below are stubs with fixed results: zero values or nil for
// most, and an empty map with true for the two volume listers.

// GetNode always returns nil, nil.
func (*fakeKubelet) GetNode() (*v1.Node, error)                       { return nil, nil }
// GetNodeConfig always returns a zero-value NodeConfig.
func (*fakeKubelet) GetNodeConfig() cm.NodeConfig                     { return cm.NodeConfig{} }
// GetPodCgroupRoot always returns the empty string.
func (*fakeKubelet) GetPodCgroupRoot() string                         { return "" }
// GetPodByCgroupfs always reports that no pod was found.
func (*fakeKubelet) GetPodByCgroupfs(cgroupfs string) (*v1.Pod, bool) { return nil, false }
// ListVolumesForPod always returns an empty map and true.
func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) {
	return map[string]volume.Volume{}, true
}
// ListBlockVolumesForPod always returns an empty map and true.
func (*fakeKubelet) ListBlockVolumesForPod(podUID types.UID) (map[string]volume.BlockVolume, bool) {
	return map[string]volume.BlockVolume{}, true
}
// RootFsStats and ListPodStats always return nil, nil.
func (*fakeKubelet) RootFsStats() (*statsapi.FsStats, error)                     { return nil, nil }
func (*fakeKubelet) ListPodStats(_ context.Context) ([]statsapi.PodStats, error) { return nil, nil }
// ListPodStatsAndUpdateCPUNanoCoreUsage always returns nil, nil.
func (*fakeKubelet) ListPodStatsAndUpdateCPUNanoCoreUsage(_ context.Context) ([]statsapi.PodStats, error) {
	return nil, nil
}
// ListPodCPUAndMemoryStats always returns nil, nil.
func (*fakeKubelet) ListPodCPUAndMemoryStats(_ context.Context) ([]statsapi.PodStats, error) {
	return nil, nil
}
// ImageFsStats always returns nil for both stats values and the error.
func (*fakeKubelet) ImageFsStats(_ context.Context) (*statsapi.FsStats, *statsapi.FsStats, error) {
	return nil, nil, nil
}
// RlimitStats always returns nil, nil.
func (*fakeKubelet) RlimitStats() (*statsapi.RlimitStats, error) { return nil, nil }
// GetCgroupStats always returns nil for both stats values and the error.
func (*fakeKubelet) GetCgroupStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, *statsapi.NetworkStats, error) {
	return nil, nil, nil
}
// GetCgroupCPUAndMemoryStats always returns nil, nil.
func (*fakeKubelet) GetCgroupCPUAndMemoryStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, error) {
	return nil, nil
}
   292  
// fakeAuth is a test double for the server's authentication/authorization
// dependency. Each method forwards to the matching function field, so a
// test replaces a field to control or observe that step.
type fakeAuth struct {
	authenticateFunc func(*http.Request) (*authenticator.Response, bool, error)
	attributesFunc   func(user.Info, *http.Request) authorizer.Attributes
	authorizeFunc    func(authorizer.Attributes) (authorized authorizer.Decision, reason string, err error)
}
   298  
// AuthenticateRequest forwards to authenticateFunc.
func (f *fakeAuth) AuthenticateRequest(req *http.Request) (*authenticator.Response, bool, error) {
	return f.authenticateFunc(req)
}
// GetRequestAttributes forwards to attributesFunc.
func (f *fakeAuth) GetRequestAttributes(u user.Info, req *http.Request) authorizer.Attributes {
	return f.attributesFunc(u, req)
}
// Authorize forwards to authorizeFunc; ctx is not passed on because the
// hook does not accept one.
func (f *fakeAuth) Authorize(ctx context.Context, a authorizer.Attributes) (authorized authorizer.Decision, reason string, err error) {
	return f.authorizeFunc(a)
}
   308  
// serverTestFramework holds everything a test needs: the Server under test,
// the fakes it was built with, and the HTTP test server that exposes it.
type serverTestFramework struct {
	serverUnderTest *Server
	fakeKubelet     *fakeKubelet
	fakeAuth        *fakeAuth
	testHTTPServer  *httptest.Server
}
   315  
// newServerTest returns a framework with debugging enabled and no streaming
// server. The caller is responsible for closing testHTTPServer.
func newServerTest() *serverTestFramework {
	return newServerTestWithDebug(true, nil)
}
   319  
   320  func newServerTestWithDebug(enableDebugging bool, streamingServer streaming.Server) *serverTestFramework {
   321  	kubeCfg := &kubeletconfiginternal.KubeletConfiguration{
   322  		EnableDebuggingHandlers: enableDebugging,
   323  		EnableSystemLogHandler:  enableDebugging,
   324  		EnableProfilingHandler:  enableDebugging,
   325  		EnableDebugFlagsHandler: enableDebugging,
   326  	}
   327  	return newServerTestWithDebuggingHandlers(kubeCfg, streamingServer)
   328  }
   329  
   330  func newServerTestWithDebuggingHandlers(kubeCfg *kubeletconfiginternal.KubeletConfiguration, streamingServer streaming.Server) *serverTestFramework {
   331  
   332  	fw := &serverTestFramework{}
   333  	fw.fakeKubelet = &fakeKubelet{
   334  		hostnameFunc: func() string {
   335  			return "127.0.0.1"
   336  		},
   337  		podByNameFunc: func(namespace, name string) (*v1.Pod, bool) {
   338  			return &v1.Pod{
   339  				ObjectMeta: metav1.ObjectMeta{
   340  					Namespace: namespace,
   341  					Name:      name,
   342  					UID:       testUID,
   343  				},
   344  			}, true
   345  		},
   346  		plegHealth:       true,
   347  		streamingRuntime: streamingServer,
   348  	}
   349  	fw.fakeAuth = &fakeAuth{
   350  		authenticateFunc: func(req *http.Request) (*authenticator.Response, bool, error) {
   351  			return &authenticator.Response{User: &user.DefaultInfo{Name: "test"}}, true, nil
   352  		},
   353  		attributesFunc: func(u user.Info, req *http.Request) authorizer.Attributes {
   354  			return &authorizer.AttributesRecord{User: u}
   355  		},
   356  		authorizeFunc: func(a authorizer.Attributes) (decision authorizer.Decision, reason string, err error) {
   357  			return authorizer.DecisionAllow, "", nil
   358  		},
   359  	}
   360  	server := NewServer(
   361  		fw.fakeKubelet,
   362  		stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &record.FakeRecorder{}),
   363  		fw.fakeAuth,
   364  		oteltrace.NewNoopTracerProvider(),
   365  		kubeCfg,
   366  	)
   367  	fw.serverUnderTest = &server
   368  	fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
   369  	return fw
   370  }
   371  
   372  // A helper function to return the correct pod name.
   373  func getPodName(name, namespace string) string {
   374  	if namespace == "" {
   375  		namespace = metav1.NamespaceDefault
   376  	}
   377  	return name + "_" + namespace
   378  }
   379  
   380  func TestServeLogs(t *testing.T) {
   381  	fw := newServerTest()
   382  	defer fw.testHTTPServer.Close()
   383  
   384  	content := string(`<pre><a href="kubelet.log">kubelet.log</a><a href="google.log">google.log</a></pre>`)
   385  
   386  	fw.fakeKubelet.logFunc = func(w http.ResponseWriter, req *http.Request) {
   387  		w.WriteHeader(http.StatusOK)
   388  		w.Header().Add("Content-Type", "text/html")
   389  		w.Write([]byte(content))
   390  	}
   391  
   392  	resp, err := http.Get(fw.testHTTPServer.URL + "/logs/")
   393  	if err != nil {
   394  		t.Fatalf("Got error GETing: %v", err)
   395  	}
   396  	defer resp.Body.Close()
   397  
   398  	body, err := httputil.DumpResponse(resp, true)
   399  	if err != nil {
   400  		// copying the response body did not work
   401  		t.Errorf("Cannot copy resp: %#v", err)
   402  	}
   403  	result := string(body)
   404  	if !strings.Contains(result, "kubelet.log") || !strings.Contains(result, "google.log") {
   405  		t.Errorf("Received wrong data: %s", result)
   406  	}
   407  }
   408  
   409  func TestServeRunInContainer(t *testing.T) {
   410  	fw := newServerTest()
   411  	defer fw.testHTTPServer.Close()
   412  	output := "foo bar"
   413  	podNamespace := "other"
   414  	podName := "foo"
   415  	expectedPodName := getPodName(podName, podNamespace)
   416  	expectedContainerName := "baz"
   417  	expectedCommand := "ls -a"
   418  	fw.fakeKubelet.runFunc = func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
   419  		if podFullName != expectedPodName {
   420  			t.Errorf("expected %s, got %s", expectedPodName, podFullName)
   421  		}
   422  		if containerName != expectedContainerName {
   423  			t.Errorf("expected %s, got %s", expectedContainerName, containerName)
   424  		}
   425  		if strings.Join(cmd, " ") != expectedCommand {
   426  			t.Errorf("expected: %s, got %v", expectedCommand, cmd)
   427  		}
   428  
   429  		return []byte(output), nil
   430  	}
   431  
   432  	resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
   433  
   434  	if err != nil {
   435  		t.Fatalf("Got error POSTing: %v", err)
   436  	}
   437  	defer resp.Body.Close()
   438  
   439  	body, err := io.ReadAll(resp.Body)
   440  	if err != nil {
   441  		// copying the response body did not work
   442  		t.Errorf("Cannot copy resp: %#v", err)
   443  	}
   444  	result := string(body)
   445  	if result != output {
   446  		t.Errorf("expected %s, got %s", output, result)
   447  	}
   448  }
   449  
   450  func TestServeRunInContainerWithUID(t *testing.T) {
   451  	fw := newServerTest()
   452  	defer fw.testHTTPServer.Close()
   453  	output := "foo bar"
   454  	podNamespace := "other"
   455  	podName := "foo"
   456  	expectedPodName := getPodName(podName, podNamespace)
   457  	expectedContainerName := "baz"
   458  	expectedCommand := "ls -a"
   459  	fw.fakeKubelet.runFunc = func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
   460  		if podFullName != expectedPodName {
   461  			t.Errorf("expected %s, got %s", expectedPodName, podFullName)
   462  		}
   463  		if string(uid) != testUID {
   464  			t.Errorf("expected %s, got %s", testUID, uid)
   465  		}
   466  		if containerName != expectedContainerName {
   467  			t.Errorf("expected %s, got %s", expectedContainerName, containerName)
   468  		}
   469  		if strings.Join(cmd, " ") != expectedCommand {
   470  			t.Errorf("expected: %s, got %v", expectedCommand, cmd)
   471  		}
   472  
   473  		return []byte(output), nil
   474  	}
   475  
   476  	resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+testUID+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
   477  	if err != nil {
   478  		t.Fatalf("Got error POSTing: %v", err)
   479  	}
   480  	defer resp.Body.Close()
   481  
   482  	body, err := io.ReadAll(resp.Body)
   483  	if err != nil {
   484  		// copying the response body did not work
   485  		t.Errorf("Cannot copy resp: %#v", err)
   486  	}
   487  	result := string(body)
   488  	if result != output {
   489  		t.Errorf("expected %s, got %s", output, result)
   490  	}
   491  }
   492  
   493  func TestHealthCheck(t *testing.T) {
   494  	fw := newServerTest()
   495  	defer fw.testHTTPServer.Close()
   496  	fw.fakeKubelet.hostnameFunc = func() string {
   497  		return "127.0.0.1"
   498  	}
   499  
   500  	// Test with correct hostname, Docker version
   501  	assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
   502  
   503  	// Test with incorrect hostname
   504  	fw.fakeKubelet.hostnameFunc = func() string {
   505  		return "fake"
   506  	}
   507  	assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
   508  }
   509  
   510  func assertHealthFails(t *testing.T, httpURL string, expectedErrorCode int) {
   511  	resp, err := http.Get(httpURL)
   512  	if err != nil {
   513  		t.Fatalf("Got error GETing: %v", err)
   514  	}
   515  	defer resp.Body.Close()
   516  	if resp.StatusCode != expectedErrorCode {
   517  		t.Errorf("expected status code %d, got %d", expectedErrorCode, resp.StatusCode)
   518  	}
   519  }
   520  
   521  // Ensure all registered handlers & services have an associated testcase.
   522  func TestAuthzCoverage(t *testing.T) {
   523  	fw := newServerTest()
   524  	defer fw.testHTTPServer.Close()
   525  
   526  	// method:path -> has coverage
   527  	expectedCases := map[string]bool{}
   528  
   529  	// Test all the non-web-service handlers
   530  	for _, path := range fw.serverUnderTest.restfulCont.RegisteredHandlePaths() {
   531  		expectedCases["GET:"+path] = false
   532  		expectedCases["POST:"+path] = false
   533  	}
   534  
   535  	// Test all the generated web-service paths
   536  	for _, ws := range fw.serverUnderTest.restfulCont.RegisteredWebServices() {
   537  		for _, r := range ws.Routes() {
   538  			expectedCases[r.Method+":"+r.Path] = false
   539  		}
   540  	}
   541  
   542  	// This is a sanity check that the Handle->HandleWithFilter() delegation is working
   543  	// Ideally, these would move to registered web services and this list would get shorter
   544  	expectedPaths := []string{"/healthz", "/metrics", "/metrics/cadvisor"}
   545  	for _, expectedPath := range expectedPaths {
   546  		if _, expected := expectedCases["GET:"+expectedPath]; !expected {
   547  			t.Errorf("Expected registered handle path %s was missing", expectedPath)
   548  		}
   549  	}
   550  
   551  	for _, tc := range AuthzTestCases() {
   552  		expectedCases[tc.Method+":"+tc.Path] = true
   553  	}
   554  
   555  	for tc, found := range expectedCases {
   556  		if !found {
   557  			t.Errorf("Missing authz test case for %s", tc)
   558  		}
   559  	}
   560  }
   561  
   562  func TestAuthFilters(t *testing.T) {
   563  	// Enable features.ContainerCheckpoint during test
   564  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ContainerCheckpoint, true)()
   565  
   566  	fw := newServerTest()
   567  	defer fw.testHTTPServer.Close()
   568  
   569  	attributesGetter := NewNodeAuthorizerAttributesGetter(authzTestNodeName)
   570  
   571  	for _, tc := range AuthzTestCases() {
   572  		t.Run(tc.Method+":"+tc.Path, func(t *testing.T) {
   573  			var (
   574  				expectedUser = AuthzTestUser()
   575  
   576  				calledAuthenticate = false
   577  				calledAuthorize    = false
   578  				calledAttributes   = false
   579  			)
   580  
   581  			fw.fakeAuth.authenticateFunc = func(req *http.Request) (*authenticator.Response, bool, error) {
   582  				calledAuthenticate = true
   583  				return &authenticator.Response{User: expectedUser}, true, nil
   584  			}
   585  			fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
   586  				calledAttributes = true
   587  				require.Equal(t, expectedUser, u)
   588  				return attributesGetter.GetRequestAttributes(u, req)
   589  			}
   590  			fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (decision authorizer.Decision, reason string, err error) {
   591  				calledAuthorize = true
   592  				tc.AssertAttributes(t, a)
   593  				return authorizer.DecisionNoOpinion, "", nil
   594  			}
   595  
   596  			req, err := http.NewRequest(tc.Method, fw.testHTTPServer.URL+tc.Path, nil)
   597  			require.NoError(t, err)
   598  
   599  			resp, err := http.DefaultClient.Do(req)
   600  			require.NoError(t, err)
   601  			defer resp.Body.Close()
   602  
   603  			assert.Equal(t, http.StatusForbidden, resp.StatusCode)
   604  			assert.True(t, calledAuthenticate, "Authenticate was not called")
   605  			assert.True(t, calledAttributes, "Attributes were not called")
   606  			assert.True(t, calledAuthorize, "Authorize was not called")
   607  		})
   608  	}
   609  }
   610  
   611  func TestAuthenticationError(t *testing.T) {
   612  	var (
   613  		expectedUser       = &user.DefaultInfo{Name: "test"}
   614  		expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
   615  
   616  		calledAuthenticate = false
   617  		calledAuthorize    = false
   618  		calledAttributes   = false
   619  	)
   620  
   621  	fw := newServerTest()
   622  	defer fw.testHTTPServer.Close()
   623  	fw.fakeAuth.authenticateFunc = func(req *http.Request) (*authenticator.Response, bool, error) {
   624  		calledAuthenticate = true
   625  		return &authenticator.Response{User: expectedUser}, true, nil
   626  	}
   627  	fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
   628  		calledAttributes = true
   629  		return expectedAttributes
   630  	}
   631  	fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (decision authorizer.Decision, reason string, err error) {
   632  		calledAuthorize = true
   633  		return authorizer.DecisionNoOpinion, "", errors.New("Failed")
   634  	}
   635  
   636  	assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
   637  
   638  	if !calledAuthenticate {
   639  		t.Fatalf("Authenticate was not called")
   640  	}
   641  	if !calledAttributes {
   642  		t.Fatalf("Attributes was not called")
   643  	}
   644  	if !calledAuthorize {
   645  		t.Fatalf("Authorize was not called")
   646  	}
   647  }
   648  
   649  func TestAuthenticationFailure(t *testing.T) {
   650  	var (
   651  		expectedUser       = &user.DefaultInfo{Name: "test"}
   652  		expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
   653  
   654  		calledAuthenticate = false
   655  		calledAuthorize    = false
   656  		calledAttributes   = false
   657  	)
   658  
   659  	fw := newServerTest()
   660  	defer fw.testHTTPServer.Close()
   661  	fw.fakeAuth.authenticateFunc = func(req *http.Request) (*authenticator.Response, bool, error) {
   662  		calledAuthenticate = true
   663  		return nil, false, nil
   664  	}
   665  	fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
   666  		calledAttributes = true
   667  		return expectedAttributes
   668  	}
   669  	fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (decision authorizer.Decision, reason string, err error) {
   670  		calledAuthorize = true
   671  		return authorizer.DecisionNoOpinion, "", nil
   672  	}
   673  
   674  	assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusUnauthorized)
   675  
   676  	if !calledAuthenticate {
   677  		t.Fatalf("Authenticate was not called")
   678  	}
   679  	if calledAttributes {
   680  		t.Fatalf("Attributes was called unexpectedly")
   681  	}
   682  	if calledAuthorize {
   683  		t.Fatalf("Authorize was called unexpectedly")
   684  	}
   685  }
   686  
   687  func TestAuthorizationSuccess(t *testing.T) {
   688  	var (
   689  		expectedUser       = &user.DefaultInfo{Name: "test"}
   690  		expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
   691  
   692  		calledAuthenticate = false
   693  		calledAuthorize    = false
   694  		calledAttributes   = false
   695  	)
   696  
   697  	fw := newServerTest()
   698  	defer fw.testHTTPServer.Close()
   699  	fw.fakeAuth.authenticateFunc = func(req *http.Request) (*authenticator.Response, bool, error) {
   700  		calledAuthenticate = true
   701  		return &authenticator.Response{User: expectedUser}, true, nil
   702  	}
   703  	fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
   704  		calledAttributes = true
   705  		return expectedAttributes
   706  	}
   707  	fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (decision authorizer.Decision, reason string, err error) {
   708  		calledAuthorize = true
   709  		return authorizer.DecisionAllow, "", nil
   710  	}
   711  
   712  	assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
   713  
   714  	if !calledAuthenticate {
   715  		t.Fatalf("Authenticate was not called")
   716  	}
   717  	if !calledAttributes {
   718  		t.Fatalf("Attributes were not called")
   719  	}
   720  	if !calledAuthorize {
   721  		t.Fatalf("Authorize was not called")
   722  	}
   723  }
   724  
   725  func TestSyncLoopCheck(t *testing.T) {
   726  	fw := newServerTest()
   727  	defer fw.testHTTPServer.Close()
   728  	fw.fakeKubelet.hostnameFunc = func() string {
   729  		return "127.0.0.1"
   730  	}
   731  
   732  	fw.fakeKubelet.resyncInterval = time.Minute
   733  	fw.fakeKubelet.loopEntryTime = time.Now()
   734  
   735  	// Test with correct hostname, Docker version
   736  	assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
   737  
   738  	fw.fakeKubelet.loopEntryTime = time.Now().Add(time.Minute * -10)
   739  	assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
   740  }
   741  
   742  // returns http response status code from the HTTP GET
   743  func assertHealthIsOk(t *testing.T, httpURL string) {
   744  	resp, err := http.Get(httpURL)
   745  	if err != nil {
   746  		t.Fatalf("Got error GETing: %v", err)
   747  	}
   748  	defer resp.Body.Close()
   749  	if resp.StatusCode != http.StatusOK {
   750  		t.Errorf("expected status code %d, got %d", http.StatusOK, resp.StatusCode)
   751  	}
   752  	body, readErr := io.ReadAll(resp.Body)
   753  	if readErr != nil {
   754  		// copying the response body did not work
   755  		t.Fatalf("Cannot copy resp: %#v", readErr)
   756  	}
   757  	result := string(body)
   758  	if !strings.Contains(result, "ok") {
   759  		t.Errorf("expected body contains ok, got %s", result)
   760  	}
   761  }
   762  
   763  func setPodByNameFunc(fw *serverTestFramework, namespace, pod, container string) {
   764  	fw.fakeKubelet.podByNameFunc = func(namespace, name string) (*v1.Pod, bool) {
   765  		return &v1.Pod{
   766  			ObjectMeta: metav1.ObjectMeta{
   767  				Namespace: namespace,
   768  				Name:      pod,
   769  			},
   770  			Spec: v1.PodSpec{
   771  				Containers: []v1.Container{
   772  					{
   773  						Name: container,
   774  					},
   775  				},
   776  			},
   777  		}, true
   778  	}
   779  }
   780  
   781  func setGetContainerLogsFunc(fw *serverTestFramework, t *testing.T, expectedPodName, expectedContainerName string, expectedLogOptions *v1.PodLogOptions, output string) {
   782  	fw.fakeKubelet.containerLogsFunc = func(_ context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
   783  		if podFullName != expectedPodName {
   784  			t.Errorf("expected %s, got %s", expectedPodName, podFullName)
   785  		}
   786  		if containerName != expectedContainerName {
   787  			t.Errorf("expected %s, got %s", expectedContainerName, containerName)
   788  		}
   789  		if !reflect.DeepEqual(expectedLogOptions, logOptions) {
   790  			t.Errorf("expected %#v, got %#v", expectedLogOptions, logOptions)
   791  		}
   792  
   793  		io.WriteString(stdout, output)
   794  		return nil
   795  	}
   796  }
   797  
   798  func TestContainerLogs(t *testing.T) {
   799  	fw := newServerTest()
   800  	defer fw.testHTTPServer.Close()
   801  
   802  	tests := map[string]struct {
   803  		query        string
   804  		podLogOption *v1.PodLogOptions
   805  	}{
   806  		"without tail":     {"", &v1.PodLogOptions{}},
   807  		"with tail":        {"?tailLines=5", &v1.PodLogOptions{TailLines: pointer.Int64(5)}},
   808  		"with legacy tail": {"?tail=5", &v1.PodLogOptions{TailLines: pointer.Int64(5)}},
   809  		"with tail all":    {"?tail=all", &v1.PodLogOptions{}},
   810  		"with follow":      {"?follow=1", &v1.PodLogOptions{Follow: true}},
   811  	}
   812  
   813  	for desc, test := range tests {
   814  		t.Run(desc, func(t *testing.T) {
   815  			output := "foo bar"
   816  			podNamespace := "other"
   817  			podName := "foo"
   818  			expectedPodName := getPodName(podName, podNamespace)
   819  			expectedContainerName := "baz"
   820  			setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
   821  			setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, test.podLogOption, output)
   822  			resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + test.query)
   823  			if err != nil {
   824  				t.Errorf("Got error GETing: %v", err)
   825  			}
   826  			defer resp.Body.Close()
   827  
   828  			body, err := io.ReadAll(resp.Body)
   829  			if err != nil {
   830  				t.Errorf("Error reading container logs: %v", err)
   831  			}
   832  			result := string(body)
   833  			if result != output {
   834  				t.Errorf("Expected: '%v', got: '%v'", output, result)
   835  			}
   836  		})
   837  	}
   838  }
   839  
   840  func TestContainerLogsWithInvalidTail(t *testing.T) {
   841  	fw := newServerTest()
   842  	defer fw.testHTTPServer.Close()
   843  	output := "foo bar"
   844  	podNamespace := "other"
   845  	podName := "foo"
   846  	expectedPodName := getPodName(podName, podNamespace)
   847  	expectedContainerName := "baz"
   848  	setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
   849  	setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &v1.PodLogOptions{}, output)
   850  	resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?tail=-1")
   851  	if err != nil {
   852  		t.Errorf("Got error GETing: %v", err)
   853  	}
   854  	defer resp.Body.Close()
   855  	if resp.StatusCode != http.StatusUnprocessableEntity {
   856  		t.Errorf("Unexpected non-error reading container logs: %#v", resp)
   857  	}
   858  }
   859  
   860  func TestCheckpointContainer(t *testing.T) {
   861  	podNamespace := "other"
   862  	podName := "foo"
   863  	expectedContainerName := "baz"
   864  
   865  	setupTest := func(featureGate bool) *serverTestFramework {
   866  		// Enable features.ContainerCheckpoint during test
   867  		defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ContainerCheckpoint, featureGate)()
   868  
   869  		fw := newServerTest()
   870  		// GetPodByName() should always fail
   871  		fw.fakeKubelet.podByNameFunc = func(namespace, name string) (*v1.Pod, bool) {
   872  			return nil, false
   873  		}
   874  		return fw
   875  	}
   876  	fw := setupTest(true)
   877  	defer fw.testHTTPServer.Close()
   878  
   879  	t.Run("wrong pod namespace", func(t *testing.T) {
   880  		resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/"+expectedContainerName, "", nil)
   881  		if err != nil {
   882  			t.Errorf("Got error POSTing: %v", err)
   883  		}
   884  		defer resp.Body.Close()
   885  		if resp.StatusCode != http.StatusNotFound {
   886  			t.Errorf("Unexpected non-error checkpointing container: %#v", resp)
   887  		}
   888  	})
   889  	// let GetPodByName() return a result, but our container "wrongContainerName" is not part of the Pod
   890  	setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
   891  	t.Run("wrong container name", func(t *testing.T) {
   892  		resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/wrongContainerName", "", nil)
   893  		if err != nil {
   894  			t.Errorf("Got error POSTing: %v", err)
   895  		}
   896  		defer resp.Body.Close()
   897  		if resp.StatusCode != http.StatusNotFound {
   898  			t.Errorf("Unexpected non-error checkpointing container: %#v", resp)
   899  		}
   900  	})
   901  	// Now the checkpointing of the container fails
   902  	fw.fakeKubelet.podByNameFunc = func(namespace, name string) (*v1.Pod, bool) {
   903  		return &v1.Pod{
   904  			ObjectMeta: metav1.ObjectMeta{
   905  				Namespace: podNamespace,
   906  				Name:      podName,
   907  			},
   908  			Spec: v1.PodSpec{
   909  				Containers: []v1.Container{
   910  					{
   911  						Name: "checkpointingFailure",
   912  					},
   913  				},
   914  			},
   915  		}, true
   916  	}
   917  	t.Run("checkpointing fails", func(t *testing.T) {
   918  		resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/checkpointingFailure", "", nil)
   919  		if err != nil {
   920  			t.Errorf("Got error POSTing: %v", err)
   921  		}
   922  		defer resp.Body.Close()
   923  		assert.Equal(t, resp.StatusCode, 500)
   924  		body, _ := io.ReadAll(resp.Body)
   925  		assert.Equal(t, string(body), "checkpointing of other/foo/checkpointingFailure failed (Returning error for test)")
   926  	})
   927  	// Now test a successful checkpoint succeeds
   928  	setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
   929  	t.Run("checkpointing succeeds", func(t *testing.T) {
   930  		resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/"+expectedContainerName, "", nil)
   931  		if err != nil {
   932  			t.Errorf("Got error POSTing: %v", err)
   933  		}
   934  		assert.Equal(t, resp.StatusCode, 200)
   935  	})
   936  
   937  	// Now test for 404 if checkpointing support is explicitly disabled.
   938  	fw.testHTTPServer.Close()
   939  	fw = setupTest(false)
   940  	defer fw.testHTTPServer.Close()
   941  	setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
   942  	t.Run("checkpointing fails because disabled", func(t *testing.T) {
   943  		resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/"+expectedContainerName, "", nil)
   944  		if err != nil {
   945  			t.Errorf("Got error POSTing: %v", err)
   946  		}
   947  		assert.Equal(t, 404, resp.StatusCode)
   948  	})
   949  }
   950  
   951  func makeReq(t *testing.T, method, url, clientProtocol string) *http.Request {
   952  	req, err := http.NewRequest(method, url, nil)
   953  	if err != nil {
   954  		t.Fatalf("error creating request: %v", err)
   955  	}
   956  	req.Header.Set("Content-Type", "")
   957  	req.Header.Add("X-Stream-Protocol-Version", clientProtocol)
   958  	return req
   959  }
   960  
   961  func TestServeExecInContainerIdleTimeout(t *testing.T) {
   962  	ss, err := newTestStreamingServer(100 * time.Millisecond)
   963  	require.NoError(t, err)
   964  	defer ss.testHTTPServer.Close()
   965  	fw := newServerTestWithDebug(true, ss)
   966  	defer fw.testHTTPServer.Close()
   967  
   968  	podNamespace := "other"
   969  	podName := "foo"
   970  	expectedContainerName := "baz"
   971  
   972  	url := fw.testHTTPServer.URL + "/exec/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?c=ls&c=-a&" + api.ExecStdinParam + "=1"
   973  
   974  	upgradeRoundTripper, err := spdy.NewRoundTripper(&tls.Config{})
   975  	if err != nil {
   976  		t.Fatalf("Error creating SpdyRoundTripper: %v", err)
   977  	}
   978  	c := &http.Client{Transport: upgradeRoundTripper}
   979  
   980  	resp, err := c.Do(makeReq(t, "POST", url, "v4.channel.k8s.io"))
   981  	if err != nil {
   982  		t.Fatalf("Got error POSTing: %v", err)
   983  	}
   984  	defer resp.Body.Close()
   985  
   986  	upgradeRoundTripper.Dialer = &net.Dialer{
   987  		Deadline: time.Now().Add(60 * time.Second),
   988  		Timeout:  60 * time.Second,
   989  	}
   990  	conn, err := upgradeRoundTripper.NewConnection(resp)
   991  	if err != nil {
   992  		t.Fatalf("Unexpected error creating streaming connection: %s", err)
   993  	}
   994  	if conn == nil {
   995  		t.Fatal("Unexpected nil connection")
   996  	}
   997  
   998  	<-conn.CloseChan()
   999  }
  1000  
// testExecAttach is the shared, table-driven body of the exec and attach
// streaming tests. verb selects the endpoint ("exec" or "attach" in the
// callers in this file).
//
// Each case starts its own streaming server and kubelet test server, POSTs an
// upgrade request to /<verb>/<namespace>/<pod>[/<uid>]/<container> with query
// parameters derived from the case, and checks the response status. For cases
// that expect 101 Switching Protocols it then creates the error stream plus
// whichever of stdin/stdout/stderr the case enables, exchanges fixed payloads
// with the fake runtime, and finally checks that only the hook matching verb
// (getExecCheck or getAttachCheck) was invoked.
func testExecAttach(t *testing.T, verb string) {
	// No case sets tty, so test.tty is false in every case below.
	tests := map[string]struct {
		stdin              bool
		stdout             bool
		stderr             bool
		tty                bool
		responseStatusCode int
		uid                bool
	}{
		"no input or output":           {responseStatusCode: http.StatusBadRequest},
		"stdin":                        {stdin: true, responseStatusCode: http.StatusSwitchingProtocols},
		"stdout":                       {stdout: true, responseStatusCode: http.StatusSwitchingProtocols},
		"stderr":                       {stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
		"stdout and stderr":            {stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
		"stdin stdout and stderr":      {stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
		"stdin stdout stderr with uid": {stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols, uid: true},
	}

	for desc := range tests {
		test := tests[desc]
		t.Run(desc, func(t *testing.T) {
			ss, err := newTestStreamingServer(0)
			require.NoError(t, err)
			defer ss.testHTTPServer.Close()
			fw := newServerTestWithDebug(true, ss)
			defer fw.testHTTPServer.Close()
			// Prints the case name to stdout.
			fmt.Println(desc)

			podNamespace := "other"
			podName := "foo"
			expectedPodName := getPodName(podName, podNamespace)
			expectedContainerName := "baz"
			expectedCommand := "ls -a"
			expectedStdin := "stdin"
			expectedStdout := "stdout"
			expectedStderr := "stderr"
			// done is closed by testStream as soon as it is entered.
			done := make(chan struct{})
			// Closed by the client below after it has read from the
			// stdout/stderr stream; testStream blocks on them after closing
			// its side of the corresponding writer.
			clientStdoutReadDone := make(chan struct{})
			clientStderrReadDone := make(chan struct{})
			execInvoked := false
			attachInvoked := false

			// checkStream validates the arguments shared by the exec and
			// attach hooks on the fake kubelet. The uid is only compared
			// when the case puts a uid into the URL.
			checkStream := func(podFullName string, uid types.UID, containerName string, streamOpts remotecommandserver.Options) {
				assert.Equal(t, expectedPodName, podFullName, "podFullName")
				if test.uid {
					assert.Equal(t, testUID, string(uid), "uid")
				}
				assert.Equal(t, expectedContainerName, containerName, "containerName")
				assert.Equal(t, test.stdin, streamOpts.Stdin, "stdin")
				assert.Equal(t, test.stdout, streamOpts.Stdout, "stdout")
				assert.Equal(t, test.tty, streamOpts.TTY, "tty")
				assert.Equal(t, !test.tty && test.stderr, streamOpts.Stderr, "stderr")
			}

			fw.fakeKubelet.getExecCheck = func(podFullName string, uid types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) {
				execInvoked = true
				assert.Equal(t, expectedCommand, strings.Join(cmd, " "), "cmd")
				checkStream(podFullName, uid, containerName, streamOpts)
			}

			fw.fakeKubelet.getAttachCheck = func(podFullName string, uid types.UID, containerName string, streamOpts remotecommandserver.Options) {
				attachInvoked = true
				checkStream(podFullName, uid, containerName, streamOpts)
			}

			// testStream is the body shared by the fake runtime's exec and
			// attach functions. It signals done immediately, checks which
			// streams were handed to it, reads the stdin payload, and writes
			// the stdout/stderr payloads, waiting after each write until the
			// client reports that it has read.
			testStream := func(containerID string, in io.Reader, out, stderr io.WriteCloser, tty bool, done chan struct{}) error {
				close(done)
				assert.Equal(t, testContainerID, containerID, "containerID")
				assert.Equal(t, test.tty, tty, "tty")
				require.Equal(t, test.stdin, in != nil, "in")
				require.Equal(t, test.stdout, out != nil, "out")
				require.Equal(t, !test.tty && test.stderr, stderr != nil, "err")

				if test.stdin {
					b := make([]byte, 10)
					n, err := in.Read(b)
					assert.NoError(t, err, "reading from stdin")
					assert.Equal(t, expectedStdin, string(b[0:n]), "content from stdin")
				}

				if test.stdout {
					_, err := out.Write([]byte(expectedStdout))
					assert.NoError(t, err, "writing to stdout")
					out.Close()
					<-clientStdoutReadDone
				}

				if !test.tty && test.stderr {
					_, err := stderr.Write([]byte(expectedStderr))
					assert.NoError(t, err, "writing to stderr")
					stderr.Close()
					<-clientStderrReadDone
				}
				return nil
			}

			ss.fakeRuntime.execFunc = func(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
				assert.Equal(t, expectedCommand, strings.Join(cmd, " "), "cmd")
				return testStream(containerID, stdin, stdout, stderr, tty, done)
			}

			ss.fakeRuntime.attachFunc = func(containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
				return testStream(containerID, stdin, stdout, stderr, tty, done)
			}

			// Build the request URL; the uid path segment is only present
			// for cases with uid set.
			var url string
			if test.uid {
				url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + testUID + "/" + expectedContainerName + "?ignore=1"
			} else {
				url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?ignore=1"
			}
			// Only exec requests carry a command; it must join to
			// expectedCommand.
			if verb == "exec" {
				url += "&command=ls&command=-a"
			}
			if test.stdin {
				url += "&" + api.ExecStdinParam + "=1"
			}
			if test.stdout {
				url += "&" + api.ExecStdoutParam + "=1"
			}
			if test.stderr && !test.tty {
				url += "&" + api.ExecStderrParam + "=1"
			}
			if test.tty {
				url += "&" + api.ExecTTYParam + "=1"
			}

			var (
				resp                *http.Response
				upgradeRoundTripper httpstream.UpgradeRoundTripper
				c                   *http.Client
			)
			upgradeRoundTripper, err = spdy.NewRoundTripper(&tls.Config{})
			if err != nil {
				t.Fatalf("Error creating SpdyRoundTripper: %v", err)
			}
			c = &http.Client{Transport: upgradeRoundTripper}

			resp, err = c.Do(makeReq(t, "POST", url, "v4.channel.k8s.io"))
			require.NoError(t, err, "POSTing")
			defer resp.Body.Close()

			_, err = io.ReadAll(resp.Body)
			assert.NoError(t, err, "reading response body")

			// Cases that do not expect a protocol switch end here.
			require.Equal(t, test.responseStatusCode, resp.StatusCode, "response status")
			if test.responseStatusCode != http.StatusSwitchingProtocols {
				return
			}

			conn, err := upgradeRoundTripper.NewConnection(resp)
			require.NoError(t, err, "creating streaming connection")
			defer conn.Close()

			// The same header map is reused for every stream; only its
			// stream type value is changed between CreateStream calls.
			h := http.Header{}
			h.Set(api.StreamType, api.StreamTypeError)
			_, err = conn.CreateStream(h)
			require.NoError(t, err, "creating error stream")

			if test.stdin {
				h.Set(api.StreamType, api.StreamTypeStdin)
				stream, err := conn.CreateStream(h)
				require.NoError(t, err, "creating stdin stream")
				_, err = stream.Write([]byte(expectedStdin))
				require.NoError(t, err, "writing to stdin stream")
			}

			var stdoutStream httpstream.Stream
			if test.stdout {
				h.Set(api.StreamType, api.StreamTypeStdout)
				stdoutStream, err = conn.CreateStream(h)
				require.NoError(t, err, "creating stdout stream")
			}

			var stderrStream httpstream.Stream
			if test.stderr && !test.tty {
				h.Set(api.StreamType, api.StreamTypeStderr)
				stderrStream, err = conn.CreateStream(h)
				require.NoError(t, err, "creating stderr stream")
			}

			// Read the payloads written by testStream. Each "read done"
			// channel is closed right after the Read returns, before the
			// assertions, which unblocks the runtime side.
			if test.stdout {
				output := make([]byte, 10)
				n, err := stdoutStream.Read(output)
				close(clientStdoutReadDone)
				assert.NoError(t, err, "reading from stdout stream")
				assert.Equal(t, expectedStdout, string(output[0:n]), "stdout")
			}

			if test.stderr && !test.tty {
				output := make([]byte, 10)
				n, err := stderrStream.Read(output)
				close(clientStderrReadDone)
				assert.NoError(t, err, "reading from stderr stream")
				assert.Equal(t, expectedStderr, string(output[0:n]), "stderr")
			}

			// wait for the server to finish before checking if the attach/exec funcs were invoked
			<-done

			if verb == "exec" {
				assert.True(t, execInvoked, "exec should be invoked")
				assert.False(t, attachInvoked, "attach should not be invoked")
			} else {
				assert.True(t, attachInvoked, "attach should be invoked")
				assert.False(t, execInvoked, "exec should not be invoked")
			}
		})
	}
}
  1211  
  1212  func TestServeExecInContainer(t *testing.T) {
  1213  	testExecAttach(t, "exec")
  1214  }
  1215  
  1216  func TestServeAttachContainer(t *testing.T) {
  1217  	testExecAttach(t, "attach")
  1218  }
  1219  
  1220  func TestServePortForwardIdleTimeout(t *testing.T) {
  1221  	ss, err := newTestStreamingServer(100 * time.Millisecond)
  1222  	require.NoError(t, err)
  1223  	defer ss.testHTTPServer.Close()
  1224  	fw := newServerTestWithDebug(true, ss)
  1225  	defer fw.testHTTPServer.Close()
  1226  
  1227  	podNamespace := "other"
  1228  	podName := "foo"
  1229  
  1230  	url := fw.testHTTPServer.URL + "/portForward/" + podNamespace + "/" + podName
  1231  
  1232  	upgradeRoundTripper, err := spdy.NewRoundTripper(&tls.Config{})
  1233  	if err != nil {
  1234  		t.Fatalf("Error creating SpdyRoundTripper: %v", err)
  1235  	}
  1236  	c := &http.Client{Transport: upgradeRoundTripper}
  1237  
  1238  	req := makeReq(t, "POST", url, "portforward.k8s.io")
  1239  	resp, err := c.Do(req)
  1240  	if err != nil {
  1241  		t.Fatalf("Got error POSTing: %v", err)
  1242  	}
  1243  	defer resp.Body.Close()
  1244  
  1245  	conn, err := upgradeRoundTripper.NewConnection(resp)
  1246  	if err != nil {
  1247  		t.Fatalf("Unexpected error creating streaming connection: %s", err)
  1248  	}
  1249  	if conn == nil {
  1250  		t.Fatal("Unexpected nil connection")
  1251  	}
  1252  	defer conn.Close()
  1253  
  1254  	<-conn.CloseChan()
  1255  }
  1256  
// TestServePortForward is a table-driven test of the /portForward endpoint.
//
// Each case starts its own streaming server and kubelet test server, upgrades
// a POST to /portForward/<namespace>/<pod>[/<uid>] to a streaming connection,
// and creates an error stream carrying the case's port header. Cases with
// shouldError expect that stream creation to fail and stop there. The other
// cases create a data stream for the same port, optionally exchange payloads
// with the fake runtime's port-forward function, and wait for that function
// to return.
func TestServePortForward(t *testing.T) {
	tests := map[string]struct {
		port          string
		uid           bool
		clientData    string
		containerData string
		shouldError   bool
	}{
		"no port":                       {port: "", shouldError: true},
		"none number port":              {port: "abc", shouldError: true},
		"negative port":                 {port: "-1", shouldError: true},
		"too large port":                {port: "65536", shouldError: true},
		"0 port":                        {port: "0", shouldError: true},
		"min port":                      {port: "1", shouldError: false},
		"normal port":                   {port: "8000", shouldError: false},
		"normal port with data forward": {port: "8000", clientData: "client data", containerData: "container data", shouldError: false},
		"max port":                      {port: "65535", shouldError: false},
		"normal port with uid":          {port: "8000", uid: true, shouldError: false},
	}

	podNamespace := "other"
	podName := "foo"

	for desc := range tests {
		test := tests[desc]
		t.Run(desc, func(t *testing.T) {
			ss, err := newTestStreamingServer(0)
			require.NoError(t, err)
			defer ss.testHTTPServer.Close()
			fw := newServerTestWithDebug(true, ss)
			defer fw.testHTTPServer.Close()

			// Closed (via defer) when the fake runtime's portForwardFunc
			// returns; the client waits on it at the end of the case.
			portForwardFuncDone := make(chan struct{})

			// Validates the arguments the fake kubelet hook receives. The uid
			// is only compared when the case puts a uid into the URL.
			fw.fakeKubelet.getPortForwardCheck = func(name, namespace string, uid types.UID, opts portforward.V4Options) {
				assert.Equal(t, podName, name, "pod name")
				assert.Equal(t, podNamespace, namespace, "pod namespace")
				if test.uid {
					assert.Equal(t, testUID, string(uid), "uid")
				}
			}

			// Runtime side of the forward: checks the sandbox id and port,
			// then reads the client payload and writes the container payload
			// when the case defines them.
			ss.fakeRuntime.portForwardFunc = func(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
				defer close(portForwardFuncDone)
				assert.Equal(t, testPodSandboxID, podSandboxID, "pod sandbox id")
				// The port should be valid if it reaches here.
				testPort, err := strconv.ParseInt(test.port, 10, 32)
				require.NoError(t, err, "parse port")
				assert.Equal(t, int32(testPort), port, "port")

				if test.clientData != "" {
					fromClient := make([]byte, 32)
					n, err := stream.Read(fromClient)
					assert.NoError(t, err, "reading client data")
					assert.Equal(t, test.clientData, string(fromClient[0:n]), "client data")
				}

				if test.containerData != "" {
					_, err := stream.Write([]byte(test.containerData))
					assert.NoError(t, err, "writing container data")
				}

				return nil
			}

			var url string
			if test.uid {
				url = fmt.Sprintf("%s/portForward/%s/%s/%s", fw.testHTTPServer.URL, podNamespace, podName, testUID)
			} else {
				url = fmt.Sprintf("%s/portForward/%s/%s", fw.testHTTPServer.URL, podNamespace, podName)
			}

			var (
				upgradeRoundTripper httpstream.UpgradeRoundTripper
				c                   *http.Client
			)

			upgradeRoundTripper, err = spdy.NewRoundTripper(&tls.Config{})
			if err != nil {
				t.Fatalf("Error creating SpdyRoundTripper: %v", err)
			}
			c = &http.Client{Transport: upgradeRoundTripper}

			req := makeReq(t, "POST", url, "portforward.k8s.io")
			resp, err := c.Do(req)
			require.NoError(t, err, "POSTing")
			defer resp.Body.Close()

			// Every case, including those with an invalid port, expects the
			// upgrade itself to succeed; the port is only sent later in the
			// stream headers.
			assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode, "status code")

			conn, err := upgradeRoundTripper.NewConnection(resp)
			require.NoError(t, err, "creating streaming connection")
			defer conn.Close()

			// Creating the error stream is where the test expects an invalid
			// port to be rejected.
			headers := http.Header{}
			headers.Set("streamType", "error")
			headers.Set("port", test.port)
			_, err = conn.CreateStream(headers)
			assert.Equal(t, test.shouldError, err != nil, "expect error")

			if test.shouldError {
				return
			}

			headers.Set("streamType", "data")
			headers.Set("port", test.port)
			dataStream, err := conn.CreateStream(headers)
			require.NoError(t, err, "create stream")

			if test.clientData != "" {
				_, err := dataStream.Write([]byte(test.clientData))
				assert.NoError(t, err, "writing client data")
			}

			if test.containerData != "" {
				fromContainer := make([]byte, 32)
				n, err := dataStream.Read(fromContainer)
				assert.NoError(t, err, "reading container data")
				assert.Equal(t, test.containerData, string(fromContainer[0:n]), "container data")
			}

			// Wait until the fake runtime's port-forward function has
			// returned.
			<-portForwardFuncDone
		})
	}
}
  1382  
  1383  func TestMetricBuckets(t *testing.T) {
  1384  	tests := map[string]struct {
  1385  		url    string
  1386  		bucket string
  1387  	}{
  1388  		"healthz endpoint":                {url: "/healthz", bucket: "healthz"},
  1389  		"attach":                          {url: "/attach/podNamespace/podID/containerName", bucket: "attach"},
  1390  		"attach with uid":                 {url: "/attach/podNamespace/podID/uid/containerName", bucket: "attach"},
  1391  		"configz":                         {url: "/configz", bucket: "configz"},
  1392  		"containerLogs":                   {url: "/containerLogs/podNamespace/podID/containerName", bucket: "containerLogs"},
  1393  		"debug v flags":                   {url: "/debug/flags/v", bucket: "debug"},
  1394  		"pprof with sub":                  {url: "/debug/pprof/subpath", bucket: "debug"},
  1395  		"exec":                            {url: "/exec/podNamespace/podID/containerName", bucket: "exec"},
  1396  		"exec with uid":                   {url: "/exec/podNamespace/podID/uid/containerName", bucket: "exec"},
  1397  		"healthz":                         {url: "/healthz/", bucket: "healthz"},
  1398  		"healthz log sub":                 {url: "/healthz/log", bucket: "healthz"},
  1399  		"healthz ping":                    {url: "/healthz/ping", bucket: "healthz"},
  1400  		"healthz sync loop":               {url: "/healthz/syncloop", bucket: "healthz"},
  1401  		"logs":                            {url: "/logs/", bucket: "logs"},
  1402  		"logs with path":                  {url: "/logs/logpath", bucket: "logs"},
  1403  		"metrics":                         {url: "/metrics", bucket: "metrics"},
  1404  		"metrics cadvisor sub":            {url: "/metrics/cadvisor", bucket: "metrics/cadvisor"},
  1405  		"metrics probes sub":              {url: "/metrics/probes", bucket: "metrics/probes"},
  1406  		"metrics resource sub":            {url: "/metrics/resource", bucket: "metrics/resource"},
  1407  		"pods":                            {url: "/pods/", bucket: "pods"},
  1408  		"portForward":                     {url: "/portForward/podNamespace/podID", bucket: "portForward"},
  1409  		"portForward with uid":            {url: "/portForward/podNamespace/podID/uid", bucket: "portForward"},
  1410  		"run":                             {url: "/run/podNamespace/podID/containerName", bucket: "run"},
  1411  		"run with uid":                    {url: "/run/podNamespace/podID/uid/containerName", bucket: "run"},
  1412  		"runningpods":                     {url: "/runningpods/", bucket: "runningpods"},
  1413  		"stats":                           {url: "/stats/", bucket: "stats"},
  1414  		"stats summary sub":               {url: "/stats/summary", bucket: "stats"},
  1415  		"invalid path":                    {url: "/junk", bucket: "other"},
  1416  		"invalid path starting with good": {url: "/healthzjunk", bucket: "other"},
  1417  	}
  1418  	fw := newServerTest()
  1419  	defer fw.testHTTPServer.Close()
  1420  
  1421  	for _, test := range tests {
  1422  		path := test.url
  1423  		bucket := test.bucket
  1424  		require.Equal(t, fw.serverUnderTest.getMetricBucket(path), bucket)
  1425  	}
  1426  }
  1427  
  1428  func TestMetricMethodBuckets(t *testing.T) {
  1429  	tests := map[string]struct {
  1430  		method string
  1431  		bucket string
  1432  	}{
  1433  		"normal GET":     {method: "GET", bucket: "GET"},
  1434  		"normal POST":    {method: "POST", bucket: "POST"},
  1435  		"invalid method": {method: "WEIRD", bucket: "other"},
  1436  	}
  1437  
  1438  	fw := newServerTest()
  1439  	defer fw.testHTTPServer.Close()
  1440  
  1441  	for _, test := range tests {
  1442  		method := test.method
  1443  		bucket := test.bucket
  1444  		require.Equal(t, fw.serverUnderTest.getMetricMethodBucket(method), bucket)
  1445  	}
  1446  }
  1447  
  1448  func TestDebuggingDisabledHandlers(t *testing.T) {
  1449  	// for backward compatibility even if enablesystemLogHandler or enableProfilingHandler is set but not
  1450  	// enableDebuggingHandler then /logs, /pprof and /flags shouldn't be served.
  1451  	kubeCfg := &kubeletconfiginternal.KubeletConfiguration{
  1452  		EnableDebuggingHandlers: false,
  1453  		EnableSystemLogHandler:  true,
  1454  		EnableDebugFlagsHandler: true,
  1455  		EnableProfilingHandler:  true,
  1456  	}
  1457  	fw := newServerTestWithDebuggingHandlers(kubeCfg, nil)
  1458  	defer fw.testHTTPServer.Close()
  1459  
  1460  	paths := []string{
  1461  		"/run", "/exec", "/attach", "/portForward", "/containerLogs", "/runningpods",
  1462  		"/run/", "/exec/", "/attach/", "/portForward/", "/containerLogs/", "/runningpods/",
  1463  		"/run/xxx", "/exec/xxx", "/attach/xxx", "/debug/pprof/profile", "/logs/kubelet.log",
  1464  	}
  1465  
  1466  	for _, p := range paths {
  1467  		verifyEndpointResponse(t, fw, p, "Debug endpoints are disabled.\n")
  1468  	}
  1469  }
  1470  
  1471  func TestDisablingLogAndProfilingHandler(t *testing.T) {
  1472  	kubeCfg := &kubeletconfiginternal.KubeletConfiguration{
  1473  		EnableDebuggingHandlers: true,
  1474  	}
  1475  	fw := newServerTestWithDebuggingHandlers(kubeCfg, nil)
  1476  	defer fw.testHTTPServer.Close()
  1477  
  1478  	// verify debug endpoints are disabled
  1479  	verifyEndpointResponse(t, fw, "/logs/kubelet.log", "logs endpoint is disabled.\n")
  1480  	verifyEndpointResponse(t, fw, "/debug/pprof/profile?seconds=2", "profiling endpoint is disabled.\n")
  1481  	verifyEndpointResponse(t, fw, "/debug/flags/v", "flags endpoint is disabled.\n")
  1482  }
  1483  
  1484  func TestFailedParseParamsSummaryHandler(t *testing.T) {
  1485  	fw := newServerTest()
  1486  	defer fw.testHTTPServer.Close()
  1487  
  1488  	resp, err := http.Post(fw.testHTTPServer.URL+"/stats/summary", "invalid/content/type", nil)
  1489  	assert.NoError(t, err)
  1490  	defer resp.Body.Close()
  1491  	v, err := io.ReadAll(resp.Body)
  1492  	assert.NoError(t, err)
  1493  	assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
  1494  	assert.Contains(t, string(v), "parse form failed")
  1495  }
  1496  
  1497  func verifyEndpointResponse(t *testing.T, fw *serverTestFramework, path string, expectedResponse string) {
  1498  	resp, err := http.Get(fw.testHTTPServer.URL + path)
  1499  	require.NoError(t, err)
  1500  	assert.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode)
  1501  	body, err := io.ReadAll(resp.Body)
  1502  	require.NoError(t, err)
  1503  	assert.Equal(t, expectedResponse, string(body))
  1504  
  1505  	resp, err = http.Post(fw.testHTTPServer.URL+path, "", nil)
  1506  	require.NoError(t, err)
  1507  	assert.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode)
  1508  	body, err = io.ReadAll(resp.Body)
  1509  	require.NoError(t, err)
  1510  	assert.Equal(t, expectedResponse, string(body))
  1511  }
  1512  
  1513  func TestTrimURLPath(t *testing.T) {
  1514  	tests := []struct {
  1515  		path, expected string
  1516  	}{
  1517  		{"", ""},
  1518  		{"//", ""},
  1519  		{"/pods", "pods"},
  1520  		{"pods", "pods"},
  1521  		{"pods/", "pods"},
  1522  		{"good/", "good"},
  1523  		{"pods/probes", "pods"},
  1524  		{"metrics", "metrics"},
  1525  		{"metrics/resource", "metrics/resource"},
  1526  		{"metrics/hello", "metrics/hello"},
  1527  	}
  1528  
  1529  	for _, test := range tests {
  1530  		assert.Equal(t, test.expected, getURLRootPath(test.path), fmt.Sprintf("path is: %s", test.path))
  1531  	}
  1532  }
  1533  

View as plain text