...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/policy_static.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm/memorymanager

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package memorymanager
    18  
    19  import (
    20  	"fmt"
    21  	"reflect"
    22  	"sort"
    23  
    24  	cadvisorapi "github.com/google/cadvisor/info/v1"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	"k8s.io/apimachinery/pkg/api/resource"
    28  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    29  	"k8s.io/klog/v2"
    30  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    31  	corehelper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
    32  	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
    33  	"k8s.io/kubernetes/pkg/features"
    34  	"k8s.io/kubernetes/pkg/kubelet/cm/memorymanager/state"
    35  	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
    36  	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
    37  	"k8s.io/kubernetes/pkg/kubelet/metrics"
    38  	"k8s.io/kubernetes/pkg/kubelet/types"
    39  )
    40  
    41  const policyTypeStatic policyType = "Static"
    42  
    43  type systemReservedMemory map[int]map[v1.ResourceName]uint64
    44  type reusableMemory map[string]map[string]map[v1.ResourceName]uint64
    45  
    46  // staticPolicy is implementation of the policy interface for the static policy
    47  type staticPolicy struct {
    48  	// machineInfo contains machine memory related information
    49  	machineInfo *cadvisorapi.MachineInfo
    50  	// reserved contains memory that reserved for kube
    51  	systemReserved systemReservedMemory
    52  	// topology manager reference to get container Topology affinity
    53  	affinity topologymanager.Store
    54  	// initContainersReusableMemory contains the memory allocated for init
    55  	// containers that can be reused.
    56  	// Note that the restartable init container memory is not included here,
    57  	// because it is not reusable.
    58  	initContainersReusableMemory reusableMemory
    59  }
    60  
    61  var _ Policy = &staticPolicy{}
    62  
    63  // NewPolicyStatic returns new static policy instance
    64  func NewPolicyStatic(machineInfo *cadvisorapi.MachineInfo, reserved systemReservedMemory, affinity topologymanager.Store) (Policy, error) {
    65  	var totalSystemReserved uint64
    66  	for _, node := range reserved {
    67  		if _, ok := node[v1.ResourceMemory]; !ok {
    68  			continue
    69  		}
    70  		totalSystemReserved += node[v1.ResourceMemory]
    71  	}
    72  
    73  	// check if we have some reserved memory for the system
    74  	if totalSystemReserved <= 0 {
    75  		return nil, fmt.Errorf("[memorymanager] you should specify the system reserved memory")
    76  	}
    77  
    78  	return &staticPolicy{
    79  		machineInfo:                  machineInfo,
    80  		systemReserved:               reserved,
    81  		affinity:                     affinity,
    82  		initContainersReusableMemory: reusableMemory{},
    83  	}, nil
    84  }
    85  
    86  func (p *staticPolicy) Name() string {
    87  	return string(policyTypeStatic)
    88  }
    89  
    90  func (p *staticPolicy) Start(s state.State) error {
    91  	if err := p.validateState(s); err != nil {
    92  		klog.ErrorS(err, "Invalid state, please drain node and remove policy state file")
    93  		return err
    94  	}
    95  	return nil
    96  }
    97  
    98  // Allocate call is idempotent
    99  func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) (rerr error) {
   100  	// allocate the memory only for guaranteed pods
   101  	if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
   102  		return nil
   103  	}
   104  
   105  	podUID := string(pod.UID)
   106  	klog.InfoS("Allocate", "pod", klog.KObj(pod), "containerName", container.Name)
   107  	// container belongs in an exclusively allocated pool
   108  	metrics.MemoryManagerPinningRequestTotal.Inc()
   109  	defer func() {
   110  		if rerr != nil {
   111  			metrics.MemoryManagerPinningErrorsTotal.Inc()
   112  		}
   113  	}()
   114  	if blocks := s.GetMemoryBlocks(podUID, container.Name); blocks != nil {
   115  		p.updatePodReusableMemory(pod, container, blocks)
   116  
   117  		klog.InfoS("Container already present in state, skipping", "pod", klog.KObj(pod), "containerName", container.Name)
   118  		return nil
   119  	}
   120  
   121  	// Call Topology Manager to get the aligned affinity across all hint providers.
   122  	hint := p.affinity.GetAffinity(podUID, container.Name)
   123  	klog.InfoS("Got topology affinity", "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name, "hint", hint)
   124  
   125  	requestedResources, err := getRequestedResources(pod, container)
   126  	if err != nil {
   127  		return err
   128  	}
   129  
   130  	machineState := s.GetMachineState()
   131  	bestHint := &hint
   132  	// topology manager returned the hint with NUMA affinity nil
   133  	// we should use the default NUMA affinity calculated the same way as for the topology manager
   134  	if hint.NUMANodeAffinity == nil {
   135  		defaultHint, err := p.getDefaultHint(machineState, pod, requestedResources)
   136  		if err != nil {
   137  			return err
   138  		}
   139  
   140  		if !defaultHint.Preferred && bestHint.Preferred {
   141  			return fmt.Errorf("[memorymanager] failed to find the default preferred hint")
   142  		}
   143  		bestHint = defaultHint
   144  	}
   145  
   146  	// topology manager returns the hint that does not satisfy completely the container request
   147  	// we should extend this hint to the one who will satisfy the request and include the current hint
   148  	if !isAffinitySatisfyRequest(machineState, bestHint.NUMANodeAffinity, requestedResources) {
   149  		extendedHint, err := p.extendTopologyManagerHint(machineState, pod, requestedResources, bestHint.NUMANodeAffinity)
   150  		if err != nil {
   151  			return err
   152  		}
   153  
   154  		if !extendedHint.Preferred && bestHint.Preferred {
   155  			return fmt.Errorf("[memorymanager] failed to find the extended preferred hint")
   156  		}
   157  		bestHint = extendedHint
   158  	}
   159  
   160  	var containerBlocks []state.Block
   161  	maskBits := bestHint.NUMANodeAffinity.GetBits()
   162  	for resourceName, requestedSize := range requestedResources {
   163  		// update memory blocks
   164  		containerBlocks = append(containerBlocks, state.Block{
   165  			NUMAAffinity: maskBits,
   166  			Size:         requestedSize,
   167  			Type:         resourceName,
   168  		})
   169  
   170  		podReusableMemory := p.getPodReusableMemory(pod, bestHint.NUMANodeAffinity, resourceName)
   171  		if podReusableMemory >= requestedSize {
   172  			requestedSize = 0
   173  		} else {
   174  			requestedSize -= podReusableMemory
   175  		}
   176  
   177  		// Update nodes memory state
   178  		p.updateMachineState(machineState, maskBits, resourceName, requestedSize)
   179  	}
   180  
   181  	p.updatePodReusableMemory(pod, container, containerBlocks)
   182  
   183  	s.SetMachineState(machineState)
   184  	s.SetMemoryBlocks(podUID, container.Name, containerBlocks)
   185  
   186  	// update init containers memory blocks to reflect the fact that we re-used init containers memory
   187  	// it is possible that the size of the init container memory block will have 0 value, when all memory
   188  	// allocated for it was re-used
   189  	// we only do this so that the sum(memory_for_all_containers) == total amount of allocated memory to the pod, even
   190  	// though the final state here doesn't accurately reflect what was (in reality) allocated to each container
   191  	// TODO: we should refactor our state structs to reflect the amount of the re-used memory
   192  	p.updateInitContainersMemoryBlocks(s, pod, container, containerBlocks)
   193  
   194  	return nil
   195  }
   196  
   197  func (p *staticPolicy) updateMachineState(machineState state.NUMANodeMap, numaAffinity []int, resourceName v1.ResourceName, requestedSize uint64) {
   198  	for _, nodeID := range numaAffinity {
   199  		machineState[nodeID].NumberOfAssignments++
   200  		machineState[nodeID].Cells = numaAffinity
   201  
   202  		// we need to continue to update all affinity mask nodes
   203  		if requestedSize == 0 {
   204  			continue
   205  		}
   206  
   207  		// update the node memory state
   208  		nodeResourceMemoryState := machineState[nodeID].MemoryMap[resourceName]
   209  		if nodeResourceMemoryState.Free <= 0 {
   210  			continue
   211  		}
   212  
   213  		// the node has enough memory to satisfy the request
   214  		if nodeResourceMemoryState.Free >= requestedSize {
   215  			nodeResourceMemoryState.Reserved += requestedSize
   216  			nodeResourceMemoryState.Free -= requestedSize
   217  			requestedSize = 0
   218  			continue
   219  		}
   220  
   221  		// the node does not have enough memory, use the node remaining memory and move to the next node
   222  		requestedSize -= nodeResourceMemoryState.Free
   223  		nodeResourceMemoryState.Reserved += nodeResourceMemoryState.Free
   224  		nodeResourceMemoryState.Free = 0
   225  	}
   226  }
   227  
   228  func (p *staticPolicy) getPodReusableMemory(pod *v1.Pod, numaAffinity bitmask.BitMask, resourceName v1.ResourceName) uint64 {
   229  	podReusableMemory, ok := p.initContainersReusableMemory[string(pod.UID)]
   230  	if !ok {
   231  		return 0
   232  	}
   233  
   234  	numaReusableMemory, ok := podReusableMemory[numaAffinity.String()]
   235  	if !ok {
   236  		return 0
   237  	}
   238  
   239  	return numaReusableMemory[resourceName]
   240  }
   241  
   242  // RemoveContainer call is idempotent
   243  func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerName string) {
   244  	blocks := s.GetMemoryBlocks(podUID, containerName)
   245  	if blocks == nil {
   246  		return
   247  	}
   248  
   249  	klog.InfoS("RemoveContainer", "podUID", podUID, "containerName", containerName)
   250  	s.Delete(podUID, containerName)
   251  
   252  	// Mutate machine memory state to update free and reserved memory
   253  	machineState := s.GetMachineState()
   254  	for _, b := range blocks {
   255  		releasedSize := b.Size
   256  		for _, nodeID := range b.NUMAAffinity {
   257  			machineState[nodeID].NumberOfAssignments--
   258  
   259  			// once we do not have any memory allocations on this node, clear node groups
   260  			if machineState[nodeID].NumberOfAssignments == 0 {
   261  				machineState[nodeID].Cells = []int{nodeID}
   262  			}
   263  
   264  			// we still need to pass over all NUMA node under the affinity mask to update them
   265  			if releasedSize == 0 {
   266  				continue
   267  			}
   268  
   269  			nodeResourceMemoryState := machineState[nodeID].MemoryMap[b.Type]
   270  
   271  			// if the node does not have reserved memory to free, continue to the next node
   272  			if nodeResourceMemoryState.Reserved == 0 {
   273  				continue
   274  			}
   275  
   276  			// the reserved memory smaller than the amount of the memory that should be released
   277  			// release as much as possible and move to the next node
   278  			if nodeResourceMemoryState.Reserved < releasedSize {
   279  				releasedSize -= nodeResourceMemoryState.Reserved
   280  				nodeResourceMemoryState.Free += nodeResourceMemoryState.Reserved
   281  				nodeResourceMemoryState.Reserved = 0
   282  				continue
   283  			}
   284  
   285  			// the reserved memory big enough to satisfy the released memory
   286  			nodeResourceMemoryState.Free += releasedSize
   287  			nodeResourceMemoryState.Reserved -= releasedSize
   288  			releasedSize = 0
   289  		}
   290  	}
   291  
   292  	s.SetMachineState(machineState)
   293  }
   294  
   295  func regenerateHints(pod *v1.Pod, ctn *v1.Container, ctnBlocks []state.Block, reqRsrc map[v1.ResourceName]uint64) map[string][]topologymanager.TopologyHint {
   296  	hints := map[string][]topologymanager.TopologyHint{}
   297  	for resourceName := range reqRsrc {
   298  		hints[string(resourceName)] = []topologymanager.TopologyHint{}
   299  	}
   300  
   301  	if len(ctnBlocks) != len(reqRsrc) {
   302  		klog.ErrorS(nil, "The number of requested resources by the container differs from the number of memory blocks", "containerName", ctn.Name)
   303  		return nil
   304  	}
   305  
   306  	for _, b := range ctnBlocks {
   307  		if _, ok := reqRsrc[b.Type]; !ok {
   308  			klog.ErrorS(nil, "Container requested resources do not have resource of this type", "containerName", ctn.Name, "type", b.Type)
   309  			return nil
   310  		}
   311  
   312  		if b.Size != reqRsrc[b.Type] {
   313  			klog.ErrorS(nil, "Memory already allocated with different numbers than requested", "podUID", pod.UID, "type", b.Type, "containerName", ctn.Name, "requestedResource", reqRsrc[b.Type], "allocatedSize", b.Size)
   314  			return nil
   315  		}
   316  
   317  		containerNUMAAffinity, err := bitmask.NewBitMask(b.NUMAAffinity...)
   318  		if err != nil {
   319  			klog.ErrorS(err, "Failed to generate NUMA bitmask")
   320  			return nil
   321  		}
   322  
   323  		klog.InfoS("Regenerating TopologyHints, resource was already allocated to pod", "resourceName", b.Type, "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", ctn.Name)
   324  		hints[string(b.Type)] = append(hints[string(b.Type)], topologymanager.TopologyHint{
   325  			NUMANodeAffinity: containerNUMAAffinity,
   326  			Preferred:        true,
   327  		})
   328  	}
   329  	return hints
   330  }
   331  
   332  func getPodRequestedResources(pod *v1.Pod) (map[v1.ResourceName]uint64, error) {
   333  	// Maximun resources requested by init containers at any given time.
   334  	reqRsrcsByInitCtrs := make(map[v1.ResourceName]uint64)
   335  	// Total resources requested by restartable init containers.
   336  	reqRsrcsByRestartableInitCtrs := make(map[v1.ResourceName]uint64)
   337  	for _, ctr := range pod.Spec.InitContainers {
   338  		reqRsrcs, err := getRequestedResources(pod, &ctr)
   339  
   340  		if err != nil {
   341  			return nil, err
   342  		}
   343  		for rsrcName, qty := range reqRsrcs {
   344  			if _, ok := reqRsrcsByInitCtrs[rsrcName]; !ok {
   345  				reqRsrcsByInitCtrs[rsrcName] = uint64(0)
   346  			}
   347  
   348  			// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-node/753-sidecar-containers#resources-calculation-for-scheduling-and-pod-admission
   349  			// for the detail.
   350  			if types.IsRestartableInitContainer(&ctr) {
   351  				reqRsrcsByRestartableInitCtrs[rsrcName] += qty
   352  			} else if reqRsrcsByRestartableInitCtrs[rsrcName]+qty > reqRsrcsByInitCtrs[rsrcName] {
   353  				reqRsrcsByInitCtrs[rsrcName] = reqRsrcsByRestartableInitCtrs[rsrcName] + qty
   354  			}
   355  		}
   356  	}
   357  
   358  	reqRsrcsByAppCtrs := make(map[v1.ResourceName]uint64)
   359  	for _, ctr := range pod.Spec.Containers {
   360  		reqRsrcs, err := getRequestedResources(pod, &ctr)
   361  
   362  		if err != nil {
   363  			return nil, err
   364  		}
   365  		for rsrcName, qty := range reqRsrcs {
   366  			if _, ok := reqRsrcsByAppCtrs[rsrcName]; !ok {
   367  				reqRsrcsByAppCtrs[rsrcName] = uint64(0)
   368  			}
   369  
   370  			reqRsrcsByAppCtrs[rsrcName] += qty
   371  		}
   372  	}
   373  
   374  	reqRsrcs := make(map[v1.ResourceName]uint64)
   375  	for rsrcName := range reqRsrcsByAppCtrs {
   376  		// Total resources requested by long-running containers.
   377  		reqRsrcsByLongRunningCtrs := reqRsrcsByAppCtrs[rsrcName] + reqRsrcsByRestartableInitCtrs[rsrcName]
   378  		reqRsrcs[rsrcName] = reqRsrcsByLongRunningCtrs
   379  
   380  		if reqRsrcs[rsrcName] < reqRsrcsByInitCtrs[rsrcName] {
   381  			reqRsrcs[rsrcName] = reqRsrcsByInitCtrs[rsrcName]
   382  		}
   383  	}
   384  	return reqRsrcs, nil
   385  }
   386  
   387  func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint {
   388  	if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
   389  		return nil
   390  	}
   391  
   392  	reqRsrcs, err := getPodRequestedResources(pod)
   393  	if err != nil {
   394  		klog.ErrorS(err, "Failed to get pod requested resources", "pod", klog.KObj(pod), "podUID", pod.UID)
   395  		return nil
   396  	}
   397  
   398  	for _, ctn := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
   399  		containerBlocks := s.GetMemoryBlocks(string(pod.UID), ctn.Name)
   400  		// Short circuit to regenerate the same hints if there are already
   401  		// memory allocated for the container. This might happen after a
   402  		// kubelet restart, for example.
   403  		if containerBlocks != nil {
   404  			return regenerateHints(pod, &ctn, containerBlocks, reqRsrcs)
   405  		}
   406  	}
   407  
   408  	// the pod topology hints calculated only once for all containers, so no need to pass re-usable state
   409  	return p.calculateHints(s.GetMachineState(), pod, reqRsrcs)
   410  }
   411  
   412  // GetTopologyHints implements the topologymanager.HintProvider Interface
   413  // and is consulted to achieve NUMA aware resource alignment among this
   414  // and other resource controllers.
   415  func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
   416  	if v1qos.GetPodQOS(pod) != v1.PodQOSGuaranteed {
   417  		return nil
   418  	}
   419  
   420  	requestedResources, err := getRequestedResources(pod, container)
   421  	if err != nil {
   422  		klog.ErrorS(err, "Failed to get container requested resources", "pod", klog.KObj(pod), "podUID", pod.UID, "containerName", container.Name)
   423  		return nil
   424  	}
   425  
   426  	containerBlocks := s.GetMemoryBlocks(string(pod.UID), container.Name)
   427  	// Short circuit to regenerate the same hints if there are already
   428  	// memory allocated for the container. This might happen after a
   429  	// kubelet restart, for example.
   430  	if containerBlocks != nil {
   431  		return regenerateHints(pod, container, containerBlocks, requestedResources)
   432  	}
   433  
   434  	return p.calculateHints(s.GetMachineState(), pod, requestedResources)
   435  }
   436  
   437  func getRequestedResources(pod *v1.Pod, container *v1.Container) (map[v1.ResourceName]uint64, error) {
   438  	requestedResources := map[v1.ResourceName]uint64{}
   439  	resources := container.Resources.Requests
   440  	// In-place pod resize feature makes Container.Resources field mutable for CPU & memory.
   441  	// AllocatedResources holds the value of Container.Resources.Requests when the pod was admitted.
   442  	// We should return this value because this is what kubelet agreed to allocate for the container
   443  	// and the value configured with runtime.
   444  	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
   445  		if cs, ok := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name); ok {
   446  			resources = cs.AllocatedResources
   447  		}
   448  	}
   449  	for resourceName, quantity := range resources {
   450  		if resourceName != v1.ResourceMemory && !corehelper.IsHugePageResourceName(resourceName) {
   451  			continue
   452  		}
   453  		requestedSize, succeed := quantity.AsInt64()
   454  		if !succeed {
   455  			return nil, fmt.Errorf("[memorymanager] failed to represent quantity as int64")
   456  		}
   457  		requestedResources[resourceName] = uint64(requestedSize)
   458  	}
   459  	return requestedResources, nil
   460  }
   461  
   462  func (p *staticPolicy) calculateHints(machineState state.NUMANodeMap, pod *v1.Pod, requestedResources map[v1.ResourceName]uint64) map[string][]topologymanager.TopologyHint {
   463  	var numaNodes []int
   464  	for n := range machineState {
   465  		numaNodes = append(numaNodes, n)
   466  	}
   467  	sort.Ints(numaNodes)
   468  
   469  	// Initialize minAffinitySize to include all NUMA Cells.
   470  	minAffinitySize := len(numaNodes)
   471  
   472  	hints := map[string][]topologymanager.TopologyHint{}
   473  	bitmask.IterateBitMasks(numaNodes, func(mask bitmask.BitMask) {
   474  		maskBits := mask.GetBits()
   475  		singleNUMAHint := len(maskBits) == 1
   476  
   477  		totalFreeSize := map[v1.ResourceName]uint64{}
   478  		totalAllocatableSize := map[v1.ResourceName]uint64{}
   479  		// calculate total free and allocatable memory for the node mask
   480  		for _, nodeID := range maskBits {
   481  			for resourceName := range requestedResources {
   482  				if _, ok := totalFreeSize[resourceName]; !ok {
   483  					totalFreeSize[resourceName] = 0
   484  				}
   485  				totalFreeSize[resourceName] += machineState[nodeID].MemoryMap[resourceName].Free
   486  
   487  				if _, ok := totalAllocatableSize[resourceName]; !ok {
   488  					totalAllocatableSize[resourceName] = 0
   489  				}
   490  				totalAllocatableSize[resourceName] += machineState[nodeID].MemoryMap[resourceName].Allocatable
   491  			}
   492  		}
   493  
   494  		// verify that for all memory types the node mask has enough allocatable resources
   495  		for resourceName, requestedSize := range requestedResources {
   496  			if totalAllocatableSize[resourceName] < requestedSize {
   497  				return
   498  			}
   499  		}
   500  
   501  		// set the minimum amount of NUMA nodes that can satisfy the container resources requests
   502  		if mask.Count() < minAffinitySize {
   503  			minAffinitySize = mask.Count()
   504  		}
   505  
   506  		// the node already in group with another node, it can not be used for the single NUMA node allocation
   507  		if singleNUMAHint && len(machineState[maskBits[0]].Cells) > 1 {
   508  			return
   509  		}
   510  
   511  		for _, nodeID := range maskBits {
   512  			// the node already used for the memory allocation
   513  			if !singleNUMAHint && machineState[nodeID].NumberOfAssignments > 0 {
   514  				// the node used for the single NUMA memory allocation, it can not be used for the multi NUMA node allocation
   515  				if len(machineState[nodeID].Cells) == 1 {
   516  					return
   517  				}
   518  
   519  				// the node already used with different group of nodes, it can not be use with in the current hint
   520  				if !areGroupsEqual(machineState[nodeID].Cells, maskBits) {
   521  					return
   522  				}
   523  			}
   524  		}
   525  
   526  		// verify that for all memory types the node mask has enough free resources
   527  		for resourceName, requestedSize := range requestedResources {
   528  			podReusableMemory := p.getPodReusableMemory(pod, mask, resourceName)
   529  			if totalFreeSize[resourceName]+podReusableMemory < requestedSize {
   530  				return
   531  			}
   532  		}
   533  
   534  		// add the node mask as topology hint for all memory types
   535  		for resourceName := range requestedResources {
   536  			if _, ok := hints[string(resourceName)]; !ok {
   537  				hints[string(resourceName)] = []topologymanager.TopologyHint{}
   538  			}
   539  			hints[string(resourceName)] = append(hints[string(resourceName)], topologymanager.TopologyHint{
   540  				NUMANodeAffinity: mask,
   541  				Preferred:        false,
   542  			})
   543  		}
   544  	})
   545  
   546  	// update hints preferred according to multiNUMAGroups, in case when it wasn't provided, the default
   547  	// behaviour to prefer the minimal amount of NUMA nodes will be used
   548  	for resourceName := range requestedResources {
   549  		for i, hint := range hints[string(resourceName)] {
   550  			hints[string(resourceName)][i].Preferred = p.isHintPreferred(hint.NUMANodeAffinity.GetBits(), minAffinitySize)
   551  		}
   552  	}
   553  
   554  	return hints
   555  }
   556  
   557  func (p *staticPolicy) isHintPreferred(maskBits []int, minAffinitySize int) bool {
   558  	return len(maskBits) == minAffinitySize
   559  }
   560  
   561  func areGroupsEqual(group1, group2 []int) bool {
   562  	sort.Ints(group1)
   563  	sort.Ints(group2)
   564  
   565  	if len(group1) != len(group2) {
   566  		return false
   567  	}
   568  
   569  	for i, elm := range group1 {
   570  		if group2[i] != elm {
   571  			return false
   572  		}
   573  	}
   574  	return true
   575  }
   576  
   577  func (p *staticPolicy) validateState(s state.State) error {
   578  	machineState := s.GetMachineState()
   579  	memoryAssignments := s.GetMemoryAssignments()
   580  
   581  	if len(machineState) == 0 {
   582  		// Machine state cannot be empty when assignments exist
   583  		if len(memoryAssignments) != 0 {
   584  			return fmt.Errorf("[memorymanager] machine state can not be empty when it has memory assignments")
   585  		}
   586  
   587  		defaultMachineState := p.getDefaultMachineState()
   588  		s.SetMachineState(defaultMachineState)
   589  
   590  		return nil
   591  	}
   592  
   593  	// calculate all memory assigned to containers
   594  	expectedMachineState := p.getDefaultMachineState()
   595  	for pod, container := range memoryAssignments {
   596  		for containerName, blocks := range container {
   597  			for _, b := range blocks {
   598  				requestedSize := b.Size
   599  				for _, nodeID := range b.NUMAAffinity {
   600  					nodeState, ok := expectedMachineState[nodeID]
   601  					if !ok {
   602  						return fmt.Errorf("[memorymanager] (pod: %s, container: %s) the memory assignment uses the NUMA that does not exist", pod, containerName)
   603  					}
   604  
   605  					nodeState.NumberOfAssignments++
   606  					nodeState.Cells = b.NUMAAffinity
   607  
   608  					memoryState, ok := nodeState.MemoryMap[b.Type]
   609  					if !ok {
   610  						return fmt.Errorf("[memorymanager] (pod: %s, container: %s) the memory assignment uses memory resource that does not exist", pod, containerName)
   611  					}
   612  
   613  					if requestedSize == 0 {
   614  						continue
   615  					}
   616  
   617  					// this node does not have enough memory continue to the next one
   618  					if memoryState.Free <= 0 {
   619  						continue
   620  					}
   621  
   622  					// the node has enough memory to satisfy the request
   623  					if memoryState.Free >= requestedSize {
   624  						memoryState.Reserved += requestedSize
   625  						memoryState.Free -= requestedSize
   626  						requestedSize = 0
   627  						continue
   628  					}
   629  
   630  					// the node does not have enough memory, use the node remaining memory and move to the next node
   631  					requestedSize -= memoryState.Free
   632  					memoryState.Reserved += memoryState.Free
   633  					memoryState.Free = 0
   634  				}
   635  			}
   636  		}
   637  	}
   638  
   639  	// State has already been initialized from file (is not empty)
   640  	// Validate that total size, system reserved and reserved memory not changed, it can happen, when:
   641  	// - adding or removing physical memory bank from the node
   642  	// - change of kubelet system-reserved, kube-reserved or pre-reserved-memory-zone parameters
   643  	if !areMachineStatesEqual(machineState, expectedMachineState) {
   644  		return fmt.Errorf("[memorymanager] the expected machine state is different from the real one")
   645  	}
   646  
   647  	return nil
   648  }
   649  
   650  func areMachineStatesEqual(ms1, ms2 state.NUMANodeMap) bool {
   651  	if len(ms1) != len(ms2) {
   652  		klog.ErrorS(nil, "Node states are different", "lengthNode1", len(ms1), "lengthNode2", len(ms2))
   653  		return false
   654  	}
   655  
   656  	for nodeID, nodeState1 := range ms1 {
   657  		nodeState2, ok := ms2[nodeID]
   658  		if !ok {
   659  			klog.ErrorS(nil, "Node state does not have node ID", "nodeID", nodeID)
   660  			return false
   661  		}
   662  
   663  		if nodeState1.NumberOfAssignments != nodeState2.NumberOfAssignments {
   664  			klog.ErrorS(nil, "Node states number of assignments are different", "assignment1", nodeState1.NumberOfAssignments, "assignment2", nodeState2.NumberOfAssignments)
   665  			return false
   666  		}
   667  
   668  		if !areGroupsEqual(nodeState1.Cells, nodeState2.Cells) {
   669  			klog.ErrorS(nil, "Node states groups are different", "stateNode1", nodeState1.Cells, "stateNode2", nodeState2.Cells)
   670  			return false
   671  		}
   672  
   673  		if len(nodeState1.MemoryMap) != len(nodeState2.MemoryMap) {
   674  			klog.ErrorS(nil, "Node states memory map have different lengths", "lengthNode1", len(nodeState1.MemoryMap), "lengthNode2", len(nodeState2.MemoryMap))
   675  			return false
   676  		}
   677  
   678  		for resourceName, memoryState1 := range nodeState1.MemoryMap {
   679  			memoryState2, ok := nodeState2.MemoryMap[resourceName]
   680  			if !ok {
   681  				klog.ErrorS(nil, "Memory state does not have resource", "resource", resourceName)
   682  				return false
   683  			}
   684  
   685  			if !reflect.DeepEqual(*memoryState1, *memoryState2) {
   686  				klog.ErrorS(nil, "Memory states for the NUMA node and resource are different", "node", nodeID, "resource", resourceName, "memoryState1", *memoryState1, "memoryState2", *memoryState2)
   687  				return false
   688  			}
   689  		}
   690  	}
   691  	return true
   692  }
   693  
   694  func (p *staticPolicy) getDefaultMachineState() state.NUMANodeMap {
   695  	defaultMachineState := state.NUMANodeMap{}
   696  	nodeHugepages := map[int]uint64{}
   697  	for _, node := range p.machineInfo.Topology {
   698  		defaultMachineState[node.Id] = &state.NUMANodeState{
   699  			NumberOfAssignments: 0,
   700  			MemoryMap:           map[v1.ResourceName]*state.MemoryTable{},
   701  			Cells:               []int{node.Id},
   702  		}
   703  
   704  		// fill memory table with huge pages values
   705  		for _, hugepage := range node.HugePages {
   706  			hugepageQuantity := resource.NewQuantity(int64(hugepage.PageSize)*1024, resource.BinarySI)
   707  			resourceName := corehelper.HugePageResourceName(*hugepageQuantity)
   708  			systemReserved := p.getResourceSystemReserved(node.Id, resourceName)
   709  			totalHugepagesSize := hugepage.NumPages * hugepage.PageSize * 1024
   710  			allocatable := totalHugepagesSize - systemReserved
   711  			defaultMachineState[node.Id].MemoryMap[resourceName] = &state.MemoryTable{
   712  				Allocatable:    allocatable,
   713  				Free:           allocatable,
   714  				Reserved:       0,
   715  				SystemReserved: systemReserved,
   716  				TotalMemSize:   totalHugepagesSize,
   717  			}
   718  			if _, ok := nodeHugepages[node.Id]; !ok {
   719  				nodeHugepages[node.Id] = 0
   720  			}
   721  			nodeHugepages[node.Id] += totalHugepagesSize
   722  		}
   723  
   724  		// fill memory table with regular memory values
   725  		systemReserved := p.getResourceSystemReserved(node.Id, v1.ResourceMemory)
   726  
   727  		allocatable := node.Memory - systemReserved
   728  		// remove memory allocated by hugepages
   729  		if allocatedByHugepages, ok := nodeHugepages[node.Id]; ok {
   730  			allocatable -= allocatedByHugepages
   731  		}
   732  		defaultMachineState[node.Id].MemoryMap[v1.ResourceMemory] = &state.MemoryTable{
   733  			Allocatable:    allocatable,
   734  			Free:           allocatable,
   735  			Reserved:       0,
   736  			SystemReserved: systemReserved,
   737  			TotalMemSize:   node.Memory,
   738  		}
   739  	}
   740  	return defaultMachineState
   741  }
   742  
   743  func (p *staticPolicy) getResourceSystemReserved(nodeID int, resourceName v1.ResourceName) uint64 {
   744  	var systemReserved uint64
   745  	if nodeSystemReserved, ok := p.systemReserved[nodeID]; ok {
   746  		if nodeMemorySystemReserved, ok := nodeSystemReserved[resourceName]; ok {
   747  			systemReserved = nodeMemorySystemReserved
   748  		}
   749  	}
   750  	return systemReserved
   751  }
   752  
   753  func (p *staticPolicy) getDefaultHint(machineState state.NUMANodeMap, pod *v1.Pod, requestedResources map[v1.ResourceName]uint64) (*topologymanager.TopologyHint, error) {
   754  	hints := p.calculateHints(machineState, pod, requestedResources)
   755  	if len(hints) < 1 {
   756  		return nil, fmt.Errorf("[memorymanager] failed to get the default NUMA affinity, no NUMA nodes with enough memory is available")
   757  	}
   758  
   759  	// hints for all memory types should be the same, so we will check hints only for regular memory type
   760  	return findBestHint(hints[string(v1.ResourceMemory)]), nil
   761  }
   762  
   763  func isAffinitySatisfyRequest(machineState state.NUMANodeMap, mask bitmask.BitMask, requestedResources map[v1.ResourceName]uint64) bool {
   764  	totalFreeSize := map[v1.ResourceName]uint64{}
   765  	for _, nodeID := range mask.GetBits() {
   766  		for resourceName := range requestedResources {
   767  			if _, ok := totalFreeSize[resourceName]; !ok {
   768  				totalFreeSize[resourceName] = 0
   769  			}
   770  			totalFreeSize[resourceName] += machineState[nodeID].MemoryMap[resourceName].Free
   771  		}
   772  	}
   773  
   774  	// verify that for all memory types the node mask has enough resources
   775  	for resourceName, requestedSize := range requestedResources {
   776  		if totalFreeSize[resourceName] < requestedSize {
   777  			return false
   778  		}
   779  	}
   780  
   781  	return true
   782  }
   783  
   784  // extendTopologyManagerHint extends the topology manager hint, in case when it does not satisfy to the container request
   785  // the topology manager uses bitwise AND to merge all topology hints into the best one, so in case of the restricted policy,
   786  // it possible that we will get the subset of hint that we provided to the topology manager, in this case we want to extend
   787  // it to the original one
   788  func (p *staticPolicy) extendTopologyManagerHint(machineState state.NUMANodeMap, pod *v1.Pod, requestedResources map[v1.ResourceName]uint64, mask bitmask.BitMask) (*topologymanager.TopologyHint, error) {
   789  	hints := p.calculateHints(machineState, pod, requestedResources)
   790  
   791  	var filteredHints []topologymanager.TopologyHint
   792  	// hints for all memory types should be the same, so we will check hints only for regular memory type
   793  	for _, hint := range hints[string(v1.ResourceMemory)] {
   794  		affinityBits := hint.NUMANodeAffinity.GetBits()
   795  		// filter all hints that does not include currentHint
   796  		if isHintInGroup(mask.GetBits(), affinityBits) {
   797  			filteredHints = append(filteredHints, hint)
   798  		}
   799  	}
   800  
   801  	if len(filteredHints) < 1 {
   802  		return nil, fmt.Errorf("[memorymanager] failed to find NUMA nodes to extend the current topology hint")
   803  	}
   804  
   805  	// try to find the preferred hint with the minimal number of NUMA nodes, relevant for the restricted policy
   806  	return findBestHint(filteredHints), nil
   807  }
   808  
   809  func isHintInGroup(hint []int, group []int) bool {
   810  	sort.Ints(hint)
   811  	sort.Ints(group)
   812  
   813  	hintIndex := 0
   814  	for i := range group {
   815  		if hintIndex == len(hint) {
   816  			return true
   817  		}
   818  
   819  		if group[i] != hint[hintIndex] {
   820  			continue
   821  		}
   822  		hintIndex++
   823  	}
   824  
   825  	return hintIndex == len(hint)
   826  }
   827  
   828  func findBestHint(hints []topologymanager.TopologyHint) *topologymanager.TopologyHint {
   829  	// try to find the preferred hint with the minimal number of NUMA nodes, relevant for the restricted policy
   830  	bestHint := topologymanager.TopologyHint{}
   831  	for _, hint := range hints {
   832  		if bestHint.NUMANodeAffinity == nil {
   833  			bestHint = hint
   834  			continue
   835  		}
   836  
   837  		// preferred of the current hint is true, when the extendedHint preferred is false
   838  		if hint.Preferred && !bestHint.Preferred {
   839  			bestHint = hint
   840  			continue
   841  		}
   842  
   843  		// both hints has the same preferred value, but the current hint has less NUMA nodes than the extended one
   844  		if hint.Preferred == bestHint.Preferred && hint.NUMANodeAffinity.IsNarrowerThan(bestHint.NUMANodeAffinity) {
   845  			bestHint = hint
   846  		}
   847  	}
   848  	return &bestHint
   849  }
   850  
   851  // GetAllocatableMemory returns the amount of allocatable memory for each NUMA node
   852  func (p *staticPolicy) GetAllocatableMemory(s state.State) []state.Block {
   853  	var allocatableMemory []state.Block
   854  	machineState := s.GetMachineState()
   855  	for numaNodeID, numaNodeState := range machineState {
   856  		for resourceName, memoryTable := range numaNodeState.MemoryMap {
   857  			if memoryTable.Allocatable == 0 {
   858  				continue
   859  			}
   860  
   861  			block := state.Block{
   862  				NUMAAffinity: []int{numaNodeID},
   863  				Type:         resourceName,
   864  				Size:         memoryTable.Allocatable,
   865  			}
   866  			allocatableMemory = append(allocatableMemory, block)
   867  		}
   868  	}
   869  	return allocatableMemory
   870  }
   871  
   872  func (p *staticPolicy) updatePodReusableMemory(pod *v1.Pod, container *v1.Container, memoryBlocks []state.Block) {
   873  	podUID := string(pod.UID)
   874  
   875  	// If pod entries to m.initContainersReusableMemory other than the current pod exist, delete them.
   876  	for uid := range p.initContainersReusableMemory {
   877  		if podUID != uid {
   878  			delete(p.initContainersReusableMemory, uid)
   879  		}
   880  	}
   881  
   882  	if isRegularInitContainer(pod, container) {
   883  		if _, ok := p.initContainersReusableMemory[podUID]; !ok {
   884  			p.initContainersReusableMemory[podUID] = map[string]map[v1.ResourceName]uint64{}
   885  		}
   886  
   887  		for _, block := range memoryBlocks {
   888  			blockBitMask, _ := bitmask.NewBitMask(block.NUMAAffinity...)
   889  			blockBitMaskString := blockBitMask.String()
   890  
   891  			if _, ok := p.initContainersReusableMemory[podUID][blockBitMaskString]; !ok {
   892  				p.initContainersReusableMemory[podUID][blockBitMaskString] = map[v1.ResourceName]uint64{}
   893  			}
   894  
   895  			if blockReusableMemory := p.initContainersReusableMemory[podUID][blockBitMaskString][block.Type]; block.Size > blockReusableMemory {
   896  				p.initContainersReusableMemory[podUID][blockBitMaskString][block.Type] = block.Size
   897  			}
   898  		}
   899  
   900  		return
   901  	}
   902  
   903  	// update re-usable memory once it used by the app container
   904  	for _, block := range memoryBlocks {
   905  		blockBitMask, _ := bitmask.NewBitMask(block.NUMAAffinity...)
   906  		if podReusableMemory := p.getPodReusableMemory(pod, blockBitMask, block.Type); podReusableMemory != 0 {
   907  			if block.Size >= podReusableMemory {
   908  				p.initContainersReusableMemory[podUID][blockBitMask.String()][block.Type] = 0
   909  			} else {
   910  				p.initContainersReusableMemory[podUID][blockBitMask.String()][block.Type] -= block.Size
   911  			}
   912  		}
   913  	}
   914  }
   915  
   916  func (p *staticPolicy) updateInitContainersMemoryBlocks(s state.State, pod *v1.Pod, container *v1.Container, containerMemoryBlocks []state.Block) {
   917  	podUID := string(pod.UID)
   918  
   919  	for _, containerBlock := range containerMemoryBlocks {
   920  		blockSize := containerBlock.Size
   921  		for _, initContainer := range pod.Spec.InitContainers {
   922  			// we do not want to continue updates once we reach the current container
   923  			if initContainer.Name == container.Name {
   924  				break
   925  			}
   926  
   927  			if blockSize == 0 {
   928  				break
   929  			}
   930  
   931  			if types.IsRestartableInitContainer(&initContainer) {
   932  				// we should not reuse the resource from any restartable init
   933  				// container
   934  				continue
   935  			}
   936  
   937  			initContainerBlocks := s.GetMemoryBlocks(podUID, initContainer.Name)
   938  			if len(initContainerBlocks) == 0 {
   939  				continue
   940  			}
   941  
   942  			for i := range initContainerBlocks {
   943  				initContainerBlock := &initContainerBlocks[i]
   944  				if initContainerBlock.Size == 0 {
   945  					continue
   946  				}
   947  
   948  				if initContainerBlock.Type != containerBlock.Type {
   949  					continue
   950  				}
   951  
   952  				if !isNUMAAffinitiesEqual(initContainerBlock.NUMAAffinity, containerBlock.NUMAAffinity) {
   953  					continue
   954  				}
   955  
   956  				if initContainerBlock.Size > blockSize {
   957  					initContainerBlock.Size -= blockSize
   958  					blockSize = 0
   959  				} else {
   960  					blockSize -= initContainerBlock.Size
   961  					initContainerBlock.Size = 0
   962  				}
   963  			}
   964  
   965  			s.SetMemoryBlocks(podUID, initContainer.Name, initContainerBlocks)
   966  		}
   967  	}
   968  }
   969  
   970  func isRegularInitContainer(pod *v1.Pod, container *v1.Container) bool {
   971  	for _, initContainer := range pod.Spec.InitContainers {
   972  		if initContainer.Name == container.Name {
   973  			return !types.IsRestartableInitContainer(&initContainer)
   974  		}
   975  	}
   976  
   977  	return false
   978  }
   979  
   980  func isNUMAAffinitiesEqual(numaAffinity1, numaAffinity2 []int) bool {
   981  	bitMask1, err := bitmask.NewBitMask(numaAffinity1...)
   982  	if err != nil {
   983  		klog.ErrorS(err, "failed to create bit mask", "numaAffinity1", numaAffinity1)
   984  		return false
   985  	}
   986  
   987  	bitMask2, err := bitmask.NewBitMask(numaAffinity2...)
   988  	if err != nil {
   989  		klog.ErrorS(err, "failed to create bit mask", "numaAffinity2", numaAffinity2)
   990  		return false
   991  	}
   992  
   993  	return bitMask1.IsEqual(bitMask2)
   994  }
   995  

View as plain text