...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions

/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package volumerestrictions

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	corelisters "k8s.io/client-go/listers/core/v1"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
)

// VolumeRestrictions is a plugin that checks volume restrictions.
type VolumeRestrictions struct {
	// pvcLister lists PersistentVolumeClaims from the shared informer cache.
	pvcLister corelisters.PersistentVolumeClaimLister
	// sharedLister provides the scheduler's snapshot of cluster state,
	// including which PVCs are in use by scheduled pods.
	sharedLister framework.SharedLister
}

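// Compile-time assertions that the plugin implements the extension-point
// interfaces it registers for.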
var _ framework.PreFilterPlugin = &VolumeRestrictions{}
var _ framework.FilterPlugin = &VolumeRestrictions{}
var _ framework.EnqueueExtensions = &VolumeRestrictions{}
var _ framework.StateData = &preFilterState{}

const (
	// Name is the name of the plugin used in the plugin registry and configurations.
	Name = names.VolumeRestrictions
	// preFilterStateKey is the key in CycleState to VolumeRestrictions pre-computed data for Filtering.
	// Using the name of the plugin will likely help us avoid collisions with other plugins.
	preFilterStateKey = "PreFilter" + Name

	// ErrReasonDiskConflict is used for NoDiskConflict predicate error.
	ErrReasonDiskConflict = "node(s) had no available disk"
	// ErrReasonReadWriteOncePodConflict is used when a pod is found using the same PVC with the ReadWriteOncePod access mode.
	ErrReasonReadWriteOncePodConflict = "node has pod using PersistentVolumeClaim with the same name and ReadWriteOncePod access mode"
)

// preFilterState is computed at PreFilter and used at Filter.
type preFilterState struct {
	// Names of the pod's volumes using the ReadWriteOncePod access mode.
	readWriteOncePodPVCs sets.Set[string]
	// The number of references to these ReadWriteOncePod volumes by scheduled pods.
	conflictingPVCRefCount int
}

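// updateWithPod adjusts conflictingPVCRefCount by multiplier (+1 when adding
// a pod, -1 when removing one) times the number of the pod's volumes that
// reference a tracked ReadWriteOncePod PVC. It backs AddPod and RemovePod
// during preemption evaluation.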
func (s *preFilterState) updateWithPod(podInfo *framework.PodInfo, multiplier int) {
	s.conflictingPVCRefCount += multiplier * s.conflictingPVCRefCountForPod(podInfo)
}

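// conflictingPVCRefCountForPod returns how many of the pod's volumes reference
// a PVC tracked in readWriteOncePodPVCs.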
func (s *preFilterState) conflictingPVCRefCountForPod(podInfo *framework.PodInfo) int {
	conflicts := 0
	for _, volume := range podInfo.Pod.Spec.Volumes {
		if volume.PersistentVolumeClaim == nil {
			continue
		}
		if s.readWriteOncePodPVCs.Has(volume.PersistentVolumeClaim.ClaimName) {
			conflicts++
		}
	}
	return conflicts
}

// Clone the prefilter state.
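// Note: the readWriteOncePodPVCs set is shared, not deep-copied. That is safe
// because the set is never mutated after PreFilter; AddPod and RemovePod only
// change conflictingPVCRefCount.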
func (s *preFilterState) Clone() framework.StateData {
	if s == nil {
		return nil
	}
	return &preFilterState{
		readWriteOncePodPVCs:   s.readWriteOncePodPVCs,
		conflictingPVCRefCount: s.conflictingPVCRefCount,
	}
}

// Name returns the name of the plugin. It is used in logs, etc.
func (pl *VolumeRestrictions) Name() string {
	return Name
}

// isVolumeConflict reports whether volume conflicts with any volume already
// mounted by an existing pod.
func isVolumeConflict(volume *v1.Volume, pod *v1.Pod) bool {
	for _, existingVolume := range pod.Spec.Volumes {
		// The same GCE disk mounted by multiple pods conflicts unless all pods mount it read-only.
		if volume.GCEPersistentDisk != nil && existingVolume.GCEPersistentDisk != nil {
			disk, existingDisk := volume.GCEPersistentDisk, existingVolume.GCEPersistentDisk
			if disk.PDName == existingDisk.PDName && !(disk.ReadOnly && existingDisk.ReadOnly) {
				return true
			}
		}

		if volume.AWSElasticBlockStore != nil && existingVolume.AWSElasticBlockStore != nil {
			if volume.AWSElasticBlockStore.VolumeID == existingVolume.AWSElasticBlockStore.VolumeID {
				return true
			}
		}

		if volume.ISCSI != nil && existingVolume.ISCSI != nil {
			iqn := volume.ISCSI.IQN
			eiqn := existingVolume.ISCSI.IQN
			// Two iSCSI volumes are the same if they share the same IQN. As iSCSI volumes
			// are RWO or ROX, only one read-write mount is permitted. The same iSCSI volume
			// mounted by multiple pods conflicts unless all pods mount it read-only.
			if iqn == eiqn && !(volume.ISCSI.ReadOnly && existingVolume.ISCSI.ReadOnly) {
				return true
			}
		}

		if volume.RBD != nil && existingVolume.RBD != nil {
			mon, pool, image := volume.RBD.CephMonitors, volume.RBD.RBDPool, volume.RBD.RBDImage
			emon, epool, eimage := existingVolume.RBD.CephMonitors, existingVolume.RBD.RBDPool, existingVolume.RBD.RBDImage
			// Two RBD images are the same if they share at least one Ceph monitor, are in
			// the same RADOS pool, and have the same image name. Only one read-write mount
			// is permitted for the same RBD image; the same image mounted by multiple pods
			// conflicts unless all pods mount it read-only.
			if haveOverlap(mon, emon) && pool == epool && image == eimage && !(volume.RBD.ReadOnly && existingVolume.RBD.ReadOnly) {
				return true
			}
		}
	}

	return false
}

// haveOverlap reports whether the two string slices share at least one common element.
func haveOverlap(a1, a2 []string) bool {
	if len(a1) > len(a2) {
		a1, a2 = a2, a1
	}
	m := sets.New(a1...)
	for _, val := range a2 {
		if m.Has(val) {
			return true
		}
	}

	return false
}

// needsRestrictionsCheck returns true if the volume is of a type that requires
// a conflict check (GCE PD, AWS EBS, RBD, or iSCSI).
func needsRestrictionsCheck(v v1.Volume) bool {
	return v.GCEPersistentDisk != nil || v.AWSElasticBlockStore != nil || v.RBD != nil || v.ISCSI != nil
}

// PreFilter computes and stores cycleState containing details for enforcing ReadWriteOncePod.
func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
	needsCheck := false
	for i := range pod.Spec.Volumes {
		if needsRestrictionsCheck(pod.Spec.Volumes[i]) {
			needsCheck = true
			break
		}
	}

	pvcs, err := pl.readWriteOncePodPVCsForPod(ctx, pod)
	if err != nil {
		if apierrors.IsNotFound(err) {
			return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
		}
		return nil, framework.AsStatus(err)
	}

	s, err := pl.calPreFilterState(ctx, pod, pvcs)
	if err != nil {
		return nil, framework.AsStatus(err)
	}

	// No restricted volume types and no ReadWriteOncePod conflicts: Filter has
	// nothing to check for this pod, so skip the plugin for this scheduling cycle.
	if !needsCheck && s.conflictingPVCRefCount == 0 {
		return nil, framework.NewStatus(framework.Skip)
	}
	cycleState.Write(preFilterStateKey, s)
	return nil, nil
}

// AddPod updates the pre-computed data in cycleState to account for podInfoToAdd.
func (pl *VolumeRestrictions) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
	state, err := getPreFilterState(cycleState)
	if err != nil {
		return framework.AsStatus(err)
	}
	state.updateWithPod(podInfoToAdd, 1)
	return nil
}

// RemovePod updates the pre-computed data in cycleState to account for podInfoToRemove.
func (pl *VolumeRestrictions) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
	state, err := getPreFilterState(cycleState)
	if err != nil {
		return framework.AsStatus(err)
	}
	state.updateWithPod(podInfoToRemove, -1)
	return nil
}

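// getPreFilterState retrieves the state written by PreFilter from cycleState.
// It returns an error if PreFilter did not run or wrote an unexpected type.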
func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
	c, err := cycleState.Read(preFilterStateKey)
	if err != nil {
		// preFilterState doesn't exist, likely PreFilter wasn't invoked.
		return nil, fmt.Errorf("cannot read %q from cycleState", preFilterStateKey)
	}

	s, ok := c.(*preFilterState)
	if !ok {
		return nil, fmt.Errorf("%+v cannot be converted to *preFilterState", c)
	}
	return s, nil
}

// calPreFilterState computes preFilterState describing which PVCs use ReadWriteOncePod
// and which pods in the cluster are in conflict.
func (pl *VolumeRestrictions) calPreFilterState(ctx context.Context, pod *v1.Pod, pvcs sets.Set[string]) (*preFilterState, error) {
	conflictingPVCRefCount := 0
	for pvc := range pvcs {
		key := framework.GetNamespacedName(pod.Namespace, pvc)
		if pl.sharedLister.StorageInfos().IsPVCUsedByPods(key) {
			// At most one pod may use a ReadWriteOncePod PVC, so any existing
			// reference counts as a conflict.
			conflictingPVCRefCount++
		}
	}
	return &preFilterState{
		readWriteOncePodPVCs:   pvcs,
		conflictingPVCRefCount: conflictingPVCRefCount,
	}, nil
}

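// readWriteOncePodPVCsForPod returns the names of the pod's PVCs that request
// the ReadWriteOncePod access mode. The lister returns a NotFound error if a
// referenced claim does not exist yet.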
func (pl *VolumeRestrictions) readWriteOncePodPVCsForPod(ctx context.Context, pod *v1.Pod) (sets.Set[string], error) {
	pvcs := sets.New[string]()
	for _, volume := range pod.Spec.Volumes {
		if volume.PersistentVolumeClaim == nil {
			continue
		}

		pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
		if err != nil {
			return nil, err
		}

		if !v1helper.ContainsAccessMode(pvc.Spec.AccessModes, v1.ReadWriteOncePod) {
			continue
		}
		pvcs.Insert(pvc.Name)
	}
	return pvcs, nil
}

// satisfyVolumeConflicts checks whether scheduling the pod onto this node would
// cause any conflicts with existing volumes.
func satisfyVolumeConflicts(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool {
	for i := range pod.Spec.Volumes {
		v := pod.Spec.Volumes[i]
		if !needsRestrictionsCheck(v) {
			continue
		}
		for _, ev := range nodeInfo.Pods {
			if isVolumeConflict(&v, ev.Pod) {
				return false
			}
		}
	}
	return true
}

// satisfyReadWriteOncePod checks whether scheduling the pod would cause any
// ReadWriteOncePod PVC access mode conflicts.
func satisfyReadWriteOncePod(ctx context.Context, state *preFilterState) *framework.Status {
	if state == nil {
		return nil
	}
	if state.conflictingPVCRefCount > 0 {
		return framework.NewStatus(framework.Unschedulable, ErrReasonReadWriteOncePodConflict)
	}
	return nil
}

// PreFilterExtensions returns prefilter extensions, pod add and remove.
func (pl *VolumeRestrictions) PreFilterExtensions() framework.PreFilterExtensions {
	return pl
}

// Filter is invoked at the filter extension point.
// It evaluates whether a pod can fit given the volumes it requests and those
// that are already mounted. If a volume is already mounted on the node, another
// pod that uses the same volume cannot be scheduled there.
// This is GCE, Amazon EBS, iSCSI and Ceph RBD specific for now:
//   - GCE PD allows multiple mounts as long as they're all read-only
//   - AWS EBS forbids any two pods mounting the same volume ID
//   - Ceph RBD forbids two pods sharing at least one monitor, pool and image, unless both mount the image read-only
//   - iSCSI forbids two pods sharing the same IQN, unless both mount the volume read-only
//
// If the pod uses PVCs with the ReadWriteOncePod access mode, it evaluates whether
// these PVCs are already in use and whether preemption would help.
func (pl *VolumeRestrictions) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	if !satisfyVolumeConflicts(pod, nodeInfo) {
		return framework.NewStatus(framework.Unschedulable, ErrReasonDiskConflict)
	}
	state, err := getPreFilterState(cycleState)
	if err != nil {
		return framework.AsStatus(err)
	}
	return satisfyReadWriteOncePod(ctx, state)
}

// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.
func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHint {
	return []framework.ClusterEventWithHint{
		// Pods may fail to schedule because their volumes conflict with those of other pods on the same node.
		// Once the running pods are deleted and their volumes released, the unschedulable pod becomes schedulable.
		// Because `spec.volumes` is immutable, pod update events are ignored.
		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
		// A new Node may make a pod schedulable.
		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}},
		// Pods may fail to schedule because the PVC they use has not yet been created.
		// The PVC must exist so its access modes can be checked.
		{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}},
	}
}

// New initializes a new plugin and returns it.
func New(_ context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
	informerFactory := handle.SharedInformerFactory()
	pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
	sharedLister := handle.SnapshotSharedLister()

	return &VolumeRestrictions{
		pvcLister:    pvcLister,
		sharedLister: sharedLister,
	}, nil
}
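
The access-mode check above relies on the in-tree helper v1helper.ContainsAccessMode. For readers following along outside the Kubernetes tree, a minimal standalone sketch of the same check looks like this, assuming only k8s.io/api/core/v1; the hasReadWriteOncePod name is illustrative and not part of this package:

	// hasReadWriteOncePod reports whether the claim requests the
	// ReadWriteOncePod access mode, mirroring the ContainsAccessMode
	// call in readWriteOncePodPVCsForPod above.
	func hasReadWriteOncePod(pvc *v1.PersistentVolumeClaim) bool {
		for _, mode := range pvc.Spec.AccessModes {
			if mode == v1.ReadWriteOncePod {
				return true
			}
		}
		return false
	}

A PVC opts into this enforcement by listing ReadWriteOncePod in spec.accessModes; the plugin then marks a pod unschedulable (and a candidate for preemption) whenever another scheduled pod already references the claim.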
