...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/manager.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm/devicemanager

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package devicemanager
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"os"
    23  	"path/filepath"
    24  	"runtime"
    25  	"sort"
    26  	"sync"
    27  	"time"
    28  
    29  	cadvisorapi "github.com/google/cadvisor/info/v1"
    30  	"k8s.io/klog/v2"
    31  
    32  	v1 "k8s.io/api/core/v1"
    33  	"k8s.io/apimachinery/pkg/api/resource"
    34  	errorsutil "k8s.io/apimachinery/pkg/util/errors"
    35  	"k8s.io/apimachinery/pkg/util/sets"
    36  	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
    37  	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
    38  	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
    39  	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
    40  	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
    41  	plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
    42  	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
    43  	"k8s.io/kubernetes/pkg/kubelet/config"
    44  	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
    45  	"k8s.io/kubernetes/pkg/kubelet/metrics"
    46  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
    47  	"k8s.io/kubernetes/pkg/kubelet/types"
    48  	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
    49  )
    50  
    51  const nodeWithoutTopology = -1
    52  
    53  // ActivePodsFunc is a function that returns a list of pods to reconcile.
    54  type ActivePodsFunc func() []*v1.Pod
    55  
    56  // ManagerImpl is the structure in charge of managing Device Plugins.
    57  type ManagerImpl struct {
    58  	checkpointdir string
    59  
    60  	endpoints map[string]endpointInfo // Key is ResourceName
    61  	mutex     sync.Mutex
    62  
    63  	server plugin.Server
    64  
    65  	// activePods is a method for listing active pods on the node
    66  	// so the amount of pluginResources requested by existing pods
    67  	// could be counted when updating allocated devices
    68  	activePods ActivePodsFunc
    69  
    70  	// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
    71  	// We use it to determine when we can purge inactive pods from checkpointed state.
    72  	sourcesReady config.SourcesReady
    73  
    74  	// allDevices holds all the devices currently registered to the device manager
    75  	allDevices ResourceDeviceInstances
    76  
    77  	// healthyDevices contains all the registered healthy resourceNames and their exported device IDs.
    78  	healthyDevices map[string]sets.Set[string]
    79  
    80  	// unhealthyDevices contains all the unhealthy devices and their exported device IDs.
    81  	unhealthyDevices map[string]sets.Set[string]
    82  
    83  	// allocatedDevices contains allocated deviceIds, keyed by resourceName.
    84  	allocatedDevices map[string]sets.Set[string]
    85  
    86  	// podDevices contains pod to allocated device mapping.
    87  	podDevices        *podDevices
    88  	checkpointManager checkpointmanager.CheckpointManager
    89  
    90  	// List of NUMA Nodes available on the underlying machine
    91  	numaNodes []int
    92  
    93  	// Store of Topology Affinities that the Device Manager can query.
    94  	topologyAffinityStore topologymanager.Store
    95  
    96  	// devicesToReuse contains devices that can be reused as they have been allocated to
    97  	// init containers.
    98  	devicesToReuse PodReusableDevices
    99  
   100  	// pendingAdmissionPod contains the pod during the admission phase
   101  	pendingAdmissionPod *v1.Pod
   102  
   103  	// containerMap provides a mapping from (pod, container) -> containerID
   104  	// for all containers in a pod. Used to detect pods running across a restart
   105  	containerMap containermap.ContainerMap
   106  
   107  	// containerRunningSet identifies which container among those present in `containerMap`
   108  	// was reported running by the container runtime when `containerMap` was computed.
   109  	// Used to detect pods running across a restart
   110  	containerRunningSet sets.Set[string]
   111  }
   112  
   113  type endpointInfo struct {
   114  	e    endpoint
   115  	opts *pluginapi.DevicePluginOptions
   116  }
   117  
   118  type sourcesReadyStub struct{}
   119  
   120  // PodReusableDevices is a map by pod UID of devices to reuse, keyed by resource name.
   121  type PodReusableDevices map[string]map[string]sets.Set[string]
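// Example (hypothetical, for illustration): the nested shape is
// pod UID -> extended resource name -> set of reusable device IDs.
// The pod UID, resource name and device IDs below are made up.
//
//	reuse := PodReusableDevices{
//		"pod-uid-1234": {
//			"vendor.example.com/gpu": sets.New[string]("dev-0", "dev-1"),
//		},
//	}
//	_ = reuse["pod-uid-1234"]["vendor.example.com/gpu"].Has("dev-0") // true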
   122  
   123  func (s *sourcesReadyStub) AddSource(source string) {}
   124  func (s *sourcesReadyStub) AllReady() bool          { return true }
   125  
   126  // NewManagerImpl creates a new manager.
   127  func NewManagerImpl(topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
   128  	socketPath := pluginapi.KubeletSocket
   129  	if runtime.GOOS == "windows" {
   130  		socketPath = os.Getenv("SYSTEMDRIVE") + pluginapi.KubeletSocketWindows
   131  	}
   132  	return newManagerImpl(socketPath, topology, topologyAffinityStore)
   133  }
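// Example (hypothetical, for illustration): a sketch of how a caller could wire the
// manager up, assuming machineInfo is a *cadvisorapi.MachineInfo and topologyManager
// is the kubelet's topologymanager.Manager (which also satisfies topologymanager.Store).
//
//	dm, err := NewManagerImpl(machineInfo.Topology, topologyManager)
//	if err != nil {
//		klog.ErrorS(err, "Failed to create device plugin manager")
//	}
//	_ = dm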
   134  
   135  func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
   136  	klog.V(2).InfoS("Creating Device Plugin manager", "path", socketPath)
   137  
   138  	var numaNodes []int
   139  	for _, node := range topology {
   140  		numaNodes = append(numaNodes, node.Id)
   141  	}
   142  
   143  	manager := &ManagerImpl{
   144  		endpoints: make(map[string]endpointInfo),
   145  
   146  		allDevices:            NewResourceDeviceInstances(),
   147  		healthyDevices:        make(map[string]sets.Set[string]),
   148  		unhealthyDevices:      make(map[string]sets.Set[string]),
   149  		allocatedDevices:      make(map[string]sets.Set[string]),
   150  		podDevices:            newPodDevices(),
   151  		numaNodes:             numaNodes,
   152  		topologyAffinityStore: topologyAffinityStore,
   153  		devicesToReuse:        make(PodReusableDevices),
   154  	}
   155  
   156  	server, err := plugin.NewServer(socketPath, manager, manager)
   157  	if err != nil {
   158  		return nil, fmt.Errorf("failed to create plugin server: %v", err)
   159  	}
   160  
   161  	manager.server = server
   162  	manager.checkpointdir, _ = filepath.Split(server.SocketPath())
   163  
   164  	// The following structures are populated with real implementations in manager.Start()
   165  	// Before that, initialize them to perform no-op operations.
   166  	manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
   167  	manager.sourcesReady = &sourcesReadyStub{}
   168  	checkpointManager, err := checkpointmanager.NewCheckpointManager(manager.checkpointdir)
   169  	if err != nil {
   170  		return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
   171  	}
   172  	manager.checkpointManager = checkpointManager
   173  
   174  	return manager, nil
   175  }
   176  
   177  // CleanupPluginDirectory removes all existing unix sockets
   178  // from /var/lib/kubelet/device-plugins when the Device Plugin Manager starts.
   179  func (m *ManagerImpl) CleanupPluginDirectory(dir string) error {
   180  	d, err := os.Open(dir)
   181  	if err != nil {
   182  		return err
   183  	}
   184  	defer d.Close()
   185  	names, err := d.Readdirnames(-1)
   186  	if err != nil {
   187  		return err
   188  	}
   189  	var errs []error
   190  	for _, name := range names {
   191  		filePath := filepath.Join(dir, name)
   192  		if filePath == m.checkpointFile() {
   193  			continue
   194  		}
   195  		// TODO: Until https://github.com/golang/go/issues/33357 is fixed, os.Stat does not return the
   196  		// right mode (socket) on Windows. Hence we delete the file on Windows without checking
   197  		// whether it is a socket.
   198  		stat, err := os.Lstat(filePath)
   199  		if err != nil {
   200  			klog.ErrorS(err, "Failed to stat file", "path", filePath)
   201  			continue
   202  		}
   203  		if stat.IsDir() {
   204  			continue
   205  		}
   206  		err = os.RemoveAll(filePath)
   207  		if err != nil {
   208  			errs = append(errs, err)
   209  			klog.ErrorS(err, "Failed to remove file", "path", filePath)
   210  			continue
   211  		}
   212  	}
   213  	return errorsutil.NewAggregate(errs)
   214  }
   215  
   216  // PluginConnected connects a plugin to a new endpoint.
   217  // This is done as part of device plugin registration.
   218  func (m *ManagerImpl) PluginConnected(resourceName string, p plugin.DevicePlugin) error {
   219  	options, err := p.API().GetDevicePluginOptions(context.Background(), &pluginapi.Empty{})
   220  	if err != nil {
   221  		return fmt.Errorf("failed to get device plugin options: %v", err)
   222  	}
   223  
   224  	e := newEndpointImpl(p)
   225  
   226  	m.mutex.Lock()
   227  	defer m.mutex.Unlock()
   228  	m.endpoints[resourceName] = endpointInfo{e, options}
   229  
   230  	klog.V(2).InfoS("Device plugin connected", "resourceName", resourceName)
   231  	return nil
   232  }
   233  
   234  // PluginDisconnected disconnects a plugin from an endpoint.
   235  // This is done as part of device plugin deregistration.
   236  func (m *ManagerImpl) PluginDisconnected(resourceName string) {
   237  	m.mutex.Lock()
   238  	defer m.mutex.Unlock()
   239  
   240  	if ep, exists := m.endpoints[resourceName]; exists {
   241  		m.markResourceUnhealthy(resourceName)
   242  		klog.V(2).InfoS("Endpoint became unhealthy", "resourceName", resourceName, "endpoint", ep)
   243  
   244  		ep.e.setStopTime(time.Now())
   245  	}
   246  }
   247  
   248  // PluginListAndWatchReceiver receives a ListAndWatchResponse from a device plugin
   249  // and ensures that up-to-date state (e.g. number of devices and device health)
   250  // is captured. Registered devices and device-to-container allocation
   251  // information are also checkpointed to disk.
   252  func (m *ManagerImpl) PluginListAndWatchReceiver(resourceName string, resp *pluginapi.ListAndWatchResponse) {
   253  	var devices []pluginapi.Device
   254  	for _, d := range resp.Devices {
   255  		devices = append(devices, *d)
   256  	}
   257  	m.genericDeviceUpdateCallback(resourceName, devices)
   258  }
   259  
   260  func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
   261  	healthyCount := 0
   262  	m.mutex.Lock()
   263  	m.healthyDevices[resourceName] = sets.New[string]()
   264  	m.unhealthyDevices[resourceName] = sets.New[string]()
   265  	m.allDevices[resourceName] = make(map[string]pluginapi.Device)
   266  	for _, dev := range devices {
   267  		m.allDevices[resourceName][dev.ID] = dev
   268  		if dev.Health == pluginapi.Healthy {
   269  			m.healthyDevices[resourceName].Insert(dev.ID)
   270  			healthyCount++
   271  		} else {
   272  			m.unhealthyDevices[resourceName].Insert(dev.ID)
   273  		}
   274  	}
   275  	m.mutex.Unlock()
   276  	if err := m.writeCheckpoint(); err != nil {
   277  		klog.ErrorS(err, "Failed to write checkpoint")
   278  	}
   279  	klog.V(2).InfoS("Processed device updates for resource", "resourceName", resourceName, "totalCount", len(devices), "healthyCount", healthyCount)
   280  }
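// Example (hypothetical, for illustration): a ListAndWatchResponse carrying one healthy
// and one unhealthy device for a made-up resource. genericDeviceUpdateCallback would
// record "dev-0" in healthyDevices, "dev-1" in unhealthyDevices, and both in allDevices.
//
//	resp := &pluginapi.ListAndWatchResponse{
//		Devices: []*pluginapi.Device{
//			{ID: "dev-0", Health: pluginapi.Healthy},
//			{ID: "dev-1", Health: pluginapi.Unhealthy},
//		},
//	}
//	m.PluginListAndWatchReceiver("vendor.example.com/gpu", resp)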
   281  
   282  // GetWatcherHandler returns the plugin handler
   283  func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
   284  	return m.server
   285  }
   286  
   287  // checkpointFile returns device plugin checkpoint file path.
   288  func (m *ManagerImpl) checkpointFile() string {
   289  	return filepath.Join(m.checkpointdir, kubeletDeviceManagerCheckpoint)
   290  }
   291  
   292  // Start starts the Device Plugin Manager, initializes podDevices and
   293  // allocatedDevices information from checkpointed state, and starts the
   294  // device plugin registration service.
   295  func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, initialContainers containermap.ContainerMap, initialContainerRunningSet sets.Set[string]) error {
   296  	klog.V(2).InfoS("Starting Device Plugin manager")
   297  
   298  	m.activePods = activePods
   299  	m.sourcesReady = sourcesReady
   300  	m.containerMap = initialContainers
   301  	m.containerRunningSet = initialContainerRunningSet
   302  
   303  	// Loads in allocatedDevices information from disk.
   304  	err := m.readCheckpoint()
   305  	if err != nil {
   306  		klog.InfoS("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date", "err", err)
   307  	}
   308  
   309  	return m.server.Start()
   310  }
   311  
   312  // Stop stops the plugin server.
   313  // It can be called concurrently, more than once, and is safe to call
   314  // without a prior Start.
   315  func (m *ManagerImpl) Stop() error {
   316  	return m.server.Stop()
   317  }
   318  
   319  // Allocate is the call that you can use to allocate a set of devices
   320  // from the registered device plugins.
   321  func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
   322  	// The pod is in the admission phase. We need to save the pod to avoid it
   323  	// being cleaned up before admission ends.
   324  	m.setPodPendingAdmission(pod)
   325  
   326  	if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
   327  		m.devicesToReuse[string(pod.UID)] = make(map[string]sets.Set[string])
   328  	}
   329  	// If m.devicesToReuse contains entries for pods other than the current pod, delete them.
   330  	for podUID := range m.devicesToReuse {
   331  		if podUID != string(pod.UID) {
   332  			delete(m.devicesToReuse, podUID)
   333  		}
   334  	}
   335  	// Allocate resources for init containers first as we know the caller always loops
   336  	// through init containers before looping through app containers. Should the caller
   337  	// ever change those semantics, this logic will need to be amended.
   338  	for _, initContainer := range pod.Spec.InitContainers {
   339  		if container.Name == initContainer.Name {
   340  			if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
   341  				return err
   342  			}
   343  			if !types.IsRestartableInitContainer(&initContainer) {
   344  				m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
   345  			} else {
   346  				// If the init container is restartable, we need to keep the
   347  				// devices allocated. In other words, we should remove them
   348  				// from the devicesToReuse.
   349  				m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
   350  			}
   351  			return nil
   352  		}
   353  	}
   354  	if err := m.allocateContainerResources(pod, container, m.devicesToReuse[string(pod.UID)]); err != nil {
   355  		return err
   356  	}
   357  	m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
   358  	return nil
   359  }
   360  
   361  // UpdatePluginResources updates node resources based on devices already allocated to pods.
   362  func (m *ManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
   363  	pod := attrs.Pod
   364  
   365  	// quick return if no pluginResources requested
   366  	if !m.podDevices.hasPod(string(pod.UID)) {
   367  		return nil
   368  	}
   369  
   370  	m.sanitizeNodeAllocatable(node)
   371  	return nil
   372  }
   373  
   374  func (m *ManagerImpl) markResourceUnhealthy(resourceName string) {
   375  	klog.V(2).InfoS("Mark all resources Unhealthy for resource", "resourceName", resourceName)
   376  	healthyDevices := sets.New[string]()
   377  	if _, ok := m.healthyDevices[resourceName]; ok {
   378  		healthyDevices = m.healthyDevices[resourceName]
   379  		m.healthyDevices[resourceName] = sets.New[string]()
   380  	}
   381  	if _, ok := m.unhealthyDevices[resourceName]; !ok {
   382  		m.unhealthyDevices[resourceName] = sets.New[string]()
   383  	}
   384  	m.unhealthyDevices[resourceName] = m.unhealthyDevices[resourceName].Union(healthyDevices)
   385  }
   386  
   387  // GetCapacity is expected to be called when Kubelet updates its node status.
   388  // The first returned variable contains the registered device plugin resource capacity.
   389  // The second returned variable contains the registered device plugin resource allocatable.
   390  // The third returned variable contains previously registered resources that are no longer active.
   391  // Kubelet uses this information to update resource capacity/allocatable in its node status.
   392  // After the call, device plugin can remove the inactive resources from its internal list as the
   393  // change is already reflected in Kubelet node status.
   394  // Note that in the special case after a Kubelet restart, device plugin resource capacities can
   395  // temporarily drop to zero until the corresponding device plugins re-register. This is OK because
   396  // cm.UpdatePluginResources() runs during predicate Admit and guarantees we adjust nodeinfo
   397  // capacity for already allocated pods so that they can continue to run. However, new pods
   398  // requiring device plugin resources will not be scheduled until the device plugins re-register.
   399  func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
   400  	needsUpdateCheckpoint := false
   401  	var capacity = v1.ResourceList{}
   402  	var allocatable = v1.ResourceList{}
   403  	deletedResources := sets.New[string]()
   404  	m.mutex.Lock()
   405  	for resourceName, devices := range m.healthyDevices {
   406  		eI, ok := m.endpoints[resourceName]
   407  		if (ok && eI.e.stopGracePeriodExpired()) || !ok {
   408  			// The resources contained in endpoints and (un)healthyDevices
   409  			// should always be consistent. Otherwise, we run the risk
   410  			// of failing to garbage collect non-existent resources or devices.
   411  			if !ok {
   412  				klog.ErrorS(nil, "Unexpected: healthyDevices and endpoints are out of sync")
   413  			}
   414  			delete(m.endpoints, resourceName)
   415  			delete(m.healthyDevices, resourceName)
   416  			deletedResources.Insert(resourceName)
   417  			needsUpdateCheckpoint = true
   418  		} else {
   419  			capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
   420  			allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
   421  		}
   422  	}
   423  	for resourceName, devices := range m.unhealthyDevices {
   424  		eI, ok := m.endpoints[resourceName]
   425  		if (ok && eI.e.stopGracePeriodExpired()) || !ok {
   426  			if !ok {
   427  				klog.ErrorS(nil, "Unexpected: unhealthyDevices and endpoints are out of sync")
   428  			}
   429  			delete(m.endpoints, resourceName)
   430  			delete(m.unhealthyDevices, resourceName)
   431  			deletedResources.Insert(resourceName)
   432  			needsUpdateCheckpoint = true
   433  		} else {
   434  			capacityCount := capacity[v1.ResourceName(resourceName)]
   435  			unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
   436  			capacityCount.Add(unhealthyCount)
   437  			capacity[v1.ResourceName(resourceName)] = capacityCount
   438  		}
   439  	}
   440  	m.mutex.Unlock()
   441  	if needsUpdateCheckpoint {
   442  		if err := m.writeCheckpoint(); err != nil {
   443  			klog.ErrorS(err, "Error on writing checkpoint")
   444  		}
   445  	}
   446  	return capacity, allocatable, deletedResources.UnsortedList()
   447  }
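// Example (hypothetical, for illustration): with a made-up resource backed by an active
// endpoint, 3 healthy devices and 1 unhealthy device, GetCapacity would report a
// capacity of 4, an allocatable count of 3, and no deleted resources.
//
//	capacity, allocatable, deleted := m.GetCapacity()
//	capQty := capacity["vendor.example.com/gpu"]      // 4
//	allocQty := allocatable["vendor.example.com/gpu"] // 3
//	_, _, _ = capQty, allocQty, deleted               // deleted is empty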
   448  
   449  // Checkpoints device to container allocation information to disk.
   450  func (m *ManagerImpl) writeCheckpoint() error {
   451  	m.mutex.Lock()
   452  	registeredDevs := make(map[string][]string)
   453  	for resource, devices := range m.healthyDevices {
   454  		registeredDevs[resource] = devices.UnsortedList()
   455  	}
   456  	data := checkpoint.New(m.podDevices.toCheckpointData(),
   457  		registeredDevs)
   458  	m.mutex.Unlock()
   459  	err := m.checkpointManager.CreateCheckpoint(kubeletDeviceManagerCheckpoint, data)
   460  	if err != nil {
   461  		err2 := fmt.Errorf("failed to write checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err)
   462  		klog.InfoS("Failed to write checkpoint file", "err", err)
   463  		return err2
   464  	}
   465  	return nil
   466  }
   467  
   468  // Reads device to container allocation information from disk, and populates
   469  // m.allocatedDevices accordingly.
   470  func (m *ManagerImpl) readCheckpoint() error {
   471  	// The vast majority of the time we restore a compatible checkpoint, so we try
   472  	// the current version first. Trying to restore older-format checkpoints is
   473  	// relevant only in the kubelet upgrade flow, which happens once in a
   474  	// (long) while.
   475  	cp, err := m.getCheckpointV2()
   476  	if err != nil {
   477  		if err == errors.ErrCheckpointNotFound {
   478  			// no point in trying anything else
   479  			klog.InfoS("Failed to read data from checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint, "err", err)
   480  			return nil
   481  		}
   482  
   483  		var errv1 error
   484  		// one last try: maybe it's an old-format checkpoint?
   485  		cp, errv1 = m.getCheckpointV1()
   486  		if errv1 != nil {
   487  			klog.InfoS("Failed to read checkpoint V1 file", "err", errv1)
   488  			// intentionally return the parent error. We expect to restore V1 checkpoints
   489  			// only a tiny fraction of the time, so what matters most is the current checkpoint read error.
   490  			return err
   491  		}
   492  		klog.InfoS("Read data from a V1 checkpoint", "checkpoint", kubeletDeviceManagerCheckpoint)
   493  	}
   494  
   495  	m.mutex.Lock()
   496  	defer m.mutex.Unlock()
   497  	podDevices, registeredDevs := cp.GetDataInLatestFormat()
   498  	m.podDevices.fromCheckpointData(podDevices)
   499  	m.allocatedDevices = m.podDevices.devices()
   500  	for resource := range registeredDevs {
   501  		// During startup, create an empty healthyDevices list so that the resource capacity
   502  		// stays zero until the corresponding device plugin re-registers.
   503  		m.healthyDevices[resource] = sets.New[string]()
   504  		m.unhealthyDevices[resource] = sets.New[string]()
   505  		m.endpoints[resource] = endpointInfo{e: newStoppedEndpointImpl(resource), opts: nil}
   506  	}
   507  	return nil
   508  }
   509  
   510  func (m *ManagerImpl) getCheckpointV2() (checkpoint.DeviceManagerCheckpoint, error) {
   511  	registeredDevs := make(map[string][]string)
   512  	devEntries := make([]checkpoint.PodDevicesEntry, 0)
   513  	cp := checkpoint.New(devEntries, registeredDevs)
   514  	err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
   515  	return cp, err
   516  }
   517  
   518  func (m *ManagerImpl) getCheckpointV1() (checkpoint.DeviceManagerCheckpoint, error) {
   519  	registeredDevs := make(map[string][]string)
   520  	devEntries := make([]checkpoint.PodDevicesEntryV1, 0)
   521  	cp := checkpoint.NewV1(devEntries, registeredDevs)
   522  	err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
   523  	return cp, err
   524  }
   525  
   526  // UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
   527  func (m *ManagerImpl) UpdateAllocatedDevices() {
   528  	if !m.sourcesReady.AllReady() {
   529  		return
   530  	}
   531  
   532  	m.mutex.Lock()
   533  	defer m.mutex.Unlock()
   534  
   535  	activeAndAdmittedPods := m.activePods()
   536  	if m.pendingAdmissionPod != nil {
   537  		activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
   538  	}
   539  
   540  	podsToBeRemoved := m.podDevices.pods()
   541  	for _, pod := range activeAndAdmittedPods {
   542  		podsToBeRemoved.Delete(string(pod.UID))
   543  	}
   544  	if len(podsToBeRemoved) <= 0 {
   545  		return
   546  	}
   547  	klog.V(3).InfoS("Pods to be removed", "podUIDs", sets.List(podsToBeRemoved))
   548  	m.podDevices.delete(sets.List(podsToBeRemoved))
   549  	// Regenerate allocatedDevices after we update pod allocation information.
   550  	m.allocatedDevices = m.podDevices.devices()
   551  }
   552  
   553  // devicesToAllocate returns the list of device IDs we need to allocate with the Allocate rpc call.
   554  // It returns an empty list in case we don't need to issue the Allocate rpc call.
   555  func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.Set[string]) (sets.Set[string], error) {
   556  	m.mutex.Lock()
   557  	defer m.mutex.Unlock()
   558  	needed := required
   559  	// Gets list of devices that have already been allocated.
   560  	// This can happen if a container restarts for example.
   561  	devices := m.podDevices.containerDevices(podUID, contName, resource)
   562  	if devices != nil {
   563  		klog.V(3).InfoS("Found pre-allocated devices for resource on pod", "resourceName", resource, "containerName", contName, "podUID", podUID, "devices", sets.List(devices))
   564  		needed = needed - devices.Len()
   565  		// A pod's resource is not expected to change once admitted by the API server,
   566  		// so just fail loudly here. We can revisit this part if this no longer holds.
   567  		if needed != 0 {
   568  			return nil, fmt.Errorf("pod %q container %q changed request for resource %q from %d to %d", podUID, contName, resource, devices.Len(), required)
   569  		}
   570  	}
   571  
   572  	// We have 3 major flows to handle:
   573  	// 1. kubelet running, normal allocation (needed > 0, container being [re]created). Steady state and by far the most common case.
   574  	// 2. kubelet restart. In this scenario every other component of the stack (device plugins, app container, runtime) is still running.
   575  	// 3. node reboot. In this scenario device plugins may not be running yet when we try to allocate devices.
   576  	//    note: if we get this far the runtime is surely running. This is usually enforced at OS level by startup system services dependencies.
   577  
   578  	// First we take care of the exceptional flow (scenarios 2 and 3). In both flows, kubelet is reinitializing, and while kubelet is initializing, sources are NOT all ready.
   579  	// Is this a simple kubelet restart (scenario 2)? To distinguish, we use the information we got from the runtime. If we are asked to allocate devices for containers reported
   580  	// running, then it can only be a kubelet restart. On node reboot the runtime and the containers were also shut down. Then, if the container was running, it can only be
   581  	// because it already has access to all the required devices, so we have nothing to do and we can bail out.
   582  	if !m.sourcesReady.AllReady() && m.isContainerAlreadyRunning(podUID, contName) {
   583  		klog.V(3).InfoS("container detected running, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", podUID, "containerName", contName)
   584  		return nil, nil
   585  	}
   586  
   587  	// We dealt with scenario 2. If we got this far it's either scenario 3 (node reboot) or scenario 1 (steady state, normal flow).
   588  	klog.V(3).InfoS("Need devices to allocate for pod", "deviceNumber", needed, "resourceName", resource, "podUID", podUID, "containerName", contName)
   589  	healthyDevices, hasRegistered := m.healthyDevices[resource]
   590  
   591  	// The following checks are expected to fail only in scenario 3 (node reboot).
   592  	// The kubelet is reinitializing and got a container from sources. But there's no ordering, so an app container may attempt allocation _before_ the device plugin has been
   593  	// created, registered, and reported its devices back to the kubelet.
   594  	// This can only happen in scenario 3 because at steady state (scenario 1) the scheduler prevents pods from being sent to nodes which don't report enough devices.
   595  	// Note: we need to check the device health and registration status *before* we check how many devices are needed; doing otherwise caused issue #109595.
   596  	// Note: if the scheduler is bypassed, we fall back to scenario 1, so we still need these checks.
   597  	if !hasRegistered {
   598  		return nil, fmt.Errorf("cannot allocate unregistered device %s", resource)
   599  	}
   600  
   601  	// Check if registered resource has healthy devices
   602  	if healthyDevices.Len() == 0 {
   603  		return nil, fmt.Errorf("no healthy devices present; cannot allocate unhealthy devices %s", resource)
   604  	}
   605  
   606  	// Check if all the previously allocated devices are healthy
   607  	if !healthyDevices.IsSuperset(devices) {
   608  		return nil, fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resource)
   609  	}
   610  
   611  	// We handled the known error paths in scenario 3 (node reboot), so from now on we can fall back to a common path.
   612  	// We cover container restart on kubelet steady state with the same flow.
   613  	if needed == 0 {
   614  		klog.V(3).InfoS("no devices needed, nothing to do", "deviceNumber", needed, "resourceName", resource, "podUID", podUID, "containerName", contName)
   615  		// No change, no work.
   616  		return nil, nil
   617  	}
   618  
   619  	// Declare the list of allocated devices.
   620  	// This will be populated and returned below.
   621  	allocated := sets.New[string]()
   622  
   623  	// Create a closure to help with device allocation
   624  	// Returns 'true' once no more devices need to be allocated.
   625  	allocateRemainingFrom := func(devices sets.Set[string]) bool {
   626  		// When we call callGetPreferredAllocationIfAvailable below, we will release
   627  		// the lock and call the device plugin. If someone calls ListResource concurrently,
   628  		// device manager will recalculate the allocatedDevices map. Some entries with
   629  		// empty sets may be removed, so we reinit here.
   630  		if m.allocatedDevices[resource] == nil {
   631  			m.allocatedDevices[resource] = sets.New[string]()
   632  		}
   633  		for device := range devices.Difference(allocated) {
   634  			m.allocatedDevices[resource].Insert(device)
   635  			allocated.Insert(device)
   636  			needed--
   637  			if needed == 0 {
   638  				return true
   639  			}
   640  		}
   641  		return false
   642  	}
   643  
   644  	// Allocates from reusableDevices list first.
   645  	if allocateRemainingFrom(reusableDevices) {
   646  		return allocated, nil
   647  	}
   648  
   649  	// Gets Devices in use.
   650  	devicesInUse := m.allocatedDevices[resource]
   651  	// Gets Available devices.
   652  	available := m.healthyDevices[resource].Difference(devicesInUse)
   653  	if available.Len() < needed {
   654  		return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len())
   655  	}
   656  
   657  	// Filters available Devices based on NUMA affinity.
   658  	aligned, unaligned, noAffinity := m.filterByAffinity(podUID, contName, resource, available)
   659  
   660  	// If we can allocate all remaining devices from the set of aligned ones, then
   661  	// give the plugin the chance to influence which ones to allocate from that set.
   662  	if needed < aligned.Len() {
   663  		// First allocate from the preferred devices list (if available).
   664  		preferred, err := m.callGetPreferredAllocationIfAvailable(podUID, contName, resource, aligned.Union(allocated), allocated, required)
   665  		if err != nil {
   666  			return nil, err
   667  		}
   668  		if allocateRemainingFrom(preferred.Intersection(aligned)) {
   669  			return allocated, nil
   670  		}
   671  		// Then fallback to allocate from the aligned set if no preferred list
   672  		// is returned (or not enough devices are returned in that list).
   673  		if allocateRemainingFrom(aligned) {
   674  			return allocated, nil
   675  		}
   676  
   677  		return nil, fmt.Errorf("unexpectedly allocated less resources than required. Requested: %d, Got: %d", required, required-needed)
   678  	}
   679  
   680  	// If we can't allocate all remaining devices from the set of aligned ones,
   681  	// then start by first allocating all the aligned devices (to ensure
   682  	// that the alignment guaranteed by the TopologyManager is honored).
   683  	if allocateRemainingFrom(aligned) {
   684  		return allocated, nil
   685  	}
   686  
   687  	// Then give the plugin the chance to influence the decision on any
   688  	// remaining devices to allocate.
   689  	preferred, err := m.callGetPreferredAllocationIfAvailable(podUID, contName, resource, available.Union(allocated), allocated, required)
   690  	if err != nil {
   691  		return nil, err
   692  	}
   693  	if allocateRemainingFrom(preferred.Intersection(available)) {
   694  		return allocated, nil
   695  	}
   696  
   697  	// Finally, if the plugin did not return a preferred allocation (or didn't
   698  	// return a large enough one), then fall back to allocating the remaining
   699  	// devices from the 'unaligned' and 'noAffinity' sets.
   700  	if allocateRemainingFrom(unaligned) {
   701  		return allocated, nil
   702  	}
   703  	if allocateRemainingFrom(noAffinity) {
   704  		return allocated, nil
   705  	}
   706  
   707  	return nil, fmt.Errorf("unexpectedly allocated less resources than required. Requested: %d, Got: %d", required, required-needed)
   708  }
   709  
   710  func (m *ManagerImpl) filterByAffinity(podUID, contName, resource string, available sets.Set[string]) (sets.Set[string], sets.Set[string], sets.Set[string]) {
   711  	// If alignment information is not available, just pass the available list back.
   712  	hint := m.topologyAffinityStore.GetAffinity(podUID, contName)
   713  	if !m.deviceHasTopologyAlignment(resource) || hint.NUMANodeAffinity == nil {
   714  		return sets.New[string](), sets.New[string](), available
   715  	}
   716  
   717  	// Build a map of NUMA Nodes to the devices associated with them. A
   718  	// device may be associated with multiple NUMA nodes at the same time. If an
   719  	// available device does not have any NUMA Nodes associated with it, add it
   720  	// to the list for the fake NUMA node -1.
   721  	perNodeDevices := make(map[int]sets.Set[string])
   722  	for d := range available {
   723  		if m.allDevices[resource][d].Topology == nil || len(m.allDevices[resource][d].Topology.Nodes) == 0 {
   724  			if _, ok := perNodeDevices[nodeWithoutTopology]; !ok {
   725  				perNodeDevices[nodeWithoutTopology] = sets.New[string]()
   726  			}
   727  			perNodeDevices[nodeWithoutTopology].Insert(d)
   728  			continue
   729  		}
   730  
   731  		for _, node := range m.allDevices[resource][d].Topology.Nodes {
   732  			if _, ok := perNodeDevices[int(node.ID)]; !ok {
   733  				perNodeDevices[int(node.ID)] = sets.New[string]()
   734  			}
   735  			perNodeDevices[int(node.ID)].Insert(d)
   736  		}
   737  	}
   738  
   739  	// Get a flat list of all the nodes associated with available devices.
   740  	var nodes []int
   741  	for node := range perNodeDevices {
   742  		nodes = append(nodes, node)
   743  	}
   744  
   745  	// Sort the list of nodes by:
   746  	// 1) Nodes contained in the 'hint's affinity set
   747  	// 2) Nodes not contained in the 'hint's affinity set
   748  	// 3) The fake NUMANode of -1 (assuming it is included in the list)
   749  	// Within each of the groups above, sort the nodes by how many devices they contain
   750  	sort.Slice(nodes, func(i, j int) bool {
   751  		// If one or the other of nodes[i] or nodes[j] is in the 'hint's affinity set
   752  		if hint.NUMANodeAffinity.IsSet(nodes[i]) && hint.NUMANodeAffinity.IsSet(nodes[j]) {
   753  			return perNodeDevices[nodes[i]].Len() < perNodeDevices[nodes[j]].Len()
   754  		}
   755  		if hint.NUMANodeAffinity.IsSet(nodes[i]) {
   756  			return true
   757  		}
   758  		if hint.NUMANodeAffinity.IsSet(nodes[j]) {
   759  			return false
   760  		}
   761  
   762  		// If one or the other of nodes[i] or nodes[j] is the fake NUMA node -1 (they can't both be)
   763  		if nodes[i] == nodeWithoutTopology {
   764  			return false
   765  		}
   766  		if nodes[j] == nodeWithoutTopology {
   767  			return true
   768  		}
   769  
   770  		// Otherwise both nodes[i] and nodes[j] are real NUMA nodes that are not in the 'hint's' affinity list.
   771  		return perNodeDevices[nodes[i]].Len() < perNodeDevices[nodes[j]].Len()
   772  	})
   773  
   774  	// Generate three sorted lists of devices. Devices in the first list come
   775  	// from valid NUMA Nodes contained in the affinity mask. Devices in the
   776  	// second list come from valid NUMA Nodes not in the affinity mask. Devices
   777  	// in the third list come from devices with no NUMA Node association (i.e.
   778  	// those mapped to the fake NUMA Node -1). Because we loop through the
   779  	// sorted list of NUMA nodes in order, within each list, devices are sorted
   780  	// by their connection to NUMA Nodes with more devices on them.
   781  	var fromAffinity []string
   782  	var notFromAffinity []string
   783  	var withoutTopology []string
   784  	for d := range available {
   785  		// Since the same device may be associated with multiple NUMA Nodes, we
   786  		// need to be careful not to add each device to multiple lists. The
   787  		// logic below ensures this by breaking after the first NUMA node that
   788  		// has the device is encountered.
   789  		for _, n := range nodes {
   790  			if perNodeDevices[n].Has(d) {
   791  				if n == nodeWithoutTopology {
   792  					withoutTopology = append(withoutTopology, d)
   793  				} else if hint.NUMANodeAffinity.IsSet(n) {
   794  					fromAffinity = append(fromAffinity, d)
   795  				} else {
   796  					notFromAffinity = append(notFromAffinity, d)
   797  				}
   798  				break
   799  			}
   800  		}
   801  	}
   802  
   803  	// Return all three lists containing the full set of devices across them.
   804  	return sets.New[string](fromAffinity...), sets.New[string](notFromAffinity...), sets.New[string](withoutTopology...)
   805  }
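// Example (hypothetical, for illustration): given a TopologyHint whose NUMANodeAffinity
// selects node 0, devices advertised on NUMA node 0 land in the aligned set, devices on
// other nodes in the unaligned set, and devices without topology information in the
// noAffinity set. bitmask below refers to
// "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" (not imported by this file).
//
//	affinity, _ := bitmask.NewBitMask(0)
//	hint := topologymanager.TopologyHint{NUMANodeAffinity: affinity, Preferred: true}
//	_ = hint // the kind of hint returned by topologyAffinityStore.GetAffinity(podUID, contName)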
   806  
   807  // allocateContainerResources attempts to allocate all of the required device
   808  // plugin resources for the input container, issues an Allocate rpc request
   809  // for each new device resource requirement, processes their AllocateResponses,
   810  // and updates the cached containerDevices on success.
   811  func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.Set[string]) error {
   812  	podUID := string(pod.UID)
   813  	contName := container.Name
   814  	allocatedDevicesUpdated := false
   815  	needsUpdateCheckpoint := false
   816  	// Extended resources are not allowed to be overcommitted.
   817  	// Since device plugins advertise extended resources,
   818  	// Requests must be equal to Limits and iterating
   819  	// over the Limits is sufficient.
   820  	for k, v := range container.Resources.Limits {
   821  		resource := string(k)
   822  		needed := int(v.Value())
   823  		klog.V(3).InfoS("Looking for needed resources", "needed", needed, "resourceName", resource)
   824  		if !m.isDevicePluginResource(resource) {
   825  			continue
   826  		}
   827  		// Updates allocatedDevices to garbage collect any stranded resources
   828  		// before doing the device plugin allocation.
   829  		if !allocatedDevicesUpdated {
   830  			m.UpdateAllocatedDevices()
   831  			allocatedDevicesUpdated = true
   832  		}
   833  		allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
   834  		if err != nil {
   835  			return err
   836  		}
   837  		if allocDevices == nil || len(allocDevices) <= 0 {
   838  			continue
   839  		}
   840  
   841  		needsUpdateCheckpoint = true
   842  
   843  		startRPCTime := time.Now()
   844  		// Manager.Allocate involves RPC calls to the device plugin, which
   845  		// could be heavyweight. Therefore we want to perform this operation outside the
   846  		// mutex lock. Note that if the Allocate call fails, we may leave container resources
   847  		// partially allocated for the failed container. We rely on UpdateAllocatedDevices()
   848  		// to garbage collect these resources later. Another side effect is that if
   849  		// we have X resource A and Y resource B in total, and two containers, container1
   850  		// and container2 both require X resource A and Y resource B. Both allocation
   851  		// requests may fail if we serve them in mixed order.
   852  		// TODO: may revisit this part later if we see inefficient resource allocation
   853  		// in real use as a result of this. Should also consider parallelizing device
   854  		// plugin Allocate grpc calls if it becomes common that a container may require
   855  		// resources from multiple device plugins.
   856  		m.mutex.Lock()
   857  		eI, ok := m.endpoints[resource]
   858  		m.mutex.Unlock()
   859  		if !ok {
   860  			m.mutex.Lock()
   861  			m.allocatedDevices = m.podDevices.devices()
   862  			m.mutex.Unlock()
   863  			return fmt.Errorf("unknown Device Plugin %s", resource)
   864  		}
   865  
   866  		devs := allocDevices.UnsortedList()
   867  		// TODO: refactor this part of the code to just append a ContainerAllocationRequest
   868  		// to a passed-in AllocateRequest pointer, and issue a single Allocate call per pod.
   869  		klog.V(3).InfoS("Making allocation request for device plugin", "devices", devs, "resourceName", resource)
   870  		resp, err := eI.e.allocate(devs)
   871  		metrics.DevicePluginAllocationDuration.WithLabelValues(resource).Observe(metrics.SinceInSeconds(startRPCTime))
   872  		if err != nil {
   873  			// In case of allocation failure, we want to restore m.allocatedDevices
   874  			// to the actual allocated state from m.podDevices.
   875  			m.mutex.Lock()
   876  			m.allocatedDevices = m.podDevices.devices()
   877  			m.mutex.Unlock()
   878  			return err
   879  		}
   880  
   881  		if len(resp.ContainerResponses) == 0 {
   882  			return fmt.Errorf("no containers returned in allocation response %v", resp)
   883  		}
   884  
   885  		allocDevicesWithNUMA := checkpoint.NewDevicesPerNUMA()
   886  		// Update internal cached podDevices state.
   887  		m.mutex.Lock()
   888  		for dev := range allocDevices {
   889  			if m.allDevices[resource][dev].Topology == nil || len(m.allDevices[resource][dev].Topology.Nodes) == 0 {
   890  				allocDevicesWithNUMA[nodeWithoutTopology] = append(allocDevicesWithNUMA[nodeWithoutTopology], dev)
   891  				continue
   892  			}
   893  			for idx := range m.allDevices[resource][dev].Topology.Nodes {
   894  				node := m.allDevices[resource][dev].Topology.Nodes[idx]
   895  				allocDevicesWithNUMA[node.ID] = append(allocDevicesWithNUMA[node.ID], dev)
   896  			}
   897  		}
   898  		m.mutex.Unlock()
   899  		m.podDevices.insert(podUID, contName, resource, allocDevicesWithNUMA, resp.ContainerResponses[0])
   900  	}
   901  
   902  	if needsUpdateCheckpoint {
   903  		return m.writeCheckpoint()
   904  	}
   905  
   906  	return nil
   907  }
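// Example (hypothetical, for illustration): a container spec whose Limits request two
// devices of a made-up extended resource. Because extended resources cannot be
// overcommitted, Requests (when set) must equal Limits, so iterating over Limits above
// is sufficient to find every device plugin resource the container needs.
//
//	container := v1.Container{
//		Name: "cuda-app",
//		Resources: v1.ResourceRequirements{
//			Limits: v1.ResourceList{
//				"vendor.example.com/gpu": resource.MustParse("2"),
//			},
//		},
//	}
//	_ = container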
   908  
   909  // checkPodActive checks if the given pod is still in activePods list
   910  func (m *ManagerImpl) checkPodActive(pod *v1.Pod) bool {
   911  	activePods := m.activePods()
   912  	for _, activePod := range activePods {
   913  		if activePod.UID == pod.UID {
   914  			return true
   915  		}
   916  	}
   917  
   918  	return false
   919  }
   920  
   921  // GetDeviceRunContainerOptions checks whether we have cached containerDevices
   922  // for the passed-in <pod, container> and returns its DeviceRunContainerOptions
   923  // for the found one. An empty struct is returned in case no cached state is found.
   924  func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
   925  	podUID := string(pod.UID)
   926  	contName := container.Name
   927  	needsReAllocate := false
   928  	for k, v := range container.Resources.Limits {
   929  		resource := string(k)
   930  		if !m.isDevicePluginResource(resource) || v.Value() == 0 {
   931  			continue
   932  		}
   933  		err := m.callPreStartContainerIfNeeded(podUID, contName, resource)
   934  		if err != nil {
   935  			return nil, err
   936  		}
   937  
   938  		if !m.checkPodActive(pod) {
   939  			klog.ErrorS(nil, "pod deleted from activePods, skip to reAllocate", "podUID", podUID)
   940  			continue
   941  		}
   942  
   943  		// This is a device plugin resource yet we don't have cached
   944  		// resource state. This is likely due to a race during node
   945  		// restart. We re-issue allocate request to cover this race.
   946  		if m.podDevices.containerDevices(podUID, contName, resource) == nil {
   947  			needsReAllocate = true
   948  		}
   949  	}
   950  	if needsReAllocate {
   951  		klog.V(2).InfoS("Needs to re-allocate device plugin resources for pod", "pod", klog.KObj(pod), "containerName", container.Name)
   952  		if err := m.Allocate(pod, container); err != nil {
   953  			return nil, err
   954  		}
   955  	}
   956  	return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil
   957  }
   958  
   959  // callPreStartContainerIfNeeded issues PreStartContainer grpc call for device plugin resource
   960  // with PreStartRequired option set.
   961  func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource string) error {
   962  	m.mutex.Lock()
   963  	eI, ok := m.endpoints[resource]
   964  	if !ok {
   965  		m.mutex.Unlock()
   966  		return fmt.Errorf("endpoint not found in cache for a registered resource: %s", resource)
   967  	}
   968  
   969  	if eI.opts == nil || !eI.opts.PreStartRequired {
   970  		m.mutex.Unlock()
   971  		klog.V(4).InfoS("Plugin options indicate to skip PreStartContainer for resource", "resourceName", resource)
   972  		return nil
   973  	}
   974  
   975  	devices := m.podDevices.containerDevices(podUID, contName, resource)
   976  	if devices == nil {
   977  		m.mutex.Unlock()
   978  		return fmt.Errorf("no devices found allocated in local cache for pod %s, container %s, resource %s", podUID, contName, resource)
   979  	}
   980  
   981  	m.mutex.Unlock()
   982  	devs := devices.UnsortedList()
   983  	klog.V(4).InfoS("Issuing a PreStartContainer call for container", "containerName", contName, "podUID", podUID)
   984  	_, err := eI.e.preStartContainer(devs)
   985  	if err != nil {
   986  		return fmt.Errorf("device plugin PreStartContainer rpc failed with err: %v", err)
   987  	}
   988  	// TODO: Add metrics support for init RPC
   989  	return nil
   990  }
   991  
   992  // callGetPreferredAllocationIfAvailable issues GetPreferredAllocation grpc
   993  // call for device plugin resource with GetPreferredAllocationAvailable option set.
   994  func (m *ManagerImpl) callGetPreferredAllocationIfAvailable(podUID, contName, resource string, available, mustInclude sets.Set[string], size int) (sets.Set[string], error) {
   995  	eI, ok := m.endpoints[resource]
   996  	if !ok {
   997  		return nil, fmt.Errorf("endpoint not found in cache for a registered resource: %s", resource)
   998  	}
   999  
  1000  	if eI.opts == nil || !eI.opts.GetPreferredAllocationAvailable {
  1001  		klog.V(4).InfoS("Plugin options indicate to skip GetPreferredAllocation for resource", "resourceName", resource)
  1002  		return nil, nil
  1003  	}
  1004  
  1005  	m.mutex.Unlock()
  1006  	klog.V(4).InfoS("Issuing a GetPreferredAllocation call for container", "containerName", contName, "podUID", podUID)
  1007  	resp, err := eI.e.getPreferredAllocation(available.UnsortedList(), mustInclude.UnsortedList(), size)
  1008  	m.mutex.Lock()
  1009  	if err != nil {
  1010  		return nil, fmt.Errorf("device plugin GetPreferredAllocation rpc failed with err: %v", err)
  1011  	}
  1012  	if resp != nil && len(resp.ContainerResponses) > 0 {
  1013  		return sets.New[string](resp.ContainerResponses[0].DeviceIDs...), nil
  1014  	}
  1015  	return sets.New[string](), nil
  1016  }
  1017  
  1018  // sanitizeNodeAllocatable scans through allocatedDevices in the device manager
  1019  // and if necessary, updates allocatableResource in nodeInfo to be at least equal to
  1020  // the allocated capacity. This allows pods that have already been scheduled on
  1021  // the node to pass GeneralPredicates admission checking even upon device plugin failure.
  1022  func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulerframework.NodeInfo) {
  1023  	var newAllocatableResource *schedulerframework.Resource
  1024  	allocatableResource := node.Allocatable
  1025  	if allocatableResource.ScalarResources == nil {
  1026  		allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
  1027  	}
  1028  
  1029  	m.mutex.Lock()
  1030  	defer m.mutex.Unlock()
  1031  	for resource, devices := range m.allocatedDevices {
  1032  		needed := devices.Len()
  1033  		quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)]
  1034  		if ok && int(quant) >= needed {
  1035  			continue
  1036  		}
  1037  		// Need to update nodeInfo.AllocatableResource to make sure
  1038  		// NodeInfo.allocatableResource is at least equal to the capacity already allocated.
  1039  		if newAllocatableResource == nil {
  1040  			newAllocatableResource = allocatableResource.Clone()
  1041  		}
  1042  		newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed)
  1043  	}
  1044  	if newAllocatableResource != nil {
  1045  		node.Allocatable = newAllocatableResource
  1046  	}
  1047  }
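// Example (hypothetical, for illustration): if two devices of a made-up resource are
// recorded in allocatedDevices but the node currently advertises zero (for instance the
// plugin has not re-registered after a kubelet restart), sanitizeNodeAllocatable raises
// the scheduler's view of allocatable to 2 so already-admitted pods keep passing admission.
//
//	nodeInfo := schedulerframework.NewNodeInfo()
//	m.sanitizeNodeAllocatable(nodeInfo)
//	allocGPU := nodeInfo.Allocatable.ScalarResources["vendor.example.com/gpu"] // at least 2
//	_ = allocGPU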
  1048  
  1049  func (m *ManagerImpl) isDevicePluginResource(resource string) bool {
  1050  	m.mutex.Lock()
  1051  	defer m.mutex.Unlock()
  1052  	_, registeredResource := m.healthyDevices[resource]
  1053  	_, allocatedResource := m.allocatedDevices[resource]
  1054  	// Return true if this is either an active device plugin resource or
  1055  	// a resource we have previously allocated.
  1056  	if registeredResource || allocatedResource {
  1057  		return true
  1058  	}
  1059  	return false
  1060  }
  1061  
  1062  // GetAllocatableDevices returns information about all the healthy devices known to the manager
  1063  func (m *ManagerImpl) GetAllocatableDevices() ResourceDeviceInstances {
  1064  	m.mutex.Lock()
  1065  	defer m.mutex.Unlock()
  1066  	resp := m.allDevices.Filter(m.healthyDevices)
  1067  	klog.V(4).InfoS("GetAllocatableDevices", "known", len(m.allDevices), "allocatable", len(resp))
  1068  	return resp
  1069  }
  1070  
  1071  // GetDevices returns the devices used by the specified container
  1072  func (m *ManagerImpl) GetDevices(podUID, containerName string) ResourceDeviceInstances {
  1073  	return m.podDevices.getContainerDevices(podUID, containerName)
  1074  }
  1075  
  1076  // ShouldResetExtendedResourceCapacity returns whether the extended resources should be zeroed or not,
  1077  // depending on whether the node has been recreated. Absence of the checkpoint file strongly indicates the node
  1078  // has been recreated.
  1079  func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool {
  1080  	checkpoints, err := m.checkpointManager.ListCheckpoints()
  1081  	if err != nil {
  1082  		return false
  1083  	}
  1084  	return len(checkpoints) == 0
  1085  }
  1086  
  1087  func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) {
  1088  	m.mutex.Lock()
  1089  	defer m.mutex.Unlock()
  1090  
  1091  	m.pendingAdmissionPod = pod
  1092  }
  1093  
  1094  func (m *ManagerImpl) isContainerAlreadyRunning(podUID, cntName string) bool {
  1095  	cntID, err := m.containerMap.GetContainerID(podUID, cntName)
  1096  	if err != nil {
  1097  		klog.V(4).InfoS("container not found in the initial map, assumed NOT running", "podUID", podUID, "containerName", cntName, "err", err)
  1098  		return false
  1099  	}
  1100  
  1101  	// note that if container runtime is down when kubelet restarts, this set will be empty,
  1102  	// so on kubelet restart containers will again fail admission, hitting https://github.com/kubernetes/kubernetes/issues/118559 again.
  1103  	// This scenario should however be rare enough.
  1104  	if !m.containerRunningSet.Has(cntID) {
  1105  		klog.V(4).InfoS("container not present in the initial running set", "podUID", podUID, "containerName", cntName, "containerID", cntID)
  1106  		return false
  1107  	}
  1108  
  1109  	// Once we make it here we know we have a running container.
  1110  	klog.V(4).InfoS("container found in the initial set, assumed running", "podUID", podUID, "containerName", cntName, "containerID", cntID)
  1111  	return true
  1112  }
  1113  
