package devicemanager

import (
    "fmt"
    "os"
    "path/filepath"
    "reflect"
    goruntime "runtime"
    "sync"
    "sync/atomic"
    "testing"
    "time"

    cadvisorapi "github.com/google/cadvisor/info/v1"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/apimachinery/pkg/util/uuid"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/tools/record"
    pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
    watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
    "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
    "k8s.io/kubernetes/pkg/kubelet/cm/containermap"
    "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
    plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
    "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
    "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask"
    "k8s.io/kubernetes/pkg/kubelet/config"
    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
    "k8s.io/kubernetes/pkg/kubelet/pluginmanager"
    schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

const (
    testResourceName = "fake-domain/resource"
)

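// newWrappedManagerImpl wraps the given ManagerImpl in a wrappedManagerImpl whose
// device update callback can be overridden by tests.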
func newWrappedManagerImpl(socketPath string, manager *ManagerImpl) *wrappedManagerImpl {
    w := &wrappedManagerImpl{
        ManagerImpl: manager,
        callback:    manager.genericDeviceUpdateCallback,
    }
    w.socketdir, _ = filepath.Split(socketPath)
    w.server, _ = plugin.NewServer(socketPath, w, w)
    return w
}

type wrappedManagerImpl struct {
    *ManagerImpl
    socketdir string
    callback  func(string, []pluginapi.Device)
}

func (m *wrappedManagerImpl) PluginListAndWatchReceiver(r string, resp *pluginapi.ListAndWatchResponse) {
    var devices []pluginapi.Device
    for _, d := range resp.Devices {
        devices = append(devices, *d)
    }
    m.callback(r, devices)
}

func tmpSocketDir() (socketDir, socketName, pluginSocketName string, err error) {
    socketDir, err = os.MkdirTemp("", "device_plugin")
    if err != nil {
        return
    }
    socketName = filepath.Join(socketDir, "server.sock")
    pluginSocketName = filepath.Join(socketDir, "device-plugin.sock")
    err = os.MkdirAll(socketDir, 0755)
    return
}

func TestNewManagerImpl(t *testing.T) {
    socketDir, socketName, _, err := tmpSocketDir()
    topologyStore := topologymanager.NewFakeManager()
    require.NoError(t, err)
    defer os.RemoveAll(socketDir)
    _, err = newManagerImpl(socketName, nil, topologyStore)
    require.NoError(t, err)
    os.RemoveAll(socketDir)
}

func TestNewManagerImplStart(t *testing.T) {
    socketDir, socketName, pluginSocketName, err := tmpSocketDir()
    require.NoError(t, err)
    defer os.RemoveAll(socketDir)
    m, _, p := setup(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
    cleanup(t, m, p)
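    // Stop should tolerate being called more than once.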
    cleanup(t, m, p)
}

func TestNewManagerImplStartProbeMode(t *testing.T) {
    socketDir, socketName, pluginSocketName, err := tmpSocketDir()
    require.NoError(t, err)
    defer os.RemoveAll(socketDir)
    m, _, p, _ := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
    cleanup(t, m, p)
}
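// Tests that the device plugin manager correctly handles registration and re-registration by
// making sure that after registration, devices are correctly updated, and that a re-registration
// with different devices replaces the previously registered plugin's devices.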
func TestDevicePluginReRegistration(t *testing.T) {
    if goruntime.GOOS == "windows" {
        t.Skip("Skipping test on Windows.")
    }
    socketDir, socketName, pluginSocketName, err := tmpSocketDir()
    require.NoError(t, err)
    defer os.RemoveAll(socketDir)
    devs := []*pluginapi.Device{
        {ID: "Dev1", Health: pluginapi.Healthy},
        {ID: "Dev2", Health: pluginapi.Healthy},
    }
    devsForRegistration := []*pluginapi.Device{
        {ID: "Dev3", Health: pluginapi.Healthy},
    }
    for _, preStartContainerFlag := range []bool{false, true} {
        for _, getPreferredAllocationFlag := range []bool{false, true} {
            m, ch, p1 := setup(t, devs, nil, socketName, pluginSocketName)
            p1.Register(socketName, testResourceName, "")
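            // Wait for the first callback to be issued.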
            select {
            case <-ch:
            case <-time.After(5 * time.Second):
                t.Fatalf("timeout while waiting for manager update")
            }
            capacity, allocatable, _ := m.GetCapacity()
            resourceCapacity := capacity[v1.ResourceName(testResourceName)]
            resourceAllocatable := allocatable[v1.ResourceName(testResourceName)]
            require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
            require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")

            p2 := plugin.NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag, getPreferredAllocationFlag)
            err = p2.Start()
            require.NoError(t, err)
            p2.Register(socketName, testResourceName, "")

            select {
            case <-ch:
            case <-time.After(5 * time.Second):
                t.Fatalf("timeout while waiting for manager update")
            }
            capacity, allocatable, _ = m.GetCapacity()
            resourceCapacity = capacity[v1.ResourceName(testResourceName)]
            resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
            require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
            require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices shouldn't change.")
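            // Test the scenario that a plugin re-registers with different devices.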
            p3 := plugin.NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag, getPreferredAllocationFlag)
            err = p3.Start()
            require.NoError(t, err)
            p3.Register(socketName, testResourceName, "")

            select {
            case <-ch:
            case <-time.After(5 * time.Second):
                t.Fatalf("timeout while waiting for manager update")
            }
            capacity, allocatable, _ = m.GetCapacity()
            resourceCapacity = capacity[v1.ResourceName(testResourceName)]
            resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
            require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
            require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of the previously registered plugin should be removed.")
            p2.Stop()
            p3.Stop()
            cleanup(t, m, p1)
        }
    }
}
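// Tests that the device plugin manager correctly handles registration and re-registration of
// device plugins discovered through the plugin watcher (probe mode), making sure devices are
// updated on each registration and that a re-registration with different devices replaces them.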
func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
    if goruntime.GOOS == "windows" {
        t.Skip("Skipping test on Windows.")
    }
    socketDir, socketName, pluginSocketName, err := tmpSocketDir()
    require.NoError(t, err)
    defer os.RemoveAll(socketDir)
    devs := []*pluginapi.Device{
        {ID: "Dev1", Health: pluginapi.Healthy},
        {ID: "Dev2", Health: pluginapi.Healthy},
    }
    devsForRegistration := []*pluginapi.Device{
        {ID: "Dev3", Health: pluginapi.Healthy},
    }

    m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName)
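    // Wait for the first callback to be issued.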
    select {
    case <-ch:
    case <-time.After(5 * time.Second):
        t.FailNow()
    }
    capacity, allocatable, _ := m.GetCapacity()
    resourceCapacity := capacity[v1.ResourceName(testResourceName)]
    resourceAllocatable := allocatable[v1.ResourceName(testResourceName)]
    require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
    require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")

    p2 := plugin.NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false, false)
    err = p2.Start()
    require.NoError(t, err)
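    // Wait for the second callback to be issued.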
    select {
    case <-ch:
    case <-time.After(5 * time.Second):
        t.FailNow()
    }

    capacity, allocatable, _ = m.GetCapacity()
    resourceCapacity = capacity[v1.ResourceName(testResourceName)]
    resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
    require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
    require.Equal(t, int64(2), resourceAllocatable.Value(), "Devices are not updated.")
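    // Test the scenario that a plugin re-registers with different devices.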
    p3 := plugin.NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false, false)
    err = p3.Start()
    require.NoError(t, err)

    select {
    case <-ch:
    case <-time.After(5 * time.Second):
        t.FailNow()
    }

    capacity, allocatable, _ = m.GetCapacity()
    resourceCapacity = capacity[v1.ResourceName(testResourceName)]
    resourceAllocatable = allocatable[v1.ResourceName(testResourceName)]
    require.Equal(t, resourceCapacity.Value(), resourceAllocatable.Value(), "capacity should equal allocatable")
    require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of the previously registered plugin should be removed.")
    p2.Stop()
    p3.Stop()
    cleanup(t, m, p1)
}

func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string,
    topology []cadvisorapi.Node) (Manager, <-chan interface{}) {
    topologyStore := topologymanager.NewFakeManager()
    m, err := newManagerImpl(socketName, topology, topologyStore)
    require.NoError(t, err)
    updateChan := make(chan interface{})

    w := newWrappedManagerImpl(socketName, m)
    if callback != nil {
        w.callback = callback
    }

    originalCallback := w.callback
    w.callback = func(resourceName string, devices []pluginapi.Device) {
        originalCallback(resourceName, devices)
        updateChan <- new(interface{})
    }
    activePods := func() []*v1.Pod {
        return []*v1.Pod{}
    }

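    // Start the manager with a stub sourcesReady and an empty container map; the
    // richer initialization paths are exercised by other tests.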
    err = w.Start(activePods, &sourcesReadyStub{}, containermap.NewContainerMap(), sets.New[string]())
    require.NoError(t, err)

    return w, updateChan
}

func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName string) *plugin.Stub {
    p := plugin.NewDevicePluginStub(devs, pluginSocketName, testResourceName, false, false)
    err := p.Start()
    require.NoError(t, err)
    return p
}

func setupPluginManager(t *testing.T, pluginSocketName string, m Manager) pluginmanager.PluginManager {
    pluginManager := pluginmanager.NewPluginManager(
        filepath.Dir(pluginSocketName),
        &record.FakeRecorder{},
    )

    runPluginManager(pluginManager)
    pluginManager.AddHandler(watcherapi.DevicePlugin, m.GetWatcherHandler())
    return pluginManager
}

func runPluginManager(pluginManager pluginmanager.PluginManager) {
    sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true })
    go pluginManager.Run(sourcesReady, wait.NeverStop)
}

func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub) {
    m, updateChan := setupDeviceManager(t, devs, callback, socketName, nil)
    p := setupDevicePlugin(t, devs, pluginSocketName)
    return m, updateChan, p
}

func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *plugin.Stub, pluginmanager.PluginManager) {
    m, updateChan := setupDeviceManager(t, devs, callback, socketName, nil)
    p := setupDevicePlugin(t, devs, pluginSocketName)
    pm := setupPluginManager(t, pluginSocketName, m)
    return m, updateChan, p, pm
}

func cleanup(t *testing.T, m Manager, p *plugin.Stub) {
    p.Stop()
    m.Stop()
}

func TestUpdateCapacityAllocatable(t *testing.T) {
    socketDir, socketName, _, err := tmpSocketDir()
    topologyStore := topologymanager.NewFakeManager()
    require.NoError(t, err)
    defer os.RemoveAll(socketDir)
    testManager, err := newManagerImpl(socketName, nil, topologyStore)
    as := assert.New(t)
    as.NotNil(testManager)
    as.Nil(err)

    devs := []pluginapi.Device{
        {ID: "Device1", Health: pluginapi.Healthy},
        {ID: "Device2", Health: pluginapi.Healthy},
        {ID: "Device3", Health: pluginapi.Unhealthy},
    }
    callback := testManager.genericDeviceUpdateCallback

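    // Adds three devices for resource1, two healthy and one unhealthy.
    // Expects capacity for resource1 to be 3 and allocatable to be 2.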
    resourceName1 := "domain1.com/resource1"
    e1 := &endpointImpl{}
    testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil}
    callback(resourceName1, devs)
    capacity, allocatable, removedResources := testManager.GetCapacity()
    resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)]
    as.True(ok)
    resource1Allocatable, ok := allocatable[v1.ResourceName(resourceName1)]
    as.True(ok)
    as.Equal(int64(3), resource1Capacity.Value())
    as.Equal(int64(2), resource1Allocatable.Value())
    as.Equal(0, len(removedResources))
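    // Deleting the unhealthy device should NOT change allocatable but should reduce capacity.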
    devs1 := devs[:len(devs)-1]
    callback(resourceName1, devs1)
    capacity, allocatable, removedResources = testManager.GetCapacity()
    resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
    as.True(ok)
    resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
    as.True(ok)
    as.Equal(int64(2), resource1Capacity.Value())
    as.Equal(int64(2), resource1Allocatable.Value())
    as.Equal(0, len(removedResources))
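    // Updating a healthy device to unhealthy should reduce allocatable by 1.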
    devs[1].Health = pluginapi.Unhealthy
    callback(resourceName1, devs)
    capacity, allocatable, removedResources = testManager.GetCapacity()
    resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
    as.True(ok)
    resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
    as.True(ok)
    as.Equal(int64(3), resource1Capacity.Value())
    as.Equal(int64(1), resource1Allocatable.Value())
    as.Equal(0, len(removedResources))
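    // Deleting the remaining healthy device should reduce capacity and leave allocatable at zero.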
    devs2 := devs[1:]
    callback(resourceName1, devs2)
    capacity, allocatable, removedResources = testManager.GetCapacity()
    resource1Capacity, ok = capacity[v1.ResourceName(resourceName1)]
    as.True(ok)
    resource1Allocatable, ok = allocatable[v1.ResourceName(resourceName1)]
    as.True(ok)
    as.Equal(int64(0), resource1Allocatable.Value())
    as.Equal(int64(2), resource1Capacity.Value())
    as.Equal(0, len(removedResources))
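    // Tests adding a second resource.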
    resourceName2 := "resource2"
    e2 := &endpointImpl{}
    e2.client = plugin.NewPluginClient(resourceName2, socketName, testManager)
    testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil}
    callback(resourceName2, devs)
    capacity, allocatable, removedResources = testManager.GetCapacity()
    as.Equal(2, len(capacity))
    resource2Capacity, ok := capacity[v1.ResourceName(resourceName2)]
    as.True(ok)
    resource2Allocatable, ok := allocatable[v1.ResourceName(resourceName2)]
    as.True(ok)
    as.Equal(int64(3), resource2Capacity.Value())
    as.Equal(int64(1), resource2Allocatable.Value())
    as.Equal(0, len(removedResources))
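    // Expires the resourceName1 endpoint. Verifies testManager.GetCapacity() reports that
    // resourceName1 is removed from capacity and no longer exists in healthyDevices after the call.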
    e1.setStopTime(time.Now().Add(-1*endpointStopGracePeriod - time.Duration(10)*time.Second))
    capacity, allocatable, removed := testManager.GetCapacity()
    as.Equal([]string{resourceName1}, removed)
    as.NotContains(capacity, v1.ResourceName(resourceName1))
    as.NotContains(allocatable, v1.ResourceName(resourceName1))
    val, ok := capacity[v1.ResourceName(resourceName2)]
    as.True(ok)
    as.Equal(int64(3), val.Value())
    as.NotContains(testManager.healthyDevices, resourceName1)
    as.NotContains(testManager.unhealthyDevices, resourceName1)
    as.NotContains(testManager.endpoints, resourceName1)
    as.Equal(1, len(testManager.endpoints))
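    // Stops the resourceName2 endpoint. Verifies its stopTime is set, and that allocate and
    // preStartContainer calls return errors.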
    e2.client.Disconnect()
    as.False(e2.stopTime.IsZero())
    _, err = e2.allocate([]string{"Device1"})
    as.True(reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2)))
    _, err = e2.preStartContainer([]string{"Device1"})
    as.True(reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2)))
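    // Marks resourceName2 unhealthy and verifies its capacity stays the same while allocatable drops to zero.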
    testManager.markResourceUnhealthy(resourceName2)
    capacity, allocatable, removed = testManager.GetCapacity()
    val, ok = capacity[v1.ResourceName(resourceName2)]
    as.True(ok)
    as.Equal(int64(3), val.Value())
    val, ok = allocatable[v1.ResourceName(resourceName2)]
    as.True(ok)
    as.Equal(int64(0), val.Value())
    as.Empty(removed)
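    // Writes and re-reads checkpoints. Verifies we create a stopped endpoint
    // for resourceName2, its capacity is set to zero, and we still consider
    // it a device plugin resource. This makes sure any pod that was scheduled
    // while the capacity change was propagating to the scheduler will still
    // be successfully admitted.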
    err = testManager.writeCheckpoint()
    as.Nil(err)
    testManager.healthyDevices = make(map[string]sets.Set[string])
    testManager.unhealthyDevices = make(map[string]sets.Set[string])
    err = testManager.readCheckpoint()
    as.Nil(err)
    as.Equal(1, len(testManager.endpoints))
    as.Contains(testManager.endpoints, resourceName2)
    capacity, allocatable, removed = testManager.GetCapacity()
    val, ok = capacity[v1.ResourceName(resourceName2)]
    as.True(ok)
    as.Equal(int64(0), val.Value())
    val, ok = allocatable[v1.ResourceName(resourceName2)]
    as.True(ok)
    as.Equal(int64(0), val.Value())
    as.Empty(removed)
    as.True(testManager.isDevicePluginResource(resourceName2))
}

func TestGetAllocatableDevicesMultipleResources(t *testing.T) {
    socketDir, socketName, _, err := tmpSocketDir()
    topologyStore := topologymanager.NewFakeManager()
    require.NoError(t, err)
    defer os.RemoveAll(socketDir)
    testManager, err := newManagerImpl(socketName, nil, topologyStore)
    as := assert.New(t)
    as.NotNil(testManager)
    as.Nil(err)

    resource1Devs := []pluginapi.Device{
        {ID: "R1Device1", Health: pluginapi.Healthy},
        {ID: "R1Device2", Health: pluginapi.Healthy},
        {ID: "R1Device3", Health: pluginapi.Unhealthy},
    }
    resourceName1 := "domain1.com/resource1"
    e1 := &endpointImpl{}
    testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil}
    testManager.genericDeviceUpdateCallback(resourceName1, resource1Devs)

    resource2Devs := []pluginapi.Device{
        {ID: "R2Device1", Health: pluginapi.Healthy},
    }
    resourceName2 := "other.domain2.org/resource2"
    e2 := &endpointImpl{}
    testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil}
    testManager.genericDeviceUpdateCallback(resourceName2, resource2Devs)

    allocatableDevs := testManager.GetAllocatableDevices()
    as.Equal(2, len(allocatableDevs))

    devInstances1, ok := allocatableDevs[resourceName1]
    as.True(ok)
    checkAllocatableDevicesConsistsOf(as, devInstances1, []string{"R1Device1", "R1Device2"})

    devInstances2, ok := allocatableDevs[resourceName2]
    as.True(ok)
    checkAllocatableDevicesConsistsOf(as, devInstances2, []string{"R2Device1"})
}

func TestGetAllocatableDevicesHealthTransition(t *testing.T) {
    socketDir, socketName, _, err := tmpSocketDir()
    topologyStore := topologymanager.NewFakeManager()
    require.NoError(t, err)
    defer os.RemoveAll(socketDir)
    testManager, err := newManagerImpl(socketName, nil, topologyStore)
    as := assert.New(t)
    as.NotNil(testManager)
    as.Nil(err)

    resource1Devs := []pluginapi.Device{
        {ID: "R1Device1", Health: pluginapi.Healthy},
        {ID: "R1Device2", Health: pluginapi.Healthy},
        {ID: "R1Device3", Health: pluginapi.Unhealthy},
    }
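    // Adds three devices for resource1, two healthy and one unhealthy.
    // Expects only the healthy devices to be allocatable.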
    resourceName1 := "domain1.com/resource1"
    e1 := &endpointImpl{}
    testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil}

    testManager.genericDeviceUpdateCallback(resourceName1, resource1Devs)

    allocatableDevs := testManager.GetAllocatableDevices()
    as.Equal(1, len(allocatableDevs))
    devInstances, ok := allocatableDevs[resourceName1]
    as.True(ok)
    checkAllocatableDevicesConsistsOf(as, devInstances, []string{"R1Device1", "R1Device2"})
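    // The unhealthy device becomes healthy.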
    resource1Devs = []pluginapi.Device{
        {ID: "R1Device1", Health: pluginapi.Healthy},
        {ID: "R1Device2", Health: pluginapi.Healthy},
        {ID: "R1Device3", Health: pluginapi.Healthy},
    }
    testManager.genericDeviceUpdateCallback(resourceName1, resource1Devs)

    allocatableDevs = testManager.GetAllocatableDevices()
    as.Equal(1, len(allocatableDevs))
    devInstances, ok = allocatableDevs[resourceName1]
    as.True(ok)
    checkAllocatableDevicesConsistsOf(as, devInstances, []string{"R1Device1", "R1Device2", "R1Device3"})
}

func checkAllocatableDevicesConsistsOf(as *assert.Assertions, devInstances DeviceInstances, expectedDevs []string) {
    as.Equal(len(expectedDevs), len(devInstances))
    for _, deviceID := range expectedDevs {
        _, ok := devInstances[deviceID]
        as.True(ok)
    }
}

func constructDevices(devices []string) checkpoint.DevicesPerNUMA {
    ret := checkpoint.DevicesPerNUMA{}
    for _, dev := range devices {
        ret[0] = append(ret[0], dev)
    }
    return ret
}
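// containerAllocateResponseBuilder builds a ContainerAllocateResponse for tests.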
type containerAllocateResponseBuilder struct {
    devices    map[string]string
    mounts     map[string]string
    envs       map[string]string
    cdiDevices []string
}
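// containerAllocateResponseBuilderOption is a functional option for configuring a containerAllocateResponseBuilder.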
type containerAllocateResponseBuilderOption func(*containerAllocateResponseBuilder)
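// withDevices sets the devices for the containerAllocateResponseBuilder.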
func withDevices(devices map[string]string) containerAllocateResponseBuilderOption {
    return func(b *containerAllocateResponseBuilder) {
        b.devices = devices
    }
}
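// withMounts sets the mounts for the containerAllocateResponseBuilder.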
func withMounts(mounts map[string]string) containerAllocateResponseBuilderOption {
    return func(b *containerAllocateResponseBuilder) {
        b.mounts = mounts
    }
}
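// withEnvs sets the environment variables for the containerAllocateResponseBuilder.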
func withEnvs(envs map[string]string) containerAllocateResponseBuilderOption {
    return func(b *containerAllocateResponseBuilder) {
        b.envs = envs
    }
}
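// withCDIDevices sets the CDI devices for the containerAllocateResponseBuilder.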
func withCDIDevices(cdiDevices ...string) containerAllocateResponseBuilderOption {
    return func(b *containerAllocateResponseBuilder) {
        b.cdiDevices = cdiDevices
    }
}
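// newContainerAllocateResponse creates a ContainerAllocateResponse with the given options applied.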
func newContainerAllocateResponse(opts ...containerAllocateResponseBuilderOption) *pluginapi.ContainerAllocateResponse {
    b := &containerAllocateResponseBuilder{}
    for _, opt := range opts {
        opt(b)
    }

    return b.Build()
}
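// Build converts the builder's configuration into a ContainerAllocateResponse.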
func (b *containerAllocateResponseBuilder) Build() *pluginapi.ContainerAllocateResponse {
    resp := &pluginapi.ContainerAllocateResponse{}
    for k, v := range b.devices {
        resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
            HostPath:      k,
            ContainerPath: v,
            Permissions:   "mrw",
        })
    }
    for k, v := range b.mounts {
        resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
            ContainerPath: k,
            HostPath:      v,
            ReadOnly:      true,
        })
    }
    resp.Envs = make(map[string]string)
    for k, v := range b.envs {
        resp.Envs[k] = v
    }

    var cdiDevices []*pluginapi.CDIDevice
    for _, dev := range b.cdiDevices {
        cdiDevice := pluginapi.CDIDevice{
            Name: dev,
        }
        cdiDevices = append(cdiDevices, &cdiDevice)
    }
    resp.CDIDevices = cdiDevices

    return resp
}

func TestCheckpoint(t *testing.T) {
    resourceName1 := "domain1.com/resource1"
    resourceName2 := "domain2.com/resource2"
    resourceName3 := "domain2.com/resource3"
    as := assert.New(t)
    tmpDir, err := os.MkdirTemp("", "checkpoint")
    as.Nil(err)
    defer os.RemoveAll(tmpDir)
    ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
    as.Nil(err)
    testManager := &ManagerImpl{
        endpoints:         make(map[string]endpointInfo),
        healthyDevices:    make(map[string]sets.Set[string]),
        unhealthyDevices:  make(map[string]sets.Set[string]),
        allocatedDevices:  make(map[string]sets.Set[string]),
        podDevices:        newPodDevices(),
        checkpointManager: ckm,
    }

    testManager.podDevices.insert("pod1", "con1", resourceName1,
        constructDevices([]string{"dev1", "dev2"}),
        newContainerAllocateResponse(
            withDevices(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}),
            withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
            withCDIDevices("domain1.com/resource1=dev1", "domain1.com/resource1=dev2"),
        ),
    )
    testManager.podDevices.insert("pod1", "con1", resourceName2,
        constructDevices([]string{"dev1", "dev2"}),
        newContainerAllocateResponse(
            withDevices(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"}),
            withMounts(map[string]string{"/home/r2lib1": "/usr/r2lib1"}),
            withEnvs(map[string]string{"r2devices": "dev1 dev2"}),
        ),
    )
    testManager.podDevices.insert("pod1", "con2", resourceName1,
        constructDevices([]string{"dev3"}),
        newContainerAllocateResponse(
            withDevices(map[string]string{"/dev/r1dev3": "/dev/r1dev3"}),
            withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
        ),
    )
    testManager.podDevices.insert("pod2", "con1", resourceName1,
        constructDevices([]string{"dev4"}),
        newContainerAllocateResponse(
            withDevices(map[string]string{"/dev/r1dev4": "/dev/r1dev4"}),
            withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
        ),
    )
    testManager.podDevices.insert("pod3", "con3", resourceName3,
        checkpoint.DevicesPerNUMA{nodeWithoutTopology: []string{"dev5"}},
        newContainerAllocateResponse(
            withDevices(map[string]string{"/dev/r3dev5": "/dev/r3dev5"}),
            withMounts(map[string]string{"/home/r3lib1": "/usr/r3lib1"}),
        ),
    )

    testManager.healthyDevices[resourceName1] = sets.New[string]()
    testManager.healthyDevices[resourceName1].Insert("dev1")
    testManager.healthyDevices[resourceName1].Insert("dev2")
    testManager.healthyDevices[resourceName1].Insert("dev3")
    testManager.healthyDevices[resourceName1].Insert("dev4")
    testManager.healthyDevices[resourceName1].Insert("dev5")
    testManager.healthyDevices[resourceName2] = sets.New[string]()
    testManager.healthyDevices[resourceName2].Insert("dev1")
    testManager.healthyDevices[resourceName2].Insert("dev2")
    testManager.healthyDevices[resourceName3] = sets.New[string]()
    testManager.healthyDevices[resourceName3].Insert("dev5")

    expectedPodDevices := testManager.podDevices
    expectedAllocatedDevices := testManager.podDevices.devices()
    expectedAllDevices := testManager.healthyDevices

    err = testManager.writeCheckpoint()
    as.Nil(err)
    testManager.podDevices = newPodDevices()
    err = testManager.readCheckpoint()
    as.Nil(err)

    as.Equal(expectedPodDevices.size(), testManager.podDevices.size())
    for podUID, containerDevices := range expectedPodDevices.devs {
        for conName, resources := range containerDevices {
            for resource := range resources {
                expDevices := expectedPodDevices.containerDevices(podUID, conName, resource)
                testDevices := testManager.podDevices.containerDevices(podUID, conName, resource)
                as.True(reflect.DeepEqual(expDevices, testDevices))
                opts1 := expectedPodDevices.deviceRunContainerOptions(podUID, conName)
                opts2 := testManager.podDevices.deviceRunContainerOptions(podUID, conName)
                as.Equal(len(opts1.Envs), len(opts2.Envs))
                as.Equal(len(opts1.Mounts), len(opts2.Mounts))
                as.Equal(len(opts1.Devices), len(opts2.Devices))
            }
        }
    }
    as.True(reflect.DeepEqual(expectedAllocatedDevices, testManager.allocatedDevices))
    as.True(reflect.DeepEqual(expectedAllDevices, testManager.healthyDevices))
}

type activePodsStub struct {
    activePods []*v1.Pod
}

func (a *activePodsStub) getActivePods() []*v1.Pod {
    return a.activePods
}

func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) {
    a.activePods = newPods
}

type MockEndpoint struct {
    getPreferredAllocationFunc func(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error)
    allocateFunc               func(devs []string) (*pluginapi.AllocateResponse, error)
    initChan                   chan []string
}

func (m *MockEndpoint) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) {
    m.initChan <- devs
    return &pluginapi.PreStartContainerResponse{}, nil
}

func (m *MockEndpoint) getPreferredAllocation(available, mustInclude []string, size int) (*pluginapi.PreferredAllocationResponse, error) {
    if m.getPreferredAllocationFunc != nil {
        return m.getPreferredAllocationFunc(available, mustInclude, size)
    }
    return nil, nil
}

func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
    if m.allocateFunc != nil {
        return m.allocateFunc(devs)
    }
    return nil, nil
}

func (m *MockEndpoint) setStopTime(t time.Time) {}

func (m *MockEndpoint) isStopped() bool { return false }

func (m *MockEndpoint) stopGracePeriodExpired() bool { return false }

func makePod(limits v1.ResourceList) *v1.Pod {
    return &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            UID: uuid.NewUUID(),
        },
        Spec: v1.PodSpec{
            Containers: []v1.Container{
                {
                    Resources: v1.ResourceRequirements{
                        Limits: limits,
                    },
                },
            },
        },
    }
}

func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource) (*wrappedManagerImpl, error) {
    monitorCallback := func(resourceName string, devices []pluginapi.Device) {}
    ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
    if err != nil {
        return nil, err
    }
    m := &ManagerImpl{
        healthyDevices:        make(map[string]sets.Set[string]),
        unhealthyDevices:      make(map[string]sets.Set[string]),
        allocatedDevices:      make(map[string]sets.Set[string]),
        endpoints:             make(map[string]endpointInfo),
        podDevices:            newPodDevices(),
        devicesToReuse:        make(PodReusableDevices),
        topologyAffinityStore: topologymanager.NewFakeManager(),
        activePods:            activePods,
        sourcesReady:          &sourcesReadyStub{},
        checkpointManager:     ckm,
        allDevices:            NewResourceDeviceInstances(),
    }
    testManager := &wrappedManagerImpl{
        ManagerImpl: m,
        socketdir:   tmpDir,
        callback:    monitorCallback,
    }

    for _, res := range testRes {
        testManager.healthyDevices[res.resourceName] = sets.New[string](res.devs.Devices().UnsortedList()...)
        if res.resourceName == "domain1.com/resource1" {
            testManager.endpoints[res.resourceName] = endpointInfo{
                e:    &MockEndpoint{allocateFunc: allocateStubFunc()},
                opts: nil,
            }
        }
        if res.resourceName == "domain2.com/resource2" {
            testManager.endpoints[res.resourceName] = endpointInfo{
                e: &MockEndpoint{
                    allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) {
                        resp := new(pluginapi.ContainerAllocateResponse)
                        resp.Envs = make(map[string]string)
                        for _, dev := range devs {
                            switch dev {
                            case "dev3":
                                resp.Envs["key2"] = "val2"
                            case "dev4":
                                resp.Envs["key2"] = "val3"
                            }
                        }
                        resps := new(pluginapi.AllocateResponse)
                        resps.ContainerResponses = append(resps.ContainerResponses, resp)
                        return resps, nil
                    },
                },
                opts: nil,
            }
        }
        testManager.allDevices[res.resourceName] = makeDevice(res.devs, res.topology)
    }
    return testManager, nil
}

type TestResource struct {
    resourceName     string
    resourceQuantity resource.Quantity
    devs             checkpoint.DevicesPerNUMA
    topology         bool
}

func TestFilterByAffinity(t *testing.T) {
    as := require.New(t)
    allDevices := ResourceDeviceInstances{
        "res1": map[string]pluginapi.Device{
            "dev1": {
                ID: "dev1",
                Topology: &pluginapi.TopologyInfo{
                    Nodes: []*pluginapi.NUMANode{
                        {
                            ID: 1,
                        },
                    },
                },
            },
            "dev2": {
                ID: "dev2",
                Topology: &pluginapi.TopologyInfo{
                    Nodes: []*pluginapi.NUMANode{
                        {
                            ID: 1,
                        },
                        {
                            ID: 2,
                        },
                    },
                },
            },
            "dev3": {
                ID: "dev3",
                Topology: &pluginapi.TopologyInfo{
                    Nodes: []*pluginapi.NUMANode{
                        {
                            ID: 2,
                        },
                    },
                },
            },
            "dev4": {
                ID: "dev4",
                Topology: &pluginapi.TopologyInfo{
                    Nodes: []*pluginapi.NUMANode{
                        {
                            ID: 2,
                        },
                    },
                },
            },
            "devwithouttopology": {
                ID: "dev5",
            },
        },
    }

    fakeAffinity, _ := bitmask.NewBitMask(2)
    fakeHint := topologymanager.TopologyHint{
        NUMANodeAffinity: fakeAffinity,
        Preferred:        true,
    }
    testManager := ManagerImpl{
        topologyAffinityStore: topologymanager.NewFakeManagerWithHint(&fakeHint),
        allDevices:            allDevices,
    }

    testCases := []struct {
        available               sets.Set[string]
        fromAffinityExpected    sets.Set[string]
        notFromAffinityExpected sets.Set[string]
        withoutTopologyExpected sets.Set[string]
    }{
        {
            available:               sets.New[string]("dev1", "dev2"),
            fromAffinityExpected:    sets.New[string]("dev2"),
            notFromAffinityExpected: sets.New[string]("dev1"),
            withoutTopologyExpected: sets.New[string](),
        },
        {
            available:               sets.New[string]("dev1", "dev2", "dev3", "dev4"),
            fromAffinityExpected:    sets.New[string]("dev2", "dev3", "dev4"),
            notFromAffinityExpected: sets.New[string]("dev1"),
            withoutTopologyExpected: sets.New[string](),
        },
    }

    for _, testCase := range testCases {
        fromAffinity, notFromAffinity, withoutTopology := testManager.filterByAffinity("", "", "res1", testCase.available)
        as.Truef(fromAffinity.Equal(testCase.fromAffinityExpected), "expect devices from affinity to be %v but got %v", testCase.fromAffinityExpected, fromAffinity)
        as.Truef(notFromAffinity.Equal(testCase.notFromAffinityExpected), "expect devices not from affinity to be %v but got %v", testCase.notFromAffinityExpected, notFromAffinity)
        as.Truef(withoutTopology.Equal(testCase.withoutTopologyExpected), "expect devices without topology to be %v but got %v", testCase.withoutTopologyExpected, withoutTopology)
    }
}

func TestPodContainerDeviceAllocation(t *testing.T) {
    res1 := TestResource{
        resourceName:     "domain1.com/resource1",
        resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
        devs:             checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}},
        topology:         true,
    }
    res2 := TestResource{
        resourceName:     "domain2.com/resource2",
        resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI),
        devs:             checkpoint.DevicesPerNUMA{0: []string{"dev3", "dev4"}},
        topology:         false,
    }
    testResources := make([]TestResource, 0, 2)
    testResources = append(testResources, res1)
    testResources = append(testResources, res2)
    as := require.New(t)
    podsStub := activePodsStub{
        activePods: []*v1.Pod{},
    }
    tmpDir, err := os.MkdirTemp("", "checkpoint")
    as.Nil(err)
    defer os.RemoveAll(tmpDir)
    testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
    as.Nil(err)

    testPods := []*v1.Pod{
        makePod(v1.ResourceList{
            v1.ResourceName(res1.resourceName): res1.resourceQuantity,
            v1.ResourceName("cpu"):             res1.resourceQuantity,
            v1.ResourceName(res2.resourceName): res2.resourceQuantity}),
        makePod(v1.ResourceList{
            v1.ResourceName(res1.resourceName): res2.resourceQuantity}),
        makePod(v1.ResourceList{
            v1.ResourceName(res2.resourceName): res2.resourceQuantity}),
    }
    testCases := []struct {
        description               string
        testPod                   *v1.Pod
        expectedContainerOptsLen  []int
        expectedAllocatedResName1 int
        expectedAllocatedResName2 int
        expErr                    error
    }{
        {
            description:               "Successful allocation of two Res1 resources and one Res2 resource",
            testPod:                   testPods[0],
            expectedContainerOptsLen:  []int{3, 2, 2},
            expectedAllocatedResName1: 2,
            expectedAllocatedResName2: 1,
            expErr:                    nil,
        },
        {
            description:               "Requesting to create a pod without enough resources should fail",
            testPod:                   testPods[1],
            expectedContainerOptsLen:  nil,
            expectedAllocatedResName1: 2,
            expectedAllocatedResName2: 1,
            expErr:                    fmt.Errorf("requested number of devices unavailable for domain1.com/resource1. Requested: 1, Available: 0"),
        },
        {
            description:               "Successful allocation of all available Res1 resources and Res2 resources",
            testPod:                   testPods[2],
            expectedContainerOptsLen:  []int{0, 0, 1},
            expectedAllocatedResName1: 2,
            expectedAllocatedResName2: 2,
            expErr:                    nil,
        },
    }
    activePods := []*v1.Pod{}
    for _, testCase := range testCases {
        pod := testCase.testPod
        activePods = append(activePods, pod)
        podsStub.updateActivePods(activePods)
        err := testManager.Allocate(pod, &pod.Spec.Containers[0])
        if !reflect.DeepEqual(err, testCase.expErr) {
            t.Errorf("DevicePluginManager error (%v). expected error: %v but got: %v",
                testCase.description, testCase.expErr, err)
        }
        runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
        if testCase.expErr == nil {
            as.Nil(err)
        }
        if testCase.expectedContainerOptsLen == nil {
            as.Nil(runContainerOpts)
        } else {
            as.Equal(len(runContainerOpts.Devices), testCase.expectedContainerOptsLen[0])
            as.Equal(len(runContainerOpts.Mounts), testCase.expectedContainerOptsLen[1])
            as.Equal(len(runContainerOpts.Envs), testCase.expectedContainerOptsLen[2])
        }
        as.Equal(testCase.expectedAllocatedResName1, testManager.allocatedDevices[res1.resourceName].Len())
        as.Equal(testCase.expectedAllocatedResName2, testManager.allocatedDevices[res2.resourceName].Len())
    }
}

func TestPodContainerDeviceToAllocate(t *testing.T) {
    resourceName1 := "domain1.com/resource1"
    resourceName2 := "domain2.com/resource2"
    resourceName3 := "domain2.com/resource3"
    as := require.New(t)
    tmpDir, err := os.MkdirTemp("", "checkpoint")
    as.Nil(err)
    defer os.RemoveAll(tmpDir)

    testManager := &ManagerImpl{
        endpoints:        make(map[string]endpointInfo),
        healthyDevices:   make(map[string]sets.Set[string]),
        unhealthyDevices: make(map[string]sets.Set[string]),
        allocatedDevices: make(map[string]sets.Set[string]),
        podDevices:       newPodDevices(),
        activePods:       func() []*v1.Pod { return []*v1.Pod{} },
        sourcesReady:     &sourcesReadyStub{},
    }

    testManager.podDevices.insert("pod1", "con1", resourceName1,
        constructDevices([]string{"dev1", "dev2"}),
        newContainerAllocateResponse(
            withDevices(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"}),
            withMounts(map[string]string{"/home/r2lib1": "/usr/r2lib1"}),
            withEnvs(map[string]string{"r2devices": "dev1 dev2"}),
        ),
    )
    testManager.podDevices.insert("pod2", "con2", resourceName2,
        checkpoint.DevicesPerNUMA{nodeWithoutTopology: []string{"dev5"}},
        newContainerAllocateResponse(
            withDevices(map[string]string{"/dev/r1dev5": "/dev/r1dev5"}),
            withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
        ),
    )
    testManager.podDevices.insert("pod3", "con3", resourceName3,
        checkpoint.DevicesPerNUMA{nodeWithoutTopology: []string{"dev5"}},
        newContainerAllocateResponse(
            withDevices(map[string]string{"/dev/r1dev5": "/dev/r1dev5"}),
            withMounts(map[string]string{"/home/r1lib1": "/usr/r1lib1"}),
        ),
    )
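    // No healthy devices for resourceName1, and devices corresponding to
    // resourceName2 are intentionally omitted to simulate that the resource
    // hasn't been registered.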
    testManager.healthyDevices[resourceName1] = sets.New[string]()
    testManager.healthyDevices[resourceName3] = sets.New[string]()

    testManager.healthyDevices[resourceName3].Insert("dev7")
    testManager.healthyDevices[resourceName3].Insert("dev8")

    testCases := []struct {
        description              string
        podUID                   string
        contName                 string
        resource                 string
        required                 int
        reusableDevices          sets.Set[string]
        expectedAllocatedDevices sets.Set[string]
        expErr                   error
    }{
        {
            description:              "Admission error in case no healthy devices to allocate present",
            podUID:                   "pod1",
            contName:                 "con1",
            resource:                 resourceName1,
            required:                 2,
            reusableDevices:          sets.New[string](),
            expectedAllocatedDevices: nil,
            expErr:                   fmt.Errorf("no healthy devices present; cannot allocate unhealthy devices %s", resourceName1),
        },
        {
            description:              "Admission error in case resource is not registered",
            podUID:                   "pod2",
            contName:                 "con2",
            resource:                 resourceName2,
            required:                 1,
            reusableDevices:          sets.New[string](),
            expectedAllocatedDevices: nil,
            expErr:                   fmt.Errorf("cannot allocate unregistered device %s", resourceName2),
        },
        {
            description:              "Admission error in case devices previously allocated are no longer healthy",
            podUID:                   "pod3",
            contName:                 "con3",
            resource:                 resourceName3,
            required:                 1,
            reusableDevices:          sets.New[string](),
            expectedAllocatedDevices: nil,
            expErr:                   fmt.Errorf("previously allocated devices are no longer healthy; cannot allocate unhealthy devices %s", resourceName3),
        },
    }

    for _, testCase := range testCases {
        allocDevices, err := testManager.devicesToAllocate(testCase.podUID, testCase.contName, testCase.resource, testCase.required, testCase.reusableDevices)
        if !reflect.DeepEqual(err, testCase.expErr) {
            t.Errorf("devicePluginManager error (%v). expected error: %v but got: %v",
                testCase.description, testCase.expErr, err)
        }
        if !reflect.DeepEqual(allocDevices, testCase.expectedAllocatedDevices) {
            t.Errorf("devicePluginManager error (%v). expected devices: %v but got: %v",
                testCase.description, testCase.expectedAllocatedDevices, allocDevices)
        }
    }
}

func TestDevicesToAllocateConflictWithUpdateAllocatedDevices(t *testing.T) {
    podToAllocate := "podToAllocate"
    containerToAllocate := "containerToAllocate"
    podToRemove := "podToRemove"
    containerToRemove := "containerToRemove"
    deviceID := "deviceID"
    resourceName := "domain1.com/resource"

    socket := filepath.Join(os.TempDir(), esocketName())
    devs := []*pluginapi.Device{
        {ID: deviceID, Health: pluginapi.Healthy},
    }
    p, e := esetup(t, devs, socket, resourceName, func(n string, d []pluginapi.Device) {})

    waitUpdateAllocatedDevicesChan := make(chan struct{})
    waitSetGetPreferredAllocChan := make(chan struct{})
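    // Make the stub's GetPreferredAllocation block until UpdateAllocatedDevices has run,
    // forcing devicesToAllocate to interleave with the allocation-cleanup path.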
    p.SetGetPreferredAllocFunc(func(r *pluginapi.PreferredAllocationRequest, devs map[string]pluginapi.Device) (*pluginapi.PreferredAllocationResponse, error) {
        waitSetGetPreferredAllocChan <- struct{}{}
        <-waitUpdateAllocatedDevicesChan
        return &pluginapi.PreferredAllocationResponse{
            ContainerResponses: []*pluginapi.ContainerPreferredAllocationResponse{
                {
                    DeviceIDs: []string{deviceID},
                },
            },
        }, nil
    })

    testManager := &ManagerImpl{
        endpoints:             make(map[string]endpointInfo),
        healthyDevices:        make(map[string]sets.Set[string]),
        unhealthyDevices:      make(map[string]sets.Set[string]),
        allocatedDevices:      make(map[string]sets.Set[string]),
        podDevices:            newPodDevices(),
        activePods:            func() []*v1.Pod { return []*v1.Pod{} },
        sourcesReady:          &sourcesReadyStub{},
        topologyAffinityStore: topologymanager.NewFakeManager(),
    }

    testManager.endpoints[resourceName] = endpointInfo{
        e: e,
        opts: &pluginapi.DevicePluginOptions{
            GetPreferredAllocationAvailable: true,
        },
    }
    testManager.healthyDevices[resourceName] = sets.New[string](deviceID)
    testManager.podDevices.insert(podToRemove, containerToRemove, resourceName, nil, nil)

    go func() {
        <-waitSetGetPreferredAllocChan
        testManager.UpdateAllocatedDevices()
        waitUpdateAllocatedDevicesChan <- struct{}{}
    }()

    set, err := testManager.devicesToAllocate(podToAllocate, containerToAllocate, resourceName, 1, sets.New[string]())
    assert.NoError(t, err)
    assert.Equal(t, sets.New[string](deviceID), set)
}

func TestGetDeviceRunContainerOptions(t *testing.T) {
    res1 := TestResource{
        resourceName:     "domain1.com/resource1",
        resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
        devs:             checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}},
        topology:         true,
    }
    res2 := TestResource{
        resourceName:     "domain2.com/resource2",
        resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI),
        devs:             checkpoint.DevicesPerNUMA{0: []string{"dev3", "dev4"}},
        topology:         false,
    }

    testResources := make([]TestResource, 0, 2)
    testResources = append(testResources, res1)
    testResources = append(testResources, res2)

    podsStub := activePodsStub{
        activePods: []*v1.Pod{},
    }
    as := require.New(t)

    tmpDir, err := os.MkdirTemp("", "checkpoint")
    as.Nil(err)
    defer os.RemoveAll(tmpDir)

    testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
    as.Nil(err)

    pod1 := makePod(v1.ResourceList{
        v1.ResourceName(res1.resourceName): res1.resourceQuantity,
        v1.ResourceName(res2.resourceName): res2.resourceQuantity,
    })
    pod2 := makePod(v1.ResourceList{
        v1.ResourceName(res2.resourceName): res2.resourceQuantity,
    })

    activePods := []*v1.Pod{pod1, pod2}
    podsStub.updateActivePods(activePods)

    err = testManager.Allocate(pod1, &pod1.Spec.Containers[0])
    as.Nil(err)
    err = testManager.Allocate(pod2, &pod2.Spec.Containers[0])
    as.Nil(err)

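    // When the pod is in activePods, GetDeviceRunContainerOptions should return the allocated options.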
    runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod1, &pod1.Spec.Containers[0])
    as.Nil(err)
    as.Equal(len(runContainerOpts.Devices), 3)
    as.Equal(len(runContainerOpts.Mounts), 2)
    as.Equal(len(runContainerOpts.Envs), 2)

    activePods = []*v1.Pod{pod2}
    podsStub.updateActivePods(activePods)
    testManager.UpdateAllocatedDevices()
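    // After the pod is removed from activePods, GetDeviceRunContainerOptions should return nil.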
    runContainerOpts, err = testManager.GetDeviceRunContainerOptions(pod1, &pod1.Spec.Containers[0])
    as.Nil(err)
    as.Nil(runContainerOpts)
}

func TestInitContainerDeviceAllocation(t *testing.T) {
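    // Requesting to create a pod that requests resourceName1 in init containers and normal
    // containers should succeed, with devices allocated to init containers reallocated to
    // normal containers.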
    res1 := TestResource{
        resourceName:     "domain1.com/resource1",
        resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
        devs:             checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}},
        topology:         false,
    }
    res2 := TestResource{
        resourceName:     "domain2.com/resource2",
        resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI),
        devs:             checkpoint.DevicesPerNUMA{0: []string{"dev3", "dev4"}},
        topology:         true,
    }
    testResources := make([]TestResource, 0, 2)
    testResources = append(testResources, res1)
    testResources = append(testResources, res2)
    as := require.New(t)
    podsStub := activePodsStub{
        activePods: []*v1.Pod{},
    }
    tmpDir, err := os.MkdirTemp("", "checkpoint")
    as.Nil(err)
    defer os.RemoveAll(tmpDir)

    testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
    as.Nil(err)

    podWithPluginResourcesInInitContainers := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            UID: uuid.NewUUID(),
        },
        Spec: v1.PodSpec{
            InitContainers: []v1.Container{
                {
                    Name: string(uuid.NewUUID()),
                    Resources: v1.ResourceRequirements{
                        Limits: v1.ResourceList{
                            v1.ResourceName(res1.resourceName): res2.resourceQuantity,
                        },
                    },
                },
                {
                    Name: string(uuid.NewUUID()),
                    Resources: v1.ResourceRequirements{
                        Limits: v1.ResourceList{
                            v1.ResourceName(res1.resourceName): res1.resourceQuantity,
                        },
                    },
                },
            },
            Containers: []v1.Container{
                {
                    Name: string(uuid.NewUUID()),
                    Resources: v1.ResourceRequirements{
                        Limits: v1.ResourceList{
                            v1.ResourceName(res1.resourceName): res2.resourceQuantity,
                            v1.ResourceName(res2.resourceName): res2.resourceQuantity,
                        },
                    },
                },
                {
                    Name: string(uuid.NewUUID()),
                    Resources: v1.ResourceRequirements{
                        Limits: v1.ResourceList{
                            v1.ResourceName(res1.resourceName): res2.resourceQuantity,
                            v1.ResourceName(res2.resourceName): res2.resourceQuantity,
                        },
                    },
                },
            },
        },
    }
    podsStub.updateActivePods([]*v1.Pod{podWithPluginResourcesInInitContainers})
    for _, container := range podWithPluginResourcesInInitContainers.Spec.InitContainers {
        err = testManager.Allocate(podWithPluginResourcesInInitContainers, &container)
        as.Nil(err)
    }
    for _, container := range podWithPluginResourcesInInitContainers.Spec.Containers {
        err = testManager.Allocate(podWithPluginResourcesInInitContainers, &container)
        as.Nil(err)
    }
    podUID := string(podWithPluginResourcesInInitContainers.UID)
    initCont1 := podWithPluginResourcesInInitContainers.Spec.InitContainers[0].Name
    initCont2 := podWithPluginResourcesInInitContainers.Spec.InitContainers[1].Name
    normalCont1 := podWithPluginResourcesInInitContainers.Spec.Containers[0].Name
    normalCont2 := podWithPluginResourcesInInitContainers.Spec.Containers[1].Name
    initCont1Devices := testManager.podDevices.containerDevices(podUID, initCont1, res1.resourceName)
    initCont2Devices := testManager.podDevices.containerDevices(podUID, initCont2, res1.resourceName)
    normalCont1Devices := testManager.podDevices.containerDevices(podUID, normalCont1, res1.resourceName)
    normalCont2Devices := testManager.podDevices.containerDevices(podUID, normalCont2, res1.resourceName)
    as.Equal(1, initCont1Devices.Len())
    as.Equal(2, initCont2Devices.Len())
    as.Equal(1, normalCont1Devices.Len())
    as.Equal(1, normalCont2Devices.Len())
    as.True(initCont2Devices.IsSuperset(initCont1Devices))
    as.True(initCont2Devices.IsSuperset(normalCont1Devices))
    as.True(initCont2Devices.IsSuperset(normalCont2Devices))
    as.Equal(0, normalCont1Devices.Intersection(normalCont2Devices).Len())
}

func TestRestartableInitContainerDeviceAllocation(t *testing.T) {
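    // Requesting to create a pod that requests resourceName1 in restartable init containers
    // and normal containers should succeed: devices allocated to the regular init container
    // can be reallocated to later containers, while restartable init containers keep theirs.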
    oneDevice := resource.NewQuantity(int64(1), resource.DecimalSI)
    twoDevice := resource.NewQuantity(int64(2), resource.DecimalSI)
    threeDevice := resource.NewQuantity(int64(3), resource.DecimalSI)
    res1 := TestResource{
        resourceName:     "domain1.com/resource1",
        resourceQuantity: *resource.NewQuantity(int64(6), resource.DecimalSI),
        devs: checkpoint.DevicesPerNUMA{
            0: []string{"dev1", "dev2", "dev3", "dev4", "dev5", "dev6"},
        },
        topology: false,
    }
    testResources := []TestResource{
        res1,
    }
    as := require.New(t)
    podsStub := activePodsStub{
        activePods: []*v1.Pod{},
    }
    tmpDir, err := os.MkdirTemp("", "checkpoint")
    as.Nil(err)
    defer os.RemoveAll(tmpDir)

    testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
    as.Nil(err)

    containerRestartPolicyAlways := v1.ContainerRestartPolicyAlways
    podWithPluginResourcesInRestartableInitContainers := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            UID: uuid.NewUUID(),
        },
        Spec: v1.PodSpec{
            InitContainers: []v1.Container{
                {
                    Name: string(uuid.NewUUID()),
                    Resources: v1.ResourceRequirements{
                        Limits: v1.ResourceList{
                            v1.ResourceName(res1.resourceName): *threeDevice,
                        },
                    },
                },
                {
                    Name: string(uuid.NewUUID()),
                    Resources: v1.ResourceRequirements{
                        Limits: v1.ResourceList{
                            v1.ResourceName(res1.resourceName): *oneDevice,
                        },
                    },
                    RestartPolicy: &containerRestartPolicyAlways,
                },
                {
                    Name: string(uuid.NewUUID()),
                    Resources: v1.ResourceRequirements{
                        Limits: v1.ResourceList{
                            v1.ResourceName(res1.resourceName): *twoDevice,
                        },
                    },
                    RestartPolicy: &containerRestartPolicyAlways,
                },
            },
            Containers: []v1.Container{
                {
                    Name: string(uuid.NewUUID()),
                    Resources: v1.ResourceRequirements{
                        Limits: v1.ResourceList{
                            v1.ResourceName(res1.resourceName): *oneDevice,
                        },
                    },
                },
                {
                    Name: string(uuid.NewUUID()),
                    Resources: v1.ResourceRequirements{
                        Limits: v1.ResourceList{
                            v1.ResourceName(res1.resourceName): *twoDevice,
                        },
                    },
                },
            },
        },
    }
    podsStub.updateActivePods([]*v1.Pod{podWithPluginResourcesInRestartableInitContainers})
    for _, container := range podWithPluginResourcesInRestartableInitContainers.Spec.InitContainers {
        err = testManager.Allocate(podWithPluginResourcesInRestartableInitContainers, &container)
        as.Nil(err)
    }
    for _, container := range podWithPluginResourcesInRestartableInitContainers.Spec.Containers {
        err = testManager.Allocate(podWithPluginResourcesInRestartableInitContainers, &container)
        as.Nil(err)
    }
    podUID := string(podWithPluginResourcesInRestartableInitContainers.UID)
    regularInitCont1 := podWithPluginResourcesInRestartableInitContainers.Spec.InitContainers[0].Name
    restartableInitCont2 := podWithPluginResourcesInRestartableInitContainers.Spec.InitContainers[1].Name
    restartableInitCont3 := podWithPluginResourcesInRestartableInitContainers.Spec.InitContainers[2].Name
    normalCont1 := podWithPluginResourcesInRestartableInitContainers.Spec.Containers[0].Name
    normalCont2 := podWithPluginResourcesInRestartableInitContainers.Spec.Containers[1].Name
    regularInitCont1Devices := testManager.podDevices.containerDevices(podUID, regularInitCont1, res1.resourceName)
    restartableInitCont2Devices := testManager.podDevices.containerDevices(podUID, restartableInitCont2, res1.resourceName)
    restartableInitCont3Devices := testManager.podDevices.containerDevices(podUID, restartableInitCont3, res1.resourceName)
    normalCont1Devices := testManager.podDevices.containerDevices(podUID, normalCont1, res1.resourceName)
    normalCont2Devices := testManager.podDevices.containerDevices(podUID, normalCont2, res1.resourceName)
    as.Equal(3, regularInitCont1Devices.Len())
    as.Equal(1, restartableInitCont2Devices.Len())
    as.Equal(2, restartableInitCont3Devices.Len())
    as.Equal(1, normalCont1Devices.Len())
    as.Equal(2, normalCont2Devices.Len())
    as.True(regularInitCont1Devices.IsSuperset(restartableInitCont2Devices))
    as.True(regularInitCont1Devices.IsSuperset(restartableInitCont3Devices))
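    // regularInitCont1's devices can be reused by later containers, but the restartable init
    // containers and the app containers must each hold dedicated, mutually disjoint device sets.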
    dedicatedContainerDevices := []sets.Set[string]{
        restartableInitCont2Devices,
        restartableInitCont3Devices,
        normalCont1Devices,
        normalCont2Devices,
    }

    for i := 0; i < len(dedicatedContainerDevices)-1; i++ {
        for j := i + 1; j < len(dedicatedContainerDevices); j++ {
            t.Logf("containerDevices[%d] = %v", i, dedicatedContainerDevices[i])
            t.Logf("containerDevices[%d] = %v", j, dedicatedContainerDevices[j])
            as.Empty(dedicatedContainerDevices[i].Intersection(dedicatedContainerDevices[j]))
        }
    }
}

func TestUpdatePluginResources(t *testing.T) {
    pod := &v1.Pod{}
    pod.UID = types.UID("testPod")

    resourceName1 := "domain1.com/resource1"
    devID1 := "dev1"

    resourceName2 := "domain2.com/resource2"
    devID2 := "dev2"

    as := assert.New(t)
    monitorCallback := func(resourceName string, devices []pluginapi.Device) {}
    tmpDir, err := os.MkdirTemp("", "checkpoint")
    as.Nil(err)
    defer os.RemoveAll(tmpDir)

    ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
    as.Nil(err)
    m := &ManagerImpl{
        allocatedDevices:  make(map[string]sets.Set[string]),
        healthyDevices:    make(map[string]sets.Set[string]),
        podDevices:        newPodDevices(),
        checkpointManager: ckm,
    }
    testManager := wrappedManagerImpl{
        ManagerImpl: m,
        callback:    monitorCallback,
    }
    testManager.podDevices.devs[string(pod.UID)] = make(containerDevices)
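    // Require one of resource1 and one of resource2.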
    testManager.allocatedDevices[resourceName1] = sets.New[string]()
    testManager.allocatedDevices[resourceName1].Insert(devID1)
    testManager.allocatedDevices[resourceName2] = sets.New[string]()
    testManager.allocatedDevices[resourceName2].Insert(devID2)

    cachedNode := &v1.Node{
        Status: v1.NodeStatus{
            Allocatable: v1.ResourceList{
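                // The cached node has no allocatable resource1 and two allocatable resource2.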
                v1.ResourceName(resourceName2): *resource.NewQuantity(int64(2), resource.DecimalSI),
            },
        },
    }
    nodeInfo := &schedulerframework.NodeInfo{}
    nodeInfo.SetNode(cachedNode)

    testManager.UpdatePluginResources(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})

    allocatableScalarResources := nodeInfo.Allocatable.ScalarResources
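    // Allocatable resource1 in nodeInfo is less than what is allocated, so it should be updated.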
    as.Equal(1, int(allocatableScalarResources[v1.ResourceName(resourceName1)]))
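    // Allocatable resource2 in nodeInfo is more than needed, so the update should be skipped.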
    as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)]))
}

func TestDevicePreStartContainer(t *testing.T) {
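    // Ensures that when a device plugin requests the PreStartContainer RPC, the device manager
    // invokes PreStartContainer on the endpoint interface, and that the final allocation of
    // mounts, envs, etc. matches what the plugin returned.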
    res1 := TestResource{
        resourceName:     "domain1.com/resource1",
        resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI),
        devs:             checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}},
        topology:         false,
    }
    as := require.New(t)
    podsStub := activePodsStub{
        activePods: []*v1.Pod{},
    }
    tmpDir, err := os.MkdirTemp("", "checkpoint")
    as.Nil(err)
    defer os.RemoveAll(tmpDir)

    testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1})
    as.Nil(err)

    ch := make(chan []string, 1)
    testManager.endpoints[res1.resourceName] = endpointInfo{
        e: &MockEndpoint{
            initChan:     ch,
            allocateFunc: allocateStubFunc(),
        },
        opts: &pluginapi.DevicePluginOptions{PreStartRequired: true},
    }
    pod := makePod(v1.ResourceList{
        v1.ResourceName(res1.resourceName): res1.resourceQuantity})
    activePods := []*v1.Pod{}
    activePods = append(activePods, pod)
    podsStub.updateActivePods(activePods)
    err = testManager.Allocate(pod, &pod.Spec.Containers[0])
    as.Nil(err)
    runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
    as.Nil(err)
    var initializedDevs []string
    select {
    case <-time.After(time.Second):
        t.Fatalf("Timed out while waiting on channel for response from PreStartContainer RPC stub")
    case initializedDevs = <-ch:
    }

    as.Contains(initializedDevs, "dev1")
    as.Contains(initializedDevs, "dev2")
    as.Equal(len(initializedDevs), res1.devs.Devices().Len())

    expectedResps, err := allocateStubFunc()([]string{"dev1", "dev2"})
    as.Nil(err)
    as.Equal(1, len(expectedResps.ContainerResponses))
    expectedResp := expectedResps.ContainerResponses[0]
    as.Equal(len(runContainerOpts.Devices), len(expectedResp.Devices))
    as.Equal(len(runContainerOpts.Mounts), len(expectedResp.Mounts))
    as.Equal(len(runContainerOpts.Envs), len(expectedResp.Envs))

    pod2 := makePod(v1.ResourceList{
        v1.ResourceName(res1.resourceName): *resource.NewQuantity(int64(0), resource.DecimalSI)})
    activePods = append(activePods, pod2)
    podsStub.updateActivePods(activePods)
    err = testManager.Allocate(pod2, &pod2.Spec.Containers[0])
    as.Nil(err)
    _, err = testManager.GetDeviceRunContainerOptions(pod2, &pod2.Spec.Containers[0])
    as.Nil(err)
    select {
    case <-time.After(time.Millisecond):
        t.Log("When pod resourceQuantity is 0, the PreStartContainer RPC stub is skipped")
    case <-ch:
        t.Fatalf("PreStartContainer RPC stub should not be invoked when the pod requests zero devices")
    }
}
1667
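// TestResetExtendedResource verifies that extended-resource capacity is reset
// only when the device manager checkpoint is missing, which indicates the
// node has been recreated.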
func TestResetExtendedResource(t *testing.T) {
	as := assert.New(t)
	tmpDir, err := os.MkdirTemp("", "checkpoint")
	as.Nil(err)
	defer os.RemoveAll(tmpDir)
	ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
	as.Nil(err)
	testManager := &ManagerImpl{
		endpoints:         make(map[string]endpointInfo),
		healthyDevices:    make(map[string]sets.Set[string]),
		unhealthyDevices:  make(map[string]sets.Set[string]),
		allocatedDevices:  make(map[string]sets.Set[string]),
		podDevices:        newPodDevices(),
		checkpointManager: ckm,
	}

	extendedResourceName := "domain.com/resource"
	testManager.podDevices.insert("pod", "con", extendedResourceName,
		constructDevices([]string{"dev1"}),
		newContainerAllocateResponse(
			withDevices(map[string]string{"/dev/dev1": "/dev/dev1"}),
			withMounts(map[string]string{"/home/lib1": "/usr/lib1"}),
		),
	)

	testManager.healthyDevices[extendedResourceName] = sets.New[string]()
	testManager.healthyDevices[extendedResourceName].Insert("dev1")

	err = testManager.writeCheckpoint()
	as.Nil(err)

	// The checkpoint exists, so the node was not recreated: no reset.
	as.False(testManager.ShouldResetExtendedResourceCapacity())

	// Removing the checkpoint files simulates a recreated node: reset expected.
	ckpts, err := ckm.ListCheckpoints()
	as.Nil(err)
	for _, ckpt := range ckpts {
		err = ckm.RemoveCheckpoint(ckpt)
		as.Nil(err)
	}
	as.True(testManager.ShouldResetExtendedResourceCapacity())
}

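// allocateStubFunc returns a stub Allocate implementation that hands back a
// fixed set of device specs, mounts, and env vars for "dev1" and "dev2", so
// tests can compare run-container options against a known response.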
func allocateStubFunc() func(devs []string) (*pluginapi.AllocateResponse, error) {
	return func(devs []string) (*pluginapi.AllocateResponse, error) {
		resp := new(pluginapi.ContainerAllocateResponse)
		resp.Envs = make(map[string]string)
		for _, dev := range devs {
			switch dev {
			case "dev1":
				resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
					ContainerPath: "/dev/aaa",
					HostPath:      "/dev/aaa",
					Permissions:   "mrw",
				})

				resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
					ContainerPath: "/dev/bbb",
					HostPath:      "/dev/bbb",
					Permissions:   "mrw",
				})

				resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
					ContainerPath: "/container_dir1/file1",
					HostPath:      "host_dir1/file1",
					ReadOnly:      true,
				})

			case "dev2":
				resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
					ContainerPath: "/dev/ccc",
					HostPath:      "/dev/ccc",
					Permissions:   "mrw",
				})

				resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
					ContainerPath: "/container_dir1/file2",
					HostPath:      "host_dir1/file2",
					ReadOnly:      true,
				})

				resp.Envs["key1"] = "val1"
			}
		}
		resps := new(pluginapi.AllocateResponse)
		resps.ContainerResponses = append(resps.ContainerResponses, resp)
		return resps, nil
	}
}

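// makeDevice converts a DevicesPerNUMA map into pluginapi.Device values,
// attaching per-NUMA-node topology info only when topology is true.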
func makeDevice(devOnNUMA checkpoint.DevicesPerNUMA, topology bool) map[string]pluginapi.Device {
	res := make(map[string]pluginapi.Device)
	var topologyInfo *pluginapi.TopologyInfo
	for node, devs := range devOnNUMA {
		if topology {
			topologyInfo = &pluginapi.TopologyInfo{Nodes: []*pluginapi.NUMANode{{ID: node}}}
		} else {
			topologyInfo = nil
		}
		for idx := range devs {
			res[devs[idx]] = pluginapi.Device{ID: devs[idx], Topology: topologyInfo}
		}
	}
	return res
}

const deviceManagerCheckpointFilename = "kubelet_internal_checkpoint"

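// oldCheckpoint is a serialized checkpoint in the pre-NUMA format: it records
// device IDs without any per-NUMA-node topology information.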
var oldCheckpoint string = `{"Data":{"PodDeviceEntries":[{"PodUID":"13ac2284-0d19-44b7-b94f-055b032dba9b","ContainerName":"centos","ResourceName":"example.com/deviceA","DeviceIDs":["DevA3"],"AllocResp":"CiIKHUVYQU1QTEVDT01ERVZJQ0VBX0RFVkEzX1RUWTEwEgEwGhwKCi9kZXYvdHR5MTASCi9kZXYvdHR5MTAaAnJ3"},{"PodUID":"86b9a017-c9ca-4069-815f-46ca3e53c1e4","ContainerName":"centos","ResourceName":"example.com/deviceA","DeviceIDs":["DevA4"],"AllocResp":"CiIKHUVYQU1QTEVDT01ERVZJQ0VBX0RFVkE0X1RUWTExEgEwGhwKCi9kZXYvdHR5MTESCi9kZXYvdHR5MTEaAnJ3"}],"RegisteredDevices":{"example.com/deviceA":["DevA1","DevA2","DevA3","DevA4"]}},"Checksum":405612085}`

func TestReadPreNUMACheckpoint(t *testing.T) {
	socketDir, socketName, _, err := tmpSocketDir()
	require.NoError(t, err)
	defer os.RemoveAll(socketDir)

	err = os.WriteFile(filepath.Join(socketDir, deviceManagerCheckpointFilename), []byte(oldCheckpoint), 0644)
	require.NoError(t, err)

	topologyStore := topologymanager.NewFakeManager()
	nodes := []cadvisorapi.Node{{Id: 0}}
	m, err := newManagerImpl(socketName, nodes, topologyStore)
	require.NoError(t, err)

	// Calling the private readCheckpoint directly is enough to prove the old
	// (pre-NUMA) checkpoint format is still readable.
	err = m.readCheckpoint()
	require.NoError(t, err)
}

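// TestGetTopologyHintsWithUpdates stresses GetTopologyHints and
// GetPodTopologyHints concurrently with device updates; when run with
// `go test -race`, unsynchronized access to device state fails the test.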
func TestGetTopologyHintsWithUpdates(t *testing.T) {
	socketDir, socketName, _, err := tmpSocketDir()
	defer os.RemoveAll(socketDir)
	require.NoError(t, err)

	devs := []pluginapi.Device{}
	for i := 0; i < 1000; i++ {
		devs = append(devs, pluginapi.Device{
			ID:     fmt.Sprintf("dev-%d", i),
			Health: pluginapi.Healthy,
			Topology: &pluginapi.TopologyInfo{
				Nodes: []*pluginapi.NUMANode{
					{ID: 0},
				},
			}})
	}
	testPod := makePod(v1.ResourceList{
		testResourceName: *resource.NewQuantity(int64(1), resource.DecimalSI),
	})
	topology := []cadvisorapi.Node{
		{Id: 0},
	}
	testCases := []struct {
		description string
		count       int
		devices     []pluginapi.Device
		testfunc    func(manager *wrappedManagerImpl)
	}{
		{
			description: "GetTopologyHints data race when update device",
			count:       10,
			devices:     devs,
			testfunc: func(manager *wrappedManagerImpl) {
				manager.GetTopologyHints(testPod, &testPod.Spec.Containers[0])
			},
		},
		{
			description: "GetPodTopologyHints data race when update device",
			count:       10,
			devices:     devs,
			testfunc: func(manager *wrappedManagerImpl) {
				manager.GetPodTopologyHints(testPod)
			},
		},
	}

	for _, test := range testCases {
		t.Run(test.description, func(t *testing.T) {
			m, _ := setupDeviceManager(t, nil, nil, socketName, topology)
			defer m.Stop()
			mimpl := m.(*wrappedManagerImpl)

			wg := sync.WaitGroup{}
			wg.Add(2)

			updated := atomic.Bool{}
			updated.Store(false)
			go func() {
				defer wg.Done()
				for i := 0; i < test.count; i++ {
					// Simulate a device plugin sending device updates.
					mimpl.genericDeviceUpdateCallback(testResourceName, devs)
				}
				updated.Store(true)
			}()
			go func() {
				defer wg.Done()
				for !updated.Load() {
					// Query topology hints while updates are in flight; any
					// unsynchronized access to device state is then caught by
					// the race detector.
					test.testfunc(mimpl)
				}
			}()
			wg.Wait()
		})
	}
}