...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits/csi.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package nodevolumelimits
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  
    23  	v1 "k8s.io/api/core/v1"
    24  	storagev1 "k8s.io/api/storage/v1"
    25  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    26  	"k8s.io/apimachinery/pkg/runtime"
    27  	"k8s.io/apimachinery/pkg/util/rand"
    28  	corelisters "k8s.io/client-go/listers/core/v1"
    29  	storagelisters "k8s.io/client-go/listers/storage/v1"
    30  	ephemeral "k8s.io/component-helpers/storage/ephemeral"
    31  	storagehelpers "k8s.io/component-helpers/storage/volume"
    32  	csitrans "k8s.io/csi-translation-lib"
    33  	"k8s.io/klog/v2"
    34  	"k8s.io/kubernetes/pkg/scheduler/framework"
    35  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
    36  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
    37  	volumeutil "k8s.io/kubernetes/pkg/volume/util"
    38  )
    39  
// InTreeToCSITranslator contains methods required to check migratable status
// and perform translations from InTree PV's to CSI.
//
// CSILimits depends on this interface rather than on a concrete translator;
// NewCSI wires in the value returned by csitrans.New().
type InTreeToCSITranslator interface {
	// IsPVMigratable reports whether pv is migratable. getCSIDriverInfo uses
	// it as a fast path to skip non-CSI PVs that are not migratable.
	IsPVMigratable(pv *v1.PersistentVolume) bool
	// IsInlineMigratable reports whether the inline volume vol is migratable.
	// PreFilter and checkAttachableInlineVolume use it to decide whether an
	// inline volume is relevant to this plugin.
	IsInlineMigratable(vol *v1.Volume) bool
	// IsMigratableIntreePluginByName reports whether inTreePluginName names a
	// migratable in-tree plugin. In this file it is called with a
	// StorageClass provisioner name.
	IsMigratableIntreePluginByName(inTreePluginName string) bool
	// GetInTreePluginNameFromSpec returns the in-tree plugin name for a PV or
	// an inline volume. Callers in this file pass exactly one of pv and vol
	// and nil for the other.
	GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (string, error)
	// GetCSINameFromInTreeName returns the CSI driver name that corresponds
	// to the given in-tree plugin name.
	GetCSINameFromInTreeName(pluginName string) (string, error)
	// TranslateInTreePVToCSI returns the CSI form of an in-tree PV.
	// NOTE(review): whether a nil PV can be returned together with a nil
	// error depends on the implementation — confirm before relying on it.
	TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
	// TranslateInTreeInlineVolumeToCSI returns a PV carrying the CSI form of
	// an in-tree inline volume that belongs to a pod in podNamespace.
	TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error)
}
    51  
// CSILimits is a plugin that checks node volume limits.
type CSILimits struct {
	// Listers used to look up the CSINode of the candidate node and the
	// PV, PVC and StorageClass objects behind a pod's volumes.
	csiNodeLister storagelisters.CSINodeLister
	pvLister      corelisters.PersistentVolumeLister
	pvcLister     corelisters.PersistentVolumeClaimLister
	scLister      storagelisters.StorageClassLister

	// randomVolumeIDPrefix prefixes the placeholder volume handles built in
	// getCSIDriverInfoFromSC. NewCSI initializes it with rand.String(32).
	randomVolumeIDPrefix string

	// translator answers CSI-migration questions and translates in-tree
	// volumes to their CSI form.
	translator InTreeToCSITranslator
}
    63  
    64  var _ framework.PreFilterPlugin = &CSILimits{}
    65  var _ framework.FilterPlugin = &CSILimits{}
    66  var _ framework.EnqueueExtensions = &CSILimits{}
    67  
// CSIName is the name of the plugin used in the plugin registry and configurations.
// It is the same value as names.NodeVolumeLimits.
const CSIName = names.NodeVolumeLimits
    70  
