...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone/volume_zone.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package volumezone
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  
    24  	v1 "k8s.io/api/core/v1"
    25  	storage "k8s.io/api/storage/v1"
    26  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    27  	"k8s.io/apimachinery/pkg/runtime"
    28  	"k8s.io/apimachinery/pkg/util/sets"
    29  	corelisters "k8s.io/client-go/listers/core/v1"
    30  	storagelisters "k8s.io/client-go/listers/storage/v1"
    31  	volumehelpers "k8s.io/cloud-provider/volume/helpers"
    32  	storagehelpers "k8s.io/component-helpers/storage/volume"
    33  	"k8s.io/klog/v2"
    34  	"k8s.io/kubernetes/pkg/scheduler/framework"
    35  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
    36  )
    37  
    38  // VolumeZone is a plugin that checks volume zone.
    39  type VolumeZone struct {
    40  	pvLister  corelisters.PersistentVolumeLister
    41  	pvcLister corelisters.PersistentVolumeClaimLister
    42  	scLister  storagelisters.StorageClassLister
    43  }
    44  
    45  var _ framework.FilterPlugin = &VolumeZone{}
    46  var _ framework.PreFilterPlugin = &VolumeZone{}
    47  var _ framework.EnqueueExtensions = &VolumeZone{}
    48  
    49  const (
    50  	// Name is the name of the plugin used in the plugin registry and configurations.
    51  	Name = names.VolumeZone
    52  
    53  	preFilterStateKey framework.StateKey = "PreFilter" + Name
    54  
    55  	// ErrReasonConflict is used for NoVolumeZoneConflict predicate error.
    56  	ErrReasonConflict = "node(s) had no available volume zone"
    57  )
    58  
// pvTopology holds the parsed value set of a single topology label found on a
// PersistentVolume. pvName is the PV the label was read from (it is only used
// in Filter's log message), key is the label key exactly as it appears on the
// PV (beta or GA form), and values holds the zones/regions produced by
// volumehelpers.LabelZonesToSet from the label value.
type pvTopology struct {
	pvName string
	key    string
	values sets.Set[string]
}

// stateData is the per-scheduling-cycle state written by PreFilter. Because a
// pointer to it is stored in the framework.CycleState, later phases can read
// it directly and never need to call Write again.
type stateData struct {
	// podPVTopologies holds the topology constraints of the pod's bound PVs.
	// It is populated once, in the PreFilter phase.
	podPVTopologies []pvTopology
}

