1
16
17 package cpumanager
18
19 import (
20 "context"
21 "fmt"
22 "math"
23 "sync"
24 "time"
25
26 cadvisorapi "github.com/google/cadvisor/info/v1"
27 v1 "k8s.io/api/core/v1"
28 "k8s.io/apimachinery/pkg/util/wait"
29 runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
30 "k8s.io/klog/v2"
31
32 "k8s.io/kubernetes/pkg/kubelet/cm/containermap"
33 "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
34 "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
35 "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
36 "k8s.io/kubernetes/pkg/kubelet/config"
37 kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
38 "k8s.io/kubernetes/pkg/kubelet/status"
39 "k8s.io/utils/cpuset"
40 )
41
42
43 type ActivePodsFunc func() []*v1.Pod
44
45 type runtimeService interface {
46 UpdateContainerResources(ctx context.Context, id string, resources *runtimeapi.ContainerResources) error
47 }
48
49 type policyName string
50
51
52 const cpuManagerStateFileName = "cpu_manager_state"
53
54
55 type Manager interface {
56
57 Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error
58
59
60
61
62 Allocate(pod *v1.Pod, container *v1.Container) error
63
64
65
66 AddContainer(p *v1.Pod, c *v1.Container, containerID string)
67
68
69
70
71 RemoveContainer(containerID string) error
72
73
74 State() state.Reader
75
76
77
78
79 GetTopologyHints(*v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint
80
81
82
83 GetExclusiveCPUs(podUID, containerName string) cpuset.CPUSet
84
85
86
87
88 GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint
89
90
91 GetAllocatableCPUs() cpuset.CPUSet
92
93
94
95 GetCPUAffinity(podUID, containerName string) cpuset.CPUSet
96 }
97
98 type manager struct {
99 sync.Mutex
100 policy Policy
101
102
103 reconcilePeriod time.Duration
104
105
106
107 state state.State
108
109
110 lastUpdateState state.State
111
112
113
114 containerRuntime runtimeService
115
116
117
118 activePods ActivePodsFunc
119
120
121
122 podStatusProvider status.PodStatusProvider
123
124
125
126 containerMap containermap.ContainerMap
127
128 topology *topology.CPUTopology
129
130 nodeAllocatableReservation v1.ResourceList
131
132
133
134 sourcesReady config.SourcesReady
135
136
137 stateFileDirectory string
138
139
140 allocatableCPUs cpuset.CPUSet
141
142
143 pendingAdmissionPod *v1.Pod
144 }
145
146 var _ Manager = &manager{}
147
// sourcesReadyStub is a placeholder config.SourcesReady that reports every
// source as ready. NewManager installs it; Start replaces it with the real one.
type sourcesReadyStub struct{}

// AddSource ignores the source it is given.
func (*sourcesReadyStub) AddSource(string) {}

// AllReady unconditionally reports true.
func (*sourcesReadyStub) AllReady() bool { return true }
152
153
154 func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconcilePeriod time.Duration, machineInfo *cadvisorapi.MachineInfo, specificCPUs cpuset.CPUSet, nodeAllocatableReservation v1.ResourceList, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
155 var topo *topology.CPUTopology
156 var policy Policy
157 var err error
158
159 switch policyName(cpuPolicyName) {
160
161 case PolicyNone:
162 policy, err = NewNonePolicy(cpuPolicyOptions)
163 if err != nil {
164 return nil, fmt.Errorf("new none policy error: %w", err)
165 }
166
167 case PolicyStatic:
168 topo, err = topology.Discover(machineInfo)
169 if err != nil {
170 return nil, err
171 }
172 klog.InfoS("Detected CPU topology", "topology", topo)
173
174 reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
175 if !ok {
176
177 return nil, fmt.Errorf("[cpumanager] unable to determine reserved CPU resources for static policy")
178 }
179 if reservedCPUs.IsZero() {
180
181
182
183
184
185 return nil, fmt.Errorf("[cpumanager] the static policy requires systemreserved.cpu + kubereserved.cpu to be greater than zero")
186 }
187
188
189
190 reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
191 numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
192 policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity, cpuPolicyOptions)
193 if err != nil {
194 return nil, fmt.Errorf("new static policy error: %w", err)
195 }
196
197 default:
198 return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName)
199 }
200
201 manager := &manager{
202 policy: policy,
203 reconcilePeriod: reconcilePeriod,
204 lastUpdateState: state.NewMemoryState(),
205 topology: topo,
206 nodeAllocatableReservation: nodeAllocatableReservation,
207 stateFileDirectory: stateFileDirectory,
208 }
209 manager.sourcesReady = &sourcesReadyStub{}
210 return manager, nil
211 }
212
213 func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
214 klog.InfoS("Starting CPU manager", "policy", m.policy.Name())
215 klog.InfoS("Reconciling", "reconcilePeriod", m.reconcilePeriod)
216 m.sourcesReady = sourcesReady
217 m.activePods = activePods
218 m.podStatusProvider = podStatusProvider
219 m.containerRuntime = containerRuntime
220 m.containerMap = initialContainers
221
222 stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), m.containerMap)
223 if err != nil {
224 klog.ErrorS(err, "Could not initialize checkpoint manager, please drain node and remove policy state file")
225 return err
226 }
227 m.state = stateImpl
228
229 err = m.policy.Start(m.state)
230 if err != nil {
231 klog.ErrorS(err, "Policy start error")
232 return err
233 }
234
235 m.allocatableCPUs = m.policy.GetAllocatableCPUs(m.state)
236
237 if m.policy.Name() == string(PolicyNone) {
238 return nil
239 }
240
241
242 go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, wait.NeverStop)
243 return nil
244 }
245
246 func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
247
248
249 m.setPodPendingAdmission(p)
250
251
252 m.removeStaleState()
253
254 m.Lock()
255 defer m.Unlock()
256
257
258 err := m.policy.Allocate(m.state, p, c)
259 if err != nil {
260 klog.ErrorS(err, "Allocate error")
261 return err
262 }
263
264 return nil
265 }
266
267 func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) {
268 m.Lock()
269 defer m.Unlock()
270 if cset, exists := m.state.GetCPUSet(string(pod.UID), container.Name); exists {
271 m.lastUpdateState.SetCPUSet(string(pod.UID), container.Name, cset)
272 }
273 m.containerMap.Add(string(pod.UID), container.Name, containerID)
274 }
275
276 func (m *manager) RemoveContainer(containerID string) error {
277 m.Lock()
278 defer m.Unlock()
279
280 err := m.policyRemoveContainerByID(containerID)
281 if err != nil {
282 klog.ErrorS(err, "RemoveContainer error")
283 return err
284 }
285
286 return nil
287 }
288
289 func (m *manager) policyRemoveContainerByID(containerID string) error {
290 podUID, containerName, err := m.containerMap.GetContainerRef(containerID)
291 if err != nil {
292 return nil
293 }
294
295 err = m.policy.RemoveContainer(m.state, podUID, containerName)
296 if err == nil {
297 m.lastUpdateState.Delete(podUID, containerName)
298 m.containerMap.RemoveByContainerID(containerID)
299 }
300
301 return err
302 }
303
304 func (m *manager) policyRemoveContainerByRef(podUID string, containerName string) error {
305 err := m.policy.RemoveContainer(m.state, podUID, containerName)
306 if err == nil {
307 m.lastUpdateState.Delete(podUID, containerName)
308 m.containerMap.RemoveByContainerRef(podUID, containerName)
309 }
310
311 return err
312 }
313
314 func (m *manager) State() state.Reader {
315 return m.state
316 }
317
318 func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
319
320
321 m.setPodPendingAdmission(pod)
322
323 m.removeStaleState()
324
325 return m.policy.GetTopologyHints(m.state, pod, container)
326 }
327
328 func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
329
330
331 m.setPodPendingAdmission(pod)
332
333 m.removeStaleState()
334
335 return m.policy.GetPodTopologyHints(m.state, pod)
336 }
337
338 func (m *manager) GetAllocatableCPUs() cpuset.CPUSet {
339 return m.allocatableCPUs.Clone()
340 }
341
// reconciledContainer identifies one container visited by reconcileState.
// reconcileState builds these with positional literals, so the field order
// (pod name, container name, container ID) must be preserved.
type reconciledContainer struct {
	podName, containerName, containerID string
}
347
348 func (m *manager) removeStaleState() {
349
350
351
352 if !m.sourcesReady.AllReady() {
353 return
354 }
355
356
357
358
359
360 m.Lock()
361 defer m.Unlock()
362
363
364 activeAndAdmittedPods := m.activePods()
365 if m.pendingAdmissionPod != nil {
366 activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
367 }
368
369
370 activeContainers := make(map[string]map[string]struct{})
371 for _, pod := range activeAndAdmittedPods {
372 activeContainers[string(pod.UID)] = make(map[string]struct{})
373 for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
374 activeContainers[string(pod.UID)][container.Name] = struct{}{}
375 }
376 }
377
378
379
380 assignments := m.state.GetCPUAssignments()
381 for podUID := range assignments {
382 for containerName := range assignments[podUID] {
383 if _, ok := activeContainers[podUID][containerName]; !ok {
384 klog.ErrorS(nil, "RemoveStaleState: removing container", "podUID", podUID, "containerName", containerName)
385 err := m.policyRemoveContainerByRef(podUID, containerName)
386 if err != nil {
387 klog.ErrorS(err, "RemoveStaleState: failed to remove container", "podUID", podUID, "containerName", containerName)
388 }
389 }
390 }
391 }
392
393 m.containerMap.Visit(func(podUID, containerName, containerID string) {
394 if _, ok := activeContainers[podUID][containerName]; !ok {
395 klog.ErrorS(nil, "RemoveStaleState: removing container", "podUID", podUID, "containerName", containerName)
396 err := m.policyRemoveContainerByRef(podUID, containerName)
397 if err != nil {
398 klog.ErrorS(err, "RemoveStaleState: failed to remove container", "podUID", podUID, "containerName", containerName)
399 }
400 }
401 })
402 }
403
404 func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
405 ctx := context.Background()
406 success = []reconciledContainer{}
407 failure = []reconciledContainer{}
408
409 m.removeStaleState()
410 for _, pod := range m.activePods() {
411 pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
412 if !ok {
413 klog.V(4).InfoS("ReconcileState: skipping pod; status not found", "pod", klog.KObj(pod))
414 failure = append(failure, reconciledContainer{pod.Name, "", ""})
415 continue
416 }
417
418 allContainers := pod.Spec.InitContainers
419 allContainers = append(allContainers, pod.Spec.Containers...)
420 for _, container := range allContainers {
421 containerID, err := findContainerIDByName(&pstatus, container.Name)
422 if err != nil {
423 klog.V(4).InfoS("ReconcileState: skipping container; ID not found in pod status", "pod", klog.KObj(pod), "containerName", container.Name, "err", err)
424 failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
425 continue
426 }
427
428 cstatus, err := findContainerStatusByName(&pstatus, container.Name)
429 if err != nil {
430 klog.V(4).InfoS("ReconcileState: skipping container; container status not found in pod status", "pod", klog.KObj(pod), "containerName", container.Name, "err", err)
431 failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
432 continue
433 }
434
435 if cstatus.State.Waiting != nil ||
436 (cstatus.State.Waiting == nil && cstatus.State.Running == nil && cstatus.State.Terminated == nil) {
437 klog.V(4).InfoS("ReconcileState: skipping container; container still in the waiting state", "pod", klog.KObj(pod), "containerName", container.Name, "err", err)
438 failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
439 continue
440 }
441
442 m.Lock()
443 if cstatus.State.Terminated != nil {
444
445
446
447
448
449 _, _, err := m.containerMap.GetContainerRef(containerID)
450 if err == nil {
451 klog.V(4).InfoS("ReconcileState: ignoring terminated container", "pod", klog.KObj(pod), "containerID", containerID)
452 }
453 m.Unlock()
454 continue
455 }
456
457
458
459
460 m.containerMap.Add(string(pod.UID), container.Name, containerID)
461 m.Unlock()
462
463 cset := m.state.GetCPUSetOrDefault(string(pod.UID), container.Name)
464 if cset.IsEmpty() {
465
466 klog.V(4).InfoS("ReconcileState: skipping container; assigned cpuset is empty", "pod", klog.KObj(pod), "containerName", container.Name)
467 failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
468 continue
469 }
470
471 lcset := m.lastUpdateState.GetCPUSetOrDefault(string(pod.UID), container.Name)
472 if !cset.Equals(lcset) {
473 klog.V(4).InfoS("ReconcileState: updating container", "pod", klog.KObj(pod), "containerName", container.Name, "containerID", containerID, "cpuSet", cset)
474 err = m.updateContainerCPUSet(ctx, containerID, cset)
475 if err != nil {
476 klog.ErrorS(err, "ReconcileState: failed to update container", "pod", klog.KObj(pod), "containerName", container.Name, "containerID", containerID, "cpuSet", cset)
477 failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
478 continue
479 }
480 m.lastUpdateState.SetCPUSet(string(pod.UID), container.Name, cset)
481 }
482 success = append(success, reconciledContainer{pod.Name, container.Name, containerID})
483 }
484 }
485 return success, failure
486 }
487
488 func findContainerIDByName(status *v1.PodStatus, name string) (string, error) {
489 allStatuses := status.InitContainerStatuses
490 allStatuses = append(allStatuses, status.ContainerStatuses...)
491 for _, container := range allStatuses {
492 if container.Name == name && container.ContainerID != "" {
493 cid := &kubecontainer.ContainerID{}
494 err := cid.ParseString(container.ContainerID)
495 if err != nil {
496 return "", err
497 }
498 return cid.ID, nil
499 }
500 }
501 return "", fmt.Errorf("unable to find ID for container with name %v in pod status (it may not be running)", name)
502 }
503
504 func findContainerStatusByName(status *v1.PodStatus, name string) (*v1.ContainerStatus, error) {
505 for _, containerStatus := range append(status.InitContainerStatuses, status.ContainerStatuses...) {
506 if containerStatus.Name == name {
507 return &containerStatus, nil
508 }
509 }
510 return nil, fmt.Errorf("unable to find status for container with name %v in pod status (it may not be running)", name)
511 }
512
513 func (m *manager) updateContainerCPUSet(ctx context.Context, containerID string, cpus cpuset.CPUSet) error {
514
515
516
517
518 return m.containerRuntime.UpdateContainerResources(
519 ctx,
520 containerID,
521 &runtimeapi.ContainerResources{
522 Linux: &runtimeapi.LinuxContainerResources{
523 CpusetCpus: cpus.String(),
524 },
525 })
526 }
527
528 func (m *manager) GetExclusiveCPUs(podUID, containerName string) cpuset.CPUSet {
529 if result, ok := m.state.GetCPUSet(podUID, containerName); ok {
530 return result
531 }
532
533 return cpuset.CPUSet{}
534 }
535
536 func (m *manager) GetCPUAffinity(podUID, containerName string) cpuset.CPUSet {
537 return m.state.GetCPUSetOrDefault(podUID, containerName)
538 }
539
540 func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
541 m.Lock()
542 defer m.Unlock()
543
544 m.pendingAdmissionPod = pod
545 }
546
View as plain text