1
2
3
4
19
20 package cm
21
22 import (
23 "bytes"
24 "context"
25 "fmt"
26 "os"
27 "path"
28 "strings"
29 "sync"
30 "time"
31
32 "github.com/opencontainers/runc/libcontainer/cgroups"
33 "github.com/opencontainers/runc/libcontainer/cgroups/manager"
34 "github.com/opencontainers/runc/libcontainer/configs"
35 "k8s.io/klog/v2"
36 "k8s.io/mount-utils"
37 utilpath "k8s.io/utils/path"
38
39 v1 "k8s.io/api/core/v1"
40 "k8s.io/apimachinery/pkg/api/resource"
41 "k8s.io/apimachinery/pkg/types"
42 utilerrors "k8s.io/apimachinery/pkg/util/errors"
43 "k8s.io/apimachinery/pkg/util/sets"
44 "k8s.io/apimachinery/pkg/util/wait"
45 utilfeature "k8s.io/apiserver/pkg/util/feature"
46 clientset "k8s.io/client-go/kubernetes"
47 "k8s.io/client-go/tools/record"
48 utilsysctl "k8s.io/component-helpers/node/util/sysctl"
49 internalapi "k8s.io/cri-api/pkg/apis"
50 podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
51 kubefeatures "k8s.io/kubernetes/pkg/features"
52 "k8s.io/kubernetes/pkg/kubelet/cadvisor"
53 "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
54 "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
55 "k8s.io/kubernetes/pkg/kubelet/cm/dra"
56 "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
57 memorymanagerstate "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
58 "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
59 cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
60 "k8s.io/kubernetes/pkg/kubelet/config"
61 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
62 "k8s.io/kubernetes/pkg/kubelet/lifecycle"
63 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
64 "k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
65 "k8s.io/kubernetes/pkg/kubelet/status"
66 "k8s.io/kubernetes/pkg/kubelet/userns/inuserns"
67 schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
68 "k8s.io/kubernetes/pkg/util/oom"
69 )
70
71
72 type systemContainer struct {
73
74 name string
75
76
77 cpuMillicores int64
78
79
80
81 ensureStateFunc func(m cgroups.Manager) error
82
83
84 manager cgroups.Manager
85 }
86
87 func newSystemCgroups(containerName string) (*systemContainer, error) {
88 manager, err := createManager(containerName)
89 if err != nil {
90 return nil, err
91 }
92 return &systemContainer{
93 name: containerName,
94 manager: manager,
95 }, nil
96 }
97
98 type containerManagerImpl struct {
99 sync.RWMutex
100 cadvisorInterface cadvisor.Interface
101 mountUtil mount.Interface
102 NodeConfig
103 status Status
104
105 systemContainers []*systemContainer
106
107 periodicTasks []func()
108
109 subsystems *CgroupSubsystems
110 nodeInfo *v1.Node
111
112 cgroupManager CgroupManager
113
114 capacity v1.ResourceList
115
116 internalCapacity v1.ResourceList
117
118
119 cgroupRoot CgroupName
120
121 recorder record.EventRecorder
122
123 qosContainerManager QOSContainerManager
124
125 deviceManager devicemanager.Manager
126
127 cpuManager cpumanager.Manager
128
129 memoryManager memorymanager.Manager
130
131 topologyManager topologymanager.Manager
132
133 draManager dra.Manager
134 }
135
// features records optional capabilities detected by
// validateSystemRequirements.
type features struct {
	// cpuHardcapping is true when CPU hardcapping was detected as available.
	cpuHardcapping bool
}
139
140 var _ ContainerManager = &containerManagerImpl{}
141
142
143
144
145 func validateSystemRequirements(mountUtil mount.Interface) (features, error) {
146 const (
147 cgroupMountType = "cgroup"
148 localErr = "system validation failed"
149 )
150 var (
151 cpuMountPoint string
152 f features
153 )
154 mountPoints, err := mountUtil.List()
155 if err != nil {
156 return f, fmt.Errorf("%s - %v", localErr, err)
157 }
158
159 if cgroups.IsCgroup2UnifiedMode() {
160 f.cpuHardcapping = true
161 return f, nil
162 }
163
164 expectedCgroups := sets.New("cpu", "cpuacct", "cpuset", "memory")
165 for _, mountPoint := range mountPoints {
166 if mountPoint.Type == cgroupMountType {
167 for _, opt := range mountPoint.Opts {
168 if expectedCgroups.Has(opt) {
169 expectedCgroups.Delete(opt)
170 }
171 if opt == "cpu" {
172 cpuMountPoint = mountPoint.Path
173 }
174 }
175 }
176 }
177
178 if expectedCgroups.Len() > 0 {
179 return f, fmt.Errorf("%s - Following Cgroup subsystem not mounted: %v", localErr, sets.List(expectedCgroups))
180 }
181
182
183
184 periodExists, err := utilpath.Exists(utilpath.CheckFollowSymlink, path.Join(cpuMountPoint, "cpu.cfs_period_us"))
185 if err != nil {
186 klog.ErrorS(err, "Failed to detect if CPU cgroup cpu.cfs_period_us is available")
187 }
188 quotaExists, err := utilpath.Exists(utilpath.CheckFollowSymlink, path.Join(cpuMountPoint, "cpu.cfs_quota_us"))
189 if err != nil {
190 klog.ErrorS(err, "Failed to detect if CPU cgroup cpu.cfs_quota_us is available")
191 }
192 if quotaExists && periodExists {
193 f.cpuHardcapping = true
194 }
195 return f, nil
196 }
197
198
199
200
201 func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder, kubeClient clientset.Interface) (ContainerManager, error) {
202 subsystems, err := GetCgroupSubsystems()
203 if err != nil {
204 return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err)
205 }
206
207 if failSwapOn {
208
209 swapFile := "/proc/swaps"
210 swapData, err := os.ReadFile(swapFile)
211 if err != nil {
212 if os.IsNotExist(err) {
213 klog.InfoS("File does not exist, assuming that swap is disabled", "path", swapFile)
214 } else {
215 return nil, err
216 }
217 } else {
218 swapData = bytes.TrimSpace(swapData)
219 swapLines := strings.Split(string(swapData), "\n")
220
221
222
223 if len(swapLines) > 1 {
224 return nil, fmt.Errorf("running with swap on is not supported, please disable swap! or set --fail-swap-on flag to false. /proc/swaps contained: %v", swapLines)
225 }
226 }
227 }
228
229 var internalCapacity = v1.ResourceList{}
230
231
232
233 machineInfo, err := cadvisorInterface.MachineInfo()
234 if err != nil {
235 return nil, err
236 }
237 capacity := cadvisor.CapacityFromMachineInfo(machineInfo)
238 for k, v := range capacity {
239 internalCapacity[k] = v
240 }
241 pidlimits, err := pidlimit.Stats()
242 if err == nil && pidlimits != nil && pidlimits.MaxPID != nil {
243 internalCapacity[pidlimit.PIDs] = *resource.NewQuantity(
244 int64(*pidlimits.MaxPID),
245 resource.DecimalSI)
246 }
247
248
249 cgroupRoot := ParseCgroupfsToCgroupName(nodeConfig.CgroupRoot)
250 cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)
251
252 if nodeConfig.CgroupsPerQOS {
253
254 if nodeConfig.CgroupRoot == "" {
255 return nil, fmt.Errorf("invalid configuration: cgroups-per-qos was specified and cgroup-root was not specified. To enable the QoS cgroup hierarchy you need to specify a valid cgroup-root")
256 }
257
258
259
260
261
262 if err := cgroupManager.Validate(cgroupRoot); err != nil {
263 return nil, fmt.Errorf("invalid configuration: %w", err)
264 }
265 klog.InfoS("Container manager verified user specified cgroup-root exists", "cgroupRoot", cgroupRoot)
266
267
268 cgroupRoot = NewCgroupName(cgroupRoot, defaultNodeAllocatableCgroupName)
269 }
270 klog.InfoS("Creating Container Manager object based on Node Config", "nodeConfig", nodeConfig)
271
272 qosContainerManager, err := NewQOSContainerManager(subsystems, cgroupRoot, nodeConfig, cgroupManager)
273 if err != nil {
274 return nil, err
275 }
276
277 cm := &containerManagerImpl{
278 cadvisorInterface: cadvisorInterface,
279 mountUtil: mountUtil,
280 NodeConfig: nodeConfig,
281 subsystems: subsystems,
282 cgroupManager: cgroupManager,
283 capacity: capacity,
284 internalCapacity: internalCapacity,
285 cgroupRoot: cgroupRoot,
286 recorder: recorder,
287 qosContainerManager: qosContainerManager,
288 }
289
290 cm.topologyManager, err = topologymanager.NewManager(
291 machineInfo.Topology,
292 nodeConfig.TopologyManagerPolicy,
293 nodeConfig.TopologyManagerScope,
294 nodeConfig.TopologyManagerPolicyOptions,
295 )
296
297 if err != nil {
298 return nil, err
299 }
300
301 klog.InfoS("Creating device plugin manager")
302 cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager)
303 if err != nil {
304 return nil, err
305 }
306 cm.topologyManager.AddHintProvider(cm.deviceManager)
307
308
309 if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
310 klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager")
311 cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir, nodeConfig.NodeName)
312 if err != nil {
313 return nil, err
314 }
315 }
316
317
318 cm.cpuManager, err = cpumanager.NewManager(
319 nodeConfig.CPUManagerPolicy,
320 nodeConfig.CPUManagerPolicyOptions,
321 nodeConfig.CPUManagerReconcilePeriod,
322 machineInfo,
323 nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
324 cm.GetNodeAllocatableReservation(),
325 nodeConfig.KubeletRootDir,
326 cm.topologyManager,
327 )
328 if err != nil {
329 klog.ErrorS(err, "Failed to initialize cpu manager")
330 return nil, err
331 }
332 cm.topologyManager.AddHintProvider(cm.cpuManager)
333
334 if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
335 cm.memoryManager, err = memorymanager.NewManager(
336 nodeConfig.ExperimentalMemoryManagerPolicy,
337 machineInfo,
338 cm.GetNodeAllocatableReservation(),
339 nodeConfig.ExperimentalMemoryManagerReservedMemory,
340 nodeConfig.KubeletRootDir,
341 cm.topologyManager,
342 )
343 if err != nil {
344 klog.ErrorS(err, "Failed to initialize memory manager")
345 return nil, err
346 }
347 cm.topologyManager.AddHintProvider(cm.memoryManager)
348 }
349
350 return cm, nil
351 }
352
353
354
355
356 func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager {
357 if cm.NodeConfig.CgroupsPerQOS {
358 return &podContainerManagerImpl{
359 qosContainersInfo: cm.GetQOSContainersInfo(),
360 subsystems: cm.subsystems,
361 cgroupManager: cm.cgroupManager,
362 podPidsLimit: cm.PodPidsLimit,
363 enforceCPULimits: cm.EnforceCPULimits,
364
365
366 cpuCFSQuotaPeriod: uint64(cm.CPUCFSQuotaPeriod / time.Microsecond),
367 }
368 }
369 return &podContainerManagerNoop{
370 cgroupRoot: cm.cgroupRoot,
371 }
372 }
373
374 func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
375 return &internalContainerLifecycleImpl{cm.cpuManager, cm.memoryManager, cm.topologyManager}
376 }
377
378
379 func createManager(containerName string) (cgroups.Manager, error) {
380 cg := &configs.Cgroup{
381 Parent: "/",
382 Name: containerName,
383 Resources: &configs.Resources{
384 SkipDevices: true,
385 },
386 Systemd: false,
387 }
388
389 return manager.New(cg)
390 }
391
// KernelTunableBehavior selects how setupKernelTunables reacts to a kernel
// flag whose value differs from the expected one.
type KernelTunableBehavior string

