...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/topology_hints.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm/devicemanager

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package devicemanager
    18  
    19  import (
    20  	"k8s.io/api/core/v1"
    21  	"k8s.io/apimachinery/pkg/util/sets"
    22  	"k8s.io/klog/v2"
    23  	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
    24  
    25  	"k8s.io/kubernetes/pkg/api/v1/resource"
    26  	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
    27  	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
    28  )
    29  
    30  // GetTopologyHints implements the TopologyManager HintProvider Interface which
    31  // ensures the Device Manager is consulted when Topology Aware Hints for each
    32  // container are created.
    33  func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
    34  	// The pod is during the admission phase. We need to save the pod to avoid it
    35  	// being cleaned before the admission ended
    36  	m.setPodPendingAdmission(pod)
    37  
    38  	// Garbage collect any stranded device resources before providing TopologyHints
    39  	m.UpdateAllocatedDevices()
    40  
    41  	// Loop through all device resources and generate TopologyHints for them.
    42  	deviceHints := make(map[string][]topologymanager.TopologyHint)
    43  	accumulatedResourceRequests := m.getContainerDeviceRequest(container)
    44  
    45  	m.mutex.Lock()
    46  	defer m.mutex.Unlock()
    47  	for resource, requested := range accumulatedResourceRequests {
    48  		// Only consider devices that actually contain topology information.
    49  		if aligned := m.deviceHasTopologyAlignment(resource); !aligned {
    50  			klog.InfoS("Resource does not have a topology preference", "resource", resource)
    51  			deviceHints[resource] = nil
    52  			continue
    53  		}
    54  
    55  		// Short circuit to regenerate the same hints if there are already
    56  		// devices allocated to the Container. This might happen after a
    57  		// kubelet restart, for example.
    58  		allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource)
    59  		if allocated.Len() > 0 {
    60  			if allocated.Len() != requested {
    61  				klog.ErrorS(nil, "Resource already allocated to pod with different number than request", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name, "request", requested, "allocated", allocated.Len())
    62  				deviceHints[resource] = []topologymanager.TopologyHint{}
    63  				continue
    64  			}
    65  			klog.InfoS("Regenerating TopologyHints for resource already allocated to pod", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name)
    66  			deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.Set[string]{}, requested)
    67  			continue
    68  		}
    69  
    70  		// Get the list of available devices, for which TopologyHints should be generated.
    71  		available := m.getAvailableDevices(resource)
    72  		reusable := m.devicesToReuse[string(pod.UID)][resource]
    73  		if available.Union(reusable).Len() < requested {
    74  			klog.ErrorS(nil, "Unable to generate topology hints: requested number of devices unavailable", "resource", resource, "request", requested, "available", available.Union(reusable).Len())
    75  			deviceHints[resource] = []topologymanager.TopologyHint{}
    76  			continue
    77  		}
    78  
    79  		// Generate TopologyHints for this resource given the current
    80  		// request size and the list of available devices.
    81  		deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, reusable, requested)
    82  	}
    83  
    84  	return deviceHints
    85  }
    86  
    87  // GetPodTopologyHints implements the topologymanager.HintProvider Interface which
    88  // ensures the Device Manager is consulted when Topology Aware Hints for Pod are created.
    89  func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
    90  	// The pod is during the admission phase. We need to save the pod to avoid it
    91  	// being cleaned before the admission ended
    92  	m.setPodPendingAdmission(pod)
    93  
    94  	// Garbage collect any stranded device resources before providing TopologyHints
    95  	m.UpdateAllocatedDevices()
    96  
    97  	deviceHints := make(map[string][]topologymanager.TopologyHint)
    98  	accumulatedResourceRequests := m.getPodDeviceRequest(pod)
    99  
   100  	m.mutex.Lock()
   101  	defer m.mutex.Unlock()
   102  	for resource, requested := range accumulatedResourceRequests {
   103  		// Only consider devices that actually contain topology information.
   104  		if aligned := m.deviceHasTopologyAlignment(resource); !aligned {
   105  			klog.InfoS("Resource does not have a topology preference", "resource", resource)
   106  			deviceHints[resource] = nil
   107  			continue
   108  		}
   109  
   110  		// Short circuit to regenerate the same hints if there are already
   111  		// devices allocated to the Pod. This might happen after a
   112  		// kubelet restart, for example.
   113  		allocated := m.podDevices.podDevices(string(pod.UID), resource)
   114  		if allocated.Len() > 0 {
   115  			if allocated.Len() != requested {
   116  				klog.ErrorS(nil, "Resource already allocated to pod with different number than request", "resource", resource, "pod", klog.KObj(pod), "request", requested, "allocated", allocated.Len())
   117  				deviceHints[resource] = []topologymanager.TopologyHint{}
   118  				continue
   119  			}
   120  			klog.InfoS("Regenerating TopologyHints for resource already allocated to pod", "resource", resource, "pod", klog.KObj(pod))
   121  			deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.Set[string]{}, requested)
   122  			continue
   123  		}
   124  
   125  		// Get the list of available devices, for which TopologyHints should be generated.
   126  		available := m.getAvailableDevices(resource)
   127  		if available.Len() < requested {
   128  			klog.ErrorS(nil, "Unable to generate topology hints: requested number of devices unavailable", "resource", resource, "request", requested, "available", available.Len())
   129  			deviceHints[resource] = []topologymanager.TopologyHint{}
   130  			continue
   131  		}
   132  
   133  		// Generate TopologyHints for this resource given the current
   134  		// request size and the list of available devices.
   135  		deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, sets.Set[string]{}, requested)
   136  	}
   137  
   138  	return deviceHints
   139  }
   140  
   141  func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool {
   142  	// If any device has Topology NUMANodes available, we assume they care about alignment.
   143  	for _, device := range m.allDevices[resource] {
   144  		if device.Topology != nil && len(device.Topology.Nodes) > 0 {
   145  			return true
   146  		}
   147  	}
   148  	return false
   149  }
   150  
   151  func (m *ManagerImpl) getAvailableDevices(resource string) sets.Set[string] {
   152  	// Strip all devices in use from the list of healthy ones.
   153  	return m.healthyDevices[resource].Difference(m.allocatedDevices[resource])
   154  }
   155  
   156  func (m *ManagerImpl) generateDeviceTopologyHints(resource string, available sets.Set[string], reusable sets.Set[string], request int) []topologymanager.TopologyHint {
   157  	// Initialize minAffinitySize to include all NUMA Nodes
   158  	minAffinitySize := len(m.numaNodes)
   159  
   160  	// Iterate through all combinations of NUMA Nodes and build hints from them.
   161  	hints := []topologymanager.TopologyHint{}
   162  	bitmask.IterateBitMasks(m.numaNodes, func(mask bitmask.BitMask) {
   163  		// First, update minAffinitySize for the current request size.
   164  		devicesInMask := 0
   165  		for _, device := range m.allDevices[resource] {
   166  			if mask.AnySet(m.getNUMANodeIds(device.Topology)) {
   167  				devicesInMask++
   168  			}
   169  		}
   170  		if devicesInMask >= request && mask.Count() < minAffinitySize {
   171  			minAffinitySize = mask.Count()
   172  		}
   173  
   174  		// Then check to see if all the reusable devices are part of the bitmask.
   175  		numMatching := 0
   176  		for d := range reusable {
   177  			// Skip the device if it doesn't specify any topology info.
   178  			if m.allDevices[resource][d].Topology == nil {
   179  				continue
   180  			}
   181  			// Otherwise disregard this mask if its NUMANode isn't part of it.
   182  			if !mask.AnySet(m.getNUMANodeIds(m.allDevices[resource][d].Topology)) {
   183  				return
   184  			}
   185  			numMatching++
   186  		}
   187  
   188  		// Finally, check to see if enough available devices remain on the
   189  		// current NUMA node combination to satisfy the device request.
   190  		for d := range available {
   191  			if mask.AnySet(m.getNUMANodeIds(m.allDevices[resource][d].Topology)) {
   192  				numMatching++
   193  			}
   194  		}
   195  
   196  		// If they don't, then move onto the next combination.
   197  		if numMatching < request {
   198  			return
   199  		}
   200  
   201  		// Otherwise, create a new hint from the NUMA mask and add it to the
   202  		// list of hints.  We set all hint preferences to 'false' on the first
   203  		// pass through.
   204  		hints = append(hints, topologymanager.TopologyHint{
   205  			NUMANodeAffinity: mask,
   206  			Preferred:        false,
   207  		})
   208  	})
   209  
   210  	// Loop back through all hints and update the 'Preferred' field based on
   211  	// counting the number of bits sets in the affinity mask and comparing it
   212  	// to the minAffinity. Only those with an equal number of bits set will be
   213  	// considered preferred.
   214  	for i := range hints {
   215  		if hints[i].NUMANodeAffinity.Count() == minAffinitySize {
   216  			hints[i].Preferred = true
   217  		}
   218  	}
   219  
   220  	return hints
   221  }
   222  
   223  func (m *ManagerImpl) getNUMANodeIds(topology *pluginapi.TopologyInfo) []int {
   224  	if topology == nil {
   225  		return nil
   226  	}
   227  	var ids []int
   228  	for _, n := range topology.Nodes {
   229  		ids = append(ids, int(n.ID))
   230  	}
   231  	return ids
   232  }
   233  
   234  func (m *ManagerImpl) getPodDeviceRequest(pod *v1.Pod) map[string]int {
   235  	// for these device plugin resources, requests == limits
   236  	limits := resource.PodLimits(pod, resource.PodResourcesOptions{
   237  		ExcludeOverhead: true,
   238  	})
   239  	podRequests := make(map[string]int)
   240  	for resourceName, quantity := range limits {
   241  		if !m.isDevicePluginResource(string(resourceName)) {
   242  			continue
   243  		}
   244  		podRequests[string(resourceName)] = int(quantity.Value())
   245  	}
   246  	return podRequests
   247  }
   248  
   249  func (m *ManagerImpl) getContainerDeviceRequest(container *v1.Container) map[string]int {
   250  	containerRequests := make(map[string]int)
   251  	for resourceObj, requestedObj := range container.Resources.Limits {
   252  		resource := string(resourceObj)
   253  		requested := int(requestedObj.Value())
   254  		if !m.isDevicePluginResource(resource) {
   255  			continue
   256  		}
   257  		containerRequests[resource] = requested
   258  	}
   259  	return containerRequests
   260  }
   261  

View as plain text