1
16
17 package kubelet
18
19 import (
20 "context"
21 "crypto/tls"
22 "fmt"
23 "net"
24 "os"
25 "path/filepath"
26 "reflect"
27 goruntime "runtime"
28 "sort"
29 "strconv"
30 "strings"
31 "testing"
32 "time"
33
34 sdktrace "go.opentelemetry.io/otel/sdk/trace"
35 "go.opentelemetry.io/otel/sdk/trace/tracetest"
36 oteltrace "go.opentelemetry.io/otel/trace"
37
38 "github.com/golang/mock/gomock"
39 cadvisorapi "github.com/google/cadvisor/info/v1"
40 cadvisorapiv2 "github.com/google/cadvisor/info/v2"
41 "github.com/stretchr/testify/assert"
42 "github.com/stretchr/testify/require"
43 core "k8s.io/client-go/testing"
44 "k8s.io/mount-utils"
45
46 v1 "k8s.io/api/core/v1"
47 "k8s.io/apimachinery/pkg/api/resource"
48 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
49 "k8s.io/apimachinery/pkg/labels"
50 "k8s.io/apimachinery/pkg/types"
51 "k8s.io/apimachinery/pkg/util/sets"
52 "k8s.io/apimachinery/pkg/util/wait"
53 utilfeature "k8s.io/apiserver/pkg/util/feature"
54 "k8s.io/client-go/kubernetes/fake"
55 "k8s.io/client-go/tools/record"
56 "k8s.io/client-go/util/flowcontrol"
57 featuregatetesting "k8s.io/component-base/featuregate/testing"
58 internalapi "k8s.io/cri-api/pkg/apis"
59 runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
60 "k8s.io/klog/v2/ktesting"
61 "k8s.io/kubernetes/pkg/features"
62 kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
63 cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
64 "k8s.io/kubernetes/pkg/kubelet/clustertrustbundle"
65 "k8s.io/kubernetes/pkg/kubelet/cm"
66 "k8s.io/kubernetes/pkg/kubelet/config"
67 "k8s.io/kubernetes/pkg/kubelet/configmap"
68 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
69 containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
70 "k8s.io/kubernetes/pkg/kubelet/cri/remote"
71 fakeremote "k8s.io/kubernetes/pkg/kubelet/cri/remote/fake"
72 "k8s.io/kubernetes/pkg/kubelet/eviction"
73 "k8s.io/kubernetes/pkg/kubelet/images"
74 "k8s.io/kubernetes/pkg/kubelet/kuberuntime"
75 "k8s.io/kubernetes/pkg/kubelet/lifecycle"
76 "k8s.io/kubernetes/pkg/kubelet/logs"
77 "k8s.io/kubernetes/pkg/kubelet/network/dns"
78 "k8s.io/kubernetes/pkg/kubelet/nodeshutdown"
79 "k8s.io/kubernetes/pkg/kubelet/pleg"
80 "k8s.io/kubernetes/pkg/kubelet/pluginmanager"
81 kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
82 podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
83 proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
84 probetest "k8s.io/kubernetes/pkg/kubelet/prober/testing"
85 "k8s.io/kubernetes/pkg/kubelet/secret"
86 "k8s.io/kubernetes/pkg/kubelet/server"
87 serverstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
88 "k8s.io/kubernetes/pkg/kubelet/stats"
89 "k8s.io/kubernetes/pkg/kubelet/status"
90 "k8s.io/kubernetes/pkg/kubelet/status/state"
91 statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
92 "k8s.io/kubernetes/pkg/kubelet/token"
93 kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
94 kubeletutil "k8s.io/kubernetes/pkg/kubelet/util"
95 "k8s.io/kubernetes/pkg/kubelet/util/queue"
96 kubeletvolume "k8s.io/kubernetes/pkg/kubelet/volumemanager"
97 schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
98 "k8s.io/kubernetes/pkg/util/oom"
99 "k8s.io/kubernetes/pkg/volume"
100 _ "k8s.io/kubernetes/pkg/volume/hostpath"
101 volumesecret "k8s.io/kubernetes/pkg/volume/secret"
102 volumetest "k8s.io/kubernetes/pkg/volume/testing"
103 "k8s.io/kubernetes/pkg/volume/util"
104 "k8s.io/kubernetes/pkg/volume/util/hostutil"
105 "k8s.io/kubernetes/pkg/volume/util/subpath"
106 "k8s.io/utils/clock"
107 testingclock "k8s.io/utils/clock/testing"
108 utilpointer "k8s.io/utils/pointer"
109 )
110
111 func init() {
112 }
113
114 const (
115 testKubeletHostname = "127.0.0.1"
116 testKubeletHostIP = "127.0.0.1"
117 testKubeletHostIPv6 = "::1"
118
119
120
121 minImgSize int64 = 23 * 1024 * 1024
122 maxImgSize int64 = 1000 * 1024 * 1024
123 )
124
125
126
127 type fakeImageGCManager struct {
128 fakeImageService kubecontainer.ImageService
129 images.ImageGCManager
130 }
131
132 func (f *fakeImageGCManager) GetImageList() ([]kubecontainer.Image, error) {
133 return f.fakeImageService.ListImages(context.Background())
134 }
135
136 type TestKubelet struct {
137 kubelet *Kubelet
138 fakeRuntime *containertest.FakeRuntime
139 fakeContainerManager *cm.FakeContainerManager
140 fakeKubeClient *fake.Clientset
141 fakeMirrorClient *podtest.FakeMirrorClient
142 fakeClock *testingclock.FakeClock
143 mounter mount.Interface
144 volumePlugin *volumetest.FakeVolumePlugin
145 }
146
147 func (tk *TestKubelet) Cleanup() {
148 if tk.kubelet != nil {
149 os.RemoveAll(tk.kubelet.rootDirectory)
150 tk.kubelet = nil
151 }
152 }
153
154
155 func newTestKubelet(t *testing.T, controllerAttachDetachEnabled bool) *TestKubelet {
156 imageList := []kubecontainer.Image{
157 {
158 ID: "abc",
159 RepoTags: []string{"registry.k8s.io:v1", "registry.k8s.io:v2"},
160 Size: 123,
161 },
162 {
163 ID: "efg",
164 RepoTags: []string{"registry.k8s.io:v3", "registry.k8s.io:v4"},
165 Size: 456,
166 },
167 }
168 return newTestKubeletWithImageList(t, imageList, controllerAttachDetachEnabled, true , true )
169 }
170
171 func newTestKubeletWithImageList(
172 t *testing.T,
173 imageList []kubecontainer.Image,
174 controllerAttachDetachEnabled bool,
175 initFakeVolumePlugin bool,
176 localStorageCapacityIsolation bool,
177 ) *TestKubelet {
178 logger, _ := ktesting.NewTestContext(t)
179
180 fakeRuntime := &containertest.FakeRuntime{
181 ImageList: imageList,
182
183 RuntimeStatus: &kubecontainer.RuntimeStatus{
184 Conditions: []kubecontainer.RuntimeCondition{
185 {Type: "RuntimeReady", Status: true},
186 {Type: "NetworkReady", Status: true},
187 },
188 },
189 VersionInfo: "1.5.0",
190 RuntimeType: "test",
191 T: t,
192 }
193
194 fakeRecorder := &record.FakeRecorder{}
195 fakeKubeClient := &fake.Clientset{}
196 kubelet := &Kubelet{}
197 kubelet.recorder = fakeRecorder
198 kubelet.kubeClient = fakeKubeClient
199 kubelet.heartbeatClient = fakeKubeClient
200 kubelet.os = &containertest.FakeOS{}
201 kubelet.mounter = mount.NewFakeMounter(nil)
202 kubelet.hostutil = hostutil.NewFakeHostUtil(nil)
203 kubelet.subpather = &subpath.FakeSubpath{}
204
205 kubelet.hostname = testKubeletHostname
206 kubelet.nodeName = types.NodeName(testKubeletHostname)
207 kubelet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
208 kubelet.runtimeState.setNetworkState(nil)
209 kubelet.rootDirectory = t.TempDir()
210 kubelet.podLogsDirectory = t.TempDir()
211 kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.String) bool { return true })
212 kubelet.serviceLister = testServiceLister{}
213 kubelet.serviceHasSynced = func() bool { return true }
214 kubelet.nodeHasSynced = func() bool { return true }
215 kubelet.nodeLister = testNodeLister{
216 nodes: []*v1.Node{
217 {
218 ObjectMeta: metav1.ObjectMeta{
219 Name: string(kubelet.nodeName),
220 },
221 Status: v1.NodeStatus{
222 Conditions: []v1.NodeCondition{
223 {
224 Type: v1.NodeReady,
225 Status: v1.ConditionTrue,
226 Reason: "Ready",
227 Message: "Node ready",
228 },
229 },
230 Addresses: []v1.NodeAddress{
231 {
232 Type: v1.NodeInternalIP,
233 Address: testKubeletHostIP,
234 },
235 {
236 Type: v1.NodeInternalIP,
237 Address: testKubeletHostIPv6,
238 },
239 },
240 VolumesAttached: []v1.AttachedVolume{
241 {
242 Name: "fake/fake-device",
243 DevicePath: "fake/path",
244 },
245 },
246 },
247 },
248 },
249 }
250 kubelet.recorder = fakeRecorder
251 if err := kubelet.setupDataDirs(); err != nil {
252 t.Fatalf("can't initialize kubelet data dirs: %v", err)
253 }
254 kubelet.daemonEndpoints = &v1.NodeDaemonEndpoints{}
255
256 kubelet.cadvisor = &cadvisortest.Fake{}
257 machineInfo, _ := kubelet.cadvisor.MachineInfo()
258 kubelet.setCachedMachineInfo(machineInfo)
259 kubelet.tracer = oteltrace.NewNoopTracerProvider().Tracer("")
260
261 fakeMirrorClient := podtest.NewFakeMirrorClient()
262 secretManager := secret.NewSimpleSecretManager(kubelet.kubeClient)
263 kubelet.secretManager = secretManager
264 configMapManager := configmap.NewSimpleConfigMapManager(kubelet.kubeClient)
265 kubelet.configMapManager = configMapManager
266 kubelet.mirrorPodClient = fakeMirrorClient
267 kubelet.podManager = kubepod.NewBasicPodManager()
268 podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
269 kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, kubelet.getRootDir())
270 kubelet.nodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker()
271
272 kubelet.containerRuntime = fakeRuntime
273 kubelet.runtimeCache = containertest.NewFakeRuntimeCache(kubelet.containerRuntime)
274 kubelet.reasonCache = NewReasonCache()
275 kubelet.podCache = containertest.NewFakeCache(kubelet.containerRuntime)
276 kubelet.podWorkers = &fakePodWorkers{
277 syncPodFn: kubelet.SyncPod,
278 cache: kubelet.podCache,
279 t: t,
280 }
281
282 kubelet.probeManager = probetest.FakeManager{}
283 kubelet.livenessManager = proberesults.NewManager()
284 kubelet.readinessManager = proberesults.NewManager()
285 kubelet.startupManager = proberesults.NewManager()
286
287 fakeContainerManager := cm.NewFakeContainerManager()
288 kubelet.containerManager = fakeContainerManager
289 fakeNodeRef := &v1.ObjectReference{
290 Kind: "Node",
291 Name: testKubeletHostname,
292 UID: types.UID(testKubeletHostname),
293 Namespace: "",
294 }
295
296 volumeStatsAggPeriod := time.Second * 10
297 kubelet.resourceAnalyzer = serverstats.NewResourceAnalyzer(kubelet, volumeStatsAggPeriod, kubelet.recorder)
298
299 fakeHostStatsProvider := stats.NewFakeHostStatsProvider()
300
301 kubelet.StatsProvider = stats.NewCadvisorStatsProvider(
302 kubelet.cadvisor,
303 kubelet.resourceAnalyzer,
304 kubelet.podManager,
305 kubelet.runtimeCache,
306 fakeRuntime,
307 kubelet.statusManager,
308 fakeHostStatsProvider,
309 )
310 fakeImageGCPolicy := images.ImageGCPolicy{
311 HighThresholdPercent: 90,
312 LowThresholdPercent: 80,
313 }
314 imageGCManager, err := images.NewImageGCManager(fakeRuntime, kubelet.StatsProvider, fakeRecorder, fakeNodeRef, fakeImageGCPolicy, oteltrace.NewNoopTracerProvider())
315 assert.NoError(t, err)
316 kubelet.imageManager = &fakeImageGCManager{
317 fakeImageService: fakeRuntime,
318 ImageGCManager: imageGCManager,
319 }
320 kubelet.containerLogManager = logs.NewStubContainerLogManager()
321 containerGCPolicy := kubecontainer.GCPolicy{
322 MinAge: time.Duration(0),
323 MaxPerPodContainer: 1,
324 MaxContainers: -1,
325 }
326 containerGC, err := kubecontainer.NewContainerGC(fakeRuntime, containerGCPolicy, kubelet.sourcesReady)
327 assert.NoError(t, err)
328 kubelet.containerGC = containerGC
329
330 fakeClock := testingclock.NewFakeClock(time.Now())
331 kubelet.backOff = flowcontrol.NewBackOff(time.Second, time.Minute)
332 kubelet.backOff.Clock = fakeClock
333 kubelet.resyncInterval = 10 * time.Second
334 kubelet.workQueue = queue.NewBasicWorkQueue(fakeClock)
335
336 kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, make(chan *pleg.PodLifecycleEvent, 100), &pleg.RelistDuration{RelistPeriod: time.Hour, RelistThreshold: genericPlegRelistThreshold}, kubelet.podCache, clock.RealClock{})
337 kubelet.clock = fakeClock
338
339 nodeRef := &v1.ObjectReference{
340 Kind: "Node",
341 Name: string(kubelet.nodeName),
342 UID: types.UID(kubelet.nodeName),
343 Namespace: "",
344 }
345
346 evictionManager, evictionAdmitHandler := eviction.NewManager(kubelet.resourceAnalyzer, eviction.Config{},
347 killPodNow(kubelet.podWorkers, fakeRecorder), kubelet.imageManager, kubelet.containerGC, fakeRecorder, nodeRef, kubelet.clock, kubelet.supportLocalStorageCapacityIsolation())
348
349 kubelet.evictionManager = evictionManager
350 kubelet.admitHandlers.AddPodAdmitHandler(evictionAdmitHandler)
351
352
353 shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
354 Logger: logger,
355 ProbeManager: kubelet.probeManager,
356 Recorder: fakeRecorder,
357 NodeRef: nodeRef,
358 GetPodsFunc: kubelet.podManager.GetPods,
359 KillPodFunc: killPodNow(kubelet.podWorkers, fakeRecorder),
360 SyncNodeStatusFunc: func() {},
361 ShutdownGracePeriodRequested: 0,
362 ShutdownGracePeriodCriticalPods: 0,
363 })
364 kubelet.shutdownManager = shutdownManager
365 kubelet.admitHandlers.AddPodAdmitHandler(shutdownAdmitHandler)
366
367
368 kubelet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kubelet.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), kubelet.containerManager.UpdatePluginResources))
369
370 allPlugins := []volume.VolumePlugin{}
371 plug := &volumetest.FakeVolumePlugin{PluginName: "fake", Host: nil}
372 if initFakeVolumePlugin {
373 allPlugins = append(allPlugins, plug)
374 } else {
375 allPlugins = append(allPlugins, volumesecret.ProbeVolumePlugins()...)
376 }
377
378 var prober volume.DynamicPluginProber
379 kubelet.volumePluginMgr, err =
380 NewInitializedVolumePluginMgr(kubelet, kubelet.secretManager, kubelet.configMapManager, token.NewManager(kubelet.kubeClient), &clustertrustbundle.NoopManager{}, allPlugins, prober)
381 require.NoError(t, err, "Failed to initialize VolumePluginMgr")
382
383 kubelet.volumeManager = kubeletvolume.NewVolumeManager(
384 controllerAttachDetachEnabled,
385 kubelet.nodeName,
386 kubelet.podManager,
387 kubelet.podWorkers,
388 fakeKubeClient,
389 kubelet.volumePluginMgr,
390 fakeRuntime,
391 kubelet.mounter,
392 kubelet.hostutil,
393 kubelet.getPodsDir(),
394 kubelet.recorder,
395 false,
396 volumetest.NewBlockVolumePathHandler())
397
398 kubelet.pluginManager = pluginmanager.NewPluginManager(
399 kubelet.getPluginsRegistrationDir(),
400 kubelet.recorder,
401 )
402 kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
403
404
405 activeDeadlineHandler, err := newActiveDeadlineHandler(kubelet.statusManager, kubelet.recorder, kubelet.clock)
406 require.NoError(t, err, "Can't initialize active deadline handler")
407
408 kubelet.AddPodSyncLoopHandler(activeDeadlineHandler)
409 kubelet.AddPodSyncHandler(activeDeadlineHandler)
410 kubelet.kubeletConfiguration.LocalStorageCapacityIsolation = localStorageCapacityIsolation
411 return &TestKubelet{kubelet, fakeRuntime, fakeContainerManager, fakeKubeClient, fakeMirrorClient, fakeClock, nil, plug}
412 }
413
414 func newTestPods(count int) []*v1.Pod {
415 pods := make([]*v1.Pod, count)
416 for i := 0; i < count; i++ {
417 pods[i] = &v1.Pod{
418 Spec: v1.PodSpec{
419 HostNetwork: true,
420 },
421 ObjectMeta: metav1.ObjectMeta{
422 UID: types.UID(strconv.Itoa(10000 + i)),
423 Name: fmt.Sprintf("pod%d", i),
424 },
425 }
426 }
427 return pods
428 }
429
430 func TestSyncLoopAbort(t *testing.T) {
431 ctx := context.Background()
432 testKubelet := newTestKubelet(t, false )
433 defer testKubelet.Cleanup()
434 kubelet := testKubelet.kubelet
435 kubelet.runtimeState.setRuntimeSync(time.Now())
436
437
438 kubelet.resyncInterval = time.Second * 30
439
440 ch := make(chan kubetypes.PodUpdate)
441 close(ch)
442
443
444 ok := kubelet.syncLoopIteration(ctx, ch, kubelet, make(chan time.Time), make(chan time.Time), make(chan *pleg.PodLifecycleEvent, 1))
445 require.False(t, ok, "Expected syncLoopIteration to return !ok since update chan was closed")
446
447
448 kubelet.syncLoop(ctx, ch, kubelet)
449 }
450
451 func TestSyncPodsStartPod(t *testing.T) {
452 testKubelet := newTestKubelet(t, false )
453 defer testKubelet.Cleanup()
454 kubelet := testKubelet.kubelet
455 fakeRuntime := testKubelet.fakeRuntime
456 pods := []*v1.Pod{
457 podWithUIDNameNsSpec("12345678", "foo", "new", v1.PodSpec{
458 Containers: []v1.Container{
459 {Name: "bar"},
460 },
461 }),
462 }
463 kubelet.podManager.SetPods(pods)
464 kubelet.HandlePodSyncs(pods)
465 fakeRuntime.AssertStartedPods([]string{string(pods[0].UID)})
466 }
467
468 func TestHandlePodCleanupsPerQOS(t *testing.T) {
469 ctx := context.Background()
470 testKubelet := newTestKubelet(t, false )
471 defer testKubelet.Cleanup()
472
473 pod := &kubecontainer.Pod{
474 ID: "12345678",
475 Name: "foo",
476 Namespace: "new",
477 Containers: []*kubecontainer.Container{
478 {Name: "bar"},
479 },
480 }
481
482 fakeRuntime := testKubelet.fakeRuntime
483 fakeContainerManager := testKubelet.fakeContainerManager
484 fakeContainerManager.PodContainerManager.AddPodFromCgroups(pod)
485 fakeRuntime.PodList = []*containertest.FakePod{
486 {Pod: pod},
487 }
488 kubelet := testKubelet.kubelet
489 kubelet.cgroupsPerQOS = true
490
491
492
493
494
495
496
497 kubelet.HandlePodCleanups(ctx)
498
499
500 if actual, expected := kubelet.podWorkers.(*fakePodWorkers).triggeredDeletion, []types.UID{"12345678"}; !reflect.DeepEqual(actual, expected) {
501 t.Fatalf("expected %v to be deleted, got %v", expected, actual)
502 }
503 fakeRuntime.AssertKilledPods([]string(nil))
504
505
506 fakeRuntime.PodList = nil
507
508 kubelet.HandlePodCleanups(ctx)
509 kubelet.HandlePodCleanups(ctx)
510 kubelet.HandlePodCleanups(ctx)
511
512 destroyCount := 0
513 err := wait.Poll(100*time.Millisecond, 10*time.Second, func() (bool, error) {
514 fakeContainerManager.PodContainerManager.Lock()
515 defer fakeContainerManager.PodContainerManager.Unlock()
516 destroyCount = 0
517 for _, functionName := range fakeContainerManager.PodContainerManager.CalledFunctions {
518 if functionName == "Destroy" {
519 destroyCount = destroyCount + 1
520 }
521 }
522 return destroyCount >= 1, nil
523 })
524
525 assert.NoError(t, err, "wait should not return error")
526
527
528
529
530 assert.True(t, destroyCount >= 1, "Expect 1 or more destroys")
531 }
532
533 func TestDispatchWorkOfCompletedPod(t *testing.T) {
534 testKubelet := newTestKubelet(t, false )
535 defer testKubelet.Cleanup()
536 kubelet := testKubelet.kubelet
537 var got bool
538 kubelet.podWorkers = &fakePodWorkers{
539 syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
540 got = true
541 return false, nil
542 },
543 cache: kubelet.podCache,
544 t: t,
545 }
546 now := metav1.NewTime(time.Now())
547 pods := []*v1.Pod{
548 {
549 ObjectMeta: metav1.ObjectMeta{
550 UID: "1",
551 Name: "completed-pod1",
552 Namespace: "ns",
553 Annotations: make(map[string]string),
554 },
555 Status: v1.PodStatus{
556 Phase: v1.PodFailed,
557 ContainerStatuses: []v1.ContainerStatus{
558 {
559 State: v1.ContainerState{
560 Terminated: &v1.ContainerStateTerminated{},
561 },
562 },
563 },
564 },
565 },
566 {
567 ObjectMeta: metav1.ObjectMeta{
568 UID: "2",
569 Name: "completed-pod2",
570 Namespace: "ns",
571 Annotations: make(map[string]string),
572 },
573 Status: v1.PodStatus{
574 Phase: v1.PodSucceeded,
575 ContainerStatuses: []v1.ContainerStatus{
576 {
577 State: v1.ContainerState{
578 Terminated: &v1.ContainerStateTerminated{},
579 },
580 },
581 },
582 },
583 },
584 {
585 ObjectMeta: metav1.ObjectMeta{
586 UID: "3",
587 Name: "completed-pod3",
588 Namespace: "ns",
589 Annotations: make(map[string]string),
590 DeletionTimestamp: &now,
591 },
592 Status: v1.PodStatus{
593 ContainerStatuses: []v1.ContainerStatus{
594 {
595 State: v1.ContainerState{
596 Terminated: &v1.ContainerStateTerminated{},
597 },
598 },
599 },
600 },
601 },
602 }
603 for _, pod := range pods {
604 kubelet.podWorkers.UpdatePod(UpdatePodOptions{
605 Pod: pod,
606 UpdateType: kubetypes.SyncPodSync,
607 StartTime: time.Now(),
608 })
609 if !got {
610 t.Errorf("Should not skip completed pod %q", pod.Name)
611 }
612 got = false
613 }
614 }
615
616 func TestDispatchWorkOfActivePod(t *testing.T) {
617 testKubelet := newTestKubelet(t, false )
618 defer testKubelet.Cleanup()
619 kubelet := testKubelet.kubelet
620 var got bool
621 kubelet.podWorkers = &fakePodWorkers{
622 syncPodFn: func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (bool, error) {
623 got = true
624 return false, nil
625 },
626 cache: kubelet.podCache,
627 t: t,
628 }
629 pods := []*v1.Pod{
630 {
631 ObjectMeta: metav1.ObjectMeta{
632 UID: "1",
633 Name: "active-pod1",
634 Namespace: "ns",
635 Annotations: make(map[string]string),
636 },
637 Status: v1.PodStatus{
638 Phase: v1.PodRunning,
639 },
640 },
641 {
642 ObjectMeta: metav1.ObjectMeta{
643 UID: "2",
644 Name: "active-pod2",
645 Namespace: "ns",
646 Annotations: make(map[string]string),
647 },
648 Status: v1.PodStatus{
649 Phase: v1.PodFailed,
650 ContainerStatuses: []v1.ContainerStatus{
651 {
652 State: v1.ContainerState{
653 Running: &v1.ContainerStateRunning{},
654 },
655 },
656 },
657 },
658 },
659 }
660
661 for _, pod := range pods {
662 kubelet.podWorkers.UpdatePod(UpdatePodOptions{
663 Pod: pod,
664 UpdateType: kubetypes.SyncPodSync,
665 StartTime: time.Now(),
666 })
667 if !got {
668 t.Errorf("Should not skip active pod %q", pod.Name)
669 }
670 got = false
671 }
672 }
673
674 func TestHandlePodCleanups(t *testing.T) {
675 ctx := context.Background()
676 testKubelet := newTestKubelet(t, false )
677 defer testKubelet.Cleanup()
678
679 pod := &kubecontainer.Pod{
680 ID: "12345678",
681 Name: "foo",
682 Namespace: "new",
683 Containers: []*kubecontainer.Container{
684 {Name: "bar"},
685 },
686 }
687
688 fakeRuntime := testKubelet.fakeRuntime
689 fakeRuntime.PodList = []*containertest.FakePod{
690 {Pod: pod},
691 }
692 kubelet := testKubelet.kubelet
693
694 kubelet.HandlePodCleanups(ctx)
695
696
697 if actual, expected := kubelet.podWorkers.(*fakePodWorkers).triggeredDeletion, []types.UID{"12345678"}; !reflect.DeepEqual(actual, expected) {
698 t.Fatalf("expected %v to be deleted, got %v", expected, actual)
699 }
700 fakeRuntime.AssertKilledPods([]string(nil))
701 }
702
703 func TestHandlePodRemovesWhenSourcesAreReady(t *testing.T) {
704 if testing.Short() {
705 t.Skip("skipping test in short mode.")
706 }
707
708 ready := false
709
710 testKubelet := newTestKubelet(t, false )
711 defer testKubelet.Cleanup()
712
713 fakePod := &kubecontainer.Pod{
714 ID: "1",
715 Name: "foo",
716 Namespace: "new",
717 Containers: []*kubecontainer.Container{
718 {Name: "bar"},
719 },
720 }
721
722 pods := []*v1.Pod{
723 podWithUIDNameNs("1", "foo", "new"),
724 }
725
726 fakeRuntime := testKubelet.fakeRuntime
727 fakeRuntime.PodList = []*containertest.FakePod{
728 {Pod: fakePod},
729 }
730 kubelet := testKubelet.kubelet
731 kubelet.sourcesReady = config.NewSourcesReady(func(_ sets.String) bool { return ready })
732
733 kubelet.HandlePodRemoves(pods)
734 time.Sleep(2 * time.Second)
735
736
737 if expect, actual := []types.UID(nil), kubelet.podWorkers.(*fakePodWorkers).triggeredDeletion; !reflect.DeepEqual(expect, actual) {
738 t.Fatalf("expected %v kills, got %v", expect, actual)
739 }
740
741 ready = true
742 kubelet.HandlePodRemoves(pods)
743 time.Sleep(2 * time.Second)
744
745
746 if expect, actual := []types.UID{"1"}, kubelet.podWorkers.(*fakePodWorkers).triggeredDeletion; !reflect.DeepEqual(expect, actual) {
747 t.Fatalf("expected %v kills, got %v", expect, actual)
748 }
749 }
750
751 type testNodeLister struct {
752 nodes []*v1.Node
753 }
754
755 func (nl testNodeLister) Get(name string) (*v1.Node, error) {
756 for _, node := range nl.nodes {
757 if node.Name == name {
758 return node, nil
759 }
760 }
761 return nil, fmt.Errorf("Node with name: %s does not exist", name)
762 }
763
764 func (nl testNodeLister) List(_ labels.Selector) (ret []*v1.Node, err error) {
765 return nl.nodes, nil
766 }
767
768 func checkPodStatus(t *testing.T, kl *Kubelet, pod *v1.Pod, phase v1.PodPhase) {
769 t.Helper()
770 status, found := kl.statusManager.GetPodStatus(pod.UID)
771 require.True(t, found, "Status of pod %q is not found in the status map", pod.UID)
772 require.Equal(t, phase, status.Phase)
773 }
774
775
776 func TestHandlePortConflicts(t *testing.T) {
777 testKubelet := newTestKubelet(t, false )
778 defer testKubelet.Cleanup()
779 kl := testKubelet.kubelet
780
781 kl.nodeLister = testNodeLister{nodes: []*v1.Node{
782 {
783 ObjectMeta: metav1.ObjectMeta{Name: string(kl.nodeName)},
784 Status: v1.NodeStatus{
785 Allocatable: v1.ResourceList{
786 v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
787 },
788 },
789 },
790 }}
791
792 recorder := record.NewFakeRecorder(20)
793 nodeRef := &v1.ObjectReference{
794 Kind: "Node",
795 Name: "testNode",
796 UID: types.UID("testNode"),
797 Namespace: "",
798 }
799 testClusterDNSDomain := "TEST"
800 kl.dnsConfigurer = dns.NewConfigurer(recorder, nodeRef, nil, nil, testClusterDNSDomain, "")
801
802 spec := v1.PodSpec{NodeName: string(kl.nodeName), Containers: []v1.Container{{Ports: []v1.ContainerPort{{HostPort: 80}}}}}
803 pods := []*v1.Pod{
804 podWithUIDNameNsSpec("123456789", "newpod", "foo", spec),
805 podWithUIDNameNsSpec("987654321", "oldpod", "foo", spec),
806 }
807
808 pods[1].CreationTimestamp = metav1.NewTime(time.Now())
809 pods[0].CreationTimestamp = metav1.NewTime(time.Now().Add(1 * time.Second))
810
811 notfittingPod := pods[0]
812 fittingPod := pods[1]
813 kl.podWorkers.(*fakePodWorkers).running = map[types.UID]bool{
814 pods[0].UID: true,
815 pods[1].UID: true,
816 }
817
818 kl.HandlePodAdditions(pods)
819
820
821 checkPodStatus(t, kl, notfittingPod, v1.PodFailed)
822 checkPodStatus(t, kl, fittingPod, v1.PodPending)
823 }
824
825
826 func TestHandleHostNameConflicts(t *testing.T) {
827 testKubelet := newTestKubelet(t, false )
828 defer testKubelet.Cleanup()
829 kl := testKubelet.kubelet
830
831 kl.nodeLister = testNodeLister{nodes: []*v1.Node{
832 {
833 ObjectMeta: metav1.ObjectMeta{Name: "127.0.0.1"},
834 Status: v1.NodeStatus{
835 Allocatable: v1.ResourceList{
836 v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
837 },
838 },
839 },
840 }}
841
842 recorder := record.NewFakeRecorder(20)
843 nodeRef := &v1.ObjectReference{
844 Kind: "Node",
845 Name: "testNode",
846 UID: types.UID("testNode"),
847 Namespace: "",
848 }
849 testClusterDNSDomain := "TEST"
850 kl.dnsConfigurer = dns.NewConfigurer(recorder, nodeRef, nil, nil, testClusterDNSDomain, "")
851
852
853 pods := []*v1.Pod{
854 podWithUIDNameNsSpec("123456789", "notfittingpod", "foo", v1.PodSpec{NodeName: "127.0.0.2"}),
855 podWithUIDNameNsSpec("987654321", "fittingpod", "foo", v1.PodSpec{NodeName: "127.0.0.1"}),
856 }
857
858 notfittingPod := pods[0]
859 fittingPod := pods[1]
860
861 kl.HandlePodAdditions(pods)
862
863
864 checkPodStatus(t, kl, notfittingPod, v1.PodFailed)
865 checkPodStatus(t, kl, fittingPod, v1.PodPending)
866 }
867
868
869 func TestHandleNodeSelector(t *testing.T) {
870 testKubelet := newTestKubelet(t, false )
871 defer testKubelet.Cleanup()
872 kl := testKubelet.kubelet
873 nodes := []*v1.Node{
874 {
875 ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{"key": "B"}},
876 Status: v1.NodeStatus{
877 Allocatable: v1.ResourceList{
878 v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
879 },
880 },
881 },
882 }
883 kl.nodeLister = testNodeLister{nodes: nodes}
884
885 recorder := record.NewFakeRecorder(20)
886 nodeRef := &v1.ObjectReference{
887 Kind: "Node",
888 Name: "testNode",
889 UID: types.UID("testNode"),
890 Namespace: "",
891 }
892 testClusterDNSDomain := "TEST"
893 kl.dnsConfigurer = dns.NewConfigurer(recorder, nodeRef, nil, nil, testClusterDNSDomain, "")
894
895 pods := []*v1.Pod{
896 podWithUIDNameNsSpec("123456789", "podA", "foo", v1.PodSpec{NodeSelector: map[string]string{"key": "A"}}),
897 podWithUIDNameNsSpec("987654321", "podB", "foo", v1.PodSpec{NodeSelector: map[string]string{"key": "B"}}),
898 }
899
900 notfittingPod := pods[0]
901 fittingPod := pods[1]
902
903 kl.HandlePodAdditions(pods)
904
905
906 checkPodStatus(t, kl, notfittingPod, v1.PodFailed)
907 checkPodStatus(t, kl, fittingPod, v1.PodPending)
908 }
909
910
911 func TestHandleNodeSelectorBasedOnOS(t *testing.T) {
912 tests := []struct {
913 name string
914 nodeLabels map[string]string
915 podSelector map[string]string
916 podStatus v1.PodPhase
917 }{
918 {
919 name: "correct OS label, wrong pod selector, admission denied",
920 nodeLabels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH},
921 podSelector: map[string]string{v1.LabelOSStable: "dummyOS"},
922 podStatus: v1.PodFailed,
923 },
924 {
925 name: "correct OS label, correct pod selector, admission denied",
926 nodeLabels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH},
927 podSelector: map[string]string{v1.LabelOSStable: goruntime.GOOS},
928 podStatus: v1.PodPending,
929 },
930 {
931
932 name: "new node label, correct pod selector, admitted",
933 nodeLabels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH, "key": "B"},
934 podSelector: map[string]string{"key": "B"},
935 podStatus: v1.PodPending,
936 },
937 }
938 for _, test := range tests {
939 t.Run(test.name, func(t *testing.T) {
940 testKubelet := newTestKubelet(t, false )
941 defer testKubelet.Cleanup()
942 kl := testKubelet.kubelet
943 nodes := []*v1.Node{
944 {
945 ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: test.nodeLabels},
946 Status: v1.NodeStatus{
947 Allocatable: v1.ResourceList{
948 v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
949 },
950 },
951 },
952 }
953 kl.nodeLister = testNodeLister{nodes: nodes}
954
955 recorder := record.NewFakeRecorder(20)
956 nodeRef := &v1.ObjectReference{
957 Kind: "Node",
958 Name: "testNode",
959 UID: types.UID("testNode"),
960 Namespace: "",
961 }
962 testClusterDNSDomain := "TEST"
963 kl.dnsConfigurer = dns.NewConfigurer(recorder, nodeRef, nil, nil, testClusterDNSDomain, "")
964
965 pod := podWithUIDNameNsSpec("123456789", "podA", "foo", v1.PodSpec{NodeSelector: test.podSelector})
966
967 kl.HandlePodAdditions([]*v1.Pod{pod})
968
969
970 checkPodStatus(t, kl, pod, test.podStatus)
971 })
972 }
973 }
974
975
976 func TestHandleMemExceeded(t *testing.T) {
977 testKubelet := newTestKubelet(t, false )
978 defer testKubelet.Cleanup()
979 kl := testKubelet.kubelet
980 nodes := []*v1.Node{
981 {ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
982 Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: v1.ResourceList{
983 v1.ResourceCPU: *resource.NewMilliQuantity(10, resource.DecimalSI),
984 v1.ResourceMemory: *resource.NewQuantity(100, resource.BinarySI),
985 v1.ResourcePods: *resource.NewQuantity(40, resource.DecimalSI),
986 }}},
987 }
988 kl.nodeLister = testNodeLister{nodes: nodes}
989
990 recorder := record.NewFakeRecorder(20)
991 nodeRef := &v1.ObjectReference{
992 Kind: "Node",
993 Name: "testNode",
994 UID: types.UID("testNode"),
995 Namespace: "",
996 }
997 testClusterDNSDomain := "TEST"
998 kl.dnsConfigurer = dns.NewConfigurer(recorder, nodeRef, nil, nil, testClusterDNSDomain, "")
999
1000 spec := v1.PodSpec{NodeName: string(kl.nodeName),
1001 Containers: []v1.Container{{Resources: v1.ResourceRequirements{
1002 Requests: v1.ResourceList{
1003 v1.ResourceMemory: resource.MustParse("90"),
1004 },
1005 }}},
1006 }
1007 pods := []*v1.Pod{
1008 podWithUIDNameNsSpec("123456789", "newpod", "foo", spec),
1009 podWithUIDNameNsSpec("987654321", "oldpod", "foo", spec),
1010 }
1011
1012 pods[1].CreationTimestamp = metav1.NewTime(time.Now())
1013 pods[0].CreationTimestamp = metav1.NewTime(time.Now().Add(1 * time.Second))
1014
1015 notfittingPod := pods[0]
1016 fittingPod := pods[1]
1017 kl.podWorkers.(*fakePodWorkers).running = map[types.UID]bool{
1018 pods[0].UID: true,
1019 pods[1].UID: true,
1020 }
1021
1022 kl.HandlePodAdditions(pods)
1023
1024
1025 checkPodStatus(t, kl, notfittingPod, v1.PodFailed)
1026 checkPodStatus(t, kl, fittingPod, v1.PodPending)
1027 }
1028
1029
1030
1031 func TestHandlePluginResources(t *testing.T) {
1032 testKubelet := newTestKubelet(t, false )
1033 defer testKubelet.Cleanup()
1034 kl := testKubelet.kubelet
1035
1036 adjustedResource := v1.ResourceName("domain1.com/adjustedResource")
1037 emptyResource := v1.ResourceName("domain2.com/emptyResource")
1038 missingResource := v1.ResourceName("domain2.com/missingResource")
1039 failedResource := v1.ResourceName("domain2.com/failedResource")
1040 resourceQuantity0 := *resource.NewQuantity(int64(0), resource.DecimalSI)
1041 resourceQuantity1 := *resource.NewQuantity(int64(1), resource.DecimalSI)
1042 resourceQuantity2 := *resource.NewQuantity(int64(2), resource.DecimalSI)
1043 resourceQuantityInvalid := *resource.NewQuantity(int64(-1), resource.DecimalSI)
1044 allowedPodQuantity := *resource.NewQuantity(int64(10), resource.DecimalSI)
1045 nodes := []*v1.Node{
1046 {ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
1047 Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: v1.ResourceList{
1048 adjustedResource: resourceQuantity1,
1049 emptyResource: resourceQuantity0,
1050 v1.ResourcePods: allowedPodQuantity,
1051 }}},
1052 }
1053 kl.nodeLister = testNodeLister{nodes: nodes}
1054
1055 updatePluginResourcesFunc := func(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
1056
1057
1058
1059
1060
1061 updateResourceMap := map[v1.ResourceName]resource.Quantity{
1062 adjustedResource: resourceQuantity2,
1063 emptyResource: resourceQuantity0,
1064 failedResource: resourceQuantityInvalid,
1065 }
1066 pod := attrs.Pod
1067 newAllocatableResource := node.Allocatable.Clone()
1068 for _, container := range pod.Spec.Containers {
1069 for resource := range container.Resources.Requests {
1070 newQuantity, exist := updateResourceMap[resource]
1071 if !exist {
1072 continue
1073 }
1074 if newQuantity.Value() < 0 {
1075 return fmt.Errorf("Allocation failed")
1076 }
1077 newAllocatableResource.ScalarResources[resource] = newQuantity.Value()
1078 }
1079 }
1080 node.Allocatable = newAllocatableResource
1081 return nil
1082 }
1083
1084
1085 kl.admitHandlers = lifecycle.PodAdmitHandlers{}
1086 kl.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(kl.getNodeAnyWay, lifecycle.NewAdmissionFailureHandlerStub(), updatePluginResourcesFunc))
1087
1088 recorder := record.NewFakeRecorder(20)
1089 nodeRef := &v1.ObjectReference{
1090 Kind: "Node",
1091 Name: "testNode",
1092 UID: types.UID("testNode"),
1093 Namespace: "",
1094 }
1095 testClusterDNSDomain := "TEST"
1096 kl.dnsConfigurer = dns.NewConfigurer(recorder, nodeRef, nil, nil, testClusterDNSDomain, "")
1097
1098
1099
1100 fittingPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
1101 Containers: []v1.Container{{Resources: v1.ResourceRequirements{
1102 Limits: v1.ResourceList{
1103 adjustedResource: resourceQuantity2,
1104 },
1105 Requests: v1.ResourceList{
1106 adjustedResource: resourceQuantity2,
1107 },
1108 }}},
1109 }
1110
1111
1112 emptyPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
1113 Containers: []v1.Container{{Resources: v1.ResourceRequirements{
1114 Limits: v1.ResourceList{
1115 emptyResource: resourceQuantity2,
1116 },
1117 Requests: v1.ResourceList{
1118 emptyResource: resourceQuantity2,
1119 },
1120 }}},
1121 }
1122
1123
1124
1125
1126
1127 missingPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
1128 Containers: []v1.Container{{Resources: v1.ResourceRequirements{
1129 Limits: v1.ResourceList{
1130 missingResource: resourceQuantity2,
1131 },
1132 Requests: v1.ResourceList{
1133 missingResource: resourceQuantity2,
1134 },
1135 }}},
1136 }
1137
1138 failedPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
1139 Containers: []v1.Container{{Resources: v1.ResourceRequirements{
1140 Limits: v1.ResourceList{
1141 failedResource: resourceQuantity1,
1142 },
1143 Requests: v1.ResourceList{
1144 failedResource: resourceQuantity1,
1145 },
1146 }}},
1147 }
1148
1149 fittingPod := podWithUIDNameNsSpec("1", "fittingpod", "foo", fittingPodSpec)
1150 emptyPod := podWithUIDNameNsSpec("2", "emptypod", "foo", emptyPodSpec)
1151 missingPod := podWithUIDNameNsSpec("3", "missingpod", "foo", missingPodSpec)
1152 failedPod := podWithUIDNameNsSpec("4", "failedpod", "foo", failedPodSpec)
1153
1154 kl.HandlePodAdditions([]*v1.Pod{fittingPod, emptyPod, missingPod, failedPod})
1155
1156
1157 checkPodStatus(t, kl, fittingPod, v1.PodPending)
1158 checkPodStatus(t, kl, emptyPod, v1.PodFailed)
1159 checkPodStatus(t, kl, missingPod, v1.PodPending)
1160 checkPodStatus(t, kl, failedPod, v1.PodFailed)
1161 }
1162
1163
1164 func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
1165 ctx := context.Background()
1166 testKubelet := newTestKubelet(t, false )
1167 defer testKubelet.Cleanup()
1168
1169 kl := testKubelet.kubelet
1170 pods := []*v1.Pod{
1171 {ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "1234"}, Spec: v1.PodSpec{Containers: []v1.Container{{Ports: []v1.ContainerPort{{HostPort: 80}}}}}},
1172 {ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "4567"}, Spec: v1.PodSpec{Containers: []v1.Container{{Ports: []v1.ContainerPort{{HostPort: 80}}}}}},
1173 }
1174 podToTest := pods[1]
1175
1176 kl.HandlePodAdditions(pods)
1177 if _, found := kl.statusManager.GetPodStatus(podToTest.UID); !found {
1178 t.Fatalf("expected to have status cached for pod2")
1179 }
1180
1181 kl.podManager.SetPods([]*v1.Pod{})
1182 kl.HandlePodCleanups(ctx)
1183 if _, found := kl.statusManager.GetPodStatus(podToTest.UID); found {
1184 t.Fatalf("expected to not have status cached for pod2")
1185 }
1186 }
1187
1188 func TestValidateContainerLogStatus(t *testing.T) {
1189 testKubelet := newTestKubelet(t, false )
1190 defer testKubelet.Cleanup()
1191 kubelet := testKubelet.kubelet
1192 containerName := "x"
1193 testCases := []struct {
1194 statuses []v1.ContainerStatus
1195 success bool
1196 pSuccess bool
1197 }{
1198 {
1199 statuses: []v1.ContainerStatus{
1200 {
1201 Name: containerName,
1202 State: v1.ContainerState{
1203 Running: &v1.ContainerStateRunning{},
1204 },
1205 LastTerminationState: v1.ContainerState{
1206 Terminated: &v1.ContainerStateTerminated{ContainerID: "docker://fakeid"},
1207 },
1208 },
1209 },
1210 success: true,
1211 pSuccess: true,
1212 },
1213 {
1214 statuses: []v1.ContainerStatus{
1215 {
1216 Name: containerName,
1217 State: v1.ContainerState{
1218 Running: &v1.ContainerStateRunning{},
1219 },
1220 },
1221 },
1222 success: true,
1223 pSuccess: false,
1224 },
1225 {
1226 statuses: []v1.ContainerStatus{
1227 {
1228 Name: containerName,
1229 State: v1.ContainerState{
1230 Terminated: &v1.ContainerStateTerminated{},
1231 },
1232 },
1233 },
1234 success: false,
1235 pSuccess: false,
1236 },
1237 {
1238 statuses: []v1.ContainerStatus{
1239 {
1240 Name: containerName,
1241 State: v1.ContainerState{
1242 Terminated: &v1.ContainerStateTerminated{ContainerID: "docker://fakeid"},
1243 },
1244 },
1245 },
1246 success: true,
1247 pSuccess: false,
1248 },
1249 {
1250 statuses: []v1.ContainerStatus{
1251 {
1252 Name: containerName,
1253 State: v1.ContainerState{
1254 Terminated: &v1.ContainerStateTerminated{},
1255 },
1256 LastTerminationState: v1.ContainerState{
1257 Terminated: &v1.ContainerStateTerminated{},
1258 },
1259 },
1260 },
1261 success: false,
1262 pSuccess: false,
1263 },
1264 {
1265 statuses: []v1.ContainerStatus{
1266 {
1267 Name: containerName,
1268 State: v1.ContainerState{
1269 Terminated: &v1.ContainerStateTerminated{},
1270 },
1271 LastTerminationState: v1.ContainerState{
1272 Terminated: &v1.ContainerStateTerminated{ContainerID: "docker://fakeid"},
1273 },
1274 },
1275 },
1276 success: true,
1277 pSuccess: true,
1278 },
1279 {
1280 statuses: []v1.ContainerStatus{
1281 {
1282 Name: containerName,
1283 State: v1.ContainerState{
1284 Waiting: &v1.ContainerStateWaiting{},
1285 },
1286 },
1287 },
1288 success: false,
1289 pSuccess: false,
1290 },
1291 {
1292 statuses: []v1.ContainerStatus{
1293 {
1294 Name: containerName,
1295 State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{Reason: "ErrImagePull"}},
1296 },
1297 },
1298 success: false,
1299 pSuccess: false,
1300 },
1301 {
1302 statuses: []v1.ContainerStatus{
1303 {
1304 Name: containerName,
1305 State: v1.ContainerState{Waiting: &v1.ContainerStateWaiting{Reason: "ErrImagePullBackOff"}},
1306 },
1307 },
1308 success: false,
1309 pSuccess: false,
1310 },
1311 }
1312
1313 for i, tc := range testCases {
1314
1315 previous := false
1316 podStatus := &v1.PodStatus{ContainerStatuses: tc.statuses}
1317 _, err := kubelet.validateContainerLogStatus("podName", podStatus, containerName, previous)
1318 if !tc.success {
1319 assert.Error(t, err, fmt.Sprintf("[case %d] error", i))
1320 } else {
1321 assert.NoError(t, err, "[case %d] error", i)
1322 }
1323
1324 previous = true
1325 _, err = kubelet.validateContainerLogStatus("podName", podStatus, containerName, previous)
1326 if !tc.pSuccess {
1327 assert.Error(t, err, fmt.Sprintf("[case %d] error", i))
1328 } else {
1329 assert.NoError(t, err, "[case %d] error", i)
1330 }
1331
1332 _, err = kubelet.validateContainerLogStatus("podName", podStatus, "blah", false)
1333 assert.Error(t, err, fmt.Sprintf("[case %d] invalid container name should cause an error", i))
1334 }
1335 }
1336
1337 func TestCreateMirrorPod(t *testing.T) {
1338 tests := []struct {
1339 name string
1340 updateType kubetypes.SyncPodType
1341 }{
1342 {
1343 name: "SyncPodCreate",
1344 updateType: kubetypes.SyncPodCreate,
1345 },
1346 {
1347 name: "SyncPodUpdate",
1348 updateType: kubetypes.SyncPodUpdate,
1349 },
1350 }
1351 for _, tt := range tests {
1352 tt := tt
1353 t.Run(tt.name, func(t *testing.T) {
1354 testKubelet := newTestKubelet(t, false )
1355 defer testKubelet.Cleanup()
1356
1357 kl := testKubelet.kubelet
1358 manager := testKubelet.fakeMirrorClient
1359 pod := podWithUIDNameNs("12345678", "bar", "foo")
1360 pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file"
1361 pods := []*v1.Pod{pod}
1362 kl.podManager.SetPods(pods)
1363 isTerminal, err := kl.SyncPod(context.Background(), tt.updateType, pod, nil, &kubecontainer.PodStatus{})
1364 assert.NoError(t, err)
1365 if isTerminal {
1366 t.Fatalf("pod should not be terminal: %#v", pod)
1367 }
1368 podFullName := kubecontainer.GetPodFullName(pod)
1369 assert.True(t, manager.HasPod(podFullName), "Expected mirror pod %q to be created", podFullName)
1370 assert.Equal(t, 1, manager.NumOfPods(), "Expected only 1 mirror pod %q, got %+v", podFullName, manager.GetPods())
1371 })
1372 }
1373 }
1374
1375 func TestDeleteOutdatedMirrorPod(t *testing.T) {
1376 testKubelet := newTestKubelet(t, false )
1377 defer testKubelet.Cleanup()
1378
1379 kl := testKubelet.kubelet
1380 manager := testKubelet.fakeMirrorClient
1381 pod := podWithUIDNameNsSpec("12345678", "foo", "ns", v1.PodSpec{
1382 Containers: []v1.Container{
1383 {Name: "1234", Image: "foo"},
1384 },
1385 })
1386 pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file"
1387
1388
1389 mirrorPod := podWithUIDNameNsSpec("11111111", "foo", "ns", v1.PodSpec{
1390 Containers: []v1.Container{
1391 {Name: "1234", Image: "bar"},
1392 },
1393 })
1394 mirrorPod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "api"
1395 mirrorPod.Annotations[kubetypes.ConfigMirrorAnnotationKey] = "mirror"
1396
1397 pods := []*v1.Pod{pod, mirrorPod}
1398 kl.podManager.SetPods(pods)
1399 isTerminal, err := kl.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, mirrorPod, &kubecontainer.PodStatus{})
1400 assert.NoError(t, err)
1401 if isTerminal {
1402 t.Fatalf("pod should not be terminal: %#v", pod)
1403 }
1404 name := kubecontainer.GetPodFullName(pod)
1405 creates, deletes := manager.GetCounts(name)
1406 if creates != 1 || deletes != 1 {
1407 t.Errorf("expected 1 creation and 1 deletion of %q, got %d, %d", name, creates, deletes)
1408 }
1409 }
1410
1411 func TestDeleteOrphanedMirrorPods(t *testing.T) {
1412 ctx := context.Background()
1413 testKubelet := newTestKubelet(t, false )
1414 defer testKubelet.Cleanup()
1415
1416 kl := testKubelet.kubelet
1417 manager := testKubelet.fakeMirrorClient
1418 orphanPods := []*v1.Pod{
1419 {
1420 ObjectMeta: metav1.ObjectMeta{
1421 UID: "12345678",
1422 Name: "pod1",
1423 Namespace: "ns",
1424 Annotations: map[string]string{
1425 kubetypes.ConfigSourceAnnotationKey: "api",
1426 kubetypes.ConfigMirrorAnnotationKey: "mirror",
1427 },
1428 },
1429 },
1430 {
1431 ObjectMeta: metav1.ObjectMeta{
1432 UID: "12345679",
1433 Name: "pod2",
1434 Namespace: "ns",
1435 Annotations: map[string]string{
1436 kubetypes.ConfigSourceAnnotationKey: "api",
1437 kubetypes.ConfigMirrorAnnotationKey: "mirror",
1438 },
1439 },
1440 },
1441 {
1442 ObjectMeta: metav1.ObjectMeta{
1443 UID: "12345670",
1444 Name: "pod3",
1445 Namespace: "ns",
1446 Annotations: map[string]string{
1447 kubetypes.ConfigSourceAnnotationKey: "api",
1448 kubetypes.ConfigMirrorAnnotationKey: "mirror",
1449 },
1450 },
1451 },
1452 }
1453
1454 kl.podManager.SetPods(orphanPods)
1455
1456
1457 kl.podWorkers.(*fakePodWorkers).terminatingStaticPods = map[string]bool{
1458 kubecontainer.GetPodFullName(orphanPods[2]): true,
1459 }
1460
1461
1462 kl.HandlePodCleanups(ctx)
1463 assert.Len(t, manager.GetPods(), 0, "Expected 0 mirror pods")
1464 for i, pod := range orphanPods {
1465 name := kubecontainer.GetPodFullName(pod)
1466 creates, deletes := manager.GetCounts(name)
1467 switch i {
1468 case 2:
1469 if creates != 0 || deletes != 0 {
1470 t.Errorf("expected 0 creation and 0 deletion of %q, got %d, %d", name, creates, deletes)
1471 }
1472 default:
1473 if creates != 0 || deletes != 1 {
1474 t.Errorf("expected 0 creation and one deletion of %q, got %d, %d", name, creates, deletes)
1475 }
1476 }
1477 }
1478 }
1479
1480 func TestNetworkErrorsWithoutHostNetwork(t *testing.T) {
1481 testKubelet := newTestKubelet(t, false )
1482 defer testKubelet.Cleanup()
1483 kubelet := testKubelet.kubelet
1484
1485 kubelet.runtimeState.setNetworkState(fmt.Errorf("simulated network error"))
1486
1487 pod := podWithUIDNameNsSpec("12345678", "hostnetwork", "new", v1.PodSpec{
1488 HostNetwork: false,
1489
1490 Containers: []v1.Container{
1491 {Name: "foo"},
1492 },
1493 })
1494
1495 kubelet.podManager.SetPods([]*v1.Pod{pod})
1496 isTerminal, err := kubelet.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
1497 assert.Error(t, err, "expected pod with hostNetwork=false to fail when network in error")
1498 if isTerminal {
1499 t.Fatalf("pod should not be terminal: %#v", pod)
1500 }
1501
1502 pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = kubetypes.FileSource
1503 pod.Spec.HostNetwork = true
1504 isTerminal, err = kubelet.SyncPod(context.Background(), kubetypes.SyncPodUpdate, pod, nil, &kubecontainer.PodStatus{})
1505 assert.NoError(t, err, "expected pod with hostNetwork=true to succeed when network in error")
1506 if isTerminal {
1507 t.Fatalf("pod should not be terminal: %#v", pod)
1508 }
1509 }
1510
1511 func TestFilterOutInactivePods(t *testing.T) {
1512 testKubelet := newTestKubelet(t, false )
1513 defer testKubelet.Cleanup()
1514 kubelet := testKubelet.kubelet
1515 pods := newTestPods(8)
1516 now := metav1.NewTime(time.Now())
1517
1518
1519 pods[0].Status.Phase = v1.PodFailed
1520 pods[1].Status.Phase = v1.PodSucceeded
1521
1522
1523 pods[2].Status.Phase = v1.PodRunning
1524 pods[2].DeletionTimestamp = &now
1525 pods[2].Status.ContainerStatuses = []v1.ContainerStatus{
1526 {State: v1.ContainerState{
1527 Running: &v1.ContainerStateRunning{
1528 StartedAt: now,
1529 },
1530 }},
1531 }
1532
1533
1534 pods[3].Status.Phase = v1.PodPending
1535 pods[4].Status.Phase = v1.PodRunning
1536
1537
1538 pods[5].Status.Phase = v1.PodRunning
1539 kubelet.statusManager.SetPodStatus(pods[5], v1.PodStatus{Phase: v1.PodFailed})
1540
1541
1542 pods[6].Status.Phase = v1.PodRunning
1543 kubelet.podWorkers.(*fakePodWorkers).terminated = map[types.UID]bool{
1544 pods[6].UID: true,
1545 }
1546
1547
1548
1549 pods[7].Status.Phase = v1.PodFailed
1550 kubelet.podWorkers.(*fakePodWorkers).terminationRequested = map[types.UID]bool{
1551 pods[7].UID: true,
1552 }
1553
1554 expected := []*v1.Pod{pods[2], pods[3], pods[4], pods[7]}
1555 kubelet.podManager.SetPods(pods)
1556 actual := kubelet.filterOutInactivePods(pods)
1557 assert.Equal(t, expected, actual)
1558 }
1559
1560 func TestCheckpointContainer(t *testing.T) {
1561 testKubelet := newTestKubelet(t, false )
1562 defer testKubelet.Cleanup()
1563 kubelet := testKubelet.kubelet
1564
1565 fakeRuntime := testKubelet.fakeRuntime
1566 containerID := kubecontainer.ContainerID{
1567 Type: "test",
1568 ID: "abc1234",
1569 }
1570
1571 fakePod := &containertest.FakePod{
1572 Pod: &kubecontainer.Pod{
1573 ID: "12345678",
1574 Name: "podFoo",
1575 Namespace: "nsFoo",
1576 Containers: []*kubecontainer.Container{
1577 {
1578 Name: "containerFoo",
1579 ID: containerID,
1580 },
1581 },
1582 },
1583 }
1584
1585 fakeRuntime.PodList = []*containertest.FakePod{fakePod}
1586 wrongContainerName := "wrongContainerName"
1587
1588 tests := []struct {
1589 name string
1590 containerName string
1591 checkpointLocation string
1592 expectedStatus error
1593 expectedLocation string
1594 }{
1595 {
1596 name: "Checkpoint with wrong container name",
1597 containerName: wrongContainerName,
1598 checkpointLocation: "",
1599 expectedStatus: fmt.Errorf("container %s not found", wrongContainerName),
1600 expectedLocation: "",
1601 },
1602 {
1603 name: "Checkpoint with default checkpoint location",
1604 containerName: fakePod.Pod.Containers[0].Name,
1605 checkpointLocation: "",
1606 expectedStatus: nil,
1607 expectedLocation: filepath.Join(
1608 kubelet.getCheckpointsDir(),
1609 fmt.Sprintf(
1610 "checkpoint-%s_%s-%s",
1611 fakePod.Pod.Name,
1612 fakePod.Pod.Namespace,
1613 fakePod.Pod.Containers[0].Name,
1614 ),
1615 ),
1616 },
1617 {
1618 name: "Checkpoint with ignored location",
1619 containerName: fakePod.Pod.Containers[0].Name,
1620 checkpointLocation: "somethingThatWillBeIgnored",
1621 expectedStatus: nil,
1622 expectedLocation: filepath.Join(
1623 kubelet.getCheckpointsDir(),
1624 fmt.Sprintf(
1625 "checkpoint-%s_%s-%s",
1626 fakePod.Pod.Name,
1627 fakePod.Pod.Namespace,
1628 fakePod.Pod.Containers[0].Name,
1629 ),
1630 ),
1631 },
1632 }
1633
1634 for _, test := range tests {
1635 t.Run(test.name, func(t *testing.T) {
1636 ctx := context.Background()
1637 options := &runtimeapi.CheckpointContainerRequest{}
1638 if test.checkpointLocation != "" {
1639 options.Location = test.checkpointLocation
1640 }
1641 status := kubelet.CheckpointContainer(
1642 ctx,
1643 fakePod.Pod.ID,
1644 fmt.Sprintf(
1645 "%s_%s",
1646 fakePod.Pod.Name,
1647 fakePod.Pod.Namespace,
1648 ),
1649 test.containerName,
1650 options,
1651 )
1652 require.Equal(t, test.expectedStatus, status)
1653
1654 if status != nil {
1655 return
1656 }
1657
1658 require.True(
1659 t,
1660 strings.HasPrefix(
1661 options.Location,
1662 test.expectedLocation,
1663 ),
1664 )
1665 require.Equal(
1666 t,
1667 options.ContainerId,
1668 containerID.ID,
1669 )
1670
1671 })
1672 }
1673 }
1674
1675 func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
1676 testKubelet := newTestKubelet(t, false )
1677 defer testKubelet.Cleanup()
1678 fakeRuntime := testKubelet.fakeRuntime
1679 kubelet := testKubelet.kubelet
1680
1681 now := metav1.Now()
1682 startTime := metav1.NewTime(now.Time.Add(-1 * time.Minute))
1683 exceededActiveDeadlineSeconds := int64(30)
1684
1685 pods := []*v1.Pod{
1686 {
1687 ObjectMeta: metav1.ObjectMeta{
1688 UID: "12345678",
1689 Name: "bar",
1690 Namespace: "new",
1691 },
1692 Spec: v1.PodSpec{
1693 Containers: []v1.Container{
1694 {Name: "foo"},
1695 },
1696 ActiveDeadlineSeconds: &exceededActiveDeadlineSeconds,
1697 },
1698 Status: v1.PodStatus{
1699 StartTime: &startTime,
1700 },
1701 },
1702 }
1703
1704 fakeRuntime.PodList = []*containertest.FakePod{
1705 {Pod: &kubecontainer.Pod{
1706 ID: "12345678",
1707 Name: "bar",
1708 Namespace: "new",
1709 Containers: []*kubecontainer.Container{
1710 {Name: "foo"},
1711 },
1712 }},
1713 }
1714
1715
1716 kubelet.HandlePodUpdates(pods)
1717 status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
1718 assert.True(t, found, "expected to found status for pod %q", pods[0].UID)
1719 assert.Equal(t, v1.PodFailed, status.Phase)
1720
1721 assert.NotNil(t, status.ContainerStatuses)
1722 }
1723
1724 func TestSyncPodsDoesNotSetPodsThatDidNotRunTooLongToFailed(t *testing.T) {
1725 testKubelet := newTestKubelet(t, false )
1726 defer testKubelet.Cleanup()
1727 fakeRuntime := testKubelet.fakeRuntime
1728
1729 kubelet := testKubelet.kubelet
1730
1731 now := metav1.Now()
1732 startTime := metav1.NewTime(now.Time.Add(-1 * time.Minute))
1733 exceededActiveDeadlineSeconds := int64(300)
1734
1735 pods := []*v1.Pod{
1736 {
1737 ObjectMeta: metav1.ObjectMeta{
1738 UID: "12345678",
1739 Name: "bar",
1740 Namespace: "new",
1741 },
1742 Spec: v1.PodSpec{
1743 Containers: []v1.Container{
1744 {Name: "foo"},
1745 },
1746 ActiveDeadlineSeconds: &exceededActiveDeadlineSeconds,
1747 },
1748 Status: v1.PodStatus{
1749 StartTime: &startTime,
1750 },
1751 },
1752 }
1753
1754 fakeRuntime.PodList = []*containertest.FakePod{
1755 {Pod: &kubecontainer.Pod{
1756 ID: "12345678",
1757 Name: "bar",
1758 Namespace: "new",
1759 Containers: []*kubecontainer.Container{
1760 {Name: "foo"},
1761 },
1762 }},
1763 }
1764
1765 kubelet.podManager.SetPods(pods)
1766 kubelet.HandlePodUpdates(pods)
1767 status, found := kubelet.statusManager.GetPodStatus(pods[0].UID)
1768 assert.True(t, found, "expected to found status for pod %q", pods[0].UID)
1769 assert.NotEqual(t, v1.PodFailed, status.Phase)
1770 }
1771
1772 func podWithUIDNameNs(uid types.UID, name, namespace string) *v1.Pod {
1773 return &v1.Pod{
1774 ObjectMeta: metav1.ObjectMeta{
1775 UID: uid,
1776 Name: name,
1777 Namespace: namespace,
1778 Annotations: map[string]string{},
1779 },
1780 }
1781 }
1782
1783 func podWithUIDNameNsSpec(uid types.UID, name, namespace string, spec v1.PodSpec) *v1.Pod {
1784 pod := podWithUIDNameNs(uid, name, namespace)
1785 pod.Spec = spec
1786 return pod
1787 }
1788
1789 func TestDeletePodDirsForDeletedPods(t *testing.T) {
1790 ctx := context.Background()
1791 testKubelet := newTestKubelet(t, false )
1792 defer testKubelet.Cleanup()
1793 kl := testKubelet.kubelet
1794 pods := []*v1.Pod{
1795 podWithUIDNameNs("12345678", "pod1", "ns"),
1796 podWithUIDNameNs("12345679", "pod2", "ns"),
1797 }
1798
1799 kl.podManager.SetPods(pods)
1800
1801 kl.HandlePodSyncs(kl.podManager.GetPods())
1802 for i := range pods {
1803 assert.True(t, dirExists(kl.getPodDir(pods[i].UID)), "Expected directory to exist for pod %d", i)
1804 }
1805
1806
1807 kl.podManager.SetPods([]*v1.Pod{pods[0]})
1808 kl.HandlePodCleanups(ctx)
1809 assert.True(t, dirExists(kl.getPodDir(pods[0].UID)), "Expected directory to exist for pod 0")
1810 assert.False(t, dirExists(kl.getPodDir(pods[1].UID)), "Expected directory to be deleted for pod 1")
1811 }
1812
1813 func syncAndVerifyPodDir(t *testing.T, testKubelet *TestKubelet, pods []*v1.Pod, podsToCheck []*v1.Pod, shouldExist bool) {
1814 ctx := context.Background()
1815 t.Helper()
1816 kl := testKubelet.kubelet
1817
1818 kl.podManager.SetPods(pods)
1819 kl.HandlePodSyncs(pods)
1820 kl.HandlePodCleanups(ctx)
1821 for i, pod := range podsToCheck {
1822 exist := dirExists(kl.getPodDir(pod.UID))
1823 assert.Equal(t, shouldExist, exist, "directory of pod %d", i)
1824 }
1825 }
1826
1827 func TestDoesNotDeletePodDirsForTerminatedPods(t *testing.T) {
1828 testKubelet := newTestKubelet(t, false )
1829 defer testKubelet.Cleanup()
1830 kl := testKubelet.kubelet
1831 pods := []*v1.Pod{
1832 podWithUIDNameNs("12345678", "pod1", "ns"),
1833 podWithUIDNameNs("12345679", "pod2", "ns"),
1834 podWithUIDNameNs("12345680", "pod3", "ns"),
1835 }
1836 syncAndVerifyPodDir(t, testKubelet, pods, pods, true)
1837
1838
1839 kl.statusManager.SetPodStatus(pods[1], v1.PodStatus{Phase: v1.PodFailed})
1840 kl.statusManager.SetPodStatus(pods[2], v1.PodStatus{Phase: v1.PodSucceeded})
1841 syncAndVerifyPodDir(t, testKubelet, pods, pods, true)
1842 }
1843
1844 func TestDoesNotDeletePodDirsIfContainerIsRunning(t *testing.T) {
1845 testKubelet := newTestKubelet(t, false )
1846 defer testKubelet.Cleanup()
1847 runningPod := &kubecontainer.Pod{
1848 ID: "12345678",
1849 Name: "pod1",
1850 Namespace: "ns",
1851 }
1852 apiPod := podWithUIDNameNs(runningPod.ID, runningPod.Name, runningPod.Namespace)
1853
1854
1855
1856 pods := []*v1.Pod{apiPod}
1857 testKubelet.kubelet.podWorkers.(*fakePodWorkers).running = map[types.UID]bool{apiPod.UID: true}
1858 syncAndVerifyPodDir(t, testKubelet, pods, []*v1.Pod{apiPod}, true)
1859
1860
1861
1862 pods = []*v1.Pod{}
1863 testKubelet.fakeRuntime.PodList = []*containertest.FakePod{{Pod: runningPod, NetnsPath: ""}}
1864 syncAndVerifyPodDir(t, testKubelet, pods, []*v1.Pod{apiPod}, true)
1865
1866
1867
1868 pods = []*v1.Pod{}
1869 testKubelet.fakeRuntime.PodList = []*containertest.FakePod{}
1870 testKubelet.kubelet.podWorkers.(*fakePodWorkers).running = nil
1871 syncAndVerifyPodDir(t, testKubelet, pods, []*v1.Pod{apiPod}, false)
1872 }
1873
1874 func TestGetPodsToSync(t *testing.T) {
1875 testKubelet := newTestKubelet(t, false )
1876 defer testKubelet.Cleanup()
1877 kubelet := testKubelet.kubelet
1878 clock := testKubelet.fakeClock
1879 pods := newTestPods(5)
1880
1881 exceededActiveDeadlineSeconds := int64(30)
1882 notYetActiveDeadlineSeconds := int64(120)
1883 startTime := metav1.NewTime(clock.Now())
1884 pods[0].Status.StartTime = &startTime
1885 pods[0].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds
1886 pods[1].Status.StartTime = &startTime
1887 pods[1].Spec.ActiveDeadlineSeconds = &notYetActiveDeadlineSeconds
1888 pods[2].Status.StartTime = &startTime
1889 pods[2].Spec.ActiveDeadlineSeconds = &exceededActiveDeadlineSeconds
1890
1891 kubelet.podManager.SetPods(pods)
1892 kubelet.workQueue.Enqueue(pods[2].UID, 0)
1893 kubelet.workQueue.Enqueue(pods[3].UID, 30*time.Second)
1894 kubelet.workQueue.Enqueue(pods[4].UID, 2*time.Minute)
1895
1896 clock.Step(1 * time.Minute)
1897
1898 expected := []*v1.Pod{pods[2], pods[3], pods[0]}
1899 podsToSync := kubelet.getPodsToSync()
1900 sort.Sort(podsByUID(expected))
1901 sort.Sort(podsByUID(podsToSync))
1902 assert.Equal(t, expected, podsToSync)
1903 }
1904
1905 func TestGenerateAPIPodStatusWithSortedContainers(t *testing.T) {
1906 testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
1907 defer testKubelet.Cleanup()
1908 kubelet := testKubelet.kubelet
1909 numContainers := 10
1910 expectedOrder := []string{}
1911 cStatuses := []*kubecontainer.Status{}
1912 specContainerList := []v1.Container{}
1913 for i := 0; i < numContainers; i++ {
1914 id := fmt.Sprintf("%v", i)
1915 containerName := fmt.Sprintf("%vcontainer", id)
1916 expectedOrder = append(expectedOrder, containerName)
1917 cStatus := &kubecontainer.Status{
1918 ID: kubecontainer.BuildContainerID("test", id),
1919 Name: containerName,
1920 }
1921
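// Alternate between appending and prepending so the statuses are deliberately out of order.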
1922 if i%2 == 0 {
1923 cStatuses = append(cStatuses, cStatus)
1924 } else {
1925 cStatuses = append([]*kubecontainer.Status{cStatus}, cStatuses...)
1926 }
1927 specContainerList = append(specContainerList, v1.Container{Name: containerName})
1928 }
1929 pod := podWithUIDNameNs("uid1", "foo", "test")
1930 pod.Spec = v1.PodSpec{
1931 Containers: specContainerList,
1932 }
1933
1934 status := &kubecontainer.PodStatus{
1935 ID: pod.UID,
1936 Name: pod.Name,
1937 Namespace: pod.Namespace,
1938 ContainerStatuses: cStatuses,
1939 }
1940 for i := 0; i < 5; i++ {
1941 apiStatus := kubelet.generateAPIPodStatus(pod, status, false)
1942 for i, c := range apiStatus.ContainerStatuses {
1943 if expectedOrder[i] != c.Name {
1944 t.Fatalf("Container status not sorted, expected %v at index %d, but found %v", expectedOrder[i], i, c.Name)
1945 }
1946 }
1947 }
1948 }
1949
1950 func verifyContainerStatuses(t *testing.T, statuses []v1.ContainerStatus, expectedState, expectedLastTerminationState map[string]v1.ContainerState, message string) {
1951 for _, s := range statuses {
1952 assert.Equal(t, expectedState[s.Name], s.State, "%s: state", message)
1953 assert.Equal(t, expectedLastTerminationState[s.Name], s.LastTerminationState, "%s: last terminated state", message)
1954 }
1955 }
1956
1957 // Test generateAPIPodStatus with different reason cache entries and old API pod statuses.
1958 func TestGenerateAPIPodStatusWithReasonCache(t *testing.T) {
1959
1960 testTimestamp := time.Unix(123456789, 987654321)
1961 testErrorReason := fmt.Errorf("test-error")
1962 emptyContainerID := (&kubecontainer.ContainerID{}).String()
1963 testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
1964 defer testKubelet.Cleanup()
1965 kubelet := testKubelet.kubelet
1966 pod := podWithUIDNameNs("12345678", "foo", "new")
1967 pod.Spec = v1.PodSpec{RestartPolicy: v1.RestartPolicyOnFailure}
1968
1969 podStatus := &kubecontainer.PodStatus{
1970 ID: pod.UID,
1971 Name: pod.Name,
1972 Namespace: pod.Namespace,
1973 }
1974 tests := []struct {
1975 containers []v1.Container
1976 statuses []*kubecontainer.Status
1977 reasons map[string]error
1978 oldStatuses []v1.ContainerStatus
1979 expectedState map[string]v1.ContainerState
1980
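// expectedInitState overrides expectedState for the init-container pass when set.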
1981 expectedInitState map[string]v1.ContainerState
1982 expectedLastTerminationState map[string]v1.ContainerState
1983 }{
1984 // For a container with no historical record, the state should be Waiting and the
1985 // last termination state should be taken from the old API status.
1986 {
1987 containers: []v1.Container{{Name: "without-old-record"}, {Name: "with-old-record"}},
1988 statuses: []*kubecontainer.Status{},
1989 reasons: map[string]error{},
1990 oldStatuses: []v1.ContainerStatus{{
1991 Name: "with-old-record",
1992 LastTerminationState: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{}},
1993 }},
1994 expectedState: map[string]v1.ContainerState{
1995 "without-old-record": {Waiting: &v1.ContainerStateWaiting{
1996 Reason: ContainerCreating,
1997 }},
1998 "with-old-record": {Waiting: &v1.ContainerStateWaiting{
1999 Reason: ContainerCreating,
2000 }},
2001 },
2002 expectedInitState: map[string]v1.ContainerState{
2003 "without-old-record": {Waiting: &v1.ContainerStateWaiting{
2004 Reason: PodInitializing,
2005 }},
2006 "with-old-record": {Waiting: &v1.ContainerStateWaiting{
2007 Reason: PodInitializing,
2008 }},
2009 },
2010 expectedLastTerminationState: map[string]v1.ContainerState{
2011 "with-old-record": {Terminated: &v1.ContainerStateTerminated{}},
2012 },
2013 },
2014
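// For a running container, the state should be Running and the last termination state should come from the most recent exited status.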
2015 {
2016 containers: []v1.Container{{Name: "running"}},
2017 statuses: []*kubecontainer.Status{
2018 {
2019 Name: "running",
2020 State: kubecontainer.ContainerStateRunning,
2021 StartedAt: testTimestamp,
2022 },
2023 {
2024 Name: "running",
2025 State: kubecontainer.ContainerStateExited,
2026 ExitCode: 1,
2027 },
2028 },
2029 reasons: map[string]error{},
2030 oldStatuses: []v1.ContainerStatus{},
2031 expectedState: map[string]v1.ContainerState{
2032 "running": {Running: &v1.ContainerStateRunning{
2033 StartedAt: metav1.NewTime(testTimestamp),
2034 }},
2035 },
2036 expectedLastTerminationState: map[string]v1.ContainerState{
2037 "running": {Terminated: &v1.ContainerStateTerminated{
2038 ExitCode: 1,
2039 ContainerID: emptyContainerID,
2040 }},
2041 },
2042 },
2043 // For terminated containers:
2044 // * if there is no recent start-failure record, the state is Terminated (from the
2045 // most recent exited status) and the last termination state comes from the
2046 // second most recent exited status;
2047 // * if there is a recent start-failure record, the state is Waiting with that
2048 // reason and the last termination state comes from the most recent exited status;
2049 // * a container that exited with code 0 under RestartPolicyOnFailure is not
2050 // restarted, so its state stays Terminated regardless of the reason cache.
2051 {
2052 containers: []v1.Container{{Name: "without-reason"}, {Name: "with-reason"}},
2053 statuses: []*kubecontainer.Status{
2054 {
2055 Name: "without-reason",
2056 State: kubecontainer.ContainerStateExited,
2057 ExitCode: 1,
2058 },
2059 {
2060 Name: "with-reason",
2061 State: kubecontainer.ContainerStateExited,
2062 ExitCode: 2,
2063 },
2064 {
2065 Name: "without-reason",
2066 State: kubecontainer.ContainerStateExited,
2067 ExitCode: 3,
2068 },
2069 {
2070 Name: "with-reason",
2071 State: kubecontainer.ContainerStateExited,
2072 ExitCode: 4,
2073 },
2074 {
2075 Name: "succeed",
2076 State: kubecontainer.ContainerStateExited,
2077 ExitCode: 0,
2078 },
2079 {
2080 Name: "succeed",
2081 State: kubecontainer.ContainerStateExited,
2082 ExitCode: 5,
2083 },
2084 },
2085 reasons: map[string]error{"with-reason": testErrorReason, "succeed": testErrorReason},
2086 oldStatuses: []v1.ContainerStatus{},
2087 expectedState: map[string]v1.ContainerState{
2088 "without-reason": {Terminated: &v1.ContainerStateTerminated{
2089 ExitCode: 1,
2090 ContainerID: emptyContainerID,
2091 }},
2092 "with-reason": {Waiting: &v1.ContainerStateWaiting{Reason: testErrorReason.Error()}},
2093 "succeed": {Terminated: &v1.ContainerStateTerminated{
2094 ExitCode: 0,
2095 ContainerID: emptyContainerID,
2096 }},
2097 },
2098 expectedLastTerminationState: map[string]v1.ContainerState{
2099 "without-reason": {Terminated: &v1.ContainerStateTerminated{
2100 ExitCode: 3,
2101 ContainerID: emptyContainerID,
2102 }},
2103 "with-reason": {Terminated: &v1.ContainerStateTerminated{
2104 ExitCode: 2,
2105 ContainerID: emptyContainerID,
2106 }},
2107 "succeed": {Terminated: &v1.ContainerStateTerminated{
2108 ExitCode: 5,
2109 ContainerID: emptyContainerID,
2110 }},
2111 },
2112 },
2113 // For a container in an unknown state, the reported state is Terminated with
2114 // exit code 137 and reason ContainerStatusUnknown, while the last termination
2115 // state is carried over from the previous running state.
2116
2117 {
2118 containers: []v1.Container{{Name: "unknown"}},
2119 statuses: []*kubecontainer.Status{
2120 {
2121 Name: "unknown",
2122 State: kubecontainer.ContainerStateUnknown,
2123 },
2124 {
2125 Name: "unknown",
2126 State: kubecontainer.ContainerStateRunning,
2127 },
2128 },
2129 reasons: map[string]error{},
2130 oldStatuses: []v1.ContainerStatus{{
2131 Name: "unknown",
2132 State: v1.ContainerState{Running: &v1.ContainerStateRunning{}},
2133 }},
2134 expectedState: map[string]v1.ContainerState{
2135 "unknown": {Terminated: &v1.ContainerStateTerminated{
2136 ExitCode: 137,
2137 Message: "The container could not be located when the pod was terminated",
2138 Reason: "ContainerStatusUnknown",
2139 }},
2140 },
2141 expectedLastTerminationState: map[string]v1.ContainerState{
2142 "unknown": {Running: &v1.ContainerStateRunning{}},
2143 },
2144 },
2145 }
2146
2147 for i, test := range tests {
2148 kubelet.reasonCache = NewReasonCache()
2149 for n, e := range test.reasons {
2150 kubelet.reasonCache.add(pod.UID, n, e, "")
2151 }
2152 pod.Spec.Containers = test.containers
2153 pod.Status.ContainerStatuses = test.oldStatuses
2154 podStatus.ContainerStatuses = test.statuses
2155 apiStatus := kubelet.generateAPIPodStatus(pod, podStatus, false)
2156 verifyContainerStatuses(t, apiStatus.ContainerStatuses, test.expectedState, test.expectedLastTerminationState, fmt.Sprintf("case %d", i))
2157 }
2158
2159 // Everything should behave the same for init containers, except where expectedInitState is set.
2160 for i, test := range tests {
2161 kubelet.reasonCache = NewReasonCache()
2162 for n, e := range test.reasons {
2163 kubelet.reasonCache.add(pod.UID, n, e, "")
2164 }
2165 pod.Spec.InitContainers = test.containers
2166 pod.Status.InitContainerStatuses = test.oldStatuses
2167 podStatus.ContainerStatuses = test.statuses
2168 apiStatus := kubelet.generateAPIPodStatus(pod, podStatus, false)
2169 expectedState := test.expectedState
2170 if test.expectedInitState != nil {
2171 expectedState = test.expectedInitState
2172 }
2173 verifyContainerStatuses(t, apiStatus.InitContainerStatuses, expectedState, test.expectedLastTerminationState, fmt.Sprintf("case %d", i))
2174 }
2175 }
2176
2177 // Test generateAPIPodStatus with different restart policies.
2178 func TestGenerateAPIPodStatusWithDifferentRestartPolicies(t *testing.T) {
2179 testErrorReason := fmt.Errorf("test-error")
2180 emptyContainerID := (&kubecontainer.ContainerID{}).String()
2181 testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
2182 defer testKubelet.Cleanup()
2183 kubelet := testKubelet.kubelet
2184 pod := podWithUIDNameNs("12345678", "foo", "new")
2185 containers := []v1.Container{{Name: "succeed"}, {Name: "failed"}}
2186 podStatus := &kubecontainer.PodStatus{
2187 ID: pod.UID,
2188 Name: pod.Name,
2189 Namespace: pod.Namespace,
2190 ContainerStatuses: []*kubecontainer.Status{
2191 {
2192 Name: "succeed",
2193 State: kubecontainer.ContainerStateExited,
2194 ExitCode: 0,
2195 },
2196 {
2197 Name: "failed",
2198 State: kubecontainer.ContainerStateExited,
2199 ExitCode: 1,
2200 },
2201 {
2202 Name: "succeed",
2203 State: kubecontainer.ContainerStateExited,
2204 ExitCode: 2,
2205 },
2206 {
2207 Name: "failed",
2208 State: kubecontainer.ContainerStateExited,
2209 ExitCode: 3,
2210 },
2211 },
2212 }
2213 kubelet.reasonCache.add(pod.UID, "succeed", testErrorReason, "")
2214 kubelet.reasonCache.add(pod.UID, "failed", testErrorReason, "")
2215 for c, test := range []struct {
2216 restartPolicy v1.RestartPolicy
2217 expectedState map[string]v1.ContainerState
2218 expectedLastTerminationState map[string]v1.ContainerState
2219
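// expectedInitState and expectedInitLastTerminationState override the regular expectations for the init-container pass when set.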
2220 expectedInitState map[string]v1.ContainerState
2221
2222 expectedInitLastTerminationState map[string]v1.ContainerState
2223 }{
2224 {
2225 restartPolicy: v1.RestartPolicyNever,
2226 expectedState: map[string]v1.ContainerState{
2227 "succeed": {Terminated: &v1.ContainerStateTerminated{
2228 ExitCode: 0,
2229 ContainerID: emptyContainerID,
2230 }},
2231 "failed": {Terminated: &v1.ContainerStateTerminated{
2232 ExitCode: 1,
2233 ContainerID: emptyContainerID,
2234 }},
2235 },
2236 expectedLastTerminationState: map[string]v1.ContainerState{
2237 "succeed": {Terminated: &v1.ContainerStateTerminated{
2238 ExitCode: 2,
2239 ContainerID: emptyContainerID,
2240 }},
2241 "failed": {Terminated: &v1.ContainerStateTerminated{
2242 ExitCode: 3,
2243 ContainerID: emptyContainerID,
2244 }},
2245 },
2246 },
2247 {
2248 restartPolicy: v1.RestartPolicyOnFailure,
2249 expectedState: map[string]v1.ContainerState{
2250 "succeed": {Terminated: &v1.ContainerStateTerminated{
2251 ExitCode: 0,
2252 ContainerID: emptyContainerID,
2253 }},
2254 "failed": {Waiting: &v1.ContainerStateWaiting{Reason: testErrorReason.Error()}},
2255 },
2256 expectedLastTerminationState: map[string]v1.ContainerState{
2257 "succeed": {Terminated: &v1.ContainerStateTerminated{
2258 ExitCode: 2,
2259 ContainerID: emptyContainerID,
2260 }},
2261 "failed": {Terminated: &v1.ContainerStateTerminated{
2262 ExitCode: 1,
2263 ContainerID: emptyContainerID,
2264 }},
2265 },
2266 },
2267 {
2268 restartPolicy: v1.RestartPolicyAlways,
2269 expectedState: map[string]v1.ContainerState{
2270 "succeed": {Waiting: &v1.ContainerStateWaiting{Reason: testErrorReason.Error()}},
2271 "failed": {Waiting: &v1.ContainerStateWaiting{Reason: testErrorReason.Error()}},
2272 },
2273 expectedLastTerminationState: map[string]v1.ContainerState{
2274 "succeed": {Terminated: &v1.ContainerStateTerminated{
2275 ExitCode: 0,
2276 ContainerID: emptyContainerID,
2277 }},
2278 "failed": {Terminated: &v1.ContainerStateTerminated{
2279 ExitCode: 1,
2280 ContainerID: emptyContainerID,
2281 }},
2282 },
2283 // An init container that terminated with exit code 0 is not restarted even
2284 // when the restart policy is Always.
2285 expectedInitState: map[string]v1.ContainerState{
2286 "succeed": {Terminated: &v1.ContainerStateTerminated{
2287 ExitCode: 0,
2288 ContainerID: emptyContainerID,
2289 }},
2290 "failed": {Waiting: &v1.ContainerStateWaiting{Reason: testErrorReason.Error()}},
2291 },
2292 expectedInitLastTerminationState: map[string]v1.ContainerState{
2293 "succeed": {Terminated: &v1.ContainerStateTerminated{
2294 ExitCode: 2,
2295 ContainerID: emptyContainerID,
2296 }},
2297 "failed": {Terminated: &v1.ContainerStateTerminated{
2298 ExitCode: 1,
2299 ContainerID: emptyContainerID,
2300 }},
2301 },
2302 },
2303 } {
2304 pod.Spec.RestartPolicy = test.restartPolicy
2305
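// Verify regular containers.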
2306 pod.Spec.Containers = containers
2307 apiStatus := kubelet.generateAPIPodStatus(pod, podStatus, false)
2308 expectedState, expectedLastTerminationState := test.expectedState, test.expectedLastTerminationState
2309 verifyContainerStatuses(t, apiStatus.ContainerStatuses, expectedState, expectedLastTerminationState, fmt.Sprintf("case %d", c))
2310 pod.Spec.Containers = nil
2311
2312 // Verify init containers.
2313 pod.Spec.InitContainers = containers
2314 apiStatus = kubelet.generateAPIPodStatus(pod, podStatus, false)
2315 if test.expectedInitState != nil {
2316 expectedState = test.expectedInitState
2317 }
2318 if test.expectedInitLastTerminationState != nil {
2319 expectedLastTerminationState = test.expectedInitLastTerminationState
2320 }
2321 verifyContainerStatuses(t, apiStatus.InitContainerStatuses, expectedState, expectedLastTerminationState, fmt.Sprintf("case %d", c))
2322 pod.Spec.InitContainers = nil
2323 }
2324 }
2325
2326
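// testPodAdmitHandler is a lifecycle.PodAdmitHandler used for testing.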
2327 type testPodAdmitHandler struct {
2328
2329 podsToReject []*v1.Pod
2330 }
2331
2332
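// Admit rejects any pod whose UID appears in podsToReject.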
2333 func (a *testPodAdmitHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
2334 for _, podToReject := range a.podsToReject {
2335 if podToReject.UID == attrs.Pod.UID {
2336 return lifecycle.PodAdmitResult{Admit: false, Reason: "Rejected", Message: "Pod is rejected"}
2337 }
2338 }
2339 return lifecycle.PodAdmitResult{Admit: true}
2340 }
2341
2342
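// TestHandlePodAdditionsInvokesPodAdmitHandlers verifies that a rejected pod is marked Failed while an admitted pod remains Pending.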
2343 func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) {
2344 testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
2345 defer testKubelet.Cleanup()
2346 kl := testKubelet.kubelet
2347 kl.nodeLister = testNodeLister{nodes: []*v1.Node{
2348 {
2349 ObjectMeta: metav1.ObjectMeta{Name: string(kl.nodeName)},
2350 Status: v1.NodeStatus{
2351 Allocatable: v1.ResourceList{
2352 v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
2353 },
2354 },
2355 },
2356 }}
2357
2358 pods := []*v1.Pod{
2359 {
2360 ObjectMeta: metav1.ObjectMeta{
2361 UID: "123456789",
2362 Name: "podA",
2363 Namespace: "foo",
2364 },
2365 },
2366 {
2367 ObjectMeta: metav1.ObjectMeta{
2368 UID: "987654321",
2369 Name: "podB",
2370 Namespace: "foo",
2371 },
2372 },
2373 }
2374 podToReject := pods[0]
2375 podToAdmit := pods[1]
2376 podsToReject := []*v1.Pod{podToReject}
2377
2378 kl.admitHandlers.AddPodAdmitHandler(&testPodAdmitHandler{podsToReject: podsToReject})
2379
2380 kl.HandlePodAdditions(pods)
2381
2382 // Check the pod statuses stored in the status manager.
2383 checkPodStatus(t, kl, podToReject, v1.PodFailed)
2384 checkPodStatus(t, kl, podToAdmit, v1.PodPending)
2385 }
2386
2387 func TestPodResourceAllocationReset(t *testing.T) {
2388 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true)()
2389 testKubelet := newTestKubelet(t, false)
2390 defer testKubelet.Cleanup()
2391 kubelet := testKubelet.kubelet
2392 kubelet.statusManager = status.NewFakeManager()
2393
2394 nodes := []*v1.Node{
2395 {
2396 ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
2397 Status: v1.NodeStatus{
2398 Capacity: v1.ResourceList{
2399 v1.ResourceCPU: resource.MustParse("8"),
2400 v1.ResourceMemory: resource.MustParse("8Gi"),
2401 },
2402 Allocatable: v1.ResourceList{
2403 v1.ResourceCPU: resource.MustParse("4"),
2404 v1.ResourceMemory: resource.MustParse("4Gi"),
2405 v1.ResourcePods: *resource.NewQuantity(40, resource.DecimalSI),
2406 },
2407 },
2408 },
2409 }
2410 kubelet.nodeLister = testNodeLister{nodes: nodes}
2411
2412 cpu500m := resource.MustParse("500m")
2413 cpu800m := resource.MustParse("800m")
2414 mem500M := resource.MustParse("500Mi")
2415 mem800M := resource.MustParse("800Mi")
2416 cpu500mMem500MPodSpec := &v1.PodSpec{
2417 Containers: []v1.Container{
2418 {
2419 Name: "c1",
2420 Resources: v1.ResourceRequirements{
2421 Requests: v1.ResourceList{v1.ResourceCPU: cpu500m, v1.ResourceMemory: mem500M},
2422 },
2423 },
2424 },
2425 }
2426 cpu800mMem800MPodSpec := cpu500mMem500MPodSpec.DeepCopy()
2427 cpu800mMem800MPodSpec.Containers[0].Resources.Requests = v1.ResourceList{v1.ResourceCPU: cpu800m, v1.ResourceMemory: mem800M}
2428 cpu800mPodSpec := cpu500mMem500MPodSpec.DeepCopy()
2429 cpu800mPodSpec.Containers[0].Resources.Requests = v1.ResourceList{v1.ResourceCPU: cpu800m}
2430 mem800MPodSpec := cpu500mMem500MPodSpec.DeepCopy()
2431 mem800MPodSpec.Containers[0].Resources.Requests = v1.ResourceList{v1.ResourceMemory: mem800M}
2432
2433 cpu500mPodSpec := cpu500mMem500MPodSpec.DeepCopy()
2434 cpu500mPodSpec.Containers[0].Resources.Requests = v1.ResourceList{v1.ResourceCPU: cpu500m}
2435 mem500MPodSpec := cpu500mMem500MPodSpec.DeepCopy()
2436 mem500MPodSpec.Containers[0].Resources.Requests = v1.ResourceList{v1.ResourceMemory: mem500M}
2437 emptyPodSpec := cpu500mMem500MPodSpec.DeepCopy()
2438 emptyPodSpec.Containers[0].Resources.Requests = v1.ResourceList{}
2439
2440 tests := []struct {
2441 name string
2442 pod *v1.Pod
2443 existingPodAllocation *v1.Pod
2444 expectedPodResourceAllocation state.PodResourceAllocation
2445 }{
2446 {
2447 name: "Having both memory and cpu, resource allocation not exists",
2448 pod: podWithUIDNameNsSpec("1", "pod1", "foo", *cpu500mMem500MPodSpec),
2449 expectedPodResourceAllocation: state.PodResourceAllocation{
2450 "1": map[string]v1.ResourceList{
2451 cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources.Requests,
2452 },
2453 },
2454 },
2455 {
2456 name: "Having both memory and cpu, resource allocation exists",
2457 pod: podWithUIDNameNsSpec("2", "pod2", "foo", *cpu500mMem500MPodSpec),
2458 existingPodAllocation: podWithUIDNameNsSpec("2", "pod2", "foo", *cpu500mMem500MPodSpec),
2459 expectedPodResourceAllocation: state.PodResourceAllocation{
2460 "2": map[string]v1.ResourceList{
2461 cpu500mMem500MPodSpec.Containers[0].Name: cpu500mMem500MPodSpec.Containers[0].Resources.Requests,
2462 },
2463 },
2464 },
2465 {
2466 name: "Having both memory and cpu, resource allocation exists (with different value)",
2467 pod: podWithUIDNameNsSpec("3", "pod3", "foo", *cpu500mMem500MPodSpec),
2468 existingPodAllocation: podWithUIDNameNsSpec("3", "pod3", "foo", *cpu800mMem800MPodSpec),
2469 expectedPodResourceAllocation: state.PodResourceAllocation{
2470 "3": map[string]v1.ResourceList{
2471 cpu800mMem800MPodSpec.Containers[0].Name: cpu800mMem800MPodSpec.Containers[0].Resources.Requests,
2472 },
2473 },
2474 },
2475 {
2476 name: "Only has cpu, resource allocation not exists",
2477 pod: podWithUIDNameNsSpec("4", "pod5", "foo", *cpu500mPodSpec),
2478 expectedPodResourceAllocation: state.PodResourceAllocation{
2479 "4": map[string]v1.ResourceList{
2480 cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources.Requests,
2481 },
2482 },
2483 },
2484 {
2485 name: "Only has cpu, resource allocation exists",
2486 pod: podWithUIDNameNsSpec("5", "pod5", "foo", *cpu500mPodSpec),
2487 existingPodAllocation: podWithUIDNameNsSpec("5", "pod5", "foo", *cpu500mPodSpec),
2488 expectedPodResourceAllocation: state.PodResourceAllocation{
2489 "5": map[string]v1.ResourceList{
2490 cpu500mPodSpec.Containers[0].Name: cpu500mPodSpec.Containers[0].Resources.Requests,
2491 },
2492 },
2493 },
2494 {
2495 name: "Only has cpu, resource allocation exists (with different value)",
2496 pod: podWithUIDNameNsSpec("6", "pod6", "foo", *cpu500mPodSpec),
2497 existingPodAllocation: podWithUIDNameNsSpec("6", "pod6", "foo", *cpu800mPodSpec),
2498 expectedPodResourceAllocation: state.PodResourceAllocation{
2499 "6": map[string]v1.ResourceList{
2500 cpu800mPodSpec.Containers[0].Name: cpu800mPodSpec.Containers[0].Resources.Requests,
2501 },
2502 },
2503 },
2504 {
2505 name: "Only has memory, resource allocation not exists",
2506 pod: podWithUIDNameNsSpec("7", "pod7", "foo", *mem500MPodSpec),
2507 expectedPodResourceAllocation: state.PodResourceAllocation{
2508 "7": map[string]v1.ResourceList{
2509 mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources.Requests,
2510 },
2511 },
2512 },
2513 {
2514 name: "Only has memory, resource allocation exists",
2515 pod: podWithUIDNameNsSpec("8", "pod8", "foo", *mem500MPodSpec),
2516 existingPodAllocation: podWithUIDNameNsSpec("8", "pod8", "foo", *mem500MPodSpec),
2517 expectedPodResourceAllocation: state.PodResourceAllocation{
2518 "8": map[string]v1.ResourceList{
2519 mem500MPodSpec.Containers[0].Name: mem500MPodSpec.Containers[0].Resources.Requests,
2520 },
2521 },
2522 },
2523 {
2524 name: "Only has memory, resource allocation exists (with different value)",
2525 pod: podWithUIDNameNsSpec("9", "pod9", "foo", *mem500MPodSpec),
2526 existingPodAllocation: podWithUIDNameNsSpec("9", "pod9", "foo", *mem800MPodSpec),
2527 expectedPodResourceAllocation: state.PodResourceAllocation{
2528 "9": map[string]v1.ResourceList{
2529 mem800MPodSpec.Containers[0].Name: mem800MPodSpec.Containers[0].Resources.Requests,
2530 },
2531 },
2532 },
2533 {
2534 name: "No CPU and memory, resource allocation not exists",
2535 pod: podWithUIDNameNsSpec("10", "pod10", "foo", *emptyPodSpec),
2536 expectedPodResourceAllocation: state.PodResourceAllocation{
2537 "10": map[string]v1.ResourceList{
2538 emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources.Requests,
2539 },
2540 },
2541 },
2542 {
2543 name: "No CPU and memory, resource allocation exists",
2544 pod: podWithUIDNameNsSpec("11", "pod11", "foo", *emptyPodSpec),
2545 existingPodAllocation: podWithUIDNameNsSpec("11", "pod11", "foo", *emptyPodSpec),
2546 expectedPodResourceAllocation: state.PodResourceAllocation{
2547 "11": map[string]v1.ResourceList{
2548 emptyPodSpec.Containers[0].Name: emptyPodSpec.Containers[0].Resources.Requests,
2549 },
2550 },
2551 },
2552 }
2553 for _, tc := range tests {
2554 if tc.existingPodAllocation != nil {
2555
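// Simulate a restarted kubelet that already has a resource allocation recorded for this pod.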
2556 err := kubelet.statusManager.SetPodAllocation(tc.existingPodAllocation)
2557 if err != nil {
2558 t.Fatalf("failed to set pod allocation: %v", err)
2559 }
2560 }
2561 kubelet.HandlePodAdditions([]*v1.Pod{tc.pod})
2562
2563 allocatedResources, found := kubelet.statusManager.GetContainerResourceAllocation(string(tc.pod.UID), tc.pod.Spec.Containers[0].Name)
2564 if !found {
2565 t.Fatalf("resource allocation should exist: (pod: %#v, container: %s)", tc.pod, tc.pod.Spec.Containers[0].Name)
2566 }
2567 assert.Equal(t, tc.expectedPodResourceAllocation[string(tc.pod.UID)][tc.pod.Spec.Containers[0].Name], allocatedResources, tc.name)
2568 }
2569 }
2570
2571 func TestHandlePodResourcesResize(t *testing.T) {
2572 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.InPlacePodVerticalScaling, true)()
2573 testKubelet := newTestKubelet(t, false)
2574 defer testKubelet.Cleanup()
2575 kubelet := testKubelet.kubelet
2576 kubelet.statusManager = status.NewFakeManager()
2577
2578 cpu500m := resource.MustParse("500m")
2579 cpu1000m := resource.MustParse("1")
2580 cpu1500m := resource.MustParse("1500m")
2581 cpu2500m := resource.MustParse("2500m")
2582 cpu5000m := resource.MustParse("5000m")
2583 mem500M := resource.MustParse("500Mi")
2584 mem1000M := resource.MustParse("1Gi")
2585 mem1500M := resource.MustParse("1500Mi")
2586 mem2500M := resource.MustParse("2500Mi")
2587 mem4500M := resource.MustParse("4500Mi")
2588
2589 nodes := []*v1.Node{
2590 {
2591 ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
2592 Status: v1.NodeStatus{
2593 Capacity: v1.ResourceList{
2594 v1.ResourceCPU: resource.MustParse("8"),
2595 v1.ResourceMemory: resource.MustParse("8Gi"),
2596 },
2597 Allocatable: v1.ResourceList{
2598 v1.ResourceCPU: resource.MustParse("4"),
2599 v1.ResourceMemory: resource.MustParse("4Gi"),
2600 v1.ResourcePods: *resource.NewQuantity(40, resource.DecimalSI),
2601 },
2602 },
2603 },
2604 }
2605 kubelet.nodeLister = testNodeLister{nodes: nodes}
2606
2607 testPod1 := &v1.Pod{
2608 ObjectMeta: metav1.ObjectMeta{
2609 UID: "1111",
2610 Name: "pod1",
2611 Namespace: "ns1",
2612 },
2613 Spec: v1.PodSpec{
2614 Containers: []v1.Container{
2615 {
2616 Name: "c1",
2617 Image: "i1",
2618 Resources: v1.ResourceRequirements{
2619 Requests: v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M},
2620 },
2621 },
2622 },
2623 },
2624 Status: v1.PodStatus{
2625 Phase: v1.PodRunning,
2626 ContainerStatuses: []v1.ContainerStatus{
2627 {
2628 Name: "c1",
2629 AllocatedResources: v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M},
2630 Resources: &v1.ResourceRequirements{},
2631 },
2632 },
2633 },
2634 }
2635 testPod2 := testPod1.DeepCopy()
2636 testPod2.UID = "2222"
2637 testPod2.Name = "pod2"
2638 testPod2.Namespace = "ns2"
2639 testPod3 := testPod1.DeepCopy()
2640 testPod3.UID = "3333"
2641 testPod3.Name = "pod3"
2642 testPod3.Namespace = "ns2"
2643
2644 testKubelet.fakeKubeClient = fake.NewSimpleClientset(testPod1, testPod2, testPod3)
2645 kubelet.kubeClient = testKubelet.fakeKubeClient
2646 defer testKubelet.fakeKubeClient.ClearActions()
2647 kubelet.podManager.AddPod(testPod1)
2648 kubelet.podManager.AddPod(testPod2)
2649 kubelet.podManager.AddPod(testPod3)
2650 kubelet.podWorkers.(*fakePodWorkers).running = map[types.UID]bool{
2651 testPod1.UID: true,
2652 testPod2.UID: true,
2653 testPod3.UID: true,
2654 }
2655 defer kubelet.podManager.RemovePod(testPod3)
2656 defer kubelet.podManager.RemovePod(testPod2)
2657 defer kubelet.podManager.RemovePod(testPod1)
2658
2659 tests := []struct {
2660 name string
2661 pod *v1.Pod
2662 newRequests v1.ResourceList
2663 expectedAllocations v1.ResourceList
2664 expectedResize v1.PodResizeStatus
2665 }{
2666 {
2667 name: "Request CPU and memory decrease - expect InProgress",
2668 pod: testPod2,
2669 newRequests: v1.ResourceList{v1.ResourceCPU: cpu500m, v1.ResourceMemory: mem500M},
2670 expectedAllocations: v1.ResourceList{v1.ResourceCPU: cpu500m, v1.ResourceMemory: mem500M},
2671 expectedResize: v1.PodResizeStatusInProgress,
2672 },
2673 {
2674 name: "Request CPU increase, memory decrease - expect InProgress",
2675 pod: testPod2,
2676 newRequests: v1.ResourceList{v1.ResourceCPU: cpu1500m, v1.ResourceMemory: mem500M},
2677 expectedAllocations: v1.ResourceList{v1.ResourceCPU: cpu1500m, v1.ResourceMemory: mem500M},
2678 expectedResize: v1.PodResizeStatusInProgress,
2679 },
2680 {
2681 name: "Request CPU decrease, memory increase - expect InProgress",
2682 pod: testPod2,
2683 newRequests: v1.ResourceList{v1.ResourceCPU: cpu500m, v1.ResourceMemory: mem1500M},
2684 expectedAllocations: v1.ResourceList{v1.ResourceCPU: cpu500m, v1.ResourceMemory: mem1500M},
2685 expectedResize: v1.PodResizeStatusInProgress,
2686 },
2687 {
2688 name: "Request CPU and memory increase beyond current capacity - expect Deferred",
2689 pod: testPod2,
2690 newRequests: v1.ResourceList{v1.ResourceCPU: cpu2500m, v1.ResourceMemory: mem2500M},
2691 expectedAllocations: v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M},
2692 expectedResize: v1.PodResizeStatusDeferred,
2693 },
2694 {
2695 name: "Request CPU decrease and memory increase beyond current capacity - expect Deferred",
2696 pod: testPod2,
2697 newRequests: v1.ResourceList{v1.ResourceCPU: cpu500m, v1.ResourceMemory: mem2500M},
2698 expectedAllocations: v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M},
2699 expectedResize: v1.PodResizeStatusDeferred,
2700 },
2701 {
2702 name: "Request memory increase beyond node capacity - expect Infeasible",
2703 pod: testPod2,
2704 newRequests: v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem4500M},
2705 expectedAllocations: v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M},
2706 expectedResize: v1.PodResizeStatusInfeasible,
2707 },
2708 {
2709 name: "Request CPU increase beyond node capacity - expect Infeasible",
2710 pod: testPod2,
2711 newRequests: v1.ResourceList{v1.ResourceCPU: cpu5000m, v1.ResourceMemory: mem1000M},
2712 expectedAllocations: v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M},
2713 expectedResize: v1.PodResizeStatusInfeasible,
2714 },
2715 }
2716
2717 for _, tt := range tests {
2718 tt.pod.Spec.Containers[0].Resources.Requests = tt.newRequests
2719 tt.pod.Status.ContainerStatuses[0].AllocatedResources = v1.ResourceList{v1.ResourceCPU: cpu1000m, v1.ResourceMemory: mem1000M}
2720 kubelet.handlePodResourcesResize(tt.pod)
2721 updatedPod, found := kubelet.podManager.GetPodByName(tt.pod.Namespace, tt.pod.Name)
2722 assert.True(t, found, "expected to find pod %s", tt.pod.Name)
2723 assert.Equal(t, tt.expectedAllocations, updatedPod.Status.ContainerStatuses[0].AllocatedResources, tt.name)
2724 assert.Equal(t, tt.expectedResize, updatedPod.Status.Resize, tt.name)
2725 testKubelet.fakeKubeClient.ClearActions()
2726 }
2727 }
2728
2729 // testPodSyncLoopHandler is a lifecycle.PodSyncLoopHandler used for testing.
2730 type testPodSyncLoopHandler struct {
2731
2732 podsToSync []*v1.Pod
2733 }
2734
2735
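// ShouldSync returns true for pods whose UID appears in podsToSync.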
2736 func (a *testPodSyncLoopHandler) ShouldSync(pod *v1.Pod) bool {
2737 for _, podToSync := range a.podsToSync {
2738 if podToSync.UID == pod.UID {
2739 return true
2740 }
2741 }
2742 return false
2743 }
2744
2745
2746 func TestGetPodsToSyncInvokesPodSyncLoopHandlers(t *testing.T) {
2747 testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
2748 defer testKubelet.Cleanup()
2749 kubelet := testKubelet.kubelet
2750 pods := newTestPods(5)
2751 expected := []*v1.Pod{pods[0]}
2752 kubelet.AddPodSyncLoopHandler(&testPodSyncLoopHandler{expected})
2753 kubelet.podManager.SetPods(pods)
2754
2755 podsToSync := kubelet.getPodsToSync()
2756 sort.Sort(podsByUID(expected))
2757 sort.Sort(podsByUID(podsToSync))
2758 assert.Equal(t, expected, podsToSync)
2759 }
2760
2761 // testPodSyncHandler is a lifecycle.PodSyncHandler used for testing.
2762 type testPodSyncHandler struct {
2763
2764 podsToEvict []*v1.Pod
2765
2766 reason string
2767
2768 message string
2769 }
2770
2771
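// ShouldEvict requests eviction, with the configured reason and message, for pods whose UID appears in podsToEvict.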
2772 func (a *testPodSyncHandler) ShouldEvict(pod *v1.Pod) lifecycle.ShouldEvictResponse {
2773 for _, podToEvict := range a.podsToEvict {
2774 if podToEvict.UID == pod.UID {
2775 return lifecycle.ShouldEvictResponse{Evict: true, Reason: a.reason, Message: a.message}
2776 }
2777 }
2778 return lifecycle.ShouldEvictResponse{Evict: false}
2779 }
2780
2781 // TestGenerateAPIPodStatusInvokesPodSyncHandlers verifies that a ShouldEvict response is reflected in the generated API pod status.
2782 func TestGenerateAPIPodStatusInvokesPodSyncHandlers(t *testing.T) {
2783 testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
2784 defer testKubelet.Cleanup()
2785 kubelet := testKubelet.kubelet
2786 pod := newTestPods(1)[0]
2787 podsToEvict := []*v1.Pod{pod}
2788 kubelet.AddPodSyncHandler(&testPodSyncHandler{podsToEvict, "Evicted", "because"})
2789 status := &kubecontainer.PodStatus{
2790 ID: pod.UID,
2791 Name: pod.Name,
2792 Namespace: pod.Namespace,
2793 }
2794 apiStatus := kubelet.generateAPIPodStatus(pod, status, false)
2795 require.Equal(t, v1.PodFailed, apiStatus.Phase)
2796 require.Equal(t, "Evicted", apiStatus.Reason)
2797 require.Equal(t, "because", apiStatus.Message)
2798 }
2799
2800 func TestSyncTerminatingPodKillPod(t *testing.T) {
2801 testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
2802 defer testKubelet.Cleanup()
2803 kl := testKubelet.kubelet
2804 pod := &v1.Pod{
2805 ObjectMeta: metav1.ObjectMeta{
2806 UID: "12345678",
2807 Name: "bar",
2808 Namespace: "foo",
2809 },
2810 }
2811 pods := []*v1.Pod{pod}
2812 kl.podManager.SetPods(pods)
2813 podStatus := &kubecontainer.PodStatus{ID: pod.UID}
2814 gracePeriodOverride := int64(0)
2815 err := kl.SyncTerminatingPod(context.Background(), pod, podStatus, &gracePeriodOverride, func(podStatus *v1.PodStatus) {
2816 podStatus.Phase = v1.PodFailed
2817 podStatus.Reason = "reason"
2818 podStatus.Message = "message"
2819 })
2820 require.NoError(t, err)
2821
2822 // The status set by the callback should be stored in the status manager.
2823 checkPodStatus(t, kl, pod, v1.PodFailed)
2824 }
2825
2826 func TestSyncLabels(t *testing.T) {
2827 tests := []struct {
2828 name string
2829 existingNode *v1.Node
2830 isPatchingNeeded bool
2831 }{
2832 {
2833 name: "no labels",
2834 existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{}}},
2835 isPatchingNeeded: true,
2836 },
2837 {
2838 name: "wrong labels",
2839 existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1.LabelOSStable: "dummyOS", v1.LabelArchStable: "dummyArch"}}},
2840 isPatchingNeeded: true,
2841 },
2842 {
2843 name: "correct labels",
2844 existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH}}},
2845 isPatchingNeeded: false,
2846 },
2847 {
2848 name: "partially correct labels",
2849 existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: "dummyArch"}}},
2850 isPatchingNeeded: true,
2851 },
2852 }
2853
2854 for _, test := range tests {
2855 t.Run(test.name, func(t *testing.T) {
2856 testKubelet := newTestKubelet(t, false)
2857 defer testKubelet.Cleanup()
2858 kl := testKubelet.kubelet
2859 kubeClient := testKubelet.fakeKubeClient
2860
2861 test.existingNode.Name = string(kl.nodeName)
2862
2863 kl.nodeLister = testNodeLister{nodes: []*v1.Node{test.existingNode}}
2864 go func() { kl.syncNodeStatus() }()
2865
2866 err := retryWithExponentialBackOff(
2867 100*time.Millisecond,
2868 func() (bool, error) {
2869 var savedNode *v1.Node
2870 if test.isPatchingNeeded {
2871 actions := kubeClient.Actions()
2872 if len(actions) == 0 {
2873 t.Logf("No action yet")
2874 return false, nil
2875 }
2876 for _, action := range actions {
2877 if action.GetVerb() == "patch" {
2878 var (
2879 err error
2880 patchAction = action.(core.PatchActionImpl)
2881 patchContent = patchAction.GetPatch()
2882 )
2883 savedNode, err = applyNodeStatusPatch(test.existingNode, patchContent)
2884 if err != nil {
2885 t.Logf("node patching failed, %v", err)
2886 return false, nil
2887 }
2888 }
2889 }
2890 } else {
2891 savedNode = test.existingNode
2892 }
2893 if savedNode == nil || savedNode.Labels == nil {
2894 t.Logf("savedNode.Labels should not be nil")
2895 return false, nil
2896 }
2897 val, ok := savedNode.Labels[v1.LabelOSStable]
2898 if !ok {
2899 t.Logf("expected kubernetes.io/os label to be present")
2900 return false, nil
2901 }
2902 if val != goruntime.GOOS {
2903 t.Logf("expected kubernetes.io/os to match runtime.GOOS but got %v", val)
2904 return false, nil
2905 }
2906 val, ok = savedNode.Labels[v1.LabelArchStable]
2907 if !ok {
2908 t.Logf("expected kubernetes.io/arch label to be present")
2909 return false, nil
2910 }
2911 if val != goruntime.GOARCH {
2912 t.Logf("expected kubernetes.io/arch to match runtime.GOARCH but got %v", val)
2913 return false, nil
2914 }
2915 return true, nil
2916 },
2917 )
2918 if err != nil {
2919 t.Fatalf("expected labels to be reconciled but it failed with %v", err)
2920 }
2921 })
2922 }
2923 }
2924
2925 func waitForVolumeUnmount(
2926 volumeManager kubeletvolume.VolumeManager,
2927 pod *v1.Pod) error {
2928 var podVolumes kubecontainer.VolumeMap
2929 err := retryWithExponentialBackOff(
2930 time.Duration(50*time.Millisecond),
2931 func() (bool, error) {
2932
2933 podVolumes = volumeManager.GetMountedVolumesForPod(
2934 util.GetUniquePodName(pod))
2935
2936 if len(podVolumes) != 0 {
2937 return false, nil
2938 }
2939
2940 return true, nil
2941 },
2942 )
2943
2944 if err != nil {
2945 return fmt.Errorf(
2946 "Expected volumes to be unmounted. But some volumes are still mounted: %#v", podVolumes)
2947 }
2948
2949 return nil
2950 }
2951
2952 func waitForVolumeDetach(
2953 volumeName v1.UniqueVolumeName,
2954 volumeManager kubeletvolume.VolumeManager) error {
2955 attachedVolumes := []v1.UniqueVolumeName{}
2956 err := retryWithExponentialBackOff(
2957 time.Duration(50*time.Millisecond),
2958 func() (bool, error) {
2959
2960 volumeAttached := volumeManager.VolumeIsAttached(volumeName)
2961 return !volumeAttached, nil
2962 },
2963 )
2964
2965 if err != nil {
2966 return fmt.Errorf(
2967 "Expected volumes to be detached. But some volumes are still attached: %#v", attachedVolumes)
2968 }
2969
2970 return nil
2971 }
2972
2973 func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
2974 backoff := wait.Backoff{
2975 Duration: initialDuration,
2976 Factor: 3,
2977 Jitter: 0,
2978 Steps: 6,
2979 }
2980 return wait.ExponentialBackoff(backoff, fn)
2981 }
2982
2983 func simulateVolumeInUseUpdate(
2984 volumeName v1.UniqueVolumeName,
2985 stopCh <-chan struct{},
2986 volumeManager kubeletvolume.VolumeManager) {
2987 ticker := time.NewTicker(100 * time.Millisecond)
2988 defer ticker.Stop()
2989 for {
2990 select {
2991 case <-ticker.C:
2992 volumeManager.MarkVolumesAsReportedInUse(
2993 []v1.UniqueVolumeName{volumeName})
2994 case <-stopCh:
2995 return
2996 }
2997 }
2998 }
2999
3000 func runVolumeManager(kubelet *Kubelet) chan struct{} {
3001 stopCh := make(chan struct{})
3002 go kubelet.volumeManager.Run(kubelet.sourcesReady, stopCh)
3003 return stopCh
3004 }
3005
3006
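// dirExists reports whether path exists and is a directory.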
3007 func dirExists(path string) bool {
3008 s, err := os.Stat(path)
3009 if err != nil {
3010 return false
3011 }
3012 return s.IsDir()
3013 }
3014
3015
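// podsByUID sorts pods by their UID.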
3016 type podsByUID []*v1.Pod
3017
3018 func (p podsByUID) Len() int { return len(p) }
3019 func (p podsByUID) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
3020 func (p podsByUID) Less(i, j int) bool { return p[i].UID < p[j].UID }
3021
3022 // createAndStartFakeRemoteRuntime creates and starts a fakeremote.RemoteRuntime
3023 // and returns it together with its endpoint. Callers should call
3024 // fakeRuntime.Stop() to clean up the server.
3025 func createAndStartFakeRemoteRuntime(t *testing.T) (*fakeremote.RemoteRuntime, string) {
3026 endpoint, err := fakeremote.GenerateEndpoint()
3027 require.NoError(t, err)
3028
3029 fakeRuntime := fakeremote.NewFakeRemoteRuntime()
3030 fakeRuntime.Start(endpoint)
3031
3032 return fakeRuntime, endpoint
3033 }
3034
3035 func createRemoteRuntimeService(endpoint string, t *testing.T, tp oteltrace.TracerProvider) internalapi.RuntimeService {
3036 runtimeService, err := remote.NewRemoteRuntimeService(endpoint, 15*time.Second, tp)
3037 require.NoError(t, err)
3038 return runtimeService
3039 }
3040
3041 func TestNewMainKubeletStandAlone(t *testing.T) {
3042 tempDir, err := os.MkdirTemp("", "logs")
3043 ContainerLogsDir = tempDir
3044 assert.NoError(t, err)
3045 defer os.RemoveAll(ContainerLogsDir)
3046 kubeCfg := &kubeletconfiginternal.KubeletConfiguration{
3047 SyncFrequency: metav1.Duration{Duration: time.Minute},
3048 ConfigMapAndSecretChangeDetectionStrategy: kubeletconfiginternal.WatchChangeDetectionStrategy,
3049 ContainerLogMaxSize: "10Mi",
3050 ContainerLogMaxFiles: 5,
3051 MemoryThrottlingFactor: utilpointer.Float64(0),
3052 }
3053 var prober volume.DynamicPluginProber
3054 tp := oteltrace.NewNoopTracerProvider()
3055 mockCtrl := gomock.NewController(t)
3056 defer mockCtrl.Finish()
3057 cadvisor := cadvisortest.NewMockInterface(mockCtrl)
3058 cadvisor.EXPECT().MachineInfo().Return(&cadvisorapi.MachineInfo{}, nil).AnyTimes()
3059 cadvisor.EXPECT().ImagesFsInfo().Return(cadvisorapiv2.FsInfo{
3060 Usage: 400,
3061 Capacity: 1000,
3062 Available: 600,
3063 }, nil).AnyTimes()
3064 tlsOptions := &server.TLSOptions{
3065 Config: &tls.Config{
3066 MinVersion: 0,
3067 },
3068 }
3069 fakeRuntime, endpoint := createAndStartFakeRemoteRuntime(t)
3070 defer func() {
3071 fakeRuntime.Stop()
3072 }()
3073 fakeRecorder := &record.FakeRecorder{}
3074 rtSvc := createRemoteRuntimeService(endpoint, t, oteltrace.NewNoopTracerProvider())
3075 kubeDep := &Dependencies{
3076 Auth: nil,
3077 CAdvisorInterface: cadvisor,
3078 Cloud: nil,
3079 ContainerManager: cm.NewStubContainerManager(),
3080 KubeClient: nil,
3081 HeartbeatClient: nil,
3082 EventClient: nil,
3083 TracerProvider: tp,
3084 HostUtil: hostutil.NewFakeHostUtil(nil),
3085 Mounter: mount.NewFakeMounter(nil),
3086 Recorder: fakeRecorder,
3087 RemoteRuntimeService: rtSvc,
3088 RemoteImageService: fakeRuntime.ImageService,
3089 Subpather: &subpath.FakeSubpath{},
3090 OOMAdjuster: oom.NewOOMAdjuster(),
3091 OSInterface: kubecontainer.RealOS{},
3092 DynamicPluginProber: prober,
3093 TLSOptions: tlsOptions,
3094 }
3095 crOptions := &config.ContainerRuntimeOptions{}
3096
3097 testMainKubelet, err := NewMainKubelet(
3098 kubeCfg,
3099 kubeDep,
3100 crOptions,
3101 "hostname",
3102 false,
3103 "hostname",
3104 []net.IP{},
3105 "",
3106 "external",
3107 "/tmp/cert",
3108 "/tmp/rootdir",
3109 tempDir,
3110 "",
3111 "",
3112 false,
3113 []v1.Taint{},
3114 []string{},
3115 "",
3116 false,
3117 false,
3118 metav1.Duration{Duration: time.Minute},
3119 1024,
3120 110,
3121 true,
3122 true,
3123 map[string]string{},
3124 1024,
3125 false,
3126 )
3127 assert.NoError(t, err, "NewMainKubelet should succeed")
3128 assert.NotNil(t, testMainKubelet, "testMainKubelet should not be nil")
3129
3130 testMainKubelet.BirthCry()
3131 testMainKubelet.StartGarbageCollection()
3132
3133
3155 assert.Nil(t, testMainKubelet.configMapManager, "configmap manager should be nil if kubelet is in standalone mode")
3156 assert.Nil(t, testMainKubelet.secretManager, "secret manager should be nil if kubelet is in standalone mode")
3157 }
3158
3159 func TestSyncPodSpans(t *testing.T) {
3160 testKubelet := newTestKubelet(t, false)
3161 kubelet := testKubelet.kubelet
3162
3163 recorder := record.NewFakeRecorder(20)
3164 nodeRef := &v1.ObjectReference{
3165 Kind: "Node",
3166 Name: "testNode",
3167 UID: types.UID("testNode"),
3168 Namespace: "",
3169 }
3170 kubelet.dnsConfigurer = dns.NewConfigurer(recorder, nodeRef, nil, nil, "TEST", "")
3171
3172 kubeCfg := &kubeletconfiginternal.KubeletConfiguration{
3173 SyncFrequency: metav1.Duration{Duration: time.Minute},
3174 ConfigMapAndSecretChangeDetectionStrategy: kubeletconfiginternal.WatchChangeDetectionStrategy,
3175 ContainerLogMaxSize: "10Mi",
3176 ContainerLogMaxFiles: 5,
3177 MemoryThrottlingFactor: utilpointer.Float64(0),
3178 }
3179
3180 exp := tracetest.NewInMemoryExporter()
3181 tp := sdktrace.NewTracerProvider(
3182 sdktrace.WithSyncer(exp),
3183 )
3184 kubelet.tracer = tp.Tracer(instrumentationScope)
3185
3186 fakeRuntime, endpoint := createAndStartFakeRemoteRuntime(t)
3187 defer func() {
3188 fakeRuntime.Stop()
3189 }()
3190 runtimeSvc := createRemoteRuntimeService(endpoint, t, tp)
3191 kubelet.runtimeService = runtimeSvc
3192
3193 fakeRuntime.ImageService.SetFakeImageSize(100)
3194 fakeRuntime.ImageService.SetFakeImages([]string{"test:latest"})
3195 imageSvc, err := remote.NewRemoteImageService(endpoint, 15*time.Second, tp)
3196 assert.NoError(t, err)
3197
3198 kubelet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager(
3199 kubelet.recorder,
3200 kubelet.livenessManager,
3201 kubelet.readinessManager,
3202 kubelet.startupManager,
3203 kubelet.rootDirectory,
3204 kubelet.podLogsDirectory,
3205 kubelet.machineInfo,
3206 kubelet.podWorkers,
3207 kubelet.os,
3208 kubelet,
3209 nil,
3210 kubelet.backOff,
3211 kubeCfg.SerializeImagePulls,
3212 kubeCfg.MaxParallelImagePulls,
3213 float32(kubeCfg.RegistryPullQPS),
3214 int(kubeCfg.RegistryBurst),
3215 "",
3216 "",
3217 kubeCfg.CPUCFSQuota,
3218 kubeCfg.CPUCFSQuotaPeriod,
3219 runtimeSvc,
3220 imageSvc,
3221 kubelet.containerManager,
3222 kubelet.containerLogManager,
3223 kubelet.runtimeClassManager,
3224 false,
3225 kubeCfg.MemorySwap.SwapBehavior,
3226 kubelet.containerManager.GetNodeAllocatableAbsolute,
3227 *kubeCfg.MemoryThrottlingFactor,
3228 kubeletutil.NewPodStartupLatencyTracker(),
3229 tp,
3230 )
3231 assert.NoError(t, err)
3232
3233 pod := podWithUIDNameNsSpec("12345678", "foo", "new", v1.PodSpec{
3234 Containers: []v1.Container{
3235 {
3236 Name: "bar",
3237 Image: "test:latest",
3238 ImagePullPolicy: v1.PullAlways,
3239 },
3240 },
3241 EnableServiceLinks: utilpointer.Bool(false),
3242 })
3243
3244 _, err = kubelet.SyncPod(context.Background(), kubetypes.SyncPodCreate, pod, nil, &kubecontainer.PodStatus{})
3245 require.NoError(t, err)
3246
3247 require.NoError(t, err)
3248 assert.NotEmpty(t, exp.GetSpans())
3249
3250
3251 var rootSpan *tracetest.SpanStub
3252 spans := exp.GetSpans()
3253 for i, span := range spans {
3254 if span.Name == "syncPod" {
3255 rootSpan = &spans[i]
3256 break
3257 }
3258 }
3259 assert.NotNil(t, rootSpan)
3260
3261 imageServiceSpans := make([]tracetest.SpanStub, 0)
3262 runtimeServiceSpans := make([]tracetest.SpanStub, 0)
3263 for _, span := range exp.GetSpans() {
3264 if span.SpanContext.TraceID() == rootSpan.SpanContext.TraceID() {
3265 switch {
3266 case strings.HasPrefix(span.Name, "runtime.v1.ImageService"):
3267 imageServiceSpans = append(imageServiceSpans, span)
3268 case strings.HasPrefix(span.Name, "runtime.v1.RuntimeService"):
3269 runtimeServiceSpans = append(runtimeServiceSpans, span)
3270 }
3271 }
3272 }
3273 assert.NotEmpty(t, imageServiceSpans, "syncPod trace should have image service spans")
3274 assert.NotEmpty(t, runtimeServiceSpans, "syncPod trace should have runtime service spans")
3275
3276 for _, span := range imageServiceSpans {
3277 assert.Equal(t, span.Parent.SpanID(), rootSpan.SpanContext.SpanID(), fmt.Sprintf("image service span %s %s should be child of root span", span.Name, span.Parent.SpanID()))
3278 }
3279
3280 for _, span := range runtimeServiceSpans {
3281 assert.Equal(t, span.Parent.SpanID(), rootSpan.SpanContext.SpanID(), fmt.Sprintf("runtime service span %s %s should be child of root span", span.Name, span.Parent.SpanID()))
3282 }
3283 }
3284