1
16
17 package memorymanager
18
19 import (
20 "context"
21 "fmt"
22 "sync"
23
24 cadvisorapi "github.com/google/cadvisor/info/v1"
25
26 v1 "k8s.io/api/core/v1"
27 "k8s.io/apimachinery/pkg/api/resource"
28 "k8s.io/apimachinery/pkg/util/sets"
29 runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
30 "k8s.io/klog/v2"
31 corev1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
32 kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
33 "k8s.io/kubernetes/pkg/kubelet/cm/containermap"
34 "k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
35 "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
36 "k8s.io/kubernetes/pkg/kubelet/config"
37 "k8s.io/kubernetes/pkg/kubelet/status"
38 "k8s.io/kubernetes/pkg/kubelet/types"
39 )
40
41
42 const memoryManagerStateFileName = "memory_manager_state"
43
44
45 type ActivePodsFunc func() []*v1.Pod
46
47 type runtimeService interface {
48 UpdateContainerResources(ctx context.Context, id string, resources *runtimeapi.ContainerResources) error
49 }
50
// sourcesReadyStub is a do-nothing config.SourcesReady that always reports
// readiness. NewManager installs it as the initial value so that
// removeStaleState can call AllReady before Start has supplied the real
// implementation.
type sourcesReadyStub struct{}

// AddSource discards the source name; the stub keeps no bookkeeping.
func (*sourcesReadyStub) AddSource(string) {}

