...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/node_container_manager_linux.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm

     1  //go:build linux
     2  // +build linux
     3  
     4  /*
     5  Copyright 2017 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package cm
    21  
    22  import (
    23  	"fmt"
    24  	"strconv"
    25  	"strings"
    26  	"time"
    27  
    28  	v1 "k8s.io/api/core/v1"
    29  	"k8s.io/apimachinery/pkg/api/resource"
    30  	"k8s.io/apimachinery/pkg/types"
    31  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    32  	"k8s.io/klog/v2"
    33  	kubefeatures "k8s.io/kubernetes/pkg/features"
    34  	"k8s.io/kubernetes/pkg/kubelet/events"
    35  	"k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
    36  	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
    37  )
    38  
    39  const (
    40  	defaultNodeAllocatableCgroupName = "kubepods"
    41  )
    42  
    43  // createNodeAllocatableCgroups creates the Node Allocatable cgroup when the CgroupsPerQOS flag is set to true.
    44  func (cm *containerManagerImpl) createNodeAllocatableCgroups() error {
    45  	nodeAllocatable := cm.internalCapacity
    46  	// Use Node Allocatable limits instead of capacity if the user requested enforcing node allocatable.
    47  	nc := cm.NodeConfig.NodeAllocatableConfig
    48  	if cm.CgroupsPerQOS && nc.EnforceNodeAllocatable.Has(kubetypes.NodeAllocatableEnforcementKey) {
    49  		nodeAllocatable = cm.getNodeAllocatableInternalAbsolute()
    50  	}
    51  
    52  	cgroupConfig := &CgroupConfig{
    53  		Name: cm.cgroupRoot,
    54  		// The default limits for CPU shares can be very low, which can lead to CPU starvation for pods.
    55  		ResourceParameters: getCgroupConfig(nodeAllocatable),
    56  	}
    57  	if cm.cgroupManager.Exists(cgroupConfig.Name) {
    58  		return nil
    59  	}
    60  	if err := cm.cgroupManager.Create(cgroupConfig); err != nil {
    61  		klog.ErrorS(err, "Failed to create cgroup", "cgroupName", cm.cgroupRoot)
    62  		return err
    63  	}
    64  	return nil
    65  }
    66  
    67  // enforceNodeAllocatableCgroups enforces Node Allocatable cgroup settings.
    68  func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
    69  	nc := cm.NodeConfig.NodeAllocatableConfig
    70  
    71  	// We need to update limits on the node allocatable cgroup no matter what, because
    72  	// the default CPU shares on cgroups are low and can cause CPU starvation.
    73  	nodeAllocatable := cm.internalCapacity
    74  	// Use Node Allocatable limits instead of capacity if the user requested enforcing node allocatable.
    75  	if cm.CgroupsPerQOS && nc.EnforceNodeAllocatable.Has(kubetypes.NodeAllocatableEnforcementKey) {
    76  		nodeAllocatable = cm.getNodeAllocatableInternalAbsolute()
    77  	}
    78  
    79  	klog.V(4).InfoS("Attempting to enforce Node Allocatable", "config", nc)
    80  
    81  	cgroupConfig := &CgroupConfig{
    82  		Name:               cm.cgroupRoot,
    83  		ResourceParameters: getCgroupConfig(nodeAllocatable),
    84  	}
    85  
    86  	// Use an ObjectReference for events, as the node may not be cached; refer to #42701 for details.
    87  	nodeRef := &v1.ObjectReference{
    88  		Kind:      "Node",
    89  		Name:      cm.nodeInfo.Name,
    90  		UID:       types.UID(cm.nodeInfo.Name),
    91  		Namespace: "",
    92  	}
    93  
    94  	// If Node Allocatable is enforced on a node that has not been drained or is updated on an existing node to a lower value,
    95  	// existing memory usage across pods might be higher than current Node Allocatable Memory Limits.
    96  	// Pod Evictions are expected to bring down memory usage to below Node Allocatable limits.
    97  	// Until evictions happen, retry cgroup updates.
    98  	// Update limits on a non-root cgroup-root to be safe, since the default limits for CPU can be too low.
    99  	// Check if cgroupRoot is set to a non-empty value (empty would be the root container)
   100  	if len(cm.cgroupRoot) > 0 {
   101  		go func() {
   102  			for {
   103  				err := cm.cgroupManager.Update(cgroupConfig)
   104  				if err == nil {
   105  					cm.recorder.Event(nodeRef, v1.EventTypeNormal, events.SuccessfulNodeAllocatableEnforcement, "Updated Node Allocatable limit across pods")
   106  					return
   107  				}
   108  				message := fmt.Sprintf("Failed to update Node Allocatable Limits %q: %v", cm.cgroupRoot, err)
   109  				cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
   110  				time.Sleep(time.Minute)
   111  			}
   112  		}()
   113  	}
   114  	// Now apply kube reserved and system reserved limits if required.
   115  	if nc.EnforceNodeAllocatable.Has(kubetypes.SystemReservedEnforcementKey) {
   116  		klog.V(2).InfoS("Enforcing system reserved on cgroup", "cgroupName", nc.SystemReservedCgroupName, "limits", nc.SystemReserved)
   117  		if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.SystemReservedCgroupName), nc.SystemReserved); err != nil {
   118  			message := fmt.Sprintf("Failed to enforce System Reserved Cgroup Limits on %q: %v", nc.SystemReservedCgroupName, err)
   119  			cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
   120  			return fmt.Errorf("%s", message)
   121  		}
   122  		cm.recorder.Eventf(nodeRef, v1.EventTypeNormal, events.SuccessfulNodeAllocatableEnforcement, "Updated limits on system reserved cgroup %v", nc.SystemReservedCgroupName)
   123  	}
   124  	if nc.EnforceNodeAllocatable.Has(kubetypes.KubeReservedEnforcementKey) {
   125  		klog.V(2).InfoS("Enforcing kube reserved on cgroup", "cgroupName", nc.KubeReservedCgroupName, "limits", nc.KubeReserved)
   126  		if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.KubeReservedCgroupName), nc.KubeReserved); err != nil {
   127  			message := fmt.Sprintf("Failed to enforce Kube Reserved Cgroup Limits on %q: %v", nc.KubeReservedCgroupName, err)
   128  			cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
   129  			return fmt.Errorf("%s", message)
   130  		}
   131  		cm.recorder.Eventf(nodeRef, v1.EventTypeNormal, events.SuccessfulNodeAllocatableEnforcement, "Updated limits on kube reserved cgroup %v", nc.KubeReservedCgroupName)
   132  	}
   133  	return nil
   134  }
   135  
   136  // enforceExistingCgroup updates the limits `rl` on the existing cgroup `cName` using the `cgroupManager` interface.
   137  func enforceExistingCgroup(cgroupManager CgroupManager, cName CgroupName, rl v1.ResourceList) error {
   138  	rp := getCgroupConfig(rl)
   139  	if rp == nil {
   140  		return fmt.Errorf("%q cgroup is not configured properly", cName)
   141  	}
   142  
   143  	// Enforce MemoryQoS for cgroups of kube-reserved/system-reserved. For more information,
   144  	// see https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/2570-memory-qos
   145  	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MemoryQoS) {
   146  		if rp.Memory != nil {
   147  			if rp.Unified == nil {
   148  				rp.Unified = make(map[string]string)
   149  			}
   150  			rp.Unified[Cgroup2MemoryMin] = strconv.FormatInt(*rp.Memory, 10)
   151  		}
   152  	}
   153  
   154  	cgroupConfig := &CgroupConfig{
   155  		Name:               cName,
   156  		ResourceParameters: rp,
   157  	}
   158  	klog.V(4).InfoS("Enforcing limits on cgroup", "cgroupName", cName, "cpuShares", cgroupConfig.ResourceParameters.CPUShares, "memory", cgroupConfig.ResourceParameters.Memory, "pidsLimit", cgroupConfig.ResourceParameters.PidsLimit)
   159  	if err := cgroupManager.Validate(cgroupConfig.Name); err != nil {
   160  		return err
   161  	}
   162  	if err := cgroupManager.Update(cgroupConfig); err != nil {
   163  		return err
   164  	}
   165  	return nil
   166  }
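
// Illustrative sketch, not part of the original file: a hypothetical helper
// showing the MemoryQoS mapping performed above in concrete numbers. When the
// feature gate is on, the memory reservation is mirrored into the cgroup v2
// memory.min interface file through the Unified map, so a 1Gi reservation
// yields the string "1073741824" under the Cgroup2MemoryMin key.
func exampleMemoryQoSUnified() {
	rp := getCgroupConfig(v1.ResourceList{v1.ResourceMemory: resource.MustParse("1Gi")})
	if rp == nil || rp.Memory == nil {
		return
	}
	if rp.Unified == nil {
		rp.Unified = make(map[string]string)
	}
	rp.Unified[Cgroup2MemoryMin] = strconv.FormatInt(*rp.Memory, 10)
	fmt.Println("memory.min =", rp.Unified[Cgroup2MemoryMin]) // memory.min = 1073741824
}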
   167  
   168  // getCgroupConfig returns a ResourceConfig object that can be used to create or update cgroups via CgroupManager interface.
   169  func getCgroupConfig(rl v1.ResourceList) *ResourceConfig {
   170  	// TODO(vishh): Set CPU Quota if necessary.
   171  	if rl == nil {
   172  		return nil
   173  	}
   174  	var rc ResourceConfig
   175  	if q, exists := rl[v1.ResourceMemory]; exists {
   176  		// Memory is defined in bytes.
   177  		val := q.Value()
   178  		rc.Memory = &val
   179  	}
   180  	if q, exists := rl[v1.ResourceCPU]; exists {
   181  		// CPU is defined in milli-cores.
   182  		val := MilliCPUToShares(q.MilliValue())
   183  		rc.CPUShares = &val
   184  	}
   185  	if q, exists := rl[pidlimit.PIDs]; exists {
   186  		val := q.Value()
   187  		rc.PidsLimit = &val
   188  	}
   189  	rc.HugePageLimit = HugePageLimits(rl)
   190  
   191  	return &rc
   192  }
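
// Illustrative sketch, not part of the original file: a hypothetical example of
// how a reservation expressed as a v1.ResourceList maps to cgroup parameters via
// getCgroupConfig. With the usual 1024-shares-per-core convention used by
// MilliCPUToShares, 500m of CPU becomes 512 cpu shares and 1Gi of memory becomes
// a 1073741824-byte limit.
func exampleGetCgroupConfig() {
	reserved := v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("500m"),
		v1.ResourceMemory: resource.MustParse("1Gi"),
	}
	rc := getCgroupConfig(reserved)
	if rc != nil && rc.CPUShares != nil && rc.Memory != nil {
		fmt.Printf("cpu.shares=%d memory.limit=%d\n", *rc.CPUShares, *rc.Memory)
	}
}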
   193  
   194  // GetNodeAllocatableAbsolute returns the absolute value of Node Allocatable which is primarily useful for enforcement.
   195  // Note that not all resources that are available on the node are included in the returned list of resources.
   196  // Returns a ResourceList.
   197  func (cm *containerManagerImpl) GetNodeAllocatableAbsolute() v1.ResourceList {
   198  	return cm.getNodeAllocatableAbsoluteImpl(cm.capacity)
   199  }
   200  
   201  func (cm *containerManagerImpl) getNodeAllocatableAbsoluteImpl(capacity v1.ResourceList) v1.ResourceList {
   202  	result := make(v1.ResourceList)
   203  	for k, v := range capacity {
   204  		value := v.DeepCopy()
   205  		if cm.NodeConfig.SystemReserved != nil {
   206  			value.Sub(cm.NodeConfig.SystemReserved[k])
   207  		}
   208  		if cm.NodeConfig.KubeReserved != nil {
   209  			value.Sub(cm.NodeConfig.KubeReserved[k])
   210  		}
   211  		if value.Sign() < 0 {
   212  			// Negative Allocatable resources don't make sense.
   213  			value.Set(0)
   214  		}
   215  		result[k] = value
   216  	}
   217  	return result
   218  }
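
// Illustrative sketch, not part of the original file: the arithmetic performed
// by getNodeAllocatableAbsoluteImpl for a single resource. With 8Gi of memory
// capacity, 500Mi system-reserved and 1Gi kube-reserved, allocatable memory is
// 8Gi - 500Mi - 1Gi = 6668Mi; a reservation larger than capacity clamps to 0.
func exampleAllocatableArithmetic() {
	capacity := resource.MustParse("8Gi")
	allocatable := capacity.DeepCopy()
	allocatable.Sub(resource.MustParse("500Mi")) // system-reserved
	allocatable.Sub(resource.MustParse("1Gi"))   // kube-reserved
	if allocatable.Sign() < 0 {
		allocatable.Set(0) // negative allocatable makes no sense
	}
	fmt.Println("allocatable memory:", allocatable.String()) // 6668Mi
}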
   219  
   220  // getNodeAllocatableInternalAbsolute is similar to getNodeAllocatableAbsolute except that
   221  // it also includes internal resources (currently process IDs).  It is intended for setting
   222  // up top level cgroups only.
   223  func (cm *containerManagerImpl) getNodeAllocatableInternalAbsolute() v1.ResourceList {
   224  	return cm.getNodeAllocatableAbsoluteImpl(cm.internalCapacity)
   225  }
   226  
   227  // GetNodeAllocatableReservation returns the amount of compute and storage resources that have to be reserved on this node, and therefore withheld from scheduling.
   228  func (cm *containerManagerImpl) GetNodeAllocatableReservation() v1.ResourceList {
   229  	evictionReservation := hardEvictionReservation(cm.HardEvictionThresholds, cm.capacity)
   230  	result := make(v1.ResourceList)
   231  	for k := range cm.capacity {
   232  		value := resource.NewQuantity(0, resource.DecimalSI)
   233  		if cm.NodeConfig.SystemReserved != nil {
   234  			value.Add(cm.NodeConfig.SystemReserved[k])
   235  		}
   236  		if cm.NodeConfig.KubeReserved != nil {
   237  			value.Add(cm.NodeConfig.KubeReserved[k])
   238  		}
   239  		if evictionReservation != nil {
   240  			value.Add(evictionReservation[k])
   241  		}
   242  		if !value.IsZero() {
   243  			result[k] = *value
   244  		}
   245  	}
   246  	return result
   247  }
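
// Illustrative sketch, not part of the original file: the reservation returned
// by GetNodeAllocatableReservation is the sum of system-reserved, kube-reserved
// and the hard eviction thresholds. For example, 500Mi system-reserved plus 1Gi
// kube-reserved plus a 100Mi memory.available hard eviction threshold withholds
// 1624Mi of memory from the scheduler.
func exampleReservationSum() {
	reservation := resource.MustParse("500Mi")   // system-reserved
	reservation.Add(resource.MustParse("1Gi"))   // kube-reserved
	reservation.Add(resource.MustParse("100Mi")) // hard eviction threshold
	fmt.Println("reserved memory:", reservation.String()) // 1624Mi
}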
   248  
   249  // validateNodeAllocatable ensures that the user-specified Node Allocatable configuration doesn't reserve more than the node's capacity.
   250  // Returns error if the configuration is invalid, nil otherwise.
   251  func (cm *containerManagerImpl) validateNodeAllocatable() error {
   252  	var errors []string
   253  	nar := cm.GetNodeAllocatableReservation()
   254  	for k, v := range nar {
   255  		value := cm.capacity[k].DeepCopy()
   256  		value.Sub(v)
   257  
   258  		if value.Sign() < 0 {
   259  			errors = append(errors, fmt.Sprintf("Resource %q has a reservation of %v but capacity of %v. Expected capacity >= reservation.", k, v, cm.capacity[k]))
   260  		}
   261  	}
   262  
   263  	if len(errors) > 0 {
   264  		return fmt.Errorf("invalid Node Allocatable configuration. %s", strings.Join(errors, " "))
   265  	}
   266  	return nil
   267  }
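
// Illustrative sketch, not part of the original file: the check performed by
// validateNodeAllocatable in concrete numbers. On a node with 4Gi of memory,
// reserving 3Gi for system, 1Gi for kube and 500Mi for hard eviction exceeds
// capacity by 500Mi, so the kubelet reports an invalid configuration.
func exampleValidateReservation() {
	capacity := resource.MustParse("4Gi")
	reservation := resource.MustParse("3Gi")
	reservation.Add(resource.MustParse("1Gi"))
	reservation.Add(resource.MustParse("500Mi"))

	remaining := capacity.DeepCopy()
	remaining.Sub(reservation)
	if remaining.Sign() < 0 {
		fmt.Println("invalid Node Allocatable configuration: reservation exceeds capacity")
	}
}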
   268  
