...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/policy_static.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm/cpumanager

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cpumanager
    18  
    19  import (
    20  	"fmt"
    21  
    22  	v1 "k8s.io/api/core/v1"
    23  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    24  	"k8s.io/klog/v2"
    25  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    26  	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
    27  	"k8s.io/kubernetes/pkg/features"
    28  	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
    29  	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
    30  	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
    31  	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
    32  	"k8s.io/kubernetes/pkg/kubelet/metrics"
    33  	"k8s.io/kubernetes/pkg/kubelet/types"
    34  	"k8s.io/utils/cpuset"
    35  )
    36  
    37  const (
    38  
    39  	// PolicyStatic is the name of the static policy.
    40  	// Should options be given, these will be ignored and backward (up to 1.21 included)
    41  	// compatible behaviour will be enforced
    42  	PolicyStatic policyName = "static"
    43  	// ErrorSMTAlignment represents the type of an SMTAlignmentError
    44  	ErrorSMTAlignment = "SMTAlignmentError"
    45  )
    46  
    47  // SMTAlignmentError represents an error due to SMT alignment
    48  type SMTAlignmentError struct {
    49  	RequestedCPUs         int
    50  	CpusPerCore           int
    51  	AvailablePhysicalCPUs int
    52  }
    53  
    54  func (e SMTAlignmentError) Error() string {
    55  	if e.AvailablePhysicalCPUs > 0 {
    56  		return fmt.Sprintf("SMT Alignment Error: not enough free physical CPUs: available physical CPUs = %d, requested CPUs = %d, CPUs per core = %d", e.AvailablePhysicalCPUs, e.RequestedCPUs, e.CpusPerCore)
    57  	}
    58  	return fmt.Sprintf("SMT Alignment Error: requested %d cpus not multiple cpus per core = %d", e.RequestedCPUs, e.CpusPerCore)
    59  }
    60  
// Type returns human-readable type of this error. Used in the admission control to populate Admission Failure reason.
// The value is always the ErrorSMTAlignment constant.
func (e SMTAlignmentError) Type() string {
	return ErrorSMTAlignment
}
    65  
