package devicemanager

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"runtime"
	"sort"
	"sync"
	"time"

	cadvisorapi "github.com/google/cadvisor/info/v1"
	"k8s.io/klog/v2"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	errorsutil "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/sets"
	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
	plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
	"k8s.io/kubernetes/pkg/kubelet/config"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/pkg/kubelet/metrics"
	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
	"k8s.io/kubernetes/pkg/kubelet/types"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// nodeWithoutTopology marks devices that do not report any NUMA topology information.
const nodeWithoutTopology = -1

// ActivePodsFunc is a function that returns the list of active pods on the node.
type ActivePodsFunc func() []*v1.Pod

// ManagerImpl is the structure in charge of managing device plugins.
type ManagerImpl struct {
	checkpointdir string

	endpoints map[string]endpointInfo
	mutex     sync.Mutex

	server plugin.Server

	// activePods is a method for listing active pods on the node, so that the
	// amount of plugin resources requested by existing pods can be counted
	// when updating allocated devices.
	activePods ActivePodsFunc

	// sourcesReady provides the readiness of kubelet configuration sources.
	// It is used to determine when inactive pods can be purged from the
	// checkpointed state.
	sourcesReady config.SourcesReady

	// allDevices holds all the devices currently registered with the device manager.
	allDevices ResourceDeviceInstances

	// healthyDevices contains all the registered healthy resource names and their exported device IDs.
	healthyDevices map[string]sets.Set[string]

	// unhealthyDevices contains all the unhealthy devices and their exported device IDs.
	unhealthyDevices map[string]sets.Set[string]

	// allocatedDevices contains the allocated device IDs, keyed by resource name.
	allocatedDevices map[string]sets.Set[string]

	// podDevices contains the pod-to-allocated-device mapping.
	podDevices        *podDevices
	checkpointManager checkpointmanager.CheckpointManager

	// numaNodes is the list of NUMA nodes available on the underlying machine.
	numaNodes []int

	// topologyAffinityStore is the store of topology affinities that the device manager can query.
	topologyAffinityStore topologymanager.Store

	// devicesToReuse contains devices that can be reused because they have
	// already been allocated to init containers of the same pod.
	devicesToReuse PodReusableDevices

	// pendingAdmissionPod is the pod currently going through the admission phase.
	pendingAdmissionPod *v1.Pod

	// containerMap provides a mapping from (pod, container) to containerID for
	// all containers known when the kubelet started. Used, together with
	// containerRunningSet, to detect pods running across a kubelet restart.
	containerMap containermap.ContainerMap

	// containerRunningSet identifies which containers among those present in
	// containerMap were reported as running by the container runtime when
	// containerMap was computed.
	containerRunningSet sets.Set[string]
}

type endpointInfo struct {
	e    endpoint
	opts *pluginapi.DevicePluginOptions
}

type sourcesReadyStub struct{}

// PodReusableDevices maps a pod UID to the per-resource sets of device IDs
// that can be reused by subsequent containers of that pod.
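//
// For example (illustrative values), after an init container of pod
// "1234-abcd" has been allocated device "dev-0" of resource
// "example.com/gpu", the map contains:
//
//	{"1234-abcd": {"example.com/gpu": {"dev-0"}}}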
type PodReusableDevices map[string]map[string]sets.Set[string]

func (s *sourcesReadyStub) AddSource(source string) {}
func (s *sourcesReadyStub) AllReady() bool          { return true }

// NewManagerImpl creates a new manager listening on the default kubelet
// device-plugin socket for the current platform.
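//
// Typical wiring from the kubelet looks roughly like the sketch below
// (illustrative only; variable names are placeholders):
//
//	mgr, err := NewManagerImpl(machineNUMATopology, topologyAffinityStore)
//	if err != nil {
//		// handle error
//	}
//	// later, once pod sources are configured:
//	err = mgr.Start(activePodsFunc, sourcesReady, initialContainerMap, initialRunningSet)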
func NewManagerImpl(topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
	socketPath := pluginapi.KubeletSocket
	if runtime.GOOS == "windows" {
		socketPath = os.Getenv("SYSTEMDRIVE") + pluginapi.KubeletSocketWindows
	}
	return newManagerImpl(socketPath, topology, topologyAffinityStore)
}

func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
	klog.V(2).InfoS("Creating Device Plugin manager", "path", socketPath)

	var numaNodes []int
	for _, node := range topology {
		numaNodes = append(numaNodes, node.Id)
	}

	manager := &ManagerImpl{
		endpoints: make(map[string]endpointInfo),

		allDevices:            NewResourceDeviceInstances(),
		healthyDevices:        make(map[string]sets.Set[string]),
		unhealthyDevices:      make(map[string]sets.Set[string]),
		allocatedDevices:      make(map[string]sets.Set[string]),
		podDevices:            newPodDevices(),
		numaNodes:             numaNodes,
		topologyAffinityStore: topologyAffinityStore,
		devicesToReuse:        make(PodReusableDevices),
	}

	server, err := plugin.NewServer(socketPath, manager, manager)
	if err != nil {
		return nil, fmt.Errorf("failed to create plugin server: %v", err)
	}

	manager.server = server
	manager.checkpointdir, _ = filepath.Split(server.SocketPath())

	// The activePods and sourcesReady stubs below are replaced with the real
	// implementations when Start() is called.
	manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
	manager.sourcesReady = &sourcesReadyStub{}
	checkpointManager, err := checkpointmanager.NewCheckpointManager(manager.checkpointdir)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
	}
	manager.checkpointManager = checkpointManager

	return manager, nil
}

// CleanupPluginDirectory removes all regular files from the given plugin
// directory (typically stale device-plugin sockets), while preserving the
// device manager checkpoint file and any subdirectories.
func (m *ManagerImpl) CleanupPluginDirectory(dir string) error {
	d, err := os.Open(dir)
	if err != nil {
		return err
	}
	defer d.Close()
	names, err := d.Readdirnames(-1)
	if err != nil {
		return err
	}
	var errs []error
	for _, name := range names {
		filePath := filepath.Join(dir, name)
		if filePath == m.checkpointFile() {
			continue
		}
		// Skip directories; everything else (usually stale plugin sockets) is removed.
		stat, err := os.Lstat(filePath)
		if err != nil {
			klog.ErrorS(err, "Failed to stat file", "path", filePath)
			continue
		}
		if stat.IsDir() {
			continue
		}
		err = os.RemoveAll(filePath)
		if err != nil {
			errs = append(errs, err)
			klog.ErrorS(err, "Failed to remove file", "path", filePath)
			continue
		}
	}
	return errorsutil.NewAggregate(errs)
}

// PluginConnected is invoked by the plugin server when a device plugin
// connects. It queries the plugin's options and registers its endpoint.
func (m *ManagerImpl) PluginConnected(resourceName string, p plugin.DevicePlugin) error {
	options, err := p.API().GetDevicePluginOptions(context.Background(), &pluginapi.Empty{})
	if err != nil {
		return fmt.Errorf("failed to get device plugin options: %v", err)
	}

	e := newEndpointImpl(p)

	m.mutex.Lock()
	defer m.mutex.Unlock()
	m.endpoints[resourceName] = endpointInfo{e, options}

	klog.V(2).InfoS("Device plugin connected", "resourceName", resourceName)
	return nil
}

// PluginDisconnected is invoked by the plugin server when a device plugin
// disconnects. All devices of the resource are marked unhealthy and the
// endpoint records its stop time.
func (m *ManagerImpl) PluginDisconnected(resourceName string) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if ep, exists := m.endpoints[resourceName]; exists {
		m.markResourceUnhealthy(resourceName)
		klog.V(2).InfoS("Endpoint became unhealthy", "resourceName", resourceName, "endpoint", ep)

		ep.e.setStopTime(time.Now())
	}
}

// PluginListAndWatchReceiver receives a ListAndWatch response from a device
// plugin and forwards the reported devices to the generic update callback.
func (m *ManagerImpl) PluginListAndWatchReceiver(resourceName string, resp *pluginapi.ListAndWatchResponse) {
	var devices []pluginapi.Device
	for _, d := range resp.Devices {
		devices = append(devices, *d)
	}
	m.genericDeviceUpdateCallback(resourceName, devices)
}

func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
	healthyCount := 0
	m.mutex.Lock()
	m.healthyDevices[resourceName] = sets.New[string]()
	m.unhealthyDevices[resourceName] = sets.New[string]()
	m.allDevices[resourceName] = make(map[string]pluginapi.Device)
	for _, dev := range devices {
		m.allDevices[resourceName][dev.ID] = dev
		if dev.Health == pluginapi.Healthy {
			m.healthyDevices[resourceName].Insert(dev.ID)
			healthyCount++
		} else {
			m.unhealthyDevices[resourceName].Insert(dev.ID)
		}
	}
	m.mutex.Unlock()
	if err := m.writeCheckpoint(); err != nil {
		klog.ErrorS(err, "Failed to write checkpoint after device update")
	}
	klog.V(2).InfoS("Processed device updates for resource", "resourceName", resourceName, "totalCount", len(devices), "healthyCount", healthyCount)
}

// GetWatcherHandler returns the plugin handler used by the kubelet plugin watcher.
func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
	return m.server
}

// checkpointFile returns the path of the device manager checkpoint file.
func (m *ManagerImpl) checkpointFile() string {
	return filepath.Join(m.checkpointdir, kubeletDeviceManagerCheckpoint)
}

// Start starts the Device Plugin Manager and begins serving the kubelet
// registration endpoint for device plugins. It restores device allocation
// state from the checkpoint file, if one exists.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.Set[string]) error {
	klog.V(2).InfoS("Starting Device Plugin manager")

	m.activePods = activePods
	m.sourcesReady = sourcesReady
	m.containerMap = initialContainers
	m.containerRunningSet = initialContainerRunningSet

	// Load the allocation state from the checkpoint, if present.
	err := m.readCheckpoint()
	if err != nil {
		klog.InfoS("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date", "err", err)
	}

	return m.server.Start()
}

// Stop stops the device-plugin server.
func (m *ManagerImpl) Stop() error {
	return m.server.Stop()
}

// Allocate allocates the requested device-plugin resources for the given
// container, reusing devices previously granted to the pod's init containers
// where possible.
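//
// Devices granted to a regular (non-restartable) init container are remembered
// in devicesToReuse and handed back to the containers that follow it. For
// example (illustrative names), if init container "setup" is allocated
// "dev-0" of "example.com/gpu", a later app container requesting one
// "example.com/gpu" receives "dev-0" again rather than a new device.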
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
	// The pod is in the admission phase; remember it so that its devices are
	// not reclaimed by UpdateAllocatedDevices before admission completes.
	m.setPodPendingAdmission(pod)

	if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
		m.devicesToReuse[string(pod.UID)] = make(map[string]sets.Set[string])
	}
	// Drop devicesToReuse entries that belong to any pod other than the current one.
	for podUID := range m.devicesToReuse {
		if podUID != string(pod.UID) {
			delete(m.devicesToReuse, podUID)
		}
	}

	// Allocate resources for init containers first, since the caller walks
	// init containers before app containers.
	for _, initContainer := range pod.Spec.InitContainers {
		if container.Name == initContainer.Name {
			if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
				return err
			}
			if !types.IsRestartableInitContainer(&initContainer) {
				m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
			} else {
				// A restartable init container keeps running for the lifetime
				// of the pod, so its devices must not be reused by later containers.
				m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
			}
			return nil
		}
	}
	if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
		return err
	}
	m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
	return nil
}

// UpdatePluginResources updates the node's allocatable resources based on
// devices already allocated to pods, as part of the pod admission flow.
func (m *ManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
	pod := attrs.Pod

	// Quick return if the pod requested no device-plugin resources.
	if !m.podDevices.hasPod(string(pod.UID)) {
		return nil
	}

	m.sanitizeNodeAllocatable(node)
	return nil
}

func (m *ManagerImpl) markResourceUnhealthy(resourceName string) {
	klog.V(2).InfoS("Mark all resources Unhealthy for resource", "resourceName", resourceName)
	healthyDevices := sets.New[string]()
	if _, ok := m.healthyDevices[resourceName]; ok {
		healthyDevices = m.healthyDevices[resourceName]
		m.healthyDevices[resourceName] = sets.New[string]()
	}
	if _, ok := m.unhealthyDevices[resourceName]; !ok {
		m.unhealthyDevices[resourceName] = sets.New[string]()
	}
	m.unhealthyDevices[resourceName] = m.unhealthyDevices[resourceName].Union(healthyDevices)
}

// GetCapacity returns three values:
//   - the total capacity of the device-plugin resources known to the manager,
//   - the allocatable portion of those resources (healthy devices only),
//   - the names of resources that have become inactive and should be removed
//     from the node status.
//
// A resource is considered inactive when its device plugin has been stopped
// for longer than the grace period, or when the endpoint bookkeeping is out
// of sync. Unhealthy devices still count toward capacity, but not toward
// allocatable, so that broken devices remain visible.
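//
// For example, a resource with 3 healthy and 1 unhealthy registered devices
// is reported with capacity 4 and allocatable 3.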
func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
	needsUpdateCheckpoint := false
	var capacity = v1.ResourceList{}
	var allocatable = v1.ResourceList{}
	deletedResources := sets.New[string]()
	m.mutex.Lock()
	for resourceName, devices := range m.healthyDevices {
		eI, ok := m.endpoints[resourceName]
		if (ok && eI.e.stopGracePeriodExpired()) || !ok {
			// The resources contained in endpoints and (un)healthyDevices
			// should always be consistent.
			if !ok {
				klog.ErrorS(nil, "Unexpected: healthyDevices and endpoints are out of sync")
			}
			delete(m.endpoints, resourceName)
			delete(m.healthyDevices, resourceName)
			deletedResources.Insert(resourceName)
			needsUpdateCheckpoint = true
		} else {
			capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
			allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
		}
	}
	for resourceName, devices := range m.unhealthyDevices {
		eI, ok := m.endpoints[resourceName]
		if (ok && eI.e.stopGracePeriodExpired()) || !ok {
			if !ok {
				klog.ErrorS(nil, "Unexpected: unhealthyDevices and endpoints are out of sync")
			}
			delete(m.endpoints, resourceName)
			delete(m.unhealthyDevices, resourceName)
			deletedResources.Insert(resourceName)
			needsUpdateCheckpoint = true
		} else {
			capacityCount := capacity[v1.ResourceName(resourceName)]
			unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
			capacityCount.Add(unhealthyCount)
			capacity[v1.ResourceName(resourceName)] = capacityCount
		}
	}
	m.mutex.Unlock()
	if needsUpdateCheckpoint {
		if err := m.writeCheckpoint(); err != nil {
			klog.ErrorS(err, "Error on writing checkpoint")
		}
	}
	return capacity, allocatable, deletedResources.UnsortedList()
}

// writeCheckpoint persists the registered devices and the per-pod allocations
// to the device manager checkpoint file.
func (m *ManagerImpl) writeCheckpoint() error {
	m.mutex.Lock()
	registeredDevs := make(map[string][]string)
	for resource, devices := range m.healthyDevices {
		registeredDevs[resource] = devices.UnsortedList()
	}
	data := checkpoint.New(m.podDevices.toCheckpointData(),
		registeredDevs)
	m.mutex.Unlock()
	err := m.checkpointManager.CreateCheckpoint(kubeletDeviceManagerCheckpoint, data)
	if err != nil {
		err2 := fmt.Errorf("failed to write checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err)
		klog.InfoS("Failed to write checkpoint file", "err", err)
		return err2
	}
	return nil
}

// readCheckpoint reads the checkpoint file and restores the device manager's
// in-memory state. It tries the current (v2) checkpoint format first and
// falls back to the v1 format for checkpoints written by older kubelets.
func (m *ManagerImpl) readCheckpoint() error {
	// The vast majority of the time a current-format checkpoint is restored;
	// the older format only matters across a kubelet upgrade.
	cp, err := m.getCheckpointV2()
	if err != nil {
		if err == errors.ErrCheckpointNotFound {
			// No checkpoint exists; nothing to restore.
			klog.InfoS("Failed to read data from checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint, "err", err)
			return nil
		}

		var errv1 error
		// One last try: maybe it is an old-format (v1) checkpoint.
		cp, errv1 = m.getCheckpointV1()
		if errv1 != nil {
			klog.InfoS("Failed to read checkpoint V1 file", "err", errv1)
			// Intentionally return the error from the v2 read; the v1 fallback
			// is rare and its error is usually less informative.
			return err
		}
		klog.InfoS("Read data from a V1 checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint)
	}

	m.mutex.Lock()
	defer m.mutex.Unlock()
	podDevices, registeredDevs := cp.GetDataInLatestFormat()
	m.podDevices.fromCheckpointData(podDevices)
	m.allocatedDevices = m.podDevices.devices()
	for resource := range registeredDevs {
		// During startup, create an empty healthyDevices list so that the
		// resource capacity stays zero until the corresponding device plugin
		// re-registers.
		m.healthyDevices[resource] = sets.New[string]()
		m.unhealthyDevices[resource] = sets.New[string]()
		m.endpoints[resource] = endpointInfo{e: newStoppedEndpointImpl(resource), opts: nil}
	}
	return nil
}

func (m *ManagerImpl) getCheckpointV2() (checkpoint.DeviceManagerCheckpoint, error) {
	registeredDevs := make(map[string][]string)
	devEntries := make([]checkpoint.PodDevicesEntry, 0)
	cp := checkpoint.New(devEntries, registeredDevs)
	err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
	return cp, err
}

func (m *ManagerImpl) getCheckpointV1() (checkpoint.DeviceManagerCheckpoint, error) {
	registeredDevs := make(map[string][]string)
	devEntries := make([]checkpoint.PodDevicesEntryV1, 0)
	cp := checkpoint.NewV1(devEntries, registeredDevs)
	err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
	return cp, err
}

// UpdateAllocatedDevices frees any devices that are bound to terminated pods.
func (m *ManagerImpl) UpdateAllocatedDevices() {
	if !m.sourcesReady.AllReady() {
		return
	}

	m.mutex.Lock()
	defer m.mutex.Unlock()

	activeAndAdmittedPods := m.activePods()
	if m.pendingAdmissionPod != nil {
		activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
	}

	podsToBeRemoved := m.podDevices.pods()
	for _, pod := range activeAndAdmittedPods {
		podsToBeRemoved.Delete(string(pod.UID))
	}
	if len(podsToBeRemoved) <= 0 {
		return
	}
	klog.V(3).InfoS("Pods to be removed", "podUIDs", sets.List(podsToBeRemoved))
	m.podDevices.delete(sets.List(podsToBeRemoved))

	// Regenerate allocatedDevices after the pod allocation information is updated.
	m.allocatedDevices = m.podDevices.devices()
}

// devicesToAllocate returns the set of device IDs that still need to be
// allocated (via an Allocate RPC) for the given container and resource, or
// nil if no Allocate call is needed.
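//
// When new devices are required, candidates are taken in the following order
// (as implemented below): devices reusable from the pod's init containers
// first, then devices aligned with the container's NUMA affinity (preferring
// the plugin's GetPreferredAllocation answer when available), and finally
// unaligned and topology-less devices.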
func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.Set[string]) (sets.Set[string], error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	needed := required
	// Get the list of devices that have already been allocated.
	// This can happen if a container restarts, for example.
	devices := m.podDevices.containerDevices(podUID, contName, resource)
	if devices != nil {
		klog.V(3).InfoS("Found pre-allocated devices for resource on pod", "resourceName", resource, "containerName", contName, "podUID", podUID, "devices", sets.List(devices))
		needed = needed - devices.Len()
		// A pod's resource requests are not expected to change once admitted,
		// so fail loudly here if they did.
		if needed != 0 {
			return nil, fmt.Errorf("pod %q container %q changed request for resource %q from %d to %d", podUID, contName, resource, devices.Len(), required)
		}
	}

	// Short-circuit while the kubelet is restarting (pod sources not ready yet)
	// and the container is already running: its allocation was restored from
	// the checkpoint and must not be changed.
	if !m.sourcesReady.AllReady() && m.isContainerAlreadyRunning(podUID, contName) {
		klog.V(3).InfoS("container detected running, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", podUID, "containerName", contName)
		return nil, nil
	}

	// Additional devices need to be allocated.
	klog.V(3).InfoS("Need devices to allocate for pod", "deviceNumber", needed, "resourceName", resource, "podUID", podUID, "containerName", contName)
	healthyDevices, hasRegistered := m.healthyDevices[resource]

	// The checks below are expected to fail only with a misbehaving device
	// plugin, or right after a kubelet restart while the plugin has not
	// re-registered yet. In both cases pod admission has to fail.

	// Check that the resource is registered.
	if !hasRegistered {
		return nil, fmt.Errorf("cannot allocate unregistered device %s", resource)
	}

	// Check that the registered resource has healthy devices.
	if healthyDevices.Len() == 0 {
		return nil, fmt.Errorf("no healthy devices present; cannot allocate unhealthy devices %s", resource)
	}

	// Check that all the previously allocated devices are still healthy.
	if !healthyDevices.IsSuperset(devices) {
		return nil, fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resource)
	}

	// The full allocation for this container is already recorded (for example
	// restored from the checkpoint); nothing more to allocate.
	if needed == 0 {
		klog.V(3).InfoS("no devices needed, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", podUID, "containerName", contName)
		// No change, no work.
		return nil, nil
	}

	// allocated accumulates the device IDs handed to this container.
	allocated := sets.New[string]()

	// allocateRemainingFrom takes devices from the given candidate set until
	// the request is satisfied, recording them both in the per-resource
	// allocated-devices bookkeeping and in the result set. It returns true
	// once no more devices are needed.
	allocateRemainingFrom := func(devices sets.Set[string]) bool {
		// The allocated-devices set for the resource may not exist yet, for
		// example right after a kubelet restart.
		if m.allocatedDevices[resource] == nil {
			m.allocatedDevices[resource] = sets.New[string]()
		}
		for device := range devices.Difference(allocated) {
			m.allocatedDevices[resource].Insert(device)
			allocated.Insert(device)
			needed--
			if needed == 0 {
				return true
			}
		}
		return false
	}

	// Allocate from the reusableDevices list first.
	if allocateRemainingFrom(reusableDevices) {
		return allocated, nil
	}

	// Get the devices in use.
	devicesInUse := m.allocatedDevices[resource]
	// Get the available devices.
	available := m.healthyDevices[resource].Difference(devicesInUse)
	if available.Len() < needed {
		return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len())
	}

	// Filter the available devices based on NUMA affinity.
	aligned, unaligned, noAffinity := m.filterByAffinity(podUID, contName, resource, available)

	// If all remaining devices can be satisfied from the aligned set, give the
	// plugin a chance to influence which ones are picked from that set.
	if needed < aligned.Len() {
		// First allocate from the preferred devices list, if available.
		preferred, err := m.callGetPreferredAllocationIfAvailable(podUID, contName, resource, aligned.Union(allocated), allocated, required)
		if err != nil {
			return nil, err
		}
		if allocateRemainingFrom(preferred.Intersection(aligned)) {
			return allocated, nil
		}
		// Then fall back to the aligned set if no preferred list was returned
		// or it did not contain enough devices.
		if allocateRemainingFrom(aligned) {
			return allocated, nil
		}

		return nil, fmt.Errorf("unexpectedly allocated less resources than required. Requested: %d, Got: %d", required, required-needed)
	}

	// Otherwise, start by allocating all of the aligned devices, to honor the
	// alignment guaranteed by the TopologyManager as far as possible.
	if allocateRemainingFrom(aligned) {
		return allocated, nil
	}

	// Then give the plugin a chance to influence the decision on any remaining
	// devices to allocate.
	preferred, err := m.callGetPreferredAllocationIfAvailable(podUID, contName, resource, available.Union(allocated), allocated, required)
	if err != nil {
		return nil, err
	}
	if allocateRemainingFrom(preferred.Intersection(available)) {
		return allocated, nil
	}

	// Finally, if the preferred allocation was missing or insufficient,
	// allocate the rest from the unaligned and no-affinity sets, in that order.
	if allocateRemainingFrom(unaligned) {
		return allocated, nil
	}
	if allocateRemainingFrom(noAffinity) {
		return allocated, nil
	}

	return nil, fmt.Errorf("unexpectedly allocated less resources than required. Requested: %d, Got: %d", required, required-needed)
}

// filterByAffinity splits the available devices into three sets: devices on
// NUMA nodes matching the container's topology hint, devices on other NUMA
// nodes, and devices that report no topology information at all.
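//
// For example (illustrative device names), with a hint for NUMA node 0 and
// available devices dev-0 (node 0), dev-1 (node 1) and dev-2 (no topology),
// the returned sets are {dev-0}, {dev-1} and {dev-2} respectively.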
func (m *ManagerImpl) filterByAffinity(podUID, contName, resource string, available sets.Set[string]) (sets.Set[string], sets.Set[string], sets.Set[string]) {
	// If alignment information is not available, just pass the available list back.
	hint := m.topologyAffinityStore.GetAffinity(podUID, contName)
	if !m.deviceHasTopologyAlignment(resource) || hint.NUMANodeAffinity == nil {
		return sets.New[string](), sets.New[string](), available
	}

	// Build a map from NUMA node to the devices that sit on it. Devices
	// without topology information are grouped under nodeWithoutTopology;
	// a device attached to multiple NUMA nodes appears under each of them.
	perNodeDevices := make(map[int]sets.Set[string])
	for d := range available {
		if m.allDevices[resource][d].Topology == nil || len(m.allDevices[resource][d].Topology.Nodes) == 0 {
			if _, ok := perNodeDevices[nodeWithoutTopology]; !ok {
				perNodeDevices[nodeWithoutTopology] = sets.New[string]()
			}
			perNodeDevices[nodeWithoutTopology].Insert(d)
			continue
		}

		for _, node := range m.allDevices[resource][d].Topology.Nodes {
			if _, ok := perNodeDevices[int(node.ID)]; !ok {
				perNodeDevices[int(node.ID)] = sets.New[string]()
			}
			perNodeDevices[int(node.ID)].Insert(d)
		}
	}

	// Get a flat list of all the nodes associated with available devices.
	var nodes []int
	for node := range perNodeDevices {
		nodes = append(nodes, node)
	}

	// Sort the nodes by:
	// 1) nodes contained in the hint's affinity set,
	// 2) nodes not contained in the hint's affinity set,
	// 3) the fake node -1 assigned to devices without topology information.
	// Within each group, sort the nodes by how many devices they contain.
	sort.Slice(nodes, func(i, j int) bool {
		// Both nodes are in the hint's affinity set.
		if hint.NUMANodeAffinity.IsSet(nodes[i]) && hint.NUMANodeAffinity.IsSet(nodes[j]) {
			return perNodeDevices[nodes[i]].Len() < perNodeDevices[nodes[j]].Len()
		}
		if hint.NUMANodeAffinity.IsSet(nodes[i]) {
			return true
		}
		if hint.NUMANodeAffinity.IsSet(nodes[j]) {
			return false
		}

		// One of the nodes is the fake node without topology information.
		if nodes[i] == nodeWithoutTopology {
			return false
		}
		if nodes[j] == nodeWithoutTopology {
			return true
		}

		// Both nodes are outside the affinity set; sort by device count.
		return perNodeDevices[nodes[i]].Len() < perNodeDevices[nodes[j]].Len()
	})

	// Walk the available devices and bucket each one according to the first
	// (most preferred) node it appears under in the sorted order: devices on
	// nodes in the hint's affinity set, devices on other nodes, and devices
	// without topology information.
	var fromAffinity []string
	var notFromAffinity []string
	var withoutTopology []string
	for d := range available {
		// Find the first node in the sorted order the device is associated with.
		for _, n := range nodes {
			if perNodeDevices[n].Has(d) {
				if n == nodeWithoutTopology {
					withoutTopology = append(withoutTopology, d)
				} else if hint.NUMANodeAffinity.IsSet(n) {
					fromAffinity = append(fromAffinity, d)
				} else {
					notFromAffinity = append(notFromAffinity, d)
				}
				break
			}
		}
	}

	// Return all three sets of devices.
	return sets.New[string](fromAffinity...), sets.New[string](notFromAffinity...), sets.New[string](withoutTopology...)
}

// allocateContainerResources attempts to allocate all of the required
// device-plugin resources for the given container, issuing Allocate RPCs to
// the relevant device plugins and updating the device manager's state on
// success.
func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.Set[string]) error {
	podUID := string(pod.UID)
	contName := container.Name
	allocatedDevicesUpdated := false
	needsUpdateCheckpoint := false
	// Extended resources are not allowed to be overcommitted, so Requests must
	// equal Limits and iterating over the Limits is sufficient.
	for k, v := range container.Resources.Limits {
		resource := string(k)
		needed := int(v.Value())
		klog.V(3).InfoS("Looking for needed resources", "needed", needed, "resourceName", resource)
		if !m.isDevicePluginResource(resource) {
			continue
		}
		// Update allocatedDevices to garbage collect any stranded resources
		// before performing the device-plugin allocation.
		if !allocatedDevicesUpdated {
			m.UpdateAllocatedDevices()
			allocatedDevicesUpdated = true
		}
		allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
		if err != nil {
			return err
		}
		if allocDevices == nil || len(allocDevices) <= 0 {
			continue
		}

		needsUpdateCheckpoint = true

		startRPCTime := time.Now()
		// The Allocate call below is an RPC to the device plugin and can be
		// slow, so it is issued outside the mutex. If it fails, the devices
		// optimistically marked as allocated by devicesToAllocate are rolled
		// back by resetting m.allocatedDevices from m.podDevices.
		m.mutex.Lock()
		eI, ok := m.endpoints[resource]
		m.mutex.Unlock()
		if !ok {
			m.mutex.Lock()
			m.allocatedDevices = m.podDevices.devices()
			m.mutex.Unlock()
			return fmt.Errorf("unknown Device Plugin %s", resource)
		}

		devs := allocDevices.UnsortedList()
		klog.V(3).InfoS("Making allocation request for device plugin", "devices", devs, "resourceName", resource)
		resp, err := eI.e.allocate(devs)
		metrics.DevicePluginAllocationDuration.WithLabelValues(resource).Observe(metrics.SinceInSeconds(startRPCTime))
		if err != nil {
			// On allocation failure, restore m.allocatedDevices to the actual
			// allocated state recorded in m.podDevices.
			m.mutex.Lock()
			m.allocatedDevices = m.podDevices.devices()
			m.mutex.Unlock()
			return err
		}

		if len(resp.ContainerResponses) == 0 {
			return fmt.Errorf("no containers returned in allocation response %v", resp)
		}

		allocDevicesWithNUMA := checkpoint.NewDevicesPerNUMA()
		// Update the internally cached podDevices state.
		m.mutex.Lock()
		for dev := range allocDevices {
			if m.allDevices[resource][dev].Topology == nil || len(m.allDevices[resource][dev].Topology.Nodes) == 0 {
				allocDevicesWithNUMA[nodeWithoutTopology] = append(allocDevicesWithNUMA[nodeWithoutTopology], dev)
				continue
			}
			for idx := range m.allDevices[resource][dev].Topology.Nodes {
				node := m.allDevices[resource][dev].Topology.Nodes[idx]
				allocDevicesWithNUMA[node.ID] = append(allocDevicesWithNUMA[node.ID], dev)
			}
		}
		m.mutex.Unlock()
		m.podDevices.insert(podUID, contName, resource, allocDevicesWithNUMA, resp.ContainerResponses[0])
	}

	if needsUpdateCheckpoint {
		return m.writeCheckpoint()
	}

	return nil
}

// checkPodActive reports whether the pod is still in the activePods list.
func (m *ManagerImpl) checkPodActive(pod *v1.Pod) bool {
	activePods := m.activePods()
	for _, activePod := range activePods {
		if activePod.UID == pod.UID {
			return true
		}
	}

	return false
}

// GetDeviceRunContainerOptions returns the DeviceRunContainerOptions (env
// vars, mounts, devices, annotations) accumulated for the given container from
// the device plugins' allocate responses, re-issuing Allocate calls first if
// cached device state is missing (for example after a kubelet restart).
func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
	podUID := string(pod.UID)
	contName := container.Name
	needsReAllocate := false
	for k, v := range container.Resources.Limits {
		resource := string(k)
		if !m.isDevicePluginResource(resource) || v.Value() == 0 {
			continue
		}
		err := m.callPreStartContainerIfNeeded(podUID, contName, resource)
		if err != nil {
			return nil, err
		}

		if !m.checkPodActive(pod) {
			klog.ErrorS(nil, "Pod deleted from activePods, skipping re-allocation", "podUID", podUID)
			continue
		}

		// A device-plugin resource is requested but there is no cached device
		// state for it. This can happen after a kubelet restart; re-issue
		// Allocate to rebuild the per-container state.
		if m.podDevices.containerDevices(podUID, contName, resource) == nil {
			needsReAllocate = true
		}
	}
	if needsReAllocate {
		klog.V(2).InfoS("Needs to re-allocate device plugin resources for pod", "pod", klog.KObj(pod), "containerName", container.Name)
		if err := m.Allocate(pod, container); err != nil {
			return nil, err
		}
	}
	return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil
}

// callPreStartContainerIfNeeded issues a PreStartContainer call to the device
// plugin of the given resource, if the plugin requested it at registration time.
func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource string) error {
	m.mutex.Lock()
	eI, ok := m.endpoints[resource]
	if !ok {
		m.mutex.Unlock()
		return fmt.Errorf("endpoint not found in cache for a registered resource: %s", resource)
	}

	if eI.opts == nil || !eI.opts.PreStartRequired {
		m.mutex.Unlock()
		klog.V(4).InfoS("Plugin options indicate to skip PreStartContainer for resource", "resourceName", resource)
		return nil
	}

	devices := m.podDevices.containerDevices(podUID, contName, resource)
	if devices == nil {
		m.mutex.Unlock()
		return fmt.Errorf("no devices found allocated in local cache for pod %s, container %s, resource %s", podUID, contName, resource)
	}

	m.mutex.Unlock()
	devs := devices.UnsortedList()
	klog.V(4).InfoS("Issuing a PreStartContainer call for container", "containerName", contName, "podUID", podUID)
	_, err := eI.e.preStartContainer(devs)
	if err != nil {
		return fmt.Errorf("device plugin PreStartContainer rpc failed with err: %v", err)
	}

	return nil
}

// callGetPreferredAllocationIfAvailable issues a GetPreferredAllocation call
// to the device plugin of the given resource, if the plugin advertised support
// for it. It must be called with the manager's mutex held, since it
// temporarily releases the mutex around the RPC.
func (m *ManagerImpl) callGetPreferredAllocationIfAvailable(podUID, contName, resource string, available, mustInclude sets.Set[string], size int) (sets.Set[string], error) {
	eI, ok := m.endpoints[resource]
	if !ok {
		return nil, fmt.Errorf("endpoint not found in cache for a registered resource: %s", resource)
	}

	if eI.opts == nil || !eI.opts.GetPreferredAllocationAvailable {
		klog.V(4).InfoS("Plugin options indicate to skip GetPreferredAllocation for resource", "resourceName", resource)
		return nil, nil
	}

	// The mutex is held by the caller; release it while the RPC is in flight
	// and re-acquire it afterwards.
	m.mutex.Unlock()
	klog.V(4).InfoS("Issuing a GetPreferredAllocation call for container", "containerName", contName, "podUID", podUID)
	resp, err := eI.e.getPreferredAllocation(available.UnsortedList(), mustInclude.UnsortedList(), size)
	m.mutex.Lock()
	if err != nil {
		return nil, fmt.Errorf("device plugin GetPreferredAllocation rpc failed with err: %v", err)
	}
	if resp != nil && len(resp.ContainerResponses) > 0 {
		return sets.New[string](resp.ContainerResponses[0].DeviceIDs...), nil
	}
	return sets.New[string](), nil
}

// sanitizeNodeAllocatable scans through allocatedDevices and, if necessary,
// raises the allocatable quantities in the node snapshot so they are at least
// equal to the already-allocated amounts. This lets pods that were already
// scheduled on this node pass admission even if a device plugin has failed.
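//
// For example (illustrative numbers), if two devices of a resource are
// recorded as allocated but the node snapshot only reports an allocatable
// quantity of one, the snapshot is raised to two so that the already-admitted
// pods are not rejected.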
func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulerframework.NodeInfo) {
	var newAllocatableResource *schedulerframework.Resource
	allocatableResource := node.Allocatable
	if allocatableResource.ScalarResources == nil {
		allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
	}

	m.mutex.Lock()
	defer m.mutex.Unlock()
	for resource, devices := range m.allocatedDevices {
		needed := devices.Len()
		quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)]
		if ok && int(quant) >= needed {
			continue
		}
		// Update the allocatable resource so it is at least equal to the
		// allocated capacity.
		if newAllocatableResource == nil {
			newAllocatableResource = allocatableResource.Clone()
		}
		newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed)
	}
	if newAllocatableResource != nil {
		node.Allocatable = newAllocatableResource
	}
}

func (m *ManagerImpl) isDevicePluginResource(resource string) bool {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	_, registeredResource := m.healthyDevices[resource]
	_, allocatedResource := m.allocatedDevices[resource]
	// Return true if this is either an active device-plugin resource or a
	// resource we have previously allocated.
	if registeredResource || allocatedResource {
		return true
	}
	return false
}

// GetAllocatableDevices returns information about all the healthy devices known to the manager.
func (m *ManagerImpl) GetAllocatableDevices() ResourceDeviceInstances {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	resp := m.allDevices.Filter(m.healthyDevices)
	klog.V(4).InfoS("GetAllocatableDevices", "known", len(m.allDevices), "allocatable", len(resp))
	return resp
}

// GetDevices returns the devices used by the specified container.
func (m *ManagerImpl) GetDevices(podUID, containerName string) ResourceDeviceInstances {
	return m.podDevices.getContainerDevices(podUID, containerName)
}

// ShouldResetExtendedResourceCapacity reports whether extended resource
// capacities should be zeroed. The absence of any checkpoint file strongly
// indicates that the node has been recreated, in which case the capacities
// can be reset.
func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool {
	checkpoints, err := m.checkpointManager.ListCheckpoints()
	if err != nil {
		return false
	}
	return len(checkpoints) == 0
}

// setPodPendingAdmission records the pod currently being admitted, so that it
// is treated as active while its devices are being allocated.
func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.pendingAdmissionPod = pod
}

// isContainerAlreadyRunning reports whether the given container was already
// running when the kubelet (re)started, based on the initial container map and
// running set captured at startup.
func (m *ManagerImpl) isContainerAlreadyRunning(podUID, cntName string) bool {
	cntID, err := m.containerMap.GetContainerID(podUID, cntName)
	if err != nil {
		klog.V(4).InfoS("container not found in the initial map, assumed NOT running", "podUID", podUID, "containerName", cntName, "err", err)
		return false
	}

	// The container is known from the initial map but was not reported running
	// when the kubelet started, so treat it as not running.
	if !m.containerRunningSet.Has(cntID) {
		klog.V(4).InfoS("container not present in the initial running set", "podUID", podUID, "containerName", cntName, "containerID", cntID)
		return false
	}

	// Once we make it here we know we have a running container.
	klog.V(4).InfoS("container found in the initial set, assumed running", "podUID", podUID, "containerName", cntName, "containerID", cntID)
	return true
}