     1  //go:build linux
     2  // +build linux
    20  package cm
    22  import (
    23  	"bytes"
    24  	"context"
    25  	"fmt"
    26  	"os"
    27  	"path"
    28  	"strings"
    29  	"sync"
    30  	"time"
    32  	"github.com/opencontainers/runc/libcontainer/cgroups"
    33  	"github.com/opencontainers/runc/libcontainer/cgroups/manager"
    34  	"github.com/opencontainers/runc/libcontainer/configs"
    35  	"k8s.io/klog/v2"
    36  	"k8s.io/mount-utils"
    37  	utilpath "k8s.io/utils/path"
    39  	v1 "k8s.io/api/core/v1"
    40  	"k8s.io/apimachinery/pkg/api/resource"
    41  	"k8s.io/apimachinery/pkg/types"
    42  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    43  	"k8s.io/apimachinery/pkg/util/sets"
    44  	"k8s.io/apimachinery/pkg/util/wait"
    45  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    46  	clientset "k8s.io/client-go/kubernetes"
    47  	"k8s.io/client-go/tools/record"
    48  	utilsysctl "k8s.io/component-helpers/node/util/sysctl"
    49  	internalapi "k8s.io/cri-api/pkg/apis"
    50  	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
    51  	kubefeatures "k8s.io/kubernetes/pkg/features"
    52  	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
    53  	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
    54  	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
    55  	"k8s.io/kubernetes/pkg/kubelet/cm/dra"
    56  	"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
    57  	memorymanagerstate "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
    58  	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
    59  	cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
    60  	"k8s.io/kubernetes/pkg/kubelet/config"
    61  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    62  	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
    63  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
    64  	"k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
    65  	"k8s.io/kubernetes/pkg/kubelet/status"
    66  	"k8s.io/kubernetes/pkg/kubelet/userns/inuserns"
    67  	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
    68  	"k8s.io/kubernetes/pkg/util/oom"
    69  )
// systemContainer describes a non-user container tracked by the Kubelet,
// together with the cgroup manager used to act on it.
type systemContainer struct {
	// Absolute name of the container.
	name string
	// CPU limit in millicores. SystemCgroupsLimit sums this value over all
	// tracked system containers.
	cpuMillicores int64
	// Function that ensures the state of the container.
	// m is the cgroup manager for the specified container.
	// May be nil, in which case the periodic state check skips this container.
	ensureStateFunc func(m cgroups.Manager) error
	// Manager for the cgroups of the external container.
	manager cgroups.Manager
}
    87  func newSystemCgroups(containerName string) (*systemContainer, error) {
    88  	manager, err := createManager(containerName)
    89  	if err != nil {
    90  		return nil, err
    91  	}
    92  	return &systemContainer{
    93  		name:    containerName,
    94  		manager: manager,
    95  	}, nil
    96  }
// containerManagerImpl is the Linux implementation of ContainerManager.
//
// The embedded RWMutex is read-locked by GetNodeConfig and Status, and
// write-locked by the periodic task registered in setupNode when it updates
// KubeletCgroupsName.
type containerManagerImpl struct {
	sync.RWMutex
	// cadvisorInterface supplies machine and root filesystem information.
	// mountUtil lists mount points when system requirements are validated.
	cadvisorInterface cadvisor.Interface
	mountUtil         mount.Interface
	// Node configuration handed to NewContainerManager (embedded).
	NodeConfig
	// Status returned by Status(); setupNode records unmet soft requirements here.
	status Status
	// External containers being managed.
	systemContainers []*systemContainer
	// Tasks that are run periodically
	periodicTasks []func()
	// subsystems holds all the mounted cgroup subsystems.
	// nodeInfo caches the Node object passed to Start.
	subsystems *CgroupSubsystems
	nodeInfo   *v1.Node
	// Interface for cgroup management
	cgroupManager CgroupManager
	// Capacity of this node.
	capacity v1.ResourceList
	// Capacity of this node, including internal resources.
	internalCapacity v1.ResourceList
	// Absolute cgroupfs path to a cgroup that Kubelet needs to place all pods under.
	// This path includes a top level container for enforcing Node Allocatable.
	cgroupRoot CgroupName
	// Event recorder interface.
	recorder record.EventRecorder
	// Interface for QoS cgroup management
	qosContainerManager QOSContainerManager
	// Interface for exporting and allocating devices reported by device plugins.
	deviceManager devicemanager.Manager
	// Interface for CPU affinity management.
	cpuManager cpumanager.Manager
	// Interface for memory affinity management.
	// Only assigned when the MemoryManager feature gate is enabled.
	memoryManager memorymanager.Manager
	// Interface for Topology resource co-ordination
	topologyManager topologymanager.Manager
	// Interface for Dynamic Resource Allocation management.
	// Only assigned when the DynamicResourceAllocation feature gate is enabled.
	draManager dra.Manager
}
// features records the optional cgroup capabilities detected by
// validateSystemRequirements.
type features struct {
	// cpuHardcapping is true when running in cgroup v2 unified mode, or when
	// both cpu.cfs_period_us and cpu.cfs_quota_us exist under the cpu cgroup
	// mount point.
	cpuHardcapping bool
}
   140  var _ ContainerManager = &containerManagerImpl{}
   142  // checks if the required cgroups subsystems are mounted.
   143  // As of now, only 'cpu' and 'memory' are required.
   144  // cpu quota is a soft requirement.
   145  func validateSystemRequirements(mountUtil mount.Interface) (features, error) {
   146  	const (
   147  		cgroupMountType = "cgroup"
   148  		localErr        = "system validation failed"
   149  	)
   150  	var (
   151  		cpuMountPoint string
   152  		f             features
   153  	)
   154  	mountPoints, err := mountUtil.List()
   155  	if err != nil {
   156  		return f, fmt.Errorf("%s - %v", localErr, err)
   157  	}
   159  	if cgroups.IsCgroup2UnifiedMode() {
   160  		f.cpuHardcapping = true
   161  		return f, nil
   162  	}
   164  	expectedCgroups := sets.New("cpu", "cpuacct", "cpuset", "memory")
   165  	for _, mountPoint := range mountPoints {
   166  		if mountPoint.Type == cgroupMountType {
   167  			for _, opt := range mountPoint.Opts {
   168  				if expectedCgroups.Has(opt) {
   169  					expectedCgroups.Delete(opt)
   170  				}
   171  				if opt == "cpu" {
   172  					cpuMountPoint = mountPoint.Path
   173  				}
   174  			}
   175  		}
   176  	}
   178  	if expectedCgroups.Len() > 0 {
   179  		return f, fmt.Errorf("%s - Following Cgroup subsystem not mounted: %v", localErr, sets.List(expectedCgroups))
   180  	}
   182  	// Check if cpu quota is available.
   183  	// CPU cgroup is required and so it expected to be mounted at this point.
   184  	periodExists, err := utilpath.Exists(utilpath.CheckFollowSymlink, path.Join(cpuMountPoint, "cpu.cfs_period_us"))
   185  	if err != nil {
   186  		klog.ErrorS(err, "Failed to detect if CPU cgroup cpu.cfs_period_us is available")
   187  	}
   188  	quotaExists, err := utilpath.Exists(utilpath.CheckFollowSymlink, path.Join(cpuMountPoint, "cpu.cfs_quota_us"))
   189  	if err != nil {
   190  		klog.ErrorS(err, "Failed to detect if CPU cgroup cpu.cfs_quota_us is available")
   191  	}
   192  	if quotaExists && periodExists {
   193  		f.cpuHardcapping = true
   194  	}
   195  	return f, nil
   196  }