const (
	// KernelTunableWarn logs the mismatch and carries on.
	KernelTunableWarn = KernelTunableBehavior("warn")
	// KernelTunableError reports the mismatch as an error.
	KernelTunableError = KernelTunableBehavior("error")
	// KernelTunableModify attempts to set the flag to the expected value.
	KernelTunableModify = KernelTunableBehavior("modify")
)
399
400
401
402 func setupKernelTunables(option KernelTunableBehavior) error {
403 desiredState := map[string]int{
404 utilsysctl.VMOvercommitMemory: utilsysctl.VMOvercommitMemoryAlways,
405 utilsysctl.VMPanicOnOOM: utilsysctl.VMPanicOnOOMInvokeOOMKiller,
406 utilsysctl.KernelPanic: utilsysctl.KernelPanicRebootTimeout,
407 utilsysctl.KernelPanicOnOops: utilsysctl.KernelPanicOnOopsAlways,
408 utilsysctl.RootMaxKeys: utilsysctl.RootMaxKeysSetting,
409 utilsysctl.RootMaxBytes: utilsysctl.RootMaxBytesSetting,
410 }
411
412 sysctl := utilsysctl.New()
413
414 errList := []error{}
415 for flag, expectedValue := range desiredState {
416 val, err := sysctl.GetSysctl(flag)
417 if err != nil {
418 errList = append(errList, err)
419 continue
420 }
421 if val == expectedValue {
422 continue
423 }
424
425 switch option {
426 case KernelTunableError:
427 errList = append(errList, fmt.Errorf("invalid kernel flag: %v, expected value: %v, actual value: %v", flag, expectedValue, val))
428 case KernelTunableWarn:
429 klog.V(2).InfoS("Invalid kernel flag", "flag", flag, "expectedValue", expectedValue, "actualValue", val)
430 case KernelTunableModify:
431 klog.V(2).InfoS("Updating kernel flag", "flag", flag, "expectedValue", expectedValue, "actualValue", val)
432 err = sysctl.SetSysctl(flag, expectedValue)
433 if err != nil {
434 if inuserns.RunningInUserNS() {
435 if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.KubeletInUserNamespace) {
436 klog.V(2).InfoS("Updating kernel flag failed (running in UserNS, ignoring)", "flag", flag, "err", err)
437 continue
438 }
439 klog.ErrorS(err, "Updating kernel flag failed (Hint: enable KubeletInUserNamespace feature flag to ignore the error)", "flag", flag)
440 }
441 errList = append(errList, err)
442 }
443 }
444 }
445 return utilerrors.NewAggregate(errList)
446 }
447
448 func (cm *containerManagerImpl) setupNode(activePods ActivePodsFunc) error {
449 f, err := validateSystemRequirements(cm.mountUtil)
450 if err != nil {
451 return err
452 }
453 if !f.cpuHardcapping {
454 cm.status.SoftRequirements = fmt.Errorf("CPU hardcapping unsupported")
455 }
456 b := KernelTunableModify
457 if cm.GetNodeConfig().ProtectKernelDefaults {
458 b = KernelTunableError
459 }
460 if err := setupKernelTunables(b); err != nil {
461 return err
462 }
463
464
465 if cm.NodeConfig.CgroupsPerQOS {
466 if err := cm.createNodeAllocatableCgroups(); err != nil {
467 return err
468 }
469 err = cm.qosContainerManager.Start(cm.GetNodeAllocatableAbsolute, activePods)
470 if err != nil {
471 return fmt.Errorf("failed to initialize top level QOS containers: %v", err)
472 }
473 }
474
475
476 if err := cm.enforceNodeAllocatableCgroups(); err != nil {
477 return err
478 }
479
480 systemContainers := []*systemContainer{}
481
482 if cm.SystemCgroupsName != "" {
483 if cm.SystemCgroupsName == "/" {
484 return fmt.Errorf("system container cannot be root (\"/\")")
485 }
486 cont, err := newSystemCgroups(cm.SystemCgroupsName)
487 if err != nil {
488 return err
489 }
490 cont.ensureStateFunc = func(manager cgroups.Manager) error {
491 return ensureSystemCgroups("/", manager)
492 }
493 systemContainers = append(systemContainers, cont)
494 }
495
496 if cm.KubeletCgroupsName != "" {
497 cont, err := newSystemCgroups(cm.KubeletCgroupsName)
498 if err != nil {
499 return err
500 }
501
502 cont.ensureStateFunc = func(_ cgroups.Manager) error {
503 return ensureProcessInContainerWithOOMScore(os.Getpid(), int(cm.KubeletOOMScoreAdj), cont.manager)
504 }
505 systemContainers = append(systemContainers, cont)
506 } else {
507 cm.periodicTasks = append(cm.periodicTasks, func() {
508 if err := ensureProcessInContainerWithOOMScore(os.Getpid(), int(cm.KubeletOOMScoreAdj), nil); err != nil {
509 klog.ErrorS(err, "Failed to ensure process in container with oom score")
510 return
511 }
512 cont, err := getContainer(os.Getpid())
513 if err != nil {
514 klog.ErrorS(err, "Failed to find cgroups of kubelet")
515 return
516 }
517 cm.Lock()
518 defer cm.Unlock()
519
520 cm.KubeletCgroupsName = cont
521 })
522 }
523
524 cm.systemContainers = systemContainers
525 return nil
526 }
527
528 func (cm *containerManagerImpl) GetNodeConfig() NodeConfig {
529 cm.RLock()
530 defer cm.RUnlock()
531 return cm.NodeConfig
532 }
533
534
535 func (cm *containerManagerImpl) GetPodCgroupRoot() string {
536 return cm.cgroupManager.Name(cm.cgroupRoot)
537 }
538
539 func (cm *containerManagerImpl) GetMountedSubsystems() *CgroupSubsystems {
540 return cm.subsystems
541 }
542
543 func (cm *containerManagerImpl) GetQOSContainersInfo() QOSContainersInfo {
544 return cm.qosContainerManager.GetQOSContainersInfo()
545 }
546
547 func (cm *containerManagerImpl) UpdateQOSCgroups() error {
548 return cm.qosContainerManager.UpdateCgroups()
549 }
550
551 func (cm *containerManagerImpl) Status() Status {
552 cm.RLock()
553 defer cm.RUnlock()
554 return cm.status
555 }
556
557 func (cm *containerManagerImpl) Start(node *v1.Node,
558 activePods ActivePodsFunc,
559 sourcesReady config.SourcesReady,
560 podStatusProvider status.PodStatusProvider,
561 runtimeService internalapi.RuntimeService,
562 localStorageCapacityIsolation bool) error {
563 ctx := context.Background()
564
565 containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
566
567
568 err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
569 if err != nil {
570 return fmt.Errorf("start cpu manager error: %v", err)
571 }
572
573
574 if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
575 containerMap, _ := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
576 err := cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
577 if err != nil {
578 return fmt.Errorf("start memory manager error: %v", err)
579 }
580 }
581
582
583
584 cm.nodeInfo = node
585
586 if localStorageCapacityIsolation {
587 rootfs, err := cm.cadvisorInterface.RootFsInfo()
588 if err != nil {
589 return fmt.Errorf("failed to get rootfs info: %v", err)
590 }
591 for rName, rCap := range cadvisor.EphemeralStorageCapacityFromFsInfo(rootfs) {
592 cm.capacity[rName] = rCap
593 }
594 }
595
596
597 if err := cm.validateNodeAllocatable(); err != nil {
598 return err
599 }
600
601
602 if err := cm.setupNode(activePods); err != nil {
603 return err
604 }
605
606
607 hasEnsureStateFuncs := false
608 for _, cont := range cm.systemContainers {
609 if cont.ensureStateFunc != nil {
610 hasEnsureStateFuncs = true
611 break
612 }
613 }
614 if hasEnsureStateFuncs {
615
616 go wait.Until(func() {
617 for _, cont := range cm.systemContainers {
618 if cont.ensureStateFunc != nil {
619 if err := cont.ensureStateFunc(cont.manager); err != nil {
620 klog.InfoS("Failed to ensure state", "containerName", cont.name, "err", err)
621 }
622 }
623 }
624 }, time.Minute, wait.NeverStop)
625
626 }
627
628 if len(cm.periodicTasks) > 0 {
629 go wait.Until(func() {
630 for _, task := range cm.periodicTasks {
631 if task != nil {
632 task()
633 }
634 }
635 }, 5*time.Minute, wait.NeverStop)
636 }
637
638
639 if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
640 return err
641 }
642
643 return nil
644 }
645
646 func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
647 return cm.deviceManager.GetWatcherHandler()
648 }
649
650
651 func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
652 opts := &kubecontainer.RunContainerOptions{}
653 if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
654 resOpts, err := cm.draManager.GetResources(pod, container)
655 if err != nil {
656 return nil, err
657 }
658
659
660
661 opts.Annotations = append(opts.Annotations, resOpts.Annotations...)
662 opts.CDIDevices = append(opts.CDIDevices, resOpts.CDIDevices...)
663 }
664
665
666 devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
667 if err != nil {
668 return nil, err
669 } else if devOpts == nil {
670 return opts, nil
671 }
672 opts.Devices = append(opts.Devices, devOpts.Devices...)
673 opts.Mounts = append(opts.Mounts, devOpts.Mounts...)
674 opts.Envs = append(opts.Envs, devOpts.Envs...)
675 opts.Annotations = append(opts.Annotations, devOpts.Annotations...)
676 opts.CDIDevices = append(opts.CDIDevices, devOpts.CDIDevices...)
677 return opts, nil
678 }
679
680 func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
681 return cm.deviceManager.UpdatePluginResources(node, attrs)
682 }
683
684 func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
685 return cm.topologyManager
686 }
687
688 func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
689 cpuLimit := int64(0)
690
691
692 for _, cont := range cm.systemContainers {
693 cpuLimit += cont.cpuMillicores
694 }
695
696 return v1.ResourceList{
697 v1.ResourceCPU: *resource.NewMilliQuantity(
698 cpuLimit,
699 resource.DecimalSI),
700 }
701 }
702
703 func isProcessRunningInHost(pid int) (bool, error) {
704
705 initPidNs, err := os.Readlink("/proc/1/ns/pid")
706 if err != nil {
707 return false, fmt.Errorf("failed to find pid namespace of init process")
708 }
709 klog.V(10).InfoS("Found init PID namespace", "namespace", initPidNs)
710 processPidNs, err := os.Readlink(fmt.Sprintf("/proc/%d/ns/pid", pid))
711 if err != nil {
712 return false, fmt.Errorf("failed to find pid namespace of process %q", pid)
713 }
714 klog.V(10).InfoS("Process info", "pid", pid, "namespace", processPidNs)
715 return initPidNs == processPidNs, nil
716 }
717
718 func ensureProcessInContainerWithOOMScore(pid int, oomScoreAdj int, manager cgroups.Manager) error {
719 if runningInHost, err := isProcessRunningInHost(pid); err != nil {
720
721 return err
722 } else if !runningInHost {
723
724 klog.V(2).InfoS("PID is not running in the host namespace", "pid", pid)
725 return nil
726 }
727
728 var errs []error
729 if manager != nil {
730 cont, err := getContainer(pid)
731 if err != nil {
732 errs = append(errs, fmt.Errorf("failed to find container of PID %d: %v", pid, err))
733 }
734
735 name := ""
736 cgroups, err := manager.GetCgroups()
737 if err != nil {
738 errs = append(errs, fmt.Errorf("failed to get cgroups for %d: %v", pid, err))
739 } else {
740 name = cgroups.Name
741 }
742
743 if cont != name {
744 err = manager.Apply(pid)
745 if err != nil {
746 errs = append(errs, fmt.Errorf("failed to move PID %d (in %q) to %q: %v", pid, cont, name, err))
747 }
748 }
749 }
750
751
752 oomAdjuster := oom.NewOOMAdjuster()
753 klog.V(5).InfoS("Attempting to apply oom_score_adj to process", "oomScoreAdj", oomScoreAdj, "pid", pid)
754 if err := oomAdjuster.ApplyOOMScoreAdj(pid, oomScoreAdj); err != nil {
755 klog.V(3).InfoS("Failed to apply oom_score_adj to process", "oomScoreAdj", oomScoreAdj, "pid", pid, "err", err)
756 errs = append(errs, fmt.Errorf("failed to apply oom score %d to PID %d: %v", oomScoreAdj, pid, err))
757 }
758 return utilerrors.NewAggregate(errs)
759 }
760
761
762
763
764 func getContainer(pid int) (string, error) {
765 cgs, err := cgroups.ParseCgroupFile(fmt.Sprintf("/proc/%d/cgroup", pid))
766 if err != nil {
767 return "", err
768 }
769
770 if cgroups.IsCgroup2UnifiedMode() {
771 c, found := cgs[""]
772 if !found {
773 return "", cgroups.NewNotFoundError("unified")
774 }
775 return c, nil
776 }
777
778 cpu, found := cgs["cpu"]
779 if !found {
780 return "", cgroups.NewNotFoundError("cpu")
781 }
782 memory, found := cgs["memory"]
783 if !found {
784 return "", cgroups.NewNotFoundError("memory")
785 }
786
787
788 if cpu != memory {
789 return "", fmt.Errorf("cpu and memory cgroup hierarchy not unified. cpu: %s, memory: %s", cpu, memory)
790 }
791
792
793
794
795
796
797
798
799
800
801 if systemd, found := cgs["name=systemd"]; found {
802 if systemd != cpu {
803 klog.InfoS("CPUAccounting not enabled for process", "pid", pid)
804 }
805 if systemd != memory {
806 klog.InfoS("MemoryAccounting not enabled for process", "pid", pid)
807 }
808 return systemd, nil
809 }
810
811 return cpu, nil
812 }
813
814
815
816
817
818
819 func ensureSystemCgroups(rootCgroupPath string, manager cgroups.Manager) error {
820
821
822 var finalErr error
823 for i := 0; i <= 10; i++ {
824 allPids, err := cmutil.GetPids(rootCgroupPath)
825 if err != nil {
826 finalErr = fmt.Errorf("failed to list PIDs for root: %v", err)
827 continue
828 }
829
830
831 pids := make([]int, 0, len(allPids))
832 for _, pid := range allPids {
833 if pid == 1 || isKernelPid(pid) {
834 continue
835 }
836
837 pids = append(pids, pid)
838 }
839
840
841 if len(pids) == 0 {
842 return nil
843 }
844
845 klog.V(3).InfoS("Moving non-kernel processes", "pids", pids)
846 for _, pid := range pids {
847 err := manager.Apply(pid)
848 if err != nil {
849 name := ""
850 cgroups, err := manager.GetCgroups()
851 if err == nil {
852 name = cgroups.Name
853 }
854
855 finalErr = fmt.Errorf("failed to move PID %d into the system container %q: %v", pid, name, err)
856 }
857 }
858
859 }
860
861 return finalErr
862 }
863
864
// isKernelPid reports whether pid is treated as a kernel process, which is
// the case exactly when reading the /proc/<pid>/exe link fails with a
// "does not exist" error.
func isKernelPid(pid int) bool {
	exeLink := fmt.Sprintf("/proc/%d/exe", pid)
	if _, err := os.Readlink(exeLink); err != nil {
		return os.IsNotExist(err)
	}
	return false
}
870
871
872
873 func (cm *containerManagerImpl) GetCapacity(localStorageCapacityIsolation bool) v1.ResourceList {
874 if localStorageCapacityIsolation {
875
876 if _, ok := cm.capacity[v1.ResourceEphemeralStorage]; !ok {
877
878 if cm.cadvisorInterface != nil {
879 rootfs, err := cm.cadvisorInterface.RootFsInfo()
880 if err != nil {
881 klog.ErrorS(err, "Unable to get rootfs data from cAdvisor interface")
882
883 return cm.capacity
884 }
885
886
887 capacityWithEphemeralStorage := v1.ResourceList{}
888 for rName, rQuant := range cm.capacity {
889 capacityWithEphemeralStorage[rName] = rQuant
890 }
891 capacityWithEphemeralStorage[v1.ResourceEphemeralStorage] = cadvisor.EphemeralStorageCapacityFromFsInfo(rootfs)[v1.ResourceEphemeralStorage]
892 return capacityWithEphemeralStorage
893 }
894 }
895 }
896 return cm.capacity
897 }
898
899 func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
900 return cm.deviceManager.GetCapacity()
901 }
902
903 func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
904 return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetDevices(podUID, containerName))
905 }
906
907 func (cm *containerManagerImpl) GetAllocatableDevices() []*podresourcesapi.ContainerDevices {
908 return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetAllocatableDevices())
909 }
910
// int64Slice returns a new slice of the same length as in with every element
// converted to int64. The result is never nil, even for a nil input.
func int64Slice(in []int) []int64 {
	converted := make([]int64, len(in))
	for idx, value := range in {
		converted[idx] = int64(value)
	}
	return converted
}
918
919 func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 {
920 if cm.cpuManager != nil {
921 return int64Slice(cm.cpuManager.GetExclusiveCPUs(podUID, containerName).UnsortedList())
922 }
923 return []int64{}
924 }
925
926 func (cm *containerManagerImpl) GetAllocatableCPUs() []int64 {
927 if cm.cpuManager != nil {
928 return int64Slice(cm.cpuManager.GetAllocatableCPUs().UnsortedList())
929 }
930 return []int64{}
931 }
932
933 func (cm *containerManagerImpl) GetMemory(podUID, containerName string) []*podresourcesapi.ContainerMemory {
934 if cm.memoryManager == nil {
935 return []*podresourcesapi.ContainerMemory{}
936 }
937
938 return containerMemoryFromBlock(cm.memoryManager.GetMemory(podUID, containerName))
939 }
940
941 func (cm *containerManagerImpl) GetAllocatableMemory() []*podresourcesapi.ContainerMemory {
942 if cm.memoryManager == nil {
943 return []*podresourcesapi.ContainerMemory{}
944 }
945
946 return containerMemoryFromBlock(cm.memoryManager.GetAllocatableMemory())
947 }
948
949 func (cm *containerManagerImpl) GetDynamicResources(pod *v1.Pod, container *v1.Container) []*podresourcesapi.DynamicResource {
950 if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
951 return []*podresourcesapi.DynamicResource{}
952 }
953
954 var containerDynamicResources []*podresourcesapi.DynamicResource
955 containerClaimInfos, err := cm.draManager.GetContainerClaimInfos(pod, container)
956 if err != nil {
957 klog.ErrorS(err, "Unable to get container claim info state")
958 return []*podresourcesapi.DynamicResource{}
959 }
960 for _, containerClaimInfo := range containerClaimInfos {
961 var claimResources []*podresourcesapi.ClaimResource
962 containerClaimInfo.RLock()
963
964
965
966
967 for _, klPluginCdiDevices := range containerClaimInfo.CDIDevices {
968 var cdiDevices []*podresourcesapi.CDIDevice
969 for _, cdiDevice := range klPluginCdiDevices {
970 cdiDevices = append(cdiDevices, &podresourcesapi.CDIDevice{Name: cdiDevice})
971 }
972 claimResources = append(claimResources, &podresourcesapi.ClaimResource{CDIDevices: cdiDevices})
973 }
974 containerClaimInfo.RUnlock()
975 containerDynamicResource := podresourcesapi.DynamicResource{
976 ClassName: containerClaimInfo.ClassName,
977 ClaimName: containerClaimInfo.ClaimName,
978 ClaimNamespace: containerClaimInfo.Namespace,
979 ClaimResources: claimResources,
980 }
981 containerDynamicResources = append(containerDynamicResources, &containerDynamicResource)
982 }
983 return containerDynamicResources
984 }
985
986 func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
987 return cm.deviceManager.ShouldResetExtendedResourceCapacity()
988 }
989
990 func (cm *containerManagerImpl) UpdateAllocatedDevices() {
991 cm.deviceManager.UpdateAllocatedDevices()
992 }
993
994 func containerMemoryFromBlock(blocks []memorymanagerstate.Block) []*podresourcesapi.ContainerMemory {
995 var containerMemories []*podresourcesapi.ContainerMemory
996
997 for _, b := range blocks {
998 containerMemory := podresourcesapi.ContainerMemory{
999 MemoryType: string(b.Type),
1000 Size_: b.Size,
1001 Topology: &podresourcesapi.TopologyInfo{
1002 Nodes: []*podresourcesapi.NUMANode{},
1003 },
1004 }
1005
1006 for _, numaNodeID := range b.NUMAAffinity {
1007 containerMemory.Topology.Nodes = append(containerMemory.Topology.Nodes, &podresourcesapi.NUMANode{ID: int64(numaNodeID)})
1008 }
1009
1010 containerMemories = append(containerMemories, &containerMemory)
1011 }
1012
1013 return containerMemories
1014 }
1015
1016 func (cm *containerManagerImpl) PrepareDynamicResources(pod *v1.Pod) error {
1017 return cm.draManager.PrepareResources(pod)
1018 }
1019
1020 func (cm *containerManagerImpl) UnprepareDynamicResources(pod *v1.Pod) error {
1021 return cm.draManager.UnprepareResources(pod)
1022 }
1023
1024 func (cm *containerManagerImpl) PodMightNeedToUnprepareResources(UID types.UID) bool {
1025 return cm.draManager.PodMightNeedToUnprepareResources(UID)
1026 }
1027
View as plain text