...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/container_manager_linux.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm

     1  //go:build linux
     2  // +build linux
     3  
     4  /*
     5  Copyright 2015 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package cm
    21  
    22  import (
    23  	"bytes"
    24  	"context"
    25  	"fmt"
    26  	"os"
    27  	"path"
    28  	"strings"
    29  	"sync"
    30  	"time"
    31  
    32  	"github.com/opencontainers/runc/libcontainer/cgroups"
    33  	"github.com/opencontainers/runc/libcontainer/cgroups/manager"
    34  	"github.com/opencontainers/runc/libcontainer/configs"
    35  	"k8s.io/klog/v2"
    36  	"k8s.io/mount-utils"
    37  	utilpath "k8s.io/utils/path"
    38  
    39  	v1 "k8s.io/api/core/v1"
    40  	"k8s.io/apimachinery/pkg/api/resource"
    41  	"k8s.io/apimachinery/pkg/types"
    42  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    43  	"k8s.io/apimachinery/pkg/util/sets"
    44  	"k8s.io/apimachinery/pkg/util/wait"
    45  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    46  	clientset "k8s.io/client-go/kubernetes"
    47  	"k8s.io/client-go/tools/record"
    48  	utilsysctl "k8s.io/component-helpers/node/util/sysctl"
    49  	internalapi "k8s.io/cri-api/pkg/apis"
    50  	podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
    51  	kubefeatures "k8s.io/kubernetes/pkg/features"
    52  	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
    53  	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
    54  	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
    55  	"k8s.io/kubernetes/pkg/kubelet/cm/dra"
    56  	"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager"
    57  	memorymanagerstate "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
    58  	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
    59  	cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
    60  	"k8s.io/kubernetes/pkg/kubelet/config"
    61  	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    62  	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
    63  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
    64  	"k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
    65  	"k8s.io/kubernetes/pkg/kubelet/status"
    66  	"k8s.io/kubernetes/pkg/kubelet/userns/inuserns"
    67  	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
    68  	"k8s.io/kubernetes/pkg/util/oom"
    69  )
    70  
    71  // A non-user container tracked by the Kubelet.
    72  type systemContainer struct {
    73  	// Absolute name of the container.
    74  	name string
    75  
    76  	// CPU limit in millicores.
    77  	cpuMillicores int64
    78  
    79  	// Function that ensures the state of the container.
    80  	// m is the cgroup manager for the specified container.
    81  	ensureStateFunc func(m cgroups.Manager) error
    82  
    83  	// Manager for the cgroups of the external container.
    84  	manager cgroups.Manager
    85  }
    86  
    87  func newSystemCgroups(containerName string) (*systemContainer, error) {
    88  	manager, err := createManager(containerName)
    89  	if err != nil {
    90  		return nil, err
    91  	}
    92  	return &systemContainer{
    93  		name:    containerName,
    94  		manager: manager,
    95  	}, nil
    96  }
    97  
    98  type containerManagerImpl struct {
    99  	sync.RWMutex
   100  	cadvisorInterface cadvisor.Interface
   101  	mountUtil         mount.Interface
   102  	NodeConfig
   103  	status Status
   104  	// External containers being managed.
   105  	systemContainers []*systemContainer
   106  	// Tasks that are run periodically
   107  	periodicTasks []func()
   108  	// Holds all the mounted cgroup subsystems
   109  	subsystems *CgroupSubsystems
   110  	nodeInfo   *v1.Node
   111  	// Interface for cgroup management
   112  	cgroupManager CgroupManager
   113  	// Capacity of this node.
   114  	capacity v1.ResourceList
   115  	// Capacity of this node, including internal resources.
   116  	internalCapacity v1.ResourceList
   117  	// Absolute cgroupfs path to a cgroup that Kubelet needs to place all pods under.
   118  	// This path include a top level container for enforcing Node Allocatable.
   119  	cgroupRoot CgroupName
   120  	// Event recorder interface.
   121  	recorder record.EventRecorder
   122  	// Interface for QoS cgroup management
   123  	qosContainerManager QOSContainerManager
   124  	// Interface for exporting and allocating devices reported by device plugins.
   125  	deviceManager devicemanager.Manager
   126  	// Interface for CPU affinity management.
   127  	cpuManager cpumanager.Manager
   128  	// Interface for memory affinity management.
   129  	memoryManager memorymanager.Manager
   130  	// Interface for Topology resource co-ordination
   131  	topologyManager topologymanager.Manager
   132  	// Interface for Dynamic Resource Allocation management.
   133  	draManager dra.Manager
   134  }
   135  
   136  type features struct {
   137  	cpuHardcapping bool
   138  }
   139  
   140  var _ ContainerManager = &containerManagerImpl{}
   141  
   142  // checks if the required cgroups subsystems are mounted.
   143  // As of now, only 'cpu' and 'memory' are required.
   144  // cpu quota is a soft requirement.
   145  func validateSystemRequirements(mountUtil mount.Interface) (features, error) {
   146  	const (
   147  		cgroupMountType = "cgroup"
   148  		localErr        = "system validation failed"
   149  	)
   150  	var (
   151  		cpuMountPoint string
   152  		f             features
   153  	)
   154  	mountPoints, err := mountUtil.List()
   155  	if err != nil {
   156  		return f, fmt.Errorf("%s - %v", localErr, err)
   157  	}
   158  
   159  	if cgroups.IsCgroup2UnifiedMode() {
   160  		f.cpuHardcapping = true
   161  		return f, nil
   162  	}
   163  
   164  	expectedCgroups := sets.New("cpu", "cpuacct", "cpuset", "memory")
   165  	for _, mountPoint := range mountPoints {
   166  		if mountPoint.Type == cgroupMountType {
   167  			for _, opt := range mountPoint.Opts {
   168  				if expectedCgroups.Has(opt) {
   169  					expectedCgroups.Delete(opt)
   170  				}
   171  				if opt == "cpu" {
   172  					cpuMountPoint = mountPoint.Path
   173  				}
   174  			}
   175  		}
   176  	}
   177  
   178  	if expectedCgroups.Len() > 0 {
   179  		return f, fmt.Errorf("%s - Following Cgroup subsystem not mounted: %v", localErr, sets.List(expectedCgroups))
   180  	}
   181  
   182  	// Check if cpu quota is available.
   183  	// CPU cgroup is required and so it expected to be mounted at this point.
   184  	periodExists, err := utilpath.Exists(utilpath.CheckFollowSymlink, path.Join(cpuMountPoint, "cpu.cfs_period_us"))
   185  	if err != nil {
   186  		klog.ErrorS(err, "Failed to detect if CPU cgroup cpu.cfs_period_us is available")
   187  	}
   188  	quotaExists, err := utilpath.Exists(utilpath.CheckFollowSymlink, path.Join(cpuMountPoint, "cpu.cfs_quota_us"))
   189  	if err != nil {
   190  		klog.ErrorS(err, "Failed to detect if CPU cgroup cpu.cfs_quota_us is available")
   191  	}
   192  	if quotaExists && periodExists {
   193  		f.cpuHardcapping = true
   194  	}
   195  	return f, nil
   196  }
   197  
   198  // TODO(vmarmol): Add limits to the system containers.
   199  // Takes the absolute name of the specified containers.
   200  // Empty container name disables use of the specified container.
   201  func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, recorder record.EventRecorder, kubeClient clientset.Interface) (ContainerManager, error) {
   202  	subsystems, err := GetCgroupSubsystems()
   203  	if err != nil {
   204  		return nil, fmt.Errorf("failed to get mounted cgroup subsystems: %v", err)
   205  	}
   206  
   207  	if failSwapOn {
   208  		// Check whether swap is enabled. The Kubelet does not support running with swap enabled.
   209  		swapFile := "/proc/swaps"
   210  		swapData, err := os.ReadFile(swapFile)
   211  		if err != nil {
   212  			if os.IsNotExist(err) {
   213  				klog.InfoS("File does not exist, assuming that swap is disabled", "path", swapFile)
   214  			} else {
   215  				return nil, err
   216  			}
   217  		} else {
   218  			swapData = bytes.TrimSpace(swapData) // extra trailing \n
   219  			swapLines := strings.Split(string(swapData), "\n")
   220  
   221  			// If there is more than one line (table headers) in /proc/swaps, swap is enabled and we should
   222  			// error out unless --fail-swap-on is set to false.
   223  			if len(swapLines) > 1 {
   224  				return nil, fmt.Errorf("running with swap on is not supported, please disable swap! or set --fail-swap-on flag to false. /proc/swaps contained: %v", swapLines)
   225  			}
   226  		}
   227  	}
   228  
   229  	var internalCapacity = v1.ResourceList{}
   230  	// It is safe to invoke `MachineInfo` on cAdvisor before logically initializing cAdvisor here because
   231  	// machine info is computed and cached once as part of cAdvisor object creation.
   232  	// But `RootFsInfo` and `ImagesFsInfo` are not available at this moment so they will be called later during manager starts
   233  	machineInfo, err := cadvisorInterface.MachineInfo()
   234  	if err != nil {
   235  		return nil, err
   236  	}
   237  	capacity := cadvisor.CapacityFromMachineInfo(machineInfo)
   238  	for k, v := range capacity {
   239  		internalCapacity[k] = v
   240  	}
   241  	pidlimits, err := pidlimit.Stats()
   242  	if err == nil && pidlimits != nil && pidlimits.MaxPID != nil {
   243  		internalCapacity[pidlimit.PIDs] = *resource.NewQuantity(
   244  			int64(*pidlimits.MaxPID),
   245  			resource.DecimalSI)
   246  	}
   247  
   248  	// Turn CgroupRoot from a string (in cgroupfs path format) to internal CgroupName
   249  	cgroupRoot := ParseCgroupfsToCgroupName(nodeConfig.CgroupRoot)
   250  	cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)
   251  	// Check if Cgroup-root actually exists on the node
   252  	if nodeConfig.CgroupsPerQOS {
   253  		// this does default to / when enabled, but this tests against regressions.
   254  		if nodeConfig.CgroupRoot == "" {
   255  			return nil, fmt.Errorf("invalid configuration: cgroups-per-qos was specified and cgroup-root was not specified. To enable the QoS cgroup hierarchy you need to specify a valid cgroup-root")
   256  		}
   257  
   258  		// we need to check that the cgroup root actually exists for each subsystem
   259  		// of note, we always use the cgroupfs driver when performing this check since
   260  		// the input is provided in that format.
   261  		// this is important because we do not want any name conversion to occur.
   262  		if err := cgroupManager.Validate(cgroupRoot); err != nil {
   263  			return nil, fmt.Errorf("invalid configuration: %w", err)
   264  		}
   265  		klog.InfoS("Container manager verified user specified cgroup-root exists", "cgroupRoot", cgroupRoot)
   266  		// Include the top level cgroup for enforcing node allocatable into cgroup-root.
   267  		// This way, all sub modules can avoid having to understand the concept of node allocatable.
   268  		cgroupRoot = NewCgroupName(cgroupRoot, defaultNodeAllocatableCgroupName)
   269  	}
   270  	klog.InfoS("Creating Container Manager object based on Node Config", "nodeConfig", nodeConfig)
   271  
   272  	qosContainerManager, err := NewQOSContainerManager(subsystems, cgroupRoot, nodeConfig, cgroupManager)
   273  	if err != nil {
   274  		return nil, err
   275  	}
   276  
   277  	cm := &containerManagerImpl{
   278  		cadvisorInterface:   cadvisorInterface,
   279  		mountUtil:           mountUtil,
   280  		NodeConfig:          nodeConfig,
   281  		subsystems:          subsystems,
   282  		cgroupManager:       cgroupManager,
   283  		capacity:            capacity,
   284  		internalCapacity:    internalCapacity,
   285  		cgroupRoot:          cgroupRoot,
   286  		recorder:            recorder,
   287  		qosContainerManager: qosContainerManager,
   288  	}
   289  
   290  	cm.topologyManager, err = topologymanager.NewManager(
   291  		machineInfo.Topology,
   292  		nodeConfig.TopologyManagerPolicy,
   293  		nodeConfig.TopologyManagerScope,
   294  		nodeConfig.TopologyManagerPolicyOptions,
   295  	)
   296  
   297  	if err != nil {
   298  		return nil, err
   299  	}
   300  
   301  	klog.InfoS("Creating device plugin manager")
   302  	cm.deviceManager, err = devicemanager.NewManagerImpl(machineInfo.Topology, cm.topologyManager)
   303  	if err != nil {
   304  		return nil, err
   305  	}
   306  	cm.topologyManager.AddHintProvider(cm.deviceManager)
   307  
   308  	// initialize DRA manager
   309  	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
   310  		klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager")
   311  		cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir, nodeConfig.NodeName)
   312  		if err != nil {
   313  			return nil, err
   314  		}
   315  	}
   316  
   317  	// Initialize CPU manager
   318  	cm.cpuManager, err = cpumanager.NewManager(
   319  		nodeConfig.CPUManagerPolicy,
   320  		nodeConfig.CPUManagerPolicyOptions,
   321  		nodeConfig.CPUManagerReconcilePeriod,
   322  		machineInfo,
   323  		nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
   324  		cm.GetNodeAllocatableReservation(),
   325  		nodeConfig.KubeletRootDir,
   326  		cm.topologyManager,
   327  	)
   328  	if err != nil {
   329  		klog.ErrorS(err, "Failed to initialize cpu manager")
   330  		return nil, err
   331  	}
   332  	cm.topologyManager.AddHintProvider(cm.cpuManager)
   333  
   334  	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
   335  		cm.memoryManager, err = memorymanager.NewManager(
   336  			nodeConfig.ExperimentalMemoryManagerPolicy,
   337  			machineInfo,
   338  			cm.GetNodeAllocatableReservation(),
   339  			nodeConfig.ExperimentalMemoryManagerReservedMemory,
   340  			nodeConfig.KubeletRootDir,
   341  			cm.topologyManager,
   342  		)
   343  		if err != nil {
   344  			klog.ErrorS(err, "Failed to initialize memory manager")
   345  			return nil, err
   346  		}
   347  		cm.topologyManager.AddHintProvider(cm.memoryManager)
   348  	}
   349  
   350  	return cm, nil
   351  }
   352  
   353  // NewPodContainerManager is a factory method returns a PodContainerManager object
   354  // If qosCgroups are enabled then it returns the general pod container manager
   355  // otherwise it returns a no-op manager which essentially does nothing
   356  func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager {
   357  	if cm.NodeConfig.CgroupsPerQOS {
   358  		return &podContainerManagerImpl{
   359  			qosContainersInfo: cm.GetQOSContainersInfo(),
   360  			subsystems:        cm.subsystems,
   361  			cgroupManager:     cm.cgroupManager,
   362  			podPidsLimit:      cm.PodPidsLimit,
   363  			enforceCPULimits:  cm.EnforceCPULimits,
   364  			// cpuCFSQuotaPeriod is in microseconds. NodeConfig.CPUCFSQuotaPeriod is time.Duration (measured in nano seconds).
   365  			// Convert (cm.CPUCFSQuotaPeriod) [nanoseconds] / time.Microsecond (1000) to get cpuCFSQuotaPeriod in microseconds.
   366  			cpuCFSQuotaPeriod: uint64(cm.CPUCFSQuotaPeriod / time.Microsecond),
   367  		}
   368  	}
   369  	return &podContainerManagerNoop{
   370  		cgroupRoot: cm.cgroupRoot,
   371  	}
   372  }
   373  
   374  func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
   375  	return &internalContainerLifecycleImpl{cm.cpuManager, cm.memoryManager, cm.topologyManager}
   376  }
   377  
   378  // Create a cgroup container manager.
   379  func createManager(containerName string) (cgroups.Manager, error) {
   380  	cg := &configs.Cgroup{
   381  		Parent: "/",
   382  		Name:   containerName,
   383  		Resources: &configs.Resources{
   384  			SkipDevices: true,
   385  		},
   386  		Systemd: false,
   387  	}
   388  
   389  	return manager.New(cg)
   390  }
   391  
   392  type KernelTunableBehavior string
   393  
   394  const (
   395  	KernelTunableWarn   KernelTunableBehavior = "warn"
   396  	KernelTunableError  KernelTunableBehavior = "error"
   397  	KernelTunableModify KernelTunableBehavior = "modify"
   398  )
   399  
   400  // setupKernelTunables validates kernel tunable flags are set as expected
   401  // depending upon the specified option, it will either warn, error, or modify the kernel tunable flags
   402  func setupKernelTunables(option KernelTunableBehavior) error {
   403  	desiredState := map[string]int{
   404  		utilsysctl.VMOvercommitMemory: utilsysctl.VMOvercommitMemoryAlways,
   405  		utilsysctl.VMPanicOnOOM:       utilsysctl.VMPanicOnOOMInvokeOOMKiller,
   406  		utilsysctl.KernelPanic:        utilsysctl.KernelPanicRebootTimeout,
   407  		utilsysctl.KernelPanicOnOops:  utilsysctl.KernelPanicOnOopsAlways,
   408  		utilsysctl.RootMaxKeys:        utilsysctl.RootMaxKeysSetting,
   409  		utilsysctl.RootMaxBytes:       utilsysctl.RootMaxBytesSetting,
   410  	}
   411  
   412  	sysctl := utilsysctl.New()
   413  
   414  	errList := []error{}
   415  	for flag, expectedValue := range desiredState {
   416  		val, err := sysctl.GetSysctl(flag)
   417  		if err != nil {
   418  			errList = append(errList, err)
   419  			continue
   420  		}
   421  		if val == expectedValue {
   422  			continue
   423  		}
   424  
   425  		switch option {
   426  		case KernelTunableError:
   427  			errList = append(errList, fmt.Errorf("invalid kernel flag: %v, expected value: %v, actual value: %v", flag, expectedValue, val))
   428  		case KernelTunableWarn:
   429  			klog.V(2).InfoS("Invalid kernel flag", "flag", flag, "expectedValue", expectedValue, "actualValue", val)
   430  		case KernelTunableModify:
   431  			klog.V(2).InfoS("Updating kernel flag", "flag", flag, "expectedValue", expectedValue, "actualValue", val)
   432  			err = sysctl.SetSysctl(flag, expectedValue)
   433  			if err != nil {
   434  				if inuserns.RunningInUserNS() {
   435  					if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.KubeletInUserNamespace) {
   436  						klog.V(2).InfoS("Updating kernel flag failed (running in UserNS, ignoring)", "flag", flag, "err", err)
   437  						continue
   438  					}
   439  					klog.ErrorS(err, "Updating kernel flag failed (Hint: enable KubeletInUserNamespace feature flag to ignore the error)", "flag", flag)
   440  				}
   441  				errList = append(errList, err)
   442  			}
   443  		}
   444  	}
   445  	return utilerrors.NewAggregate(errList)
   446  }
   447  
   448  func (cm *containerManagerImpl) setupNode(activePods ActivePodsFunc) error {
   449  	f, err := validateSystemRequirements(cm.mountUtil)
   450  	if err != nil {
   451  		return err
   452  	}
   453  	if !f.cpuHardcapping {
   454  		cm.status.SoftRequirements = fmt.Errorf("CPU hardcapping unsupported")
   455  	}
   456  	b := KernelTunableModify
   457  	if cm.GetNodeConfig().ProtectKernelDefaults {
   458  		b = KernelTunableError
   459  	}
   460  	if err := setupKernelTunables(b); err != nil {
   461  		return err
   462  	}
   463  
   464  	// Setup top level qos containers only if CgroupsPerQOS flag is specified as true
   465  	if cm.NodeConfig.CgroupsPerQOS {
   466  		if err := cm.createNodeAllocatableCgroups(); err != nil {
   467  			return err
   468  		}
   469  		err = cm.qosContainerManager.Start(cm.GetNodeAllocatableAbsolute, activePods)
   470  		if err != nil {
   471  			return fmt.Errorf("failed to initialize top level QOS containers: %v", err)
   472  		}
   473  	}
   474  
   475  	// Enforce Node Allocatable (if required)
   476  	if err := cm.enforceNodeAllocatableCgroups(); err != nil {
   477  		return err
   478  	}
   479  
   480  	systemContainers := []*systemContainer{}
   481  
   482  	if cm.SystemCgroupsName != "" {
   483  		if cm.SystemCgroupsName == "/" {
   484  			return fmt.Errorf("system container cannot be root (\"/\")")
   485  		}
   486  		cont, err := newSystemCgroups(cm.SystemCgroupsName)
   487  		if err != nil {
   488  			return err
   489  		}
   490  		cont.ensureStateFunc = func(manager cgroups.Manager) error {
   491  			return ensureSystemCgroups("/", manager)
   492  		}
   493  		systemContainers = append(systemContainers, cont)
   494  	}
   495  
   496  	if cm.KubeletCgroupsName != "" {
   497  		cont, err := newSystemCgroups(cm.KubeletCgroupsName)
   498  		if err != nil {
   499  			return err
   500  		}
   501  
   502  		cont.ensureStateFunc = func(_ cgroups.Manager) error {
   503  			return ensureProcessInContainerWithOOMScore(os.Getpid(), int(cm.KubeletOOMScoreAdj), cont.manager)
   504  		}
   505  		systemContainers = append(systemContainers, cont)
   506  	} else {
   507  		cm.periodicTasks = append(cm.periodicTasks, func() {
   508  			if err := ensureProcessInContainerWithOOMScore(os.Getpid(), int(cm.KubeletOOMScoreAdj), nil); err != nil {
   509  				klog.ErrorS(err, "Failed to ensure process in container with oom score")
   510  				return
   511  			}
   512  			cont, err := getContainer(os.Getpid())
   513  			if err != nil {
   514  				klog.ErrorS(err, "Failed to find cgroups of kubelet")
   515  				return
   516  			}
   517  			cm.Lock()
   518  			defer cm.Unlock()
   519  
   520  			cm.KubeletCgroupsName = cont
   521  		})
   522  	}
   523  
   524  	cm.systemContainers = systemContainers
   525  	return nil
   526  }
   527  
   528  func (cm *containerManagerImpl) GetNodeConfig() NodeConfig {
   529  	cm.RLock()
   530  	defer cm.RUnlock()
   531  	return cm.NodeConfig
   532  }
   533  
   534  // GetPodCgroupRoot returns the literal cgroupfs value for the cgroup containing all pods.
   535  func (cm *containerManagerImpl) GetPodCgroupRoot() string {
   536  	return cm.cgroupManager.Name(cm.cgroupRoot)
   537  }
   538  
   539  func (cm *containerManagerImpl) GetMountedSubsystems() *CgroupSubsystems {
   540  	return cm.subsystems
   541  }
   542  
   543  func (cm *containerManagerImpl) GetQOSContainersInfo() QOSContainersInfo {
   544  	return cm.qosContainerManager.GetQOSContainersInfo()
   545  }
   546  
   547  func (cm *containerManagerImpl) UpdateQOSCgroups() error {
   548  	return cm.qosContainerManager.UpdateCgroups()
   549  }
   550  
   551  func (cm *containerManagerImpl) Status() Status {
   552  	cm.RLock()
   553  	defer cm.RUnlock()
   554  	return cm.status
   555  }
   556  
   557  func (cm *containerManagerImpl) Start(node *v1.Node,
   558  	activePods ActivePodsFunc,
   559  	sourcesReady config.SourcesReady,
   560  	podStatusProvider status.PodStatusProvider,
   561  	runtimeService internalapi.RuntimeService,
   562  	localStorageCapacityIsolation bool) error {
   563  	ctx := context.Background()
   564  
   565  	containerMap, containerRunningSet := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
   566  
   567  	// Initialize CPU manager
   568  	err := cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
   569  	if err != nil {
   570  		return fmt.Errorf("start cpu manager error: %v", err)
   571  	}
   572  
   573  	// Initialize memory manager
   574  	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryManager) {
   575  		containerMap, _ := buildContainerMapAndRunningSetFromRuntime(ctx, runtimeService)
   576  		err := cm.memoryManager.Start(memorymanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
   577  		if err != nil {
   578  			return fmt.Errorf("start memory manager error: %v", err)
   579  		}
   580  	}
   581  
   582  	// cache the node Info including resource capacity and
   583  	// allocatable of the node
   584  	cm.nodeInfo = node
   585  
   586  	if localStorageCapacityIsolation {
   587  		rootfs, err := cm.cadvisorInterface.RootFsInfo()
   588  		if err != nil {
   589  			return fmt.Errorf("failed to get rootfs info: %v", err)
   590  		}
   591  		for rName, rCap := range cadvisor.EphemeralStorageCapacityFromFsInfo(rootfs) {
   592  			cm.capacity[rName] = rCap
   593  		}
   594  	}
   595  
   596  	// Ensure that node allocatable configuration is valid.
   597  	if err := cm.validateNodeAllocatable(); err != nil {
   598  		return err
   599  	}
   600  
   601  	// Setup the node
   602  	if err := cm.setupNode(activePods); err != nil {
   603  		return err
   604  	}
   605  
   606  	// Don't run a background thread if there are no ensureStateFuncs.
   607  	hasEnsureStateFuncs := false
   608  	for _, cont := range cm.systemContainers {
   609  		if cont.ensureStateFunc != nil {
   610  			hasEnsureStateFuncs = true
   611  			break
   612  		}
   613  	}
   614  	if hasEnsureStateFuncs {
   615  		// Run ensure state functions every minute.
   616  		go wait.Until(func() {
   617  			for _, cont := range cm.systemContainers {
   618  				if cont.ensureStateFunc != nil {
   619  					if err := cont.ensureStateFunc(cont.manager); err != nil {
   620  						klog.InfoS("Failed to ensure state", "containerName", cont.name, "err", err)
   621  					}
   622  				}
   623  			}
   624  		}, time.Minute, wait.NeverStop)
   625  
   626  	}
   627  
   628  	if len(cm.periodicTasks) > 0 {
   629  		go wait.Until(func() {
   630  			for _, task := range cm.periodicTasks {
   631  				if task != nil {
   632  					task()
   633  				}
   634  			}
   635  		}, 5*time.Minute, wait.NeverStop)
   636  	}
   637  
   638  	// Starts device manager.
   639  	if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady, containerMap, containerRunningSet); err != nil {
   640  		return err
   641  	}
   642  
   643  	return nil
   644  }
   645  
   646  func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
   647  	return cm.deviceManager.GetWatcherHandler()
   648  }
   649  
   650  // TODO: move the GetResources logic to PodContainerManager.
   651  func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
   652  	opts := &kubecontainer.RunContainerOptions{}
   653  	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
   654  		resOpts, err := cm.draManager.GetResources(pod, container)
   655  		if err != nil {
   656  			return nil, err
   657  		}
   658  		// NOTE: Passing CDI device names as annotations is a temporary solution
   659  		// It will be removed after all runtimes are updated
   660  		// to get CDI device names from the ContainerConfig.CDIDevices field
   661  		opts.Annotations = append(opts.Annotations, resOpts.Annotations...)
   662  		opts.CDIDevices = append(opts.CDIDevices, resOpts.CDIDevices...)
   663  	}
   664  	// Allocate should already be called during predicateAdmitHandler.Admit(),
   665  	// just try to fetch device runtime information from cached state here
   666  	devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
   667  	if err != nil {
   668  		return nil, err
   669  	} else if devOpts == nil {
   670  		return opts, nil
   671  	}
   672  	opts.Devices = append(opts.Devices, devOpts.Devices...)
   673  	opts.Mounts = append(opts.Mounts, devOpts.Mounts...)
   674  	opts.Envs = append(opts.Envs, devOpts.Envs...)
   675  	opts.Annotations = append(opts.Annotations, devOpts.Annotations...)
   676  	opts.CDIDevices = append(opts.CDIDevices, devOpts.CDIDevices...)
   677  	return opts, nil
   678  }
   679  
   680  func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
   681  	return cm.deviceManager.UpdatePluginResources(node, attrs)
   682  }
   683  
   684  func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {
   685  	return cm.topologyManager
   686  }
   687  
   688  func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
   689  	cpuLimit := int64(0)
   690  
   691  	// Sum up resources of all external containers.
   692  	for _, cont := range cm.systemContainers {
   693  		cpuLimit += cont.cpuMillicores
   694  	}
   695  
   696  	return v1.ResourceList{
   697  		v1.ResourceCPU: *resource.NewMilliQuantity(
   698  			cpuLimit,
   699  			resource.DecimalSI),
   700  	}
   701  }
   702  
   703  func isProcessRunningInHost(pid int) (bool, error) {
   704  	// Get init pid namespace.
   705  	initPidNs, err := os.Readlink("/proc/1/ns/pid")
   706  	if err != nil {
   707  		return false, fmt.Errorf("failed to find pid namespace of init process")
   708  	}
   709  	klog.V(10).InfoS("Found init PID namespace", "namespace", initPidNs)
   710  	processPidNs, err := os.Readlink(fmt.Sprintf("/proc/%d/ns/pid", pid))
   711  	if err != nil {
   712  		return false, fmt.Errorf("failed to find pid namespace of process %q", pid)
   713  	}
   714  	klog.V(10).InfoS("Process info", "pid", pid, "namespace", processPidNs)
   715  	return initPidNs == processPidNs, nil
   716  }
   717  
   718  func ensureProcessInContainerWithOOMScore(pid int, oomScoreAdj int, manager cgroups.Manager) error {
   719  	if runningInHost, err := isProcessRunningInHost(pid); err != nil {
   720  		// Err on the side of caution. Avoid moving the docker daemon unless we are able to identify its context.
   721  		return err
   722  	} else if !runningInHost {
   723  		// Process is running inside a container. Don't touch that.
   724  		klog.V(2).InfoS("PID is not running in the host namespace", "pid", pid)
   725  		return nil
   726  	}
   727  
   728  	var errs []error
   729  	if manager != nil {
   730  		cont, err := getContainer(pid)
   731  		if err != nil {
   732  			errs = append(errs, fmt.Errorf("failed to find container of PID %d: %v", pid, err))
   733  		}
   734  
   735  		name := ""
   736  		cgroups, err := manager.GetCgroups()
   737  		if err != nil {
   738  			errs = append(errs, fmt.Errorf("failed to get cgroups for %d: %v", pid, err))
   739  		} else {
   740  			name = cgroups.Name
   741  		}
   742  
   743  		if cont != name {
   744  			err = manager.Apply(pid)
   745  			if err != nil {
   746  				errs = append(errs, fmt.Errorf("failed to move PID %d (in %q) to %q: %v", pid, cont, name, err))
   747  			}
   748  		}
   749  	}
   750  
   751  	// Also apply oom-score-adj to processes
   752  	oomAdjuster := oom.NewOOMAdjuster()
   753  	klog.V(5).InfoS("Attempting to apply oom_score_adj to process", "oomScoreAdj", oomScoreAdj, "pid", pid)
   754  	if err := oomAdjuster.ApplyOOMScoreAdj(pid, oomScoreAdj); err != nil {
   755  		klog.V(3).InfoS("Failed to apply oom_score_adj to process", "oomScoreAdj", oomScoreAdj, "pid", pid, "err", err)
   756  		errs = append(errs, fmt.Errorf("failed to apply oom score %d to PID %d: %v", oomScoreAdj, pid, err))
   757  	}
   758  	return utilerrors.NewAggregate(errs)
   759  }
   760  
   761  // getContainer returns the cgroup associated with the specified pid.
   762  // It enforces a unified hierarchy for memory and cpu cgroups.
   763  // On systemd environments, it uses the name=systemd cgroup for the specified pid.
   764  func getContainer(pid int) (string, error) {
   765  	cgs, err := cgroups.ParseCgroupFile(fmt.Sprintf("/proc/%d/cgroup", pid))
   766  	if err != nil {
   767  		return "", err
   768  	}
   769  
   770  	if cgroups.IsCgroup2UnifiedMode() {
   771  		c, found := cgs[""]
   772  		if !found {
   773  			return "", cgroups.NewNotFoundError("unified")
   774  		}
   775  		return c, nil
   776  	}
   777  
   778  	cpu, found := cgs["cpu"]
   779  	if !found {
   780  		return "", cgroups.NewNotFoundError("cpu")
   781  	}
   782  	memory, found := cgs["memory"]
   783  	if !found {
   784  		return "", cgroups.NewNotFoundError("memory")
   785  	}
   786  
   787  	// since we use this container for accounting, we need to ensure its a unified hierarchy.
   788  	if cpu != memory {
   789  		return "", fmt.Errorf("cpu and memory cgroup hierarchy not unified.  cpu: %s, memory: %s", cpu, memory)
   790  	}
   791  
   792  	// on systemd, every pid is in a unified cgroup hierarchy (name=systemd as seen in systemd-cgls)
   793  	// cpu and memory accounting is off by default, users may choose to enable it per unit or globally.
   794  	// users could enable CPU and memory accounting globally via /etc/systemd/system.conf (DefaultCPUAccounting=true DefaultMemoryAccounting=true).
   795  	// users could also enable CPU and memory accounting per unit via CPUAccounting=true and MemoryAccounting=true
   796  	// we only warn if accounting is not enabled for CPU or memory so as to not break local development flows where kubelet is launched in a terminal.
   797  	// for example, the cgroup for the user session will be something like /user.slice/user-X.slice/session-X.scope, but the cpu and memory
   798  	// cgroup will be the closest ancestor where accounting is performed (most likely /) on systems that launch docker containers.
   799  	// as a result, on those systems, you will not get cpu or memory accounting statistics for kubelet.
   800  	// in addition, you would not get memory or cpu accounting for the runtime unless accounting was enabled on its unit (or globally).
   801  	if systemd, found := cgs["name=systemd"]; found {
   802  		if systemd != cpu {
   803  			klog.InfoS("CPUAccounting not enabled for process", "pid", pid)
   804  		}
   805  		if systemd != memory {
   806  			klog.InfoS("MemoryAccounting not enabled for process", "pid", pid)
   807  		}
   808  		return systemd, nil
   809  	}
   810  
   811  	return cpu, nil
   812  }
   813  
   814  // Ensures the system container is created and all non-kernel threads and process 1
   815  // without a container are moved to it.
   816  //
   817  // The reason of leaving kernel threads at root cgroup is that we don't want to tie the
   818  // execution of these threads with to-be defined /system quota and create priority inversions.
   819  func ensureSystemCgroups(rootCgroupPath string, manager cgroups.Manager) error {
   820  	// Move non-kernel PIDs to the system container.
   821  	// Only keep errors on latest attempt.
   822  	var finalErr error
   823  	for i := 0; i <= 10; i++ {
   824  		allPids, err := cmutil.GetPids(rootCgroupPath)
   825  		if err != nil {
   826  			finalErr = fmt.Errorf("failed to list PIDs for root: %v", err)
   827  			continue
   828  		}
   829  
   830  		// Remove kernel pids and other protected PIDs (pid 1, PIDs already in system & kubelet containers)
   831  		pids := make([]int, 0, len(allPids))
   832  		for _, pid := range allPids {
   833  			if pid == 1 || isKernelPid(pid) {
   834  				continue
   835  			}
   836  
   837  			pids = append(pids, pid)
   838  		}
   839  
   840  		// Check if we have moved all the non-kernel PIDs.
   841  		if len(pids) == 0 {
   842  			return nil
   843  		}
   844  
   845  		klog.V(3).InfoS("Moving non-kernel processes", "pids", pids)
   846  		for _, pid := range pids {
   847  			err := manager.Apply(pid)
   848  			if err != nil {
   849  				name := ""
   850  				cgroups, err := manager.GetCgroups()
   851  				if err == nil {
   852  					name = cgroups.Name
   853  				}
   854  
   855  				finalErr = fmt.Errorf("failed to move PID %d into the system container %q: %v", pid, name, err)
   856  			}
   857  		}
   858  
   859  	}
   860  
   861  	return finalErr
   862  }
   863  
   864  // Determines whether the specified PID is a kernel PID.
   865  func isKernelPid(pid int) bool {
   866  	// Kernel threads have no associated executable.
   867  	_, err := os.Readlink(fmt.Sprintf("/proc/%d/exe", pid))
   868  	return err != nil && os.IsNotExist(err)
   869  }
   870  
   871  // GetCapacity returns node capacity data for "cpu", "memory", "ephemeral-storage", and "huge-pages*"
   872  // At present this method is only invoked when introspecting ephemeral storage
   873  func (cm *containerManagerImpl) GetCapacity(localStorageCapacityIsolation bool) v1.ResourceList {
   874  	if localStorageCapacityIsolation {
   875  		// We store allocatable ephemeral-storage in the capacity property once we Start() the container manager
   876  		if _, ok := cm.capacity[v1.ResourceEphemeralStorage]; !ok {
   877  			// If we haven't yet stored the capacity for ephemeral-storage, we can try to fetch it directly from cAdvisor,
   878  			if cm.cadvisorInterface != nil {
   879  				rootfs, err := cm.cadvisorInterface.RootFsInfo()
   880  				if err != nil {
   881  					klog.ErrorS(err, "Unable to get rootfs data from cAdvisor interface")
   882  					// If the rootfsinfo retrieval from cAdvisor fails for any reason, fallback to returning the capacity property with no ephemeral storage data
   883  					return cm.capacity
   884  				}
   885  				// We don't want to mutate cm.capacity here so we'll manually construct a v1.ResourceList from it,
   886  				// and add ephemeral-storage
   887  				capacityWithEphemeralStorage := v1.ResourceList{}
   888  				for rName, rQuant := range cm.capacity {
   889  					capacityWithEphemeralStorage[rName] = rQuant
   890  				}
   891  				capacityWithEphemeralStorage[v1.ResourceEphemeralStorage] = cadvisor.EphemeralStorageCapacityFromFsInfo(rootfs)[v1.ResourceEphemeralStorage]
   892  				return capacityWithEphemeralStorage
   893  			}
   894  		}
   895  	}
   896  	return cm.capacity
   897  }
   898  
   899  func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
   900  	return cm.deviceManager.GetCapacity()
   901  }
   902  
   903  func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
   904  	return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetDevices(podUID, containerName))
   905  }
   906  
   907  func (cm *containerManagerImpl) GetAllocatableDevices() []*podresourcesapi.ContainerDevices {
   908  	return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetAllocatableDevices())
   909  }
   910  
   911  func int64Slice(in []int) []int64 {
   912  	out := make([]int64, len(in))
   913  	for i := range in {
   914  		out[i] = int64(in[i])
   915  	}
   916  	return out
   917  }
   918  
   919  func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 {
   920  	if cm.cpuManager != nil {
   921  		return int64Slice(cm.cpuManager.GetExclusiveCPUs(podUID, containerName).UnsortedList())
   922  	}
   923  	return []int64{}
   924  }
   925  
   926  func (cm *containerManagerImpl) GetAllocatableCPUs() []int64 {
   927  	if cm.cpuManager != nil {
   928  		return int64Slice(cm.cpuManager.GetAllocatableCPUs().UnsortedList())
   929  	}
   930  	return []int64{}
   931  }
   932  
   933  func (cm *containerManagerImpl) GetMemory(podUID, containerName string) []*podresourcesapi.ContainerMemory {
   934  	if cm.memoryManager == nil {
   935  		return []*podresourcesapi.ContainerMemory{}
   936  	}
   937  
   938  	return containerMemoryFromBlock(cm.memoryManager.GetMemory(podUID, containerName))
   939  }
   940  
   941  func (cm *containerManagerImpl) GetAllocatableMemory() []*podresourcesapi.ContainerMemory {
   942  	if cm.memoryManager == nil {
   943  		return []*podresourcesapi.ContainerMemory{}
   944  	}
   945  
   946  	return containerMemoryFromBlock(cm.memoryManager.GetAllocatableMemory())
   947  }
   948  
   949  func (cm *containerManagerImpl) GetDynamicResources(pod *v1.Pod, container *v1.Container) []*podresourcesapi.DynamicResource {
   950  	if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
   951  		return []*podresourcesapi.DynamicResource{}
   952  	}
   953  
   954  	var containerDynamicResources []*podresourcesapi.DynamicResource
   955  	containerClaimInfos, err := cm.draManager.GetContainerClaimInfos(pod, container)
   956  	if err != nil {
   957  		klog.ErrorS(err, "Unable to get container claim info state")
   958  		return []*podresourcesapi.DynamicResource{}
   959  	}
   960  	for _, containerClaimInfo := range containerClaimInfos {
   961  		var claimResources []*podresourcesapi.ClaimResource
   962  		containerClaimInfo.RLock()
   963  		// TODO: Currently  we maintain a list of ClaimResources, each of which contains
   964  		// a set of CDIDevices from a different kubelet plugin. In the future we may want to
   965  		// include the name of the kubelet plugin and/or other types of resources that are
   966  		// not CDIDevices (assuming the DRAmanager supports this).
   967  		for _, klPluginCdiDevices := range containerClaimInfo.CDIDevices {
   968  			var cdiDevices []*podresourcesapi.CDIDevice
   969  			for _, cdiDevice := range klPluginCdiDevices {
   970  				cdiDevices = append(cdiDevices, &podresourcesapi.CDIDevice{Name: cdiDevice})
   971  			}
   972  			claimResources = append(claimResources, &podresourcesapi.ClaimResource{CDIDevices: cdiDevices})
   973  		}
   974  		containerClaimInfo.RUnlock()
   975  		containerDynamicResource := podresourcesapi.DynamicResource{
   976  			ClassName:      containerClaimInfo.ClassName,
   977  			ClaimName:      containerClaimInfo.ClaimName,
   978  			ClaimNamespace: containerClaimInfo.Namespace,
   979  			ClaimResources: claimResources,
   980  		}
   981  		containerDynamicResources = append(containerDynamicResources, &containerDynamicResource)
   982  	}
   983  	return containerDynamicResources
   984  }
   985  
   986  func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
   987  	return cm.deviceManager.ShouldResetExtendedResourceCapacity()
   988  }
   989  
   990  func (cm *containerManagerImpl) UpdateAllocatedDevices() {
   991  	cm.deviceManager.UpdateAllocatedDevices()
   992  }
   993  
   994  func containerMemoryFromBlock(blocks []memorymanagerstate.Block) []*podresourcesapi.ContainerMemory {
   995  	var containerMemories []*podresourcesapi.ContainerMemory
   996  
   997  	for _, b := range blocks {
   998  		containerMemory := podresourcesapi.ContainerMemory{
   999  			MemoryType: string(b.Type),
  1000  			Size_:      b.Size,
  1001  			Topology: &podresourcesapi.TopologyInfo{
  1002  				Nodes: []*podresourcesapi.NUMANode{},
  1003  			},
  1004  		}
  1005  
  1006  		for _, numaNodeID := range b.NUMAAffinity {
  1007  			containerMemory.Topology.Nodes = append(containerMemory.Topology.Nodes, &podresourcesapi.NUMANode{ID: int64(numaNodeID)})
  1008  		}
  1009  
  1010  		containerMemories = append(containerMemories, &containerMemory)
  1011  	}
  1012  
  1013  	return containerMemories
  1014  }
  1015  
  1016  func (cm *containerManagerImpl) PrepareDynamicResources(pod *v1.Pod) error {
  1017  	return cm.draManager.PrepareResources(pod)
  1018  }
  1019  
  1020  func (cm *containerManagerImpl) UnprepareDynamicResources(pod *v1.Pod) error {
  1021  	return cm.draManager.UnprepareResources(pod)
  1022  }
  1023  
  1024  func (cm *containerManagerImpl) PodMightNeedToUnprepareResources(UID types.UID) bool {
  1025  	return cm.draManager.PodMightNeedToUnprepareResources(UID)
  1026  }
  1027  

View as plain text