...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits/non_csi.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package nodevolumelimits
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"os"
    24  	"regexp"
    25  	"strconv"
    26  
    27  	v1 "k8s.io/api/core/v1"
    28  	storage "k8s.io/api/storage/v1"
    29  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    30  	"k8s.io/apimachinery/pkg/runtime"
    31  	"k8s.io/apimachinery/pkg/util/rand"
    32  	"k8s.io/apimachinery/pkg/util/sets"
    33  	"k8s.io/client-go/informers"
    34  	corelisters "k8s.io/client-go/listers/core/v1"
    35  	storagelisters "k8s.io/client-go/listers/storage/v1"
    36  	"k8s.io/component-helpers/storage/ephemeral"
    37  	csilibplugins "k8s.io/csi-translation-lib/plugins"
    38  	"k8s.io/klog/v2"
    39  	"k8s.io/kubernetes/pkg/scheduler/framework"
    40  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
    41  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
    42  	volumeutil "k8s.io/kubernetes/pkg/volume/util"
    43  )
    44  
    45  const (
    46  	// defaultMaxGCEPDVolumes defines the maximum number of PD Volumes for GCE.
    47  	// GCE instances can have up to 16 PD volumes attached.
    48  	defaultMaxGCEPDVolumes = 16
     49  	// defaultMaxAzureDiskVolumes defines the maximum number of Azure Disk Volumes.
     50  	// Larger Azure VMs can actually have many more disks attached.
    51  	// TODO We should determine the max based on VM size
    52  	defaultMaxAzureDiskVolumes = 16
    53  
    54  	// ebsVolumeFilterType defines the filter name for ebsVolumeFilter.
    55  	ebsVolumeFilterType = "EBS"
    56  	// gcePDVolumeFilterType defines the filter name for gcePDVolumeFilter.
    57  	gcePDVolumeFilterType = "GCE"
    58  	// azureDiskVolumeFilterType defines the filter name for azureDiskVolumeFilter.
    59  	azureDiskVolumeFilterType = "AzureDisk"
    60  	// cinderVolumeFilterType defines the filter name for cinderVolumeFilter.
    61  	cinderVolumeFilterType = "Cinder"
    62  
    63  	// ErrReasonMaxVolumeCountExceeded is used for MaxVolumeCount predicate error.
    64  	ErrReasonMaxVolumeCountExceeded = "node(s) exceed max volume count"
    65  
     66  	// KubeMaxPDVols is the name of the environment variable used to override the maximum number of PD Volumes per node.
    67  	KubeMaxPDVols = "KUBE_MAX_PD_VOLS"
    68  )
    69  
    70  // AzureDiskName is the name of the plugin used in the plugin registry and configurations.
    71  const AzureDiskName = names.AzureDiskLimits
    72  
     73  // NewAzureDisk initializes a new plugin and returns it.
    74  func NewAzureDisk(ctx context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
    75  	informerFactory := handle.SharedInformerFactory()
    76  	return newNonCSILimitsWithInformerFactory(ctx, azureDiskVolumeFilterType, informerFactory, fts), nil
    77  }
    78  
    79  // CinderName is the name of the plugin used in the plugin registry and configurations.
    80  const CinderName = names.CinderLimits
    81  
     82  // NewCinder initializes a new plugin and returns it.
    83  func NewCinder(ctx context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
    84  	informerFactory := handle.SharedInformerFactory()
    85  	return newNonCSILimitsWithInformerFactory(ctx, cinderVolumeFilterType, informerFactory, fts), nil
    86  }
    87  
    88  // EBSName is the name of the plugin used in the plugin registry and configurations.
    89  const EBSName = names.EBSLimits
    90  
     91  // NewEBS initializes a new plugin and returns it.
    92  func NewEBS(ctx context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
    93  	informerFactory := handle.SharedInformerFactory()
    94  	return newNonCSILimitsWithInformerFactory(ctx, ebsVolumeFilterType, informerFactory, fts), nil
    95  }
    96  
    97  // GCEPDName is the name of the plugin used in the plugin registry and configurations.
    98  const GCEPDName = names.GCEPDLimits
    99  
    100  // NewGCEPD initializes a new plugin and returns it.
   101  func NewGCEPD(ctx context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
   102  	informerFactory := handle.SharedInformerFactory()
   103  	return newNonCSILimitsWithInformerFactory(ctx, gcePDVolumeFilterType, informerFactory, fts), nil
   104  }
   105  
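// Illustrative sketch (not part of the upstream file): the constructors above are
// meant to be wired into the scheduler's plugin registry. In-tree this is done
// roughly as follows, where frameworkruntime refers to
// k8s.io/kubernetes/pkg/scheduler/framework/runtime and fts carries the feature gates:
//
//	registry := frameworkruntime.Registry{
//		names.EBSLimits:       frameworkruntime.FactoryAdapter(fts, NewEBS),
//		names.GCEPDLimits:     frameworkruntime.FactoryAdapter(fts, NewGCEPD),
//		names.AzureDiskLimits: frameworkruntime.FactoryAdapter(fts, NewAzureDisk),
//		names.CinderLimits:    frameworkruntime.FactoryAdapter(fts, NewCinder),
//	}
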
   106  // nonCSILimits contains information to check the max number of volumes for a plugin.
   107  type nonCSILimits struct {
   108  	name           string
   109  	filter         VolumeFilter
   110  	volumeLimitKey v1.ResourceName
   111  	maxVolumeFunc  func(node *v1.Node) int
   112  	csiNodeLister  storagelisters.CSINodeLister
   113  	pvLister       corelisters.PersistentVolumeLister
   114  	pvcLister      corelisters.PersistentVolumeClaimLister
   115  	scLister       storagelisters.StorageClassLister
   116  
    117  	// The string below is generated randomly during the struct's initialization.
    118  	// It is used to prefix the placeholder volume IDs generated inside filterVolumes() to
    119  	// avoid conflicts with any real volume ID.
   120  	randomVolumeIDPrefix string
   121  }
   122  
   123  var _ framework.PreFilterPlugin = &nonCSILimits{}
   124  var _ framework.FilterPlugin = &nonCSILimits{}
   125  var _ framework.EnqueueExtensions = &nonCSILimits{}
   126  
    127  // newNonCSILimitsWithInformerFactory returns a plugin for the given filter name, using listers from the informer factory.
   128  func newNonCSILimitsWithInformerFactory(
   129  	ctx context.Context,
   130  	filterName string,
   131  	informerFactory informers.SharedInformerFactory,
   132  	fts feature.Features,
   133  ) framework.Plugin {
   134  	pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
   135  	pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
   136  	csiNodesLister := informerFactory.Storage().V1().CSINodes().Lister()
   137  	scLister := informerFactory.Storage().V1().StorageClasses().Lister()
   138  
   139  	return newNonCSILimits(ctx, filterName, csiNodesLister, scLister, pvLister, pvcLister, fts)
   140  }
   141  
    142  // newNonCSILimits creates a plugin which evaluates whether a pod can fit based on the
    143  // number of volumes it requests that match a filter, plus those that are already present on the node.
   144  //
   145  // DEPRECATED
    146  // All cloud-provider-specific predicates defined here are deprecated in favour of the CSI volume limit
    147  // predicate, MaxCSIVolumeCountPred.
   148  //
   149  // The predicate looks for both volumes used directly, as well as PVC volumes that are backed by relevant volume
   150  // types, counts the number of unique volumes, and rejects the new pod if it would place the total count over
   151  // the maximum.
   152  func newNonCSILimits(
   153  	ctx context.Context,
   154  	filterName string,
   155  	csiNodeLister storagelisters.CSINodeLister,
   156  	scLister storagelisters.StorageClassLister,
   157  	pvLister corelisters.PersistentVolumeLister,
   158  	pvcLister corelisters.PersistentVolumeClaimLister,
   159  	fts feature.Features,
   160  ) framework.Plugin {
   161  	logger := klog.FromContext(ctx)
   162  	var filter VolumeFilter
   163  	var volumeLimitKey v1.ResourceName
   164  	var name string
   165  
   166  	switch filterName {
   167  	case ebsVolumeFilterType:
   168  		name = EBSName
   169  		filter = ebsVolumeFilter
   170  		volumeLimitKey = v1.ResourceName(volumeutil.EBSVolumeLimitKey)
   171  	case gcePDVolumeFilterType:
   172  		name = GCEPDName
   173  		filter = gcePDVolumeFilter
   174  		volumeLimitKey = v1.ResourceName(volumeutil.GCEVolumeLimitKey)
   175  	case azureDiskVolumeFilterType:
   176  		name = AzureDiskName
   177  		filter = azureDiskVolumeFilter
   178  		volumeLimitKey = v1.ResourceName(volumeutil.AzureVolumeLimitKey)
   179  	case cinderVolumeFilterType:
   180  		name = CinderName
   181  		filter = cinderVolumeFilter
   182  		volumeLimitKey = v1.ResourceName(volumeutil.CinderVolumeLimitKey)
   183  	default:
   184  		logger.Error(errors.New("wrong filterName"), "Cannot create nonCSILimits plugin")
   185  		return nil
   186  	}
   187  	pl := &nonCSILimits{
   188  		name:                 name,
   189  		filter:               filter,
   190  		volumeLimitKey:       volumeLimitKey,
   191  		maxVolumeFunc:        getMaxVolumeFunc(logger, filterName),
   192  		csiNodeLister:        csiNodeLister,
   193  		pvLister:             pvLister,
   194  		pvcLister:            pvcLister,
   195  		scLister:             scLister,
   196  		randomVolumeIDPrefix: rand.String(32),
   197  	}
   198  
   199  	return pl
   200  }
   201  
   202  // Name returns name of the plugin. It is used in logs, etc.
   203  func (pl *nonCSILimits) Name() string {
   204  	return pl.name
   205  }
   206  
    207  // EventsToRegister returns the possible events that may make a Pod
    208  // that previously failed this plugin schedulable.
   209  func (pl *nonCSILimits) EventsToRegister() []framework.ClusterEventWithHint {
   210  	return []framework.ClusterEventWithHint{
   211  		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}},
   212  		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
   213  		{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}},
   214  	}
   215  }
   216  
    217  // PreFilter is invoked at the prefilter extension point.
    218  //
    219  // If the pod doesn't have any of these volume types, the Filter phase is skipped.
   220  func (pl *nonCSILimits) PreFilter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
   221  	volumes := pod.Spec.Volumes
   222  	for i := range volumes {
   223  		vol := &volumes[i]
   224  		_, ok := pl.filter.FilterVolume(vol)
   225  		if ok || vol.PersistentVolumeClaim != nil || vol.Ephemeral != nil {
   226  			return nil, nil
   227  		}
   228  	}
   229  
   230  	return nil, framework.NewStatus(framework.Skip)
   231  }
   232  
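// Illustrative example (not part of the upstream file): a pod whose only volume is
// an emptyDir defines none of the filtered volume types, no PVC, and no ephemeral
// volume, so PreFilter returns a Skip status and Filter is never called for it. A pod
// that references a PVC falls through to Filter so its backing volume can be counted:
//
//	pod := &v1.Pod{Spec: v1.PodSpec{Volumes: []v1.Volume{{
//		Name: "data",
//		VolumeSource: v1.VolumeSource{
//			PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "data-pvc"},
//		},
//	}}}}
//	// PreFilter returns (nil, nil) for this pod, so the Filter phase will run.
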
    233  // PreFilterExtensions returns prefilter extensions (pod add and remove); this plugin needs none.
   234  func (pl *nonCSILimits) PreFilterExtensions() framework.PreFilterExtensions {
   235  	return nil
   236  }
   237  
   238  // Filter invoked at the filter extension point.
   239  func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
   240  	// If a pod doesn't have any volume attached to it, the predicate will always be true.
   241  	// Thus we make a fast path for it, to avoid unnecessary computations in this case.
   242  	if len(pod.Spec.Volumes) == 0 {
   243  		return nil
   244  	}
   245  
   246  	logger := klog.FromContext(ctx)
   247  	newVolumes := sets.New[string]()
   248  	if err := pl.filterVolumes(logger, pod, true /* new pod */, newVolumes); err != nil {
   249  		if apierrors.IsNotFound(err) {
   250  			// PVC is not found. This Pod will never be schedulable until PVC is created.
   251  			return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
   252  		}
   253  		return framework.AsStatus(err)
   254  	}
   255  
   256  	// quick return
   257  	if len(newVolumes) == 0 {
   258  		return nil
   259  	}
   260  
   261  	node := nodeInfo.Node()
   262  
   263  	var csiNode *storage.CSINode
   264  	var err error
   265  	if pl.csiNodeLister != nil {
   266  		csiNode, err = pl.csiNodeLister.Get(node.Name)
   267  		if err != nil {
   268  			// we don't fail here because the CSINode object is only necessary
   269  			// for determining whether the migration is enabled or not
   270  			logger.V(5).Info("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
   271  		}
   272  	}
   273  
   274  	// if a plugin has been migrated to a CSI driver, defer to the CSI predicate
   275  	if pl.filter.IsMigrated(csiNode) {
   276  		return nil
   277  	}
   278  
   279  	// count unique volumes
   280  	existingVolumes := sets.New[string]()
   281  	for _, existingPod := range nodeInfo.Pods {
   282  		if err := pl.filterVolumes(logger, existingPod.Pod, false /* existing pod */, existingVolumes); err != nil {
   283  			return framework.AsStatus(err)
   284  		}
   285  	}
   286  	numExistingVolumes := len(existingVolumes)
   287  
   288  	// filter out already-mounted volumes
   289  	for k := range existingVolumes {
   290  		delete(newVolumes, k)
   291  	}
   292  
   293  	numNewVolumes := len(newVolumes)
   294  	maxAttachLimit := pl.maxVolumeFunc(node)
   295  	volumeLimits := volumeLimits(nodeInfo)
   296  	if maxAttachLimitFromAllocatable, ok := volumeLimits[pl.volumeLimitKey]; ok {
   297  		maxAttachLimit = int(maxAttachLimitFromAllocatable)
   298  	}
   299  
   300  	if numExistingVolumes+numNewVolumes > maxAttachLimit {
   301  		return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)
   302  	}
   303  	return nil
   304  }
   305  
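// Worked example (illustrative, not part of the upstream file): with the GCE filter
// and the default limit of 16 PD volumes, suppose the pods already running on the node
// mount 15 distinct PDs and the incoming pod requests two PDs, one of which is among
// those 15. Then numExistingVolumes = 15 and numNewVolumes = 1 (the shared PD was
// removed from newVolumes above), 15+1 <= 16, and the pod fits. If both requested PDs
// were new, 15+2 > 16 and Filter would return Unschedulable with
// ErrReasonMaxVolumeCountExceeded.
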
   306  func (pl *nonCSILimits) filterVolumes(logger klog.Logger, pod *v1.Pod, newPod bool, filteredVolumes sets.Set[string]) error {
   307  	volumes := pod.Spec.Volumes
   308  	for i := range volumes {
   309  		vol := &volumes[i]
   310  		if id, ok := pl.filter.FilterVolume(vol); ok {
   311  			filteredVolumes.Insert(id)
   312  			continue
   313  		}
   314  
   315  		pvcName := ""
   316  		isEphemeral := false
   317  		switch {
   318  		case vol.PersistentVolumeClaim != nil:
   319  			pvcName = vol.PersistentVolumeClaim.ClaimName
   320  		case vol.Ephemeral != nil:
   321  			// Generic ephemeral inline volumes also use a PVC,
   322  			// just with a computed name and certain ownership.
   323  			// That is checked below once the pvc object is
   324  			// retrieved.
   325  			pvcName = ephemeral.VolumeClaimName(pod, vol)
   326  			isEphemeral = true
   327  		default:
   328  			continue
   329  		}
   330  		if pvcName == "" {
   331  			return fmt.Errorf("PersistentVolumeClaim had no name")
   332  		}
   333  
    334  		// Until we know the real ID of the volume, use namespace/pvcName as a substitute
    335  		// with a random prefix (calculated and stored inside 'pl' during initialization)
    336  		// to avoid conflicts with existing volume IDs.
   337  		pvID := fmt.Sprintf("%s-%s/%s", pl.randomVolumeIDPrefix, pod.Namespace, pvcName)
   338  
   339  		pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
   340  		if err != nil {
   341  			if newPod {
   342  				// The PVC is required to proceed with
   343  				// scheduling of a new pod because it cannot
   344  				// run without it. Bail out immediately.
   345  				return fmt.Errorf("looking up PVC %s/%s: %w", pod.Namespace, pvcName, err)
   346  			}
   347  			// If the PVC is invalid, we don't count the volume because
   348  			// there's no guarantee that it belongs to the running predicate.
   349  			logger.V(4).Info("Unable to look up PVC info, assuming PVC doesn't match predicate when counting limits", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName), "err", err)
   350  			continue
   351  		}
   352  
   353  		// The PVC for an ephemeral volume must be owned by the pod.
   354  		if isEphemeral {
   355  			if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil {
   356  				return err
   357  			}
   358  		}
   359  
   360  		pvName := pvc.Spec.VolumeName
   361  		if pvName == "" {
   362  			// PVC is not bound. It was either deleted and created again or
    363  			// it was forcefully unbound by admin. The pod can still use the
    364  			// original PV it was bound to, so we count the volume if
   365  			// it belongs to the running predicate.
   366  			if pl.matchProvisioner(pvc) {
   367  				logger.V(4).Info("PVC is not bound, assuming PVC matches predicate when counting limits", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName))
   368  				filteredVolumes.Insert(pvID)
   369  			}
   370  			continue
   371  		}
   372  
   373  		pv, err := pl.pvLister.Get(pvName)
   374  		if err != nil {
   375  			// If the PV is invalid and PVC belongs to the running predicate,
   376  			// log the error and count the PV towards the PV limit.
   377  			if pl.matchProvisioner(pvc) {
   378  				logger.V(4).Info("Unable to look up PV, assuming PV matches predicate when counting limits", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName), "PV", klog.KRef("", pvName), "err", err)
   379  				filteredVolumes.Insert(pvID)
   380  			}
   381  			continue
   382  		}
   383  
   384  		if id, ok := pl.filter.FilterPersistentVolume(pv); ok {
   385  			filteredVolumes.Insert(id)
   386  		}
   387  	}
   388  
   389  	return nil
   390  }
   391  
   392  // matchProvisioner helps identify if the given PVC belongs to the running predicate.
   393  func (pl *nonCSILimits) matchProvisioner(pvc *v1.PersistentVolumeClaim) bool {
   394  	if pvc.Spec.StorageClassName == nil {
   395  		return false
   396  	}
   397  
   398  	storageClass, err := pl.scLister.Get(*pvc.Spec.StorageClassName)
   399  	if err != nil || storageClass == nil {
   400  		return false
   401  	}
   402  
   403  	return pl.filter.MatchProvisioner(storageClass)
   404  }
   405  
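// Illustrative example (not part of the upstream file), assuming the EBS filter:
// an unbound PVC still counts against the limit when its StorageClass uses the
// in-tree provisioner, because matchProvisioner reports a match. For instance,
// with a (hypothetical) StorageClass named "gp2-intree" whose Provisioner is
// csilibplugins.AWSEBSInTreePluginName ("kubernetes.io/aws-ebs"):
//
//	className := "gp2-intree"
//	pvc := &v1.PersistentVolumeClaim{
//		Spec: v1.PersistentVolumeClaimSpec{StorageClassName: &className},
//	}
//	// pl.matchProvisioner(pvc) returns true for the EBS plugin, so the unbound
//	// PVC's placeholder volume ID is inserted into filteredVolumes above.
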
    406  // getMaxVolLimitFromEnv reads the KUBE_MAX_PD_VOLS environment variable and returns its value, or -1 if it is unset or invalid.
   407  func getMaxVolLimitFromEnv(logger klog.Logger) int {
   408  	if rawMaxVols := os.Getenv(KubeMaxPDVols); rawMaxVols != "" {
   409  		if parsedMaxVols, err := strconv.Atoi(rawMaxVols); err != nil {
   410  			logger.Error(err, "Unable to parse maximum PD volumes value, using default")
   411  		} else if parsedMaxVols <= 0 {
    412  			logger.Error(errors.New("maximum PD volumes must be a positive value"), "Unable to parse maximum PD volumes value, using default")
   413  		} else {
   414  			return parsedMaxVols
   415  		}
   416  	}
   417  
   418  	return -1
   419  }
   420  
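// Illustrative usage (not part of the upstream file): the per-provider defaults used
// by getMaxVolumeFunc below can be overridden for all filters by exporting
// KUBE_MAX_PD_VOLS in the scheduler's environment, for example:
//
//	KUBE_MAX_PD_VOLS=25 kube-scheduler ...
//
// A value that is not a positive integer is logged and ignored, so the per-provider
// default applies; a limit reported in the node's allocatable resources still takes
// precedence over both (see Filter above).
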
   421  // VolumeFilter contains information on how to filter PD Volumes when checking PD Volume caps.
   422  type VolumeFilter struct {
   423  	// Filter normal volumes
   424  	FilterVolume           func(vol *v1.Volume) (id string, relevant bool)
   425  	FilterPersistentVolume func(pv *v1.PersistentVolume) (id string, relevant bool)
   426  	// MatchProvisioner evaluates if the StorageClass provisioner matches the running predicate
   427  	MatchProvisioner func(sc *storage.StorageClass) (relevant bool)
   428  	// IsMigrated returns a boolean specifying whether the plugin is migrated to a CSI driver
   429  	IsMigrated func(csiNode *storage.CSINode) bool
   430  }
   431  
   432  // ebsVolumeFilter is a VolumeFilter for filtering AWS ElasticBlockStore Volumes.
   433  var ebsVolumeFilter = VolumeFilter{
   434  	FilterVolume: func(vol *v1.Volume) (string, bool) {
   435  		if vol.AWSElasticBlockStore != nil {
   436  			return vol.AWSElasticBlockStore.VolumeID, true
   437  		}
   438  		return "", false
   439  	},
   440  
   441  	FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
   442  		if pv.Spec.AWSElasticBlockStore != nil {
   443  			return pv.Spec.AWSElasticBlockStore.VolumeID, true
   444  		}
   445  		return "", false
   446  	},
   447  
   448  	MatchProvisioner: func(sc *storage.StorageClass) bool {
   449  		return sc.Provisioner == csilibplugins.AWSEBSInTreePluginName
   450  	},
   451  
   452  	IsMigrated: func(csiNode *storage.CSINode) bool {
   453  		return isCSIMigrationOn(csiNode, csilibplugins.AWSEBSInTreePluginName)
   454  	},
   455  }
   456  
    457  // gcePDVolumeFilter is a VolumeFilter for filtering GCE PersistentDisk Volumes.
   458  var gcePDVolumeFilter = VolumeFilter{
   459  	FilterVolume: func(vol *v1.Volume) (string, bool) {
   460  		if vol.GCEPersistentDisk != nil {
   461  			return vol.GCEPersistentDisk.PDName, true
   462  		}
   463  		return "", false
   464  	},
   465  
   466  	FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
   467  		if pv.Spec.GCEPersistentDisk != nil {
   468  			return pv.Spec.GCEPersistentDisk.PDName, true
   469  		}
   470  		return "", false
   471  	},
   472  
   473  	MatchProvisioner: func(sc *storage.StorageClass) bool {
   474  		return sc.Provisioner == csilibplugins.GCEPDInTreePluginName
   475  	},
   476  
   477  	IsMigrated: func(csiNode *storage.CSINode) bool {
   478  		return isCSIMigrationOn(csiNode, csilibplugins.GCEPDInTreePluginName)
   479  	},
   480  }
   481  
    482  // azureDiskVolumeFilter is a VolumeFilter for filtering Azure Disk Volumes.
   483  var azureDiskVolumeFilter = VolumeFilter{
   484  	FilterVolume: func(vol *v1.Volume) (string, bool) {
   485  		if vol.AzureDisk != nil {
   486  			return vol.AzureDisk.DiskName, true
   487  		}
   488  		return "", false
   489  	},
   490  
   491  	FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
   492  		if pv.Spec.AzureDisk != nil {
   493  			return pv.Spec.AzureDisk.DiskName, true
   494  		}
   495  		return "", false
   496  	},
   497  
   498  	MatchProvisioner: func(sc *storage.StorageClass) bool {
   499  		return sc.Provisioner == csilibplugins.AzureDiskInTreePluginName
   500  	},
   501  
   502  	IsMigrated: func(csiNode *storage.CSINode) bool {
   503  		return isCSIMigrationOn(csiNode, csilibplugins.AzureDiskInTreePluginName)
   504  	},
   505  }
   506  
    507  // cinderVolumeFilter is a VolumeFilter for filtering Cinder Volumes.
    508  // It will be deprecated once the OpenStack cloud provider has been removed from the in-tree codebase.
   509  var cinderVolumeFilter = VolumeFilter{
   510  	FilterVolume: func(vol *v1.Volume) (string, bool) {
   511  		if vol.Cinder != nil {
   512  			return vol.Cinder.VolumeID, true
   513  		}
   514  		return "", false
   515  	},
   516  
   517  	FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
   518  		if pv.Spec.Cinder != nil {
   519  			return pv.Spec.Cinder.VolumeID, true
   520  		}
   521  		return "", false
   522  	},
   523  
   524  	MatchProvisioner: func(sc *storage.StorageClass) bool {
   525  		return sc.Provisioner == csilibplugins.CinderInTreePluginName
   526  	},
   527  
   528  	IsMigrated: func(csiNode *storage.CSINode) bool {
   529  		return isCSIMigrationOn(csiNode, csilibplugins.CinderInTreePluginName)
   530  	},
   531  }
   532  
   533  func getMaxVolumeFunc(logger klog.Logger, filterName string) func(node *v1.Node) int {
   534  	return func(node *v1.Node) int {
   535  		maxVolumesFromEnv := getMaxVolLimitFromEnv(logger)
   536  		if maxVolumesFromEnv > 0 {
   537  			return maxVolumesFromEnv
   538  		}
   539  
   540  		var nodeInstanceType string
   541  		for k, v := range node.ObjectMeta.Labels {
   542  			if k == v1.LabelInstanceType || k == v1.LabelInstanceTypeStable {
   543  				nodeInstanceType = v
   544  				break
   545  			}
   546  		}
   547  		switch filterName {
   548  		case ebsVolumeFilterType:
   549  			return getMaxEBSVolume(nodeInstanceType)
   550  		case gcePDVolumeFilterType:
   551  			return defaultMaxGCEPDVolumes
   552  		case azureDiskVolumeFilterType:
   553  			return defaultMaxAzureDiskVolumes
   554  		case cinderVolumeFilterType:
   555  			return volumeutil.DefaultMaxCinderVolumes
   556  		default:
   557  			return -1
   558  		}
   559  	}
   560  }
   561  
   562  func getMaxEBSVolume(nodeInstanceType string) int {
   563  	if ok, _ := regexp.MatchString(volumeutil.EBSNitroLimitRegex, nodeInstanceType); ok {
   564  		return volumeutil.DefaultMaxEBSNitroVolumeLimit
   565  	}
   566  	return volumeutil.DefaultMaxEBSVolumes
   567  }
   568  
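// Illustrative note (not part of the upstream file): getMaxEBSVolume distinguishes
// Nitro-based EC2 instance types from older ones via volumeutil.EBSNitroLimitRegex,
// matched against the node's instance-type label. For example, an instance type such
// as "m5.large" would match the Nitro pattern and get
// volumeutil.DefaultMaxEBSNitroVolumeLimit, while "m4.large" would fall back to
// volumeutil.DefaultMaxEBSVolumes.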