// NewContainerManager builds the Linux ContainerManager from the given node
// configuration. It does not start any of the sub-managers it creates.
//
// Construction proceeds in this order:
//  1. Discover the mounted cgroup subsystems.
//  2. When failSwapOn is set, refuse to continue if /proc/swaps lists any
//     entry besides its header line.
//  3. Derive node capacity from cAdvisor machine info (plus the PID limit,
//     when it can be read, for the internal capacity).
//  4. When CgroupsPerQOS is set, validate the configured cgroup root and nest
//     the node-allocatable cgroup under it.
//  5. Create the QoS, topology, device, (feature-gated) DRA, CPU and
//     (feature-gated) memory managers, registering hint providers with the
//     topology manager.
//
// kubeClient is only used to create the DRA manager.
//
// The system and kubelet container names in nodeConfig are absolute names;
// an empty name disables use of that container.
//
// TODO(vmarmol): Add limits to the system containers.
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder, kubeClient clientset.Interface) (ContainerManager, error) {
	subsystems, err := GetCgroupSubsystems()
	if err != nil {
		return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err)
	}
	if failSwapOn {
		// Check whether swap is enabled. The Kubelet does not support running with swap enabled.
		swapFile := "/proc/swaps"
		swapData, err := os.ReadFile(swapFile)
		if err != nil {
			if os.IsNotExist(err) {
				klog.InfoS("File does not exist, assuming that swap is disabled", "path", swapFile)
			} else {
				return nil, err
			}
		} else {
			swapData = bytes.TrimSpace(swapData) // extra trailing \n
			swapLines := strings.Split(string(swapData), "\n")
			// If there is more than one line (table headers) in /proc/swaps, swap is enabled and we should
			// error out unless --fail-swap-on is set to false.
			if len(swapLines) > 1 {
				return nil, fmt.Errorf("running with swap on is not supported, please disable swap! or set --fail-swap-on flag to false. /proc/swaps contained: %v", swapLines)
			}
		}
	}
	var internalCapacity = v1.ResourceList{}
	// It is safe to invoke `MachineInfo` on cAdvisor before logically initializing cAdvisor here because
	// machine info is computed and cached once as part of cAdvisor object creation.
	// But `RootFsInfo` and `ImagesFsInfo` are not available at this moment so they will be called later during manager starts
	machineInfo, err := cadvisorInterface.MachineInfo()
	if err != nil {
		return nil, err
	}
	capacity := cadvisor.CapacityFromMachineInfo(machineInfo)
	// internalCapacity starts as a copy of capacity so that internal-only
	// resources (PIDs below) do not leak into the reported node capacity.
	for k, v := range capacity {
		internalCapacity[k] = v
	}
	// The PID limit is best-effort: a failure to read it is ignored and the
	// PIDs resource is simply left out of internalCapacity.
	pidlimits, err := pidlimit.Stats()
	if err == nil && pidlimits != nil && pidlimits.MaxPID != nil {
		internalCapacity[pidlimit.PIDs] = *resource.NewQuantity(
			int64(*pidlimits.MaxPID),
			resource.DecimalSI)
	}
	// Turn CgroupRoot from a string (in cgroupfs path format) to internal CgroupName
	cgroupRoot := ParseCgroupfsToCgroupName(nodeConfig.CgroupRoot)
	cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)
	// Check if Cgroup-root actually exists on the node
	if nodeConfig.CgroupsPerQOS {
		// this does default to / when enabled, but this tests against regressions.
		if nodeConfig.CgroupRoot == "" {
			return nil, fmt.Errorf("invalid configuration: cgroups-per-qos was specified and cgroup-root was not specified. To enable the QoS cgroup hierarchy you need to specify a valid cgroup-root")
		}
		// we need to check that the cgroup root actually exists for each subsystem
		// of note, we always use the cgroupfs driver when performing this check since
		// the input is provided in that format.
		// this is important because we do not want any name conversion to occur.
		if err := cgroupManager.Validate(cgroupRoot); err != nil {
			return nil, fmt.Errorf("invalid configuration: %w", err)
		}
		klog.InfoS("Container manager verified user specified cgroup-root exists", "cgroupRoot", cgroupRoot)
		// Include the top level cgroup for enforcing node allocatable into cgroup-root.
		// This way, all sub modules can avoid having to understand the concept of node allocatable.
		cgroupRoot = NewCgroupName(cgroupRoot, defaultNodeAllocatableCgroupName)
	}
	klog.InfoS("Creating Container Manager object based on Node Config", "nodeConfig", nodeConfig)
	qosContainerManager, err := NewQOSContainerManager(subsystems, cgroupRoot, nodeConfig, cgroupManager)
	if err != nil {
		return nil, err
	}
	cm := &containerManagerImpl{
		cadvisorInterface:   cadvisorInterface,
		mountUtil:           mountUtil,
		NodeConfig:          nodeConfig,
		subsystems:          subsystems,
		cgroupManager:       cgroupManager,
		capacity:            capacity,
		internalCapacity:    internalCapacity,
		cgroupRoot:          cgroupRoot,
		recorder:            recorder,
		qosContainerManager: qosContainerManager,
	}
	// The topology manager is created first because the device, CPU and
	// memory managers below all take it as an argument.
	cm.topologyManager, err = topologymanager.NewManager(
		machineInfo.Topology,
		nodeConfig.TopologyManagerPolicy,
		nodeConfig.TopologyManagerScope,
		nodeConfig.TopologyManagerPolicyOptions,
	)
	if err != nil {
		return nil, err
	}
	klog.InfoS("Creating device plugin manager")
	cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager)
	if err != nil {
		return nil, err
	}
	cm.topologyManager.AddHintProvider(cm.deviceManager)
	// initialize DRA manager
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
		klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager")
		cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir, nodeConfig.NodeName)
		if err != nil {
			return nil, err
		}
	}
	// Initialize CPU manager
	cm.cpuManager, err = cpumanager.NewManager(
		nodeConfig.CPUManagerPolicy,
		nodeConfig.CPUManagerPolicyOptions,
		nodeConfig.CPUManagerReconcilePeriod,
		machineInfo,
		nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
		cm.GetNodeAllocatableReservation(),
		nodeConfig.KubeletRootDir,
		cm.topologyManager,
	)
	if err != nil {
		klog.ErrorS(err, "Failed to initialize cpu manager")
		return nil, err
	}
	cm.topologyManager.AddHintProvider(cm.cpuManager)
	// Initialize memory manager (only when the MemoryManager feature gate is enabled)
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
		cm.memoryManager, err = memorymanager.NewManager(
			nodeConfig.ExperimentalMemoryManagerPolicy,
			machineInfo,
			cm.GetNodeAllocatableReservation(),
			nodeConfig.ExperimentalMemoryManagerReservedMemory,
			nodeConfig.KubeletRootDir,
			cm.topologyManager,
		)
		if err != nil {
			klog.ErrorS(err, "Failed to initialize memory manager")
			return nil, err
		}
		cm.topologyManager.AddHintProvider(cm.memoryManager)
	}
	return cm, nil
}
   353  // NewPodContainerManager is a factory method returns a PodContainerManager object
   354  // If qosCgroups are enabled then it returns the general pod container manager
   355  // otherwise it returns a no-op manager which essentially does nothing
   356  func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager {
   357  	if cm.NodeConfig.CgroupsPerQOS {
   358  		return &podContainerManagerImpl{
   359  			qosContainersInfo: cm.GetQOSContainersInfo(),
   360  			subsystems:        cm.subsystems,
   361  			cgroupManager:     cm.cgroupManager,
   362  			podPidsLimit:      cm.PodPidsLimit,
   363  			enforceCPULimits:  cm.EnforceCPULimits,
   364  			// cpuCFSQuotaPeriod is in microseconds. NodeConfig.CPUCFSQuotaPeriod is time.Duration (measured in nano seconds).
   365  			// Convert (cm.CPUCFSQuotaPeriod) [nanoseconds] / time.Microsecond (1000) to get cpuCFSQuotaPeriod in microseconds.
   366  			cpuCFSQuotaPeriod: uint64(cm.CPUCFSQuotaPeriod / time.Microsecond),
   367  		}
   368  	}
   369  	return &podContainerManagerNoop{
   370  		cgroupRoot: cm.cgroupRoot,
   371  	}
   372  }
   374  func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
   375  	return &internalContainerLifecycleImpl{cm.cpuManager, cm.memoryManager, cm.topologyManager}
   376  }
   378  // Create a cgroup container manager.
   379  func createManager(containerName string) (cgroups.Manager, error) {
   380  	cg := &configs.Cgroup{
   381  		Parent: "/",
   382  		Name:   containerName,
   383  		Resources: &configs.Resources{
   384  			SkipDevices: true,
   385  		},
   386  		Systemd: false,
   387  	}
   389  	return manager.New(cg)
   390  }
