package devicemanager

import (
	"sync"

	"k8s.io/klog/v2"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
	kubefeatures "k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
	"k8s.io/kubernetes/pkg/kubelet/cm/util/cdi"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

type deviceAllocateInfo struct {
	// deviceIds contains the device IDs allocated to this container,
	// grouped by NUMA node.
	deviceIds checkpoint.DevicesPerNUMA
	// allocResp contains the cached Allocate gRPC response.
	allocResp *pluginapi.ContainerAllocateResponse
}

type resourceAllocateInfo map[string]deviceAllocateInfo // Keyed by resourceName.
type containerDevices map[string]resourceAllocateInfo   // Keyed by containerName.
type podDevices struct {
	sync.RWMutex
	devs map[string]containerDevices // Keyed by podUID.
}

// newPodDevices returns a podDevices whose RWMutex guards a map from pod UID
// to the per-container device allocation state.
func newPodDevices() *podDevices {
	return &podDevices{devs: make(map[string]containerDevices)}
}

func (pdev *podDevices) pods() sets.Set[string] {
	pdev.RLock()
	defer pdev.RUnlock()
	ret := sets.New[string]()
	for k := range pdev.devs {
		ret.Insert(k)
	}
	return ret
}

func (pdev *podDevices) size() int {
	pdev.RLock()
	defer pdev.RUnlock()
	return len(pdev.devs)
}

func (pdev *podDevices) hasPod(podUID string) bool {
	pdev.RLock()
	defer pdev.RUnlock()
	_, podExists := pdev.devs[podUID]
	return podExists
}

func (pdev *podDevices) insert(podUID, contName, resource string, devices checkpoint.DevicesPerNUMA, resp *pluginapi.ContainerAllocateResponse) {
	pdev.Lock()
	defer pdev.Unlock()
	if _, podExists := pdev.devs[podUID]; !podExists {
		pdev.devs[podUID] = make(containerDevices)
	}
	if _, contExists := pdev.devs[podUID][contName]; !contExists {
		pdev.devs[podUID][contName] = make(resourceAllocateInfo)
	}
	pdev.devs[podUID][contName][resource] = deviceAllocateInfo{
		deviceIds: devices,
		allocResp: resp,
	}
}

func (pdev *podDevices) delete(pods []string) {
	pdev.Lock()
	defer pdev.Unlock()
	for _, uid := range pods {
		delete(pdev.devs, uid)
	}
}
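
// exampleTrackPodDevices is an illustrative sketch, not part of the upstream
// API: it shows the basic lifecycle of insert, query, and delete. The pod
// UID, container name, and resource name are hypothetical.
func exampleTrackPodDevices() {
	pd := newPodDevices()
	pd.insert("pod-uid-1", "ctr-1", "example.com/gpu",
		checkpoint.DevicesPerNUMA{0: []string{"dev-0", "dev-1"}},
		&pluginapi.ContainerAllocateResponse{})
	_ = pd.hasPod("pod-uid-1") // true
	_ = pd.size()              // 1
	pd.delete([]string{"pod-uid-1"})
}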

// podDevices returns the union of device IDs allocated to any of the given
// pod's containers for the given resource.
func (pdev *podDevices) podDevices(podUID, resource string) sets.Set[string] {
	pdev.RLock()
	defer pdev.RUnlock()

	ret := sets.New[string]()
	for contName := range pdev.devs[podUID] {
		ret = ret.Union(pdev.containerDevices(podUID, contName, resource))
	}
	return ret
}

// containerDevices returns the device IDs allocated to the given container
// for the given resource, or nil if there is no cached state for
// <podUID, contName, resource>.
func (pdev *podDevices) containerDevices(podUID, contName, resource string) sets.Set[string] {
	pdev.RLock()
	defer pdev.RUnlock()
	if _, podExists := pdev.devs[podUID]; !podExists {
		return nil
	}
	if _, contExists := pdev.devs[podUID][contName]; !contExists {
		return nil
	}
	devs, resourceExists := pdev.devs[podUID][contName][resource]
	if !resourceExists {
		return nil
	}
	return devs.deviceIds.Devices()
}
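
// exampleQueryDevices is an illustrative sketch with hypothetical IDs: it
// looks up the device IDs tracked for a whole pod and for one of its
// containers. Both lookups yield empty or nil sets when nothing is cached.
func exampleQueryDevices(pd *podDevices) {
	podWide := pd.podDevices("pod-uid-1", "example.com/gpu")
	perContainer := pd.containerDevices("pod-uid-1", "ctr-1", "example.com/gpu")
	klog.V(4).InfoS("Tracked devices", "pod", podWide.Len(), "container", perContainer.Len())
}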

// addContainerAllocatedResources populates allocatedResources with the device
// resources allocated to the specified <podUID, contName>.
func (pdev *podDevices) addContainerAllocatedResources(podUID, contName string, allocatedResources map[string]sets.Set[string]) {
	pdev.RLock()
	defer pdev.RUnlock()
	containers, exists := pdev.devs[podUID]
	if !exists {
		return
	}
	resources, exists := containers[contName]
	if !exists {
		return
	}
	for resource, devices := range resources {
		allocatedResources[resource] = allocatedResources[resource].Union(devices.deviceIds.Devices())
	}
}

// removeContainerAllocatedResources removes the device resources allocated to
// the specified <podUID, contName> from allocatedResources.
func (pdev *podDevices) removeContainerAllocatedResources(podUID, contName string, allocatedResources map[string]sets.Set[string]) {
	pdev.RLock()
	defer pdev.RUnlock()
	containers, exists := pdev.devs[podUID]
	if !exists {
		return
	}
	resources, exists := containers[contName]
	if !exists {
		return
	}
	for resource, devices := range resources {
		allocatedResources[resource] = allocatedResources[resource].Difference(devices.deviceIds.Devices())
	}
}
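
// exampleReuseTracking is an illustrative sketch with hypothetical names: a
// caller can union in the devices held by one container and later subtract
// them again, e.g. to maintain a per-resource set of devices available for
// reuse while walking a pod's containers.
func exampleReuseTracking(pd *podDevices) map[string]sets.Set[string] {
	reusable := make(map[string]sets.Set[string])
	// Union in the init container's devices...
	pd.addContainerAllocatedResources("pod-uid-1", "init-ctr", reusable)
	// ...and remove them once an app container has consumed them.
	pd.removeContainerAllocatedResources("pod-uid-1", "app-ctr", reusable)
	return reusable
}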

// devices returns all allocated devices that have a cached allocation
// response, keyed by resource name.
func (pdev *podDevices) devices() map[string]sets.Set[string] {
	ret := make(map[string]sets.Set[string])
	pdev.RLock()
	defer pdev.RUnlock()
	for _, containerDevices := range pdev.devs {
		for _, resources := range containerDevices {
			for resource, devices := range resources {
				if _, exists := ret[resource]; !exists {
					ret[resource] = sets.New[string]()
				}
				if devices.allocResp != nil {
					ret[resource] = ret[resource].Union(devices.deviceIds.Devices())
				}
			}
		}
	}
	return ret
}

// toCheckpointData turns podDevices into checkpoint entries, skipping any
// allocation whose response is missing or cannot be marshalled.
func (pdev *podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
	var data []checkpoint.PodDevicesEntry
	pdev.RLock()
	defer pdev.RUnlock()
	for podUID, containerDevices := range pdev.devs {
		for conName, resources := range containerDevices {
			for resource, devices := range resources {
				if devices.allocResp == nil {
					klog.ErrorS(nil, "Can't marshal allocResp, allocation response is missing", "podUID", podUID, "containerName", conName, "resourceName", resource)
					continue
				}

				allocResp, err := devices.allocResp.Marshal()
				if err != nil {
					klog.ErrorS(err, "Can't marshal allocResp", "podUID", podUID, "containerName", conName, "resourceName", resource)
					continue
				}
				data = append(data, checkpoint.PodDevicesEntry{
					PodUID:        podUID,
					ContainerName: conName,
					ResourceName:  resource,
					DeviceIDs:     devices.deviceIds,
					AllocResp:     allocResp})
			}
		}
	}
	return data
}

// fromCheckpointData populates podDevices from the passed-in checkpoint
// entries, skipping entries whose allocation response cannot be unmarshalled.
func (pdev *podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) {
	for _, entry := range data {
		klog.V(2).InfoS("Get checkpoint entry",
			"podUID", entry.PodUID, "containerName", entry.ContainerName,
			"resourceName", entry.ResourceName, "deviceIDs", entry.DeviceIDs, "allocated", entry.AllocResp)
		allocResp := &pluginapi.ContainerAllocateResponse{}
		err := allocResp.Unmarshal(entry.AllocResp)
		if err != nil {
			klog.ErrorS(err, "Can't unmarshal allocResp", "podUID", entry.PodUID, "containerName", entry.ContainerName, "resourceName", entry.ResourceName)
			continue
		}
		pdev.insert(entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, allocResp)
	}
}
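
// exampleCheckpointRoundTrip is an illustrative sketch: it serializes the
// tracked allocations and rebuilds an equivalent podDevices from the entries,
// mirroring how state survives a kubelet restart.
func exampleCheckpointRoundTrip(pd *podDevices) *podDevices {
	entries := pd.toCheckpointData()
	restored := newPodDevices()
	restored.fromCheckpointData(entries)
	return restored
}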

// deviceRunContainerOptions returns the combined container runtime settings
// (env vars, device specs, mounts, and annotations) needed to consume the
// devices allocated to the given container, or nil if no allocation is cached.
func (pdev *podDevices) deviceRunContainerOptions(podUID, contName string) *DeviceRunContainerOptions {
	pdev.RLock()
	defer pdev.RUnlock()

	containers, exists := pdev.devs[podUID]
	if !exists {
		return nil
	}
	resources, exists := containers[contName]
	if !exists {
		return nil
	}
	opts := &DeviceRunContainerOptions{}
	// Maps used to detect duplicate settings across responses.
	devsMap := make(map[string]string)
	mountsMap := make(map[string]string)
	envsMap := make(map[string]string)
	annotationsMap := make(map[string]string)
	// All CDI devices merged from the cached allocate responses.
	allCDIDevices := sets.New[string]()
	// Loop through the AllocateResponses of all cached device resources and
	// merge their settings. When two responses set the same key, the first
	// value seen wins and a conflicting later value is logged as an error.
	for _, devices := range resources {
		resp := devices.allocResp

		// Updates RunContainerOptions.Envs.
		for k, v := range resp.Envs {
			if e, ok := envsMap[k]; ok {
				klog.V(4).InfoS("Skip existing env", "envKey", k, "envValue", v)
				if e != v {
					klog.ErrorS(nil, "Environment variable has conflicting setting", "envKey", k, "expected", v, "got", e)
				}
				continue
			}
			klog.V(4).InfoS("Add env", "envKey", k, "envValue", v)
			envsMap[k] = v
			opts.Envs = append(opts.Envs, kubecontainer.EnvVar{Name: k, Value: v})
		}

		// Updates RunContainerOptions.Devices.
		for _, dev := range resp.Devices {
			if d, ok := devsMap[dev.ContainerPath]; ok {
				klog.V(4).InfoS("Skip existing device", "containerPath", dev.ContainerPath, "hostPath", dev.HostPath)
				if d != dev.HostPath {
					klog.ErrorS(nil, "Container device has conflicting mapping host devices",
						"containerPath", dev.ContainerPath, "got", d, "expected", dev.HostPath)
				}
				continue
			}
			klog.V(4).InfoS("Add device", "containerPath", dev.ContainerPath, "hostPath", dev.HostPath)
			devsMap[dev.ContainerPath] = dev.HostPath
			opts.Devices = append(opts.Devices, kubecontainer.DeviceInfo{
				PathOnHost:      dev.HostPath,
				PathInContainer: dev.ContainerPath,
				Permissions:     dev.Permissions,
			})
		}

		// Updates RunContainerOptions.Mounts.
		for _, mount := range resp.Mounts {
			if m, ok := mountsMap[mount.ContainerPath]; ok {
				klog.V(4).InfoS("Skip existing mount", "containerPath", mount.ContainerPath, "hostPath", mount.HostPath)
				if m != mount.HostPath {
					klog.ErrorS(nil, "Container mount has conflicting mapping host mounts",
						"containerPath", mount.ContainerPath, "conflictingPath", m, "hostPath", mount.HostPath)
				}
				continue
			}
			klog.V(4).InfoS("Add mount", "containerPath", mount.ContainerPath, "hostPath", mount.HostPath)
			mountsMap[mount.ContainerPath] = mount.HostPath
			opts.Mounts = append(opts.Mounts, kubecontainer.Mount{
				Name:          mount.ContainerPath,
				ContainerPath: mount.ContainerPath,
				HostPath:      mount.HostPath,
				ReadOnly:      mount.ReadOnly,
				// The device plugin API has no notion of SELinux relabelling,
				// so it stays disabled here.
				SELinuxRelabel: false,
			})
		}

		// Updates for Annotations.
		for k, v := range resp.Annotations {
			if e, ok := annotationsMap[k]; ok {
				klog.V(4).InfoS("Skip existing annotation", "annotationKey", k, "annotationValue", v)
				if e != v {
					klog.ErrorS(nil, "Annotation has conflicting setting", "annotationKey", k, "expected", e, "got", v)
				}
				continue
			}
			klog.V(4).InfoS("Add annotation", "annotationKey", k, "annotationValue", v)
			annotationsMap[k] = v
			opts.Annotations = append(opts.Annotations, kubecontainer.Annotation{Name: k, Value: v})
		}

		// Updates for CDI devices.
		if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DevicePluginCDIDevices) {
			// Track the CDI devices from this response in the merged set.
			cdiDevices := getCDIDeviceInfo(resp, allCDIDevices)
			opts.CDIDevices = append(opts.CDIDevices, cdiDevices...)
		}
	}

	// The CDI devices are also exposed through an annotation so that runtimes
	// which do not yet consume the CDIDevices field can still resolve them.
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DevicePluginCDIDevices) {
		// The <podUID>-<contName> pair uniquely identifies the container on
		// this node and keys the generated CDI annotation.
		resourceID := podUID + "-" + contName
		cdiAnnotations := getCDIAnnotations(resourceID, allCDIDevices, annotationsMap)
		opts.Annotations = append(opts.Annotations, cdiAnnotations...)
	}

	return opts
}
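
// exampleRunContainerOptions is an illustrative sketch with hypothetical
// names: it shows how a caller consumes the merged runtime options when
// building the container's runtime configuration.
func exampleRunContainerOptions(pd *podDevices) {
	opts := pd.deviceRunContainerOptions("pod-uid-1", "ctr-1")
	if opts == nil {
		return // nothing allocated for this <pod, container>
	}
	for _, env := range opts.Envs {
		klog.V(4).InfoS("Device env", "name", env.Name, "value", env.Value)
	}
	for _, dev := range opts.Devices {
		klog.V(4).InfoS("Device node", "hostPath", dev.PathOnHost, "containerPath", dev.PathInContainer)
	}
}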

// getCDIAnnotations returns the CDI annotations for the given CDI devices,
// checking annotationsMap so that conflicting keys are skipped and flagged.
func getCDIAnnotations(resourceID string, cdiDevices sets.Set[string], annotationsMap map[string]string) []kubecontainer.Annotation {
	// Sort the CDI devices so the generated annotations are deterministic.
	sortedCDIDevices := sets.List[string](cdiDevices)
	annotations, err := cdi.GenerateAnnotations(types.UID(resourceID), "devicemanager", sortedCDIDevices)
	if err != nil {
		klog.ErrorS(err, "Failed to create CDI annotations")
		return nil
	}

	var cdiAnnotations []kubecontainer.Annotation
	for _, annotation := range annotations {
		if e, ok := annotationsMap[annotation.Name]; ok {
			klog.V(4).InfoS("Skip existing annotation", "annotationKey", annotation.Name, "annotationValue", annotation.Value)
			if e != annotation.Value {
				klog.ErrorS(nil, "Annotation has conflicting setting", "annotationKey", annotation.Name, "expected", e, "got", annotation.Value)
			}
			continue
		}
		klog.V(4).InfoS("Add annotation", "annotationKey", annotation.Name, "annotationValue", annotation.Value)
		annotationsMap[annotation.Name] = annotation.Value
		cdiAnnotations = append(cdiAnnotations, kubecontainer.Annotation{Name: annotation.Name, Value: annotation.Value})
	}

	return cdiAnnotations
}

// getCDIDeviceInfo returns the CDI devices from an allocate response,
// skipping devices already recorded in knownCDIDevices.
func getCDIDeviceInfo(resp *pluginapi.ContainerAllocateResponse, knownCDIDevices sets.Set[string]) []kubecontainer.CDIDevice {
	var cdiDevices []kubecontainer.CDIDevice
	for _, cdiDevice := range resp.CDIDevices {
		if knownCDIDevices.Has(cdiDevice.Name) {
			klog.V(4).InfoS("Skip existing CDI Device", "name", cdiDevice.Name)
			continue
		}
		klog.V(4).InfoS("Add CDI device", "name", cdiDevice.Name)
		knownCDIDevices.Insert(cdiDevice.Name)

		device := kubecontainer.CDIDevice{
			Name: cdiDevice.Name,
		}
		cdiDevices = append(cdiDevices, device)
	}

	return cdiDevices
}
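
// exampleMergeCDIDevices is an illustrative sketch with hypothetical device
// names: it deduplicates CDI devices across two allocate responses and then
// generates the matching annotations for runtimes that require them.
func exampleMergeCDIDevices() {
	seen := sets.New[string]()
	respA := &pluginapi.ContainerAllocateResponse{
		CDIDevices: []*pluginapi.CDIDevice{{Name: "vendor.example.com/gpu=0"}},
	}
	respB := &pluginapi.ContainerAllocateResponse{
		CDIDevices: []*pluginapi.CDIDevice{{Name: "vendor.example.com/gpu=0"}}, // duplicate, skipped
	}
	devices := append(getCDIDeviceInfo(respA, seen), getCDIDeviceInfo(respB, seen)...)
	annotations := getCDIAnnotations("pod-uid-1-ctr-1", seen, map[string]string{})
	klog.V(4).InfoS("Merged CDI state", "devices", len(devices), "annotations", len(annotations))
}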

// getContainerDevices returns the devices assigned to the provided container
// for all resource names, carrying any NUMA topology recorded at allocation.
func (pdev *podDevices) getContainerDevices(podUID, contName string) ResourceDeviceInstances {
	pdev.RLock()
	defer pdev.RUnlock()

	if _, podExists := pdev.devs[podUID]; !podExists {
		return nil
	}
	if _, contExists := pdev.devs[podUID][contName]; !contExists {
		return nil
	}
	resDev := NewResourceDeviceInstances()
	for resource, allocateInfo := range pdev.devs[podUID][contName] {
		if len(allocateInfo.deviceIds) == 0 {
			continue
		}
		devicePluginMap := make(map[string]pluginapi.Device)
		for numaid, devlist := range allocateInfo.deviceIds {
			for _, devID := range devlist {
				var topology *pluginapi.TopologyInfo
				if numaid != nodeWithoutTopology {
					NUMANodes := []*pluginapi.NUMANode{{ID: numaid}}
					if pDev, ok := devicePluginMap[devID]; ok && pDev.Topology != nil {
						if nodes := pDev.Topology.GetNodes(); nodes != nil {
							NUMANodes = append(NUMANodes, nodes...)
						}
					}

					// Only the NUMA nodes matter here; the device's ID and
					// Healthy fields are not relevant for this view.
					topology = &pluginapi.TopologyInfo{Nodes: NUMANodes}
				}
				devicePluginMap[devID] = pluginapi.Device{
					Topology: topology,
				}
			}
		}
		resDev[resource] = devicePluginMap
	}
	return resDev
}
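
// exampleTopologyLookup is an illustrative sketch with hypothetical names: it
// walks the per-resource device instances for one container, including the
// NUMA topology attached above.
func exampleTopologyLookup(pd *podDevices) {
	for resource, instances := range pd.getContainerDevices("pod-uid-1", "ctr-1") {
		for devID, dev := range instances {
			klog.V(4).InfoS("Allocated device", "resource", resource, "deviceID", devID, "topology", dev.Topology)
		}
	}
}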

// DeviceInstances maps a device ID to the device plugin's description of it.
type DeviceInstances map[string]pluginapi.Device

// ResourceDeviceInstances maps a resource name to its DeviceInstances.
type ResourceDeviceInstances map[string]DeviceInstances

// NewResourceDeviceInstances returns a new, empty ResourceDeviceInstances.
func NewResourceDeviceInstances() ResourceDeviceInstances {
	return make(ResourceDeviceInstances)
}

// Clone returns a copy of the ResourceDeviceInstances.
func (rdev ResourceDeviceInstances) Clone() ResourceDeviceInstances {
	clone := NewResourceDeviceInstances()
	for resourceName, resourceDevs := range rdev {
		clone[resourceName] = make(map[string]pluginapi.Device)
		for devID, dev := range resourceDevs {
			clone[resourceName][devID] = dev
		}
	}
	return clone
}

// Filter returns a new ResourceDeviceInstances containing only the device
// instances whose IDs appear in the given per-resource condition sets.
func (rdev ResourceDeviceInstances) Filter(cond map[string]sets.Set[string]) ResourceDeviceInstances {
	filtered := NewResourceDeviceInstances()
	for resourceName, filterIDs := range cond {
		if _, exists := rdev[resourceName]; !exists {
			continue
		}
		filtered[resourceName] = DeviceInstances{}
		for instanceID, instance := range rdev[resourceName] {
			if filterIDs.Has(instanceID) {
				filtered[resourceName][instanceID] = instance
			}
		}
	}
	return filtered
}
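
// exampleFilterInstances is an illustrative sketch with hypothetical IDs:
// Clone shields the caller's registry from mutation, and Filter keeps only
// the instances named in the condition set.
func exampleFilterInstances(all ResourceDeviceInstances) ResourceDeviceInstances {
	cond := map[string]sets.Set[string]{
		"example.com/gpu": sets.New[string]("dev-0"),
	}
	return all.Clone().Filter(cond)
}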