17 package plugin
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "io"
24 "sync"
25 "time"
26
27 "github.com/google/go-cmp/cmp"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
30
31 v1 "k8s.io/api/core/v1"
32 resourceapi "k8s.io/api/resource/v1alpha2"
33 apiequality "k8s.io/apimachinery/pkg/api/equality"
34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
36 "k8s.io/apimachinery/pkg/util/sets"
37 resourceinformers "k8s.io/client-go/informers/resource/v1alpha2"
38 "k8s.io/client-go/kubernetes"
39 "k8s.io/client-go/tools/cache"
40 "k8s.io/client-go/util/workqueue"
41 "k8s.io/klog/v2"
42 drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
43 "k8s.io/utils/ptr"
44 )
45
const (
	// resyncPeriod is how often the ResourceSlice informer re-lists from the
	// API server to recover from missed watch events.
	//
	// 10 * time.Minute is already a time.Duration; the previous explicit
	// time.Duration(...) conversion was redundant.
	resyncPeriod = 10 * time.Minute
)
// nodeResourcesController keeps the ResourceSlice objects for this node in
// the API server in sync with the resource models reported by registered
// DRA plugins. It watches ResourceSlices via an informer and reconciles
// per-driver in a rate-limited work queue.
type nodeResourcesController struct {
	ctx        context.Context
	kubeClient kubernetes.Interface
	// getNode returns the current Node object; retried until it succeeds
	// during startup (see run).
	getNode func() (*v1.Node, error)
	// wg tracks all goroutines started by the controller (run, informer,
	// per-plugin monitors).
	wg sync.WaitGroup
	// queue holds driver names that need to be synced.
	queue workqueue.RateLimitingInterface
	// sliceStore is the informer's store of ResourceSlices for this node;
	// set in run before workers start.
	sliceStore cache.Store

	// mutex guards activePlugins and the resources field of each entry.
	mutex         sync.RWMutex
	activePlugins map[string]*activePlugin // keyed by driver name
}
// activePlugin holds the controller's state for one registered DRA plugin.
type activePlugin struct {
	// cancel stops the monitorPlugin goroutine for this plugin. The reason
	// is recorded as the context's cancellation cause (context.Cause).
	cancel func(reason error)

	// resources is the most recent list of resource models received from
	// the plugin's NodeListAndWatchResources stream. Protected by the
	// controller's mutex.
	resources []*resourceapi.ResourceModel
}
81
82
83
84
85
86
87
88
89 func startNodeResourcesController(ctx context.Context, kubeClient kubernetes.Interface, getNode func() (*v1.Node, error)) *nodeResourcesController {
90 if kubeClient == nil {
91 return nil
92 }
93
94 logger := klog.FromContext(ctx)
95 logger = klog.LoggerWithName(logger, "node resources controller")
96 ctx = klog.NewContext(ctx, logger)
97
98 c := &nodeResourcesController{
99 ctx: ctx,
100 kubeClient: kubeClient,
101 getNode: getNode,
102 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_resource_slices"),
103 activePlugins: make(map[string]*activePlugin),
104 }
105
106 c.wg.Add(1)
107 go func() {
108 defer c.wg.Done()
109 c.run(ctx)
110 }()
111
112 return c
113 }
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130 func (c *nodeResourcesController) addPlugin(driverName string, pluginInstance *plugin) {
131 if c == nil {
132 return
133 }
134
135 klog.FromContext(c.ctx).V(2).Info("Adding plugin", "driverName", driverName)
136 c.mutex.Lock()
137 defer c.mutex.Unlock()
138
139 if active := c.activePlugins[driverName]; active != nil {
140 active.cancel(errors.New("plugin has re-registered"))
141 }
142 active := &activePlugin{}
143 cancelCtx, cancel := context.WithCancelCause(c.ctx)
144 active.cancel = cancel
145 c.activePlugins[driverName] = active
146 c.queue.Add(driverName)
147
148 c.wg.Add(1)
149 go func() {
150 defer c.wg.Done()
151 c.monitorPlugin(cancelCtx, active, driverName, pluginInstance)
152 }()
153 }
154
155
156 func (c *nodeResourcesController) removePlugin(driverName string) {
157 if c == nil {
158 return
159 }
160
161 klog.FromContext(c.ctx).V(2).Info("Removing plugin", "driverName", driverName)
162 c.mutex.Lock()
163 defer c.mutex.Unlock()
164 if active, ok := c.activePlugins[driverName]; ok {
165 active.cancel(errors.New("plugin has unregistered"))
166 delete(c.activePlugins, driverName)
167 c.queue.Add(driverName)
168 }
169 }
// monitorPlugin repeatedly opens a NodeListAndWatchResources stream to the
// plugin and stores each received resource list in active.resources,
// queueing the driver for a sync after every update. It runs until ctx is
// cancelled, which happens when the plugin unregisters, re-registers, or
// turns out not to support resource reporting.
func (c *nodeResourcesController) monitorPlugin(ctx context.Context, active *activePlugin, driverName string, pluginInstance *plugin) {
	logger := klog.FromContext(ctx)
	logger = klog.LoggerWithValues(logger, "driverName", driverName)
	logger.Info("Starting to monitor node resources of the plugin")
	defer func() {
		// Capture a panic (if any) purely for logging; the goroutine exits
		// either way.
		r := recover()
		logger.Info("Stopping to monitor node resources of the plugin", "reason", context.Cause(ctx), "err", ctx.Err(), "recover", r)
	}()

	// Outer loop: (re-)establish the stream until the context is cancelled.
	for ctx.Err() == nil {
		logger.V(5).Info("Calling NodeListAndWatchResources")
		stream, err := pluginInstance.NodeListAndWatchResources(ctx, new(drapb.NodeListAndWatchResourcesRequest))
		if err != nil {
			switch {
			case status.Convert(err).Code() == codes.Unimplemented:
				// The plugin will never support this call; cancel ourselves.
				active.cancel(errors.New("plugin does not support node resource reporting"))
			default:
				// Transient failure: log and retry after a delay, but wake
				// up immediately on cancellation.
				logger.Error(err, "Creating gRPC stream for node resources failed")
				select {
				case <-time.After(5 * time.Second):
				case <-ctx.Done():
				}
			}
			continue
		}
		// Inner loop: consume the stream until it fails or ends.
		for {
			response, err := stream.Recv()
			if err != nil {
				switch {
				case errors.Is(err, io.EOF):
					// Plugin closed the stream deliberately; stop monitoring.
					active.cancel(errors.New("plugin has closed the stream"))
				case status.Convert(err).Code() == codes.Unimplemented:
					active.cancel(errors.New("plugin does not support node resource reporting"))
				case ctx.Err() == nil:
					// Unexpected error while we are still supposed to run:
					// log, back off, then reconnect via the outer loop.
					logger.Error(err, "Reading node resources from gRPC stream failed")
					select {
					case <-time.After(5 * time.Second):
					case <-ctx.Done():
					}
				}
				// In all cases leave the inner loop; the outer loop decides
				// whether to reconnect based on ctx.Err().
				break
			}

			// Log the full resource list only at high verbosity.
			if loggerV := logger.V(6); loggerV.Enabled() {
				loggerV.Info("Driver resources updated", "resources", response.Resources)
			} else {
				logger.V(5).Info("Driver resources updated", "numResources", len(response.Resources))
			}

			// Publish the update and trigger a sync for this driver.
			c.mutex.Lock()
			active.resources = response.Resources
			c.mutex.Unlock()
			c.queue.Add(driverName)
		}
	}
}
// run is the controller's main goroutine: it waits for the Node object to
// become available, starts a ResourceSlice informer filtered to this node,
// waits for the informer to sync, and then processes the work queue until
// the queue is shut down or ctx is cancelled.
func (c *nodeResourcesController) run(ctx context.Context) {
	logger := klog.FromContext(ctx)

	// The Node object may not exist yet at kubelet startup; retry once per
	// second until it can be retrieved or the context is cancelled.
	var node *v1.Node
	var err error
	for {
		node, err = c.getNode()
		if err == nil {
			break
		}
		logger.V(5).Info("Getting Node object failed, waiting", "err", err)
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second):
		}
	}

	// Watch only the ResourceSlices belonging to this node.
	informer := resourceinformers.NewFilteredResourceSliceInformer(c.kubeClient, resyncPeriod, nil, func(options *metav1.ListOptions) {
		options.FieldSelector = "nodeName=" + node.Name
	})
	c.sliceStore = informer.GetStore()
	// Every slice event queues the owning driver for reconciliation.
	handler, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj any) {
			slice, ok := obj.(*resourceapi.ResourceSlice)
			if !ok {
				return
			}
			logger.V(5).Info("ResourceSlice add", "slice", klog.KObj(slice))
			c.queue.Add(slice.DriverName)
		},
		UpdateFunc: func(old, new any) {
			oldSlice, ok := old.(*resourceapi.ResourceSlice)
			if !ok {
				return
			}
			newSlice, ok := new.(*resourceapi.ResourceSlice)
			if !ok {
				return
			}
			// Diffing is expensive; do it only at high verbosity.
			if loggerV := logger.V(6); loggerV.Enabled() {
				loggerV.Info("ResourceSlice update", "slice", klog.KObj(newSlice), "diff", cmp.Diff(oldSlice, newSlice))
			} else {
				logger.V(5).Info("ResourceSlice update", "slice", klog.KObj(newSlice))
			}
			c.queue.Add(newSlice.DriverName)
		},
		DeleteFunc: func(obj any) {
			// A delete may be delivered as a tombstone wrapping the object.
			if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
				obj = tombstone.Obj
			}
			slice, ok := obj.(*resourceapi.ResourceSlice)
			if !ok {
				return
			}
			logger.V(5).Info("ResourceSlice delete", "slice", klog.KObj(slice))
			c.queue.Add(slice.DriverName)
		},
	})
	if err != nil {
		logger.Error(err, "Registering event handler on the ResourceSlice informer failed, disabling resource monitoring")
		return
	}

	// Run the informer in the background; it stops when ctx is done.
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		informer.Run(ctx.Done())
	}()
	// Poll until the event handler has seen the initial list, so that
	// sync() observes a complete slice store.
	for !handler.HasSynced() {
		select {
		case <-time.After(time.Second):
		case <-ctx.Done():
			return
		}
	}
	logger.Info("ResourceSlice informer has synced")

	// Process work items until the queue shuts down.
	for c.processNextWorkItem(ctx) {
	}
}
342
343 func (c *nodeResourcesController) processNextWorkItem(ctx context.Context) bool {
344 key, shutdown := c.queue.Get()
345 if shutdown {
346 return false
347 }
348 defer c.queue.Done(key)
349
350 driverName := key.(string)
351
352
353 var err error
354 func() {
355 defer func() {
356 if r := recover(); r != nil {
357 err = fmt.Errorf("internal error: %v", r)
358 }
359 }()
360 err = c.sync(ctx, driverName)
361 }()
362
363 if err != nil {
364
365 utilruntime.HandleError(fmt.Errorf("processing driver %v: %v", driverName, err))
366 c.queue.AddRateLimited(key)
367
368
369
370 return true
371 }
372
373 c.queue.Forget(key)
374 return true
375 }
// sync reconciles the ResourceSlices stored in the API server for one
// driver against the resource models most recently reported by that
// driver's plugin: matching slices are kept, obsolete slices are reused
// (updated in place) for new models where possible, additional models get
// new slices, and leftover obsolete slices are deleted. For an
// unregistered driver, driverResources stays nil and all of its slices are
// removed.
func (c *nodeResourcesController) sync(ctx context.Context, driverName string) error {
	logger := klog.FromContext(ctx)

	// Snapshot the informer store and the driver's reported resources.
	slices := c.sliceStore.List()
	var driverResources []*resourceapi.ResourceModel
	c.mutex.RLock()
	if active, ok := c.activePlugins[driverName]; ok {
		// NOTE(review): the slice header is copied under the read lock, but
		// the backing array is shared with monitorPlugin, which replaces
		// (not mutates) it — presumably safe for that reason.
		driverResources = active.resources
	}
	c.mutex.RUnlock()

	// Indices into driverResources that already have a matching slice.
	storedResourceIndices := sets.New[int]()

	// Slices of this driver whose model no longer matches anything the
	// driver reports; candidates for reuse or deletion.
	obsoleteSlices := make([]*resourceapi.ResourceSlice, 0, len(slices))

	// Classify every stored slice of this driver.
	for _, obj := range slices {
		slice := obj.(*resourceapi.ResourceSlice)
		if slice.DriverName != driverName {
			continue
		}

		index := indexOfModel(driverResources, &slice.ResourceModel)
		if index >= 0 {
			// Up to date; nothing to do for this model.
			storedResourceIndices.Insert(index)
			continue
		}

		obsoleteSlices = append(obsoleteSlices, slice)
	}

	if loggerV := logger.V(6); loggerV.Enabled() {
		// Dump the full resource list only at high verbosity.
		loggerV.Info("Syncing existing driver node resource slices with driver resources", "slices", klog.KObjSlice(slices), "resources", driverResources)
	} else {
		logger.V(5).Info("Syncing existing driver node resource slices with driver resources", "slices", klog.KObjSlice(slices), "numResources", len(driverResources))
	}

	// For each reported model without a matching slice, prefer updating an
	// obsolete slice (consumed from the back of the list) over creating a
	// new one; this keeps the number of API objects stable.
	numObsoleteSlices := len(obsoleteSlices)
	for index, resource := range driverResources {
		if storedResourceIndices.Has(index) {
			// Already stored.
			continue
		}

		if numObsoleteSlices > 0 {
			// Reuse the last remaining obsolete slice.
			slice := obsoleteSlices[numObsoleteSlices-1]
			numObsoleteSlices--
			slice = slice.DeepCopy() // do not mutate the informer's copy
			slice.ResourceModel = *resource
			logger.V(5).Info("Reusing existing node resource slice", "slice", klog.KObj(slice))
			if _, err := c.kubeClient.ResourceV1alpha2().ResourceSlices().Update(ctx, slice, metav1.UpdateOptions{}); err != nil {
				return fmt.Errorf("update node resource slice: %w", err)
			}
			continue
		}

		// Need a fresh slice; fetch the Node for its name and UID so the
		// slice can be owned by (and garbage-collected with) the Node.
		node, err := c.getNode()
		if err != nil {
			return fmt.Errorf("retrieve node object: %w", err)
		}

		slice := &resourceapi.ResourceSlice{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: node.Name + "-" + driverName + "-",
				OwnerReferences: []metav1.OwnerReference{
					{
						APIVersion: v1.SchemeGroupVersion.WithKind("Node").Version,
						Kind:       v1.SchemeGroupVersion.WithKind("Node").Kind,
						Name:       node.Name,
						UID:        node.UID,
						Controller: ptr.To(true),
					},
				},
			},
			NodeName:      node.Name,
			DriverName:    driverName,
			ResourceModel: *resource,
		}
		logger.V(5).Info("Creating new node resource slice", "slice", klog.KObj(slice))
		if _, err := c.kubeClient.ResourceV1alpha2().ResourceSlices().Create(ctx, slice, metav1.CreateOptions{}); err != nil {
			return fmt.Errorf("create node resource slice: %w", err)
		}
	}

	// Whatever obsolete slices were not reused above must be deleted.
	for i := 0; i < numObsoleteSlices; i++ {
		slice := obsoleteSlices[i]
		logger.V(5).Info("Deleting obsolete node resource slice", "slice", klog.KObj(slice))
		if err := c.kubeClient.ResourceV1alpha2().ResourceSlices().Delete(ctx, slice.Name, metav1.DeleteOptions{}); err != nil {
			return fmt.Errorf("delete node resource slice: %w", err)
		}
	}

	return nil
}
502
503 func indexOfModel(models []*resourceapi.ResourceModel, model *resourceapi.ResourceModel) int {
504 for index, m := range models {
505 if apiequality.Semantic.DeepEqual(m, model) {
506 return index
507 }
508 }
509 return -1
510 }