// KernelTunableBehavior selects what setupKernelTunables does when a kernel
// tunable does not hold its expected value.
type KernelTunableBehavior string

// Supported KernelTunableBehavior values:
//   - KernelTunableWarn logs the mismatch and carries on.
//   - KernelTunableError reports the mismatch as an error.
//   - KernelTunableModify attempts to write the expected value.
const (
	KernelTunableWarn   KernelTunableBehavior = "warn"
	KernelTunableError  KernelTunableBehavior = "error"
	KernelTunableModify KernelTunableBehavior = "modify"
)
   400  // setupKernelTunables validates kernel tunable flags are set as expected
   401  // depending upon the specified option, it will either warn, error, or modify the kernel tunable flags
   402  func setupKernelTunables(option KernelTunableBehavior) error {
   403  	desiredState := map[string]int{
   404  		utilsysctl.VMOvercommitMemory: utilsysctl.VMOvercommitMemoryAlways,
   405  		utilsysctl.VMPanicOnOOM:       utilsysctl.VMPanicOnOOMInvokeOOMKiller,
   406  		utilsysctl.KernelPanic:        utilsysctl.KernelPanicRebootTimeout,
   407  		utilsysctl.KernelPanicOnOops:  utilsysctl.KernelPanicOnOopsAlways,
   408  		utilsysctl.RootMaxKeys:        utilsysctl.RootMaxKeysSetting,
   409  		utilsysctl.RootMaxBytes:       utilsysctl.RootMaxBytesSetting,
   410  	}
   412  	sysctl := utilsysctl.New()
   414  	errList := []error{}
   415  	for flag, expectedValue := range desiredState {
   416  		val, err := sysctl.GetSysctl(flag)
   417  		if err != nil {
   418  			errList = append(errList, err)
   419  			continue
   420  		}
   421  		if val == expectedValue {
   422  			continue
   423  		}
   425  		switch option {
   426  		case KernelTunableError:
   427  			errList = append(errList, fmt.Errorf("invalid kernel flag: %v, expected value: %v, actual value: %v", flag, expectedValue, val))
   428  		case KernelTunableWarn:
   429  			klog.V(2).InfoS("Invalid kernel flag", "flag", flag, "expectedValue", expectedValue, "actualValue", val)
   430  		case KernelTunableModify:
   431  			klog.V(2).InfoS("Updating kernel flag", "flag", flag, "expectedValue", expectedValue, "actualValue", val)
   432  			err = sysctl.SetSysctl(flag, expectedValue)
   433  			if err != nil {
   434  				if inuserns.RunningInUserNS() {
   435  					if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.KubeletInUserNamespace) {
   436  						klog.V(2).InfoS("Updating kernel flag failed (running in UserNS, ignoring)", "flag", flag, "err", err)
   437  						continue
   438  					}
   439  					klog.ErrorS(err, "Updating kernel flag failed (Hint: enable KubeletInUserNamespace feature flag to ignore the error)", "flag", flag)
   440  				}
   441  				errList = append(errList, err)
   442  			}
   443  		}
   444  	}
   445  	return utilerrors.NewAggregate(errList)
   446  }