// Name returns the name of the plugin (CSIName). It is used in logs, etc.
func (pl *CSILimits) Name() string {
	return CSIName
}
    75  
    76  // EventsToRegister returns the possible events that may make a Pod
    77  // failed by this plugin schedulable.
    78  func (pl *CSILimits) EventsToRegister() []framework.ClusterEventWithHint {
    79  	return []framework.ClusterEventWithHint{
    80  		// We don't register any `QueueingHintFn` intentionally
    81  		// because any new CSINode could make pods that were rejected by CSI volumes schedulable.
    82  		{Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add}},
    83  		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
    84  		{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}},
    85  	}
    86  }
    87  
    88  // PreFilter invoked at the prefilter extension point
    89  //
    90  // If the pod haven't those types of volumes, we'll skip the Filter phase
    91  func (pl *CSILimits) PreFilter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
    92  	volumes := pod.Spec.Volumes
    93  	for i := range volumes {
    94  		vol := &volumes[i]
    95  		if vol.PersistentVolumeClaim != nil || vol.Ephemeral != nil || pl.translator.IsInlineMigratable(vol) {
    96  			return nil, nil
    97  		}
    98  	}
    99  
   100  	return nil, framework.NewStatus(framework.Skip)
   101  }
   102  
// PreFilterExtensions returns nil: this plugin does not provide any
// prefilter extensions (pod add and remove).
func (pl *CSILimits) PreFilterExtensions() framework.PreFilterExtensions {
	return nil
}
   107  
   108  // Filter invoked at the filter extension point.
   109  func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
   110  	// If the new pod doesn't have any volume attached to it, the predicate will always be true
   111  	if len(pod.Spec.Volumes) == 0 {
   112  		return nil
   113  	}
   114  
   115  	node := nodeInfo.Node()
   116  
   117  	logger := klog.FromContext(ctx)
   118  
   119  	// If CSINode doesn't exist, the predicate may read the limits from Node object
   120  	csiNode, err := pl.csiNodeLister.Get(node.Name)
   121  	if err != nil {
   122  		// TODO: return the error once CSINode is created by default (2 releases)
   123  		logger.V(5).Info("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
   124  	}
   125  
   126  	newVolumes := make(map[string]string)
   127  	if err := pl.filterAttachableVolumes(logger, pod, csiNode, true /* new pod */, newVolumes); err != nil {
   128  		if apierrors.IsNotFound(err) {
   129  			// PVC is not found. This Pod will never be schedulable until PVC is created.
   130  			return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
   131  		}
   132  		return framework.AsStatus(err)
   133  	}
   134  
   135  	// If the pod doesn't have any new CSI volumes, the predicate will always be true
   136  	if len(newVolumes) == 0 {
   137  		return nil
   138  	}
   139  
   140  	// If the node doesn't have volume limits, the predicate will always be true
   141  	nodeVolumeLimits := getVolumeLimits(nodeInfo, csiNode)
   142  	if len(nodeVolumeLimits) == 0 {
   143  		return nil
   144  	}
   145  
   146  	attachedVolumes := make(map[string]string)
   147  	for _, existingPod := range nodeInfo.Pods {
   148  		if err := pl.filterAttachableVolumes(logger, existingPod.Pod, csiNode, false /* existing pod */, attachedVolumes); err != nil {
   149  			return framework.AsStatus(err)
   150  		}
   151  	}
   152  
   153  	attachedVolumeCount := map[string]int{}
   154  	for volumeUniqueName, volumeLimitKey := range attachedVolumes {
   155  		// Don't count single volume used in multiple pods more than once
   156  		delete(newVolumes, volumeUniqueName)
   157  		attachedVolumeCount[volumeLimitKey]++
   158  	}
   159  
   160  	newVolumeCount := map[string]int{}
   161  	for _, volumeLimitKey := range newVolumes {
   162  		newVolumeCount[volumeLimitKey]++
   163  	}
   164  
   165  	for volumeLimitKey, count := range newVolumeCount {
   166  		maxVolumeLimit, ok := nodeVolumeLimits[v1.ResourceName(volumeLimitKey)]
   167  		if ok {
   168  			currentVolumeCount := attachedVolumeCount[volumeLimitKey]
   169  			logger.V(5).Info("Found plugin volume limits", "node", node.Name, "volumeLimitKey", volumeLimitKey,
   170  				"maxLimits", maxVolumeLimit, "currentVolumeCount", currentVolumeCount, "newVolumeCount", count,
   171  				"pod", klog.KObj(pod))
   172  			if currentVolumeCount+count > int(maxVolumeLimit) {
   173  				return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)
   174  			}
   175  		}
   176  	}
   177  
   178  	return nil
   179  }
   180  
   181  func (pl *CSILimits) filterAttachableVolumes(
   182  	logger klog.Logger, pod *v1.Pod, csiNode *storagev1.CSINode, newPod bool, result map[string]string) error {
   183  	for _, vol := range pod.Spec.Volumes {
   184  		pvcName := ""
   185  		isEphemeral := false
   186  		switch {
   187  		case vol.PersistentVolumeClaim != nil:
   188  			// Normal CSI volume can only be used through PVC
   189  			pvcName = vol.PersistentVolumeClaim.ClaimName
   190  		case vol.Ephemeral != nil:
   191  			// Generic ephemeral inline volumes also use a PVC,
   192  			// just with a computed name and certain ownership.
   193  			// That is checked below once the pvc object is
   194  			// retrieved.
   195  			pvcName = ephemeral.VolumeClaimName(pod, &vol)
   196  			isEphemeral = true
   197  		default:
   198  			// Inline Volume does not have PVC.
   199  			// Need to check if CSI migration is enabled for this inline volume.
   200  			// - If the volume is migratable and CSI migration is enabled, need to count it
   201  			// as well.
   202  			// - If the volume is not migratable, it will be count in non_csi filter.
   203  			if err := pl.checkAttachableInlineVolume(logger, &vol, csiNode, pod, result); err != nil {
   204  				return err
   205  			}
   206  
   207  			continue
   208  		}
   209  
   210  		if pvcName == "" {
   211  			return fmt.Errorf("PersistentVolumeClaim had no name")
   212  		}
   213  
   214  		pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
   215  
   216  		if err != nil {
   217  			if newPod {
   218  				// The PVC is required to proceed with
   219  				// scheduling of a new pod because it cannot
   220  				// run without it. Bail out immediately.
   221  				return fmt.Errorf("looking up PVC %s/%s: %w", pod.Namespace, pvcName, err)
   222  			}
   223  			// If the PVC is invalid, we don't count the volume because
   224  			// there's no guarantee that it belongs to the running predicate.
   225  			logger.V(5).Info("Unable to look up PVC info", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName))
   226  			continue
   227  		}
   228  
   229  		// The PVC for an ephemeral volume must be owned by the pod.
   230  		if isEphemeral {
   231  			if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil {
   232  				return err
   233  			}
   234  		}
   235  
   236  		driverName, volumeHandle := pl.getCSIDriverInfo(logger, csiNode, pvc)
   237  		if driverName == "" || volumeHandle == "" {
   238  			logger.V(5).Info("Could not find a CSI driver name or volume handle, not counting volume")
   239  			continue
   240  		}
   241  
   242  		volumeUniqueName := fmt.Sprintf("%s/%s", driverName, volumeHandle)
   243  		volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
   244  		result[volumeUniqueName] = volumeLimitKey
   245  	}
   246  	return nil
   247  }
   248  
   249  // checkAttachableInlineVolume takes an inline volume and add to the result map if the
   250  // volume is migratable and CSI migration for this plugin has been enabled.
   251  func (pl *CSILimits) checkAttachableInlineVolume(logger klog.Logger, vol *v1.Volume, csiNode *storagev1.CSINode,
   252  	pod *v1.Pod, result map[string]string) error {
   253  	if !pl.translator.IsInlineMigratable(vol) {
   254  		return nil
   255  	}
   256  	// Check if the intree provisioner CSI migration has been enabled.
   257  	inTreeProvisionerName, err := pl.translator.GetInTreePluginNameFromSpec(nil, vol)
   258  	if err != nil {
   259  		return fmt.Errorf("looking up provisioner name for volume %s: %w", vol.Name, err)
   260  	}
   261  	if !isCSIMigrationOn(csiNode, inTreeProvisionerName) {
   262  		csiNodeName := ""
   263  		if csiNode != nil {
   264  			csiNodeName = csiNode.Name
   265  		}
   266  		logger.V(5).Info("CSI Migration is not enabled for provisioner", "provisioner", inTreeProvisionerName,
   267  			"pod", klog.KObj(pod), "csiNode", csiNodeName)
   268  		return nil
   269  	}
   270  	// Do translation for the in-tree volume.
   271  	translatedPV, err := pl.translator.TranslateInTreeInlineVolumeToCSI(vol, pod.Namespace)
   272  	if err != nil || translatedPV == nil {
   273  		return fmt.Errorf("converting volume(%s) from inline to csi: %w", vol.Name, err)
   274  	}
   275  	driverName, err := pl.translator.GetCSINameFromInTreeName(inTreeProvisionerName)
   276  	if err != nil {
   277  		return fmt.Errorf("looking up CSI driver name for provisioner %s: %w", inTreeProvisionerName, err)
   278  	}
   279  	// TranslateInTreeInlineVolumeToCSI should translate inline volume to CSI. If it is not set,
   280  	// the volume does not support inline. Skip the count.
   281  	if translatedPV.Spec.PersistentVolumeSource.CSI == nil {
   282  		return nil
   283  	}
   284  	volumeUniqueName := fmt.Sprintf("%s/%s", driverName, translatedPV.Spec.PersistentVolumeSource.CSI.VolumeHandle)
   285  	volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
   286  	result[volumeUniqueName] = volumeLimitKey
   287  	return nil
   288  }
   289  
   290  // getCSIDriverInfo returns the CSI driver name and volume ID of a given PVC.
   291  // If the PVC is from a migrated in-tree plugin, this function will return
   292  // the information of the CSI driver that the plugin has been migrated to.
   293  func (pl *CSILimits) getCSIDriverInfo(logger klog.Logger, csiNode *storagev1.CSINode, pvc *v1.PersistentVolumeClaim) (string, string) {
   294  	pvName := pvc.Spec.VolumeName
   295  
   296  	if pvName == "" {
   297  		logger.V(5).Info("Persistent volume had no name for claim", "PVC", klog.KObj(pvc))
   298  		return pl.getCSIDriverInfoFromSC(logger, csiNode, pvc)
   299  	}
   300  
   301  	pv, err := pl.pvLister.Get(pvName)
   302  	if err != nil {
   303  		logger.V(5).Info("Unable to look up PV info for PVC and PV", "PVC", klog.KObj(pvc), "PV", klog.KRef("", pvName))
   304  		// If we can't fetch PV associated with PVC, may be it got deleted
   305  		// or PVC was prebound to a PVC that hasn't been created yet.
   306  		// fallback to using StorageClass for volume counting
   307  		return pl.getCSIDriverInfoFromSC(logger, csiNode, pvc)
   308  	}
   309  
   310  	csiSource := pv.Spec.PersistentVolumeSource.CSI
   311  	if csiSource == nil {
   312  		// We make a fast path for non-CSI volumes that aren't migratable
   313  		if !pl.translator.IsPVMigratable(pv) {
   314  			return "", ""
   315  		}
   316  
   317  		pluginName, err := pl.translator.GetInTreePluginNameFromSpec(pv, nil)
   318  		if err != nil {
   319  			logger.V(5).Info("Unable to look up plugin name from PV spec", "err", err)
   320  			return "", ""
   321  		}
   322  
   323  		if !isCSIMigrationOn(csiNode, pluginName) {
   324  			logger.V(5).Info("CSI Migration of plugin is not enabled", "plugin", pluginName)
   325  			return "", ""
   326  		}
   327  
   328  		csiPV, err := pl.translator.TranslateInTreePVToCSI(pv)
   329  		if err != nil {
   330  			logger.V(5).Info("Unable to translate in-tree volume to CSI", "err", err)
   331  			return "", ""
   332  		}
   333  
   334  		if csiPV.Spec.PersistentVolumeSource.CSI == nil {
   335  			logger.V(5).Info("Unable to get a valid volume source for translated PV", "PV", pvName)
   336  			return "", ""
   337  		}
   338  
   339  		csiSource = csiPV.Spec.PersistentVolumeSource.CSI
   340  	}
   341  
   342  	return csiSource.Driver, csiSource.VolumeHandle
   343  }
   344  
   345  // getCSIDriverInfoFromSC returns the CSI driver name and a random volume ID of a given PVC's StorageClass.
   346  func (pl *CSILimits) getCSIDriverInfoFromSC(logger klog.Logger, csiNode *storagev1.CSINode, pvc *v1.PersistentVolumeClaim) (string, string) {
   347  	namespace := pvc.Namespace
   348  	pvcName := pvc.Name
   349  	scName := storagehelpers.GetPersistentVolumeClaimClass(pvc)
   350  
   351  	// If StorageClass is not set or not found, then PVC must be using immediate binding mode
   352  	// and hence it must be bound before scheduling. So it is safe to not count it.
   353  	if scName == "" {
   354  		logger.V(5).Info("PVC has no StorageClass", "PVC", klog.KObj(pvc))
   355  		return "", ""
   356  	}
   357  
   358  	storageClass, err := pl.scLister.Get(scName)
   359  	if err != nil {
   360  		logger.V(5).Info("Could not get StorageClass for PVC", "PVC", klog.KObj(pvc), "err", err)
   361  		return "", ""
   362  	}
   363  
   364  	// We use random prefix to avoid conflict with volume IDs. If PVC is bound during the execution of the
   365  	// predicate and there is another pod on the same node that uses same volume, then we will overcount
   366  	// the volume and consider both volumes as different.
   367  	volumeHandle := fmt.Sprintf("%s-%s/%s", pl.randomVolumeIDPrefix, namespace, pvcName)
   368  
   369  	provisioner := storageClass.Provisioner
   370  	if pl.translator.IsMigratableIntreePluginByName(provisioner) {
   371  		if !isCSIMigrationOn(csiNode, provisioner) {
   372  			logger.V(5).Info("CSI Migration of provisioner is not enabled", "provisioner", provisioner)
   373  			return "", ""
   374  		}
   375  
   376  		driverName, err := pl.translator.GetCSINameFromInTreeName(provisioner)
   377  		if err != nil {
   378  			logger.V(5).Info("Unable to look up driver name from provisioner name", "provisioner", provisioner, "err", err)
   379  			return "", ""
   380  		}
   381  		return driverName, volumeHandle
   382  	}
   383  
   384  	return provisioner, volumeHandle
   385  }
   386  
   387  // NewCSI initializes a new plugin and returns it.
   388  func NewCSI(_ context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
   389  	informerFactory := handle.SharedInformerFactory()
   390  	pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
   391  	pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
   392  	csiNodesLister := informerFactory.Storage().V1().CSINodes().Lister()
   393  	scLister := informerFactory.Storage().V1().StorageClasses().Lister()
   394  	csiTranslator := csitrans.New()
   395  
   396  	return &CSILimits{
   397  		csiNodeLister:        csiNodesLister,
   398  		pvLister:             pvLister,
   399  		pvcLister:            pvcLister,
   400  		scLister:             scLister,
   401  		randomVolumeIDPrefix: rand.String(32),
   402  		translator:           csiTranslator,
   403  	}, nil
   404  }
   405  
   406  func getVolumeLimits(nodeInfo *framework.NodeInfo, csiNode *storagev1.CSINode) map[v1.ResourceName]int64 {
   407  	// TODO: stop getting values from Node object in v1.18
   408  	nodeVolumeLimits := volumeLimits(nodeInfo)
   409  	if csiNode != nil {
   410  		for i := range csiNode.Spec.Drivers {
   411  			d := csiNode.Spec.Drivers[i]
   412  			if d.Allocatable != nil && d.Allocatable.Count != nil {
   413  				// TODO: drop GetCSIAttachLimitKey once we don't get values from Node object (v1.18)
   414  				k := v1.ResourceName(volumeutil.GetCSIAttachLimitKey(d.Name))
   415  				nodeVolumeLimits[k] = int64(*d.Allocatable.Count)
   416  			}
   417  		}
   418  	}
   419  	return nodeVolumeLimits
   420  }
   421  

View as plain text