...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding/volume_binding.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding

/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package volumebinding

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/component-helpers/storage/ephemeral"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
)

const (
	// stateKey is the key under which this plugin stores its per-pod state
	// in the framework.CycleState.
	stateKey framework.StateKey = Name

	// maxUtilization is the maximum utilization percentage used by the
	// volume capacity scorer.
	maxUtilization = 100
)

// stateData is initialized in the PreFilter phase. Because we save a pointer
// in framework.CycleState, later phases don't need to call Write to update
// the value.
type stateData struct {
	allBound bool
	// podVolumesByNode holds the pod's volume information found in the Filter
	// phase for each node.
	// It is initialized in the PreFilter phase.
	podVolumesByNode map[string]*PodVolumes
	podVolumeClaims  *PodVolumeClaims
	// hasStaticBindings declares whether the pod contains one or more StaticBinding.
	// If not, volumeBinding will skip the Score extension point.
	hasStaticBindings bool
	sync.Mutex
}

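// Clone returns the same stateData instance rather than a deep copy: the
// pointer stored in framework.CycleState is intentionally shared across
// phases, and concurrent access from parallel Filter calls is guarded by the
// embedded sync.Mutex.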
func (d *stateData) Clone() framework.StateData {
	return d
}

// VolumeBinding is a plugin that binds pod volumes in scheduling.
// In the Filter phase, a pod binding cache is created for the pod and used in
// the Reserve and PreBind phases.
type VolumeBinding struct {
	Binder    SchedulerVolumeBinder
	PVCLister corelisters.PersistentVolumeClaimLister
	scorer    volumeCapacityScorer
	fts       feature.Features
}

var _ framework.PreFilterPlugin = &VolumeBinding{}
var _ framework.FilterPlugin = &VolumeBinding{}
var _ framework.ReservePlugin = &VolumeBinding{}
var _ framework.PreBindPlugin = &VolumeBinding{}
var _ framework.PreScorePlugin = &VolumeBinding{}
var _ framework.ScorePlugin = &VolumeBinding{}
var _ framework.EnqueueExtensions = &VolumeBinding{}

// Name is the name of the plugin used in Registry and configurations.
const Name = names.VolumeBinding

// Name returns the name of the plugin. It is used in logs, etc.
func (pl *VolumeBinding) Name() string {
	return Name
}

// EventsToRegister returns the possible events that may make a Pod that
// failed this plugin schedulable.
func (pl *VolumeBinding) EventsToRegister() []framework.ClusterEventWithHint {
	events := []framework.ClusterEventWithHint{
		// Pods may fail because of missing or mis-configured storage class
		// (e.g., allowedTopologies, volumeBindingMode), and hence may become
		// schedulable upon StorageClass Add or Update events.
		{Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add | framework.Update}},
		// We bind PVCs with PVs, so any changes may make the pods schedulable.
		{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}},
		{Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}},
		// Pods may fail to find available PVs because the node labels do not
		// match the storage class's allowed topologies or PV's node affinity.
		// A new or updated node may make pods schedulable.
		//
		// A note about the UpdateNodeTaint event:
		// The NodeAdd QueueingHint isn't always called because of the internal feature called preCheck.
		// As a common problematic scenario,
		// when a node is added but not ready, the NodeAdd event is filtered out by preCheck and doesn't arrive.
		// In such cases, this plugin may miss some events that actually make pods schedulable.
		// As a workaround, we add the UpdateNodeTaint event to catch the case.
		// We can remove UpdateNodeTaint when we remove the preCheck feature.
		// See: https://github.com/kubernetes/kubernetes/issues/110175
		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},
		// We rely on CSI node to translate in-tree PV to CSI.
		{Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add | framework.Update}},
		// When CSIStorageCapacity is enabled, pods may become schedulable
		// on CSI driver & storage capacity changes.
		{Event: framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.Add | framework.Update}},
		{Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.Add | framework.Update}},
	}
	return events
}

// podHasPVCs returns two values:
// - the first denotes whether the given pod has any PVC defined.
// - the second is an error if any requested PVC is illegal.
func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) (bool, error) {
	hasPVC := false
	for _, vol := range pod.Spec.Volumes {
		var pvcName string
		isEphemeral := false
		switch {
		case vol.PersistentVolumeClaim != nil:
			pvcName = vol.PersistentVolumeClaim.ClaimName
		case vol.Ephemeral != nil:
			pvcName = ephemeral.VolumeClaimName(pod, &vol)
			isEphemeral = true
		default:
			// Volume is not using a PVC, ignore
			continue
		}
		hasPVC = true
		pvc, err := pl.PVCLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
		if err != nil {
			// The error usually already has enough context ("persistentvolumeclaim 'myclaim' not found"),
			// but we can do better for generic ephemeral inline volumes where that situation
			// is normal directly after creating a pod.
			if isEphemeral && apierrors.IsNotFound(err) {
				err = fmt.Errorf("waiting for ephemeral volume controller to create the persistentvolumeclaim %q", pvcName)
			}
			return hasPVC, err
		}

		if pvc.Status.Phase == v1.ClaimLost {
			return hasPVC, fmt.Errorf("persistentvolumeclaim %q bound to non-existent persistentvolume %q", pvc.Name, pvc.Spec.VolumeName)
		}

		if pvc.DeletionTimestamp != nil {
			return hasPVC, fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
		}

		if isEphemeral {
			if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil {
				return hasPVC, err
			}
		}
	}
	return hasPVC, nil
}

// PreFilter invoked at the prefilter extension point to check if the pod has
// all immediate PVCs bound. If not all immediate PVCs are bound, an
// UnschedulableAndUnresolvable status is returned.
func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
	logger := klog.FromContext(ctx)
	// If the pod does not reference any PVC, we don't need to do anything.
	if hasPVC, err := pl.podHasPVCs(pod); err != nil {
		return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
	} else if !hasPVC {
		state.Write(stateKey, &stateData{})
		return nil, framework.NewStatus(framework.Skip)
	}
	podVolumeClaims, err := pl.Binder.GetPodVolumeClaims(logger, pod)
	if err != nil {
		return nil, framework.AsStatus(err)
	}
	if len(podVolumeClaims.unboundClaimsImmediate) > 0 {
		// Return an UnschedulableAndUnresolvable error if immediate claims are
		// not bound. The pod will be moved to the active/backoff queues once
		// these claims are bound by the PV controller.
		status := framework.NewStatus(framework.UnschedulableAndUnresolvable)
		status.AppendReason("pod has unbound immediate PersistentVolumeClaims")
		return nil, status
	}
	// Attempt to reduce the number of nodes to consider in subsequent scheduling stages if the pod has bound claims.
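	// GetEligibleNodes derives the candidate set from the pod's bound claims
	// (for example, from the node affinity of the PVs they are bound to); a
	// nil result means the node set is not restricted.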
	var result *framework.PreFilterResult
	if eligibleNodes := pl.Binder.GetEligibleNodes(logger, podVolumeClaims.boundClaims); eligibleNodes != nil {
		result = &framework.PreFilterResult{
			NodeNames: eligibleNodes,
		}
	}

	state.Write(stateKey, &stateData{
		podVolumesByNode: make(map[string]*PodVolumes),
		podVolumeClaims: &PodVolumeClaims{
			boundClaims:                podVolumeClaims.boundClaims,
			unboundClaimsDelayBinding:  podVolumeClaims.unboundClaimsDelayBinding,
			unboundVolumesDelayBinding: podVolumeClaims.unboundVolumesDelayBinding,
		},
	})
	return result, nil
}

// PreFilterExtensions returns prefilter extensions, pod add and remove.
func (pl *VolumeBinding) PreFilterExtensions() framework.PreFilterExtensions {
	return nil
}

// getStateData retrieves the stateData written by PreFilter from the given
// CycleState.
func getStateData(cs *framework.CycleState) (*stateData, error) {
	state, err := cs.Read(stateKey)
	if err != nil {
		return nil, err
	}
	s, ok := state.(*stateData)
	if !ok {
		return nil, errors.New("unable to convert state into stateData")
	}
	return s, nil
}

// Filter invoked at the filter extension point.
// It evaluates whether a pod can fit based on the volumes it requests,
// for both bound and unbound PVCs.
//
// For PVCs that are bound, it checks that the corresponding PV's node affinity is
// satisfied by the given node.
//
// For PVCs that are unbound, it tries to find available PVs that can satisfy the PVC requirements
// and whose node affinity is satisfied by the given node.
//
// If storage capacity tracking is enabled, then enough space has to be available
// for the node and volumes that still need to be created.
//
// The predicate returns true if all bound PVCs have PVs compatible with the node, and if all unbound
// PVCs can be matched with an available and node-compatible PV.
func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	logger := klog.FromContext(ctx)
	node := nodeInfo.Node()

	state, err := getStateData(cs)
	if err != nil {
		return framework.AsStatus(err)
	}

	podVolumes, reasons, err := pl.Binder.FindPodVolumes(logger, pod, state.podVolumeClaims, node)

	if err != nil {
		return framework.AsStatus(err)
	}

	if len(reasons) > 0 {
		status := framework.NewStatus(framework.UnschedulableAndUnresolvable)
		for _, reason := range reasons {
			status.AppendReason(string(reason))
		}
		return status
	}

	// Multiple goroutines call `Filter` on different nodes simultaneously and
	// the `CycleState` may be duplicated, so we must use a local lock here.
	state.Lock()
	state.podVolumesByNode[node.Name] = podVolumes
	state.hasStaticBindings = state.hasStaticBindings || (podVolumes != nil && len(podVolumes.StaticBindings) > 0)
	state.Unlock()
	return nil
}

// PreScore invoked at the preScore extension point. It checks whether volumeBinding can skip Score.
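// If no node that passed Filter produced a static (pre-provisioned PV)
// binding for the pod, capacity scoring would rank every node the same, so
// Score is skipped.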
func (pl *VolumeBinding) PreScore(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodes []*framework.NodeInfo) *framework.Status {
	if pl.scorer == nil {
		return framework.NewStatus(framework.Skip)
	}
	state, err := getStateData(cs)
	if err != nil {
		return framework.AsStatus(err)
	}
	if state.hasStaticBindings {
		return nil
	}
	return framework.NewStatus(framework.Skip)
}

// Score invoked at the score extension point.
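// The score reflects how much of each storage class's capacity the pod's
// static bindings would consume on the node. As an illustrative example
// (names made up): a 10Gi claim statically bound to a 100Gi PV of class
// "fast" yields classResources["fast"] = {Requested: 10Gi, Capacity: 100Gi},
// i.e. 10% utilization, which the configured shape maps to a node score.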
func (pl *VolumeBinding) Score(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	if pl.scorer == nil {
		return 0, nil
	}
	state, err := getStateData(cs)
	if err != nil {
		return 0, framework.AsStatus(err)
	}
	podVolumes, ok := state.podVolumesByNode[nodeName]
	if !ok {
		return 0, nil
	}
	// group by storage class
	classResources := make(classResourceMap)
	for _, staticBinding := range podVolumes.StaticBindings {
		class := staticBinding.StorageClassName()
		storageResource := staticBinding.StorageResource()
		if _, ok := classResources[class]; !ok {
			classResources[class] = &StorageResource{
				Requested: 0,
				Capacity:  0,
			}
		}
		classResources[class].Requested += storageResource.Requested
		classResources[class].Capacity += storageResource.Capacity
	}
	return pl.scorer(classResources), nil
}

// ScoreExtensions of the Score plugin.
func (pl *VolumeBinding) ScoreExtensions() framework.ScoreExtensions {
	return nil
}

// Reserve reserves the pod's volumes and saves the binding status in the cycle state.
func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
	state, err := getStateData(cs)
	if err != nil {
		return framework.AsStatus(err)
	}
	// we don't need to hold the lock as only one node will be reserved for the given pod
	podVolumes, ok := state.podVolumesByNode[nodeName]
	if ok {
		allBound, err := pl.Binder.AssumePodVolumes(klog.FromContext(ctx), pod, nodeName, podVolumes)
		if err != nil {
			return framework.AsStatus(err)
		}
		state.allBound = allBound
	} else {
		// may not exist if the pod does not reference any PVC
		state.allBound = true
	}
	return nil
}

// PreBind will make the API update with the assumed bindings and wait until
// the PV controller has completely finished the binding operation.
//
// If binding fails, times out or gets undone, an error will be returned to
// retry scheduling.
func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
	s, err := getStateData(cs)
	if err != nil {
		return framework.AsStatus(err)
	}
	if s.allBound {
		// no need to bind volumes
		return nil
	}
	// we don't need to hold the lock as only one node will be pre-bound for the given pod
	podVolumes, ok := s.podVolumesByNode[nodeName]
	if !ok {
		return framework.AsStatus(fmt.Errorf("no pod volumes found for node %q", nodeName))
	}
	logger := klog.FromContext(ctx)
	logger.V(5).Info("Trying to bind volumes for pod", "pod", klog.KObj(pod))
	err = pl.Binder.BindPodVolumes(ctx, pod, podVolumes)
	if err != nil {
		logger.V(5).Info("Failed to bind volumes for pod", "pod", klog.KObj(pod), "err", err)
		return framework.AsStatus(err)
	}
	logger.V(5).Info("Success binding volumes for pod", "pod", klog.KObj(pod))
	return nil
}

// Unreserve clears the assumed PV and PVC cache.
// It is idempotent and does nothing if no cache is found for the given pod.
func (pl *VolumeBinding) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
	s, err := getStateData(cs)
	if err != nil {
		return
	}
	// we don't need to hold the lock as only one node may be unreserved
	podVolumes, ok := s.podVolumesByNode[nodeName]
	if !ok {
		return
	}
	pl.Binder.RevertAssumedPodVolumes(podVolumes)
}

// New initializes a new plugin and returns it.
func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
	args, ok := plArgs.(*config.VolumeBindingArgs)
	if !ok {
		return nil, fmt.Errorf("want args to be of type VolumeBindingArgs, got %T", plArgs)
	}
	if err := validation.ValidateVolumeBindingArgsWithOptions(nil, args, validation.VolumeBindingArgsValidationOptions{
		AllowVolumeCapacityPriority: fts.EnableVolumeCapacityPriority,
	}); err != nil {
		return nil, err
	}
	podInformer := fh.SharedInformerFactory().Core().V1().Pods()
	nodeInformer := fh.SharedInformerFactory().Core().V1().Nodes()
	pvcInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumeClaims()
	pvInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumes()
	storageClassInformer := fh.SharedInformerFactory().Storage().V1().StorageClasses()
	csiNodeInformer := fh.SharedInformerFactory().Storage().V1().CSINodes()
	capacityCheck := CapacityCheck{
		CSIDriverInformer:          fh.SharedInformerFactory().Storage().V1().CSIDrivers(),
		CSIStorageCapacityInformer: fh.SharedInformerFactory().Storage().V1().CSIStorageCapacities(),
	}
	binder := NewVolumeBinder(klog.FromContext(ctx), fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second)

	// build score function
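	// Each shape point's score is rescaled from the user-facing range
	// [0, config.MaxCustomPriorityScore] to the framework's node score range
	// [0, framework.MaxNodeScore]; e.g. with MaxCustomPriorityScore = 10 and
	// MaxNodeScore = 100, a configured score of 5 becomes 50.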
	var scorer volumeCapacityScorer
	if fts.EnableVolumeCapacityPriority {
		shape := make(helper.FunctionShape, 0, len(args.Shape))
		for _, point := range args.Shape {
			shape = append(shape, helper.FunctionShapePoint{
				Utilization: int64(point.Utilization),
				Score:       int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore),
			})
		}
		scorer = buildScorerFunction(shape)
	}
	return &VolumeBinding{
		Binder:    binder,
		PVCLister: pvcInformer.Lister(),
		scorer:    scorer,
		fts:       fts,
	}, nil
}