// setupNode prepares the node-level cgroup environment. In order, it:
//   - validates the required cgroup subsystems and records a soft requirement
//     failure in cm.status when CPU hardcapping is unavailable;
//   - checks (ProtectKernelDefaults) or modifies the kernel tunables;
//   - when CgroupsPerQOS is set, creates the node allocatable cgroups and
//     starts the QoS container manager;
//   - enforces node allocatable on the cgroups;
//   - builds the list of system containers (system and kubelet cgroups) with
//     their ensureStateFunc, or, when no kubelet cgroup name is configured,
//     registers a periodic task that applies the OOM score to the kubelet
//     process and records the cgroup it is found in.
//
// The first failing step aborts setup and its error is returned.
func (cm *containerManagerImpl) setupNode(activePods ActivePodsFunc) error {
	f, err := validateSystemRequirements(cm.mountUtil)
	if err != nil {
		return err
	}
	if !f.cpuHardcapping {
		cm.status.SoftRequirements = fmt.Errorf("CPU hardcapping unsupported")
	}
	// By default the kubelet fixes up kernel tunables itself; with
	// ProtectKernelDefaults a mismatch is an error instead.
	b := KernelTunableModify
	if cm.GetNodeConfig().ProtectKernelDefaults {
		b = KernelTunableError
	}
	if err := setupKernelTunables(b); err != nil {
		return err
	}
	// Setup top level qos containers only if CgroupsPerQOS flag is specified as true
	if cm.NodeConfig.CgroupsPerQOS {
		if err := cm.createNodeAllocatableCgroups(); err != nil {
			return err
		}
		err = cm.qosContainerManager.Start(cm.GetNodeAllocatableAbsolute, activePods)
		if err != nil {
			return fmt.Errorf("failed to initialize top level QOS containers: %v", err)
		}
	}
	// Enforce Node Allocatable (if required)
	if err := cm.enforceNodeAllocatableCgroups(); err != nil {
		return err
	}
	systemContainers := []*systemContainer{}
	if cm.SystemCgroupsName != "" {
		if cm.SystemCgroupsName == "/" {
			return fmt.Errorf("system container cannot be root (\"/\")")
		}
		cont, err := newSystemCgroups(cm.SystemCgroupsName)
		if err != nil {
			return err
		}
		// Periodically sweep processes from the root cgroup into the system container.
		cont.ensureStateFunc = func(manager cgroups.Manager) error {
			return ensureSystemCgroups("/", manager)
		}
		systemContainers = append(systemContainers, cont)
	}
	if cm.KubeletCgroupsName != "" {
		cont, err := newSystemCgroups(cm.KubeletCgroupsName)
		if err != nil {
			return err
		}
		// The callback ignores its argument and uses the captured cont.manager;
		// Start invokes it with cont.manager, so both refer to the same manager.
		cont.ensureStateFunc = func(_ cgroups.Manager) error {
			return ensureProcessInContainerWithOOMScore(os.Getpid(), int(cm.KubeletOOMScoreAdj), cont.manager)
		}
		systemContainers = append(systemContainers, cont)
	} else {
		// No kubelet cgroup configured: leave the process where it is (nil
		// manager), only apply the OOM score, and remember the cgroup the
		// kubelet is actually running in.
		cm.periodicTasks = append(cm.periodicTasks, func() {
			if err := ensureProcessInContainerWithOOMScore(os.Getpid(), int(cm.KubeletOOMScoreAdj), nil); err != nil {
				klog.ErrorS(err, "Failed to ensure process in container with oom score")
				return
			}
			cont, err := getContainer(os.Getpid())
			if err != nil {
				klog.ErrorS(err, "Failed to find cgroups of kubelet")
				return
			}
			// KubeletCgroupsName lives in the embedded NodeConfig, which
			// GetNodeConfig reads under the read lock.
			cm.Lock()
			defer cm.Unlock()
			cm.KubeletCgroupsName = cont
		})
	}
	cm.systemContainers = systemContainers
	return nil
}
   528  func (cm *containerManagerImpl) GetNodeConfig() NodeConfig {
   529  	cm.RLock()
   530  	defer cm.RUnlock()
   531  	return cm.NodeConfig
   532  }
   534  // GetPodCgroupRoot returns the literal cgroupfs value for the cgroup containing all pods.
   535  func (cm *containerManagerImpl) GetPodCgroupRoot() string {
   536  	return cm.cgroupManager.Name(cm.cgroupRoot)
   537  }
   539  func (cm *containerManagerImpl) GetMountedSubsystems() *CgroupSubsystems {
   540  	return cm.subsystems
   541  }
   543  func (cm *containerManagerImpl) GetQOSContainersInfo() QOSContainersInfo {
   544  	return cm.qosContainerManager.GetQOSContainersInfo()
   545  }
   547  func (cm *containerManagerImpl) UpdateQOSCgroups() error {
   548  	return cm.qosContainerManager.UpdateCgroups()
   549  }
   551  func (cm *containerManagerImpl) Status() Status {
   552  	cm.RLock()
   553  	defer cm.RUnlock()
   554  	return cm.status
   555  }