// AllReady unconditionally reports that every source is ready.
func (*sourcesReadyStub) AllReady() bool {
	return true
}
55
56
57 type Manager interface {
58
59 Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error
60
61
62
63 AddContainer(p *v1.Pod, c *v1.Container, containerID string)
64
65
66
67 Allocate(pod *v1.Pod, container *v1.Container) error
68
69
70
71 RemoveContainer(containerID string) error
72
73
74 State() state.Reader
75
76
77
78
79 GetTopologyHints(*v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint
80
81
82
83
84 GetPodTopologyHints(*v1.Pod) map[string][]topologymanager.TopologyHint
85
86
87 GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.Set[int]
88
89
90 GetAllocatableMemory() []state.Block
91
92
93 GetMemory(podUID, containerName string) []state.Block
94 }
95
96 type manager struct {
97 sync.Mutex
98 policy Policy
99
100
101
102 state state.State
103
104
105
106 containerRuntime runtimeService
107
108
109
110 activePods ActivePodsFunc
111
112
113
114 podStatusProvider status.PodStatusProvider
115
116
117
118 containerMap containermap.ContainerMap
119
120
121
122 sourcesReady config.SourcesReady
123
124
125 stateFileDirectory string
126
127
128 allocatableMemory []state.Block
129
130
131 pendingAdmissionPod *v1.Pod
132 }
133
134 var _ Manager = &manager{}
135
136
137 func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, reservedMemory []kubeletconfig.MemoryReservation, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
138 var policy Policy
139
140 switch policyType(policyName) {
141
142 case policyTypeNone:
143 policy = NewPolicyNone()
144
145 case policyTypeStatic:
146 systemReserved, err := getSystemReservedMemory(machineInfo, nodeAllocatableReservation, reservedMemory)
147 if err != nil {
148 return nil, err
149 }
150
151 policy, err = NewPolicyStatic(machineInfo, systemReserved, affinity)
152 if err != nil {
153 return nil, err
154 }
155
156 default:
157 return nil, fmt.Errorf("unknown policy: \"%s\"", policyName)
158 }
159
160 manager := &manager{
161 policy: policy,
162 stateFileDirectory: stateFileDirectory,
163 }
164 manager.sourcesReady = &sourcesReadyStub{}
165 return manager, nil
166 }
167
168
169 func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
170 klog.InfoS("Starting memorymanager", "policy", m.policy.Name())
171 m.sourcesReady = sourcesReady
172 m.activePods = activePods
173 m.podStatusProvider = podStatusProvider
174 m.containerRuntime = containerRuntime
175 m.containerMap = initialContainers
176
177 stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, memoryManagerStateFileName, m.policy.Name())
178 if err != nil {
179 klog.ErrorS(err, "Could not initialize checkpoint manager, please drain node and remove policy state file")
180 return err
181 }
182 m.state = stateImpl
183
184 err = m.policy.Start(m.state)
185 if err != nil {
186 klog.ErrorS(err, "Policy start error")
187 return err
188 }
189
190 m.allocatableMemory = m.policy.GetAllocatableMemory(m.state)
191
192 return nil
193 }
194
195
196 func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) {
197 m.Lock()
198 defer m.Unlock()
199
200 m.containerMap.Add(string(pod.UID), container.Name, containerID)
201
202
203
204
205
206
207 for _, initContainer := range pod.Spec.InitContainers {
208 if initContainer.Name == container.Name {
209 break
210 }
211
212
213
214
215 if types.IsRestartableInitContainer(&initContainer) {
216 continue
217 }
218
219 m.policyRemoveContainerByRef(string(pod.UID), initContainer.Name)
220 }
221 }
222
223
224 func (m *manager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.Set[int] {
225
226 numaNodes := sets.New[int]()
227 for _, block := range m.state.GetMemoryBlocks(string(pod.UID), container.Name) {
228 for _, nodeID := range block.NUMAAffinity {
229
230 numaNodes.Insert(nodeID)
231 }
232 }
233
234 if numaNodes.Len() == 0 {
235 klog.V(5).InfoS("No allocation is available", "pod", klog.KObj(pod), "containerName", container.Name)
236 return nil
237 }
238
239 klog.InfoS("Memory affinity", "pod", klog.KObj(pod), "containerName", container.Name, "numaNodes", numaNodes)
240 return numaNodes
241 }
242
243
244 func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error {
245
246
247 m.setPodPendingAdmission(pod)
248
249
250 m.removeStaleState()
251
252 m.Lock()
253 defer m.Unlock()
254
255
256 if err := m.policy.Allocate(m.state, pod, container); err != nil {
257 klog.ErrorS(err, "Allocate error")
258 return err
259 }
260 return nil
261 }
262
263
264 func (m *manager) RemoveContainer(containerID string) error {
265 m.Lock()
266 defer m.Unlock()
267
268
269 podUID, containerName, err := m.containerMap.GetContainerRef(containerID)
270 if err != nil {
271 klog.InfoS("Failed to get container from container map", "containerID", containerID, "err", err)
272 return nil
273 }
274
275 m.policyRemoveContainerByRef(podUID, containerName)
276
277 return nil
278 }
279
280
281 func (m *manager) State() state.Reader {
282 return m.state
283 }
284
285
286 func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
287
288
289 m.setPodPendingAdmission(pod)
290
291
292 m.removeStaleState()
293
294 return m.policy.GetPodTopologyHints(m.state, pod)
295 }
296
297
298 func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
299
300
301 m.setPodPendingAdmission(pod)
302
303
304 m.removeStaleState()
305
306 return m.policy.GetTopologyHints(m.state, pod, container)
307 }
308
309
310 func (m *manager) removeStaleState() {
311
312
313
314 if !m.sourcesReady.AllReady() {
315 return
316 }
317
318
319
320
321
322 m.Lock()
323 defer m.Unlock()
324
325
326 activeAndAdmittedPods := m.activePods()
327 if m.pendingAdmissionPod != nil {
328 activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
329 }
330
331
332 activeContainers := make(map[string]map[string]struct{})
333 for _, pod := range activeAndAdmittedPods {
334 activeContainers[string(pod.UID)] = make(map[string]struct{})
335 for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
336 activeContainers[string(pod.UID)][container.Name] = struct{}{}
337 }
338 }
339
340
341
342 assignments := m.state.GetMemoryAssignments()
343 for podUID := range assignments {
344 for containerName := range assignments[podUID] {
345 if _, ok := activeContainers[podUID][containerName]; !ok {
346 klog.InfoS("RemoveStaleState removing state", "podUID", podUID, "containerName", containerName)
347 m.policyRemoveContainerByRef(podUID, containerName)
348 }
349 }
350 }
351
352 m.containerMap.Visit(func(podUID, containerName, containerID string) {
353 if _, ok := activeContainers[podUID][containerName]; !ok {
354 klog.InfoS("RemoveStaleState removing state", "podUID", podUID, "containerName", containerName)
355 m.policyRemoveContainerByRef(podUID, containerName)
356 }
357 })
358 }
359
360 func (m *manager) policyRemoveContainerByRef(podUID string, containerName string) {
361 m.policy.RemoveContainer(m.state, podUID, containerName)
362 m.containerMap.RemoveByContainerRef(podUID, containerName)
363 }
364
365 func getTotalMemoryTypeReserved(machineInfo *cadvisorapi.MachineInfo, reservedMemory []kubeletconfig.MemoryReservation) (map[v1.ResourceName]resource.Quantity, error) {
366 totalMemoryType := map[v1.ResourceName]resource.Quantity{}
367
368 numaNodes := map[int]bool{}
369 for _, numaNode := range machineInfo.Topology {
370 numaNodes[numaNode.Id] = true
371 }
372
373 for _, reservation := range reservedMemory {
374 if !numaNodes[int(reservation.NumaNode)] {
375 return nil, fmt.Errorf("the reserved memory configuration references a NUMA node %d that does not exist on this machine", reservation.NumaNode)
376 }
377
378 for resourceName, q := range reservation.Limits {
379 if value, ok := totalMemoryType[resourceName]; ok {
380 q.Add(value)
381 }
382 totalMemoryType[resourceName] = q
383 }
384 }
385
386 return totalMemoryType, nil
387 }
388
389 func validateReservedMemory(machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, reservedMemory []kubeletconfig.MemoryReservation) error {
390 totalMemoryType, err := getTotalMemoryTypeReserved(machineInfo, reservedMemory)
391 if err != nil {
392 return err
393 }
394
395 commonMemoryTypeSet := make(map[v1.ResourceName]bool)
396 for resourceType := range totalMemoryType {
397 commonMemoryTypeSet[resourceType] = true
398 }
399
400 for resourceType := range nodeAllocatableReservation {
401 if !(corev1helper.IsHugePageResourceName(resourceType) || resourceType == v1.ResourceMemory) {
402 continue
403 }
404 commonMemoryTypeSet[resourceType] = true
405 }
406
407 for resourceType := range commonMemoryTypeSet {
408 nodeAllocatableMemory := resource.NewQuantity(0, resource.DecimalSI)
409 if memValue, set := nodeAllocatableReservation[resourceType]; set {
410 nodeAllocatableMemory.Add(memValue)
411 }
412
413 reservedMemory := resource.NewQuantity(0, resource.DecimalSI)
414 if memValue, set := totalMemoryType[resourceType]; set {
415 reservedMemory.Add(memValue)
416 }
417
418 if !(*nodeAllocatableMemory).Equal(*reservedMemory) {
419 return fmt.Errorf("the total amount %q of type %q is not equal to the value %q determined by Node Allocatable feature", reservedMemory.String(), resourceType, nodeAllocatableMemory.String())
420 }
421 }
422
423 return nil
424 }
425
426 func convertReserved(machineInfo *cadvisorapi.MachineInfo, reservedMemory []kubeletconfig.MemoryReservation) (systemReservedMemory, error) {
427 reservedMemoryConverted := make(map[int]map[v1.ResourceName]uint64)
428 for _, node := range machineInfo.Topology {
429 reservedMemoryConverted[node.Id] = make(map[v1.ResourceName]uint64)
430 }
431
432 for _, reservation := range reservedMemory {
433 for resourceName, q := range reservation.Limits {
434 val, success := q.AsInt64()
435 if !success {
436 return nil, fmt.Errorf("could not covert a variable of type Quantity to int64")
437 }
438 reservedMemoryConverted[int(reservation.NumaNode)][resourceName] = uint64(val)
439 }
440 }
441
442 return reservedMemoryConverted, nil
443 }
444
445 func getSystemReservedMemory(machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, reservedMemory []kubeletconfig.MemoryReservation) (systemReservedMemory, error) {
446 if err := validateReservedMemory(machineInfo, nodeAllocatableReservation, reservedMemory); err != nil {
447 return nil, err
448 }
449
450 reservedMemoryConverted, err := convertReserved(machineInfo, reservedMemory)
451 if err != nil {
452 return nil, err
453 }
454
455 return reservedMemoryConverted, nil
456 }
457
458
459 func (m *manager) GetAllocatableMemory() []state.Block {
460 return m.allocatableMemory
461 }
462
463
464 func (m *manager) GetMemory(podUID, containerName string) []state.Block {
465 return m.state.GetMemoryBlocks(podUID, containerName)
466 }
467
468 func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
469 m.Lock()
470 defer m.Unlock()
471
472 m.pendingAdmissionPod = pod
473 }
474
View as plain text