// staticPolicy is a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
//
// This policy allocates CPUs exclusively for a container if all the following
// conditions are met:
//
// - The pod QoS class is Guaranteed.
// - The CPU request is a positive integer.
//
// The static policy maintains the following sets of logical CPUs:
//
//   - SHARED: Burstable, BestEffort, and non-integral Guaranteed containers
//     run here. Initially this contains all CPU IDs on the system. As
//     exclusive allocations are created and destroyed, this CPU set shrinks
//     and grows, accordingly. This is stored in the state as the default
//     CPU set.
//
//   - RESERVED: A subset of the shared pool which is not exclusively
//     allocatable. The membership of this pool is static for the lifetime of
//     the Kubelet. The size of the reserved pool is
//     ceil(systemreserved.cpu + kubereserved.cpu).
//     Reserved CPUs are taken topologically starting with lowest-indexed
//     physical core, as reported by cAdvisor.
//
//   - ASSIGNABLE: Equal to SHARED - RESERVED. Exclusive CPUs are allocated
//     from this pool.
//
//   - EXCLUSIVE ALLOCATIONS: CPU sets assigned exclusively to one container.
//     These are stored as explicit assignments in the state.
//
// When an exclusive allocation is made, the static policy also updates the
// default cpuset in the state abstraction. The CPU manager's periodic
// reconcile loop takes care of rewriting the cpuset in cgroupfs for any
// containers that may be running in the shared pool. For this reason,
// applications running within exclusively-allocated containers must tolerate
// potentially sharing their allocated CPUs for up to the CPU manager
// reconcile period.
type staticPolicy struct {
	// cpu socket topology
	topology *topology.CPUTopology
	// set of CPUs that is not available for exclusive assignment
	reservedCPUs cpuset.CPUSet
	// Superset of reservedCPUs. It includes not just the reservedCPUs themselves,
	// but also any siblings of those reservedCPUs on the same physical core
	// (NewStaticPolicy builds it by taking, for every reserved CPU, all CPUs
	// that share its core ID).
	// NOTE: If the reserved set includes full physical CPUs from the beginning
	// (e.g. only reserved pairs of core siblings) this set is expected to be
	// identical to the reserved set.
	reservedPhysicalCPUs cpuset.CPUSet
	// topology manager reference to get container Topology affinity
	affinity topologymanager.Store
	// set of CPUs to reuse across allocations in a pod, keyed by pod UID
	cpusToReuse map[string]cpuset.CPUSet
	// options allow to fine-tune the behaviour of the policy
	options StaticPolicyOptions
}
   122  
   123  // Ensure staticPolicy implements Policy interface
   124  var _ Policy = &staticPolicy{}
   125  
   126  // NewStaticPolicy returns a CPU manager policy that does not change CPU
   127  // assignments for exclusively pinned guaranteed containers after the main
   128  // container process starts.
   129  func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store, cpuPolicyOptions map[string]string) (Policy, error) {
   130  	opts, err := NewStaticPolicyOptions(cpuPolicyOptions)
   131  	if err != nil {
   132  		return nil, err
   133  	}
   134  	err = ValidateStaticPolicyOptions(opts, topology, affinity)
   135  	if err != nil {
   136  		return nil, err
   137  	}
   138  
   139  	klog.InfoS("Static policy created with configuration", "options", opts)
   140  
   141  	policy := &staticPolicy{
   142  		topology:    topology,
   143  		affinity:    affinity,
   144  		cpusToReuse: make(map[string]cpuset.CPUSet),
   145  		options:     opts,
   146  	}
   147  
   148  	allCPUs := topology.CPUDetails.CPUs()
   149  	var reserved cpuset.CPUSet
   150  	if reservedCPUs.Size() > 0 {
   151  		reserved = reservedCPUs
   152  	} else {
   153  		// takeByTopology allocates CPUs associated with low-numbered cores from
   154  		// allCPUs.
   155  		//
   156  		// For example: Given a system with 8 CPUs available and HT enabled,
   157  		// if numReservedCPUs=2, then reserved={0,4}
   158  		reserved, _ = policy.takeByTopology(allCPUs, numReservedCPUs)
   159  	}
   160  
   161  	if reserved.Size() != numReservedCPUs {
   162  		err := fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of %s did not equal %d)", reserved, numReservedCPUs)
   163  		return nil, err
   164  	}
   165  
   166  	var reservedPhysicalCPUs cpuset.CPUSet
   167  	for _, cpu := range reserved.UnsortedList() {
   168  		core, err := topology.CPUCoreID(cpu)
   169  		if err != nil {
   170  			return nil, fmt.Errorf("[cpumanager] unable to build the reserved physical CPUs from the reserved set: %w", err)
   171  		}
   172  		reservedPhysicalCPUs = reservedPhysicalCPUs.Union(topology.CPUDetails.CPUsInCores(core))
   173  	}
   174  
   175  	klog.InfoS("Reserved CPUs not available for exclusive assignment", "reservedSize", reserved.Size(), "reserved", reserved, "reservedPhysicalCPUs", reservedPhysicalCPUs)
   176  	policy.reservedCPUs = reserved
   177  	policy.reservedPhysicalCPUs = reservedPhysicalCPUs
   178  
   179  	return policy, nil
   180  }
   181  
