1
16
17 package server
18
19 import (
20 "context"
21 "crypto/tls"
22 "errors"
23 "fmt"
24 "io"
25 "net"
26 "net/http"
27 "net/http/httptest"
28 "net/http/httputil"
29 "net/url"
30 "reflect"
31 "strconv"
32 "strings"
33 "testing"
34 "time"
35
36 cadvisorapi "github.com/google/cadvisor/info/v1"
37 cadvisorapiv2 "github.com/google/cadvisor/info/v2"
38 "github.com/stretchr/testify/assert"
39 "github.com/stretchr/testify/require"
40 oteltrace "go.opentelemetry.io/otel/trace"
41 v1 "k8s.io/api/core/v1"
42 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
43 "k8s.io/apimachinery/pkg/types"
44 "k8s.io/apimachinery/pkg/util/httpstream"
45 "k8s.io/apimachinery/pkg/util/httpstream/spdy"
46 "k8s.io/apiserver/pkg/authentication/authenticator"
47 "k8s.io/apiserver/pkg/authentication/user"
48 "k8s.io/apiserver/pkg/authorization/authorizer"
49 "k8s.io/client-go/tools/record"
50 "k8s.io/client-go/tools/remotecommand"
51 runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
52 statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
53 api "k8s.io/kubernetes/pkg/apis/core"
54 "k8s.io/utils/pointer"
55
56
57 utilfeature "k8s.io/apiserver/pkg/util/feature"
58 featuregatetesting "k8s.io/component-base/featuregate/testing"
59 "k8s.io/kubelet/pkg/cri/streaming"
60 "k8s.io/kubelet/pkg/cri/streaming/portforward"
61 remotecommandserver "k8s.io/kubelet/pkg/cri/streaming/remotecommand"
62 _ "k8s.io/kubernetes/pkg/apis/core/install"
63 "k8s.io/kubernetes/pkg/features"
64 kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
65 "k8s.io/kubernetes/pkg/kubelet/cm"
66 "k8s.io/kubernetes/pkg/kubelet/server/stats"
67 "k8s.io/kubernetes/pkg/volume"
68 )
69
// Fixed identifiers shared by the fakes and tests in this file.
const (
	// testUID is the UID stamped on pods returned by the default fake pod lookup.
	testUID = "9b01b80f-8fb4-11e4-95ab-4200af06647"
	// testContainerID is the container ID the fake kubelet puts into exec and
	// attach streaming requests.
	testContainerID = "container789"
	// testPodSandboxID is the sandbox ID the fake kubelet puts into
	// port-forward streaming requests.
	testPodSandboxID = "pod0987"
)
75
76 type fakeKubelet struct {
77 podByNameFunc func(namespace, name string) (*v1.Pod, bool)
78 machineInfoFunc func() (*cadvisorapi.MachineInfo, error)
79 podsFunc func() []*v1.Pod
80 runningPodsFunc func(ctx context.Context) ([]*v1.Pod, error)
81 logFunc func(w http.ResponseWriter, req *http.Request)
82 runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
83 getExecCheck func(string, types.UID, string, []string, remotecommandserver.Options)
84 getAttachCheck func(string, types.UID, string, remotecommandserver.Options)
85 getPortForwardCheck func(string, string, types.UID, portforward.V4Options)
86
87 containerLogsFunc func(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
88 hostnameFunc func() string
89 resyncInterval time.Duration
90 loopEntryTime time.Time
91 plegHealth bool
92 streamingRuntime streaming.Server
93 }
94
95 func (fk *fakeKubelet) ResyncInterval() time.Duration {
96 return fk.resyncInterval
97 }
98
99 func (fk *fakeKubelet) LatestLoopEntryTime() time.Time {
100 return fk.loopEntryTime
101 }
102
103 func (fk *fakeKubelet) GetPodByName(namespace, name string) (*v1.Pod, bool) {
104 return fk.podByNameFunc(namespace, name)
105 }
106
107 func (fk *fakeKubelet) GetRequestedContainersInfo(containerName string, options cadvisorapiv2.RequestOptions) (map[string]*cadvisorapi.ContainerInfo, error) {
108 return map[string]*cadvisorapi.ContainerInfo{}, nil
109 }
110
111 func (fk *fakeKubelet) GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error) {
112 return fk.machineInfoFunc()
113 }
114
115 func (*fakeKubelet) GetVersionInfo() (*cadvisorapi.VersionInfo, error) {
116 return &cadvisorapi.VersionInfo{}, nil
117 }
118
119 func (fk *fakeKubelet) GetPods() []*v1.Pod {
120 return fk.podsFunc()
121 }
122
123 func (fk *fakeKubelet) GetRunningPods(ctx context.Context) ([]*v1.Pod, error) {
124 return fk.runningPodsFunc(ctx)
125 }
126
127 func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
128 fk.logFunc(w, req)
129 }
130
131 func (fk *fakeKubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
132 return fk.containerLogsFunc(ctx, podFullName, containerName, logOptions, stdout, stderr)
133 }
134
135 func (fk *fakeKubelet) GetHostname() string {
136 return fk.hostnameFunc()
137 }
138
139 func (fk *fakeKubelet) RunInContainer(_ context.Context, podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
140 return fk.runFunc(podFullName, uid, containerName, cmd)
141 }
142
143 func (fk *fakeKubelet) CheckpointContainer(_ context.Context, podUID types.UID, podFullName, containerName string, options *runtimeapi.CheckpointContainerRequest) error {
144 if containerName == "checkpointingFailure" {
145 return fmt.Errorf("Returning error for test")
146 }
147 return nil
148 }
149
150 func (fk *fakeKubelet) ListMetricDescriptors(ctx context.Context) ([]*runtimeapi.MetricDescriptor, error) {
151 return nil, nil
152 }
153
154 func (fk *fakeKubelet) ListPodSandboxMetrics(ctx context.Context) ([]*runtimeapi.PodSandboxMetrics, error) {
155 return nil, nil
156 }
157
158 type fakeRuntime struct {
159 execFunc func(string, []string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
160 attachFunc func(string, io.Reader, io.WriteCloser, io.WriteCloser, bool, <-chan remotecommand.TerminalSize) error
161 portForwardFunc func(string, int32, io.ReadWriteCloser) error
162 }
163
164 func (f *fakeRuntime) Exec(_ context.Context, containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
165 return f.execFunc(containerID, cmd, stdin, stdout, stderr, tty, resize)
166 }
167
168 func (f *fakeRuntime) Attach(_ context.Context, containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
169 return f.attachFunc(containerID, stdin, stdout, stderr, tty, resize)
170 }
171
172 func (f *fakeRuntime) PortForward(_ context.Context, podSandboxID string, port int32, stream io.ReadWriteCloser) error {
173 return f.portForwardFunc(podSandboxID, port, stream)
174 }
175
176 type testStreamingServer struct {
177 streaming.Server
178 fakeRuntime *fakeRuntime
179 testHTTPServer *httptest.Server
180 }
181
182 func newTestStreamingServer(streamIdleTimeout time.Duration) (s *testStreamingServer, err error) {
183 s = &testStreamingServer{}
184 s.testHTTPServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
185 s.ServeHTTP(w, r)
186 }))
187 defer func() {
188 if err != nil {
189 s.testHTTPServer.Close()
190 }
191 }()
192
193 testURL, err := url.Parse(s.testHTTPServer.URL)
194 if err != nil {
195 return nil, err
196 }
197
198 s.fakeRuntime = &fakeRuntime{}
199 config := streaming.DefaultConfig
200 config.BaseURL = testURL
201 if streamIdleTimeout != 0 {
202 config.StreamIdleTimeout = streamIdleTimeout
203 }
204 s.Server, err = streaming.NewServer(config, s.fakeRuntime)
205 if err != nil {
206 return nil, err
207 }
208 return s, nil
209 }
210
211 func (fk *fakeKubelet) GetExec(_ context.Context, podFullName string, podUID types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) (*url.URL, error) {
212 if fk.getExecCheck != nil {
213 fk.getExecCheck(podFullName, podUID, containerName, cmd, streamOpts)
214 }
215
216 resp, err := fk.streamingRuntime.GetExec(&runtimeapi.ExecRequest{
217 ContainerId: testContainerID,
218 Cmd: cmd,
219 Tty: streamOpts.TTY,
220 Stdin: streamOpts.Stdin,
221 Stdout: streamOpts.Stdout,
222 Stderr: streamOpts.Stderr,
223 })
224 if err != nil {
225 return nil, err
226 }
227 return url.Parse(resp.GetUrl())
228 }
229
230 func (fk *fakeKubelet) GetAttach(_ context.Context, podFullName string, podUID types.UID, containerName string, streamOpts remotecommandserver.Options) (*url.URL, error) {
231 if fk.getAttachCheck != nil {
232 fk.getAttachCheck(podFullName, podUID, containerName, streamOpts)
233 }
234
235 resp, err := fk.streamingRuntime.GetAttach(&runtimeapi.AttachRequest{
236 ContainerId: testContainerID,
237 Tty: streamOpts.TTY,
238 Stdin: streamOpts.Stdin,
239 Stdout: streamOpts.Stdout,
240 Stderr: streamOpts.Stderr,
241 })
242 if err != nil {
243 return nil, err
244 }
245 return url.Parse(resp.GetUrl())
246 }
247
248 func (fk *fakeKubelet) GetPortForward(ctx context.Context, podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) {
249 if fk.getPortForwardCheck != nil {
250 fk.getPortForwardCheck(podName, podNamespace, podUID, portForwardOpts)
251 }
252
253 resp, err := fk.streamingRuntime.GetPortForward(&runtimeapi.PortForwardRequest{
254 PodSandboxId: testPodSandboxID,
255 Port: portForwardOpts.Ports,
256 })
257 if err != nil {
258 return nil, err
259 }
260 return url.Parse(resp.GetUrl())
261 }
262
263
264 func (*fakeKubelet) GetNode() (*v1.Node, error) { return nil, nil }
265 func (*fakeKubelet) GetNodeConfig() cm.NodeConfig { return cm.NodeConfig{} }
266 func (*fakeKubelet) GetPodCgroupRoot() string { return "" }
267 func (*fakeKubelet) GetPodByCgroupfs(cgroupfs string) (*v1.Pod, bool) { return nil, false }
268 func (fk *fakeKubelet) ListVolumesForPod(podUID types.UID) (map[string]volume.Volume, bool) {
269 return map[string]volume.Volume{}, true
270 }
271 func (*fakeKubelet) ListBlockVolumesForPod(podUID types.UID) (map[string]volume.BlockVolume, bool) {
272 return map[string]volume.BlockVolume{}, true
273 }
274 func (*fakeKubelet) RootFsStats() (*statsapi.FsStats, error) { return nil, nil }
275 func (*fakeKubelet) ListPodStats(_ context.Context) ([]statsapi.PodStats, error) { return nil, nil }
276 func (*fakeKubelet) ListPodStatsAndUpdateCPUNanoCoreUsage(_ context.Context) ([]statsapi.PodStats, error) {
277 return nil, nil
278 }
279 func (*fakeKubelet) ListPodCPUAndMemoryStats(_ context.Context) ([]statsapi.PodStats, error) {
280 return nil, nil
281 }
282 func (*fakeKubelet) ImageFsStats(_ context.Context) (*statsapi.FsStats, *statsapi.FsStats, error) {
283 return nil, nil, nil
284 }
285 func (*fakeKubelet) RlimitStats() (*statsapi.RlimitStats, error) { return nil, nil }
286 func (*fakeKubelet) GetCgroupStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, *statsapi.NetworkStats, error) {
287 return nil, nil, nil
288 }
289 func (*fakeKubelet) GetCgroupCPUAndMemoryStats(cgroupName string, updateStats bool) (*statsapi.ContainerStats, error) {
290 return nil, nil
291 }
292
293 type fakeAuth struct {
294 authenticateFunc func(*http.Request) (*authenticator.Response, bool, error)
295 attributesFunc func(user.Info, *http.Request) authorizer.Attributes
296 authorizeFunc func(authorizer.Attributes) (authorized authorizer.Decision, reason string, err error)
297 }
298
299 func (f *fakeAuth) AuthenticateRequest(req *http.Request) (*authenticator.Response, bool, error) {
300 return f.authenticateFunc(req)
301 }
302 func (f *fakeAuth) GetRequestAttributes(u user.Info, req *http.Request) authorizer.Attributes {
303 return f.attributesFunc(u, req)
304 }
305 func (f *fakeAuth) Authorize(ctx context.Context, a authorizer.Attributes) (authorized authorizer.Decision, reason string, err error) {
306 return f.authorizeFunc(a)
307 }
308
309 type serverTestFramework struct {
310 serverUnderTest *Server
311 fakeKubelet *fakeKubelet
312 fakeAuth *fakeAuth
313 testHTTPServer *httptest.Server
314 }
315
316 func newServerTest() *serverTestFramework {
317 return newServerTestWithDebug(true, nil)
318 }
319
320 func newServerTestWithDebug(enableDebugging bool, streamingServer streaming.Server) *serverTestFramework {
321 kubeCfg := &kubeletconfiginternal.KubeletConfiguration{
322 EnableDebuggingHandlers: enableDebugging,
323 EnableSystemLogHandler: enableDebugging,
324 EnableProfilingHandler: enableDebugging,
325 EnableDebugFlagsHandler: enableDebugging,
326 }
327 return newServerTestWithDebuggingHandlers(kubeCfg, streamingServer)
328 }
329
330 func newServerTestWithDebuggingHandlers(kubeCfg *kubeletconfiginternal.KubeletConfiguration, streamingServer streaming.Server) *serverTestFramework {
331
332 fw := &serverTestFramework{}
333 fw.fakeKubelet = &fakeKubelet{
334 hostnameFunc: func() string {
335 return "127.0.0.1"
336 },
337 podByNameFunc: func(namespace, name string) (*v1.Pod, bool) {
338 return &v1.Pod{
339 ObjectMeta: metav1.ObjectMeta{
340 Namespace: namespace,
341 Name: name,
342 UID: testUID,
343 },
344 }, true
345 },
346 plegHealth: true,
347 streamingRuntime: streamingServer,
348 }
349 fw.fakeAuth = &fakeAuth{
350 authenticateFunc: func(req *http.Request) (*authenticator.Response, bool, error) {
351 return &authenticator.Response{User: &user.DefaultInfo{Name: "test"}}, true, nil
352 },
353 attributesFunc: func(u user.Info, req *http.Request) authorizer.Attributes {
354 return &authorizer.AttributesRecord{User: u}
355 },
356 authorizeFunc: func(a authorizer.Attributes) (decision authorizer.Decision, reason string, err error) {
357 return authorizer.DecisionAllow, "", nil
358 },
359 }
360 server := NewServer(
361 fw.fakeKubelet,
362 stats.NewResourceAnalyzer(fw.fakeKubelet, time.Minute, &record.FakeRecorder{}),
363 fw.fakeAuth,
364 oteltrace.NewNoopTracerProvider(),
365 kubeCfg,
366 )
367 fw.serverUnderTest = &server
368 fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
369 return fw
370 }
371
372
373 func getPodName(name, namespace string) string {
374 if namespace == "" {
375 namespace = metav1.NamespaceDefault
376 }
377 return name + "_" + namespace
378 }
379
380 func TestServeLogs(t *testing.T) {
381 fw := newServerTest()
382 defer fw.testHTTPServer.Close()
383
384 content := string(`<pre><a href="kubelet.log">kubelet.log</a><a href="google.log">google.log</a></pre>`)
385
386 fw.fakeKubelet.logFunc = func(w http.ResponseWriter, req *http.Request) {
387 w.WriteHeader(http.StatusOK)
388 w.Header().Add("Content-Type", "text/html")
389 w.Write([]byte(content))
390 }
391
392 resp, err := http.Get(fw.testHTTPServer.URL + "/logs/")
393 if err != nil {
394 t.Fatalf("Got error GETing: %v", err)
395 }
396 defer resp.Body.Close()
397
398 body, err := httputil.DumpResponse(resp, true)
399 if err != nil {
400
401 t.Errorf("Cannot copy resp: %#v", err)
402 }
403 result := string(body)
404 if !strings.Contains(result, "kubelet.log") || !strings.Contains(result, "google.log") {
405 t.Errorf("Received wrong data: %s", result)
406 }
407 }
408
409 func TestServeRunInContainer(t *testing.T) {
410 fw := newServerTest()
411 defer fw.testHTTPServer.Close()
412 output := "foo bar"
413 podNamespace := "other"
414 podName := "foo"
415 expectedPodName := getPodName(podName, podNamespace)
416 expectedContainerName := "baz"
417 expectedCommand := "ls -a"
418 fw.fakeKubelet.runFunc = func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
419 if podFullName != expectedPodName {
420 t.Errorf("expected %s, got %s", expectedPodName, podFullName)
421 }
422 if containerName != expectedContainerName {
423 t.Errorf("expected %s, got %s", expectedContainerName, containerName)
424 }
425 if strings.Join(cmd, " ") != expectedCommand {
426 t.Errorf("expected: %s, got %v", expectedCommand, cmd)
427 }
428
429 return []byte(output), nil
430 }
431
432 resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
433
434 if err != nil {
435 t.Fatalf("Got error POSTing: %v", err)
436 }
437 defer resp.Body.Close()
438
439 body, err := io.ReadAll(resp.Body)
440 if err != nil {
441
442 t.Errorf("Cannot copy resp: %#v", err)
443 }
444 result := string(body)
445 if result != output {
446 t.Errorf("expected %s, got %s", output, result)
447 }
448 }
449
450 func TestServeRunInContainerWithUID(t *testing.T) {
451 fw := newServerTest()
452 defer fw.testHTTPServer.Close()
453 output := "foo bar"
454 podNamespace := "other"
455 podName := "foo"
456 expectedPodName := getPodName(podName, podNamespace)
457 expectedContainerName := "baz"
458 expectedCommand := "ls -a"
459 fw.fakeKubelet.runFunc = func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error) {
460 if podFullName != expectedPodName {
461 t.Errorf("expected %s, got %s", expectedPodName, podFullName)
462 }
463 if string(uid) != testUID {
464 t.Errorf("expected %s, got %s", testUID, uid)
465 }
466 if containerName != expectedContainerName {
467 t.Errorf("expected %s, got %s", expectedContainerName, containerName)
468 }
469 if strings.Join(cmd, " ") != expectedCommand {
470 t.Errorf("expected: %s, got %v", expectedCommand, cmd)
471 }
472
473 return []byte(output), nil
474 }
475
476 resp, err := http.Post(fw.testHTTPServer.URL+"/run/"+podNamespace+"/"+podName+"/"+testUID+"/"+expectedContainerName+"?cmd=ls%20-a", "", nil)
477 if err != nil {
478 t.Fatalf("Got error POSTing: %v", err)
479 }
480 defer resp.Body.Close()
481
482 body, err := io.ReadAll(resp.Body)
483 if err != nil {
484
485 t.Errorf("Cannot copy resp: %#v", err)
486 }
487 result := string(body)
488 if result != output {
489 t.Errorf("expected %s, got %s", output, result)
490 }
491 }
492
493 func TestHealthCheck(t *testing.T) {
494 fw := newServerTest()
495 defer fw.testHTTPServer.Close()
496 fw.fakeKubelet.hostnameFunc = func() string {
497 return "127.0.0.1"
498 }
499
500
501 assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
502
503
504 fw.fakeKubelet.hostnameFunc = func() string {
505 return "fake"
506 }
507 assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
508 }
509
// assertHealthFails GETs httpURL and reports a test error unless the response
// status equals expectedErrorCode. A transport-level failure aborts the test.
func assertHealthFails(t *testing.T, httpURL string, expectedErrorCode int) {
	// Attribute failures to the calling test rather than to this helper.
	t.Helper()

	resp, err := http.Get(httpURL)
	if err != nil {
		t.Fatalf("Got error GETing: %v", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != expectedErrorCode {
		t.Errorf("expected status code %d, got %d", expectedErrorCode, resp.StatusCode)
	}
}
520
521
522 func TestAuthzCoverage(t *testing.T) {
523 fw := newServerTest()
524 defer fw.testHTTPServer.Close()
525
526
527 expectedCases := map[string]bool{}
528
529
530 for _, path := range fw.serverUnderTest.restfulCont.RegisteredHandlePaths() {
531 expectedCases["GET:"+path] = false
532 expectedCases["POST:"+path] = false
533 }
534
535
536 for _, ws := range fw.serverUnderTest.restfulCont.RegisteredWebServices() {
537 for _, r := range ws.Routes() {
538 expectedCases[r.Method+":"+r.Path] = false
539 }
540 }
541
542
543
544 expectedPaths := []string{"/healthz", "/metrics", "/metrics/cadvisor"}
545 for _, expectedPath := range expectedPaths {
546 if _, expected := expectedCases["GET:"+expectedPath]; !expected {
547 t.Errorf("Expected registered handle path %s was missing", expectedPath)
548 }
549 }
550
551 for _, tc := range AuthzTestCases() {
552 expectedCases[tc.Method+":"+tc.Path] = true
553 }
554
555 for tc, found := range expectedCases {
556 if !found {
557 t.Errorf("Missing authz test case for %s", tc)
558 }
559 }
560 }
561
562 func TestAuthFilters(t *testing.T) {
563
564 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ContainerCheckpoint, true)()
565
566 fw := newServerTest()
567 defer fw.testHTTPServer.Close()
568
569 attributesGetter := NewNodeAuthorizerAttributesGetter(authzTestNodeName)
570
571 for _, tc := range AuthzTestCases() {
572 t.Run(tc.Method+":"+tc.Path, func(t *testing.T) {
573 var (
574 expectedUser = AuthzTestUser()
575
576 calledAuthenticate = false
577 calledAuthorize = false
578 calledAttributes = false
579 )
580
581 fw.fakeAuth.authenticateFunc = func(req *http.Request) (*authenticator.Response, bool, error) {
582 calledAuthenticate = true
583 return &authenticator.Response{User: expectedUser}, true, nil
584 }
585 fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
586 calledAttributes = true
587 require.Equal(t, expectedUser, u)
588 return attributesGetter.GetRequestAttributes(u, req)
589 }
590 fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (decision authorizer.Decision, reason string, err error) {
591 calledAuthorize = true
592 tc.AssertAttributes(t, a)
593 return authorizer.DecisionNoOpinion, "", nil
594 }
595
596 req, err := http.NewRequest(tc.Method, fw.testHTTPServer.URL+tc.Path, nil)
597 require.NoError(t, err)
598
599 resp, err := http.DefaultClient.Do(req)
600 require.NoError(t, err)
601 defer resp.Body.Close()
602
603 assert.Equal(t, http.StatusForbidden, resp.StatusCode)
604 assert.True(t, calledAuthenticate, "Authenticate was not called")
605 assert.True(t, calledAttributes, "Attributes were not called")
606 assert.True(t, calledAuthorize, "Authorize was not called")
607 })
608 }
609 }
610
611 func TestAuthenticationError(t *testing.T) {
612 var (
613 expectedUser = &user.DefaultInfo{Name: "test"}
614 expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
615
616 calledAuthenticate = false
617 calledAuthorize = false
618 calledAttributes = false
619 )
620
621 fw := newServerTest()
622 defer fw.testHTTPServer.Close()
623 fw.fakeAuth.authenticateFunc = func(req *http.Request) (*authenticator.Response, bool, error) {
624 calledAuthenticate = true
625 return &authenticator.Response{User: expectedUser}, true, nil
626 }
627 fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
628 calledAttributes = true
629 return expectedAttributes
630 }
631 fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (decision authorizer.Decision, reason string, err error) {
632 calledAuthorize = true
633 return authorizer.DecisionNoOpinion, "", errors.New("Failed")
634 }
635
636 assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
637
638 if !calledAuthenticate {
639 t.Fatalf("Authenticate was not called")
640 }
641 if !calledAttributes {
642 t.Fatalf("Attributes was not called")
643 }
644 if !calledAuthorize {
645 t.Fatalf("Authorize was not called")
646 }
647 }
648
649 func TestAuthenticationFailure(t *testing.T) {
650 var (
651 expectedUser = &user.DefaultInfo{Name: "test"}
652 expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
653
654 calledAuthenticate = false
655 calledAuthorize = false
656 calledAttributes = false
657 )
658
659 fw := newServerTest()
660 defer fw.testHTTPServer.Close()
661 fw.fakeAuth.authenticateFunc = func(req *http.Request) (*authenticator.Response, bool, error) {
662 calledAuthenticate = true
663 return nil, false, nil
664 }
665 fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
666 calledAttributes = true
667 return expectedAttributes
668 }
669 fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (decision authorizer.Decision, reason string, err error) {
670 calledAuthorize = true
671 return authorizer.DecisionNoOpinion, "", nil
672 }
673
674 assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusUnauthorized)
675
676 if !calledAuthenticate {
677 t.Fatalf("Authenticate was not called")
678 }
679 if calledAttributes {
680 t.Fatalf("Attributes was called unexpectedly")
681 }
682 if calledAuthorize {
683 t.Fatalf("Authorize was called unexpectedly")
684 }
685 }
686
687 func TestAuthorizationSuccess(t *testing.T) {
688 var (
689 expectedUser = &user.DefaultInfo{Name: "test"}
690 expectedAttributes = &authorizer.AttributesRecord{User: expectedUser}
691
692 calledAuthenticate = false
693 calledAuthorize = false
694 calledAttributes = false
695 )
696
697 fw := newServerTest()
698 defer fw.testHTTPServer.Close()
699 fw.fakeAuth.authenticateFunc = func(req *http.Request) (*authenticator.Response, bool, error) {
700 calledAuthenticate = true
701 return &authenticator.Response{User: expectedUser}, true, nil
702 }
703 fw.fakeAuth.attributesFunc = func(u user.Info, req *http.Request) authorizer.Attributes {
704 calledAttributes = true
705 return expectedAttributes
706 }
707 fw.fakeAuth.authorizeFunc = func(a authorizer.Attributes) (decision authorizer.Decision, reason string, err error) {
708 calledAuthorize = true
709 return authorizer.DecisionAllow, "", nil
710 }
711
712 assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
713
714 if !calledAuthenticate {
715 t.Fatalf("Authenticate was not called")
716 }
717 if !calledAttributes {
718 t.Fatalf("Attributes were not called")
719 }
720 if !calledAuthorize {
721 t.Fatalf("Authorize was not called")
722 }
723 }
724
725 func TestSyncLoopCheck(t *testing.T) {
726 fw := newServerTest()
727 defer fw.testHTTPServer.Close()
728 fw.fakeKubelet.hostnameFunc = func() string {
729 return "127.0.0.1"
730 }
731
732 fw.fakeKubelet.resyncInterval = time.Minute
733 fw.fakeKubelet.loopEntryTime = time.Now()
734
735
736 assertHealthIsOk(t, fw.testHTTPServer.URL+"/healthz")
737
738 fw.fakeKubelet.loopEntryTime = time.Now().Add(time.Minute * -10)
739 assertHealthFails(t, fw.testHTTPServer.URL+"/healthz", http.StatusInternalServerError)
740 }
741
742
// assertHealthIsOk GETs httpURL and reports a test error unless the response
// status is 200 and the body contains "ok". A transport failure or a body read
// failure aborts the test.
func assertHealthIsOk(t *testing.T, httpURL string) {
	// Attribute failures to the calling test rather than to this helper.
	t.Helper()

	resp, err := http.Get(httpURL)
	if err != nil {
		t.Fatalf("Got error GETing: %v", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		t.Errorf("expected status code %d, got %d", http.StatusOK, resp.StatusCode)
	}
	body, readErr := io.ReadAll(resp.Body)
	if readErr != nil {
		t.Fatalf("Cannot copy resp: %#v", readErr)
	}
	result := string(body)
	if !strings.Contains(result, "ok") {
		t.Errorf("expected body contains ok, got %s", result)
	}
}
762
763 func setPodByNameFunc(fw *serverTestFramework, namespace, pod, container string) {
764 fw.fakeKubelet.podByNameFunc = func(namespace, name string) (*v1.Pod, bool) {
765 return &v1.Pod{
766 ObjectMeta: metav1.ObjectMeta{
767 Namespace: namespace,
768 Name: pod,
769 },
770 Spec: v1.PodSpec{
771 Containers: []v1.Container{
772 {
773 Name: container,
774 },
775 },
776 },
777 }, true
778 }
779 }
780
781 func setGetContainerLogsFunc(fw *serverTestFramework, t *testing.T, expectedPodName, expectedContainerName string, expectedLogOptions *v1.PodLogOptions, output string) {
782 fw.fakeKubelet.containerLogsFunc = func(_ context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
783 if podFullName != expectedPodName {
784 t.Errorf("expected %s, got %s", expectedPodName, podFullName)
785 }
786 if containerName != expectedContainerName {
787 t.Errorf("expected %s, got %s", expectedContainerName, containerName)
788 }
789 if !reflect.DeepEqual(expectedLogOptions, logOptions) {
790 t.Errorf("expected %#v, got %#v", expectedLogOptions, logOptions)
791 }
792
793 io.WriteString(stdout, output)
794 return nil
795 }
796 }
797
798 func TestContainerLogs(t *testing.T) {
799 fw := newServerTest()
800 defer fw.testHTTPServer.Close()
801
802 tests := map[string]struct {
803 query string
804 podLogOption *v1.PodLogOptions
805 }{
806 "without tail": {"", &v1.PodLogOptions{}},
807 "with tail": {"?tailLines=5", &v1.PodLogOptions{TailLines: pointer.Int64(5)}},
808 "with legacy tail": {"?tail=5", &v1.PodLogOptions{TailLines: pointer.Int64(5)}},
809 "with tail all": {"?tail=all", &v1.PodLogOptions{}},
810 "with follow": {"?follow=1", &v1.PodLogOptions{Follow: true}},
811 }
812
813 for desc, test := range tests {
814 t.Run(desc, func(t *testing.T) {
815 output := "foo bar"
816 podNamespace := "other"
817 podName := "foo"
818 expectedPodName := getPodName(podName, podNamespace)
819 expectedContainerName := "baz"
820 setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
821 setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, test.podLogOption, output)
822 resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + test.query)
823 if err != nil {
824 t.Errorf("Got error GETing: %v", err)
825 }
826 defer resp.Body.Close()
827
828 body, err := io.ReadAll(resp.Body)
829 if err != nil {
830 t.Errorf("Error reading container logs: %v", err)
831 }
832 result := string(body)
833 if result != output {
834 t.Errorf("Expected: '%v', got: '%v'", output, result)
835 }
836 })
837 }
838 }
839
840 func TestContainerLogsWithInvalidTail(t *testing.T) {
841 fw := newServerTest()
842 defer fw.testHTTPServer.Close()
843 output := "foo bar"
844 podNamespace := "other"
845 podName := "foo"
846 expectedPodName := getPodName(podName, podNamespace)
847 expectedContainerName := "baz"
848 setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
849 setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, &v1.PodLogOptions{}, output)
850 resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?tail=-1")
851 if err != nil {
852 t.Errorf("Got error GETing: %v", err)
853 }
854 defer resp.Body.Close()
855 if resp.StatusCode != http.StatusUnprocessableEntity {
856 t.Errorf("Unexpected non-error reading container logs: %#v", resp)
857 }
858 }
859
860 func TestCheckpointContainer(t *testing.T) {
861 podNamespace := "other"
862 podName := "foo"
863 expectedContainerName := "baz"
864
865 setupTest := func(featureGate bool) *serverTestFramework {
866
867 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ContainerCheckpoint, featureGate)()
868
869 fw := newServerTest()
870
871 fw.fakeKubelet.podByNameFunc = func(namespace, name string) (*v1.Pod, bool) {
872 return nil, false
873 }
874 return fw
875 }
876 fw := setupTest(true)
877 defer fw.testHTTPServer.Close()
878
879 t.Run("wrong pod namespace", func(t *testing.T) {
880 resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/"+expectedContainerName, "", nil)
881 if err != nil {
882 t.Errorf("Got error POSTing: %v", err)
883 }
884 defer resp.Body.Close()
885 if resp.StatusCode != http.StatusNotFound {
886 t.Errorf("Unexpected non-error checkpointing container: %#v", resp)
887 }
888 })
889
890 setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
891 t.Run("wrong container name", func(t *testing.T) {
892 resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/wrongContainerName", "", nil)
893 if err != nil {
894 t.Errorf("Got error POSTing: %v", err)
895 }
896 defer resp.Body.Close()
897 if resp.StatusCode != http.StatusNotFound {
898 t.Errorf("Unexpected non-error checkpointing container: %#v", resp)
899 }
900 })
901
902 fw.fakeKubelet.podByNameFunc = func(namespace, name string) (*v1.Pod, bool) {
903 return &v1.Pod{
904 ObjectMeta: metav1.ObjectMeta{
905 Namespace: podNamespace,
906 Name: podName,
907 },
908 Spec: v1.PodSpec{
909 Containers: []v1.Container{
910 {
911 Name: "checkpointingFailure",
912 },
913 },
914 },
915 }, true
916 }
917 t.Run("checkpointing fails", func(t *testing.T) {
918 resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/checkpointingFailure", "", nil)
919 if err != nil {
920 t.Errorf("Got error POSTing: %v", err)
921 }
922 defer resp.Body.Close()
923 assert.Equal(t, resp.StatusCode, 500)
924 body, _ := io.ReadAll(resp.Body)
925 assert.Equal(t, string(body), "checkpointing of other/foo/checkpointingFailure failed (Returning error for test)")
926 })
927
928 setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
929 t.Run("checkpointing succeeds", func(t *testing.T) {
930 resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/"+expectedContainerName, "", nil)
931 if err != nil {
932 t.Errorf("Got error POSTing: %v", err)
933 }
934 assert.Equal(t, resp.StatusCode, 200)
935 })
936
937
938 fw.testHTTPServer.Close()
939 fw = setupTest(false)
940 defer fw.testHTTPServer.Close()
941 setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
942 t.Run("checkpointing fails because disabled", func(t *testing.T) {
943 resp, err := http.Post(fw.testHTTPServer.URL+"/checkpoint/"+podNamespace+"/"+podName+"/"+expectedContainerName, "", nil)
944 if err != nil {
945 t.Errorf("Got error POSTing: %v", err)
946 }
947 assert.Equal(t, 404, resp.StatusCode)
948 })
949 }
950
// makeReq builds a body-less HTTP request for the streaming endpoints under
// test. It sets an empty Content-Type header and advertises clientProtocol in
// the X-Stream-Protocol-Version header. If the request cannot be constructed
// the calling test is aborted via t.Fatalf.
func makeReq(t *testing.T, method, url, clientProtocol string) *http.Request {
	// Mark this function as a helper so a failure below is attributed to the
	// caller's line rather than to this function.
	t.Helper()

	req, err := http.NewRequest(method, url, nil)
	if err != nil {
		t.Fatalf("error creating request: %v", err)
	}
	req.Header.Set("Content-Type", "")
	req.Header.Add("X-Stream-Protocol-Version", clientProtocol)
	return req
}
960
961 func TestServeExecInContainerIdleTimeout(t *testing.T) {
962 ss, err := newTestStreamingServer(100 * time.Millisecond)
963 require.NoError(t, err)
964 defer ss.testHTTPServer.Close()
965 fw := newServerTestWithDebug(true, ss)
966 defer fw.testHTTPServer.Close()
967
968 podNamespace := "other"
969 podName := "foo"
970 expectedContainerName := "baz"
971
972 url := fw.testHTTPServer.URL + "/exec/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?c=ls&c=-a&" + api.ExecStdinParam + "=1"
973
974 upgradeRoundTripper, err := spdy.NewRoundTripper(&tls.Config{})
975 if err != nil {
976 t.Fatalf("Error creating SpdyRoundTripper: %v", err)
977 }
978 c := &http.Client{Transport: upgradeRoundTripper}
979
980 resp, err := c.Do(makeReq(t, "POST", url, "v4.channel.k8s.io"))
981 if err != nil {
982 t.Fatalf("Got error POSTing: %v", err)
983 }
984 defer resp.Body.Close()
985
986 upgradeRoundTripper.Dialer = &net.Dialer{
987 Deadline: time.Now().Add(60 * time.Second),
988 Timeout: 60 * time.Second,
989 }
990 conn, err := upgradeRoundTripper.NewConnection(resp)
991 if err != nil {
992 t.Fatalf("Unexpected error creating streaming connection: %s", err)
993 }
994 if conn == nil {
995 t.Fatal("Unexpected nil connection")
996 }
997
998 <-conn.CloseChan()
999 }
1000
1001 func testExecAttach(t *testing.T, verb string) {
1002 tests := map[string]struct {
1003 stdin bool
1004 stdout bool
1005 stderr bool
1006 tty bool
1007 responseStatusCode int
1008 uid bool
1009 }{
1010 "no input or output": {responseStatusCode: http.StatusBadRequest},
1011 "stdin": {stdin: true, responseStatusCode: http.StatusSwitchingProtocols},
1012 "stdout": {stdout: true, responseStatusCode: http.StatusSwitchingProtocols},
1013 "stderr": {stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
1014 "stdout and stderr": {stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
1015 "stdin stdout and stderr": {stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols},
1016 "stdin stdout stderr with uid": {stdin: true, stdout: true, stderr: true, responseStatusCode: http.StatusSwitchingProtocols, uid: true},
1017 }
1018
1019 for desc := range tests {
1020 test := tests[desc]
1021 t.Run(desc, func(t *testing.T) {
1022 ss, err := newTestStreamingServer(0)
1023 require.NoError(t, err)
1024 defer ss.testHTTPServer.Close()
1025 fw := newServerTestWithDebug(true, ss)
1026 defer fw.testHTTPServer.Close()
1027 fmt.Println(desc)
1028
1029 podNamespace := "other"
1030 podName := "foo"
1031 expectedPodName := getPodName(podName, podNamespace)
1032 expectedContainerName := "baz"
1033 expectedCommand := "ls -a"
1034 expectedStdin := "stdin"
1035 expectedStdout := "stdout"
1036 expectedStderr := "stderr"
1037 done := make(chan struct{})
1038 clientStdoutReadDone := make(chan struct{})
1039 clientStderrReadDone := make(chan struct{})
1040 execInvoked := false
1041 attachInvoked := false
1042
1043 checkStream := func(podFullName string, uid types.UID, containerName string, streamOpts remotecommandserver.Options) {
1044 assert.Equal(t, expectedPodName, podFullName, "podFullName")
1045 if test.uid {
1046 assert.Equal(t, testUID, string(uid), "uid")
1047 }
1048 assert.Equal(t, expectedContainerName, containerName, "containerName")
1049 assert.Equal(t, test.stdin, streamOpts.Stdin, "stdin")
1050 assert.Equal(t, test.stdout, streamOpts.Stdout, "stdout")
1051 assert.Equal(t, test.tty, streamOpts.TTY, "tty")
1052 assert.Equal(t, !test.tty && test.stderr, streamOpts.Stderr, "stderr")
1053 }
1054
1055 fw.fakeKubelet.getExecCheck = func(podFullName string, uid types.UID, containerName string, cmd []string, streamOpts remotecommandserver.Options) {
1056 execInvoked = true
1057 assert.Equal(t, expectedCommand, strings.Join(cmd, " "), "cmd")
1058 checkStream(podFullName, uid, containerName, streamOpts)
1059 }
1060
1061 fw.fakeKubelet.getAttachCheck = func(podFullName string, uid types.UID, containerName string, streamOpts remotecommandserver.Options) {
1062 attachInvoked = true
1063 checkStream(podFullName, uid, containerName, streamOpts)
1064 }
1065
1066 testStream := func(containerID string, in io.Reader, out, stderr io.WriteCloser, tty bool, done chan struct{}) error {
1067 close(done)
1068 assert.Equal(t, testContainerID, containerID, "containerID")
1069 assert.Equal(t, test.tty, tty, "tty")
1070 require.Equal(t, test.stdin, in != nil, "in")
1071 require.Equal(t, test.stdout, out != nil, "out")
1072 require.Equal(t, !test.tty && test.stderr, stderr != nil, "err")
1073
1074 if test.stdin {
1075 b := make([]byte, 10)
1076 n, err := in.Read(b)
1077 assert.NoError(t, err, "reading from stdin")
1078 assert.Equal(t, expectedStdin, string(b[0:n]), "content from stdin")
1079 }
1080
1081 if test.stdout {
1082 _, err := out.Write([]byte(expectedStdout))
1083 assert.NoError(t, err, "writing to stdout")
1084 out.Close()
1085 <-clientStdoutReadDone
1086 }
1087
1088 if !test.tty && test.stderr {
1089 _, err := stderr.Write([]byte(expectedStderr))
1090 assert.NoError(t, err, "writing to stderr")
1091 stderr.Close()
1092 <-clientStderrReadDone
1093 }
1094 return nil
1095 }
1096
1097 ss.fakeRuntime.execFunc = func(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
1098 assert.Equal(t, expectedCommand, strings.Join(cmd, " "), "cmd")
1099 return testStream(containerID, stdin, stdout, stderr, tty, done)
1100 }
1101
1102 ss.fakeRuntime.attachFunc = func(containerID string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan remotecommand.TerminalSize) error {
1103 return testStream(containerID, stdin, stdout, stderr, tty, done)
1104 }
1105
1106 var url string
1107 if test.uid {
1108 url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + testUID + "/" + expectedContainerName + "?ignore=1"
1109 } else {
1110 url = fw.testHTTPServer.URL + "/" + verb + "/" + podNamespace + "/" + podName + "/" + expectedContainerName + "?ignore=1"
1111 }
1112 if verb == "exec" {
1113 url += "&command=ls&command=-a"
1114 }
1115 if test.stdin {
1116 url += "&" + api.ExecStdinParam + "=1"
1117 }
1118 if test.stdout {
1119 url += "&" + api.ExecStdoutParam + "=1"
1120 }
1121 if test.stderr && !test.tty {
1122 url += "&" + api.ExecStderrParam + "=1"
1123 }
1124 if test.tty {
1125 url += "&" + api.ExecTTYParam + "=1"
1126 }
1127
1128 var (
1129 resp *http.Response
1130 upgradeRoundTripper httpstream.UpgradeRoundTripper
1131 c *http.Client
1132 )
1133 upgradeRoundTripper, err = spdy.NewRoundTripper(&tls.Config{})
1134 if err != nil {
1135 t.Fatalf("Error creating SpdyRoundTripper: %v", err)
1136 }
1137 c = &http.Client{Transport: upgradeRoundTripper}
1138
1139 resp, err = c.Do(makeReq(t, "POST", url, "v4.channel.k8s.io"))
1140 require.NoError(t, err, "POSTing")
1141 defer resp.Body.Close()
1142
1143 _, err = io.ReadAll(resp.Body)
1144 assert.NoError(t, err, "reading response body")
1145
1146 require.Equal(t, test.responseStatusCode, resp.StatusCode, "response status")
1147 if test.responseStatusCode != http.StatusSwitchingProtocols {
1148 return
1149 }
1150
1151 conn, err := upgradeRoundTripper.NewConnection(resp)
1152 require.NoError(t, err, "creating streaming connection")
1153 defer conn.Close()
1154
1155 h := http.Header{}
1156 h.Set(api.StreamType, api.StreamTypeError)
1157 _, err = conn.CreateStream(h)
1158 require.NoError(t, err, "creating error stream")
1159
1160 if test.stdin {
1161 h.Set(api.StreamType, api.StreamTypeStdin)
1162 stream, err := conn.CreateStream(h)
1163 require.NoError(t, err, "creating stdin stream")
1164 _, err = stream.Write([]byte(expectedStdin))
1165 require.NoError(t, err, "writing to stdin stream")
1166 }
1167
1168 var stdoutStream httpstream.Stream
1169 if test.stdout {
1170 h.Set(api.StreamType, api.StreamTypeStdout)
1171 stdoutStream, err = conn.CreateStream(h)
1172 require.NoError(t, err, "creating stdout stream")
1173 }
1174
1175 var stderrStream httpstream.Stream
1176 if test.stderr && !test.tty {
1177 h.Set(api.StreamType, api.StreamTypeStderr)
1178 stderrStream, err = conn.CreateStream(h)
1179 require.NoError(t, err, "creating stderr stream")
1180 }
1181
1182 if test.stdout {
1183 output := make([]byte, 10)
1184 n, err := stdoutStream.Read(output)
1185 close(clientStdoutReadDone)
1186 assert.NoError(t, err, "reading from stdout stream")
1187 assert.Equal(t, expectedStdout, string(output[0:n]), "stdout")
1188 }
1189
1190 if test.stderr && !test.tty {
1191 output := make([]byte, 10)
1192 n, err := stderrStream.Read(output)
1193 close(clientStderrReadDone)
1194 assert.NoError(t, err, "reading from stderr stream")
1195 assert.Equal(t, expectedStderr, string(output[0:n]), "stderr")
1196 }
1197
1198
1199 <-done
1200
1201 if verb == "exec" {
1202 assert.True(t, execInvoked, "exec should be invoked")
1203 assert.False(t, attachInvoked, "attach should not be invoked")
1204 } else {
1205 assert.True(t, attachInvoked, "attach should be invoked")
1206 assert.False(t, execInvoked, "exec should not be invoked")
1207 }
1208 })
1209 }
1210 }
1211
1212 func TestServeExecInContainer(t *testing.T) {
1213 testExecAttach(t, "exec")
1214 }
1215
1216 func TestServeAttachContainer(t *testing.T) {
1217 testExecAttach(t, "attach")
1218 }
1219
1220 func TestServePortForwardIdleTimeout(t *testing.T) {
1221 ss, err := newTestStreamingServer(100 * time.Millisecond)
1222 require.NoError(t, err)
1223 defer ss.testHTTPServer.Close()
1224 fw := newServerTestWithDebug(true, ss)
1225 defer fw.testHTTPServer.Close()
1226
1227 podNamespace := "other"
1228 podName := "foo"
1229
1230 url := fw.testHTTPServer.URL + "/portForward/" + podNamespace + "/" + podName
1231
1232 upgradeRoundTripper, err := spdy.NewRoundTripper(&tls.Config{})
1233 if err != nil {
1234 t.Fatalf("Error creating SpdyRoundTripper: %v", err)
1235 }
1236 c := &http.Client{Transport: upgradeRoundTripper}
1237
1238 req := makeReq(t, "POST", url, "portforward.k8s.io")
1239 resp, err := c.Do(req)
1240 if err != nil {
1241 t.Fatalf("Got error POSTing: %v", err)
1242 }
1243 defer resp.Body.Close()
1244
1245 conn, err := upgradeRoundTripper.NewConnection(resp)
1246 if err != nil {
1247 t.Fatalf("Unexpected error creating streaming connection: %s", err)
1248 }
1249 if conn == nil {
1250 t.Fatal("Unexpected nil connection")
1251 }
1252 defer conn.Close()
1253
1254 <-conn.CloseChan()
1255 }
1256
1257 func TestServePortForward(t *testing.T) {
1258 tests := map[string]struct {
1259 port string
1260 uid bool
1261 clientData string
1262 containerData string
1263 shouldError bool
1264 }{
1265 "no port": {port: "", shouldError: true},
1266 "none number port": {port: "abc", shouldError: true},
1267 "negative port": {port: "-1", shouldError: true},
1268 "too large port": {port: "65536", shouldError: true},
1269 "0 port": {port: "0", shouldError: true},
1270 "min port": {port: "1", shouldError: false},
1271 "normal port": {port: "8000", shouldError: false},
1272 "normal port with data forward": {port: "8000", clientData: "client data", containerData: "container data", shouldError: false},
1273 "max port": {port: "65535", shouldError: false},
1274 "normal port with uid": {port: "8000", uid: true, shouldError: false},
1275 }
1276
1277 podNamespace := "other"
1278 podName := "foo"
1279
1280 for desc := range tests {
1281 test := tests[desc]
1282 t.Run(desc, func(t *testing.T) {
1283 ss, err := newTestStreamingServer(0)
1284 require.NoError(t, err)
1285 defer ss.testHTTPServer.Close()
1286 fw := newServerTestWithDebug(true, ss)
1287 defer fw.testHTTPServer.Close()
1288
1289 portForwardFuncDone := make(chan struct{})
1290
1291 fw.fakeKubelet.getPortForwardCheck = func(name, namespace string, uid types.UID, opts portforward.V4Options) {
1292 assert.Equal(t, podName, name, "pod name")
1293 assert.Equal(t, podNamespace, namespace, "pod namespace")
1294 if test.uid {
1295 assert.Equal(t, testUID, string(uid), "uid")
1296 }
1297 }
1298
1299 ss.fakeRuntime.portForwardFunc = func(podSandboxID string, port int32, stream io.ReadWriteCloser) error {
1300 defer close(portForwardFuncDone)
1301 assert.Equal(t, testPodSandboxID, podSandboxID, "pod sandbox id")
1302
1303 testPort, err := strconv.ParseInt(test.port, 10, 32)
1304 require.NoError(t, err, "parse port")
1305 assert.Equal(t, int32(testPort), port, "port")
1306
1307 if test.clientData != "" {
1308 fromClient := make([]byte, 32)
1309 n, err := stream.Read(fromClient)
1310 assert.NoError(t, err, "reading client data")
1311 assert.Equal(t, test.clientData, string(fromClient[0:n]), "client data")
1312 }
1313
1314 if test.containerData != "" {
1315 _, err := stream.Write([]byte(test.containerData))
1316 assert.NoError(t, err, "writing container data")
1317 }
1318
1319 return nil
1320 }
1321
1322 var url string
1323 if test.uid {
1324 url = fmt.Sprintf("%s/portForward/%s/%s/%s", fw.testHTTPServer.URL, podNamespace, podName, testUID)
1325 } else {
1326 url = fmt.Sprintf("%s/portForward/%s/%s", fw.testHTTPServer.URL, podNamespace, podName)
1327 }
1328
1329 var (
1330 upgradeRoundTripper httpstream.UpgradeRoundTripper
1331 c *http.Client
1332 )
1333
1334 upgradeRoundTripper, err = spdy.NewRoundTripper(&tls.Config{})
1335 if err != nil {
1336 t.Fatalf("Error creating SpdyRoundTripper: %v", err)
1337 }
1338 c = &http.Client{Transport: upgradeRoundTripper}
1339
1340 req := makeReq(t, "POST", url, "portforward.k8s.io")
1341 resp, err := c.Do(req)
1342 require.NoError(t, err, "POSTing")
1343 defer resp.Body.Close()
1344
1345 assert.Equal(t, http.StatusSwitchingProtocols, resp.StatusCode, "status code")
1346
1347 conn, err := upgradeRoundTripper.NewConnection(resp)
1348 require.NoError(t, err, "creating streaming connection")
1349 defer conn.Close()
1350
1351 headers := http.Header{}
1352 headers.Set("streamType", "error")
1353 headers.Set("port", test.port)
1354 _, err = conn.CreateStream(headers)
1355 assert.Equal(t, test.shouldError, err != nil, "expect error")
1356
1357 if test.shouldError {
1358 return
1359 }
1360
1361 headers.Set("streamType", "data")
1362 headers.Set("port", test.port)
1363 dataStream, err := conn.CreateStream(headers)
1364 require.NoError(t, err, "create stream")
1365
1366 if test.clientData != "" {
1367 _, err := dataStream.Write([]byte(test.clientData))
1368 assert.NoError(t, err, "writing client data")
1369 }
1370
1371 if test.containerData != "" {
1372 fromContainer := make([]byte, 32)
1373 n, err := dataStream.Read(fromContainer)
1374 assert.NoError(t, err, "reading container data")
1375 assert.Equal(t, test.containerData, string(fromContainer[0:n]), "container data")
1376 }
1377
1378 <-portForwardFuncDone
1379 })
1380 }
1381 }
1382
1383 func TestMetricBuckets(t *testing.T) {
1384 tests := map[string]struct {
1385 url string
1386 bucket string
1387 }{
1388 "healthz endpoint": {url: "/healthz", bucket: "healthz"},
1389 "attach": {url: "/attach/podNamespace/podID/containerName", bucket: "attach"},
1390 "attach with uid": {url: "/attach/podNamespace/podID/uid/containerName", bucket: "attach"},
1391 "configz": {url: "/configz", bucket: "configz"},
1392 "containerLogs": {url: "/containerLogs/podNamespace/podID/containerName", bucket: "containerLogs"},
1393 "debug v flags": {url: "/debug/flags/v", bucket: "debug"},
1394 "pprof with sub": {url: "/debug/pprof/subpath", bucket: "debug"},
1395 "exec": {url: "/exec/podNamespace/podID/containerName", bucket: "exec"},
1396 "exec with uid": {url: "/exec/podNamespace/podID/uid/containerName", bucket: "exec"},
1397 "healthz": {url: "/healthz/", bucket: "healthz"},
1398 "healthz log sub": {url: "/healthz/log", bucket: "healthz"},
1399 "healthz ping": {url: "/healthz/ping", bucket: "healthz"},
1400 "healthz sync loop": {url: "/healthz/syncloop", bucket: "healthz"},
1401 "logs": {url: "/logs/", bucket: "logs"},
1402 "logs with path": {url: "/logs/logpath", bucket: "logs"},
1403 "metrics": {url: "/metrics", bucket: "metrics"},
1404 "metrics cadvisor sub": {url: "/metrics/cadvisor", bucket: "metrics/cadvisor"},
1405 "metrics probes sub": {url: "/metrics/probes", bucket: "metrics/probes"},
1406 "metrics resource sub": {url: "/metrics/resource", bucket: "metrics/resource"},
1407 "pods": {url: "/pods/", bucket: "pods"},
1408 "portForward": {url: "/portForward/podNamespace/podID", bucket: "portForward"},
1409 "portForward with uid": {url: "/portForward/podNamespace/podID/uid", bucket: "portForward"},
1410 "run": {url: "/run/podNamespace/podID/containerName", bucket: "run"},
1411 "run with uid": {url: "/run/podNamespace/podID/uid/containerName", bucket: "run"},
1412 "runningpods": {url: "/runningpods/", bucket: "runningpods"},
1413 "stats": {url: "/stats/", bucket: "stats"},
1414 "stats summary sub": {url: "/stats/summary", bucket: "stats"},
1415 "invalid path": {url: "/junk", bucket: "other"},
1416 "invalid path starting with good": {url: "/healthzjunk", bucket: "other"},
1417 }
1418 fw := newServerTest()
1419 defer fw.testHTTPServer.Close()
1420
1421 for _, test := range tests {
1422 path := test.url
1423 bucket := test.bucket
1424 require.Equal(t, fw.serverUnderTest.getMetricBucket(path), bucket)
1425 }
1426 }
1427
1428 func TestMetricMethodBuckets(t *testing.T) {
1429 tests := map[string]struct {
1430 method string
1431 bucket string
1432 }{
1433 "normal GET": {method: "GET", bucket: "GET"},
1434 "normal POST": {method: "POST", bucket: "POST"},
1435 "invalid method": {method: "WEIRD", bucket: "other"},
1436 }
1437
1438 fw := newServerTest()
1439 defer fw.testHTTPServer.Close()
1440
1441 for _, test := range tests {
1442 method := test.method
1443 bucket := test.bucket
1444 require.Equal(t, fw.serverUnderTest.getMetricMethodBucket(method), bucket)
1445 }
1446 }
1447
1448 func TestDebuggingDisabledHandlers(t *testing.T) {
1449
1450
1451 kubeCfg := &kubeletconfiginternal.KubeletConfiguration{
1452 EnableDebuggingHandlers: false,
1453 EnableSystemLogHandler: true,
1454 EnableDebugFlagsHandler: true,
1455 EnableProfilingHandler: true,
1456 }
1457 fw := newServerTestWithDebuggingHandlers(kubeCfg, nil)
1458 defer fw.testHTTPServer.Close()
1459
1460 paths := []string{
1461 "/run", "/exec", "/attach", "/portForward", "/containerLogs", "/runningpods",
1462 "/run/", "/exec/", "/attach/", "/portForward/", "/containerLogs/", "/runningpods/",
1463 "/run/xxx", "/exec/xxx", "/attach/xxx", "/debug/pprof/profile", "/logs/kubelet.log",
1464 }
1465
1466 for _, p := range paths {
1467 verifyEndpointResponse(t, fw, p, "Debug endpoints are disabled.\n")
1468 }
1469 }
1470
1471 func TestDisablingLogAndProfilingHandler(t *testing.T) {
1472 kubeCfg := &kubeletconfiginternal.KubeletConfiguration{
1473 EnableDebuggingHandlers: true,
1474 }
1475 fw := newServerTestWithDebuggingHandlers(kubeCfg, nil)
1476 defer fw.testHTTPServer.Close()
1477
1478
1479 verifyEndpointResponse(t, fw, "/logs/kubelet.log", "logs endpoint is disabled.\n")
1480 verifyEndpointResponse(t, fw, "/debug/pprof/profile?seconds=2", "profiling endpoint is disabled.\n")
1481 verifyEndpointResponse(t, fw, "/debug/flags/v", "flags endpoint is disabled.\n")
1482 }
1483
1484 func TestFailedParseParamsSummaryHandler(t *testing.T) {
1485 fw := newServerTest()
1486 defer fw.testHTTPServer.Close()
1487
1488 resp, err := http.Post(fw.testHTTPServer.URL+"/stats/summary", "invalid/content/type", nil)
1489 assert.NoError(t, err)
1490 defer resp.Body.Close()
1491 v, err := io.ReadAll(resp.Body)
1492 assert.NoError(t, err)
1493 assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
1494 assert.Contains(t, string(v), "parse form failed")
1495 }
1496
1497 func verifyEndpointResponse(t *testing.T, fw *serverTestFramework, path string, expectedResponse string) {
1498 resp, err := http.Get(fw.testHTTPServer.URL + path)
1499 require.NoError(t, err)
1500 assert.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode)
1501 body, err := io.ReadAll(resp.Body)
1502 require.NoError(t, err)
1503 assert.Equal(t, expectedResponse, string(body))
1504
1505 resp, err = http.Post(fw.testHTTPServer.URL+path, "", nil)
1506 require.NoError(t, err)
1507 assert.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode)
1508 body, err = io.ReadAll(resp.Body)
1509 require.NoError(t, err)
1510 assert.Equal(t, expectedResponse, string(body))
1511 }
1512
1513 func TestTrimURLPath(t *testing.T) {
1514 tests := []struct {
1515 path, expected string
1516 }{
1517 {"", ""},
1518 {"//", ""},
1519 {"/pods", "pods"},
1520 {"pods", "pods"},
1521 {"pods/", "pods"},
1522 {"good/", "good"},
1523 {"pods/probes", "pods"},
1524 {"metrics", "metrics"},
1525 {"metrics/resource", "metrics/resource"},
1526 {"metrics/hello", "metrics/hello"},
1527 }
1528
1529 for _, test := range tests {
1530 assert.Equal(t, test.expected, getURLRootPath(test.path), fmt.Sprintf("path is: %s", test.path))
1531 }
1532 }
1533
View as plain text