package cpumanager

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/klog/v2"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
	"k8s.io/kubernetes/pkg/kubelet/metrics"
	"k8s.io/kubernetes/pkg/kubelet/types"
	"k8s.io/utils/cpuset"
)

const (
	// PolicyStatic is the name of the static policy.
	PolicyStatic policyName = "static"

	// ErrorSMTAlignment represents the type of an SMTAlignmentError.
	ErrorSMTAlignment = "SMTAlignmentError"
)

// SMTAlignmentError represents an error due to SMT alignment.
type SMTAlignmentError struct {
	RequestedCPUs         int
	CpusPerCore           int
	AvailablePhysicalCPUs int
}

func (e SMTAlignmentError) Error() string {
	if e.AvailablePhysicalCPUs > 0 {
		return fmt.Sprintf("SMT Alignment Error: not enough free physical CPUs: available physical CPUs = %d, requested CPUs = %d, CPUs per core = %d", e.AvailablePhysicalCPUs, e.RequestedCPUs, e.CpusPerCore)
	}
	return fmt.Sprintf("SMT Alignment Error: requested %d cpus not multiple cpus per core = %d", e.RequestedCPUs, e.CpusPerCore)
}

// Type returns the type of this error, used to populate the pod admission
// failure reason.
func (e SMTAlignmentError) Type() string {
	return ErrorSMTAlignment
}
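
// Illustrative example (not part of the upstream source): with the
// full-pcpus-only policy option enabled on a node whose cores expose two
// hardware threads each (CpusPerCore == 2), a guaranteed container asking for
// an odd number of exclusive CPUs is rejected at admission with:
//
//	err := SMTAlignmentError{RequestedCPUs: 3, CpusPerCore: 2}
//	err.Error() // "SMT Alignment Error: requested 3 cpus not multiple cpus per core = 2"
//	err.Type()  // "SMTAlignmentError"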

// staticPolicy is a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
//
// This policy allocates CPUs exclusively for a container if all the following
// conditions are met:
//
// - The pod QoS class is Guaranteed.
// - The CPU request is a positive integer.
//
// The static policy maintains the following sets of logical CPUs:
//
//   - SHARED: Burstable, BestEffort, and non-integral Guaranteed containers
//     run here. Initially this contains all CPU IDs on the system. As
//     exclusive allocations are created and destroyed, this CPU set shrinks
//     and grows accordingly. This is stored in the state as the default
//     CPU set.
//
//   - RESERVED: A subset of the shared pool which is not exclusively
//     allocatable. The membership of this pool is static for the lifetime of
//     the kubelet. Its size is derived from the kubelet's system-reserved and
//     kube-reserved CPU settings, and its CPUs are taken topologically,
//     starting with the lowest-indexed physical cores.
//
//   - ASSIGNABLE: Equal to SHARED minus RESERVED. Exclusive CPUs are
//     allocated from this pool.
//
//   - EXCLUSIVE ALLOCATIONS: CPU sets assigned exclusively to one container,
//     stored in the kubelet state indexed by pod UID and container name. The
//     union of all exclusive allocations plus the default CPU set should
//     always equal the full set of CPUs on the system.
type staticPolicy struct {
	// cpu socket topology
	topology *topology.CPUTopology
	// set of CPUs that is not available for exclusive assignment
	reservedCPUs cpuset.CPUSet
	// Superset of reservedCPUs. It includes not just the reserved CPUs themselves,
	// but also their sibling threads on the same physical cores. If the reserved
	// set already consists of full physical cores, this set is identical to
	// reservedCPUs. It is needed to honor the full-pcpus-only option, which must
	// account for whole physical cores.
	reservedPhysicalCPUs cpuset.CPUSet
	// topology manager reference to get container topology affinity
	affinity topologymanager.Store
	// set of CPUs to reuse across allocations in a pod
	cpusToReuse map[string]cpuset.CPUSet
	// options allow to fine-tune the behaviour of the policy
	options StaticPolicyOptions
}

// Ensure staticPolicy implements the Policy interface.
var _ Policy = &staticPolicy{}

// NewStaticPolicy returns a CPU manager policy that does not change CPU
// assignments for exclusively pinned guaranteed containers after the main
// container process starts.
func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reservedCPUs cpuset.CPUSet, affinity topologymanager.Store, cpuPolicyOptions map[string]string) (Policy, error) {
	opts, err := NewStaticPolicyOptions(cpuPolicyOptions)
	if err != nil {
		return nil, err
	}
	err = ValidateStaticPolicyOptions(opts, topology, affinity)
	if err != nil {
		return nil, err
	}

	klog.InfoS("Static policy created with configuration", "options", opts)

	policy := &staticPolicy{
		topology:    topology,
		affinity:    affinity,
		cpusToReuse: make(map[string]cpuset.CPUSet),
		options:     opts,
	}

	allCPUs := topology.CPUDetails.CPUs()
	var reserved cpuset.CPUSet
	if reservedCPUs.Size() > 0 {
		reserved = reservedCPUs
	} else {
		// takeByTopology allocates CPUs associated with low-numbered cores from
		// allCPUs.
		//
		// For example: given a system with 8 CPUs available and HT enabled,
		// if numReservedCPUs=2, then reserved={0,4}.
		reserved, _ = policy.takeByTopology(allCPUs, numReservedCPUs)
	}

	if reserved.Size() != numReservedCPUs {
		err := fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of %s did not equal %d)", reserved, numReservedCPUs)
		return nil, err
	}

	var reservedPhysicalCPUs cpuset.CPUSet
	for _, cpu := range reserved.UnsortedList() {
		core, err := topology.CPUCoreID(cpu)
		if err != nil {
			return nil, fmt.Errorf("[cpumanager] unable to build the reserved physical CPUs from the reserved set: %w", err)
		}
		reservedPhysicalCPUs = reservedPhysicalCPUs.Union(topology.CPUDetails.CPUsInCores(core))
	}

	klog.InfoS("Reserved CPUs not available for exclusive assignment", "reservedSize", reserved.Size(), "reserved", reserved, "reservedPhysicalCPUs", reservedPhysicalCPUs)
	policy.reservedCPUs = reserved
	policy.reservedPhysicalCPUs = reservedPhysicalCPUs

	return policy, nil
}

func (p *staticPolicy) Name() string {
	return string(PolicyStatic)
}

func (p *staticPolicy) Start(s state.State) error {
	if err := p.validateState(s); err != nil {
		klog.ErrorS(err, "Static policy invalid state, please drain node and remove policy state file")
		return err
	}
	return nil
}

func (p *staticPolicy) validateState(s state.State) error {
	tmpAssignments := s.GetCPUAssignments()
	tmpDefaultCPUset := s.GetDefaultCPUSet()

	// The default cpuset cannot be empty when assignments exist.
	if tmpDefaultCPUset.IsEmpty() {
		if len(tmpAssignments) != 0 {
			return fmt.Errorf("default cpuset cannot be empty")
		}
		// State is empty; initialize it with all CPUs.
		allCPUs := p.topology.CPUDetails.CPUs()
		s.SetDefaultCPUSet(allCPUs)
		return nil
	}

	// State has already been initialized from file (is not empty).
	// 1. Check that the reserved cpuset is part of the default cpuset. It may not be because:
	//    - kube/system reserved have changed (increased), which may prevent some containers from starting, or
	//    - the user tampered with the state file.
	if !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) {
		return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"",
			p.reservedCPUs.String(), tmpDefaultCPUset.String())
	}

	// 2. Check that the state for the static policy is consistent.
	for pod := range tmpAssignments {
		for container, cset := range tmpAssignments[pod] {
			// None of the CPUs in the default cpuset should appear in the assignments.
			if !tmpDefaultCPUset.Intersection(cset).IsEmpty() {
				return fmt.Errorf("pod: %s, container: %s cpuset: \"%s\" overlaps with default cpuset \"%s\"",
					pod, container, cset.String(), tmpDefaultCPUset.String())
			}
		}
	}

	// 3. It's possible that the set of available CPUs has changed since the
	// state was written, for example if a CPU was offlined while the kubelet
	// was not running. If that happens, the CPU manager would run into trouble
	// later when trying to add the offlined CPUs back, because they are no
	// longer present in the pool of available CPUs. This check lets the CPU
	// manager fail at startup with a clearer error message than what would be
	// thrown later during allocation.
	totalKnownCPUs := tmpDefaultCPUset.Clone()
	tmpCPUSets := []cpuset.CPUSet{}
	for pod := range tmpAssignments {
		for _, cset := range tmpAssignments[pod] {
			tmpCPUSets = append(tmpCPUSets, cset)
		}
	}
	totalKnownCPUs = totalKnownCPUs.Union(tmpCPUSets...)
	if !totalKnownCPUs.Equals(p.topology.CPUDetails.CPUs()) {
		return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"",
			p.topology.CPUDetails.CPUs().String(), totalKnownCPUs.String())
	}

	return nil
}

// GetAllocatableCPUs returns the total set of CPUs available for allocation.
func (p *staticPolicy) GetAllocatableCPUs(s state.State) cpuset.CPUSet {
	return p.topology.CPUDetails.CPUs().Difference(p.reservedCPUs)
}

// GetAvailableCPUs returns the set of unassigned CPUs minus the reserved set.
func (p *staticPolicy) GetAvailableCPUs(s state.State) cpuset.CPUSet {
	return s.GetDefaultCPUSet().Difference(p.reservedCPUs)
}

// GetAvailablePhysicalCPUs returns the set of unassigned CPUs minus the
// reserved set, with the reserved set expanded to whole physical cores.
func (p *staticPolicy) GetAvailablePhysicalCPUs(s state.State) cpuset.CPUSet {
	return s.GetDefaultCPUSet().Difference(p.reservedPhysicalCPUs)
}

func (p *staticPolicy) updateCPUsToReuse(pod *v1.Pod, container *v1.Container, cset cpuset.CPUSet) {
	// If entries for pods other than the current pod exist in p.cpusToReuse, delete them.
	for podUID := range p.cpusToReuse {
		if podUID != string(pod.UID) {
			delete(p.cpusToReuse, podUID)
		}
	}
	// If no cpuset exists for this pod yet, create one.
	if _, ok := p.cpusToReuse[string(pod.UID)]; !ok {
		p.cpusToReuse[string(pod.UID)] = cpuset.New()
	}

	// Check if the container is an init container. If so, add its cpuset to the
	// cpuset of reusable CPUs for any new allocations.
	for _, initContainer := range pod.Spec.InitContainers {
		if container.Name == initContainer.Name {
			if types.IsRestartableInitContainer(&initContainer) {
				// If the container is a restartable init container, we should not
				// reuse its cpuset, as a restartable init container keeps running
				// alongside the regular containers.
				break
			}
			p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Union(cset)
			return
		}
	}
	// Otherwise it is an app container. Remove its cpuset from the cpuset of
	// reusable CPUs for any new allocations.
	p.cpusToReuse[string(pod.UID)] = p.cpusToReuse[string(pod.UID)].Difference(cset)
}

func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
	numCPUs := p.guaranteedCPUs(pod, container)
	if numCPUs == 0 {
		// The container belongs in the shared pool; nothing else to do.
		return nil
	}

	klog.InfoS("Static policy: Allocate", "pod", klog.KObj(pod), "containerName", container.Name)

	metrics.CPUManagerPinningRequestsTotal.Inc()
	defer func() {
		if rerr != nil {
			metrics.CPUManagerPinningErrorsTotal.Inc()
		}
	}()

	if p.options.FullPhysicalCPUsOnly {
		CPUsPerCore := p.topology.CPUsPerCore()
		if (numCPUs % CPUsPerCore) != 0 {
			// With strict SMT alignment enabled, a guaranteed pod can only be admitted
			// if its CPU request is a multiple of the number of virtual CPUs per
			// physical core. Otherwise the pod is put in the Failed state with
			// SMTAlignmentError as the reason. Since allocation happens in terms of
			// physical cores and the scheduler is responsible for ensuring there are
			// enough available cores, we surface the SMT alignment requirement at pod
			// admission time by returning an error with a specific type
			// (SMTAlignmentError).
			return SMTAlignmentError{
				RequestedCPUs: numCPUs,
				CpusPerCore:   CPUsPerCore,
			}
		}

		availablePhysicalCPUs := p.GetAvailablePhysicalCPUs(s).Size()

		// It's legal to reserve CPUs which are not core siblings. In that case the CPU
		// allocator can descend to single cores when picking CPUs, which would void the
		// guarantee of FullPhysicalCPUsOnly. To prevent this, we additionally treat all
		// sibling threads of the reserved CPUs as unavailable, ensuring the request can
		// be satisfied with whole physical cores.
		if numCPUs > availablePhysicalCPUs {
			return SMTAlignmentError{
				RequestedCPUs:         numCPUs,
				CpusPerCore:           CPUsPerCore,
				AvailablePhysicalCPUs: availablePhysicalCPUs,
			}
		}
	}
	if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
		p.updateCPUsToReuse(pod, container, cpuset)
		klog.InfoS("Static policy: container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
		return nil
	}

	// Call the Topology Manager to get the aligned socket affinity across all hint providers.
	hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
	klog.InfoS("Topology Affinity", "pod", klog.KObj(pod), "containerName", container.Name, "affinity", hint)

	// Allocate CPUs according to the NUMA affinity contained in the hint.
	cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity, p.cpusToReuse[string(pod.UID)])
	if err != nil {
		klog.ErrorS(err, "Unable to allocate CPUs", "pod", klog.KObj(pod), "containerName", container.Name, "numCPUs", numCPUs)
		return err
	}
	s.SetCPUSet(string(pod.UID), container.Name, cpuset)
	p.updateCPUsToReuse(pod, container, cpuset)

	return nil
}

// getAssignedCPUsOfSiblings returns the CPUs assigned to all containers in the
// pod identified by podUID other than the given container.
func getAssignedCPUsOfSiblings(s state.State, podUID string, containerName string) cpuset.CPUSet {
	assignments := s.GetCPUAssignments()
	cset := cpuset.New()
	for name, cpus := range assignments[podUID] {
		if containerName == name {
			continue
		}
		cset = cset.Union(cpus)
	}
	return cset
}

func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerName string) error {
	klog.InfoS("Static policy: RemoveContainer", "podUID", podUID, "containerName", containerName)
	cpusInUse := getAssignedCPUsOfSiblings(s, podUID, containerName)
	if toRelease, ok := s.GetCPUSet(podUID, containerName); ok {
		s.Delete(podUID, containerName)
		// Mutate the shared pool, adding released CPUs that are not still in use
		// by sibling containers.
		toRelease = toRelease.Difference(cpusInUse)
		s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease))
	}
	return nil
}

func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) {
	klog.InfoS("AllocateCPUs", "numCPUs", numCPUs, "socket", numaAffinity)

	allocatableCPUs := p.GetAvailableCPUs(s).Union(reusableCPUs)

	// If there are aligned CPUs in numaAffinity, attempt to take those first.
	result := cpuset.New()
	if numaAffinity != nil {
		alignedCPUs := p.getAlignedCPUs(numaAffinity, allocatableCPUs)

		numAlignedToAlloc := alignedCPUs.Size()
		if numCPUs < numAlignedToAlloc {
			numAlignedToAlloc = numCPUs
		}

		alignedCPUs, err := p.takeByTopology(alignedCPUs, numAlignedToAlloc)
		if err != nil {
			return cpuset.New(), err
		}

		result = result.Union(alignedCPUs)
	}

	// Get any remaining CPUs from what's left over after attempting to grab aligned ones.
	remainingCPUs, err := p.takeByTopology(allocatableCPUs.Difference(result), numCPUs-result.Size())
	if err != nil {
		return cpuset.New(), err
	}
	result = result.Union(remainingCPUs)

	// Remove allocated CPUs from the shared CPUSet.
	s.SetDefaultCPUSet(s.GetDefaultCPUSet().Difference(result))

	klog.InfoS("AllocateCPUs", "result", result)
	return result, nil
}
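
// Illustrative example (assumed values, not from the upstream source): with a
// topology hint pinning the container to NUMA node 0, numCPUs == 6, and only
// 4 allocatable CPUs left on node 0, the first takeByTopology call above takes
// those 4 aligned CPUs and the second call takes the remaining 2 from other
// nodes, so the result contains 6 CPUs with best-effort NUMA alignment.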

func (p *staticPolicy) guaranteedCPUs(pod *v1.Pod, container *v1.Container) int {
	if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
		return 0
	}
	cpuQuantity := container.Resources.Requests[v1.ResourceCPU]
	// In-place pod resize makes Container.Resources mutable for CPU and memory.
	// AllocatedResources holds the value of Container.Resources.Requests at the
	// time the pod was admitted, which is what the kubelet agreed to allocate
	// for the container, so prefer it when the feature gate is enabled.
	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
		if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok {
			cpuQuantity = cs.AllocatedResources[v1.ResourceCPU]
		}
	}
	// Exclusive CPUs are only assigned for integral CPU requests.
	if cpuQuantity.Value()*1000 != cpuQuantity.MilliValue() {
		return 0
	}
	// Safe downcast to do for all systems with < 2.1 billion CPUs.
	// Per the language spec, `int` is guaranteed to be at least 32 bits wide.
	// https://golang.org/ref/spec#Numeric_types
	return int(cpuQuantity.Value())
}
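
// Illustrative example (assumed values, not from the upstream source): for a
// Guaranteed pod, a container requesting "2" CPUs yields guaranteedCPUs == 2
// and is eligible for exclusive pinning, while a request of "1500m" is not an
// integer (1*1000 != 1500), so guaranteedCPUs returns 0 and the container is
// left in the shared pool.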

func (p *staticPolicy) podGuaranteedCPUs(pod *v1.Pod) int {
	// The maximum number of CPUs requested by init containers.
	requestedByInitContainers := 0
	requestedByRestartableInitContainers := 0
	for _, container := range pod.Spec.InitContainers {
		if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
			continue
		}
		requestedCPU := p.guaranteedCPUs(pod, &container)
		// Restartable init containers (sidecars) keep running for the pod's lifetime,
		// so their requests accumulate; each regular init container runs after all
		// previously started sidecars, so its peak is the sum of the two.
		if types.IsRestartableInitContainer(&container) {
			requestedByRestartableInitContainers += requestedCPU
		} else if requestedByRestartableInitContainers+requestedCPU > requestedByInitContainers {
			requestedByInitContainers = requestedByRestartableInitContainers + requestedCPU
		}
	}

	// The sum of CPUs requested by app containers.
	requestedByAppContainers := 0
	for _, container := range pod.Spec.Containers {
		if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok {
			continue
		}
		requestedByAppContainers += p.guaranteedCPUs(pod, &container)
	}

	requestedByLongRunningContainers := requestedByAppContainers + requestedByRestartableInitContainers
	if requestedByInitContainers > requestedByLongRunningContainers {
		return requestedByInitContainers
	}
	return requestedByLongRunningContainers
}
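
// Worked example (assumed requests, not from the upstream source): a Guaranteed
// pod with init containers [regular: 4 CPUs, restartable sidecar: 1 CPU,
// regular: 2 CPUs] and one app container requesting 2 CPUs gives
//
//	requestedByInitContainers            = max(4, 1+2) = 4
//	requestedByRestartableInitContainers = 1
//	requestedByAppContainers             = 2
//	requestedByLongRunningContainers     = 2 + 1 = 3
//	podGuaranteedCPUs                    = max(4, 3) = 4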

func (p *staticPolicy) takeByTopology(availableCPUs cpuset.CPUSet, numCPUs int) (cpuset.CPUSet, error) {
	if p.options.DistributeCPUsAcrossNUMA {
		cpuGroupSize := 1
		if p.options.FullPhysicalCPUsOnly {
			cpuGroupSize = p.topology.CPUsPerCore()
		}
		return takeByTopologyNUMADistributed(p.topology, availableCPUs, numCPUs, cpuGroupSize)
	}
	return takeByTopologyNUMAPacked(p.topology, availableCPUs, numCPUs)
}

func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
	// Get a count of how many guaranteed CPUs have been requested.
	requested := p.guaranteedCPUs(pod, container)

	// The number of required CPUs is not an integer or the container is not part of the Guaranteed QoS class.
	// It will be treated by the TopologyManager as having no preference and cause it to ignore this
	// resource when considering pod alignment.
	// In terms of hints, this is equal to: TopologyHints[NUMANodeAffinity: nil, Preferred: true].
	if requested == 0 {
		return nil
	}

	// Short circuit to regenerate the same hints if there are already
	// guaranteed CPUs allocated to the Container. This might happen after a
	// kubelet restart, for example.
	if allocated, exists := s.GetCPUSet(string(pod.UID), container.Name); exists {
		if allocated.Size() != requested {
			klog.ErrorS(nil, "CPUs already allocated to container with different number than request", "pod", klog.KObj(pod), "containerName", container.Name, "requestedSize", requested, "allocatedSize", allocated.Size())
			// An empty list of hints will be treated as a preference that cannot be satisfied.
			// In terms of hints, this is equal to: TopologyHint[NUMANodeAffinity: nil, Preferred: false].
			// For all but the best-effort policy the TopologyManager will throw a pod-admission error.
			return map[string][]topologymanager.TopologyHint{
				string(v1.ResourceCPU): {},
			}
		}
		klog.InfoS("Regenerating TopologyHints for CPUs already allocated", "pod", klog.KObj(pod), "containerName", container.Name)
		return map[string][]topologymanager.TopologyHint{
			string(v1.ResourceCPU): p.generateCPUTopologyHints(allocated, cpuset.CPUSet{}, requested),
		}
	}

	// Get a list of available CPUs.
	available := p.GetAvailableCPUs(s)

	// Get a list of reusable CPUs (e.g. CPUs reused from initContainers).
	// It should be an empty CPUSet for a newly created pod.
	reusable := p.cpusToReuse[string(pod.UID)]

	// Generate hints.
	cpuHints := p.generateCPUTopologyHints(available, reusable, requested)
	klog.InfoS("TopologyHints generated", "pod", klog.KObj(pod), "containerName", container.Name, "cpuHints", cpuHints)

	return map[string][]topologymanager.TopologyHint{
		string(v1.ResourceCPU): cpuHints,
	}
}

func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
	// Get a count of how many guaranteed CPUs have been requested by the pod.
	requested := p.podGuaranteedCPUs(pod)

	// The number of required CPUs is not an integer or the pod is not part of the Guaranteed QoS class.
	// It will be treated by the TopologyManager as having no preference and cause it to ignore this
	// resource when considering pod alignment.
	// In terms of hints, this is equal to: TopologyHints[NUMANodeAffinity: nil, Preferred: true].
	if requested == 0 {
		return nil
	}

	assignedCPUs := cpuset.New()
	for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
		requestedByContainer := p.guaranteedCPUs(pod, &container)
		// Short circuit to regenerate the same hints if there are already
		// guaranteed CPUs allocated to the Container. This might happen after a
		// kubelet restart, for example.
		if allocated, exists := s.GetCPUSet(string(pod.UID), container.Name); exists {
			if allocated.Size() != requestedByContainer {
				klog.ErrorS(nil, "CPUs already allocated to container with different number than request", "pod", klog.KObj(pod), "containerName", container.Name, "requestedSize", requested, "requestedByContainer", requestedByContainer, "allocatedSize", allocated.Size())
				// An empty list of hints will be treated as a preference that cannot be satisfied.
				// In terms of hints, this is equal to: TopologyHint[NUMANodeAffinity: nil, Preferred: false].
				// For all but the best-effort policy the TopologyManager will throw a pod-admission error.
				return map[string][]topologymanager.TopologyHint{
					string(v1.ResourceCPU): {},
				}
			}
			// Accumulate the CPUs already assigned to containers in this pod.
			assignedCPUs = assignedCPUs.Union(allocated)
		}
	}
	if assignedCPUs.Size() == requested {
		klog.InfoS("Regenerating TopologyHints for CPUs already allocated", "pod", klog.KObj(pod))
		return map[string][]topologymanager.TopologyHint{
			string(v1.ResourceCPU): p.generateCPUTopologyHints(assignedCPUs, cpuset.CPUSet{}, requested),
		}
	}

	// Get a list of available CPUs.
	available := p.GetAvailableCPUs(s)

	// Get a list of reusable CPUs (e.g. CPUs reused from initContainers).
	// It should be an empty CPUSet for a newly created pod.
	reusable := p.cpusToReuse[string(pod.UID)]

	// Ensure any CPUs already assigned to containers in this pod are included as
	// part of the hint generation.
	reusable = reusable.Union(assignedCPUs)

	// Generate hints.
	cpuHints := p.generateCPUTopologyHints(available, reusable, requested)
	klog.InfoS("TopologyHints generated", "pod", klog.KObj(pod), "cpuHints", cpuHints)

	return map[string][]topologymanager.TopologyHint{
		string(v1.ResourceCPU): cpuHints,
	}
}

// generateCPUTopologyHints generates a set of TopologyHints given the set of
// available CPUs and the number of CPUs being requested.
//
// It follows the convention of marking all hints that have the same number of
// bits set as the narrowest matching NUMANodeAffinity with 'Preferred: true', and
// marking all others with 'Preferred: false'.
func (p *staticPolicy) generateCPUTopologyHints(availableCPUs cpuset.CPUSet, reusableCPUs cpuset.CPUSet, request int) []topologymanager.TopologyHint {
	// Initialize minAffinitySize to include all NUMA nodes.
	minAffinitySize := p.topology.CPUDetails.NUMANodes().Size()

	// Iterate through all combinations of NUMA node bitmasks and build hints from them.
	hints := []topologymanager.TopologyHint{}
	bitmask.IterateBitMasks(p.topology.CPUDetails.NUMANodes().List(), func(mask bitmask.BitMask) {
		// First, update minAffinitySize for the current request size.
		cpusInMask := p.topology.CPUDetails.CPUsInNUMANodes(mask.GetBits()...).Size()
		if cpusInMask >= request && mask.Count() < minAffinitySize {
			minAffinitySize = mask.Count()
		}

		// Then check to see if all of the reusable CPUs are part of the bitmask.
		numMatching := 0
		for _, c := range reusableCPUs.List() {
			// Disregard this mask if its NUMA node isn't part of it.
			if !mask.IsSet(p.topology.CPUDetails[c].NUMANodeID) {
				return
			}
			numMatching++
		}

		// Finally, check to see if enough available CPUs in the current
		// NUMA node combination satisfy the requested number of CPUs.
		for _, c := range availableCPUs.List() {
			if mask.IsSet(p.topology.CPUDetails[c].NUMANodeID) {
				numMatching++
			}
		}

		// If they don't, then move on to the next combination.
		if numMatching < request {
			return
		}

		// Otherwise, create a new hint from the NUMA node bitmask and add it to the
		// list of hints. We set all hint preferences to 'false' on the first
		// pass through.
		hints = append(hints, topologymanager.TopologyHint{
			NUMANodeAffinity: mask,
			Preferred:        false,
		})
	})

	// Loop back through all hints and update the 'Preferred' field based on
	// counting the number of bits set in the affinity mask and comparing it
	// to minAffinitySize. Only those with an equal number of bits set (and
	// with a minimal set of NUMA nodes) will be considered preferred.
	for i := range hints {
		if p.options.AlignBySocket && p.isHintSocketAligned(hints[i], minAffinitySize) {
			hints[i].Preferred = true
			continue
		}
		if hints[i].NUMANodeAffinity.Count() == minAffinitySize {
			hints[i].Preferred = true
		}
	}

	return hints
}
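
// Illustrative example (assumed topology, not from the upstream source): on a
// machine with two NUMA nodes, 4 allocatable CPUs on each node, align-by-socket
// disabled, and a request for 4 CPUs, the masks {0}, {1} and {0,1} all contain
// enough CPUs, so three hints are generated; minAffinitySize ends up as 1, so
// the single-node hints {0} and {1} are marked Preferred and the two-node hint
// {0,1} is not.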

// isHintSocketAligned returns true if the NUMA nodes in the hint are socket aligned.
func (p *staticPolicy) isHintSocketAligned(hint topologymanager.TopologyHint, minAffinitySize int) bool {
	numaNodesBitMask := hint.NUMANodeAffinity.GetBits()
	numaNodesPerSocket := p.topology.NumNUMANodes / p.topology.NumSockets
	if numaNodesPerSocket == 0 {
		return false
	}

	// minSockets is the minimum number of sockets required to satisfy the allocation.
	minSockets := (minAffinitySize + numaNodesPerSocket - 1) / numaNodesPerSocket
	return p.topology.CPUDetails.SocketsInNUMANodes(numaNodesBitMask...).Size() == minSockets
}

// getAlignedCPUs returns the set of aligned CPUs based on the NUMA affinity mask
// and the configured policy options.
func (p *staticPolicy) getAlignedCPUs(numaAffinity bitmask.BitMask, allocatableCPUs cpuset.CPUSet) cpuset.CPUSet {
	alignedCPUs := cpuset.New()
	numaBits := numaAffinity.GetBits()

	// If the align-by-socket policy option is enabled, the NUMA-based hint is expanded
	// to a socket-aligned hint. This ensures that socket-aligned available CPUs are
	// taken first. It may result in an allocation that is unaligned with respect to
	// NUMA, but it is preferable to keep the allocation aligned to the socket boundary.
	if p.options.AlignBySocket {
		socketBits := p.topology.CPUDetails.SocketsInNUMANodes(numaBits...).UnsortedList()
		for _, socketID := range socketBits {
			alignedCPUs = alignedCPUs.Union(allocatableCPUs.Intersection(p.topology.CPUDetails.CPUsInSockets(socketID)))
		}
		return alignedCPUs
	}

	for _, numaNodeID := range numaBits {
		alignedCPUs = alignedCPUs.Union(allocatableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID)))
	}

	return alignedCPUs
}