// Name returns the policy name, i.e. the PolicyStatic constant as a string.
func (p *staticPolicy) Name() string {
	return string(PolicyStatic)
}
   185  
   186  func (p *staticPolicy) Start(s state.State) error {
   187  	if err := p.validateState(s); err != nil {
   188  		klog.ErrorS(err, "Static policy invalid state, please drain node and remove policy state file")
   189  		return err
   190  	}
   191  	return nil
   192  }
   193  
   194  func (p *staticPolicy) validateState(s state.State) error {
   195  	tmpAssignments := s.GetCPUAssignments()
   196  	tmpDefaultCPUset := s.GetDefaultCPUSet()
   197  
   198  	// Default cpuset cannot be empty when assignments exist
   199  	if tmpDefaultCPUset.IsEmpty() {
   200  		if len(tmpAssignments) != 0 {
   201  			return fmt.Errorf("default cpuset cannot be empty")
   202  		}
   203  		// state is empty initialize
   204  		allCPUs := p.topology.CPUDetails.CPUs()
   205  		s.SetDefaultCPUSet(allCPUs)
   206  		return nil
   207  	}
   208  
   209  	// State has already been initialized from file (is not empty)
   210  	// 1. Check if the reserved cpuset is not part of default cpuset because:
   211  	// - kube/system reserved have changed (increased) - may lead to some containers not being able to start
   212  	// - user tampered with file
   213  	if !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) {
   214  		return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"",
   215  			p.reservedCPUs.String(), tmpDefaultCPUset.String())
   216  	}
   217  
   218  	// 2. Check if state for static policy is consistent
   219  	for pod := range tmpAssignments {
   220  		for container, cset := range tmpAssignments[pod] {
   221  			// None of the cpu in DEFAULT cset should be in s.assignments
   222  			if !tmpDefaultCPUset.Intersection(cset).IsEmpty() {
   223  				return fmt.Errorf("pod: %s, container: %s cpuset: \"%s\" overlaps with default cpuset \"%s\"",
   224  					pod, container, cset.String(), tmpDefaultCPUset.String())
   225  			}
   226  		}
   227  	}
   228  
   229  	// 3. It's possible that the set of available CPUs has changed since
   230  	// the state was written. This can be due to for example
   231  	// offlining a CPU when kubelet is not running. If this happens,
   232  	// CPU manager will run into trouble when later it tries to
   233  	// assign non-existent CPUs to containers. Validate that the
   234  	// topology that was received during CPU manager startup matches with
   235  	// the set of CPUs stored in the state.
   236  	totalKnownCPUs := tmpDefaultCPUset.Clone()
   237  	tmpCPUSets := []cpuset.CPUSet{}
   238  	for pod := range tmpAssignments {
   239  		for _, cset := range tmpAssignments[pod] {
   240  			tmpCPUSets = append(tmpCPUSets, cset)
   241  		}
   242  	}
   243  	totalKnownCPUs = totalKnownCPUs.Union(tmpCPUSets...)
   244  	if !totalKnownCPUs.Equals(p.topology.CPUDetails.CPUs()) {
   245  		return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"",
   246  			p.topology.CPUDetails.CPUs().String(), totalKnownCPUs.String())
   247  	}
   248  
   249  	return nil
   250  }
   251  
   252  // GetAllocatableCPUs returns the total set of CPUs available for allocation.
   253  func (p *staticPolicy) GetAllocatableCPUs(s state.State) cpuset.CPUSet {
   254  	return p.topology.CPUDetails.CPUs().Difference(p.reservedCPUs)
   255  }
   256  
   257  // GetAvailableCPUs returns the set of unassigned CPUs minus the reserved set.
   258  func (p *staticPolicy) GetAvailableCPUs(s state.State) cpuset.CPUSet {
   259  	return s.GetDefaultCPUSet().Difference(p.reservedCPUs)
   260  }
   261  
   262  func (p *staticPolicy) GetAvailablePhysicalCPUs(s state.State) cpuset.CPUSet {
   263  	return s.GetDefaultCPUSet().Difference(p.reservedPhysicalCPUs)
   264  }
   265  
   266  func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, cset cpuset.CPUSet) {
   267  	// If pod entries to m.cpusToReuse other than the current pod exist, delete them.
   268  	for podUID := range p.cpusToReuse {
   269  		if podUID != string(pod.UID) {
   270  			delete(p.cpusToReuse, podUID)
   271  		}
   272  	}
   273  	// If no cpuset exists for cpusToReuse by this pod yet, create one.
   274  	if _, ok := p.cpusToReuse[string(pod.UID)]; !ok {
   275  		p.cpusToReuse[string(pod.UID)] = cpuset.New()
   276  	}
   277  	// Check if the container is an init container.
   278  	// If so, add its cpuset to the cpuset of reusable CPUs for any new allocations.
   279  	for _, initContainer := range pod.Spec.InitContainers {
   280  		if container.Name == initContainer.Name {
   281  			if types.IsRestartableInitContainer(&initContainer) {
   282  				// If the container is a restartable init container, we should not
   283  				// reuse its cpuset, as a restartable init container can run with
   284  				// regular containers.
   285  				break
   286  			}
   287  			p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Union(cset)
   288  			return
   289  		}
   290  	}
   291  	// Otherwise it is an app container.
   292  	// Remove its cpuset from the cpuset of reusable CPUs for any new allocations.
   293  	p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Difference(cset)
   294  }
   295  
   296  func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
   297  	numCPUs := p.guaranteedCPUs(pod, container)
   298  	if numCPUs == 0 {
   299  		// container belongs in the shared pool (nothing to do; use default cpuset)
   300  		return nil
   301  	}
   302  
   303  	klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
   304  	// container belongs in an exclusively allocated pool
   305  	metrics.CPUManagerPinningRequestsTotal.Inc()
   306  	defer func() {
   307  		if rerr != nil {
   308  			metrics.CPUManagerPinningErrorsTotal.Inc()
   309  		}
   310  	}()
   311  
   312  	if p.options.FullPhysicalCPUsOnly {
   313  		CPUsPerCore := p.topology.CPUsPerCore()
   314  		if (numCPUs % CPUsPerCore) != 0 {
   315  			// Since CPU Manager has been enabled requesting strict SMT alignment, it means a guaranteed pod can only be admitted
   316  			// if the CPU requested is a multiple of the number of virtual cpus per physical cores.
   317  			// In case CPU request is not a multiple of the number of virtual cpus per physical cores the Pod will be put
   318  			// in Failed state, with SMTAlignmentError as reason. Since the allocation happens in terms of physical cores
   319  			// and the scheduler is responsible for ensuring that the workload goes to a node that has enough CPUs,
   320  			// the pod would be placed on a node where there are enough physical cores available to be allocated.
   321  			// Just like the behaviour in case of static policy, takeByTopology will try to first allocate CPUs from the same socket
   322  			// and only in case the request cannot be sattisfied on a single socket, CPU allocation is done for a workload to occupy all
   323  			// CPUs on a physical core. Allocation of individual threads would never have to occur.
   324  			return SMTAlignmentError{
   325  				RequestedCPUs: numCPUs,
   326  				CpusPerCore:   CPUsPerCore,
   327  			}
   328  		}
   329  
   330  		availablePhysicalCPUs := p.GetAvailablePhysicalCPUs(s).Size()
   331  
   332  		// It's legal to reserve CPUs which are not core siblings. In this case the CPU allocator can descend to single cores
   333  		// when picking CPUs. This will void the guarantee of FullPhysicalCPUsOnly. To prevent this, we need to additionally consider
   334  		// all the core siblings of the reserved CPUs as unavailable when computing the free CPUs, before to start the actual allocation.
   335  		// This way, by construction all possible CPUs allocation whose number is multiple of the SMT level are now correct again.
   336  		if numCPUs > availablePhysicalCPUs {
   337  			return SMTAlignmentError{
   338  				RequestedCPUs:         numCPUs,
   339  				CpusPerCore:           CPUsPerCore,
   340  				AvailablePhysicalCPUs: availablePhysicalCPUs,
   341  			}
   342  		}
   343  	}
   344  	if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
   345  		p.updateCPUsToReuse(pod, container, cpuset)
   346  		klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
   347  		return nil
   348  	}
   349  
   350  	// Call Topology Manager to get the aligned socket affinity across all hint providers.
   351  	hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
   352  	klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint)
   353  
   354  	// Allocate CPUs according to the NUMA affinity contained in the hint.
   355  	cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
   356  	if err != nil {
   357  		klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
   358  		return err
   359  	}
   360  	s.SetCPUSet(string(pod.UID), container.Name, cpuset)
   361  	p.updateCPUsToReuse(pod, container, cpuset)
   362  
   363  	return nil
   364  }
   365  
   366  // getAssignedCPUsOfSiblings returns assigned cpus of given container's siblings(all containers other than the given container) in the given pod `podUID`.
   367  func getAssignedCPUsOfSiblings(s state.State, podUID string, containerName string) cpuset.CPUSet {
   368  	assignments := s.GetCPUAssignments()
   369  	cset := cpuset.New()
   370  	for name, cpus := range assignments[podUID] {
   371  		if containerName == name {
   372  			continue
   373  		}
   374  		cset = cset.Union(cpus)
   375  	}
   376  	return cset
   377  }
   378  
   379  func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerName string) error {
   380  	klog.InfoS("Static policy: RemoveContainer", "podUID", podUID, "containerName", containerName)
   381  	cpusInUse := getAssignedCPUsOfSiblings(s, podUID, containerName)
   382  	if toRelease, ok := s.GetCPUSet(podUID, containerName); ok {
   383  		s.Delete(podUID, containerName)
   384  		// Mutate the shared pool, adding released cpus.
   385  		toRelease = toRelease.Difference(cpusInUse)
   386  		s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
   387  	}
   388  	return nil
   389  }
   390  
   391  func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
   392  	klog.InfoS("AllocateCPUs", "numCPUs", numCPUs, "socket", numaAffinity)
   393  
   394  	allocatableCPUs := p.GetAvailableCPUs(s).Union(reusableCPUs)
   395  
   396  	// If there are aligned CPUs in numaAffinity, attempt to take those first.
   397  	result := cpuset.New()
   398  	if numaAffinity != nil {
   399  		alignedCPUs := p.getAlignedCPUs(numaAffinity, allocatableCPUs)
   400  
   401  		numAlignedToAlloc := alignedCPUs.Size()
   402  		if numCPUs < numAlignedToAlloc {
   403  			numAlignedToAlloc = numCPUs
   404  		}
   405  
   406  		alignedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc)
   407  		if err != nil {
   408  			return cpuset.New(), err
   409  		}
   410  
   411  		result = result.Union(alignedCPUs)
   412  	}
   413  
   414  	// Get any remaining CPUs from what's leftover after attempting to grab aligned ones.
   415  	remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result), numCPUs-result.Size())
   416  	if err != nil {
   417  		return cpuset.New(), err
   418  	}
   419  	result = result.Union(remainingCPUs)
   420  
   421  	// Remove allocated CPUs from the shared CPUSet.
   422  	s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result))
   423  
   424  	klog.InfoS("AllocateCPUs", "result", result)
   425  	return result, nil
   426  }
   427  
   428  func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int {
   429  	if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
   430  		return 0
   431  	}
   432  	cpuQuantity := container.Resources.Requests[v1.ResourceCPU]
   433  	// In-place pod resize feature makes Container.Resources field mutable for CPU & memory.
   434  	// AllocatedResources holds the value of Container.Resources.Requests when the pod was admitted.
   435  	// We should return this value because this is what kubelet agreed to allocate for the container
   436  	// and the value configured with runtime.
   437  	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
   438  		if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok {
   439  			cpuQuantity = cs.AllocatedResources[v1.ResourceCPU]
   440  		}
   441  	}
   442  	if cpuQuantity.Value()*1000 != cpuQuantity.MilliValue() {
   443  		return 0
   444  	}
   445  	// Safe downcast to do for all systems with < 2.1 billion CPUs.
   446  	// Per the language spec, `int` is guaranteed to be at least 32 bits wide.
   447  	// https://golang.org/ref/spec#Numeric_types
   448  	return int(cpuQuantity.Value())
   449  }
   450  
   451  func (p *staticPolicy) podGuaranteedCPUs(pod *v1.Pod) int {
   452  	// The maximum of requested CPUs by init containers.
   453  	requestedByInitContainers := 0
   454  	requestedByRestartableInitContainers := 0
   455  	for _, container := range pod.Spec.InitContainers {
   456  		if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
   457  			continue
   458  		}
   459  		requestedCPU := p.guaranteedCPUs(pod, &container)
   460  		// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/753-sidecar-containers#resources-calculation-for-scheduling-and-pod-admission
   461  		// for the detail.
   462  		if types.IsRestartableInitContainer(&container) {
   463  			requestedByRestartableInitContainers += requestedCPU
   464  		} else if requestedByRestartableInitContainers+requestedCPU > requestedByInitContainers {
   465  			requestedByInitContainers = requestedByRestartableInitContainers + requestedCPU
   466  		}
   467  	}
   468  
   469  	// The sum of requested CPUs by app containers.
   470  	requestedByAppContainers := 0
   471  	for _, container := range pod.Spec.Containers {
   472  		if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
   473  			continue
   474  		}
   475  		requestedByAppContainers += p.guaranteedCPUs(pod, &container)
   476  	}
   477  
   478  	requestedByLongRunningContainers := requestedByAppContainers + requestedByRestartableInitContainers
   479  	if requestedByInitContainers > requestedByLongRunningContainers {
   480  		return requestedByInitContainers
   481  	}
   482  	return requestedByLongRunningContainers
   483  }
   484  
   485  func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
   486  	if p.options.DistributeCPUsAcrossNUMA {
   487  		cpuGroupSize := 1
   488  		if p.options.FullPhysicalCPUsOnly {
   489  			cpuGroupSize = p.topology.CPUsPerCore()
   490  		}
   491  		return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs, cpuGroupSize)
   492  	}
   493  	return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs)
   494  }
   495  
   496  func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
   497  	// Get a count of how many guaranteed CPUs have been requested.
   498  	requested := p.guaranteedCPUs(pod, container)
   499  
   500  	// Number of required CPUs is not an integer or a container is not part of the Guaranteed QoS class.
   501  	// It will be treated by the TopologyManager as having no preference and cause it to ignore this
   502  	// resource when considering pod alignment.
   503  	// In terms of hints, this is equal to: TopologyHints[NUMANodeAffinity: nil, Preferred: true].
   504  	if requested == 0 {
   505  		return nil
   506  	}
   507  
   508  	// Short circuit to regenerate the same hints if there are already
   509  	// guaranteed CPUs allocated to the Container. This might happen after a
   510  	// kubelet restart, for example.
   511  	if allocated, exists := s.GetCPUSet(string(pod.UID), container.Name); exists {
   512  		if allocated.Size() != requested {
   513  			klog.ErrorS(nil, "CPUs already allocated to container with different number than request", "pod", klog.KObj(pod), "containerName", container.Name, "requestedSize", requested, "allocatedSize", allocated.Size())
   514  			// An empty list of hints will be treated as a preference that cannot be satisfied.
   515  			// In definition of hints this is equal to: TopologyHint[NUMANodeAffinity: nil, Preferred: false].
   516  			// For all but the best-effort policy, the Topology Manager will throw a pod-admission error.
   517  			return map[string][]topologymanager.TopologyHint{
   518  				string(v1.ResourceCPU): {},
   519  			}
   520  		}
   521  		klog.InfoS("Regenerating TopologyHints for CPUs already allocated", "pod", klog.KObj(pod), "containerName", container.Name)
   522  		return map[string][]topologymanager.TopologyHint{
   523  			string(v1.ResourceCPU): p.generateCPUTopologyHints(allocated, cpuset.CPUSet{}, requested),
   524  		}
   525  	}
   526  
   527  	// Get a list of available CPUs.
   528  	available := p.GetAvailableCPUs(s)
   529  
   530  	// Get a list of reusable CPUs (e.g. CPUs reused from initContainers).
   531  	// It should be an empty CPUSet for a newly created pod.
   532  	reusable := p.cpusToReuse[string(pod.UID)]
   533  
   534  	// Generate hints.
   535  	cpuHints := p.generateCPUTopologyHints(available, reusable, requested)
   536  	klog.InfoS("TopologyHints generated", "pod", klog.KObj(pod), "containerName", container.Name, "cpuHints", cpuHints)
   537  
   538  	return map[string][]topologymanager.TopologyHint{
   539  		string(v1.ResourceCPU): cpuHints,
   540  	}
   541  }
   542  
   543  func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
   544  	// Get a count of how many guaranteed CPUs have been requested by Pod.
   545  	requested := p.podGuaranteedCPUs(pod)
   546  
   547  	// Number of required CPUs is not an integer or a pod is not part of the Guaranteed QoS class.
   548  	// It will be treated by the TopologyManager as having no preference and cause it to ignore this
   549  	// resource when considering pod alignment.
   550  	// In terms of hints, this is equal to: TopologyHints[NUMANodeAffinity: nil, Preferred: true].
   551  	if requested == 0 {
   552  		return nil
   553  	}
   554  
   555  	assignedCPUs := cpuset.New()
   556  	for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
   557  		requestedByContainer := p.guaranteedCPUs(pod, &container)
   558  		// Short circuit to regenerate the same hints if there are already
   559  		// guaranteed CPUs allocated to the Container. This might happen after a
   560  		// kubelet restart, for example.
   561  		if allocated, exists := s.GetCPUSet(string(pod.UID), container.Name); exists {
   562  			if allocated.Size() != requestedByContainer {
   563  				klog.ErrorS(nil, "CPUs already allocated to container with different number than request", "pod", klog.KObj(pod), "containerName", container.Name, "allocatedSize", requested, "requestedByContainer", requestedByContainer, "allocatedSize", allocated.Size())
   564  				// An empty list of hints will be treated as a preference that cannot be satisfied.
   565  				// In definition of hints this is equal to: TopologyHint[NUMANodeAffinity: nil, Preferred: false].
   566  				// For all but the best-effort policy, the Topology Manager will throw a pod-admission error.
   567  				return map[string][]topologymanager.TopologyHint{
   568  					string(v1.ResourceCPU): {},
   569  				}
   570  			}
   571  			// A set of CPUs already assigned to containers in this pod
   572  			assignedCPUs = assignedCPUs.Union(allocated)
   573  		}
   574  	}
   575  	if assignedCPUs.Size() == requested {
   576  		klog.InfoS("Regenerating TopologyHints for CPUs already allocated", "pod", klog.KObj(pod))
   577  		return map[string][]topologymanager.TopologyHint{
   578  			string(v1.ResourceCPU): p.generateCPUTopologyHints(assignedCPUs, cpuset.CPUSet{}, requested),
   579  		}
   580  	}
   581  
   582  	// Get a list of available CPUs.
   583  	available := p.GetAvailableCPUs(s)
   584  
   585  	// Get a list of reusable CPUs (e.g. CPUs reused from initContainers).
   586  	// It should be an empty CPUSet for a newly created pod.
   587  	reusable := p.cpusToReuse[string(pod.UID)]
   588  
   589  	// Ensure any CPUs already assigned to containers in this pod are included as part of the hint generation.
   590  	reusable = reusable.Union(assignedCPUs)
   591  
   592  	// Generate hints.
   593  	cpuHints := p.generateCPUTopologyHints(available, reusable, requested)
   594  	klog.InfoS("TopologyHints generated", "pod", klog.KObj(pod), "cpuHints", cpuHints)
   595  
   596  	return map[string][]topologymanager.TopologyHint{
   597  		string(v1.ResourceCPU): cpuHints,
   598  	}
   599  }
   600  
   601  // generateCPUTopologyHints generates a set of TopologyHints given the set of
   602  // available CPUs and the number of CPUs being requested.
   603  //
   604  // It follows the convention of marking all hints that have the same number of
   605  // bits set as the narrowest matching NUMANodeAffinity with 'Preferred: true', and
   606  // marking all others with 'Preferred: false'.
   607  func (p *staticPolicy) generateCPUTopologyHints(availableCPUs cpuset.CPUSet, reusableCPUs cpuset.CPUSet, request int) []topologymanager.TopologyHint {
   608  	// Initialize minAffinitySize to include all NUMA Nodes.
   609  	minAffinitySize := p.topology.CPUDetails.NUMANodes().Size()
   610  
   611  	// Iterate through all combinations of numa nodes bitmask and build hints from them.
   612  	hints := []topologymanager.TopologyHint{}
   613  	bitmask.IterateBitMasks(p.topology.CPUDetails.NUMANodes().List(), func(mask bitmask.BitMask) {
   614  		// First, update minAffinitySize for the current request size.
   615  		cpusInMask := p.topology.CPUDetails.CPUsInNUMANodes(mask.GetBits()...).Size()
   616  		if cpusInMask >= request && mask.Count() < minAffinitySize {
   617  			minAffinitySize = mask.Count()
   618  		}
   619  
   620  		// Then check to see if we have enough CPUs available on the current
   621  		// numa node bitmask to satisfy the CPU request.
   622  		numMatching := 0
   623  		for _, c := range reusableCPUs.List() {
   624  			// Disregard this mask if its NUMANode isn't part of it.
   625  			if !mask.IsSet(p.topology.CPUDetails[c].NUMANodeID) {
   626  				return
   627  			}
   628  			numMatching++
   629  		}
   630  
   631  		// Finally, check to see if enough available CPUs remain on the current
   632  		// NUMA node combination to satisfy the CPU request.
   633  		for _, c := range availableCPUs.List() {
   634  			if mask.IsSet(p.topology.CPUDetails[c].NUMANodeID) {
   635  				numMatching++
   636  			}
   637  		}
   638  
   639  		// If they don't, then move onto the next combination.
   640  		if numMatching < request {
   641  			return
   642  		}
   643  
   644  		// Otherwise, create a new hint from the numa node bitmask and add it to the
   645  		// list of hints.  We set all hint preferences to 'false' on the first
   646  		// pass through.
   647  		hints = append(hints, topologymanager.TopologyHint{
   648  			NUMANodeAffinity: mask,
   649  			Preferred:        false,
   650  		})
   651  	})
   652  
   653  	// Loop back through all hints and update the 'Preferred' field based on
   654  	// counting the number of bits sets in the affinity mask and comparing it
   655  	// to the minAffinitySize. Only those with an equal number of bits set (and
   656  	// with a minimal set of numa nodes) will be considered preferred.
   657  	for i := range hints {
   658  		if p.options.AlignBySocket && p.isHintSocketAligned(hints[i], minAffinitySize) {
   659  			hints[i].Preferred = true
   660  			continue
   661  		}
   662  		if hints[i].NUMANodeAffinity.Count() == minAffinitySize {
   663  			hints[i].Preferred = true
   664  		}
   665  	}
   666  
   667  	return hints
   668  }
   669  
   670  // isHintSocketAligned function return true if numa nodes in hint are socket aligned.
   671  func (p *staticPolicy) isHintSocketAligned(hint topologymanager.TopologyHint, minAffinitySize int) bool {
   672  	numaNodesBitMask := hint.NUMANodeAffinity.GetBits()
   673  	numaNodesPerSocket := p.topology.NumNUMANodes / p.topology.NumSockets
   674  	if numaNodesPerSocket == 0 {
   675  		return false
   676  	}
   677  	// minSockets refers to minimum number of socket required to satify allocation.
   678  	// A hint is considered socket aligned if sockets across which numa nodes span is equal to minSockets
   679  	minSockets := (minAffinitySize + numaNodesPerSocket - 1) / numaNodesPerSocket
   680  	return p.topology.CPUDetails.SocketsInNUMANodes(numaNodesBitMask...).Size() == minSockets
   681  }
   682  
   683  // getAlignedCPUs return set of aligned CPUs based on numa affinity mask and configured policy options.
   684  func (p *staticPolicy) getAlignedCPUs(numaAffinity bitmask.BitMask, allocatableCPUs cpuset.CPUSet) cpuset.CPUSet {
   685  	alignedCPUs := cpuset.New()
   686  	numaBits := numaAffinity.GetBits()
   687  
   688  	// If align-by-socket policy option is enabled, NUMA based hint is expanded to
   689  	// socket aligned hint. It will ensure that first socket aligned available CPUs are
   690  	// allocated before we try to find CPUs across socket to satisfy allocation request.
   691  	if p.options.AlignBySocket {
   692  		socketBits := p.topology.CPUDetails.SocketsInNUMANodes(numaBits...).UnsortedList()
   693  		for _, socketID := range socketBits {
   694  			alignedCPUs = alignedCPUs.Union(allocatableCPUs.Intersection(p.topology.CPUDetails.CPUsInSockets(socketID)))
   695  		}
   696  		return alignedCPUs
   697  	}
   698  
   699  	for _, numaNodeID := range numaBits {
   700  		alignedCPUs = alignedCPUs.Union(allocatableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
   701  	}
   702  
   703  	return alignedCPUs
   704  }
   705  

View as plain text