1
16
17 package plugin
18
19 import (
20 "context"
21 "fmt"
22 "net"
23 "os"
24 "path/filepath"
25 "sync"
26 "testing"
27
28 "github.com/stretchr/testify/assert"
29 "google.golang.org/grpc"
30 drapbv1alpha2 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
31 drapbv1alpha3 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
32 )
33
// fakeV1alpha3GRPCServer is the test double that setupFakeGRPCServer registers
// as the v1alpha3 DRA Node service. It embeds
// drapbv1alpha3.UnimplementedNodeServer, so any service method this type does
// not define itself is supplied by the embedded value.
type fakeV1alpha3GRPCServer struct {
	drapbv1alpha3.UnimplementedNodeServer
}
37
38 func (f *fakeV1alpha3GRPCServer) NodePrepareResource(ctx context.Context, in *drapbv1alpha3.NodePrepareResourcesRequest) (*drapbv1alpha3.NodePrepareResourcesResponse, error) {
39 return &drapbv1alpha3.NodePrepareResourcesResponse{Claims: map[string]*drapbv1alpha3.NodePrepareResourceResponse{"dummy": {CDIDevices: []string{"dummy"}}}}, nil
40 }
41
42 func (f *fakeV1alpha3GRPCServer) NodeUnprepareResource(ctx context.Context, in *drapbv1alpha3.NodeUnprepareResourcesRequest) (*drapbv1alpha3.NodeUnprepareResourcesResponse, error) {
43 return &drapbv1alpha3.NodeUnprepareResourcesResponse{}, nil
44 }
45
46 func (f *fakeV1alpha3GRPCServer) NodeListAndWatchResources(req *drapbv1alpha3.NodeListAndWatchResourcesRequest, srv drapbv1alpha3.Node_NodeListAndWatchResourcesServer) error {
47 if err := srv.Send(&drapbv1alpha3.NodeListAndWatchResourcesResponse{}); err != nil {
48 return err
49 }
50 if err := srv.Send(&drapbv1alpha3.NodeListAndWatchResourcesResponse{}); err != nil {
51 return err
52 }
53 return nil
54 }
55
// fakeV1alpha2GRPCServer is the test double that setupFakeGRPCServer registers
// as the v1alpha2 DRA Node service. It embeds
// drapbv1alpha2.UnimplementedNodeServer, so any service method this type does
// not define itself is supplied by the embedded value.
type fakeV1alpha2GRPCServer struct {
	drapbv1alpha2.UnimplementedNodeServer
}
59
60 func (f *fakeV1alpha2GRPCServer) NodePrepareResource(ctx context.Context, in *drapbv1alpha2.NodePrepareResourceRequest) (*drapbv1alpha2.NodePrepareResourceResponse, error) {
61 return &drapbv1alpha2.NodePrepareResourceResponse{CdiDevices: []string{"dummy"}}, nil
62 }
63
64 func (f *fakeV1alpha2GRPCServer) NodeUnprepareResource(ctx context.Context, in *drapbv1alpha2.NodeUnprepareResourceRequest) (*drapbv1alpha2.NodeUnprepareResourceResponse, error) {
65 return &drapbv1alpha2.NodeUnprepareResourceResponse{}, nil
66 }
67
// tearDown is a cleanup callback returned by test setup helpers; the tests in
// this file defer it to undo whatever the setup created.
type tearDown func()
69
70 func setupFakeGRPCServer(version string) (string, tearDown, error) {
71 p, err := os.MkdirTemp("", "dra_plugin")
72 if err != nil {
73 return "", nil, err
74 }
75
76 closeCh := make(chan struct{})
77 addr := filepath.Join(p, "server.sock")
78 teardown := func() {
79 close(closeCh)
80 os.RemoveAll(addr)
81 }
82
83 listener, err := net.Listen("unix", addr)
84 if err != nil {
85 teardown()
86 return "", nil, err
87 }
88
89 s := grpc.NewServer()
90 switch version {
91 case v1alpha2Version:
92 fakeGRPCServer := &fakeV1alpha2GRPCServer{}
93 drapbv1alpha2.RegisterNodeServer(s, fakeGRPCServer)
94 case v1alpha3Version:
95 fakeGRPCServer := &fakeV1alpha3GRPCServer{}
96 drapbv1alpha3.RegisterNodeServer(s, fakeGRPCServer)
97 default:
98 return "", nil, fmt.Errorf("unsupported version: %s", version)
99 }
100
101 go func() {
102 go s.Serve(listener)
103 <-closeCh
104 s.GracefulStop()
105 }()
106
107 return addr, teardown, nil
108 }
109
110 func TestGRPCConnIsReused(t *testing.T) {
111 addr, teardown, err := setupFakeGRPCServer(v1alpha3Version)
112 if err != nil {
113 t.Fatal(err)
114 }
115 defer teardown()
116
117 reusedConns := make(map[*grpc.ClientConn]int)
118 wg := sync.WaitGroup{}
119 m := sync.Mutex{}
120
121 p := &plugin{
122 endpoint: addr,
123 version: v1alpha3Version,
124 }
125
126 conn, err := p.getOrCreateGRPCConn()
127 defer func() {
128 err := conn.Close()
129 if err != nil {
130 t.Error(err)
131 }
132 }()
133 if err != nil {
134 t.Fatal(err)
135 }
136
137
138 draPlugins.add("dummy-plugin", p)
139 defer draPlugins.delete("dummy-plugin")
140
141
142 for i := 0; i < 2; i++ {
143 wg.Add(1)
144 go func() {
145 defer wg.Done()
146 client, err := NewDRAPluginClient("dummy-plugin")
147 if err != nil {
148 t.Error(err)
149 return
150 }
151
152 req := &drapbv1alpha3.NodePrepareResourcesRequest{
153 Claims: []*drapbv1alpha3.Claim{
154 {
155 Namespace: "dummy-namespace",
156 Uid: "dummy-uid",
157 Name: "dummy-claim",
158 ResourceHandle: "dummy-resource",
159 },
160 },
161 }
162 client.NodePrepareResources(context.TODO(), req)
163
164 client.(*plugin).Lock()
165 conn := client.(*plugin).conn
166 client.(*plugin).Unlock()
167
168 m.Lock()
169 defer m.Unlock()
170 reusedConns[conn]++
171 }()
172 }
173
174 wg.Wait()
175
176 if len(reusedConns) != 1 {
177 t.Errorf("expected length to be 1 but got %d", len(reusedConns))
178 }
179 if counter, ok := reusedConns[conn]; ok && counter != 2 {
180 t.Errorf("expected counter to be 2 but got %d", counter)
181 }
182 }
183
184 func TestNewDRAPluginClient(t *testing.T) {
185 for _, test := range []struct {
186 description string
187 setup func(string) tearDown
188 pluginName string
189 shouldError bool
190 }{
191 {
192 description: "plugin name is empty",
193 setup: func(_ string) tearDown {
194 return func() {}
195 },
196 pluginName: "",
197 shouldError: true,
198 },
199 {
200 description: "plugin name not found in the list",
201 setup: func(_ string) tearDown {
202 return func() {}
203 },
204 pluginName: "plugin-name-not-found-in-the-list",
205 shouldError: true,
206 },
207 {
208 description: "plugin exists",
209 setup: func(name string) tearDown {
210 draPlugins.add(name, &plugin{})
211 return func() {
212 draPlugins.delete(name)
213 }
214 },
215 pluginName: "dummy-plugin",
216 },
217 } {
218 t.Run(test.description, func(t *testing.T) {
219 teardown := test.setup(test.pluginName)
220 defer teardown()
221
222 client, err := NewDRAPluginClient(test.pluginName)
223 if test.shouldError {
224 assert.Nil(t, client)
225 assert.Error(t, err)
226 } else {
227 assert.NotNil(t, client)
228 assert.Nil(t, err)
229 }
230 })
231 }
232 }
233
234 func TestNodeUnprepareResource(t *testing.T) {
235 for _, test := range []struct {
236 description string
237 serverSetup func(string) (string, tearDown, error)
238 serverVersion string
239 request *drapbv1alpha3.NodeUnprepareResourcesRequest
240 }{
241 {
242 description: "server supports v1alpha3",
243 serverSetup: setupFakeGRPCServer,
244 serverVersion: v1alpha3Version,
245 request: &drapbv1alpha3.NodeUnprepareResourcesRequest{},
246 },
247 {
248 description: "server supports v1alpha2, plugin client should fallback",
249 serverSetup: setupFakeGRPCServer,
250 serverVersion: v1alpha2Version,
251 request: &drapbv1alpha3.NodeUnprepareResourcesRequest{
252 Claims: []*drapbv1alpha3.Claim{
253 {
254 Namespace: "dummy-namespace",
255 Uid: "dummy-uid",
256 Name: "dummy-claim",
257 ResourceHandle: "dummy-resource",
258 },
259 },
260 },
261 },
262 } {
263 t.Run(test.description, func(t *testing.T) {
264 addr, teardown, err := setupFakeGRPCServer(test.serverVersion)
265 if err != nil {
266 t.Fatal(err)
267 }
268 defer teardown()
269
270 p := &plugin{
271 endpoint: addr,
272 version: v1alpha3Version,
273 clientTimeout: PluginClientTimeout,
274 }
275
276 conn, err := p.getOrCreateGRPCConn()
277 defer func() {
278 err := conn.Close()
279 if err != nil {
280 t.Error(err)
281 }
282 }()
283 if err != nil {
284 t.Fatal(err)
285 }
286
287 draPlugins.add("dummy-plugin", p)
288 defer draPlugins.delete("dummy-plugin")
289
290 client, err := NewDRAPluginClient("dummy-plugin")
291 if err != nil {
292 t.Fatal(err)
293 }
294
295 _, err = client.NodeUnprepareResources(context.TODO(), test.request)
296 if err != nil {
297 t.Fatal(err)
298 }
299 })
300 }
301 }
302
303 func TestListAndWatchResources(t *testing.T) {
304 for _, test := range []struct {
305 description string
306 serverSetup func(string) (string, tearDown, error)
307 serverVersion string
308 request *drapbv1alpha3.NodeListAndWatchResourcesRequest
309 responses []*drapbv1alpha3.NodeListAndWatchResourcesResponse
310 expectError string
311 }{
312 {
313 description: "server supports NodeResources API",
314 serverSetup: setupFakeGRPCServer,
315 serverVersion: v1alpha3Version,
316 request: &drapbv1alpha3.NodeListAndWatchResourcesRequest{},
317 responses: []*drapbv1alpha3.NodeListAndWatchResourcesResponse{
318 {},
319 {},
320 },
321 expectError: "EOF",
322 },
323 {
324 description: "server doesn't support NodeResources API",
325 serverSetup: setupFakeGRPCServer,
326 serverVersion: v1alpha2Version,
327 request: new(drapbv1alpha3.NodeListAndWatchResourcesRequest),
328 expectError: "Unimplemented",
329 },
330 } {
331 t.Run(test.description, func(t *testing.T) {
332 addr, teardown, err := setupFakeGRPCServer(test.serverVersion)
333 if err != nil {
334 t.Fatal(err)
335 }
336 defer teardown()
337
338 p := &plugin{
339 endpoint: addr,
340 version: v1alpha3Version,
341 }
342
343 conn, err := p.getOrCreateGRPCConn()
344 defer func() {
345 err := conn.Close()
346 if err != nil {
347 t.Error(err)
348 }
349 }()
350 if err != nil {
351 t.Fatal(err)
352 }
353
354 draPlugins.add("dummy-plugin", p)
355 defer draPlugins.delete("dummy-plugin")
356
357 client, err := NewDRAPluginClient("dummy-plugin")
358 if err != nil {
359 t.Fatal(err)
360 }
361
362 stream, err := client.NodeListAndWatchResources(context.Background(), test.request)
363 if err != nil {
364 t.Fatal(err)
365 }
366 var actualResponses []*drapbv1alpha3.NodeListAndWatchResourcesResponse
367 var actualErr error
368 for {
369 resp, err := stream.Recv()
370 if err != nil {
371 actualErr = err
372 break
373 }
374 actualResponses = append(actualResponses, resp)
375 }
376 assert.Equal(t, test.responses, actualResponses)
377 assert.Contains(t, actualErr.Error(), test.expectError)
378 })
379 }
380 }
381
View as plain text