...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/memory_manager.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm/memorymanager

/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package memorymanager

import (
	"context"
	"fmt"
	"sync"

	cadvisorapi "github.com/google/cadvisor/info/v1"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/util/sets"
	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
	"k8s.io/klog/v2"
	corev1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
	"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
	"k8s.io/kubernetes/pkg/kubelet/config"
	"k8s.io/kubernetes/pkg/kubelet/status"
	"k8s.io/kubernetes/pkg/kubelet/types"
)

// memoryManagerStateFileName is the file name where the memory manager stores its state
const memoryManagerStateFileName = "memory_manager_state"

// ActivePodsFunc is a function that returns a list of active pods
type ActivePodsFunc func() []*v1.Pod

type runtimeService interface {
	UpdateContainerResources(ctx context.Context, id string, resources *runtimeapi.ContainerResources) error
}

type sourcesReadyStub struct{}

func (s *sourcesReadyStub) AddSource(source string) {}
func (s *sourcesReadyStub) AllReady() bool          { return true }

// Manager interface provides methods for Kubelet to manage pod memory.
type Manager interface {
	// Start is called during Kubelet initialization.
	Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error

	// AddContainer adds the mapping from a container ID to the pod UID and the container name.
	// The mapping is used to remove the memory allocation during container removal.
	AddContainer(p *v1.Pod, c *v1.Container, containerID string)

	// Allocate is called to pre-allocate memory resources during Pod admission.
	// This must be called at some point prior to the AddContainer() call for a container, e.g. at pod admission time.
	Allocate(pod *v1.Pod, container *v1.Container) error

	// RemoveContainer is called after Kubelet decides to kill or delete a
	// container. After this call, any memory allocated to the container is freed.
	RemoveContainer(containerID string) error

	// State returns a read-only interface to the internal memory manager state.
	State() state.Reader

	// GetTopologyHints implements the topologymanager.HintProvider Interface
	// and is consulted to achieve NUMA aware resource alignment among this
	// and other resource controllers.
	GetTopologyHints(*v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint

	// GetPodTopologyHints implements the topologymanager.HintProvider Interface
	// and is consulted to achieve NUMA aware resource alignment among this
	// and other resource controllers.
	GetPodTopologyHints(*v1.Pod) map[string][]topologymanager.TopologyHint

	// GetMemoryNUMANodes provides the NUMA nodes that are used to allocate the container memory
	GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.Set[int]

	// GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
	GetAllocatableMemory() []state.Block

	// GetMemory returns the memory allocated by a container from NUMA nodes
	GetMemory(podUID, containerName string) []state.Block
}
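
// Editor's note (a sketch, not part of the original file): based on the
// comments above, the expected call order for a single container is Allocate
// during pod admission, AddContainer once the runtime has created the
// container and its ID is known, and RemoveContainer after the container is
// killed or deleted. mgr, pod, container, and containerID are placeholders:
//
//	if err := mgr.Allocate(pod, container); err != nil {
//		// reject the pod at admission time
//	}
//	mgr.AddContainer(pod, container, containerID)
//	// ... container runs ...
//	_ = mgr.RemoveContainer(containerID)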

type manager struct {
	sync.Mutex
	policy Policy

	// state allows restoring information about memory allocation for guaranteed pods
	// in the case of a kubelet restart
	state state.State

	// containerRuntime is the container runtime service interface needed
	// to make UpdateContainerResources() calls against the containers.
	containerRuntime runtimeService

	// activePods is a method for listing active pods on the node
	// so all the containers can be updated during a call to removeStaleState.
	activePods ActivePodsFunc

	// podStatusProvider provides a method for obtaining pod statuses
	// and the containerID of their containers
	podStatusProvider status.PodStatusProvider

	// containerMap provides a mapping from (pod, container) -> containerID
	// for all containers in a pod
	containerMap containermap.ContainerMap

	// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
	// We use it to determine when we can purge inactive pods from checkpointed state.
	sourcesReady config.SourcesReady

	// stateFileDirectory holds the directory where the state file for checkpoints is held.
	stateFileDirectory string

	// allocatableMemory holds the allocatable memory for each NUMA node
	allocatableMemory []state.Block

	// pendingAdmissionPod contains the pod that is currently in the admission phase
	pendingAdmissionPod *v1.Pod
}

var _ Manager = &manager{}

// NewManager returns a new instance of the memory manager
func NewManager(policyName string, machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, reservedMemory []kubeletconfig.MemoryReservation, stateFileDirectory string, affinity topologymanager.Store) (Manager, error) {
	var policy Policy

	switch policyType(policyName) {

	case policyTypeNone:
		policy = NewPolicyNone()

	case policyTypeStatic:
		systemReserved, err := getSystemReservedMemory(machineInfo, nodeAllocatableReservation, reservedMemory)
		if err != nil {
			return nil, err
		}

		policy, err = NewPolicyStatic(machineInfo, systemReserved, affinity)
		if err != nil {
			return nil, err
		}

	default:
		return nil, fmt.Errorf("unknown policy: %q", policyName)
	}

	manager := &manager{
		policy:             policy,
		stateFileDirectory: stateFileDirectory,
	}
	manager.sourcesReady = &sourcesReadyStub{}
	return manager, nil
}
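
// Editor's sketch (not part of the source): constructing a manager with the
// static policy, whose name is "Static" in the kubelet configuration;
// machineInfo, reservation, reservedMemory, and affinityStore are placeholders
// the caller must supply:
//
//	mgr, err := NewManager("Static", machineInfo, reservation, reservedMemory,
//		"/var/lib/kubelet", affinityStore)
//	if err != nil {
//		klog.ErrorS(err, "Failed to create memory manager")
//	}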

// Start starts the memory manager under the kubelet and calls the policy's Start
func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) error {
	klog.InfoS("Starting memorymanager", "policy", m.policy.Name())
	m.sourcesReady = sourcesReady
	m.activePods = activePods
	m.podStatusProvider = podStatusProvider
	m.containerRuntime = containerRuntime
	m.containerMap = initialContainers

	stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, memoryManagerStateFileName, m.policy.Name())
	if err != nil {
		klog.ErrorS(err, "Could not initialize checkpoint manager, please drain node and remove policy state file")
		return err
	}
	m.state = stateImpl

	err = m.policy.Start(m.state)
	if err != nil {
		klog.ErrorS(err, "Policy start error")
		return err
	}

	m.allocatableMemory = m.policy.GetAllocatableMemory(m.state)

	return nil
}
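
// Editor's note (an assumption about state.NewCheckpointState, which lives
// outside this file): the checkpoint is written to
// <stateFileDirectory>/memory_manager_state, and a checkpoint that cannot be
// restored (for example, one written under a different policy) surfaces here
// as the "drain node and remove policy state file" error logged above.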

// AddContainer saves the value of requested memory for the guaranteed pod under the state and sets the memory affinity according to the topology manager
func (m *manager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) {
	m.Lock()
	defer m.Unlock()

	m.containerMap.Add(string(pod.UID), container.Name, containerID)

	// Since we know that each init container always runs to completion before
	// the next container starts, we can safely remove references to any previously
	// started init containers. This will free up the memory from these init containers
	// for use in other pods. If the current container happens to be an init container,
	// we skip deletion of it until the next container is added, and this is called again.
	for _, initContainer := range pod.Spec.InitContainers {
		if initContainer.Name == container.Name {
			break
		}

		// Since a restartable init container remains running for the full
		// duration of the pod's lifecycle, we should not remove it from the
		// memory manager state.
		if types.IsRestartableInitContainer(&initContainer) {
			continue
		}

		m.policyRemoveContainerByRef(string(pod.UID), initContainer.Name)
	}
}

// GetMemoryNUMANodes provides the NUMA nodes that are used to allocate the container memory
func (m *manager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.Set[int] {
	// Get the NUMA node affinity of blocks assigned to the container during Allocate()
	numaNodes := sets.New[int]()
	for _, block := range m.state.GetMemoryBlocks(string(pod.UID), container.Name) {
		for _, nodeID := range block.NUMAAffinity {
			// avoid node duplication when hugepages and memory blocks are pinned to the same NUMA node
			numaNodes.Insert(nodeID)
		}
	}

	if numaNodes.Len() == 0 {
		klog.V(5).InfoS("No allocation is available", "pod", klog.KObj(pod), "containerName", container.Name)
		return nil
	}

	klog.InfoS("Memory affinity", "pod", klog.KObj(pod), "containerName", container.Name, "numaNodes", numaNodes)
	return numaNodes
}
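
// Editor's sketch (hypothetical usage, not from this file): the returned set
// can be flattened into a sorted slice, e.g. to build a cpuset.mems-style
// string for a cgroup:
//
//	nodes := mgr.GetMemoryNUMANodes(pod, container)
//	if nodes != nil {
//		ids := sets.List(nodes) // sorted []int, e.g. [0 1]
//		_ = ids                 // join into "0,1" as needed
//	}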

// Allocate is called to pre-allocate memory resources during Pod admission.
func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error {
	// The pod is in the admission phase. We need to save the pod to avoid it
	// being cleaned up before admission has ended
	m.setPodPendingAdmission(pod)

	// Garbage collect any stranded resources before allocation
	m.removeStaleState()

	m.Lock()
	defer m.Unlock()

	// Call down into the policy to assign this container memory if required.
	if err := m.policy.Allocate(m.state, pod, container); err != nil {
		klog.ErrorS(err, "Allocate error")
		return err
	}
	return nil
}

// RemoveContainer removes the container from the state
func (m *manager) RemoveContainer(containerID string) error {
	m.Lock()
	defer m.Unlock()

	// if an error occurs, the container entry no longer exists in the container map
	podUID, containerName, err := m.containerMap.GetContainerRef(containerID)
	if err != nil {
		klog.InfoS("Failed to get container from container map", "containerID", containerID, "err", err)
		return nil
	}

	m.policyRemoveContainerByRef(podUID, containerName)

	return nil
}

// State returns the state of the manager
func (m *manager) State() state.Reader {
	return m.state
}

// GetPodTopologyHints returns the topology hints for the topology manager
func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
	// The pod is in the admission phase. We need to save the pod to avoid it
	// being cleaned up before admission has ended
	m.setPodPendingAdmission(pod)

	// Garbage collect any stranded resources before providing TopologyHints
	m.removeStaleState()
	// Delegate to the active policy
	return m.policy.GetPodTopologyHints(m.state, pod)
}

// GetTopologyHints returns the topology hints for the topology manager
func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
	// The pod is in the admission phase. We need to save the pod to avoid it
	// being cleaned up before admission has ended
	m.setPodPendingAdmission(pod)

	// Garbage collect any stranded resources before providing TopologyHints
	m.removeStaleState()
	// Delegate to the active policy
	return m.policy.GetTopologyHints(m.state, pod, container)
}
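
// Editor's note (illustrative, not from this file): the returned map is keyed
// by resource name, e.g. "memory" or "hugepages-2Mi", and each
// topologymanager.TopologyHint pairs a NUMA node affinity bitmask with a
// Preferred flag; the topology manager merges these hints across all hint
// providers to pick a final affinity.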

// TODO: move the method to the upper level, to re-use it under the CPU and memory managers
func (m *manager) removeStaleState() {
	// Only once all sources are ready do we attempt to remove any stale state.
	// This ensures that the call to `m.activePods()` below will succeed with
	// the actual active pods list.
	if !m.sourcesReady.AllReady() {
		return
	}

	// We grab the lock to ensure that no new containers will grab memory blocks while
	// executing the code below. Without this lock, it's possible that we end up
	// removing state that is newly added by an asynchronous call to
	// AddContainer() during the execution of this code.
	m.Lock()
	defer m.Unlock()

	// Get the list of admitted and active pods.
	activeAndAdmittedPods := m.activePods()
	if m.pendingAdmissionPod != nil {
		activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
	}

	// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
	activeContainers := make(map[string]map[string]struct{})
	for _, pod := range activeAndAdmittedPods {
		activeContainers[string(pod.UID)] = make(map[string]struct{})
		for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
			activeContainers[string(pod.UID)][container.Name] = struct{}{}
		}
	}

	// Loop through the MemoryManager state. Remove any state for containers not
	// in the `activeContainers` list built above.
	assignments := m.state.GetMemoryAssignments()
	for podUID := range assignments {
		for containerName := range assignments[podUID] {
			if _, ok := activeContainers[podUID][containerName]; !ok {
				klog.InfoS("RemoveStaleState removing state", "podUID", podUID, "containerName", containerName)
				m.policyRemoveContainerByRef(podUID, containerName)
			}
		}
	}

	m.containerMap.Visit(func(podUID, containerName, containerID string) {
		if _, ok := activeContainers[podUID][containerName]; !ok {
			klog.InfoS("RemoveStaleState removing state", "podUID", podUID, "containerName", containerName)
			m.policyRemoveContainerByRef(podUID, containerName)
		}
	})
}

func (m *manager) policyRemoveContainerByRef(podUID string, containerName string) {
	m.policy.RemoveContainer(m.state, podUID, containerName)
	m.containerMap.RemoveByContainerRef(podUID, containerName)
}

func getTotalMemoryTypeReserved(machineInfo *cadvisorapi.MachineInfo, reservedMemory []kubeletconfig.MemoryReservation) (map[v1.ResourceName]resource.Quantity, error) {
	totalMemoryType := map[v1.ResourceName]resource.Quantity{}

	numaNodes := map[int]bool{}
	for _, numaNode := range machineInfo.Topology {
		numaNodes[numaNode.Id] = true
	}

	for _, reservation := range reservedMemory {
		if !numaNodes[int(reservation.NumaNode)] {
			return nil, fmt.Errorf("the reserved memory configuration references a NUMA node %d that does not exist on this machine", reservation.NumaNode)
		}

		for resourceName, q := range reservation.Limits {
			if value, ok := totalMemoryType[resourceName]; ok {
				q.Add(value)
			}
			totalMemoryType[resourceName] = q
		}
	}

	return totalMemoryType, nil
}
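
// Editor's sketch: totals are summed per resource type across all NUMA nodes.
// With the illustrative reservations below (values are assumptions, not from
// the source), getTotalMemoryTypeReserved would report 2Gi of conventional
// memory:
//
//	reservedMemory := []kubeletconfig.MemoryReservation{
//		{NumaNode: 0, Limits: v1.ResourceList{v1.ResourceMemory: resource.MustParse("1Gi")}},
//		{NumaNode: 1, Limits: v1.ResourceList{v1.ResourceMemory: resource.MustParse("1Gi")}},
//	}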

func validateReservedMemory(machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, reservedMemory []kubeletconfig.MemoryReservation) error {
	totalMemoryType, err := getTotalMemoryTypeReserved(machineInfo, reservedMemory)
	if err != nil {
		return err
	}

	commonMemoryTypeSet := make(map[v1.ResourceName]bool)
	for resourceType := range totalMemoryType {
		commonMemoryTypeSet[resourceType] = true
	}

	for resourceType := range nodeAllocatableReservation {
		if !(corev1helper.IsHugePageResourceName(resourceType) || resourceType == v1.ResourceMemory) {
			continue
		}
		commonMemoryTypeSet[resourceType] = true
	}

	for resourceType := range commonMemoryTypeSet {
		nodeAllocatableMemory := resource.NewQuantity(0, resource.DecimalSI)
		if memValue, set := nodeAllocatableReservation[resourceType]; set {
			nodeAllocatableMemory.Add(memValue)
		}

		reservedMemory := resource.NewQuantity(0, resource.DecimalSI)
		if memValue, set := totalMemoryType[resourceType]; set {
			reservedMemory.Add(memValue)
		}

		if !nodeAllocatableMemory.Equal(*reservedMemory) {
			return fmt.Errorf("the total amount %q of type %q is not equal to the value %q determined by the Node Allocatable feature", reservedMemory.String(), resourceType, nodeAllocatableMemory.String())
		}
	}

	return nil
}
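
// Editor's note: the check above enforces that, for conventional memory and
// for each hugepage size, the sum of the kubelet's per-NUMA reserved memory
// exactly equals the reservation computed by the Node Allocatable feature;
// for memory that reservation is typically kube-reserved + system-reserved +
// the hard eviction threshold (an assumption about kubelet configuration, not
// stated in this file). For example, kube-reserved=1Gi plus
// eviction-hard=500Mi would have to be matched by per-NUMA reservations
// totaling 1524Mi.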

func convertReserved(machineInfo *cadvisorapi.MachineInfo, reservedMemory []kubeletconfig.MemoryReservation) (systemReservedMemory, error) {
	reservedMemoryConverted := make(map[int]map[v1.ResourceName]uint64)
	for _, node := range machineInfo.Topology {
		reservedMemoryConverted[node.Id] = make(map[v1.ResourceName]uint64)
	}

	for _, reservation := range reservedMemory {
		for resourceName, q := range reservation.Limits {
			val, success := q.AsInt64()
			if !success {
				return nil, fmt.Errorf("could not convert a variable of type Quantity to int64")
			}
			reservedMemoryConverted[int(reservation.NumaNode)][resourceName] = uint64(val)
		}
	}

	return reservedMemoryConverted, nil
}
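
// Editor's sketch (illustrative values): the converted structure is keyed by
// NUMA node ID and then by resource name, with sizes in bytes:
//
//	systemReservedMemory{
//		0: {v1.ResourceMemory: 1073741824}, // 1Gi reserved on NUMA node 0
//		1: {v1.ResourceMemory: 1073741824}, // 1Gi reserved on NUMA node 1
//	}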

func getSystemReservedMemory(machineInfo *cadvisorapi.MachineInfo, nodeAllocatableReservation v1.ResourceList, reservedMemory []kubeletconfig.MemoryReservation) (systemReservedMemory, error) {
	if err := validateReservedMemory(machineInfo, nodeAllocatableReservation, reservedMemory); err != nil {
		return nil, err
	}

	reservedMemoryConverted, err := convertReserved(machineInfo, reservedMemory)
	if err != nil {
		return nil, err
	}

	return reservedMemoryConverted, nil
}

// GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
func (m *manager) GetAllocatableMemory() []state.Block {
	return m.allocatableMemory
}

// GetMemory returns the memory allocated by a container from NUMA nodes
func (m *manager) GetMemory(podUID, containerName string) []state.Block {
	return m.state.GetMemoryBlocks(podUID, containerName)
}

func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
	m.Lock()
	defer m.Unlock()

	m.pendingAdmissionPod = pod
}