// Start brings up the container manager and its sub-managers. In order, it:
//   - builds the container map and running set from the runtime;
//   - starts the CPU manager and, when the MemoryManager feature gate is
//     enabled, the memory manager;
//   - caches node and, when localStorageCapacityIsolation is set, adds the
//     ephemeral-storage capacity derived from the root filesystem;
//   - validates the node allocatable configuration and sets up the node;
//   - launches background loops for system container state (every minute)
//     and periodic tasks (every five minutes);
//   - starts the device manager.
//
// The background loops are started with wait.NeverStop. The first failing
// step aborts start-up and its error is returned.
func (cm *containerManagerImpl) Start(node *v1.Node,
	activePods ActivePodsFunc,
	sourcesReady config.SourcesReady,
	podStatusProvider status.PodStatusProvider,
	runtimeService internalapi.RuntimeService,
	localStorageCapacityIsolation bool) error {
	ctx := context.Background()
	containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
	// Start the CPU manager
	err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
	if err != nil {
		return fmt.Errorf("start cpu manager error: %v", err)
	}
	// Start the memory manager
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
		// A fresh container map is built here; it shadows the outer one, which
		// is still the map passed to the device manager below.
		containerMap, _ := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
		err := cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
		if err != nil {
			return fmt.Errorf("start memory manager error: %v", err)
		}
	}
	// cache the node Info including resource capacity and
	// allocatable of the node
	cm.nodeInfo = node
	if localStorageCapacityIsolation {
		rootfs, err := cm.cadvisorInterface.RootFsInfo()
		if err != nil {
			return fmt.Errorf("failed to get rootfs info: %v", err)
		}
		for rName, rCap := range cadvisor.EphemeralStorageCapacityFromFsInfo(rootfs) {
			cm.capacity[rName] = rCap
		}
	}
	// Ensure that node allocatable configuration is valid.
	if err := cm.validateNodeAllocatable(); err != nil {
		return err
	}
	// Setup the node
	if err := cm.setupNode(activePods); err != nil {
		return err
	}
	// Don't run a background thread if there are no ensureStateFuncs.
	hasEnsureStateFuncs := false
	for _, cont := range cm.systemContainers {
		if cont.ensureStateFunc != nil {
			hasEnsureStateFuncs = true
			break
		}
	}
	if hasEnsureStateFuncs {
		// Run ensure state functions every minute.
		// Failures are only logged; the loop keeps running.
		go wait.Until(func() {
			for _, cont := range cm.systemContainers {
				if cont.ensureStateFunc != nil {
					if err := cont.ensureStateFunc(cont.manager); err != nil {
						klog.InfoS("Failed to ensure state", "containerName", cont.name, "err", err)
					}
				}
			}
		}, time.Minute, wait.NeverStop)
	}
	// Run the periodic tasks registered by setupNode every five minutes.
	if len(cm.periodicTasks) > 0 {
		go wait.Until(func() {
			for _, task := range cm.periodicTasks {
				if task != nil {
					task()
				}
			}
		}, 5*time.Minute, wait.NeverStop)
	}
	// Starts device manager.
	if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
		return err
	}
	return nil
}
   646  func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
   647  	return cm.deviceManager.GetWatcherHandler()
   648  }
   650  // TODO: move the GetResources logic to PodContainerManager.
   651  func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
   652  	opts := &kubecontainer.RunContainerOptions{}
   653  	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
   654  		resOpts, err := cm.draManager.GetResources(pod, container)
   655  		if err != nil {
   656  			return nil, err
   657  		}
   658  		// NOTE: Passing CDI device names as annotations is a temporary solution
   659  		// It will be removed after all runtimes are updated
   660  		// to get CDI device names from the ContainerConfig.CDIDevices field
   661  		opts.Annotations = append(opts.Annotations, resOpts.Annotations...)
   662  		opts.CDIDevices = append(opts.CDIDevices, resOpts.CDIDevices...)
   663  	}
   664  	// Allocate should already be called during predicateAdmitHandler.Admit(),
   665  	// just try to fetch device runtime information from cached state here
   666  	devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
   667  	if err != nil {
   668  		return nil, err
   669  	} else if devOpts == nil {
   670  		return opts, nil
   671  	}
   672  	opts.Devices = append(opts.Devices, devOpts.Devices...)
   673  	opts.Mounts = append(opts.Mounts, devOpts.Mounts...)
   674  	opts.Envs = append(opts.Envs, devOpts.Envs...)
   675  	opts.Annotations = append(opts.Annotations, devOpts.Annotations...)
   676  	opts.CDIDevices = append(opts.CDIDevices, devOpts.CDIDevices...)
   677  	return opts, nil
   678  }
   680  func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
   681  	return cm.deviceManager.UpdatePluginResources(node, attrs)
   682  }
   684  func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
   685  	return cm.topologyManager
   686  }
   688  func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
   689  	cpuLimit := int64(0)
   691  	// Sum up resources of all external containers.
   692  	for _, cont := range cm.systemContainers {
   693  		cpuLimit += cont.cpuMillicores
   694  	}
   696  	return v1.ResourceList{
   697  		v1.ResourceCPU: *resource.NewMilliQuantity(
   698  			cpuLimit,
   699  			resource.DecimalSI),
   700  	}
   701  }
   703  func isProcessRunningInHost(pid int) (bool, error) {
   704  	// Get init pid namespace.
   705  	initPidNs, err := os.Readlink("/proc/1/ns/pid")
   706  	if err != nil {
   707  		return false, fmt.Errorf("failed to find pid namespace of init process")
   708  	}
   709  	klog.V(10).InfoS("Found init PID namespace", "namespace", initPidNs)
   710  	processPidNs, err := os.Readlink(fmt.Sprintf("/proc/%d/ns/pid", pid))
   711  	if err != nil {
   712  		return false, fmt.Errorf("failed to find pid namespace of process %q", pid)
   713  	}
   714  	klog.V(10).InfoS("Process info", "pid", pid, "namespace", processPidNs)
   715  	return initPidNs == processPidNs, nil
   716  }
   718  func ensureProcessInContainerWithOOMScore(pid int, oomScoreAdj int, manager cgroups.Manager) error {
   719  	if runningInHost, err := isProcessRunningInHost(pid); err != nil {
   720  		// Err on the side of caution. Avoid moving the docker daemon unless we are able to identify its context.
   721  		return err
   722  	} else if !runningInHost {
   723  		// Process is running inside a container. Don't touch that.
   724  		klog.V(2).InfoS("PID is not running in the host namespace", "pid", pid)
   725  		return nil
   726  	}
   728  	var errs []error
   729  	if manager != nil {
   730  		cont, err := getContainer(pid)
   731  		if err != nil {
   732  			errs = append(errs, fmt.Errorf("failed to find container of PID %d: %v", pid, err))
   733  		}
   735  		name := ""
   736  		cgroups, err := manager.GetCgroups()
   737  		if err != nil {
   738  			errs = append(errs, fmt.Errorf("failed to get cgroups for %d: %v", pid, err))
   739  		} else {
   740  			name = cgroups.Name
   741  		}
   743  		if cont != name {
   744  			err = manager.Apply(pid)
   745  			if err != nil {
   746  				errs = append(errs, fmt.Errorf("failed to move PID %d (in %q) to %q: %v", pid, cont, name, err))
   747  			}
   748  		}
   749  	}
   751  	// Also apply oom-score-adj to processes
   752  	oomAdjuster := oom.NewOOMAdjuster()
   753  	klog.V(5).InfoS("Attempting to apply oom_score_adj to process", "oomScoreAdj", oomScoreAdj, "pid", pid)
   754  	if err := oomAdjuster.ApplyOOMScoreAdj(pid, oomScoreAdj); err != nil {
   755  		klog.V(3).InfoS("Failed to apply oom_score_adj to process", "oomScoreAdj", oomScoreAdj, "pid", pid, "err", err)
   756  		errs = append(errs, fmt.Errorf("failed to apply oom score %d to PID %d: %v", oomScoreAdj, pid, err))
   757  	}
   758  	return utilerrors.NewAggregate(errs)
   759  }
   761  // getContainer returns the cgroup associated with the specified pid.
   762  // It enforces a unified hierarchy for memory and cpu cgroups.
   763  // On systemd environments, it uses the name=systemd cgroup for the specified pid.
   764  func getContainer(pid int) (string, error) {
   765  	cgs, err := cgroups.ParseCgroupFile(fmt.Sprintf("/proc/%d/cgroup", pid))
   766  	if err != nil {
   767  		return "", err
   768  	}
   770  	if cgroups.IsCgroup2UnifiedMode() {
   771  		c, found := cgs[""]
   772  		if !found {
   773  			return "", cgroups.NewNotFoundError("unified")
   774  		}
   775  		return c, nil
   776  	}
   778  	cpu, found := cgs["cpu"]
   779  	if !found {
   780  		return "", cgroups.NewNotFoundError("cpu")
   781  	}
   782  	memory, found := cgs["memory"]
   783  	if !found {
   784  		return "", cgroups.NewNotFoundError("memory")
   785  	}
   787  	// since we use this container for accounting, we need to ensure its a unified hierarchy.
   788  	if cpu != memory {
   789  		return "", fmt.Errorf("cpu and memory cgroup hierarchy not unified.  cpu: %s, memory: %s", cpu, memory)
   790  	}
   792  	// on systemd, every pid is in a unified cgroup hierarchy (name=systemd as seen in systemd-cgls)
   793  	// cpu and memory accounting is off by default, users may choose to enable it per unit or globally.
   794  	// users could enable CPU and memory accounting globally via /etc/systemd/system.conf (DefaultCPUAccounting=true DefaultMemoryAccounting=true).
   795  	// users could also enable CPU and memory accounting per unit via CPUAccounting=true and MemoryAccounting=true
   796  	// we only warn if accounting is not enabled for CPU or memory so as to not break local development flows where kubelet is launched in a terminal.
   797  	// for example, the cgroup for the user session will be something like /user.slice/user-X.slice/session-X.scope, but the cpu and memory
   798  	// cgroup will be the closest ancestor where accounting is performed (most likely /) on systems that launch docker containers.
   799  	// as a result, on those systems, you will not get cpu or memory accounting statistics for kubelet.
   800  	// in addition, you would not get memory or cpu accounting for the runtime unless accounting was enabled on its unit (or globally).
   801  	if systemd, found := cgs["name=systemd"]; found {
   802  		if systemd != cpu {
   803  			klog.InfoS("CPUAccounting not enabled for process", "pid", pid)
   804  		}
   805  		if systemd != memory {
   806  			klog.InfoS("MemoryAccounting not enabled for process", "pid", pid)
   807  		}
   808  		return systemd, nil
   809  	}
   811  	return cpu, nil
   812  }
   814  // Ensures the system container is created and all non-kernel threads and process 1
   815  // without a container are moved to it.
   816  //
   817  // The reason of leaving kernel threads at root cgroup is that we don't want to tie the
   818  // execution of these threads with to-be defined /system quota and create priority inversions.
   819  func ensureSystemCgroups(rootCgroupPath string, manager cgroups.Manager) error {
   820  	// Move non-kernel PIDs to the system container.
   821  	// Only keep errors on latest attempt.
   822  	var finalErr error
   823  	for i := 0; i <= 10; i++ {
   824  		allPids, err := cmutil.GetPids(rootCgroupPath)
   825  		if err != nil {
   826  			finalErr = fmt.Errorf("failed to list PIDs for root: %v", err)
   827  			continue
   828  		}
   830  		// Remove kernel pids and other protected PIDs (pid 1, PIDs already in system & kubelet containers)
   831  		pids := make([]int, 0, len(allPids))
   832  		for _, pid := range allPids {
   833  			if pid == 1 || isKernelPid(pid) {
   834  				continue
   835  			}
   837  			pids = append(pids, pid)
   838  		}
   840  		// Check if we have moved all the non-kernel PIDs.
   841  		if len(pids) == 0 {
   842  			return nil
   843  		}
   845  		klog.V(3).InfoS("Moving non-kernel processes", "pids", pids)
   846  		for _, pid := range pids {
   847  			err := manager.Apply(pid)
   848  			if err != nil {
   849  				name := ""
   850  				cgroups, err := manager.GetCgroups()
   851  				if err == nil {
   852  					name = cgroups.Name
   853  				}
   855  				finalErr = fmt.Errorf("failed to move PID %d into the system container %q: %v", pid, name, err)
   856  			}
   857  		}
   859  	}
   861  	return finalErr
   862  }
   864  // Determines whether the specified PID is a kernel PID.
   865  func isKernelPid(pid int) bool {
   866  	// Kernel threads have no associated executable.
   867  	_, err := os.Readlink(fmt.Sprintf("/proc/%d/exe", pid))
   868  	return err != nil && os.IsNotExist(err)
   869  }
   871  // GetCapacity returns node capacity data for "cpu", "memory", "ephemeral-storage", and "huge-pages*"
   872  // At present this method is only invoked when introspecting ephemeral storage
   873  func (cm *containerManagerImpl) GetCapacity(localStorageCapacityIsolation bool) v1.ResourceList {
   874  	if localStorageCapacityIsolation {
   875  		// We store allocatable ephemeral-storage in the capacity property once we Start() the container manager
   876  		if _, ok := cm.capacity[v1.ResourceEphemeralStorage]; !ok {
   877  			// If we haven't yet stored the capacity for ephemeral-storage, we can try to fetch it directly from cAdvisor,
   878  			if cm.cadvisorInterface != nil {
   879  				rootfs, err := cm.cadvisorInterface.RootFsInfo()
   880  				if err != nil {
   881  					klog.ErrorS(err, "Unable to get rootfs data from cAdvisor interface")
   882  					// If the rootfsinfo retrieval from cAdvisor fails for any reason, fallback to returning the capacity property with no ephemeral storage data
   883  					return cm.capacity
   884  				}
   885  				// We don't want to mutate cm.capacity here so we'll manually construct a v1.ResourceList from it,
   886  				// and add ephemeral-storage
   887  				capacityWithEphemeralStorage := v1.ResourceList{}
   888  				for rName, rQuant := range cm.capacity {
   889  					capacityWithEphemeralStorage[rName] = rQuant
   890  				}
   891  				capacityWithEphemeralStorage[v1.ResourceEphemeralStorage] = cadvisor.EphemeralStorageCapacityFromFsInfo(rootfs)[v1.ResourceEphemeralStorage]
   892  				return capacityWithEphemeralStorage
   893  			}
   894  		}
   895  	}
   896  	return cm.capacity
   897  }
   899  func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
   900  	return cm.deviceManager.GetCapacity()
   901  }
   903  func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
   904  	return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetDevices(podUID, containerName))
   905  }
   907  func (cm *containerManagerImpl) GetAllocatableDevices() []*podresourcesapi.ContainerDevices {
   908  	return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetAllocatableDevices())
   909  }
   911  func int64Slice(in []int) []int64 {
   912  	out := make([]int64, len(in))
   913  	for i := range in {
   914  		out[i] = int64(in[i])
   915  	}
   916  	return out
   917  }
   919  func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 {
   920  	if cm.cpuManager != nil {
   921  		return int64Slice(cm.cpuManager.GetExclusiveCPUs(podUID, containerName).UnsortedList())
   922  	}
   923  	return []int64{}
   924  }
   926  func (cm *containerManagerImpl) GetAllocatableCPUs() []int64 {
   927  	if cm.cpuManager != nil {
   928  		return int64Slice(cm.cpuManager.GetAllocatableCPUs().UnsortedList())
   929  	}
   930  	return []int64{}
   931  }
   933  func (cm *containerManagerImpl) GetMemory(podUID, containerName string) []*podresourcesapi.ContainerMemory {
   934  	if cm.memoryManager == nil {
   935  		return []*podresourcesapi.ContainerMemory{}
   936  	}
   938  	return containerMemoryFromBlock(cm.memoryManager.GetMemory(podUID, containerName))
   939  }
   941  func (cm *containerManagerImpl) GetAllocatableMemory() []*podresourcesapi.ContainerMemory {
   942  	if cm.memoryManager == nil {
   943  		return []*podresourcesapi.ContainerMemory{}
   944  	}
   946  	return containerMemoryFromBlock(cm.memoryManager.GetAllocatableMemory())
   947  }
   949  func (cm *containerManagerImpl) GetDynamicResources(pod *v1.Pod, container *v1.Container) []*podresourcesapi.DynamicResource {
   950  	if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
   951  		return []*podresourcesapi.DynamicResource{}
   952  	}
   954  	var containerDynamicResources []*podresourcesapi.DynamicResource
   955  	containerClaimInfos, err := cm.draManager.GetContainerClaimInfos(pod, container)
   956  	if err != nil {
   957  		klog.ErrorS(err, "Unable to get container claim info state")
   958  		return []*podresourcesapi.DynamicResource{}
   959  	}
   960  	for _, containerClaimInfo := range containerClaimInfos {
   961  		var claimResources []*podresourcesapi.ClaimResource
   962  		containerClaimInfo.RLock()
   963  		// TODO: Currently  we maintain a list of ClaimResources, each of which contains
   964  		// a set of CDIDevices from a different kubelet plugin. In the future we may want to
   965  		// include the name of the kubelet plugin and/or other types of resources that are
   966  		// not CDIDevices (assuming the DRAmanager supports this).
   967  		for _, klPluginCdiDevices := range containerClaimInfo.CDIDevices {
   968  			var cdiDevices []*podresourcesapi.CDIDevice
   969  			for _, cdiDevice := range klPluginCdiDevices {
   970  				cdiDevices = append(cdiDevices, &podresourcesapi.CDIDevice{Name: cdiDevice})
   971  			}
   972  			claimResources = append(claimResources, &podresourcesapi.ClaimResource{CDIDevices: cdiDevices})
   973  		}
   974  		containerClaimInfo.RUnlock()
   975  		containerDynamicResource := podresourcesapi.DynamicResource{
   976  			ClassName:      containerClaimInfo.ClassName,
   977  			ClaimName:      containerClaimInfo.ClaimName,
   978  			ClaimNamespace: containerClaimInfo.Namespace,
   979  			ClaimResources: claimResources,
   980  		}
   981  		containerDynamicResources = append(containerDynamicResources, &containerDynamicResource)
   982  	}
   983  	return containerDynamicResources
   984  }
   986  func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
   987  	return cm.deviceManager.ShouldResetExtendedResourceCapacity()
   988  }
   990  func (cm *containerManagerImpl) UpdateAllocatedDevices() {
   991  	cm.deviceManager.UpdateAllocatedDevices()
   992  }
   994  func containerMemoryFromBlock(blocks []memorymanagerstate.Block) []*podresourcesapi.ContainerMemory {
   995  	var containerMemories []*podresourcesapi.ContainerMemory
   997  	for _, b := range blocks {
   998  		containerMemory := podresourcesapi.ContainerMemory{
   999  			MemoryType: string(b.Type),
  1000  			Size_:      b.Size,
  1001  			Topology: &podresourcesapi.TopologyInfo{
  1002  				Nodes: []*podresourcesapi.NUMANode{},
  1003  			},
  1004  		}
  1006  		for _, numaNodeID := range b.NUMAAffinity {
  1007  			containerMemory.Topology.Nodes = append(containerMemory.Topology.Nodes, &podresourcesapi.NUMANode{ID: int64(numaNodeID)})
  1008  		}
  1010  		containerMemories = append(containerMemories, &containerMemory)
  1011  	}
  1013  	return containerMemories
  1014  }
  1016  func (cm *containerManagerImpl) PrepareDynamicResources(pod *v1.Pod) error {
  1017  	return cm.draManager.PrepareResources(pod)
  1018  }
  1020  func (cm *containerManagerImpl) UnprepareDynamicResources(pod *v1.Pod) error {
  1021  	return cm.draManager.UnprepareResources(pod)
  1022  }
  1024  func (cm *containerManagerImpl) PodMightNeedToUnprepareResources(UID types.UID) bool {
  1025  	return cm.draManager.PodMightNeedToUnprepareResources(UID)
  1026  }

