1
16
17 package devicemanager
18
19 import (
20 "k8s.io/api/core/v1"
21 "k8s.io/apimachinery/pkg/util/sets"
22 "k8s.io/klog/v2"
23 pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
24
25 "k8s.io/kubernetes/pkg/api/v1/resource"
26 "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
27 "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
28 )
29
30
31
32
33 func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
34
35
36 m.setPodPendingAdmission(pod)
37
38
39 m.UpdateAllocatedDevices()
40
41
42 deviceHints := make(map[string][]topologymanager.TopologyHint)
43 accumulatedResourceRequests := m.getContainerDeviceRequest(container)
44
45 m.mutex.Lock()
46 defer m.mutex.Unlock()
47 for resource, requested := range accumulatedResourceRequests {
48
49 if aligned := m.deviceHasTopologyAlignment(resource); !aligned {
50 klog.InfoS("Resource does not have a topology preference", "resource", resource)
51 deviceHints[resource] = nil
52 continue
53 }
54
55
56
57
58 allocated := m.podDevices.containerDevices(string(pod.UID), container.Name, resource)
59 if allocated.Len() > 0 {
60 if allocated.Len() != requested {
61 klog.ErrorS(nil, "Resource already allocated to pod with different number than request", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name, "request", requested, "allocated", allocated.Len())
62 deviceHints[resource] = []topologymanager.TopologyHint{}
63 continue
64 }
65 klog.InfoS("Regenerating TopologyHints for resource already allocated to pod", "resource", resource, "pod", klog.KObj(pod), "containerName", container.Name)
66 deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.Set[string]{}, requested)
67 continue
68 }
69
70
71 available := m.getAvailableDevices(resource)
72 reusable := m.devicesToReuse[string(pod.UID)][resource]
73 if available.Union(reusable).Len() < requested {
74 klog.ErrorS(nil, "Unable to generate topology hints: requested number of devices unavailable", "resource", resource, "request", requested, "available", available.Union(reusable).Len())
75 deviceHints[resource] = []topologymanager.TopologyHint{}
76 continue
77 }
78
79
80
81 deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, reusable, requested)
82 }
83
84 return deviceHints
85 }
86
87
88
89 func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
90
91
92 m.setPodPendingAdmission(pod)
93
94
95 m.UpdateAllocatedDevices()
96
97 deviceHints := make(map[string][]topologymanager.TopologyHint)
98 accumulatedResourceRequests := m.getPodDeviceRequest(pod)
99
100 m.mutex.Lock()
101 defer m.mutex.Unlock()
102 for resource, requested := range accumulatedResourceRequests {
103
104 if aligned := m.deviceHasTopologyAlignment(resource); !aligned {
105 klog.InfoS("Resource does not have a topology preference", "resource", resource)
106 deviceHints[resource] = nil
107 continue
108 }
109
110
111
112
113 allocated := m.podDevices.podDevices(string(pod.UID), resource)
114 if allocated.Len() > 0 {
115 if allocated.Len() != requested {
116 klog.ErrorS(nil, "Resource already allocated to pod with different number than request", "resource", resource, "pod", klog.KObj(pod), "request", requested, "allocated", allocated.Len())
117 deviceHints[resource] = []topologymanager.TopologyHint{}
118 continue
119 }
120 klog.InfoS("Regenerating TopologyHints for resource already allocated to pod", "resource", resource, "pod", klog.KObj(pod))
121 deviceHints[resource] = m.generateDeviceTopologyHints(resource, allocated, sets.Set[string]{}, requested)
122 continue
123 }
124
125
126 available := m.getAvailableDevices(resource)
127 if available.Len() < requested {
128 klog.ErrorS(nil, "Unable to generate topology hints: requested number of devices unavailable", "resource", resource, "request", requested, "available", available.Len())
129 deviceHints[resource] = []topologymanager.TopologyHint{}
130 continue
131 }
132
133
134
135 deviceHints[resource] = m.generateDeviceTopologyHints(resource, available, sets.Set[string]{}, requested)
136 }
137
138 return deviceHints
139 }
140
141 func (m *ManagerImpl) deviceHasTopologyAlignment(resource string) bool {
142
143 for _, device := range m.allDevices[resource] {
144 if device.Topology != nil && len(device.Topology.Nodes) > 0 {
145 return true
146 }
147 }
148 return false
149 }
150
151 func (m *ManagerImpl) getAvailableDevices(resource string) sets.Set[string] {
152
153 return m.healthyDevices[resource].Difference(m.allocatedDevices[resource])
154 }
155
156 func (m *ManagerImpl) generateDeviceTopologyHints(resource string, available sets.Set[string], reusable sets.Set[string], request int) []topologymanager.TopologyHint {
157
158 minAffinitySize := len(m.numaNodes)
159
160
161 hints := []topologymanager.TopologyHint{}
162 bitmask.IterateBitMasks(m.numaNodes, func(mask bitmask.BitMask) {
163
164 devicesInMask := 0
165 for _, device := range m.allDevices[resource] {
166 if mask.AnySet(m.getNUMANodeIds(device.Topology)) {
167 devicesInMask++
168 }
169 }
170 if devicesInMask >= request && mask.Count() < minAffinitySize {
171 minAffinitySize = mask.Count()
172 }
173
174
175 numMatching := 0
176 for d := range reusable {
177
178 if m.allDevices[resource][d].Topology == nil {
179 continue
180 }
181
182 if !mask.AnySet(m.getNUMANodeIds(m.allDevices[resource][d].Topology)) {
183 return
184 }
185 numMatching++
186 }
187
188
189
190 for d := range available {
191 if mask.AnySet(m.getNUMANodeIds(m.allDevices[resource][d].Topology)) {
192 numMatching++
193 }
194 }
195
196
197 if numMatching < request {
198 return
199 }
200
201
202
203
204 hints = append(hints, topologymanager.TopologyHint{
205 NUMANodeAffinity: mask,
206 Preferred: false,
207 })
208 })
209
210
211
212
213
214 for i := range hints {
215 if hints[i].NUMANodeAffinity.Count() == minAffinitySize {
216 hints[i].Preferred = true
217 }
218 }
219
220 return hints
221 }
222
223 func (m *ManagerImpl) getNUMANodeIds(topology *pluginapi.TopologyInfo) []int {
224 if topology == nil {
225 return nil
226 }
227 var ids []int
228 for _, n := range topology.Nodes {
229 ids = append(ids, int(n.ID))
230 }
231 return ids
232 }
233
234 func (m *ManagerImpl) getPodDeviceRequest(pod *v1.Pod) map[string]int {
235
236 limits := resource.PodLimits(pod, resource.PodResourcesOptions{
237 ExcludeOverhead: true,
238 })
239 podRequests := make(map[string]int)
240 for resourceName, quantity := range limits {
241 if !m.isDevicePluginResource(string(resourceName)) {
242 continue
243 }
244 podRequests[string(resourceName)] = int(quantity.Value())
245 }
246 return podRequests
247 }
248
249 func (m *ManagerImpl) getContainerDeviceRequest(container *v1.Container) map[string]int {
250 containerRequests := make(map[string]int)
251 for resourceObj, requestedObj := range container.Resources.Limits {
252 resource := string(resourceObj)
253 requested := int(requestedObj.Value())
254 if !m.isDevicePluginResource(resource) {
255 continue
256 }
257 containerRequests[resource] = requested
258 }
259 return containerRequests
260 }
261
View as plain text