1
16
17 package devicemanager
18
19 import (
20 "fmt"
21 "os"
22 "path/filepath"
23 "sync"
24 "testing"
25 "time"
26
27 "github.com/stretchr/testify/require"
28
29 pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
30 plugin "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/plugin/v1beta1"
31 )
32
33
34
35
36 type monitorCallback func(resourceName string, devices []pluginapi.Device)
37
38 func newMockPluginManager() *mockPluginManager {
39 return &mockPluginManager{
40 func(string) error { return nil },
41 func(string, plugin.DevicePlugin) error { return nil },
42 func(string) {},
43 func(string, *pluginapi.ListAndWatchResponse) {},
44 }
45 }
46
47 type mockPluginManager struct {
48 cleanupPluginDirectory func(string) error
49 pluginConnected func(string, plugin.DevicePlugin) error
50 pluginDisconnected func(string)
51 pluginListAndWatchReceiver func(string, *pluginapi.ListAndWatchResponse)
52 }
53
54 func (m *mockPluginManager) CleanupPluginDirectory(r string) error {
55 return m.cleanupPluginDirectory(r)
56 }
57
58 func (m *mockPluginManager) PluginConnected(r string, p plugin.DevicePlugin) error {
59 return m.pluginConnected(r, p)
60 }
61
62 func (m *mockPluginManager) PluginDisconnected(r string) {
63 m.pluginDisconnected(r)
64 }
65
66 func (m *mockPluginManager) PluginListAndWatchReceiver(r string, lr *pluginapi.ListAndWatchResponse) {
67 m.pluginListAndWatchReceiver(r, lr)
68 }
69
70 func esocketName() string {
71 return fmt.Sprintf("mock%d.sock", time.Now().UnixNano())
72 }
73
74 func TestNewEndpoint(t *testing.T) {
75 socket := filepath.Join(os.TempDir(), esocketName())
76
77 devs := []*pluginapi.Device{
78 {ID: "ADeviceId", Health: pluginapi.Healthy},
79 }
80
81 p, e := esetup(t, devs, socket, "mock", func(n string, d []pluginapi.Device) {})
82 defer ecleanup(t, p, e)
83 }
84
85 func TestRun(t *testing.T) {
86 socket := filepath.Join(os.TempDir(), esocketName())
87
88 devs := []*pluginapi.Device{
89 {ID: "ADeviceId", Health: pluginapi.Healthy},
90 {ID: "AnotherDeviceId", Health: pluginapi.Healthy},
91 {ID: "AThirdDeviceId", Health: pluginapi.Unhealthy},
92 }
93
94 updated := []*pluginapi.Device{
95 {ID: "ADeviceId", Health: pluginapi.Unhealthy},
96 {ID: "AThirdDeviceId", Health: pluginapi.Healthy},
97 {ID: "AFourthDeviceId", Health: pluginapi.Healthy},
98 }
99
100 callbackCount := 0
101 callbackChan := make(chan int)
102 callback := func(n string, devices []pluginapi.Device) {
103
104
105 if callbackCount > 2 {
106 t.FailNow()
107 }
108
109
110 if callbackCount == 0 {
111 require.Len(t, devices, 3)
112 require.Equal(t, devices[0].ID, devs[0].ID)
113 require.Equal(t, devices[1].ID, devs[1].ID)
114 require.Equal(t, devices[2].ID, devs[2].ID)
115 require.Equal(t, devices[0].Health, devs[0].Health)
116 require.Equal(t, devices[1].Health, devs[1].Health)
117 require.Equal(t, devices[2].Health, devs[2].Health)
118 }
119
120
121 if callbackCount == 1 {
122 require.Len(t, devices, 3)
123 require.Equal(t, devices[0].ID, updated[0].ID)
124 require.Equal(t, devices[1].ID, updated[1].ID)
125 require.Equal(t, devices[2].ID, updated[2].ID)
126 require.Equal(t, devices[0].Health, updated[0].Health)
127 require.Equal(t, devices[1].Health, updated[1].Health)
128 require.Equal(t, devices[2].Health, updated[2].Health)
129 }
130
131 callbackCount++
132 callbackChan <- callbackCount
133 }
134
135 p, e := esetup(t, devs, socket, "mock", callback)
136 defer ecleanup(t, p, e)
137
138 go e.client.Run()
139
140 <-callbackChan
141
142 p.Update(updated)
143
144
145 <-callbackChan
146
147 require.Equal(t, callbackCount, 2)
148 }
149
150 func TestAllocate(t *testing.T) {
151 socket := filepath.Join(os.TempDir(), esocketName())
152 devs := []*pluginapi.Device{
153 {ID: "ADeviceId", Health: pluginapi.Healthy},
154 }
155 callbackCount := 0
156 callbackChan := make(chan int)
157 p, e := esetup(t, devs, socket, "mock", func(n string, d []pluginapi.Device) {
158 callbackCount++
159 callbackChan <- callbackCount
160 })
161 defer ecleanup(t, p, e)
162
163 resp := new(pluginapi.AllocateResponse)
164 contResp := new(pluginapi.ContainerAllocateResponse)
165 contResp.Devices = append(contResp.Devices, &pluginapi.DeviceSpec{
166 ContainerPath: "/dev/aaa",
167 HostPath: "/dev/aaa",
168 Permissions: "mrw",
169 })
170
171 contResp.Devices = append(contResp.Devices, &pluginapi.DeviceSpec{
172 ContainerPath: "/dev/bbb",
173 HostPath: "/dev/bbb",
174 Permissions: "mrw",
175 })
176
177 contResp.Mounts = append(contResp.Mounts, &pluginapi.Mount{
178 ContainerPath: "/container_dir1/file1",
179 HostPath: "host_dir1/file1",
180 ReadOnly: true,
181 })
182
183 resp.ContainerResponses = append(resp.ContainerResponses, contResp)
184
185 p.SetAllocFunc(func(r *pluginapi.AllocateRequest, devs map[string]pluginapi.Device) (*pluginapi.AllocateResponse, error) {
186 return resp, nil
187 })
188
189 go e.client.Run()
190
191 select {
192 case <-callbackChan:
193 break
194 case <-time.After(time.Second):
195 t.FailNow()
196 }
197
198 respOut, err := e.allocate([]string{"ADeviceId"})
199 require.NoError(t, err)
200 require.Equal(t, resp, respOut)
201 }
202
203 func TestGetPreferredAllocation(t *testing.T) {
204 socket := filepath.Join(os.TempDir(), esocketName())
205 callbackCount := 0
206 callbackChan := make(chan int)
207 p, e := esetup(t, []*pluginapi.Device{}, socket, "mock", func(n string, d []pluginapi.Device) {
208 callbackCount++
209 callbackChan <- callbackCount
210 })
211 defer ecleanup(t, p, e)
212
213 resp := &pluginapi.PreferredAllocationResponse{
214 ContainerResponses: []*pluginapi.ContainerPreferredAllocationResponse{
215 {DeviceIDs: []string{"device0", "device1", "device2"}},
216 },
217 }
218
219 p.SetGetPreferredAllocFunc(func(r *pluginapi.PreferredAllocationRequest, devs map[string]pluginapi.Device) (*pluginapi.PreferredAllocationResponse, error) {
220 return resp, nil
221 })
222
223 go e.client.Run()
224
225 select {
226 case <-callbackChan:
227 break
228 case <-time.After(time.Second):
229 t.FailNow()
230 }
231
232 respOut, err := e.getPreferredAllocation([]string{}, []string{}, -1)
233 require.NoError(t, err)
234 require.Equal(t, resp, respOut)
235 }
236
237 func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*plugin.Stub, *endpointImpl) {
238 m := newMockPluginManager()
239
240 m.pluginListAndWatchReceiver = func(r string, resp *pluginapi.ListAndWatchResponse) {
241 var newDevs []pluginapi.Device
242 for _, d := range resp.Devices {
243 newDevs = append(newDevs, *d)
244 }
245 callback(resourceName, newDevs)
246 }
247
248 var dp plugin.DevicePlugin
249 var wg sync.WaitGroup
250 wg.Add(1)
251 m.pluginConnected = func(r string, c plugin.DevicePlugin) error {
252 dp = c
253 wg.Done()
254 return nil
255 }
256
257 p := plugin.NewDevicePluginStub(devs, socket, resourceName, false, false)
258 err := p.Start()
259 require.NoError(t, err)
260
261 c := plugin.NewPluginClient(resourceName, socket, m)
262 err = c.Connect()
263 require.NoError(t, err)
264
265 wg.Wait()
266
267 e := newEndpointImpl(dp)
268 e.client = c
269
270 m.pluginDisconnected = func(r string) {
271 e.setStopTime(time.Now())
272 }
273
274 return p, e
275 }
276
277 func ecleanup(t *testing.T, p *plugin.Stub, e *endpointImpl) {
278 p.Stop()
279 e.client.Disconnect()
280 }
281
View as plain text