...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread/plugin.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread

/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podtopologyspread

import (
	"context"
	"fmt"
	"reflect"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/informers"
	appslisters "k8s.io/client-go/listers/apps/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
	"k8s.io/kubernetes/pkg/scheduler/util"
)

const (
	// ErrReasonConstraintsNotMatch is used for PodTopologySpread filter error.
	ErrReasonConstraintsNotMatch = "node(s) didn't match pod topology spread constraints"
	// ErrReasonNodeLabelNotMatch is used when the node doesn't hold the required label.
	ErrReasonNodeLabelNotMatch = ErrReasonConstraintsNotMatch + " (missing required label)"
)

var systemDefaultConstraints = []v1.TopologySpreadConstraint{
	{
		TopologyKey:       v1.LabelHostname,
		WhenUnsatisfiable: v1.ScheduleAnyway,
		MaxSkew:           3,
	},
	{
		TopologyKey:       v1.LabelTopologyZone,
		WhenUnsatisfiable: v1.ScheduleAnyway,
		MaxSkew:           5,
	},
}
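
// Illustrative sketch, not part of the upstream file: with system
// defaulting, a pod that declares no constraints is spread roughly as if
// its spec carried the constraints above, with the label selector derived
// from the pod's own labels (see buildDefaultConstraints). The pod below
// is a hypothetical example of spelling the same spread out explicitly.
var _ = v1.Pod{
	Spec: v1.PodSpec{
		TopologySpreadConstraints: []v1.TopologySpreadConstraint{
			{TopologyKey: v1.LabelHostname, WhenUnsatisfiable: v1.ScheduleAnyway, MaxSkew: 3},
			{TopologyKey: v1.LabelTopologyZone, WhenUnsatisfiable: v1.ScheduleAnyway, MaxSkew: 5},
		},
	},
}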

// PodTopologySpread is a plugin that ensures a pod's topologySpreadConstraints are satisfied.
type PodTopologySpread struct {
	systemDefaulted                              bool
	parallelizer                                 parallelize.Parallelizer
	defaultConstraints                           []v1.TopologySpreadConstraint
	sharedLister                                 framework.SharedLister
	services                                     corelisters.ServiceLister
	replicationCtrls                             corelisters.ReplicationControllerLister
	replicaSets                                  appslisters.ReplicaSetLister
	statefulSets                                 appslisters.StatefulSetLister
	enableNodeInclusionPolicyInPodTopologySpread bool
	enableMatchLabelKeysInPodTopologySpread      bool
}

var _ framework.PreFilterPlugin = &PodTopologySpread{}
var _ framework.FilterPlugin = &PodTopologySpread{}
var _ framework.PreScorePlugin = &PodTopologySpread{}
var _ framework.ScorePlugin = &PodTopologySpread{}
var _ framework.EnqueueExtensions = &PodTopologySpread{}

// Name is the name of the plugin used in the plugin registry and configurations.
const Name = names.PodTopologySpread

// Name returns the name of the plugin. It is used in logs, etc.
func (pl *PodTopologySpread) Name() string {
	return Name
}

// New initializes a new plugin and returns it.
func New(_ context.Context, plArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) {
	if h.SnapshotSharedLister() == nil {
		return nil, fmt.Errorf("SnapshotSharedLister is nil")
	}
	args, err := getArgs(plArgs)
	if err != nil {
		return nil, err
	}
	if err := validation.ValidatePodTopologySpreadArgs(nil, &args); err != nil {
		return nil, err
	}
	pl := &PodTopologySpread{
		parallelizer:       h.Parallelizer(),
		sharedLister:       h.SnapshotSharedLister(),
		defaultConstraints: args.DefaultConstraints,
		enableNodeInclusionPolicyInPodTopologySpread: fts.EnableNodeInclusionPolicyInPodTopologySpread,
		enableMatchLabelKeysInPodTopologySpread:      fts.EnableMatchLabelKeysInPodTopologySpread,
	}
	if args.DefaultingType == config.SystemDefaulting {
		pl.defaultConstraints = systemDefaultConstraints
		pl.systemDefaulted = true
	}
	if len(pl.defaultConstraints) != 0 {
		if h.SharedInformerFactory() == nil {
			return nil, fmt.Errorf("SharedInformerFactory is nil")
		}
		pl.setListers(h.SharedInformerFactory())
	}
	return pl, nil
}
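
// Illustrative sketch, not part of the upstream file: the two defaulting
// modes New handles above, expressed as hypothetical plugin args. With
// SystemDefaulting, DefaultConstraints is ignored in favor of
// systemDefaultConstraints; with ListDefaulting, the listed constraints
// apply to pods that declare none of their own.
var (
	_ = config.PodTopologySpreadArgs{
		DefaultingType: config.SystemDefaulting,
	}
	_ = config.PodTopologySpreadArgs{
		DefaultingType: config.ListDefaulting,
		DefaultConstraints: []v1.TopologySpreadConstraint{
			// Default constraints carry no LabelSelector; the selector is
			// derived later from the pod's owning Services/ReplicaSets/etc.
			{TopologyKey: v1.LabelTopologyZone, WhenUnsatisfiable: v1.DoNotSchedule, MaxSkew: 1},
		},
	}
)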

func getArgs(obj runtime.Object) (config.PodTopologySpreadArgs, error) {
	ptr, ok := obj.(*config.PodTopologySpreadArgs)
	if !ok {
		return config.PodTopologySpreadArgs{}, fmt.Errorf("want args to be of type PodTopologySpreadArgs, got %T", obj)
	}
	return *ptr, nil
}

func (pl *PodTopologySpread) setListers(factory informers.SharedInformerFactory) {
	pl.services = factory.Core().V1().Services().Lister()
	pl.replicationCtrls = factory.Core().V1().ReplicationControllers().Lister()
	pl.replicaSets = factory.Apps().V1().ReplicaSets().Lister()
	pl.statefulSets = factory.Apps().V1().StatefulSets().Lister()
}

// EventsToRegister returns the possible events that may make a Pod
// rejected by this plugin schedulable.
func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEventWithHint {
	return []framework.ClusterEventWithHint{
		// All ActionType includes the following events:
		// - Add. An unschedulable Pod may fail due to violating topology spread constraints,
		// and adding an assigned Pod may make it schedulable.
		// - Update. Updating an existing Pod's labels (e.g., removing one) may make
		// an unschedulable Pod schedulable.
		// - Delete. An unschedulable Pod may fail due to violating an existing Pod's topology spread constraints,
		// and deleting that Pod may make it schedulable.
		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange},
		// A Node add/delete/update may change a topology key,
		// making pods that are waiting to be scheduled schedulable or unschedulable.
		//
		// A note about the UpdateNodeTaint event:
		// The NodeAdd QueueingHint isn't always called because of the internal feature called preCheck.
		// As a common problematic scenario,
		// when a node is added but not ready, the NodeAdd event is filtered out by preCheck and doesn't arrive.
		// In such cases, this plugin may miss some events that actually make pods schedulable.
		// As a workaround, we add the UpdateNodeTaint event to catch the case.
		// We can remove UpdateNodeTaint once the preCheck feature is removed.
		// See: https://github.com/kubernetes/kubernetes/issues/110175
		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
	}
}
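
// Illustrative sketch, not part of the upstream file: the shape of a
// QueueingHintFn, as registered above. Returning framework.Queue retries
// the pod on this event; framework.QueueSkip leaves it in the
// unschedulable queue. The function name and logic here are hypothetical.
func exampleQueueingHint(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	if newObj == nil {
		// The watched object was deleted; a competing pod or a node going
		// away can change the spread accounting, so retry.
		return framework.Queue, nil
	}
	return framework.QueueSkip, nil
}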

// involvedInTopologySpreading reports whether incomingPod could affect the
// topology spread of podWithSpreading: it must be assigned to a node and
// live in the same namespace.
func involvedInTopologySpreading(incomingPod, podWithSpreading *v1.Pod) bool {
	return incomingPod.Spec.NodeName != "" && incomingPod.Namespace == podWithSpreading.Namespace
}

func (pl *PodTopologySpread) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	originalPod, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
	if err != nil {
		return framework.Queue, err
	}

	if (modifiedPod != nil && !involvedInTopologySpreading(modifiedPod, pod)) || (originalPod != nil && !involvedInTopologySpreading(originalPod, pod)) {
		logger.V(5).Info("the added/updated/deleted pod is unscheduled or in a different namespace from the target pod, so it doesn't make the target pod schedulable",
			"pod", klog.KObj(pod), "originalPod", klog.KObj(originalPod))
		return framework.QueueSkip, nil
	}

	constraints, err := pl.getConstraints(pod)
	if err != nil {
		return framework.Queue, err
	}

	// Pod is modified. Return Queue when a label matching the topologySpread selector is added, changed, or deleted.
	if modifiedPod != nil && originalPod != nil {
		if reflect.DeepEqual(modifiedPod.Labels, originalPod.Labels) {
			logger.V(5).Info("the updated pod's labels are unchanged, so it doesn't make the target pod schedulable",
				"pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
			return framework.QueueSkip, nil
		}
		for _, c := range constraints {
			if c.Selector.Matches(labels.Set(originalPod.Labels)) != c.Selector.Matches(labels.Set(modifiedPod.Labels)) {
				// This modification makes the Pod match (or stop matching) this constraint,
				// so the topology spread outcome for the target pod may have changed.
				logger.V(5).Info("a scheduled pod's label was updated and the update makes it match or stop matching the pod's topology spread constraints",
					"pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
				return framework.Queue, nil
			}
		}
		// This label modification doesn't change whether the Pod matches the selector of any constraint.
		logger.V(5).Info("a scheduled pod's label was updated, but the change is unrelated to the pod's topology spread constraints",
			"pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
		return framework.QueueSkip, nil
	}

	// Pod is added. Return Queue when the added Pod has a label that matches the topologySpread selector.
	if modifiedPod != nil {
		if podLabelsMatchSpreadConstraints(constraints, modifiedPod.Labels) {
			logger.V(5).Info("a scheduled pod was created and it matches the pod's topology spread constraints",
				"pod", klog.KObj(pod), "createdPod", klog.KObj(modifiedPod))
			return framework.Queue, nil
		}
		logger.V(5).Info("a scheduled pod was created, but it doesn't match the pod's topology spread constraints",
			"pod", klog.KObj(pod), "createdPod", klog.KObj(modifiedPod))
		return framework.QueueSkip, nil
	}

	// Pod is deleted. Return Queue when the deleted Pod had a label that matches the topologySpread selector.
	if podLabelsMatchSpreadConstraints(constraints, originalPod.Labels) {
		logger.V(5).Info("a scheduled pod matching the pod's topology spread constraints was deleted, and the pod may be schedulable now",
			"pod", klog.KObj(pod), "deletedPod", klog.KObj(originalPod))
		return framework.Queue, nil
	}
	logger.V(5).Info("a scheduled pod was deleted, but it's unrelated to the pod's topology spread constraints",
		"pod", klog.KObj(pod), "deletedPod", klog.KObj(originalPod))

	return framework.QueueSkip, nil
}
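
// Illustrative sketch, not part of the upstream file: the label-flip test
// used above, in isolation. A flip in either direction can change the skew
// accounting, which is why the target pod is queued for another attempt.
// The selector and label values are hypothetical.
func exampleLabelFlip() bool {
	sel := labels.SelectorFromSet(labels.Set{"app": "web"})
	before := labels.Set{"app": "web"} // matched the selector before the update
	after := labels.Set{"app": "db"}   // no longer matches after the update
	return sel.Matches(before) != sel.Matches(after) // true, so the hint would be framework.Queue
}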

// getConstraints extracts topologySpreadConstraint(s) from the Pod spec.
// If the Pod doesn't have any topologySpreadConstraint, it returns the default constraints.
func (pl *PodTopologySpread) getConstraints(pod *v1.Pod) ([]topologySpreadConstraint, error) {
	var constraints []topologySpreadConstraint
	var err error
	if len(pod.Spec.TopologySpreadConstraints) > 0 {
		// The API server's feature gating strips the field from the spec,
		// so we don't need to re-check the feature gate here; checking the
		// length of the constraints is enough.
		constraints, err = pl.filterTopologySpreadConstraints(
			pod.Spec.TopologySpreadConstraints,
			pod.Labels,
			v1.DoNotSchedule,
		)
		if err != nil {
			return nil, fmt.Errorf("obtaining pod's hard topology spread constraints: %w", err)
		}
	} else {
		constraints, err = pl.buildDefaultConstraints(pod, v1.DoNotSchedule)
		if err != nil {
			return nil, fmt.Errorf("setting default hard topology spread constraints: %w", err)
		}
	}
	return constraints, nil
}

func (pl *PodTopologySpread) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	originalNode, modifiedNode, err := util.As[*v1.Node](oldObj, newObj)
	if err != nil {
		return framework.Queue, err
	}

	constraints, err := pl.getConstraints(pod)
	if err != nil {
		return framework.Queue, err
	}

	// framework.Add/framework.Update: return Queue when the node has every constraint's topologyKey in its labels, else return QueueSkip.
	//
	// TODO: we can filter out node update events in a more fine-grained way once preCheck is completely removed.
	// See: https://github.com/kubernetes/kubernetes/issues/110175
	if modifiedNode != nil {
		if !nodeLabelsMatchSpreadConstraints(modifiedNode.Labels, constraints) {
			logger.V(5).Info("the created/updated node doesn't match the pod's topology spread constraints",
				"pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
			return framework.QueueSkip, nil
		}
		logger.V(5).Info("a node matching the topology spread constraints was created/updated, and the pod may be schedulable now",
			"pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
		return framework.Queue, nil
	}

	// framework.Delete: return Queue when the deleted node had every constraint's topologyKey in its labels, else return QueueSkip.
	if !nodeLabelsMatchSpreadConstraints(originalNode.Labels, constraints) {
		logger.V(5).Info("the deleted node doesn't match the pod's topology spread constraints", "pod", klog.KObj(pod), "node", klog.KObj(originalNode))
		return framework.QueueSkip, nil
	}
	logger.V(5).Info("a node matching the topology spread constraints was deleted, and the pod may be schedulable now",
		"pod", klog.KObj(pod), "node", klog.KObj(originalNode))
	return framework.Queue, nil
}
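
// Illustrative sketch, not part of the upstream file:
// nodeLabelsMatchSpreadConstraints is defined elsewhere in this package;
// the contract relied on above is that a node is relevant to spreading
// only if it carries every constraint's topology key. A hypothetical
// equivalent:
func exampleNodeLabelsMatch(nodeLabels map[string]string, constraints []topologySpreadConstraint) bool {
	for _, c := range constraints {
		if _, ok := nodeLabels[c.TopologyKey]; !ok {
			return false
		}
	}
	return true
}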
