...

Source file src/k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/manager_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cm/devicemanager

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package devicemanager
    18  
    19  import (
    20  	"fmt"
    21  	"os"
    22  	"path/filepath"
    23  	"reflect"
    24  	goruntime "runtime"
    25  	"sync"
    26  	"sync/atomic"
    27  	"testing"
    28  	"time"
    29  
    30  	cadvisorapi "github.com/google/cadvisor/info/v1"
    31  	"github.com/stretchr/testify/assert"
    32  	"github.com/stretchr/testify/require"
    33  	v1 "k8s.io/api/core/v1"
    34  	"k8s.io/apimachinery/pkg/api/resource"
    35  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    36  	"k8s.io/apimachinery/pkg/types"
    37  	"k8s.io/apimachinery/pkg/util/sets"
    38  	"k8s.io/apimachinery/pkg/util/uuid"
    39  	"k8s.io/apimachinery/pkg/util/wait"
    40  	"k8s.io/client-go/tools/record"
    41  	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
    42  	watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
    43  	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
    44  	"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
    45  	"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
    46  	plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
    47  	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
    48  	"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
    49  	"k8s.io/kubernetes/pkg/kubelet/config"
    50  	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
    51  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager"
    52  	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
    53  )
    54  
    55  const (
    56  	testResourceName = "fake-domain/resource"
    57  )
    58  
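        // newWrappedManagerImpl wraps a ManagerImpl in a wrappedManagerImpl whose plugin server
        // forwards ListAndWatch responses to a test-replaceable device update callback.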
    59  func newWrappedManagerImpl(socketPath string, manager *ManagerImpl) *wrappedManagerImpl {
    60  	w := &wrappedManagerImpl{
    61  		ManagerImpl: manager,
    62  		callback:    manager.genericDeviceUpdateCallback,
    63  	}
    64  	w.socketdir, _ = filepath.Split(socketPath)
    65  	w.server, _ = plugin.NewServer(socketPath, w, w)
    66  	return w
    67  }
    68  
    69  type wrappedManagerImpl struct {
    70  	*ManagerImpl
    71  	socketdir string
    72  	callback  func(string, []pluginapi.Device)
    73  }
    74  
    75  func (m *wrappedManagerImpl) PluginListAndWatchReceiver(r string, resp *pluginapi.ListAndWatchResponse) {
    76  	var devices []pluginapi.Device
    77  	for _, d := range resp.Devices {
    78  		devices = append(devices, *d)
    79  	}
    80  	m.callback(r, devices)
    81  }
    82  
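        // tmpSocketDir creates a temporary directory and returns the kubelet socket path
        // (server.sock) and the device-plugin socket path (device-plugin.sock) inside it.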
    83  func tmpSocketDir() (socketDir, socketName, pluginSocketName string, err error) {
    84  	socketDir, err = os.MkdirTemp("", "device_plugin")
    85  	if err != nil {
    86  		return
    87  	}
    88  	socketName = filepath.Join(socketDir, "server.sock")
    89  	pluginSocketName = filepath.Join(socketDir, "device-plugin.sock")
    90  	os.MkdirAll(socketDir, 0755)
    91  	return
    92  }
    93  
    94  func TestNewManagerImpl(t *testing.T) {
    95  	socketDir, socketName, _, err := tmpSocketDir()
    96  	topologyStore := topologymanager.NewFakeManager()
    97  	require.NoError(t, err)
    98  	defer os.RemoveAll(socketDir)
    99  	_, err = newManagerImpl(socketName, nil, topologyStore)
   100  	require.NoError(t, err)
   101  	os.RemoveAll(socketDir)
   102  }
   103  
   104  func TestNewManagerImplStart(t *testing.T) {
   105  	socketDir, socketName, pluginSocketName, err := tmpSocketDir()
   106  	require.NoError(t, err)
   107  	defer os.RemoveAll(socketDir)
   108  	m, _, p := setup(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
   109  	cleanup(t, m, p)
   110  	// Stop should tolerate being called more than once.
   111  	cleanup(t, m, p)
   112  }
   113  
   114  func TestNewManagerImplStartProbeMode(t *testing.T) {
   115  	socketDir, socketName, pluginSocketName, err := tmpSocketDir()
   116  	require.NoError(t, err)
   117  	defer os.RemoveAll(socketDir)
   118  	m, _, p, _ := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
   119  	cleanup(t, m, p)
   120  }
   121  
    122  // Tests that the device plugin manager correctly handles registration and re-registration by
    123  // making sure that after registration, devices are correctly updated and, if a re-registration
    124  // happens, devices are NOT deleted and no orphaned devices are left behind.
   125  func TestDevicePluginReRegistration(t *testing.T) {
   126  	// TODO: Remove skip once https://github.com/kubernetes/kubernetes/pull/115269 merges.
   127  	if goruntime.GOOS == "windows" {
   128  		t.Skip("Skipping test on Windows.")
   129  	}
   130  	socketDir, socketName, pluginSocketName, err := tmpSocketDir()
   131  	require.NoError(t, err)
   132  	defer os.RemoveAll(socketDir)
   133  	devs := []*pluginapi.Device{
   134  		{ID: "Dev1", Health: pluginapi.Healthy},
   135  		{ID: "Dev2", Health: pluginapi.Healthy},
   136  	}
   137  	devsForRegistration := []*pluginapi.Device{
   138  		{ID: "Dev3", Health: pluginapi.Healthy},
   139  	}
   140  	for _, preStartContainerFlag := range []bool{false, true} {
   141  		for _, getPreferredAllocationFlag := range []bool{false, true} {
   142  			m, ch, p1 := setup(t, devs, nil, socketName, pluginSocketName)
   143  			p1.Register(socketName, testResourceName, "")
   144  
   145  			select {
   146  			case <-ch:
   147  			case <-time.After(5 * time.Second):
   148  				t.Fatalf("timeout while waiting for manager update")
   149  			}
   150  			capacity, allocatable, _ := m.GetCapacity()
   151  			resourceCapacity := capacity[v1.ResourceName(testResourceName)]
   152  			resourceAllocatable := allocatable[v1.ResourceName(testResourceName)]
    153  			require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
   154  			require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
   155  
   156  			p2 := plugin.NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag, getPreferredAllocationFlag)
   157  			err = p2.Start()
   158  			require.NoError(t, err)
   159  			p2.Register(socketName, testResourceName, "")
   160  
   161  			select {
   162  			case <-ch:
   163  			case <-time.After(5 * time.Second):
   164  				t.Fatalf("timeout while waiting for manager update")
   165  			}
   166  			capacity, allocatable, _ = m.GetCapacity()
   167  			resourceCapacity = capacity[v1.ResourceName(testResourceName)]
   168  			resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
    169  			require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
   170  			require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices shouldn't change.")
   171  
   172  			// Test the scenario that a plugin re-registers with different devices.
   173  			p3 := plugin.NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag, getPreferredAllocationFlag)
   174  			err = p3.Start()
   175  			require.NoError(t, err)
   176  			p3.Register(socketName, testResourceName, "")
   177  
   178  			select {
   179  			case <-ch:
   180  			case <-time.After(5 * time.Second):
   181  				t.Fatalf("timeout while waiting for manager update")
   182  			}
   183  			capacity, allocatable, _ = m.GetCapacity()
   184  			resourceCapacity = capacity[v1.ResourceName(testResourceName)]
   185  			resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
    186  			require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
    187  			require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of the previously registered plugin should be removed.")
   188  			p2.Stop()
   189  			p3.Stop()
   190  			cleanup(t, m, p1)
   191  		}
   192  	}
   193  }
   194  
    195  // Tests that the device plugin manager correctly handles registration and re-registration by
    196  // making sure that after registration, devices are correctly updated and, if a re-registration
    197  // happens, devices are NOT deleted and no orphaned devices are left behind.
    198  // While testing the above scenario, plugin discovery and registration are done using the
    199  // kubelet's probe-based mechanism.
   200  func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
   201  	// TODO: Remove skip once https://github.com/kubernetes/kubernetes/pull/115269 merges.
   202  	if goruntime.GOOS == "windows" {
   203  		t.Skip("Skipping test on Windows.")
   204  	}
   205  	socketDir, socketName, pluginSocketName, err := tmpSocketDir()
   206  	require.NoError(t, err)
   207  	defer os.RemoveAll(socketDir)
   208  	devs := []*pluginapi.Device{
   209  		{ID: "Dev1", Health: pluginapi.Healthy},
   210  		{ID: "Dev2", Health: pluginapi.Healthy},
   211  	}
   212  	devsForRegistration := []*pluginapi.Device{
   213  		{ID: "Dev3", Health: pluginapi.Healthy},
   214  	}
   215  
   216  	m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName)
   217  
   218  	// Wait for the first callback to be issued.
   219  	select {
   220  	case <-ch:
   221  	case <-time.After(5 * time.Second):
   222  		t.FailNow()
   223  	}
   224  	capacity, allocatable, _ := m.GetCapacity()
   225  	resourceCapacity := capacity[v1.ResourceName(testResourceName)]
   226  	resourceAllocatable := allocatable[v1.ResourceName(testResourceName)]
    227  	require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
   228  	require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
   229  
   230  	p2 := plugin.NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false, false)
   231  	err = p2.Start()
   232  	require.NoError(t, err)
   233  	// Wait for the second callback to be issued.
   234  	select {
   235  	case <-ch:
   236  	case <-time.After(5 * time.Second):
   237  		t.FailNow()
   238  	}
   239  
   240  	capacity, allocatable, _ = m.GetCapacity()
   241  	resourceCapacity = capacity[v1.ResourceName(testResourceName)]
   242  	resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
    243  	require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
   244  	require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
   245  
   246  	// Test the scenario that a plugin re-registers with different devices.
   247  	p3 := plugin.NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false, false)
   248  	err = p3.Start()
   249  	require.NoError(t, err)
   250  	// Wait for the third callback to be issued.
   251  	select {
   252  	case <-ch:
   253  	case <-time.After(5 * time.Second):
   254  		t.FailNow()
   255  	}
   256  
   257  	capacity, allocatable, _ = m.GetCapacity()
   258  	resourceCapacity = capacity[v1.ResourceName(testResourceName)]
   259  	resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
    260  	require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
    261  	require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of the previously registered plugin should be removed")
   262  	p2.Stop()
   263  	p3.Stop()
   264  	cleanup(t, m, p1)
   265  }
   266  
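         // setupDeviceManager creates and starts a wrapped device manager listening on socketName.
         // The returned channel receives a value every time the device update callback fires, so
         // tests can wait for plugin updates to be processed.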
   267  func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string,
   268  	topology []cadvisorapi.Node) (Manager, <-chan interface{}) {
   269  	topologyStore := topologymanager.NewFakeManager()
   270  	m, err := newManagerImpl(socketName, topology, topologyStore)
   271  	require.NoError(t, err)
   272  	updateChan := make(chan interface{})
   273  
   274  	w := newWrappedManagerImpl(socketName, m)
   275  	if callback != nil {
   276  		w.callback = callback
   277  	}
   278  
   279  	originalCallback := w.callback
   280  	w.callback = func(resourceName string, devices []pluginapi.Device) {
   281  		originalCallback(resourceName, devices)
   282  		updateChan <- new(interface{})
   283  	}
   284  	activePods := func() []*v1.Pod {
   285  		return []*v1.Pod{}
   286  	}
   287  
    288  	// Test the steady state; initialization, where sourcesReady, containerMap and
    289  	// containerRunningSet are relevant, is tested with a different flow.
   290  	err = w.Start(activePods, &sourcesReadyStub{}, containermap.NewContainerMap(), sets.New[string]())
   291  	require.NoError(t, err)
   292  
   293  	return w, updateChan
   294  }
   295  
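         // setupDevicePlugin starts a stub device plugin serving the given devices on pluginSocketName.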
   296  func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName string) *plugin.Stub {
   297  	p := plugin.NewDevicePluginStub(devs, pluginSocketName, testResourceName, false, false)
   298  	err := p.Start()
   299  	require.NoError(t, err)
   300  	return p
   301  }
   302  
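         // setupPluginManager starts a plugin manager watching the plugin socket directory and
         // registers the device manager's watcher handler with it.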
   303  func setupPluginManager(t *testing.T, pluginSocketName string, m Manager) pluginmanager.PluginManager {
   304  	pluginManager := pluginmanager.NewPluginManager(
   305  		filepath.Dir(pluginSocketName), /* sockDir */
   306  		&record.FakeRecorder{},
   307  	)
   308  
   309  	runPluginManager(pluginManager)
   310  	pluginManager.AddHandler(watcherapi.DevicePlugin, m.GetWatcherHandler())
   311  	return pluginManager
   312  }
   313  
   314  func runPluginManager(pluginManager pluginmanager.PluginManager) {
   315  	// FIXME: Replace sets.String with sets.Set[string]
   316  	sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true })
   317  	go pluginManager.Run(sourcesReady, wait.NeverStop)
   318  }
   319  
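         // setup starts a device manager and a stub device plugin; callers register the plugin
         // with the manager explicitly.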
   320  func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub) {
   321  	m, updateChan := setupDeviceManager(t, devs, callback, socketName, nil)
   322  	p := setupDevicePlugin(t, devs, pluginSocketName)
   323  	return m, updateChan, p
   324  }
   325  
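         // setupInProbeMode starts a device manager, a stub device plugin, and a plugin manager,
         // so the plugin is discovered through the kubelet's probe-based registration mechanism.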
   326  func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub, pluginmanager.PluginManager) {
   327  	m, updateChan := setupDeviceManager(t, devs, callback, socketName, nil)
   328  	p := setupDevicePlugin(t, devs, pluginSocketName)
   329  	pm := setupPluginManager(t, pluginSocketName, m)
   330  	return m, updateChan, p, pm
   331  }
   332  
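         // cleanup stops the stub device plugin and then the device manager.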
   333  func cleanup(t *testing.T, m Manager, p *plugin.Stub) {
   334  	p.Stop()
   335  	m.Stop()
   336  }
   337  
   338  func TestUpdateCapacityAllocatable(t *testing.T) {
   339  	socketDir, socketName, _, err := tmpSocketDir()
   340  	topologyStore := topologymanager.NewFakeManager()
   341  	require.NoError(t, err)
   342  	defer os.RemoveAll(socketDir)
   343  	testManager, err := newManagerImpl(socketName, nil, topologyStore)
   344  	as := assert.New(t)
   345  	as.NotNil(testManager)
   346  	as.Nil(err)
   347  
   348  	devs := []pluginapi.Device{
   349  		{ID: "Device1", Health: pluginapi.Healthy},
   350  		{ID: "Device2", Health: pluginapi.Healthy},
   351  		{ID: "Device3", Health: pluginapi.Unhealthy},
   352  	}
   353  	callback := testManager.genericDeviceUpdateCallback
   354  
   355  	// Adds three devices for resource1, two healthy and one unhealthy.
   356  	// Expects capacity for resource1 to be 2.
   357  	resourceName1 := "domain1.com/resource1"
   358  	e1 := &endpointImpl{}
   359  	testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil}
   360  	callback(resourceName1, devs)
   361  	capacity, allocatable, removedResources := testManager.GetCapacity()
   362  	resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)]
   363  	as.True(ok)
   364  	resource1Allocatable, ok := allocatable[v1.ResourceName(resourceName1)]
   365  	as.True(ok)
   366  	as.Equal(int64(3), resource1Capacity.Value())
   367  	as.Equal(int64(2), resource1Allocatable.Value())
   368  	as.Equal(0, len(removedResources))
   369  
    370  	// Deleting an unhealthy device should NOT change allocatable but should change capacity.
   371  	devs1 := devs[:len(devs)-1]
   372  	callback(resourceName1, devs1)
   373  	capacity, allocatable, removedResources = testManager.GetCapacity()
   374  	resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
   375  	as.True(ok)
   376  	resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
   377  	as.True(ok)
   378  	as.Equal(int64(2), resource1Capacity.Value())
   379  	as.Equal(int64(2), resource1Allocatable.Value())
   380  	as.Equal(0, len(removedResources))
   381  
    382  	// Updating a healthy device to unhealthy should reduce allocatable by 1.
   383  	devs[1].Health = pluginapi.Unhealthy
   384  	callback(resourceName1, devs)
   385  	capacity, allocatable, removedResources = testManager.GetCapacity()
   386  	resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
   387  	as.True(ok)
   388  	resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
   389  	as.True(ok)
   390  	as.Equal(int64(3), resource1Capacity.Value())
   391  	as.Equal(int64(1), resource1Allocatable.Value())
   392  	as.Equal(0, len(removedResources))
   393  
    394  	// Deleting a healthy device should reduce capacity and allocatable by 1.
   395  	devs2 := devs[1:]
   396  	callback(resourceName1, devs2)
   397  	capacity, allocatable, removedResources = testManager.GetCapacity()
   398  	resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
   399  	as.True(ok)
   400  	resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
   401  	as.True(ok)
   402  	as.Equal(int64(0), resource1Allocatable.Value())
   403  	as.Equal(int64(2), resource1Capacity.Value())
   404  	as.Equal(0, len(removedResources))
   405  
   406  	// Tests adding another resource.
   407  	resourceName2 := "resource2"
   408  	e2 := &endpointImpl{}
   409  	e2.client = plugin.NewPluginClient(resourceName2, socketName, testManager)
   410  	testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil}
   411  	callback(resourceName2, devs)
   412  	capacity, allocatable, removedResources = testManager.GetCapacity()
   413  	as.Equal(2, len(capacity))
   414  	resource2Capacity, ok := capacity[v1.ResourceName(resourceName2)]
   415  	as.True(ok)
   416  	resource2Allocatable, ok := allocatable[v1.ResourceName(resourceName2)]
   417  	as.True(ok)
   418  	as.Equal(int64(3), resource2Capacity.Value())
   419  	as.Equal(int64(1), resource2Allocatable.Value())
   420  	as.Equal(0, len(removedResources))
   421  
    422  	// Expires the resourceName1 endpoint. Verifies that testManager.GetCapacity() reports resourceName1
    423  	// as removed from capacity and that it no longer exists in healthyDevices after the call.
   424  	e1.setStopTime(time.Now().Add(-1*endpointStopGracePeriod - time.Duration(10)*time.Second))
   425  	capacity, allocatable, removed := testManager.GetCapacity()
   426  	as.Equal([]string{resourceName1}, removed)
   427  	as.NotContains(capacity, v1.ResourceName(resourceName1))
   428  	as.NotContains(allocatable, v1.ResourceName(resourceName1))
   429  	val, ok := capacity[v1.ResourceName(resourceName2)]
   430  	as.True(ok)
   431  	as.Equal(int64(3), val.Value())
   432  	as.NotContains(testManager.healthyDevices, resourceName1)
   433  	as.NotContains(testManager.unhealthyDevices, resourceName1)
   434  	as.NotContains(testManager.endpoints, resourceName1)
   435  	as.Equal(1, len(testManager.endpoints))
   436  
    437  	// Stops the resourceName2 endpoint. Verifies its stopTime is set and that allocate and
    438  	// preStartContainer calls return errors.
   439  	e2.client.Disconnect()
   440  	as.False(e2.stopTime.IsZero())
    441  	_, err = e2.allocate([]string{"Device1"})
    442  	as.True(reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2)))
    443  	_, err = e2.preStartContainer([]string{"Device1"})
    444  	as.True(reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2)))
   445  	// Marks resourceName2 unhealthy and verifies its capacity/allocatable are
   446  	// correctly updated.
   447  	testManager.markResourceUnhealthy(resourceName2)
   448  	capacity, allocatable, removed = testManager.GetCapacity()
   449  	val, ok = capacity[v1.ResourceName(resourceName2)]
   450  	as.True(ok)
   451  	as.Equal(int64(3), val.Value())
   452  	val, ok = allocatable[v1.ResourceName(resourceName2)]
   453  	as.True(ok)
   454  	as.Equal(int64(0), val.Value())
   455  	as.Empty(removed)
   456  	// Writes and re-reads checkpoints. Verifies we create a stopped endpoint
   457  	// for resourceName2, its capacity is set to zero, and we still consider
   458  	// it as a DevicePlugin resource. This makes sure any pod that was scheduled
   459  	// during the time of propagating capacity change to the scheduler will be
   460  	// properly rejected instead of being incorrectly started.
   461  	err = testManager.writeCheckpoint()
   462  	as.Nil(err)
   463  	testManager.healthyDevices = make(map[string]sets.Set[string])
   464  	testManager.unhealthyDevices = make(map[string]sets.Set[string])
   465  	err = testManager.readCheckpoint()
   466  	as.Nil(err)
   467  	as.Equal(1, len(testManager.endpoints))
   468  	as.Contains(testManager.endpoints, resourceName2)
   469  	capacity, allocatable, removed = testManager.GetCapacity()
   470  	val, ok = capacity[v1.ResourceName(resourceName2)]
   471  	as.True(ok)
   472  	as.Equal(int64(0), val.Value())
   473  	val, ok = allocatable[v1.ResourceName(resourceName2)]
   474  	as.True(ok)
   475  	as.Equal(int64(0), val.Value())
   476  	as.Empty(removed)
   477  	as.True(testManager.isDevicePluginResource(resourceName2))
   478  }
   479  
   480  func TestGetAllocatableDevicesMultipleResources(t *testing.T) {
   481  	socketDir, socketName, _, err := tmpSocketDir()
   482  	topologyStore := topologymanager.NewFakeManager()
   483  	require.NoError(t, err)
   484  	defer os.RemoveAll(socketDir)
   485  	testManager, err := newManagerImpl(socketName, nil, topologyStore)
   486  	as := assert.New(t)
   487  	as.NotNil(testManager)
   488  	as.Nil(err)
   489  
   490  	resource1Devs := []pluginapi.Device{
   491  		{ID: "R1Device1", Health: pluginapi.Healthy},
   492  		{ID: "R1Device2", Health: pluginapi.Healthy},
   493  		{ID: "R1Device3", Health: pluginapi.Unhealthy},
   494  	}
   495  	resourceName1 := "domain1.com/resource1"
   496  	e1 := &endpointImpl{}
   497  	testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil}
   498  	testManager.genericDeviceUpdateCallback(resourceName1, resource1Devs)
   499  
   500  	resource2Devs := []pluginapi.Device{
   501  		{ID: "R2Device1", Health: pluginapi.Healthy},
   502  	}
   503  	resourceName2 := "other.domain2.org/resource2"
   504  	e2 := &endpointImpl{}
   505  	testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil}
   506  	testManager.genericDeviceUpdateCallback(resourceName2, resource2Devs)
   507  
   508  	allocatableDevs := testManager.GetAllocatableDevices()
   509  	as.Equal(2, len(allocatableDevs))
   510  
   511  	devInstances1, ok := allocatableDevs[resourceName1]
   512  	as.True(ok)
   513  	checkAllocatableDevicesConsistsOf(as, devInstances1, []string{"R1Device1", "R1Device2"})
   514  
   515  	devInstances2, ok := allocatableDevs[resourceName2]
   516  	as.True(ok)
   517  	checkAllocatableDevicesConsistsOf(as, devInstances2, []string{"R2Device1"})
   518  
   519  }
   520  
   521  func TestGetAllocatableDevicesHealthTransition(t *testing.T) {
   522  	socketDir, socketName, _, err := tmpSocketDir()
   523  	topologyStore := topologymanager.NewFakeManager()
   524  	require.NoError(t, err)
   525  	defer os.RemoveAll(socketDir)
   526  	testManager, err := newManagerImpl(socketName, nil, topologyStore)
   527  	as := assert.New(t)
   528  	as.NotNil(testManager)
   529  	as.Nil(err)
   530  
   531  	resource1Devs := []pluginapi.Device{
   532  		{ID: "R1Device1", Health: pluginapi.Healthy},
   533  		{ID: "R1Device2", Health: pluginapi.Healthy},
   534  		{ID: "R1Device3", Health: pluginapi.Unhealthy},
   535  	}
   536  
   537  	// Adds three devices for resource1, two healthy and one unhealthy.
   538  	// Expects allocatable devices for resource1 to be 2.
   539  	resourceName1 := "domain1.com/resource1"
   540  	e1 := &endpointImpl{}
   541  	testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil}
   542  
   543  	testManager.genericDeviceUpdateCallback(resourceName1, resource1Devs)
   544  
   545  	allocatableDevs := testManager.GetAllocatableDevices()
   546  	as.Equal(1, len(allocatableDevs))
   547  	devInstances, ok := allocatableDevs[resourceName1]
   548  	as.True(ok)
   549  	checkAllocatableDevicesConsistsOf(as, devInstances, []string{"R1Device1", "R1Device2"})
   550  
   551  	// Unhealthy device becomes healthy
   552  	resource1Devs = []pluginapi.Device{
   553  		{ID: "R1Device1", Health: pluginapi.Healthy},
   554  		{ID: "R1Device2", Health: pluginapi.Healthy},
   555  		{ID: "R1Device3", Health: pluginapi.Healthy},
   556  	}
   557  	testManager.genericDeviceUpdateCallback(resourceName1, resource1Devs)
   558  
   559  	allocatableDevs = testManager.GetAllocatableDevices()
   560  	as.Equal(1, len(allocatableDevs))
   561  	devInstances, ok = allocatableDevs[resourceName1]
   562  	as.True(ok)
   563  	checkAllocatableDevicesConsistsOf(as, devInstances, []string{"R1Device1", "R1Device2", "R1Device3"})
   564  }
   565  
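         // checkAllocatableDevicesConsistsOf asserts that devInstances contains exactly the expected device IDs.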
   566  func checkAllocatableDevicesConsistsOf(as *assert.Assertions, devInstances DeviceInstances, expectedDevs []string) {
   567  	as.Equal(len(expectedDevs), len(devInstances))
   568  	for _, deviceID := range expectedDevs {
   569  		_, ok := devInstances[deviceID]
   570  		as.True(ok)
   571  	}
   572  }
   573  
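         // constructDevices places all of the given device IDs on NUMA node 0.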
   574  func constructDevices(devices []string) checkpoint.DevicesPerNUMA {
   575  	ret := checkpoint.DevicesPerNUMA{}
   576  	for _, dev := range devices {
   577  		ret[0] = append(ret[0], dev)
   578  	}
   579  	return ret
   580  }
   581  
   582  // containerAllocateResponseBuilder is a helper to build a ContainerAllocateResponse
   583  type containerAllocateResponseBuilder struct {
   584  	devices    map[string]string
   585  	mounts     map[string]string
   586  	envs       map[string]string
   587  	cdiDevices []string
   588  }
   589  
   590  // containerAllocateResponseBuilderOption defines a functional option for a containerAllocateResponseBuilder
   591  type containerAllocateResponseBuilderOption func(*containerAllocateResponseBuilder)
   592  
   593  // withDevices sets the devices for the containerAllocateResponseBuilder
   594  func withDevices(devices map[string]string) containerAllocateResponseBuilderOption {
   595  	return func(b *containerAllocateResponseBuilder) {
   596  		b.devices = devices
   597  	}
   598  }
   599  
   600  // withMounts sets the mounts for the containerAllocateResponseBuilder
   601  func withMounts(mounts map[string]string) containerAllocateResponseBuilderOption {
   602  	return func(b *containerAllocateResponseBuilder) {
   603  		b.mounts = mounts
   604  	}
   605  }
   606  
   607  // withEnvs sets the envs for the containerAllocateResponseBuilder
   608  func withEnvs(envs map[string]string) containerAllocateResponseBuilderOption {
   609  	return func(b *containerAllocateResponseBuilder) {
   610  		b.envs = envs
   611  	}
   612  }
   613  
   614  // withCDIDevices sets the cdiDevices for the containerAllocateResponseBuilder
   615  func withCDIDevices(cdiDevices ...string) containerAllocateResponseBuilderOption {
   616  	return func(b *containerAllocateResponseBuilder) {
   617  		b.cdiDevices = cdiDevices
   618  	}
   619  }
   620  
   621  // newContainerAllocateResponse creates a ContainerAllocateResponse with the given options.
   622  func newContainerAllocateResponse(opts ...containerAllocateResponseBuilderOption) *pluginapi.ContainerAllocateResponse {
   623  	b := &containerAllocateResponseBuilder{}
   624  	for _, opt := range opts {
   625  		opt(b)
   626  	}
   627  
   628  	return b.Build()
   629  }
   630  
   631  // Build uses the configured builder to create a ContainerAllocateResponse.
   632  func (b *containerAllocateResponseBuilder) Build() *pluginapi.ContainerAllocateResponse {
   633  	resp := &pluginapi.ContainerAllocateResponse{}
   634  	for k, v := range b.devices {
   635  		resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
   636  			HostPath:      k,
   637  			ContainerPath: v,
   638  			Permissions:   "mrw",
   639  		})
   640  	}
   641  	for k, v := range b.mounts {
   642  		resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
   643  			ContainerPath: k,
   644  			HostPath:      v,
   645  			ReadOnly:      true,
   646  		})
   647  	}
   648  	resp.Envs = make(map[string]string)
   649  	for k, v := range b.envs {
   650  		resp.Envs[k] = v
   651  	}
   652  
   653  	var cdiDevices []*pluginapi.CDIDevice
   654  	for _, dev := range b.cdiDevices {
   655  		cdiDevice := pluginapi.CDIDevice{
   656  			Name: dev,
   657  		}
   658  		cdiDevices = append(cdiDevices, &cdiDevice)
   659  	}
   660  	resp.CDIDevices = cdiDevices
   661  
   662  	return resp
   663  }
   664  
   665  func TestCheckpoint(t *testing.T) {
   666  	resourceName1 := "domain1.com/resource1"
   667  	resourceName2 := "domain2.com/resource2"
   668  	resourceName3 := "domain2.com/resource3"
   669  	as := assert.New(t)
   670  	tmpDir, err := os.MkdirTemp("", "checkpoint")
   671  	as.Nil(err)
   672  	defer os.RemoveAll(tmpDir)
   673  	ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
   674  	as.Nil(err)
   675  	testManager := &ManagerImpl{
   676  		endpoints:         make(map[string]endpointInfo),
   677  		healthyDevices:    make(map[string]sets.Set[string]),
   678  		unhealthyDevices:  make(map[string]sets.Set[string]),
   679  		allocatedDevices:  make(map[string]sets.Set[string]),
   680  		podDevices:        newPodDevices(),
   681  		checkpointManager: ckm,
   682  	}
   683  
   684  	testManager.podDevices.insert("pod1", "con1", resourceName1,
   685  		constructDevices([]string{"dev1", "dev2"}),
   686  		newContainerAllocateResponse(
   687  			withDevices(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}),
   688  			withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
   689  			withCDIDevices("domain1.com/resource1=dev1", "domain1.com/resource1=dev2"),
   690  		),
   691  	)
   692  	testManager.podDevices.insert("pod1", "con1", resourceName2,
   693  		constructDevices([]string{"dev1", "dev2"}),
   694  		newContainerAllocateResponse(
   695  			withDevices(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"}),
   696  			withMounts(map[string]string{"/home/r2lib1": "/usr/r2lib1"}),
   697  			withEnvs(map[string]string{"r2devices": "dev1 dev2"}),
   698  		),
   699  	)
   700  	testManager.podDevices.insert("pod1", "con2", resourceName1,
   701  		constructDevices([]string{"dev3"}),
   702  		newContainerAllocateResponse(
   703  			withDevices(map[string]string{"/dev/r1dev3": "/dev/r1dev3"}),
   704  			withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
   705  		),
   706  	)
   707  	testManager.podDevices.insert("pod2", "con1", resourceName1,
   708  		constructDevices([]string{"dev4"}),
   709  		newContainerAllocateResponse(
   710  			withDevices(map[string]string{"/dev/r1dev4": "/dev/r1dev4"}),
   711  			withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
   712  		),
   713  	)
   714  	testManager.podDevices.insert("pod3", "con3", resourceName3,
   715  		checkpoint.DevicesPerNUMA{nodeWithoutTopology: []string{"dev5"}},
   716  		newContainerAllocateResponse(
   717  			withDevices(map[string]string{"/dev/r3dev5": "/dev/r3dev5"}),
   718  			withMounts(map[string]string{"/home/r3lib1": "/usr/r3lib1"}),
   719  		),
   720  	)
   721  
   722  	testManager.healthyDevices[resourceName1] = sets.New[string]()
   723  	testManager.healthyDevices[resourceName1].Insert("dev1")
   724  	testManager.healthyDevices[resourceName1].Insert("dev2")
   725  	testManager.healthyDevices[resourceName1].Insert("dev3")
   726  	testManager.healthyDevices[resourceName1].Insert("dev4")
   727  	testManager.healthyDevices[resourceName1].Insert("dev5")
   728  	testManager.healthyDevices[resourceName2] = sets.New[string]()
   729  	testManager.healthyDevices[resourceName2].Insert("dev1")
   730  	testManager.healthyDevices[resourceName2].Insert("dev2")
   731  	testManager.healthyDevices[resourceName3] = sets.New[string]()
   732  	testManager.healthyDevices[resourceName3].Insert("dev5")
   733  
   734  	expectedPodDevices := testManager.podDevices
   735  	expectedAllocatedDevices := testManager.podDevices.devices()
   736  	expectedAllDevices := testManager.healthyDevices
   737  
   738  	err = testManager.writeCheckpoint()
   739  
   740  	as.Nil(err)
   741  	testManager.podDevices = newPodDevices()
   742  	err = testManager.readCheckpoint()
   743  	as.Nil(err)
   744  
   745  	as.Equal(expectedPodDevices.size(), testManager.podDevices.size())
   746  	for podUID, containerDevices := range expectedPodDevices.devs {
   747  		for conName, resources := range containerDevices {
   748  			for resource := range resources {
   749  				expDevices := expectedPodDevices.containerDevices(podUID, conName, resource)
   750  				testDevices := testManager.podDevices.containerDevices(podUID, conName, resource)
   751  				as.True(reflect.DeepEqual(expDevices, testDevices))
   752  				opts1 := expectedPodDevices.deviceRunContainerOptions(podUID, conName)
   753  				opts2 := testManager.podDevices.deviceRunContainerOptions(podUID, conName)
   754  				as.Equal(len(opts1.Envs), len(opts2.Envs))
   755  				as.Equal(len(opts1.Mounts), len(opts2.Mounts))
   756  				as.Equal(len(opts1.Devices), len(opts2.Devices))
   757  			}
   758  		}
   759  	}
   760  	as.True(reflect.DeepEqual(expectedAllocatedDevices, testManager.allocatedDevices))
   761  	as.True(reflect.DeepEqual(expectedAllDevices, testManager.healthyDevices))
   762  }
   763  
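         // activePodsStub provides a mutable list of active pods for use as an ActivePodsFunc in tests.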
   764  type activePodsStub struct {
   765  	activePods []*v1.Pod
   766  }
   767  
   768  func (a *activePodsStub) getActivePods() []*v1.Pod {
   769  	return a.activePods
   770  }
   771  
   772  func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) {
   773  	a.activePods = newPods
   774  }
   775  
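         // MockEndpoint is a stub endpoint whose allocate and getPreferredAllocation behavior can be
         // injected per test; preStartContainer reports the devices it was called with on initChan.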
   776  type MockEndpoint struct {
   777  	getPreferredAllocationFunc func(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error)
   778  	allocateFunc               func(devs []string) (*pluginapi.AllocateResponse, error)
   779  	initChan                   chan []string
   780  }
   781  
   782  func (m *MockEndpoint) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) {
   783  	m.initChan <- devs
   784  	return &pluginapi.PreStartContainerResponse{}, nil
   785  }
   786  
   787  func (m *MockEndpoint) getPreferredAllocation(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error) {
   788  	if m.getPreferredAllocationFunc != nil {
   789  		return m.getPreferredAllocationFunc(available, mustInclude, size)
   790  	}
   791  	return nil, nil
   792  }
   793  
   794  func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
   795  	if m.allocateFunc != nil {
   796  		return m.allocateFunc(devs)
   797  	}
   798  	return nil, nil
   799  }
   800  
   801  func (m *MockEndpoint) setStopTime(t time.Time) {}
   802  
   803  func (m *MockEndpoint) isStopped() bool { return false }
   804  
   805  func (m *MockEndpoint) stopGracePeriodExpired() bool { return false }
   806  
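         // makePod returns a pod with a random UID and a single container requesting the given resource limits.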
   807  func makePod(limits v1.ResourceList) *v1.Pod {
   808  	return &v1.Pod{
   809  		ObjectMeta: metav1.ObjectMeta{
   810  			UID: uuid.NewUUID(),
   811  		},
   812  		Spec: v1.PodSpec{
   813  			Containers: []v1.Container{
   814  				{
   815  					Resources: v1.ResourceRequirements{
   816  						Limits: limits,
   817  					},
   818  				},
   819  			},
   820  		},
   821  	}
   822  }
   823  
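         // getTestManager builds a wrappedManagerImpl pre-populated with the given test resources,
         // wiring mock endpoints for the domain1.com/resource1 and domain2.com/resource2 resources.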
   824  func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource) (*wrappedManagerImpl, error) {
   825  	monitorCallback := func(resourceName string, devices []pluginapi.Device) {}
   826  	ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
   827  	if err != nil {
   828  		return nil, err
   829  	}
   830  	m := &ManagerImpl{
   831  		healthyDevices:        make(map[string]sets.Set[string]),
   832  		unhealthyDevices:      make(map[string]sets.Set[string]),
   833  		allocatedDevices:      make(map[string]sets.Set[string]),
   834  		endpoints:             make(map[string]endpointInfo),
   835  		podDevices:            newPodDevices(),
   836  		devicesToReuse:        make(PodReusableDevices),
   837  		topologyAffinityStore: topologymanager.NewFakeManager(),
   838  		activePods:            activePods,
   839  		sourcesReady:          &sourcesReadyStub{},
   840  		checkpointManager:     ckm,
   841  		allDevices:            NewResourceDeviceInstances(),
   842  	}
   843  	testManager := &wrappedManagerImpl{
   844  		ManagerImpl: m,
   845  		socketdir:   tmpDir,
   846  		callback:    monitorCallback,
   847  	}
   848  
   849  	for _, res := range testRes {
   850  		testManager.healthyDevices[res.resourceName] = sets.New[string](res.devs.Devices().UnsortedList()...)
   851  		if res.resourceName == "domain1.com/resource1" {
   852  			testManager.endpoints[res.resourceName] = endpointInfo{
   853  				e:    &MockEndpoint{allocateFunc: allocateStubFunc()},
   854  				opts: nil,
   855  			}
   856  		}
   857  		if res.resourceName == "domain2.com/resource2" {
   858  			testManager.endpoints[res.resourceName] = endpointInfo{
   859  				e: &MockEndpoint{
   860  					allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) {
   861  						resp := new(pluginapi.ContainerAllocateResponse)
   862  						resp.Envs = make(map[string]string)
   863  						for _, dev := range devs {
   864  							switch dev {
   865  							case "dev3":
   866  								resp.Envs["key2"] = "val2"
   867  
   868  							case "dev4":
   869  								resp.Envs["key2"] = "val3"
   870  							}
   871  						}
   872  						resps := new(pluginapi.AllocateResponse)
   873  						resps.ContainerResponses = append(resps.ContainerResponses, resp)
   874  						return resps, nil
   875  					},
   876  				},
   877  				opts: nil,
   878  			}
   879  		}
   880  		testManager.allDevices[res.resourceName] = makeDevice(res.devs, res.topology)
   881  
   882  	}
   883  	return testManager, nil
   884  }
   885  
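         // TestResource describes a device-plugin resource used to populate the test manager.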
   886  type TestResource struct {
   887  	resourceName     string
   888  	resourceQuantity resource.Quantity
   889  	devs             checkpoint.DevicesPerNUMA
   890  	topology         bool
   891  }
   892  
   893  func TestFilterByAffinity(t *testing.T) {
   894  	as := require.New(t)
   895  	allDevices := ResourceDeviceInstances{
   896  		"res1": map[string]pluginapi.Device{
   897  			"dev1": {
   898  				ID: "dev1",
   899  				Topology: &pluginapi.TopologyInfo{
   900  					Nodes: []*pluginapi.NUMANode{
   901  						{
   902  							ID: 1,
   903  						},
   904  					},
   905  				},
   906  			},
   907  			"dev2": {
   908  				ID: "dev2",
   909  				Topology: &pluginapi.TopologyInfo{
   910  					Nodes: []*pluginapi.NUMANode{
   911  						{
   912  							ID: 1,
   913  						},
   914  						{
   915  							ID: 2,
   916  						},
   917  					},
   918  				},
   919  			},
   920  			"dev3": {
   921  				ID: "dev3",
   922  				Topology: &pluginapi.TopologyInfo{
   923  					Nodes: []*pluginapi.NUMANode{
   924  						{
   925  							ID: 2,
   926  						},
   927  					},
   928  				},
   929  			},
   930  			"dev4": {
   931  				ID: "dev4",
   932  				Topology: &pluginapi.TopologyInfo{
   933  					Nodes: []*pluginapi.NUMANode{
   934  						{
   935  							ID: 2,
   936  						},
   937  					},
   938  				},
   939  			},
   940  			"devwithouttopology": {
   941  				ID: "dev5",
   942  			},
   943  		},
   944  	}
   945  
   946  	fakeAffinity, _ := bitmask.NewBitMask(2)
   947  	fakeHint := topologymanager.TopologyHint{
   948  		NUMANodeAffinity: fakeAffinity,
   949  		Preferred:        true,
   950  	}
   951  	testManager := ManagerImpl{
   952  		topologyAffinityStore: topologymanager.NewFakeManagerWithHint(&fakeHint),
   953  		allDevices:            allDevices,
   954  	}
   955  
   956  	testCases := []struct {
   957  		available               sets.Set[string]
   958  		fromAffinityExpected    sets.Set[string]
   959  		notFromAffinityExpected sets.Set[string]
   960  		withoutTopologyExpected sets.Set[string]
   961  	}{
   962  		{
   963  			available:               sets.New[string]("dev1", "dev2"),
   964  			fromAffinityExpected:    sets.New[string]("dev2"),
   965  			notFromAffinityExpected: sets.New[string]("dev1"),
   966  			withoutTopologyExpected: sets.New[string](),
   967  		},
   968  		{
   969  			available:               sets.New[string]("dev1", "dev2", "dev3", "dev4"),
   970  			fromAffinityExpected:    sets.New[string]("dev2", "dev3", "dev4"),
   971  			notFromAffinityExpected: sets.New[string]("dev1"),
   972  			withoutTopologyExpected: sets.New[string](),
   973  		},
   974  	}
   975  
   976  	for _, testCase := range testCases {
   977  		fromAffinity, notFromAffinity, withoutTopology := testManager.filterByAffinity("", "", "res1", testCase.available)
   978  		as.Truef(fromAffinity.Equal(testCase.fromAffinityExpected), "expect devices from affinity to be %v but got %v", testCase.fromAffinityExpected, fromAffinity)
   979  		as.Truef(notFromAffinity.Equal(testCase.notFromAffinityExpected), "expect devices not from affinity to be %v but got %v", testCase.notFromAffinityExpected, notFromAffinity)
    980  		as.Truef(withoutTopology.Equal(testCase.withoutTopologyExpected), "expect devices without topology to be %v but got %v", testCase.withoutTopologyExpected, withoutTopology)
   981  	}
   982  }
   983  
   984  func TestPodContainerDeviceAllocation(t *testing.T) {
   985  	res1 := TestResource{
   986  		resourceName:     "domain1.com/resource1",
   987  		resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
   988  		devs:             checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}},
   989  		topology:         true,
   990  	}
   991  	res2 := TestResource{
   992  		resourceName:     "domain2.com/resource2",
   993  		resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI),
   994  		devs:             checkpoint.DevicesPerNUMA{0: []string{"dev3", "dev4"}},
   995  		topology:         false,
   996  	}
    997  	testResources := make([]TestResource, 0, 2)
   998  	testResources = append(testResources, res1)
   999  	testResources = append(testResources, res2)
  1000  	as := require.New(t)
  1001  	podsStub := activePodsStub{
  1002  		activePods: []*v1.Pod{},
  1003  	}
  1004  	tmpDir, err := os.MkdirTemp("", "checkpoint")
  1005  	as.Nil(err)
  1006  	defer os.RemoveAll(tmpDir)
  1007  	testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
  1008  	as.Nil(err)
  1009  
  1010  	testPods := []*v1.Pod{
  1011  		makePod(v1.ResourceList{
  1012  			v1.ResourceName(res1.resourceName): res1.resourceQuantity,
  1013  			v1.ResourceName("cpu"):             res1.resourceQuantity,
  1014  			v1.ResourceName(res2.resourceName): res2.resourceQuantity}),
  1015  		makePod(v1.ResourceList{
  1016  			v1.ResourceName(res1.resourceName): res2.resourceQuantity}),
  1017  		makePod(v1.ResourceList{
  1018  			v1.ResourceName(res2.resourceName): res2.resourceQuantity}),
  1019  	}
  1020  	testCases := []struct {
  1021  		description               string
  1022  		testPod                   *v1.Pod
  1023  		expectedContainerOptsLen  []int
  1024  		expectedAllocatedResName1 int
  1025  		expectedAllocatedResName2 int
  1026  		expErr                    error
  1027  	}{
  1028  		{
  1029  			description:               "Successful allocation of two Res1 resources and one Res2 resource",
  1030  			testPod:                   testPods[0],
  1031  			expectedContainerOptsLen:  []int{3, 2, 2},
  1032  			expectedAllocatedResName1: 2,
  1033  			expectedAllocatedResName2: 1,
  1034  			expErr:                    nil,
  1035  		},
  1036  		{
  1037  			description:               "Requesting to create a pod without enough resources should fail",
  1038  			testPod:                   testPods[1],
  1039  			expectedContainerOptsLen:  nil,
  1040  			expectedAllocatedResName1: 2,
  1041  			expectedAllocatedResName2: 1,
  1042  			expErr:                    fmt.Errorf("requested number of devices unavailable for domain1.com/resource1. Requested: 1, Available: 0"),
  1043  		},
  1044  		{
  1045  			description:               "Successful allocation of all available Res1 resources and Res2 resources",
  1046  			testPod:                   testPods[2],
  1047  			expectedContainerOptsLen:  []int{0, 0, 1},
  1048  			expectedAllocatedResName1: 2,
  1049  			expectedAllocatedResName2: 2,
  1050  			expErr:                    nil,
  1051  		},
  1052  	}
  1053  	activePods := []*v1.Pod{}
  1054  	for _, testCase := range testCases {
  1055  		pod := testCase.testPod
  1056  		activePods = append(activePods, pod)
  1057  		podsStub.updateActivePods(activePods)
  1058  		err := testManager.Allocate(pod, &pod.Spec.Containers[0])
  1059  		if !reflect.DeepEqual(err, testCase.expErr) {
  1060  			t.Errorf("DevicePluginManager error (%v). expected error: %v but got: %v",
  1061  				testCase.description, testCase.expErr, err)
  1062  		}
  1063  		runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
  1064  		if testCase.expErr == nil {
  1065  			as.Nil(err)
  1066  		}
  1067  		if testCase.expectedContainerOptsLen == nil {
  1068  			as.Nil(runContainerOpts)
  1069  		} else {
  1070  			as.Equal(len(runContainerOpts.Devices), testCase.expectedContainerOptsLen[0])
  1071  			as.Equal(len(runContainerOpts.Mounts), testCase.expectedContainerOptsLen[1])
  1072  			as.Equal(len(runContainerOpts.Envs), testCase.expectedContainerOptsLen[2])
  1073  		}
  1074  		as.Equal(testCase.expectedAllocatedResName1, testManager.allocatedDevices[res1.resourceName].Len())
  1075  		as.Equal(testCase.expectedAllocatedResName2, testManager.allocatedDevices[res2.resourceName].Len())
  1076  	}
  1077  
  1078  }
  1079  
  1080  func TestPodContainerDeviceToAllocate(t *testing.T) {
  1081  	resourceName1 := "domain1.com/resource1"
  1082  	resourceName2 := "domain2.com/resource2"
  1083  	resourceName3 := "domain2.com/resource3"
  1084  	as := require.New(t)
  1085  	tmpDir, err := os.MkdirTemp("", "checkpoint")
  1086  	as.Nil(err)
  1087  	defer os.RemoveAll(tmpDir)
  1088  
  1089  	testManager := &ManagerImpl{
  1090  		endpoints:        make(map[string]endpointInfo),
  1091  		healthyDevices:   make(map[string]sets.Set[string]),
  1092  		unhealthyDevices: make(map[string]sets.Set[string]),
  1093  		allocatedDevices: make(map[string]sets.Set[string]),
  1094  		podDevices:       newPodDevices(),
  1095  		activePods:       func() []*v1.Pod { return []*v1.Pod{} },
  1096  		sourcesReady:     &sourcesReadyStub{},
  1097  	}
  1098  
  1099  	testManager.podDevices.insert("pod1", "con1", resourceName1,
  1100  		constructDevices([]string{"dev1", "dev2"}),
  1101  		newContainerAllocateResponse(
  1102  			withDevices(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"}),
  1103  			withMounts(map[string]string{"/home/r2lib1": "/usr/r2lib1"}),
  1104  			withEnvs(map[string]string{"r2devices": "dev1 dev2"}),
  1105  		),
  1106  	)
  1107  	testManager.podDevices.insert("pod2", "con2", resourceName2,
  1108  		checkpoint.DevicesPerNUMA{nodeWithoutTopology: []string{"dev5"}},
  1109  		newContainerAllocateResponse(
  1110  			withDevices(map[string]string{"/dev/r1dev5": "/dev/r1dev5"}),
  1111  			withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
  1112  		),
  1113  	)
  1114  	testManager.podDevices.insert("pod3", "con3", resourceName3,
  1115  		checkpoint.DevicesPerNUMA{nodeWithoutTopology: []string{"dev5"}},
  1116  		newContainerAllocateResponse(
  1117  			withDevices(map[string]string{"/dev/r1dev5": "/dev/r1dev5"}),
  1118  			withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
  1119  		),
  1120  	)
  1121  
   1122  	// resourceName1 is registered but has no healthy devices; devices corresponding to
   1123  	// resourceName2 are intentionally omitted to simulate that the resource
   1124  	// hasn't been registered.
  1125  	testManager.healthyDevices[resourceName1] = sets.New[string]()
  1126  	testManager.healthyDevices[resourceName3] = sets.New[string]()
  1127  	// dev5 is no longer in the list of healthy devices
  1128  	testManager.healthyDevices[resourceName3].Insert("dev7")
  1129  	testManager.healthyDevices[resourceName3].Insert("dev8")
  1130  
  1131  	testCases := []struct {
  1132  		description              string
  1133  		podUID                   string
  1134  		contName                 string
  1135  		resource                 string
  1136  		required                 int
  1137  		reusableDevices          sets.Set[string]
  1138  		expectedAllocatedDevices sets.Set[string]
  1139  		expErr                   error
  1140  	}{
  1141  		{
   1142  			description:              "Admission error in case no healthy devices are available to allocate",
  1143  			podUID:                   "pod1",
  1144  			contName:                 "con1",
  1145  			resource:                 resourceName1,
  1146  			required:                 2,
  1147  			reusableDevices:          sets.New[string](),
  1148  			expectedAllocatedDevices: nil,
  1149  			expErr:                   fmt.Errorf("no healthy devices present; cannot allocate unhealthy devices %s", resourceName1),
  1150  		},
  1151  		{
  1152  			description:              "Admission error in case resource is not registered",
  1153  			podUID:                   "pod2",
  1154  			contName:                 "con2",
  1155  			resource:                 resourceName2,
  1156  			required:                 1,
  1157  			reusableDevices:          sets.New[string](),
  1158  			expectedAllocatedDevices: nil,
  1159  			expErr:                   fmt.Errorf("cannot allocate unregistered device %s", resourceName2),
  1160  		},
  1161  		{
   1162  			description:              "Admission error in case devices previously allocated are no longer healthy",
  1163  			podUID:                   "pod3",
  1164  			contName:                 "con3",
  1165  			resource:                 resourceName3,
  1166  			required:                 1,
  1167  			reusableDevices:          sets.New[string](),
  1168  			expectedAllocatedDevices: nil,
  1169  			expErr:                   fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resourceName3),
  1170  		},
  1171  	}
  1172  
  1173  	for _, testCase := range testCases {
  1174  		allocDevices, err := testManager.devicesToAllocate(testCase.podUID, testCase.contName, testCase.resource, testCase.required, testCase.reusableDevices)
  1175  		if !reflect.DeepEqual(err, testCase.expErr) {
  1176  			t.Errorf("devicePluginManager error (%v). expected error: %v but got: %v",
  1177  				testCase.description, testCase.expErr, err)
  1178  		}
  1179  		if !reflect.DeepEqual(allocDevices, testCase.expectedAllocatedDevices) {
   1180  			t.Errorf("devicePluginManager error (%v). expected devices: %v but got: %v",
  1181  				testCase.description, testCase.expectedAllocatedDevices, allocDevices)
  1182  		}
  1183  	}
  1184  
  1185  }
  1186  
  1187  func TestDevicesToAllocateConflictWithUpdateAllocatedDevices(t *testing.T) {
  1188  	podToAllocate := "podToAllocate"
  1189  	containerToAllocate := "containerToAllocate"
  1190  	podToRemove := "podToRemove"
  1191  	containerToRemove := "containerToRemove"
  1192  	deviceID := "deviceID"
  1193  	resourceName := "domain1.com/resource"
  1194  
  1195  	socket := filepath.Join(os.TempDir(), esocketName())
  1196  	devs := []*pluginapi.Device{
  1197  		{ID: deviceID, Health: pluginapi.Healthy},
  1198  	}
  1199  	p, e := esetup(t, devs, socket, resourceName, func(n string, d []pluginapi.Device) {})
  1200  
  1201  	waitUpdateAllocatedDevicesChan := make(chan struct{})
  1202  	waitSetGetPreferredAllocChan := make(chan struct{})
  1203  
  1204  	p.SetGetPreferredAllocFunc(func(r *pluginapi.PreferredAllocationRequest, devs map[string]pluginapi.Device) (*pluginapi.PreferredAllocationResponse, error) {
  1205  		waitSetGetPreferredAllocChan <- struct{}{}
  1206  		<-waitUpdateAllocatedDevicesChan
  1207  		return &pluginapi.PreferredAllocationResponse{
  1208  			ContainerResponses: []*pluginapi.ContainerPreferredAllocationResponse{
  1209  				{
  1210  					DeviceIDs: []string{deviceID},
  1211  				},
  1212  			},
  1213  		}, nil
  1214  	})
  1215  
  1216  	testManager := &ManagerImpl{
  1217  		endpoints:             make(map[string]endpointInfo),
  1218  		healthyDevices:        make(map[string]sets.Set[string]),
  1219  		unhealthyDevices:      make(map[string]sets.Set[string]),
  1220  		allocatedDevices:      make(map[string]sets.Set[string]),
  1221  		podDevices:            newPodDevices(),
  1222  		activePods:            func() []*v1.Pod { return []*v1.Pod{} },
  1223  		sourcesReady:          &sourcesReadyStub{},
  1224  		topologyAffinityStore: topologymanager.NewFakeManager(),
  1225  	}
  1226  
  1227  	testManager.endpoints[resourceName] = endpointInfo{
  1228  		e: e,
  1229  		opts: &pluginapi.DevicePluginOptions{
  1230  			GetPreferredAllocationAvailable: true,
  1231  		},
  1232  	}
  1233  	testManager.healthyDevices[resourceName] = sets.New[string](deviceID)
  1234  	testManager.podDevices.insert(podToRemove, containerToRemove, resourceName, nil, nil)
  1235  
  1236  	go func() {
  1237  		<-waitSetGetPreferredAllocChan
  1238  		testManager.UpdateAllocatedDevices()
  1239  		waitUpdateAllocatedDevicesChan <- struct{}{}
  1240  	}()
  1241  
  1242  	set, err := testManager.devicesToAllocate(podToAllocate, containerToAllocate, resourceName, 1, sets.New[string]())
  1243  	assert.NoError(t, err)
  1244  	assert.Equal(t, set, sets.New[string](deviceID))
  1245  }
  1246  
  1247  func TestGetDeviceRunContainerOptions(t *testing.T) {
  1248  	res1 := TestResource{
  1249  		resourceName:     "domain1.com/resource1",
  1250  		resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
  1251  		devs:             checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}},
  1252  		topology:         true,
  1253  	}
  1254  	res2 := TestResource{
  1255  		resourceName:     "domain2.com/resource2",
  1256  		resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI),
  1257  		devs:             checkpoint.DevicesPerNUMA{0: []string{"dev3", "dev4"}},
  1258  		topology:         false,
  1259  	}
  1260  
   1261  	testResources := make([]TestResource, 0, 2)
  1262  	testResources = append(testResources, res1)
  1263  	testResources = append(testResources, res2)
  1264  
  1265  	podsStub := activePodsStub{
  1266  		activePods: []*v1.Pod{},
  1267  	}
  1268  	as := require.New(t)
  1269  
  1270  	tmpDir, err := os.MkdirTemp("", "checkpoint")
  1271  	as.Nil(err)
  1272  	defer os.RemoveAll(tmpDir)
  1273  
  1274  	testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
  1275  	as.Nil(err)
  1276  
  1277  	pod1 := makePod(v1.ResourceList{
  1278  		v1.ResourceName(res1.resourceName): res1.resourceQuantity,
  1279  		v1.ResourceName(res2.resourceName): res2.resourceQuantity,
  1280  	})
  1281  	pod2 := makePod(v1.ResourceList{
  1282  		v1.ResourceName(res2.resourceName): res2.resourceQuantity,
  1283  	})
  1284  
  1285  	activePods := []*v1.Pod{pod1, pod2}
  1286  	podsStub.updateActivePods(activePods)
  1287  
  1288  	err = testManager.Allocate(pod1, &pod1.Spec.Containers[0])
  1289  	as.Nil(err)
  1290  	err = testManager.Allocate(pod2, &pod2.Spec.Containers[0])
  1291  	as.Nil(err)
  1292  
  1293  	// When the pod is in activePods, GetDeviceRunContainerOptions should return the allocated device run options.
  1294  	runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod1, &pod1.Spec.Containers[0])
  1295  	as.Nil(err)
  1296  	as.Equal(len(runContainerOpts.Devices), 3)
  1297  	as.Equal(len(runContainerOpts.Mounts), 2)
  1298  	as.Equal(len(runContainerOpts.Envs), 2)
  1299  
  1300  	activePods = []*v1.Pod{pod2}
  1301  	podsStub.updateActivePods(activePods)
  1302  	testManager.UpdateAllocatedDevices()
  1303  
  1304  	// When the pod is removed from activePods, GetDeviceRunContainerOptions should return nil options without error.
  1305  	runContainerOpts, err = testManager.GetDeviceRunContainerOptions(pod1, &pod1.Spec.Containers[0])
  1306  	as.Nil(err)
  1307  	as.Nil(runContainerOpts)
  1308  }
  1309  
  1310  func TestInitContainerDeviceAllocation(t *testing.T) {
  1311  	// Requesting to create a pod that requests resourceName1 in init containers and normal containers
  1312  	// should succeed with devices allocated to init containers reallocated to normal containers.
  1313  	res1 := TestResource{
  1314  		resourceName:     "domain1.com/resource1",
  1315  		resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
  1316  		devs:             checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}},
  1317  		topology:         false,
  1318  	}
  1319  	res2 := TestResource{
  1320  		resourceName:     "domain2.com/resource2",
  1321  		resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI),
  1322  		devs:             checkpoint.DevicesPerNUMA{0: []string{"dev3", "dev4"}},
  1323  		topology:         true,
  1324  	}
  1325  	testResources := make([]TestResource, 0, 2)
  1326  	testResources = append(testResources, res1)
  1327  	testResources = append(testResources, res2)
  1328  	as := require.New(t)
  1329  	podsStub := activePodsStub{
  1330  		activePods: []*v1.Pod{},
  1331  	}
  1332  	tmpDir, err := os.MkdirTemp("", "checkpoint")
  1333  	as.Nil(err)
  1334  	defer os.RemoveAll(tmpDir)
  1335  
  1336  	testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
  1337  	as.Nil(err)
  1338  
  1339  	podWithPluginResourcesInInitContainers := &v1.Pod{
  1340  		ObjectMeta: metav1.ObjectMeta{
  1341  			UID: uuid.NewUUID(),
  1342  		},
  1343  		Spec: v1.PodSpec{
  1344  			InitContainers: []v1.Container{
  1345  				{
  1346  					Name: string(uuid.NewUUID()),
  1347  					Resources: v1.ResourceRequirements{
  1348  						Limits: v1.ResourceList{
  1349  							v1.ResourceName(res1.resourceName): res2.resourceQuantity,
  1350  						},
  1351  					},
  1352  				},
  1353  				{
  1354  					Name: string(uuid.NewUUID()),
  1355  					Resources: v1.ResourceRequirements{
  1356  						Limits: v1.ResourceList{
  1357  							v1.ResourceName(res1.resourceName): res1.resourceQuantity,
  1358  						},
  1359  					},
  1360  				},
  1361  			},
  1362  			Containers: []v1.Container{
  1363  				{
  1364  					Name: string(uuid.NewUUID()),
  1365  					Resources: v1.ResourceRequirements{
  1366  						Limits: v1.ResourceList{
  1367  							v1.ResourceName(res1.resourceName): res2.resourceQuantity,
  1368  							v1.ResourceName(res2.resourceName): res2.resourceQuantity,
  1369  						},
  1370  					},
  1371  				},
  1372  				{
  1373  					Name: string(uuid.NewUUID()),
  1374  					Resources: v1.ResourceRequirements{
  1375  						Limits: v1.ResourceList{
  1376  							v1.ResourceName(res1.resourceName): res2.resourceQuantity,
  1377  							v1.ResourceName(res2.resourceName): res2.resourceQuantity,
  1378  						},
  1379  					},
  1380  				},
  1381  			},
  1382  		},
  1383  	}
  1384  	podsStub.updateActivePods([]*v1.Pod{podWithPluginResourcesInInitContainers})
  1385  	for _, container := range podWithPluginResourcesInInitContainers.Spec.InitContainers {
  1386  		err = testManager.Allocate(podWithPluginResourcesInInitContainers, &container)
  1387  	}
  1388  	for _, container := range podWithPluginResourcesInInitContainers.Spec.Containers {
  1389  		err = testManager.Allocate(podWithPluginResourcesInInitContainers, &container)
  1390  	}
  1391  	as.Nil(err)
  1392  	podUID := string(podWithPluginResourcesInInitContainers.UID)
  1393  	initCont1 := podWithPluginResourcesInInitContainers.Spec.InitContainers[0].Name
  1394  	initCont2 := podWithPluginResourcesInInitContainers.Spec.InitContainers[1].Name
  1395  	normalCont1 := podWithPluginResourcesInInitContainers.Spec.Containers[0].Name
  1396  	normalCont2 := podWithPluginResourcesInInitContainers.Spec.Containers[1].Name
  1397  	initCont1Devices := testManager.podDevices.containerDevices(podUID, initCont1, res1.resourceName)
  1398  	initCont2Devices := testManager.podDevices.containerDevices(podUID, initCont2, res1.resourceName)
  1399  	normalCont1Devices := testManager.podDevices.containerDevices(podUID, normalCont1, res1.resourceName)
  1400  	normalCont2Devices := testManager.podDevices.containerDevices(podUID, normalCont2, res1.resourceName)
  1401  	as.Equal(1, initCont1Devices.Len())
  1402  	as.Equal(2, initCont2Devices.Len())
  1403  	as.Equal(1, normalCont1Devices.Len())
  1404  	as.Equal(1, normalCont2Devices.Len())
  1405  	as.True(initCont2Devices.IsSuperset(initCont1Devices))
  1406  	as.True(initCont2Devices.IsSuperset(normalCont1Devices))
  1407  	as.True(initCont2Devices.IsSuperset(normalCont2Devices))
  1408  	as.Equal(0, normalCont1Devices.Intersection(normalCont2Devices).Len())
  1409  }
  1410  
  1411  func TestRestartableInitContainerDeviceAllocation(t *testing.T) {
  1412  	// Requesting to create a pod that requests resourceName1 in restartable
  1413  	// init containers and normal containers should succeed with devices
  1414  	// allocated to init containers not reallocated to normal containers.
  1415  	oneDevice := resource.NewQuantity(int64(1), resource.DecimalSI)
  1416  	twoDevice := resource.NewQuantity(int64(2), resource.DecimalSI)
  1417  	threeDevice := resource.NewQuantity(int64(3), resource.DecimalSI)
  1418  	res1 := TestResource{
  1419  		resourceName:     "domain1.com/resource1",
  1420  		resourceQuantity: *resource.NewQuantity(int64(6), resource.DecimalSI),
  1421  		devs: checkpoint.DevicesPerNUMA{
  1422  			0: []string{"dev1", "dev2", "dev3", "dev4", "dev5", "dev6"},
  1423  		},
  1424  		topology: false,
  1425  	}
  1426  	testResources := []TestResource{
  1427  		res1,
  1428  	}
  1429  	as := require.New(t)
  1430  	podsStub := activePodsStub{
  1431  		activePods: []*v1.Pod{},
  1432  	}
  1433  	tmpDir, err := os.MkdirTemp("", "checkpoint")
  1434  	as.Nil(err)
  1435  	defer os.RemoveAll(tmpDir)
  1436  
  1437  	testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
  1438  	as.Nil(err)
  1439  
  1440  	containerRestartPolicyAlways := v1.ContainerRestartPolicyAlways
  1441  	podWithPluginResourcesInRestartableInitContainers := &v1.Pod{
  1442  		ObjectMeta: metav1.ObjectMeta{
  1443  			UID: uuid.NewUUID(),
  1444  		},
  1445  		Spec: v1.PodSpec{
  1446  			InitContainers: []v1.Container{
  1447  				{
  1448  					Name: string(uuid.NewUUID()),
  1449  					Resources: v1.ResourceRequirements{
  1450  						Limits: v1.ResourceList{
  1451  							v1.ResourceName(res1.resourceName): *threeDevice,
  1452  						},
  1453  					},
  1454  				},
  1455  				{
  1456  					Name: string(uuid.NewUUID()),
  1457  					Resources: v1.ResourceRequirements{
  1458  						Limits: v1.ResourceList{
  1459  							v1.ResourceName(res1.resourceName): *oneDevice,
  1460  						},
  1461  					},
  1462  					RestartPolicy: &containerRestartPolicyAlways,
  1463  				},
  1464  				{
  1465  					Name: string(uuid.NewUUID()),
  1466  					Resources: v1.ResourceRequirements{
  1467  						Limits: v1.ResourceList{
  1468  							v1.ResourceName(res1.resourceName): *twoDevice,
  1469  						},
  1470  					},
  1471  					RestartPolicy: &containerRestartPolicyAlways,
  1472  				},
  1473  			},
  1474  			Containers: []v1.Container{
  1475  				{
  1476  					Name: string(uuid.NewUUID()),
  1477  					Resources: v1.ResourceRequirements{
  1478  						Limits: v1.ResourceList{
  1479  							v1.ResourceName(res1.resourceName): *oneDevice,
  1480  						},
  1481  					},
  1482  				},
  1483  				{
  1484  					Name: string(uuid.NewUUID()),
  1485  					Resources: v1.ResourceRequirements{
  1486  						Limits: v1.ResourceList{
  1487  							v1.ResourceName(res1.resourceName): *twoDevice,
  1488  						},
  1489  					},
  1490  				},
  1491  			},
  1492  		},
  1493  	}
  1494  	podsStub.updateActivePods([]*v1.Pod{podWithPluginResourcesInRestartableInitContainers})
  1495  	for _, container := range podWithPluginResourcesInRestartableInitContainers.Spec.InitContainers {
  1496  		err = testManager.Allocate(podWithPluginResourcesInRestartableInitContainers, &container)
  1497  	}
  1498  	for _, container := range podWithPluginResourcesInRestartableInitContainers.Spec.Containers {
  1499  		err = testManager.Allocate(podWithPluginResourcesInRestartableInitContainers, &container)
  1500  	}
  1501  	as.Nil(err)
  1502  	podUID := string(podWithPluginResourcesInRestartableInitContainers.UID)
  1503  	regularInitCont1 := podWithPluginResourcesInRestartableInitContainers.Spec.InitContainers[0].Name
  1504  	restartableInitCont2 := podWithPluginResourcesInRestartableInitContainers.Spec.InitContainers[1].Name
  1505  	restartableInitCont3 := podWithPluginResourcesInRestartableInitContainers.Spec.InitContainers[2].Name
  1506  	normalCont1 := podWithPluginResourcesInRestartableInitContainers.Spec.Containers[0].Name
  1507  	normalCont2 := podWithPluginResourcesInRestartableInitContainers.Spec.Containers[1].Name
  1508  	regularInitCont1Devices := testManager.podDevices.containerDevices(podUID, regularInitCont1, res1.resourceName)
  1509  	restartableInitCont2Devices := testManager.podDevices.containerDevices(podUID, restartableInitCont2, res1.resourceName)
  1510  	restartableInitCont3Devices := testManager.podDevices.containerDevices(podUID, restartableInitCont3, res1.resourceName)
  1511  	normalCont1Devices := testManager.podDevices.containerDevices(podUID, normalCont1, res1.resourceName)
  1512  	normalCont2Devices := testManager.podDevices.containerDevices(podUID, normalCont2, res1.resourceName)
  1513  	as.Equal(3, regularInitCont1Devices.Len())
  1514  	as.Equal(1, restartableInitCont2Devices.Len())
  1515  	as.Equal(2, restartableInitCont3Devices.Len())
  1516  	as.Equal(1, normalCont1Devices.Len())
  1517  	as.Equal(2, normalCont2Devices.Len())
  1518  	as.True(regularInitCont1Devices.IsSuperset(restartableInitCont2Devices))
  1519  	as.True(regularInitCont1Devices.IsSuperset(restartableInitCont3Devices))
  1520  	// regularInitCont1Devices can be reused by later containers, since a regular init container releases its devices after it finishes.
  1521  
  1522  	dedicatedContainerDevices := []sets.Set[string]{
  1523  		restartableInitCont2Devices,
  1524  		restartableInitCont3Devices,
  1525  		normalCont1Devices,
  1526  		normalCont2Devices,
  1527  	}
  1528  
  1529  	for i := 0; i < len(dedicatedContainerDevices)-1; i++ {
  1530  		for j := i + 1; j < len(dedicatedContainerDevices); j++ {
  1531  			t.Logf("containerDevices[%d] = %v", i, dedicatedContainerDevices[i])
  1532  			t.Logf("containerDevices[%d] = %v", j, dedicatedContainerDevices[j])
  1533  			as.Empty(dedicatedContainerDevices[i].Intersection(dedicatedContainerDevices[j]))
  1534  		}
  1535  	}
  1536  }
  1537  
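        // TestUpdatePluginResources verifies that UpdatePluginResources raises a node's allocatable
        // count for a plugin resource when it is lower than what has already been allocated, and
        // leaves it unchanged when the cached allocatable already covers the allocation.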
  1538  func TestUpdatePluginResources(t *testing.T) {
  1539  	pod := &v1.Pod{}
  1540  	pod.UID = types.UID("testPod")
  1541  
  1542  	resourceName1 := "domain1.com/resource1"
  1543  	devID1 := "dev1"
  1544  
  1545  	resourceName2 := "domain2.com/resource2"
  1546  	devID2 := "dev2"
  1547  
  1548  	as := assert.New(t)
  1549  	monitorCallback := func(resourceName string, devices []pluginapi.Device) {}
  1550  	tmpDir, err := os.MkdirTemp("", "checkpoint")
  1551  	as.Nil(err)
  1552  	defer os.RemoveAll(tmpDir)
  1553  
  1554  	ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
  1555  	as.Nil(err)
  1556  	m := &ManagerImpl{
  1557  		allocatedDevices:  make(map[string]sets.Set[string]),
  1558  		healthyDevices:    make(map[string]sets.Set[string]),
  1559  		podDevices:        newPodDevices(),
  1560  		checkpointManager: ckm,
  1561  	}
  1562  	testManager := wrappedManagerImpl{
  1563  		ManagerImpl: m,
  1564  		callback:    monitorCallback,
  1565  	}
  1566  	testManager.podDevices.devs[string(pod.UID)] = make(containerDevices)
  1567  
  1568  	// require one of resource1 and one of resource2
  1569  	testManager.allocatedDevices[resourceName1] = sets.New[string]()
  1570  	testManager.allocatedDevices[resourceName1].Insert(devID1)
  1571  	testManager.allocatedDevices[resourceName2] = sets.New[string]()
  1572  	testManager.allocatedDevices[resourceName2].Insert(devID2)
  1573  
  1574  	cachedNode := &v1.Node{
  1575  		Status: v1.NodeStatus{
  1576  			Allocatable: v1.ResourceList{
  1577  				// has no resource1 and two of resource2
  1578  				v1.ResourceName(resourceName2): *resource.NewQuantity(int64(2), resource.DecimalSI),
  1579  			},
  1580  		},
  1581  	}
  1582  	nodeInfo := &schedulerframework.NodeInfo{}
  1583  	nodeInfo.SetNode(cachedNode)
  1584  
  1585  	testManager.UpdatePluginResources(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
  1586  
  1587  	allocatableScalarResources := nodeInfo.Allocatable.ScalarResources
  1588  	// allocatable in nodeInfo is less than needed, should update
  1589  	as.Equal(1, int(allocatableScalarResources[v1.ResourceName(resourceName1)]))
  1590  	// allocatable in nodeInfo is more than needed, should skip updating
  1591  	as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)]))
  1592  }
  1593  
  1594  func TestDevicePreStartContainer(t *testing.T) {
  1595  	// Ensures that if device manager is indicated to invoke `PreStartContainer` RPC
  1596  	// by device plugin, then device manager invokes PreStartContainer at endpoint interface.
  1597  	// Also verifies that final allocation of mounts, envs etc is same as expected.
  1598  	res1 := TestResource{
  1599  		resourceName:     "domain1.com/resource1",
  1600  		resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
  1601  		devs:             checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}},
  1602  		topology:         false,
  1603  	}
  1604  	as := require.New(t)
  1605  	podsStub := activePodsStub{
  1606  		activePods: []*v1.Pod{},
  1607  	}
  1608  	tmpDir, err := os.MkdirTemp("", "checkpoint")
  1609  	as.Nil(err)
  1610  	defer os.RemoveAll(tmpDir)
  1611  
  1612  	testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1})
  1613  	as.Nil(err)
  1614  
  1615  	ch := make(chan []string, 1)
  1616  	testManager.endpoints[res1.resourceName] = endpointInfo{
  1617  		e: &MockEndpoint{
  1618  			initChan:     ch,
  1619  			allocateFunc: allocateStubFunc(),
  1620  		},
  1621  		opts: &pluginapi.DevicePluginOptions{PreStartRequired: true},
  1622  	}
  1623  	pod := makePod(v1.ResourceList{
  1624  		v1.ResourceName(res1.resourceName): res1.resourceQuantity})
  1625  	activePods := []*v1.Pod{}
  1626  	activePods = append(activePods, pod)
  1627  	podsStub.updateActivePods(activePods)
  1628  	err = testManager.Allocate(pod, &pod.Spec.Containers[0])
  1629  	as.Nil(err)
  1630  	runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
  1631  	as.Nil(err)
  1632  	var initializedDevs []string
  1633  	select {
  1634  	case <-time.After(time.Second):
  1635  		t.Fatalf("Timed out while waiting on channel for response from PreStartContainer RPC stub")
  1636  	case initializedDevs = <-ch:
  1637  		break
  1638  	}
  1639  
  1640  	as.Contains(initializedDevs, "dev1")
  1641  	as.Contains(initializedDevs, "dev2")
  1642  	as.Equal(len(initializedDevs), res1.devs.Devices().Len())
  1643  
  1644  	expectedResps, err := allocateStubFunc()([]string{"dev1", "dev2"})
  1645  	as.Nil(err)
  1646  	as.Equal(1, len(expectedResps.ContainerResponses))
  1647  	expectedResp := expectedResps.ContainerResponses[0]
  1648  	as.Equal(len(runContainerOpts.Devices), len(expectedResp.Devices))
  1649  	as.Equal(len(runContainerOpts.Mounts), len(expectedResp.Mounts))
  1650  	as.Equal(len(runContainerOpts.Envs), len(expectedResp.Envs))
  1651  
  1652  	pod2 := makePod(v1.ResourceList{
  1653  		v1.ResourceName(res1.resourceName): *resource.NewQuantity(int64(0), resource.DecimalSI)})
  1654  	activePods = append(activePods, pod2)
  1655  	podsStub.updateActivePods(activePods)
  1656  	err = testManager.Allocate(pod2, &pod2.Spec.Containers[0])
  1657  	as.Nil(err)
  1658  	_, err = testManager.GetDeviceRunContainerOptions(pod2, &pod2.Spec.Containers[0])
  1659  	as.Nil(err)
  1660  	select {
  1661  	case <-time.After(time.Millisecond):
  1662  		t.Log("When the pod's resourceQuantity is 0, the PreStartContainer RPC stub is skipped")
  1663  	case <-ch:
  1664  		break
  1665  	}
  1666  }
  1667  
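        // TestResetExtendedResource verifies that extended resource capacity is preserved while
        // the device manager checkpoint file exists, and is reset once the checkpoint has been
        // removed (as happens when a node is recreated).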
  1668  func TestResetExtendedResource(t *testing.T) {
  1669  	as := assert.New(t)
  1670  	tmpDir, err := os.MkdirTemp("", "checkpoint")
  1671  	as.Nil(err)
  1672  	defer os.RemoveAll(tmpDir)
  1673  	ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
  1674  	as.Nil(err)
  1675  	testManager := &ManagerImpl{
  1676  		endpoints:         make(map[string]endpointInfo),
  1677  		healthyDevices:    make(map[string]sets.Set[string]),
  1678  		unhealthyDevices:  make(map[string]sets.Set[string]),
  1679  		allocatedDevices:  make(map[string]sets.Set[string]),
  1680  		podDevices:        newPodDevices(),
  1681  		checkpointManager: ckm,
  1682  	}
  1683  
  1684  	extendedResourceName := "domain.com/resource"
  1685  	testManager.podDevices.insert("pod", "con", extendedResourceName,
  1686  		constructDevices([]string{"dev1"}),
  1687  		newContainerAllocateResponse(
  1688  			withDevices(map[string]string{"/dev/dev1": "/dev/dev1"}),
  1689  			withMounts(map[string]string{"/home/lib1": "/usr/lib1"}),
  1690  		),
  1691  	)
  1692  
  1693  	testManager.healthyDevices[extendedResourceName] = sets.New[string]()
  1694  	testManager.healthyDevices[extendedResourceName].Insert("dev1")
  1695  	// checkpoint is present, indicating node hasn't been recreated
  1696  	err = testManager.writeCheckpoint()
  1697  	as.Nil(err)
  1698  
  1699  	as.False(testManager.ShouldResetExtendedResourceCapacity())
  1700  
  1701  	// checkpoint is absent, representing node recreation
  1702  	ckpts, err := ckm.ListCheckpoints()
  1703  	as.Nil(err)
  1704  	for _, ckpt := range ckpts {
  1705  		err = ckm.RemoveCheckpoint(ckpt)
  1706  		as.Nil(err)
  1707  	}
  1708  	as.True(testManager.ShouldResetExtendedResourceCapacity())
  1709  }
  1710  
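        // allocateStubFunc returns a stub Allocate implementation that maps "dev1" and "dev2" to
        // fixed device specs, mounts and env vars; the tests above compare container run options
        // against the response it produces.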
  1711  func allocateStubFunc() func(devs []string) (*pluginapi.AllocateResponse, error) {
  1712  	return func(devs []string) (*pluginapi.AllocateResponse, error) {
  1713  		resp := new(pluginapi.ContainerAllocateResponse)
  1714  		resp.Envs = make(map[string]string)
  1715  		for _, dev := range devs {
  1716  			switch dev {
  1717  			case "dev1":
  1718  				resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
  1719  					ContainerPath: "/dev/aaa",
  1720  					HostPath:      "/dev/aaa",
  1721  					Permissions:   "mrw",
  1722  				})
  1723  
  1724  				resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
  1725  					ContainerPath: "/dev/bbb",
  1726  					HostPath:      "/dev/bbb",
  1727  					Permissions:   "mrw",
  1728  				})
  1729  
  1730  				resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
  1731  					ContainerPath: "/container_dir1/file1",
  1732  					HostPath:      "host_dir1/file1",
  1733  					ReadOnly:      true,
  1734  				})
  1735  
  1736  			case "dev2":
  1737  				resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
  1738  					ContainerPath: "/dev/ccc",
  1739  					HostPath:      "/dev/ccc",
  1740  					Permissions:   "mrw",
  1741  				})
  1742  
  1743  				resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
  1744  					ContainerPath: "/container_dir1/file2",
  1745  					HostPath:      "host_dir1/file2",
  1746  					ReadOnly:      true,
  1747  				})
  1748  
  1749  				resp.Envs["key1"] = "val1"
  1750  			}
  1751  		}
  1752  		resps := new(pluginapi.AllocateResponse)
  1753  		resps.ContainerResponses = append(resps.ContainerResponses, resp)
  1754  		return resps, nil
  1755  	}
  1756  }
  1757  
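        // makeDevice converts a DevicesPerNUMA map into device plugin API devices, attaching NUMA
        // topology info only when topology is true.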
  1758  func makeDevice(devOnNUMA checkpoint.DevicesPerNUMA, topology bool) map[string]pluginapi.Device {
  1759  	res := make(map[string]pluginapi.Device)
  1760  	var topologyInfo *pluginapi.TopologyInfo
  1761  	for node, devs := range devOnNUMA {
  1762  		if topology {
  1763  			topologyInfo = &pluginapi.TopologyInfo{Nodes: []*pluginapi.NUMANode{{ID: node}}}
  1764  		} else {
  1765  			topologyInfo = nil
  1766  		}
  1767  		for idx := range devs {
  1768  			res[devs[idx]] = pluginapi.Device{ID: devs[idx], Topology: topologyInfo}
  1769  		}
  1770  	}
  1771  	return res
  1772  }
  1773  
  1774  const deviceManagerCheckpointFilename = "kubelet_internal_checkpoint"
  1775  
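        // oldCheckpoint is a device manager checkpoint serialized in the old, pre-NUMA format;
        // TestReadPreNUMACheckpoint uses it to ensure such checkpoints can still be read back.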
  1776  var oldCheckpoint string = `{"Data":{"PodDeviceEntries":[{"PodUID":"13ac2284-0d19-44b7-b94f-055b032dba9b","ContainerName":"centos","ResourceName":"example.com/deviceA","DeviceIDs":["DevA3"],"AllocResp":"CiIKHUVYQU1QTEVDT01ERVZJQ0VBX0RFVkEzX1RUWTEwEgEwGhwKCi9kZXYvdHR5MTASCi9kZXYvdHR5MTAaAnJ3"},{"PodUID":"86b9a017-c9ca-4069-815f-46ca3e53c1e4","ContainerName":"centos","ResourceName":"example.com/deviceA","DeviceIDs":["DevA4"],"AllocResp":"CiIKHUVYQU1QTEVDT01ERVZJQ0VBX0RFVkE0X1RUWTExEgEwGhwKCi9kZXYvdHR5MTESCi9kZXYvdHR5MTEaAnJ3"}],"RegisteredDevices":{"example.com/deviceA":["DevA1","DevA2","DevA3","DevA4"]}},"Checksum":405612085}`
  1777  
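        // TestReadPreNUMACheckpoint verifies that a checkpoint written before per-NUMA device
        // tracking was introduced can still be read without error.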
  1778  func TestReadPreNUMACheckpoint(t *testing.T) {
  1779  	socketDir, socketName, _, err := tmpSocketDir()
  1780  	require.NoError(t, err)
  1781  	defer os.RemoveAll(socketDir)
  1782  
  1783  	err = os.WriteFile(filepath.Join(socketDir, deviceManagerCheckpointFilename), []byte(oldCheckpoint), 0644)
  1784  	require.NoError(t, err)
  1785  
  1786  	topologyStore := topologymanager.NewFakeManager()
  1787  	nodes := []cadvisorapi.Node{{Id: 0}}
  1788  	m, err := newManagerImpl(socketName, nodes, topologyStore)
  1789  	require.NoError(t, err)
  1790  
  1791  	// TODO: we should not be calling private methods, but the existing tests already do so anyway.
  1792  	err = m.readCheckpoint()
  1793  	require.NoError(t, err)
  1794  }
  1795  
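        // TestGetTopologyHintsWithUpdates calls GetTopologyHints and GetPodTopologyHints
        // concurrently with genericDeviceUpdateCallback to make sure hint generation does not
        // race with device updates.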
  1796  func TestGetTopologyHintsWithUpdates(t *testing.T) {
  1797  	socketDir, socketName, _, err := tmpSocketDir()
  1798  	defer os.RemoveAll(socketDir)
  1799  	require.NoError(t, err)
  1800  
  1801  	devs := []pluginapi.Device{}
  1802  	for i := 0; i < 1000; i++ {
  1803  		devs = append(devs, pluginapi.Device{
  1804  			ID:     fmt.Sprintf("dev-%d", i),
  1805  			Health: pluginapi.Healthy,
  1806  			Topology: &pluginapi.TopologyInfo{
  1807  				Nodes: []*pluginapi.NUMANode{
  1808  					{ID: 0},
  1809  				},
  1810  			}})
  1811  	}
  1812  	testPod := makePod(v1.ResourceList{
  1813  		testResourceName: *resource.NewQuantity(int64(1), resource.DecimalSI),
  1814  	})
  1815  	topology := []cadvisorapi.Node{
  1816  		{Id: 0},
  1817  	}
  1818  	testCases := []struct {
  1819  		description string
  1820  		count       int
  1821  		devices     []pluginapi.Device
  1822  		testfunc    func(manager *wrappedManagerImpl)
  1823  	}{
  1824  		{
  1825  			description: "GetTopologyHints data race when update device",
  1826  			count:       10,
  1827  			devices:     devs,
  1828  			testfunc: func(manager *wrappedManagerImpl) {
  1829  				manager.GetTopologyHints(testPod, &testPod.Spec.Containers[0])
  1830  			},
  1831  		},
  1832  		{
  1833  			description: "GetPodTopologyHints data race when update device",
  1834  			count:       10,
  1835  			devices:     devs,
  1836  			testfunc: func(manager *wrappedManagerImpl) {
  1837  				manager.GetPodTopologyHints(testPod)
  1838  			},
  1839  		},
  1840  	}
  1841  
  1842  	for _, test := range testCases {
  1843  		t.Run(test.description, func(t *testing.T) {
  1844  			m, _ := setupDeviceManager(t, nil, nil, socketName, topology)
  1845  			defer m.Stop()
  1846  			mimpl := m.(*wrappedManagerImpl)
  1847  
  1848  			wg := sync.WaitGroup{}
  1849  			wg.Add(2)
  1850  
  1851  			updated := atomic.Bool{}
  1852  			updated.Store(false)
  1853  			go func() {
  1854  				defer wg.Done()
  1855  				for i := 0; i < test.count; i++ {
  1856  					// simulate the device plugin to send device updates
  1857  					mimpl.genericDeviceUpdateCallback(testResourceName, devs)
  1858  				}
  1859  				updated.Store(true)
  1860  			}()
  1861  			go func() {
  1862  				defer wg.Done()
  1863  				for !updated.Load() {
  1864  					// When a data race occurs, the Go runtime throws a fatal error that recover() cannot catch,
  1865  					// such as `throw("concurrent map iteration and map write")`.
  1866  					// If this test ends quietly, no data race occurred.
  1867  					// Otherwise, the test process exits automatically and prints all goroutine call stacks.
  1868  					test.testfunc(mimpl)
  1869  				}
  1870  			}()
  1871  			wg.Wait()
  1872  		})
  1873  	}
  1874  }
  1875  
