1
16
17
18
19 package nodeinfomanager
20
21 import (
22 "context"
23 "encoding/json"
24 goerrors "errors"
25 "fmt"
26 "math"
27 "strings"
28 "sync"
29
30 "time"
31
32 v1 "k8s.io/api/core/v1"
33 storagev1 "k8s.io/api/storage/v1"
34 "k8s.io/apimachinery/pkg/api/errors"
35 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36 "k8s.io/apimachinery/pkg/types"
37 utilerrors "k8s.io/apimachinery/pkg/util/errors"
38 "k8s.io/apimachinery/pkg/util/sets"
39 "k8s.io/apimachinery/pkg/util/wait"
40 clientset "k8s.io/client-go/kubernetes"
41 nodeutil "k8s.io/component-helpers/node/util"
42 "k8s.io/klog/v2"
43 "k8s.io/kubernetes/pkg/volume"
44 "k8s.io/kubernetes/pkg/volume/util"
45 )
46
// annotationKeyNodeID is the Node annotation whose value is a JSON object
// mapping each CSI driver name to the node ID that driver reported.
const annotationKeyNodeID = "csi.volume.kubernetes.io/nodeid"
51
52 var (
53 nodeKind = v1.SchemeGroupVersion.WithKind("Node")
54 updateBackoff = wait.Backoff{
55 Steps: 4,
56 Duration: 10 * time.Millisecond,
57 Factor: 5.0,
58 Jitter: 0.1,
59 }
60 )
61
62
63
// nodeInfoManager is the implementation of Interface returned by
// NewNodeInfoManager. It records CSI driver information for a single node on
// that node's Node and CSINode API objects.
type nodeInfoManager struct {
	// nodeName names both the Node object and the CSINode object this manager
	// reads and writes.
	nodeName types.NodeName
	// volumeHost supplies the kube client used for every API call.
	volumeHost volume.VolumeHost
	// migratedPlugins maps plugin names to predicates; setMigrationAnnotation
	// lists the names whose predicate returns true in the CSINode's
	// migrated-plugins annotation. A nil map disables that annotation handling.
	migratedPlugins map[string](func() bool)

	// lock is held by tryUpdateNode for its whole get/mutate/patch cycle so that
	// concurrent Node updates through this manager do not interleave.
	lock sync.Mutex
}
71
72
// nodeUpdateFunc mutates a Node in memory. It returns the Node to pass on to the
// next function, whether it changed anything (tryUpdateNode only patches the
// Node when at least one function reports a change), and an error that aborts
// the current update attempt.
type nodeUpdateFunc func(*v1.Node) (newNode *v1.Node, updated bool, err error)
74
75
// Interface records CSI driver information for this node in the Node and
// CSINode API objects.
type Interface interface {
	// CreateCSINode creates this node's CSINode object with an empty driver list
	// and an owner reference to the Node object, and returns the created object.
	CreateCSINode() (*storagev1.CSINode, error)

	// InitializeCSINodeWithAnnotation creates this node's CSINode object if it
	// does not exist, or otherwise brings its migrated-plugins annotation up to
	// date, retrying on failure.
	InitializeCSINodeWithAnnotation() error

	// InstallCSIDriver records the driver's node ID (annotation) and topology
	// (labels) on the Node object, then adds or refreshes the driver's entry
	// (node ID, topology keys, allocatable count) in the CSINode object.
	// driverNodeID must not be empty.
	InstallCSIDriver(driverName string, driverNodeID string, maxVolumeLimit int64, topology map[string]string) error

	// UninstallCSIDriver removes the driver's entry from the CSINode object,
	// then removes its attach-limit resources and node ID annotation entry from
	// the Node object.
	UninstallCSIDriver(driverName string) error
}
92
93
94 func NewNodeInfoManager(
95 nodeName types.NodeName,
96 volumeHost volume.VolumeHost,
97 migratedPlugins map[string](func() bool)) Interface {
98 return &nodeInfoManager{
99 nodeName: nodeName,
100 volumeHost: volumeHost,
101 migratedPlugins: migratedPlugins,
102 }
103 }
104
105
106
107
108
109 func (nim *nodeInfoManager) InstallCSIDriver(driverName string, driverNodeID string, maxAttachLimit int64, topology map[string]string) error {
110 if driverNodeID == "" {
111 return fmt.Errorf("error adding CSI driver node info: driverNodeID must not be empty")
112 }
113
114 nodeUpdateFuncs := []nodeUpdateFunc{
115 updateNodeIDInNode(driverName, driverNodeID),
116 updateTopologyLabels(topology),
117 }
118
119 err := nim.updateNode(nodeUpdateFuncs...)
120 if err != nil {
121 return fmt.Errorf("error updating Node object with CSI driver node info: %v", err)
122 }
123
124 err = nim.updateCSINode(driverName, driverNodeID, maxAttachLimit, topology)
125 if err != nil {
126 return fmt.Errorf("error updating CSINode object with CSI driver node info: %v", err)
127 }
128
129 return nil
130 }
131
132
133
134
135
136 func (nim *nodeInfoManager) UninstallCSIDriver(driverName string) error {
137 err := nim.uninstallDriverFromCSINode(driverName)
138 if err != nil {
139 return fmt.Errorf("error uninstalling CSI driver from CSINode object %v", err)
140 }
141
142 err = nim.updateNode(
143 removeMaxAttachLimit(driverName),
144 removeNodeIDFromNode(driverName),
145 )
146 if err != nil {
147 return fmt.Errorf("error removing CSI driver node info from Node object %v", err)
148 }
149 return nil
150 }
151
152 func (nim *nodeInfoManager) updateNode(updateFuncs ...nodeUpdateFunc) error {
153 var updateErrs []error
154 err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
155 if err := nim.tryUpdateNode(updateFuncs...); err != nil {
156 updateErrs = append(updateErrs, err)
157 return false, nil
158 }
159 return true, nil
160 })
161 if err != nil {
162 return fmt.Errorf("error updating node: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
163 }
164 return nil
165 }
166
167
168
169
170
171
172 func (nim *nodeInfoManager) tryUpdateNode(updateFuncs ...nodeUpdateFunc) error {
173 nim.lock.Lock()
174 defer nim.lock.Unlock()
175
176
177
178
179 kubeClient := nim.volumeHost.GetKubeClient()
180 if kubeClient == nil {
181 return fmt.Errorf("error getting kube client")
182 }
183
184 nodeClient := kubeClient.CoreV1().Nodes()
185 originalNode, err := nodeClient.Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
186 if err != nil {
187 return err
188 }
189 node := originalNode.DeepCopy()
190
191 needUpdate := false
192 for _, update := range updateFuncs {
193 newNode, updated, err := update(node)
194 if err != nil {
195 return err
196 }
197 node = newNode
198 needUpdate = needUpdate || updated
199 }
200
201 if needUpdate {
202
203
204 _, _, updateErr := nodeutil.PatchNodeStatus(kubeClient.CoreV1(), types.NodeName(node.Name), originalNode, node)
205 return updateErr
206 }
207
208 return nil
209 }
210
211
212 func buildNodeIDMapFromAnnotation(node *v1.Node) (map[string]string, error) {
213 var previousAnnotationValue string
214 if node.ObjectMeta.Annotations != nil {
215 previousAnnotationValue =
216 node.ObjectMeta.Annotations[annotationKeyNodeID]
217 }
218
219 var existingDriverMap map[string]string
220 if previousAnnotationValue != "" {
221
222 if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
223 return nil, fmt.Errorf(
224 "failed to parse node's %q annotation value (%q) err=%v",
225 annotationKeyNodeID,
226 previousAnnotationValue,
227 err)
228 }
229 }
230
231 if existingDriverMap == nil {
232 return make(map[string]string), nil
233 }
234 return existingDriverMap, nil
235 }
236
237
238
239 func updateNodeIDInNode(
240 csiDriverName string,
241 csiDriverNodeID string) nodeUpdateFunc {
242 return func(node *v1.Node) (*v1.Node, bool, error) {
243 existingDriverMap, err := buildNodeIDMapFromAnnotation(node)
244 if err != nil {
245 return nil, false, err
246 }
247
248 if val, ok := existingDriverMap[csiDriverName]; ok {
249 if val == csiDriverNodeID {
250
251 return node, false, nil
252 }
253 }
254
255
256 existingDriverMap[csiDriverName] = csiDriverNodeID
257 jsonObj, err := json.Marshal(existingDriverMap)
258 if err != nil {
259 return nil, false, fmt.Errorf(
260 "error while marshalling node ID map updated with driverName=%q, nodeID=%q: %v",
261 csiDriverName,
262 csiDriverNodeID,
263 err)
264 }
265
266 if node.ObjectMeta.Annotations == nil {
267 node.ObjectMeta.Annotations = make(map[string]string)
268 }
269 node.ObjectMeta.Annotations[annotationKeyNodeID] = string(jsonObj)
270
271 return node, true, nil
272 }
273 }
274
275
276
277 func removeNodeIDFromNode(csiDriverName string) nodeUpdateFunc {
278 return func(node *v1.Node) (*v1.Node, bool, error) {
279 var previousAnnotationValue string
280 if node.ObjectMeta.Annotations != nil {
281 previousAnnotationValue =
282 node.ObjectMeta.Annotations[annotationKeyNodeID]
283 }
284
285 if previousAnnotationValue == "" {
286 return node, false, nil
287 }
288
289
290 existingDriverMap := map[string]string{}
291 if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
292 return nil, false, fmt.Errorf(
293 "failed to parse node's %q annotation value (%q) err=%v",
294 annotationKeyNodeID,
295 previousAnnotationValue,
296 err)
297 }
298
299 if _, ok := existingDriverMap[csiDriverName]; !ok {
300
301 return node, false, nil
302 }
303
304
305 delete(existingDriverMap, csiDriverName)
306 if len(existingDriverMap) == 0 {
307 delete(node.ObjectMeta.Annotations, annotationKeyNodeID)
308 } else {
309 jsonObj, err := json.Marshal(existingDriverMap)
310 if err != nil {
311 return nil, false, fmt.Errorf(
312 "failed while trying to remove key %q from node %q annotation. Existing data: %v",
313 csiDriverName,
314 annotationKeyNodeID,
315 previousAnnotationValue)
316 }
317
318 node.ObjectMeta.Annotations[annotationKeyNodeID] = string(jsonObj)
319 }
320
321 return node, true, nil
322 }
323 }
324
325
326
327 func updateTopologyLabels(topology map[string]string) nodeUpdateFunc {
328 return func(node *v1.Node) (*v1.Node, bool, error) {
329 if len(topology) == 0 {
330 return node, false, nil
331 }
332
333 for k, v := range topology {
334 if curVal, exists := node.Labels[k]; exists && curVal != v {
335 return nil, false, fmt.Errorf("detected topology value collision: driver reported %q:%q but existing label is %q:%q", k, v, k, curVal)
336 }
337 }
338
339 if node.Labels == nil {
340 node.Labels = make(map[string]string)
341 }
342 for k, v := range topology {
343 node.Labels[k] = v
344 }
345 return node, true, nil
346 }
347 }
348
349 func (nim *nodeInfoManager) updateCSINode(
350 driverName string,
351 driverNodeID string,
352 maxAttachLimit int64,
353 topology map[string]string) error {
354
355 csiKubeClient := nim.volumeHost.GetKubeClient()
356 if csiKubeClient == nil {
357 return fmt.Errorf("error getting CSI client")
358 }
359
360 var updateErrs []error
361 err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
362 if err := nim.tryUpdateCSINode(csiKubeClient, driverName, driverNodeID, maxAttachLimit, topology); err != nil {
363 updateErrs = append(updateErrs, err)
364 return false, nil
365 }
366 return true, nil
367 })
368 if err != nil {
369 return fmt.Errorf("error updating CSINode: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
370 }
371 return nil
372 }
373
374 func (nim *nodeInfoManager) tryUpdateCSINode(
375 csiKubeClient clientset.Interface,
376 driverName string,
377 driverNodeID string,
378 maxAttachLimit int64,
379 topology map[string]string) error {
380
381 nodeInfo, err := csiKubeClient.StorageV1().CSINodes().Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
382 if nodeInfo == nil || errors.IsNotFound(err) {
383 nodeInfo, err = nim.CreateCSINode()
384 }
385 if err != nil {
386 return err
387 }
388
389 return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, maxAttachLimit, topology)
390 }
391
392 func (nim *nodeInfoManager) InitializeCSINodeWithAnnotation() error {
393 csiKubeClient := nim.volumeHost.GetKubeClient()
394 if csiKubeClient == nil {
395 return goerrors.New("error getting CSI client")
396 }
397
398 var lastErr error
399 err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
400 if lastErr = nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); lastErr != nil {
401 klog.V(2).Infof("Failed to publish CSINode: %v", lastErr)
402 return false, nil
403 }
404 return true, nil
405 })
406 if err != nil {
407 return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, lastErr)
408 }
409
410 return nil
411 }
412
413 func (nim *nodeInfoManager) tryInitializeCSINodeWithAnnotation(csiKubeClient clientset.Interface) error {
414 nodeInfo, err := csiKubeClient.StorageV1().CSINodes().Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
415 if nodeInfo == nil || errors.IsNotFound(err) {
416
417 _, err = nim.CreateCSINode()
418 return err
419 } else if err != nil {
420 return err
421 }
422
423 annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo)
424
425 if annotationModified {
426 _, err := csiKubeClient.StorageV1().CSINodes().Update(context.TODO(), nodeInfo, metav1.UpdateOptions{})
427 return err
428 }
429 return nil
430
431 }
432
433 func (nim *nodeInfoManager) CreateCSINode() (*storagev1.CSINode, error) {
434
435 csiKubeClient := nim.volumeHost.GetKubeClient()
436 if csiKubeClient == nil {
437 return nil, fmt.Errorf("error getting CSI client")
438 }
439
440 node, err := csiKubeClient.CoreV1().Nodes().Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
441 if err != nil {
442 return nil, err
443 }
444
445 nodeInfo := &storagev1.CSINode{
446 ObjectMeta: metav1.ObjectMeta{
447 Name: string(nim.nodeName),
448 OwnerReferences: []metav1.OwnerReference{
449 {
450 APIVersion: nodeKind.Version,
451 Kind: nodeKind.Kind,
452 Name: node.Name,
453 UID: node.UID,
454 },
455 },
456 },
457 Spec: storagev1.CSINodeSpec{
458 Drivers: []storagev1.CSINodeDriver{},
459 },
460 }
461
462 setMigrationAnnotation(nim.migratedPlugins, nodeInfo)
463
464 return csiKubeClient.StorageV1().CSINodes().Create(context.TODO(), nodeInfo, metav1.CreateOptions{})
465 }
466
467 func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo *storagev1.CSINode) (modified bool) {
468 if migratedPlugins == nil {
469 return false
470 }
471
472 nodeInfoAnnotations := nodeInfo.GetAnnotations()
473 if nodeInfoAnnotations == nil {
474 nodeInfoAnnotations = map[string]string{}
475 }
476
477 var oldAnnotationSet sets.String
478 mpa := nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey]
479 tok := strings.Split(mpa, ",")
480 if len(mpa) == 0 {
481 oldAnnotationSet = sets.NewString()
482 } else {
483 oldAnnotationSet = sets.NewString(tok...)
484 }
485
486 newAnnotationSet := sets.NewString()
487 for pluginName, migratedFunc := range migratedPlugins {
488 if migratedFunc() {
489 newAnnotationSet.Insert(pluginName)
490 }
491 }
492
493 if oldAnnotationSet.Equal(newAnnotationSet) {
494 return false
495 }
496
497 nas := strings.Join(newAnnotationSet.List(), ",")
498 if len(nas) != 0 {
499 nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey] = nas
500 } else {
501 delete(nodeInfoAnnotations, v1.MigratedPluginsAnnotationKey)
502 }
503
504 nodeInfo.Annotations = nodeInfoAnnotations
505 return true
506 }
507
508
509 func keepAllocatableCount(driverInfoSpec storagev1.CSINodeDriver, maxAttachLimit int64) bool {
510 if maxAttachLimit == 0 {
511 return driverInfoSpec.Allocatable == nil || driverInfoSpec.Allocatable.Count == nil
512 }
513
514 return driverInfoSpec.Allocatable != nil && driverInfoSpec.Allocatable.Count != nil && int64(*driverInfoSpec.Allocatable.Count) == maxAttachLimit
515 }
516
517 func (nim *nodeInfoManager) installDriverToCSINode(
518 nodeInfo *storagev1.CSINode,
519 driverName string,
520 driverNodeID string,
521 maxAttachLimit int64,
522 topology map[string]string) error {
523
524 csiKubeClient := nim.volumeHost.GetKubeClient()
525 if csiKubeClient == nil {
526 return fmt.Errorf("error getting CSI client")
527 }
528
529 topologyKeys := sets.StringKeySet(topology)
530
531 specModified := true
532
533 newDriverSpecs := []storagev1.CSINodeDriver{}
534 for _, driverInfoSpec := range nodeInfo.Spec.Drivers {
535 if driverInfoSpec.Name == driverName {
536 if driverInfoSpec.NodeID == driverNodeID &&
537 sets.NewString(driverInfoSpec.TopologyKeys...).Equal(topologyKeys) &&
538 keepAllocatableCount(driverInfoSpec, maxAttachLimit) {
539 specModified = false
540 }
541 } else {
542
543 newDriverSpecs = append(newDriverSpecs, driverInfoSpec)
544 }
545 }
546
547 annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo)
548
549 if !specModified && !annotationModified {
550 return nil
551 }
552
553
554 driverSpec := storagev1.CSINodeDriver{
555 Name: driverName,
556 NodeID: driverNodeID,
557 TopologyKeys: topologyKeys.List(),
558 }
559
560 if maxAttachLimit > 0 {
561 if maxAttachLimit > math.MaxInt32 {
562 klog.Warningf("Exceeded max supported attach limit value, truncating it to %d", math.MaxInt32)
563 maxAttachLimit = math.MaxInt32
564 }
565 m := int32(maxAttachLimit)
566 driverSpec.Allocatable = &storagev1.VolumeNodeResources{Count: &m}
567 } else if maxAttachLimit != 0 {
568 klog.Errorf("Invalid attach limit value %d cannot be added to CSINode object for %q", maxAttachLimit, driverName)
569 }
570
571 newDriverSpecs = append(newDriverSpecs, driverSpec)
572 nodeInfo.Spec.Drivers = newDriverSpecs
573
574 _, err := csiKubeClient.StorageV1().CSINodes().Update(context.TODO(), nodeInfo, metav1.UpdateOptions{})
575 return err
576 }
577
578 func (nim *nodeInfoManager) uninstallDriverFromCSINode(
579 csiDriverName string) error {
580
581 csiKubeClient := nim.volumeHost.GetKubeClient()
582 if csiKubeClient == nil {
583 return fmt.Errorf("error getting CSI client")
584 }
585
586 var updateErrs []error
587 err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
588 if err := nim.tryUninstallDriverFromCSINode(csiKubeClient, csiDriverName); err != nil {
589 updateErrs = append(updateErrs, err)
590 return false, nil
591 }
592 return true, nil
593 })
594 if err != nil {
595 return fmt.Errorf("error updating CSINode: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
596 }
597 return nil
598 }
599
600 func (nim *nodeInfoManager) tryUninstallDriverFromCSINode(
601 csiKubeClient clientset.Interface,
602 csiDriverName string) error {
603
604 nodeInfoClient := csiKubeClient.StorageV1().CSINodes()
605 nodeInfo, err := nodeInfoClient.Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
606 if err != nil && errors.IsNotFound(err) {
607 return nil
608 } else if err != nil {
609 return err
610 }
611
612 hasModified := false
613
614 drivers := nodeInfo.Spec.Drivers[:0]
615 for _, driver := range nodeInfo.Spec.Drivers {
616 if driver.Name != csiDriverName {
617 drivers = append(drivers, driver)
618 } else {
619
620
621 hasModified = true
622 }
623 }
624
625 if !hasModified {
626
627 return nil
628 }
629 nodeInfo.Spec.Drivers = drivers
630
631 _, err = nodeInfoClient.Update(context.TODO(), nodeInfo, metav1.UpdateOptions{})
632
633 return err
634
635 }
636
637 func removeMaxAttachLimit(driverName string) nodeUpdateFunc {
638 return func(node *v1.Node) (*v1.Node, bool, error) {
639 limitKey := v1.ResourceName(util.GetCSIAttachLimitKey(driverName))
640
641 capacityExists := false
642 if node.Status.Capacity != nil {
643 _, capacityExists = node.Status.Capacity[limitKey]
644 }
645
646 allocatableExists := false
647 if node.Status.Allocatable != nil {
648 _, allocatableExists = node.Status.Allocatable[limitKey]
649 }
650
651 if !capacityExists && !allocatableExists {
652 return node, false, nil
653 }
654
655 delete(node.Status.Capacity, limitKey)
656 if len(node.Status.Capacity) == 0 {
657 node.Status.Capacity = nil
658 }
659
660 delete(node.Status.Allocatable, limitKey)
661 if len(node.Status.Allocatable) == 0 {
662 node.Status.Allocatable = nil
663 }
664
665 return node, true, nil
666 }
667 }
668
View as plain text