...

Source file src/k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager/nodeinfomanager.go

Documentation: k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package nodeinfomanager includes internal functions used to add/delete labels to
    18  // kubernetes nodes for corresponding CSI drivers
    19  package nodeinfomanager // import "k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager"
    20  
    21  import (
    22  	"context"
    23  	"encoding/json"
    24  	goerrors "errors"
    25  	"fmt"
    26  	"math"
    27  	"strings"
    28  	"sync"
    29  
    30  	"time"
    31  
    32  	v1 "k8s.io/api/core/v1"
    33  	storagev1 "k8s.io/api/storage/v1"
    34  	"k8s.io/apimachinery/pkg/api/errors"
    35  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    36  	"k8s.io/apimachinery/pkg/types"
    37  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    38  	"k8s.io/apimachinery/pkg/util/sets"
    39  	"k8s.io/apimachinery/pkg/util/wait"
    40  	clientset "k8s.io/client-go/kubernetes"
    41  	nodeutil "k8s.io/component-helpers/node/util"
    42  	"k8s.io/klog/v2"
    43  	"k8s.io/kubernetes/pkg/volume"
    44  	"k8s.io/kubernetes/pkg/volume/util"
    45  )
    46  
    47  const (
    48  	// Name of node annotation that contains JSON map of driver names to node
    49  	annotationKeyNodeID = "csi.volume.kubernetes.io/nodeid"
    50  )
    51  
    52  var (
    53  	nodeKind      = v1.SchemeGroupVersion.WithKind("Node")
    54  	updateBackoff = wait.Backoff{
    55  		Steps:    4,
    56  		Duration: 10 * time.Millisecond,
    57  		Factor:   5.0,
    58  		Jitter:   0.1,
    59  	}
    60  )
    61  
    62  // nodeInfoManager contains necessary common dependencies to update node info on both
    63  // the Node and CSINode objects.
    64  type nodeInfoManager struct {
    65  	nodeName        types.NodeName
    66  	volumeHost      volume.VolumeHost
    67  	migratedPlugins map[string](func() bool)
    68  	// lock protects changes to node.
    69  	lock sync.Mutex
    70  }
    71  
    72  // If no updates is needed, the function must return the same Node object as the input.
    73  type nodeUpdateFunc func(*v1.Node) (newNode *v1.Node, updated bool, err error)
    74  
    75  // Interface implements an interface for managing labels of a node
    76  type Interface interface {
    77  	CreateCSINode() (*storagev1.CSINode, error)
    78  
    79  	// Updates or Creates the CSINode object with annotations for CSI Migration
    80  	InitializeCSINodeWithAnnotation() error
    81  
    82  	// Record in the cluster the given node information from the CSI driver with the given name.
    83  	// Concurrent calls to InstallCSIDriver() is allowed, but they should not be intertwined with calls
    84  	// to other methods in this interface.
    85  	InstallCSIDriver(driverName string, driverNodeID string, maxVolumeLimit int64, topology map[string]string) error
    86  
    87  	// Remove in the cluster node information from the CSI driver with the given name.
    88  	// Concurrent calls to UninstallCSIDriver() is allowed, but they should not be intertwined with calls
    89  	// to other methods in this interface.
    90  	UninstallCSIDriver(driverName string) error
    91  }
    92  
    93  // NewNodeInfoManager initializes nodeInfoManager
    94  func NewNodeInfoManager(
    95  	nodeName types.NodeName,
    96  	volumeHost volume.VolumeHost,
    97  	migratedPlugins map[string](func() bool)) Interface {
    98  	return &nodeInfoManager{
    99  		nodeName:        nodeName,
   100  		volumeHost:      volumeHost,
   101  		migratedPlugins: migratedPlugins,
   102  	}
   103  }
   104  
   105  // InstallCSIDriver updates the node ID annotation in the Node object and CSIDrivers field in the
   106  // CSINode object. If the CSINode object doesn't yet exist, it will be created.
   107  // If multiple calls to InstallCSIDriver() are made in parallel, some calls might receive Node or
   108  // CSINode update conflicts, which causes the function to retry the corresponding update.
   109  func (nim *nodeInfoManager) InstallCSIDriver(driverName string, driverNodeID string, maxAttachLimit int64, topology map[string]string) error {
   110  	if driverNodeID == "" {
   111  		return fmt.Errorf("error adding CSI driver node info: driverNodeID must not be empty")
   112  	}
   113  
   114  	nodeUpdateFuncs := []nodeUpdateFunc{
   115  		updateNodeIDInNode(driverName, driverNodeID),
   116  		updateTopologyLabels(topology),
   117  	}
   118  
   119  	err := nim.updateNode(nodeUpdateFuncs...)
   120  	if err != nil {
   121  		return fmt.Errorf("error updating Node object with CSI driver node info: %v", err)
   122  	}
   123  
   124  	err = nim.updateCSINode(driverName, driverNodeID, maxAttachLimit, topology)
   125  	if err != nil {
   126  		return fmt.Errorf("error updating CSINode object with CSI driver node info: %v", err)
   127  	}
   128  
   129  	return nil
   130  }
   131  
   132  // UninstallCSIDriver removes the node ID annotation from the Node object and CSIDrivers field from the
   133  // CSINode object. If the CSINodeInfo object contains no CSIDrivers, it will be deleted.
   134  // If multiple calls to UninstallCSIDriver() are made in parallel, some calls might receive Node or
   135  // CSINode update conflicts, which causes the function to retry the corresponding update.
   136  func (nim *nodeInfoManager) UninstallCSIDriver(driverName string) error {
   137  	err := nim.uninstallDriverFromCSINode(driverName)
   138  	if err != nil {
   139  		return fmt.Errorf("error uninstalling CSI driver from CSINode object %v", err)
   140  	}
   141  
   142  	err = nim.updateNode(
   143  		removeMaxAttachLimit(driverName),
   144  		removeNodeIDFromNode(driverName),
   145  	)
   146  	if err != nil {
   147  		return fmt.Errorf("error removing CSI driver node info from Node object %v", err)
   148  	}
   149  	return nil
   150  }
   151  
   152  func (nim *nodeInfoManager) updateNode(updateFuncs ...nodeUpdateFunc) error {
   153  	var updateErrs []error
   154  	err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
   155  		if err := nim.tryUpdateNode(updateFuncs...); err != nil {
   156  			updateErrs = append(updateErrs, err)
   157  			return false, nil
   158  		}
   159  		return true, nil
   160  	})
   161  	if err != nil {
   162  		return fmt.Errorf("error updating node: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
   163  	}
   164  	return nil
   165  }
   166  
   167  // updateNode repeatedly attempts to update the corresponding node object
   168  // which is modified by applying the given update functions sequentially.
   169  // Because updateFuncs are applied sequentially, later updateFuncs should take into account
   170  // the effects of previous updateFuncs to avoid potential conflicts. For example, if multiple
   171  // functions update the same field, updates in the last function are persisted.
   172  func (nim *nodeInfoManager) tryUpdateNode(updateFuncs ...nodeUpdateFunc) error {
   173  	nim.lock.Lock()
   174  	defer nim.lock.Unlock()
   175  
   176  	// Retrieve the latest version of Node before attempting update, so that
   177  	// existing changes are not overwritten.
   178  
   179  	kubeClient := nim.volumeHost.GetKubeClient()
   180  	if kubeClient == nil {
   181  		return fmt.Errorf("error getting kube client")
   182  	}
   183  
   184  	nodeClient := kubeClient.CoreV1().Nodes()
   185  	originalNode, err := nodeClient.Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
   186  	if err != nil {
   187  		return err
   188  	}
   189  	node := originalNode.DeepCopy()
   190  
   191  	needUpdate := false
   192  	for _, update := range updateFuncs {
   193  		newNode, updated, err := update(node)
   194  		if err != nil {
   195  			return err
   196  		}
   197  		node = newNode
   198  		needUpdate = needUpdate || updated
   199  	}
   200  
   201  	if needUpdate {
   202  		// PatchNodeStatus can update both node's status and labels or annotations
   203  		// Updating status by directly updating node does not work
   204  		_, _, updateErr := nodeutil.PatchNodeStatus(kubeClient.CoreV1(), types.NodeName(node.Name), originalNode, node)
   205  		return updateErr
   206  	}
   207  
   208  	return nil
   209  }
   210  
   211  // Guarantees the map is non-nil if no error is returned.
   212  func buildNodeIDMapFromAnnotation(node *v1.Node) (map[string]string, error) {
   213  	var previousAnnotationValue string
   214  	if node.ObjectMeta.Annotations != nil {
   215  		previousAnnotationValue =
   216  			node.ObjectMeta.Annotations[annotationKeyNodeID]
   217  	}
   218  
   219  	var existingDriverMap map[string]string
   220  	if previousAnnotationValue != "" {
   221  		// Parse previousAnnotationValue as JSON
   222  		if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
   223  			return nil, fmt.Errorf(
   224  				"failed to parse node's %q annotation value (%q) err=%v",
   225  				annotationKeyNodeID,
   226  				previousAnnotationValue,
   227  				err)
   228  		}
   229  	}
   230  
   231  	if existingDriverMap == nil {
   232  		return make(map[string]string), nil
   233  	}
   234  	return existingDriverMap, nil
   235  }
   236  
   237  // updateNodeIDInNode returns a function that updates a Node object with the given
   238  // Node ID information.
   239  func updateNodeIDInNode(
   240  	csiDriverName string,
   241  	csiDriverNodeID string) nodeUpdateFunc {
   242  	return func(node *v1.Node) (*v1.Node, bool, error) {
   243  		existingDriverMap, err := buildNodeIDMapFromAnnotation(node)
   244  		if err != nil {
   245  			return nil, false, err
   246  		}
   247  
   248  		if val, ok := existingDriverMap[csiDriverName]; ok {
   249  			if val == csiDriverNodeID {
   250  				// Value already exists in node annotation, nothing more to do
   251  				return node, false, nil
   252  			}
   253  		}
   254  
   255  		// Add/update annotation value
   256  		existingDriverMap[csiDriverName] = csiDriverNodeID
   257  		jsonObj, err := json.Marshal(existingDriverMap)
   258  		if err != nil {
   259  			return nil, false, fmt.Errorf(
   260  				"error while marshalling node ID map updated with driverName=%q, nodeID=%q: %v",
   261  				csiDriverName,
   262  				csiDriverNodeID,
   263  				err)
   264  		}
   265  
   266  		if node.ObjectMeta.Annotations == nil {
   267  			node.ObjectMeta.Annotations = make(map[string]string)
   268  		}
   269  		node.ObjectMeta.Annotations[annotationKeyNodeID] = string(jsonObj)
   270  
   271  		return node, true, nil
   272  	}
   273  }
   274  
   275  // removeNodeIDFromNode returns a function that removes node ID information matching the given
   276  // driver name from a Node object.
   277  func removeNodeIDFromNode(csiDriverName string) nodeUpdateFunc {
   278  	return func(node *v1.Node) (*v1.Node, bool, error) {
   279  		var previousAnnotationValue string
   280  		if node.ObjectMeta.Annotations != nil {
   281  			previousAnnotationValue =
   282  				node.ObjectMeta.Annotations[annotationKeyNodeID]
   283  		}
   284  
   285  		if previousAnnotationValue == "" {
   286  			return node, false, nil
   287  		}
   288  
   289  		// Parse previousAnnotationValue as JSON
   290  		existingDriverMap := map[string]string{}
   291  		if err := json.Unmarshal([]byte(previousAnnotationValue), &existingDriverMap); err != nil {
   292  			return nil, false, fmt.Errorf(
   293  				"failed to parse node's %q annotation value (%q) err=%v",
   294  				annotationKeyNodeID,
   295  				previousAnnotationValue,
   296  				err)
   297  		}
   298  
   299  		if _, ok := existingDriverMap[csiDriverName]; !ok {
   300  			// Value is already missing in node annotation, nothing more to do
   301  			return node, false, nil
   302  		}
   303  
   304  		// Delete annotation value
   305  		delete(existingDriverMap, csiDriverName)
   306  		if len(existingDriverMap) == 0 {
   307  			delete(node.ObjectMeta.Annotations, annotationKeyNodeID)
   308  		} else {
   309  			jsonObj, err := json.Marshal(existingDriverMap)
   310  			if err != nil {
   311  				return nil, false, fmt.Errorf(
   312  					"failed while trying to remove key %q from node %q annotation. Existing data: %v",
   313  					csiDriverName,
   314  					annotationKeyNodeID,
   315  					previousAnnotationValue)
   316  			}
   317  
   318  			node.ObjectMeta.Annotations[annotationKeyNodeID] = string(jsonObj)
   319  		}
   320  
   321  		return node, true, nil
   322  	}
   323  }
   324  
   325  // updateTopologyLabels returns a function that updates labels of a Node object with the given
   326  // topology information.
   327  func updateTopologyLabels(topology map[string]string) nodeUpdateFunc {
   328  	return func(node *v1.Node) (*v1.Node, bool, error) {
   329  		if len(topology) == 0 {
   330  			return node, false, nil
   331  		}
   332  
   333  		for k, v := range topology {
   334  			if curVal, exists := node.Labels[k]; exists && curVal != v {
   335  				return nil, false, fmt.Errorf("detected topology value collision: driver reported %q:%q but existing label is %q:%q", k, v, k, curVal)
   336  			}
   337  		}
   338  
   339  		if node.Labels == nil {
   340  			node.Labels = make(map[string]string)
   341  		}
   342  		for k, v := range topology {
   343  			node.Labels[k] = v
   344  		}
   345  		return node, true, nil
   346  	}
   347  }
   348  
   349  func (nim *nodeInfoManager) updateCSINode(
   350  	driverName string,
   351  	driverNodeID string,
   352  	maxAttachLimit int64,
   353  	topology map[string]string) error {
   354  
   355  	csiKubeClient := nim.volumeHost.GetKubeClient()
   356  	if csiKubeClient == nil {
   357  		return fmt.Errorf("error getting CSI client")
   358  	}
   359  
   360  	var updateErrs []error
   361  	err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
   362  		if err := nim.tryUpdateCSINode(csiKubeClient, driverName, driverNodeID, maxAttachLimit, topology); err != nil {
   363  			updateErrs = append(updateErrs, err)
   364  			return false, nil
   365  		}
   366  		return true, nil
   367  	})
   368  	if err != nil {
   369  		return fmt.Errorf("error updating CSINode: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
   370  	}
   371  	return nil
   372  }
   373  
   374  func (nim *nodeInfoManager) tryUpdateCSINode(
   375  	csiKubeClient clientset.Interface,
   376  	driverName string,
   377  	driverNodeID string,
   378  	maxAttachLimit int64,
   379  	topology map[string]string) error {
   380  
   381  	nodeInfo, err := csiKubeClient.StorageV1().CSINodes().Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
   382  	if nodeInfo == nil || errors.IsNotFound(err) {
   383  		nodeInfo, err = nim.CreateCSINode()
   384  	}
   385  	if err != nil {
   386  		return err
   387  	}
   388  
   389  	return nim.installDriverToCSINode(nodeInfo, driverName, driverNodeID, maxAttachLimit, topology)
   390  }
   391  
   392  func (nim *nodeInfoManager) InitializeCSINodeWithAnnotation() error {
   393  	csiKubeClient := nim.volumeHost.GetKubeClient()
   394  	if csiKubeClient == nil {
   395  		return goerrors.New("error getting CSI client")
   396  	}
   397  
   398  	var lastErr error
   399  	err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
   400  		if lastErr = nim.tryInitializeCSINodeWithAnnotation(csiKubeClient); lastErr != nil {
   401  			klog.V(2).Infof("Failed to publish CSINode: %v", lastErr)
   402  			return false, nil
   403  		}
   404  		return true, nil
   405  	})
   406  	if err != nil {
   407  		return fmt.Errorf("error updating CSINode annotation: %v; caused by: %v", err, lastErr)
   408  	}
   409  
   410  	return nil
   411  }
   412  
   413  func (nim *nodeInfoManager) tryInitializeCSINodeWithAnnotation(csiKubeClient clientset.Interface) error {
   414  	nodeInfo, err := csiKubeClient.StorageV1().CSINodes().Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
   415  	if nodeInfo == nil || errors.IsNotFound(err) {
   416  		// CreateCSINode will set the annotation
   417  		_, err = nim.CreateCSINode()
   418  		return err
   419  	} else if err != nil {
   420  		return err
   421  	}
   422  
   423  	annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo)
   424  
   425  	if annotationModified {
   426  		_, err := csiKubeClient.StorageV1().CSINodes().Update(context.TODO(), nodeInfo, metav1.UpdateOptions{})
   427  		return err
   428  	}
   429  	return nil
   430  
   431  }
   432  
   433  func (nim *nodeInfoManager) CreateCSINode() (*storagev1.CSINode, error) {
   434  
   435  	csiKubeClient := nim.volumeHost.GetKubeClient()
   436  	if csiKubeClient == nil {
   437  		return nil, fmt.Errorf("error getting CSI client")
   438  	}
   439  
   440  	node, err := csiKubeClient.CoreV1().Nodes().Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
   441  	if err != nil {
   442  		return nil, err
   443  	}
   444  
   445  	nodeInfo := &storagev1.CSINode{
   446  		ObjectMeta: metav1.ObjectMeta{
   447  			Name: string(nim.nodeName),
   448  			OwnerReferences: []metav1.OwnerReference{
   449  				{
   450  					APIVersion: nodeKind.Version,
   451  					Kind:       nodeKind.Kind,
   452  					Name:       node.Name,
   453  					UID:        node.UID,
   454  				},
   455  			},
   456  		},
   457  		Spec: storagev1.CSINodeSpec{
   458  			Drivers: []storagev1.CSINodeDriver{},
   459  		},
   460  	}
   461  
   462  	setMigrationAnnotation(nim.migratedPlugins, nodeInfo)
   463  
   464  	return csiKubeClient.StorageV1().CSINodes().Create(context.TODO(), nodeInfo, metav1.CreateOptions{})
   465  }
   466  
   467  func setMigrationAnnotation(migratedPlugins map[string](func() bool), nodeInfo *storagev1.CSINode) (modified bool) {
   468  	if migratedPlugins == nil {
   469  		return false
   470  	}
   471  
   472  	nodeInfoAnnotations := nodeInfo.GetAnnotations()
   473  	if nodeInfoAnnotations == nil {
   474  		nodeInfoAnnotations = map[string]string{}
   475  	}
   476  
   477  	var oldAnnotationSet sets.String
   478  	mpa := nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey]
   479  	tok := strings.Split(mpa, ",")
   480  	if len(mpa) == 0 {
   481  		oldAnnotationSet = sets.NewString()
   482  	} else {
   483  		oldAnnotationSet = sets.NewString(tok...)
   484  	}
   485  
   486  	newAnnotationSet := sets.NewString()
   487  	for pluginName, migratedFunc := range migratedPlugins {
   488  		if migratedFunc() {
   489  			newAnnotationSet.Insert(pluginName)
   490  		}
   491  	}
   492  
   493  	if oldAnnotationSet.Equal(newAnnotationSet) {
   494  		return false
   495  	}
   496  
   497  	nas := strings.Join(newAnnotationSet.List(), ",")
   498  	if len(nas) != 0 {
   499  		nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey] = nas
   500  	} else {
   501  		delete(nodeInfoAnnotations, v1.MigratedPluginsAnnotationKey)
   502  	}
   503  
   504  	nodeInfo.Annotations = nodeInfoAnnotations
   505  	return true
   506  }
   507  
   508  // Returns true if and only if new maxAttachLimit doesn't require CSINode update
   509  func keepAllocatableCount(driverInfoSpec storagev1.CSINodeDriver, maxAttachLimit int64) bool {
   510  	if maxAttachLimit == 0 {
   511  		return driverInfoSpec.Allocatable == nil || driverInfoSpec.Allocatable.Count == nil
   512  	}
   513  
   514  	return driverInfoSpec.Allocatable != nil && driverInfoSpec.Allocatable.Count != nil && int64(*driverInfoSpec.Allocatable.Count) == maxAttachLimit
   515  }
   516  
   517  func (nim *nodeInfoManager) installDriverToCSINode(
   518  	nodeInfo *storagev1.CSINode,
   519  	driverName string,
   520  	driverNodeID string,
   521  	maxAttachLimit int64,
   522  	topology map[string]string) error {
   523  
   524  	csiKubeClient := nim.volumeHost.GetKubeClient()
   525  	if csiKubeClient == nil {
   526  		return fmt.Errorf("error getting CSI client")
   527  	}
   528  
   529  	topologyKeys := sets.StringKeySet(topology)
   530  
   531  	specModified := true
   532  	// Clone driver list, omitting the driver that matches the given driverName
   533  	newDriverSpecs := []storagev1.CSINodeDriver{}
   534  	for _, driverInfoSpec := range nodeInfo.Spec.Drivers {
   535  		if driverInfoSpec.Name == driverName {
   536  			if driverInfoSpec.NodeID == driverNodeID &&
   537  				sets.NewString(driverInfoSpec.TopologyKeys...).Equal(topologyKeys) &&
   538  				keepAllocatableCount(driverInfoSpec, maxAttachLimit) {
   539  				specModified = false
   540  			}
   541  		} else {
   542  			// Omit driverInfoSpec matching given driverName
   543  			newDriverSpecs = append(newDriverSpecs, driverInfoSpec)
   544  		}
   545  	}
   546  
   547  	annotationModified := setMigrationAnnotation(nim.migratedPlugins, nodeInfo)
   548  
   549  	if !specModified && !annotationModified {
   550  		return nil
   551  	}
   552  
   553  	// Append new driver
   554  	driverSpec := storagev1.CSINodeDriver{
   555  		Name:         driverName,
   556  		NodeID:       driverNodeID,
   557  		TopologyKeys: topologyKeys.List(),
   558  	}
   559  
   560  	if maxAttachLimit > 0 {
   561  		if maxAttachLimit > math.MaxInt32 {
   562  			klog.Warningf("Exceeded max supported attach limit value, truncating it to %d", math.MaxInt32)
   563  			maxAttachLimit = math.MaxInt32
   564  		}
   565  		m := int32(maxAttachLimit)
   566  		driverSpec.Allocatable = &storagev1.VolumeNodeResources{Count: &m}
   567  	} else if maxAttachLimit != 0 {
   568  		klog.Errorf("Invalid attach limit value %d cannot be added to CSINode object for %q", maxAttachLimit, driverName)
   569  	}
   570  
   571  	newDriverSpecs = append(newDriverSpecs, driverSpec)
   572  	nodeInfo.Spec.Drivers = newDriverSpecs
   573  
   574  	_, err := csiKubeClient.StorageV1().CSINodes().Update(context.TODO(), nodeInfo, metav1.UpdateOptions{})
   575  	return err
   576  }
   577  
   578  func (nim *nodeInfoManager) uninstallDriverFromCSINode(
   579  	csiDriverName string) error {
   580  
   581  	csiKubeClient := nim.volumeHost.GetKubeClient()
   582  	if csiKubeClient == nil {
   583  		return fmt.Errorf("error getting CSI client")
   584  	}
   585  
   586  	var updateErrs []error
   587  	err := wait.ExponentialBackoff(updateBackoff, func() (bool, error) {
   588  		if err := nim.tryUninstallDriverFromCSINode(csiKubeClient, csiDriverName); err != nil {
   589  			updateErrs = append(updateErrs, err)
   590  			return false, nil
   591  		}
   592  		return true, nil
   593  	})
   594  	if err != nil {
   595  		return fmt.Errorf("error updating CSINode: %v; caused by: %v", err, utilerrors.NewAggregate(updateErrs))
   596  	}
   597  	return nil
   598  }
   599  
   600  func (nim *nodeInfoManager) tryUninstallDriverFromCSINode(
   601  	csiKubeClient clientset.Interface,
   602  	csiDriverName string) error {
   603  
   604  	nodeInfoClient := csiKubeClient.StorageV1().CSINodes()
   605  	nodeInfo, err := nodeInfoClient.Get(context.TODO(), string(nim.nodeName), metav1.GetOptions{})
   606  	if err != nil && errors.IsNotFound(err) {
   607  		return nil
   608  	} else if err != nil {
   609  		return err
   610  	}
   611  
   612  	hasModified := false
   613  	// Uninstall CSINodeDriver with name csiDriverName
   614  	drivers := nodeInfo.Spec.Drivers[:0]
   615  	for _, driver := range nodeInfo.Spec.Drivers {
   616  		if driver.Name != csiDriverName {
   617  			drivers = append(drivers, driver)
   618  		} else {
   619  			// Found a driver with name csiDriverName
   620  			// Set hasModified to true because it will be removed
   621  			hasModified = true
   622  		}
   623  	}
   624  
   625  	if !hasModified {
   626  		// No changes, don't update
   627  		return nil
   628  	}
   629  	nodeInfo.Spec.Drivers = drivers
   630  
   631  	_, err = nodeInfoClient.Update(context.TODO(), nodeInfo, metav1.UpdateOptions{})
   632  
   633  	return err // do not wrap error
   634  
   635  }
   636  
   637  func removeMaxAttachLimit(driverName string) nodeUpdateFunc {
   638  	return func(node *v1.Node) (*v1.Node, bool, error) {
   639  		limitKey := v1.ResourceName(util.GetCSIAttachLimitKey(driverName))
   640  
   641  		capacityExists := false
   642  		if node.Status.Capacity != nil {
   643  			_, capacityExists = node.Status.Capacity[limitKey]
   644  		}
   645  
   646  		allocatableExists := false
   647  		if node.Status.Allocatable != nil {
   648  			_, allocatableExists = node.Status.Allocatable[limitKey]
   649  		}
   650  
   651  		if !capacityExists && !allocatableExists {
   652  			return node, false, nil
   653  		}
   654  
   655  		delete(node.Status.Capacity, limitKey)
   656  		if len(node.Status.Capacity) == 0 {
   657  			node.Status.Capacity = nil
   658  		}
   659  
   660  		delete(node.Status.Allocatable, limitKey)
   661  		if len(node.Status.Allocatable) == 0 {
   662  			node.Status.Allocatable = nil
   663  		}
   664  
   665  		return node, true, nil
   666  	}
   667  }
   668  

View as plain text