// Clone returns the receiver itself rather than a deep copy. In this file
// podPVTopologies is only written in PreFilter and afterwards only read, so
// sharing one value between cloned CycleStates appears safe.
// NOTE(review): this relies on nothing mutating the state after PreFilter.
func (d *stateData) Clone() framework.StateData {
	return d
}
    78  
    79  var topologyLabels = []string{
    80  	v1.LabelFailureDomainBetaZone,
    81  	v1.LabelFailureDomainBetaRegion,
    82  	v1.LabelTopologyZone,
    83  	v1.LabelTopologyRegion,
    84  }
    85  
    86  func translateToGALabel(label string) string {
    87  	if label == v1.LabelFailureDomainBetaRegion {
    88  		return v1.LabelTopologyRegion
    89  	}
    90  	if label == v1.LabelFailureDomainBetaZone {
    91  		return v1.LabelTopologyZone
    92  	}
    93  	return label
    94  }
    95  
    96  // Name returns name of the plugin. It is used in logs, etc.
    97  func (pl *VolumeZone) Name() string {
    98  	return Name
    99  }
   100  
   101  // PreFilter invoked at the prefilter extension point
   102  //
   103  // # It finds the topology of the PersistentVolumes corresponding to the volumes a pod requests
   104  //
   105  // Currently, this is only supported with PersistentVolumeClaims,
   106  // and only looks for the bound PersistentVolume.
   107  func (pl *VolumeZone) PreFilter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
   108  	podPVTopologies, status := pl.getPVbyPod(ctx, pod)
   109  	if !status.IsSuccess() {
   110  		return nil, status
   111  	}
   112  	if len(podPVTopologies) == 0 {
   113  		return nil, framework.NewStatus(framework.Skip)
   114  	}
   115  	cs.Write(preFilterStateKey, &stateData{podPVTopologies: podPVTopologies})
   116  	return nil, nil
   117  }
   118  
   119  func (pl *VolumeZone) getPVbyPod(ctx context.Context, pod *v1.Pod) ([]pvTopology, *framework.Status) {
   120  	logger := klog.FromContext(ctx)
   121  	podPVTopologies := make([]pvTopology, 0)
   122  
   123  	for i := range pod.Spec.Volumes {
   124  		volume := pod.Spec.Volumes[i]
   125  		if volume.PersistentVolumeClaim == nil {
   126  			continue
   127  		}
   128  		pvcName := volume.PersistentVolumeClaim.ClaimName
   129  		if pvcName == "" {
   130  			return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no name")
   131  		}
   132  		pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
   133  		if s := getErrorAsStatus(err); !s.IsSuccess() {
   134  			return nil, s
   135  		}
   136  
   137  		pvName := pvc.Spec.VolumeName
   138  		if pvName == "" {
   139  			scName := storagehelpers.GetPersistentVolumeClaimClass(pvc)
   140  			if len(scName) == 0 {
   141  				return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no pv name and storageClass name")
   142  			}
   143  
   144  			class, err := pl.scLister.Get(scName)
   145  			if s := getErrorAsStatus(err); !s.IsSuccess() {
   146  				return nil, s
   147  			}
   148  			if class.VolumeBindingMode == nil {
   149  				return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("VolumeBindingMode not set for StorageClass %q", scName))
   150  			}
   151  			if *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
   152  				// Skip unbound volumes
   153  				continue
   154  			}
   155  
   156  			return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolume had no name")
   157  		}
   158  
   159  		pv, err := pl.pvLister.Get(pvName)
   160  		if s := getErrorAsStatus(err); !s.IsSuccess() {
   161  			return nil, s
   162  		}
   163  
   164  		for _, key := range topologyLabels {
   165  			if value, ok := pv.ObjectMeta.Labels[key]; ok {
   166  				volumeVSet, err := volumehelpers.LabelZonesToSet(value)
   167  				if err != nil {
   168  					logger.Info("Failed to parse label, ignoring the label", "label", fmt.Sprintf("%s:%s", key, value), "err", err)
   169  					continue
   170  				}
   171  				podPVTopologies = append(podPVTopologies, pvTopology{
   172  					pvName: pv.Name,
   173  					key:    key,
   174  					values: sets.Set[string](volumeVSet),
   175  				})
   176  			}
   177  		}
   178  	}
   179  	return podPVTopologies, nil
   180  }
   181  
   182  // PreFilterExtensions returns prefilter extensions, pod add and remove.
   183  func (pl *VolumeZone) PreFilterExtensions() framework.PreFilterExtensions {
   184  	return nil
   185  }
   186  
   187  // Filter invoked at the filter extension point.
   188  //
   189  // It evaluates if a pod can fit due to the volumes it requests, given
   190  // that some volumes may have zone scheduling constraints.  The requirement is that any
   191  // volume zone-labels must match the equivalent zone-labels on the node.  It is OK for
   192  // the node to have more zone-label constraints (for example, a hypothetical replicated
   193  // volume might allow region-wide access)
   194  //
   195  // Currently this is only supported with PersistentVolumeClaims, and looks to the labels
   196  // only on the bound PersistentVolume.
   197  //
   198  // Working with volumes declared inline in the pod specification (i.e. not
   199  // using a PersistentVolume) is likely to be harder, as it would require
   200  // determining the zone of a volume during scheduling, and that is likely to
   201  // require calling out to the cloud provider.  It seems that we are moving away
   202  // from inline volume declarations anyway.
   203  func (pl *VolumeZone) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
   204  	logger := klog.FromContext(ctx)
   205  	// If a pod doesn't have any volume attached to it, the predicate will always be true.
   206  	// Thus we make a fast path for it, to avoid unnecessary computations in this case.
   207  	if len(pod.Spec.Volumes) == 0 {
   208  		return nil
   209  	}
   210  	var podPVTopologies []pvTopology
   211  	state, err := getStateData(cs)
   212  	if err != nil {
   213  		// Fallback to calculate pv list here
   214  		var status *framework.Status
   215  		podPVTopologies, status = pl.getPVbyPod(ctx, pod)
   216  		if !status.IsSuccess() {
   217  			return status
   218  		}
   219  	} else {
   220  		podPVTopologies = state.podPVTopologies
   221  	}
   222  
   223  	node := nodeInfo.Node()
   224  	hasAnyNodeConstraint := false
   225  	for _, topologyLabel := range topologyLabels {
   226  		if _, ok := node.Labels[topologyLabel]; ok {
   227  			hasAnyNodeConstraint = true
   228  			break
   229  		}
   230  	}
   231  
   232  	if !hasAnyNodeConstraint {
   233  		// The node has no zone constraints, so we're OK to schedule.
   234  		// This is to handle a single-zone cluster scenario where the node may not have any topology labels.
   235  		return nil
   236  	}
   237  
   238  	for _, pvTopology := range podPVTopologies {
   239  		v, ok := node.Labels[pvTopology.key]
   240  		if !ok {
   241  			// if we can't match the beta label, try to match pv's beta label with node's ga label
   242  			v, ok = node.Labels[translateToGALabel(pvTopology.key)]
   243  		}
   244  		if !ok || !pvTopology.values.Has(v) {
   245  			logger.V(10).Info("Won't schedule pod onto node due to volume (mismatch on label key)", "pod", klog.KObj(pod), "node", klog.KObj(node), "PV", klog.KRef("", pvTopology.pvName), "PVLabelKey", pvTopology.key)
   246  			return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonConflict)
   247  		}
   248  	}
   249  
   250  	return nil
   251  }
   252  
   253  func getStateData(cs *framework.CycleState) (*stateData, error) {
   254  	state, err := cs.Read(preFilterStateKey)
   255  	if err != nil {
   256  		return nil, err
   257  	}
   258  	s, ok := state.(*stateData)
   259  	if !ok {
   260  		return nil, errors.New("unable to convert state into stateData")
   261  	}
   262  	return s, nil
   263  }
   264  
   265  func getErrorAsStatus(err error) *framework.Status {
   266  	if err != nil {
   267  		if apierrors.IsNotFound(err) {
   268  			return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
   269  		}
   270  		return framework.AsStatus(err)
   271  	}
   272  	return nil
   273  }
   274  
   275  // EventsToRegister returns the possible events that may make a Pod
   276  // failed by this plugin schedulable.
   277  func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint {
   278  	return []framework.ClusterEventWithHint{
   279  		// New storageClass with bind mode `VolumeBindingWaitForFirstConsumer` will make a pod schedulable.
   280  		// Due to immutable field `storageClass.volumeBindingMode`, storageClass update events are ignored.
   281  		{Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add}},
   282  		// A new node or updating a node's volume zone labels may make a pod schedulable.
   283  		//
   284  		// A note about UpdateNodeTaint event:
   285  		// NodeAdd QueueingHint isn't always called because of the internal feature called preCheck.
   286  		// As a common problematic scenario,
   287  		// when a node is added but not ready, NodeAdd event is filtered out by preCheck and doesn't arrive.
   288  		// In such cases, this plugin may miss some events that actually make pods schedulable.
   289  		// As a workaround, we add UpdateNodeTaint event to catch the case.
   290  		// We can remove UpdateNodeTaint when we remove the preCheck feature.
   291  		// See: https://github.com/kubernetes/kubernetes/issues/110175
   292  		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},
   293  		// A new pvc may make a pod schedulable.
   294  		// Due to fields are immutable except `spec.resources`, pvc update events are ignored.
   295  		{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}},
   296  		// A new pv or updating a pv's volume zone labels may make a pod schedulable.
   297  		{Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}},
   298  	}
   299  }
   300  
   301  // New initializes a new plugin and returns it.
   302  func New(_ context.Context, _ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
   303  	informerFactory := handle.SharedInformerFactory()
   304  	pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
   305  	pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
   306  	scLister := informerFactory.Storage().V1().StorageClasses().Lister()
   307  	return &VolumeZone{
   308  		pvLister,
   309  		pvcLister,
   310  		scLister,
   311  	}, nil
   312  }
   313  

View as plain text