...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package nodeaffinity
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  
    23  	v1 "k8s.io/api/core/v1"
    24  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    25  	"k8s.io/apimachinery/pkg/runtime"
    26  	"k8s.io/apimachinery/pkg/util/sets"
    27  	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
    28  	"k8s.io/klog/v2"
    29  	"k8s.io/kubernetes/pkg/scheduler/apis/config"
    30  	"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
    31  	"k8s.io/kubernetes/pkg/scheduler/framework"
    32  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
    33  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
    34  	"k8s.io/kubernetes/pkg/scheduler/util"
    35  )
    36  
    37  // NodeAffinity is a plugin that checks if a pod node selector matches the node label.
    38  type NodeAffinity struct {
    39  	handle              framework.Handle
    40  	addedNodeSelector   *nodeaffinity.NodeSelector
    41  	addedPrefSchedTerms *nodeaffinity.PreferredSchedulingTerms
    42  }
    43  
    44  var _ framework.PreFilterPlugin = &NodeAffinity{}
    45  var _ framework.FilterPlugin = &NodeAffinity{}
    46  var _ framework.PreScorePlugin = &NodeAffinity{}
    47  var _ framework.ScorePlugin = &NodeAffinity{}
    48  var _ framework.EnqueueExtensions = &NodeAffinity{}
    49  
    50  const (
    51  	// Name is the name of the plugin used in the plugin registry and configurations.
    52  	Name = names.NodeAffinity
    53  
    54  	// preScoreStateKey is the key in CycleState to NodeAffinity pre-computed data for Scoring.
    55  	preScoreStateKey = "PreScore" + Name
    56  
    57  	// preFilterStateKey is the key in CycleState to NodeAffinity pre-compute data for Filtering.
    58  	preFilterStateKey = "PreFilter" + Name
    59  
    60  	// ErrReasonPod is the reason for Pod's node affinity/selector not matching.
    61  	ErrReasonPod = "node(s) didn't match Pod's node affinity/selector"
    62  
    63  	// errReasonEnforced is the reason for added node affinity not matching.
    64  	errReasonEnforced = "node(s) didn't match scheduler-enforced node affinity"
    65  
    66  	// errReasonConflict is the reason for pod's conflicting affinity rules.
    67  	errReasonConflict = "pod affinity terms conflict"
    68  )
    69  
    70  // Name returns name of the plugin. It is used in logs, etc.
    71  func (pl *NodeAffinity) Name() string {
    72  	return Name
    73  }
    74  
    75  type preFilterState struct {
    76  	requiredNodeSelectorAndAffinity nodeaffinity.RequiredNodeAffinity
    77  }
    78  
    79  // Clone just returns the same state because it is not affected by pod additions or deletions.
    80  func (s *preFilterState) Clone() framework.StateData {
    81  	return s
    82  }
    83  
    84  // EventsToRegister returns the possible events that may make a Pod
    85  // failed by this plugin schedulable.
    86  func (pl *NodeAffinity) EventsToRegister() []framework.ClusterEventWithHint {
    87  	return []framework.ClusterEventWithHint{
    88  		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
    89  	}
    90  }
    91  
    92  // isSchedulableAfterNodeChange is invoked whenever a node changed. It checks whether
    93  // that change made a previously unschedulable pod schedulable.
    94  func (pl *NodeAffinity) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
    95  	_, modifiedNode, err := util.As[*v1.Node](oldObj, newObj)
    96  	if err != nil {
    97  		return framework.Queue, err
    98  	}
    99  
   100  	if pl.addedNodeSelector != nil && !pl.addedNodeSelector.Match(modifiedNode) {
   101  		logger.V(4).Info("added or modified node didn't match scheduler-enforced node affinity and this event won't make the Pod schedulable", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
   102  		return framework.QueueSkip, nil
   103  	}
   104  
   105  	requiredNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(pod)
   106  	isMatched, err := requiredNodeAffinity.Match(modifiedNode)
   107  	if err != nil {
   108  		return framework.Queue, err
   109  	}
   110  	if isMatched {
   111  		logger.V(4).Info("node was created or updated, and matches with the pod's NodeAffinity", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
   112  		return framework.Queue, nil
   113  	}
   114  
   115  	// TODO: also check if the original node meets the pod's requestments once preCheck is completely removed.
   116  	// See: https://github.com/kubernetes/kubernetes/issues/110175
   117  
   118  	logger.V(4).Info("node was created or updated, but it doesn't make this pod schedulable", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
   119  	return framework.QueueSkip, nil
   120  }
   121  
   122  // PreFilter builds and writes cycle state used by Filter.
   123  func (pl *NodeAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
   124  	affinity := pod.Spec.Affinity
   125  	noNodeAffinity := (affinity == nil ||
   126  		affinity.NodeAffinity == nil ||
   127  		affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil)
   128  	if noNodeAffinity && pl.addedNodeSelector == nil && pod.Spec.NodeSelector == nil {
   129  		// NodeAffinity Filter has nothing to do with the Pod.
   130  		return nil, framework.NewStatus(framework.Skip)
   131  	}
   132  
   133  	state := &preFilterState{requiredNodeSelectorAndAffinity: nodeaffinity.GetRequiredNodeAffinity(pod)}
   134  	cycleState.Write(preFilterStateKey, state)
   135  
   136  	if noNodeAffinity || len(affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms) == 0 {
   137  		return nil, nil
   138  	}
   139  
   140  	// Check if there is affinity to a specific node and return it.
   141  	terms := affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
   142  	var nodeNames sets.Set[string]
   143  	for _, t := range terms {
   144  		var termNodeNames sets.Set[string]
   145  		for _, r := range t.MatchFields {
   146  			if r.Key == metav1.ObjectNameField && r.Operator == v1.NodeSelectorOpIn {
   147  				// The requirements represent ANDed constraints, and so we need to
   148  				// find the intersection of nodes.
   149  				s := sets.New(r.Values...)
   150  				if termNodeNames == nil {
   151  					termNodeNames = s
   152  				} else {
   153  					termNodeNames = termNodeNames.Intersection(s)
   154  				}
   155  			}
   156  		}
   157  		if termNodeNames == nil {
   158  			// If this term has no node.Name field affinity,
   159  			// then all nodes are eligible because the terms are ORed.
   160  			return nil, nil
   161  		}
   162  		nodeNames = nodeNames.Union(termNodeNames)
   163  	}
   164  	// If nodeNames is not nil, but length is 0, it means each term have conflicting affinity to node.Name;
   165  	// therefore, pod will not match any node.
   166  	if nodeNames != nil && len(nodeNames) == 0 {
   167  		return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, errReasonConflict)
   168  	} else if len(nodeNames) > 0 {
   169  		return &framework.PreFilterResult{NodeNames: nodeNames}, nil
   170  	}
   171  	return nil, nil
   172  
   173  }
   174  
   175  // PreFilterExtensions not necessary for this plugin as state doesn't depend on pod additions or deletions.
   176  func (pl *NodeAffinity) PreFilterExtensions() framework.PreFilterExtensions {
   177  	return nil
   178  }
   179  
   180  // Filter checks if the Node matches the Pod .spec.affinity.nodeAffinity and
   181  // the plugin's added affinity.
   182  func (pl *NodeAffinity) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
   183  	node := nodeInfo.Node()
   184  
   185  	if pl.addedNodeSelector != nil && !pl.addedNodeSelector.Match(node) {
   186  		return framework.NewStatus(framework.UnschedulableAndUnresolvable, errReasonEnforced)
   187  	}
   188  
   189  	s, err := getPreFilterState(state)
   190  	if err != nil {
   191  		// Fallback to calculate requiredNodeSelector and requiredNodeAffinity
   192  		// here when PreFilter is disabled.
   193  		s = &preFilterState{requiredNodeSelectorAndAffinity: nodeaffinity.GetRequiredNodeAffinity(pod)}
   194  	}
   195  
   196  	// Ignore parsing errors for backwards compatibility.
   197  	match, _ := s.requiredNodeSelectorAndAffinity.Match(node)
   198  	if !match {
   199  		return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPod)
   200  	}
   201  
   202  	return nil
   203  }
   204  
   205  // preScoreState computed at PreScore and used at Score.
   206  type preScoreState struct {
   207  	preferredNodeAffinity *nodeaffinity.PreferredSchedulingTerms
   208  }
   209  
   210  // Clone implements the mandatory Clone interface. We don't really copy the data since
   211  // there is no need for that.
   212  func (s *preScoreState) Clone() framework.StateData {
   213  	return s
   214  }
   215  
   216  // PreScore builds and writes cycle state used by Score and NormalizeScore.
   217  func (pl *NodeAffinity) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*framework.NodeInfo) *framework.Status {
   218  	if len(nodes) == 0 {
   219  		return nil
   220  	}
   221  	preferredNodeAffinity, err := getPodPreferredNodeAffinity(pod)
   222  	if err != nil {
   223  		return framework.AsStatus(err)
   224  	}
   225  	if preferredNodeAffinity == nil && pl.addedPrefSchedTerms == nil {
   226  		// NodeAffinity Score has nothing to do with the Pod.
   227  		return framework.NewStatus(framework.Skip)
   228  	}
   229  	state := &preScoreState{
   230  		preferredNodeAffinity: preferredNodeAffinity,
   231  	}
   232  	cycleState.Write(preScoreStateKey, state)
   233  	return nil
   234  }
   235  
   236  // Score returns the sum of the weights of the terms that match the Node.
   237  // Terms came from the Pod .spec.affinity.nodeAffinity and from the plugin's
   238  // default affinity.
   239  func (pl *NodeAffinity) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
   240  	nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
   241  	if err != nil {
   242  		return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
   243  	}
   244  
   245  	node := nodeInfo.Node()
   246  
   247  	var count int64
   248  	if pl.addedPrefSchedTerms != nil {
   249  		count += pl.addedPrefSchedTerms.Score(node)
   250  	}
   251  
   252  	s, err := getPreScoreState(state)
   253  	if err != nil {
   254  		// Fallback to calculate preferredNodeAffinity here when PreScore is disabled.
   255  		preferredNodeAffinity, err := getPodPreferredNodeAffinity(pod)
   256  		if err != nil {
   257  			return 0, framework.AsStatus(err)
   258  		}
   259  		s = &preScoreState{
   260  			preferredNodeAffinity: preferredNodeAffinity,
   261  		}
   262  	}
   263  
   264  	if s.preferredNodeAffinity != nil {
   265  		count += s.preferredNodeAffinity.Score(node)
   266  	}
   267  
   268  	return count, nil
   269  }
   270  
   271  // NormalizeScore invoked after scoring all nodes.
   272  func (pl *NodeAffinity) NormalizeScore(ctx context.Context, state *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
   273  	return helper.DefaultNormalizeScore(framework.MaxNodeScore, false, scores)
   274  }
   275  
   276  // ScoreExtensions of the Score plugin.
   277  func (pl *NodeAffinity) ScoreExtensions() framework.ScoreExtensions {
   278  	return pl
   279  }
   280  
   281  // New initializes a new plugin and returns it.
   282  func New(_ context.Context, plArgs runtime.Object, h framework.Handle) (framework.Plugin, error) {
   283  	args, err := getArgs(plArgs)
   284  	if err != nil {
   285  		return nil, err
   286  	}
   287  	pl := &NodeAffinity{
   288  		handle: h,
   289  	}
   290  	if args.AddedAffinity != nil {
   291  		if ns := args.AddedAffinity.RequiredDuringSchedulingIgnoredDuringExecution; ns != nil {
   292  			pl.addedNodeSelector, err = nodeaffinity.NewNodeSelector(ns)
   293  			if err != nil {
   294  				return nil, fmt.Errorf("parsing addedAffinity.requiredDuringSchedulingIgnoredDuringExecution: %w", err)
   295  			}
   296  		}
   297  		// TODO: parse requiredDuringSchedulingRequiredDuringExecution when it gets added to the API.
   298  		if terms := args.AddedAffinity.PreferredDuringSchedulingIgnoredDuringExecution; len(terms) != 0 {
   299  			pl.addedPrefSchedTerms, err = nodeaffinity.NewPreferredSchedulingTerms(terms)
   300  			if err != nil {
   301  				return nil, fmt.Errorf("parsing addedAffinity.preferredDuringSchedulingIgnoredDuringExecution: %w", err)
   302  			}
   303  		}
   304  	}
   305  	return pl, nil
   306  }
   307  
   308  func getArgs(obj runtime.Object) (config.NodeAffinityArgs, error) {
   309  	ptr, ok := obj.(*config.NodeAffinityArgs)
   310  	if !ok {
   311  		return config.NodeAffinityArgs{}, fmt.Errorf("args are not of type NodeAffinityArgs, got %T", obj)
   312  	}
   313  	return *ptr, validation.ValidateNodeAffinityArgs(nil, ptr)
   314  }
   315  
   316  func getPodPreferredNodeAffinity(pod *v1.Pod) (*nodeaffinity.PreferredSchedulingTerms, error) {
   317  	affinity := pod.Spec.Affinity
   318  	if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
   319  		return nodeaffinity.NewPreferredSchedulingTerms(affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution)
   320  	}
   321  	return nil, nil
   322  }
   323  
   324  func getPreScoreState(cycleState *framework.CycleState) (*preScoreState, error) {
   325  	c, err := cycleState.Read(preScoreStateKey)
   326  	if err != nil {
   327  		return nil, fmt.Errorf("reading %q from cycleState: %w", preScoreStateKey, err)
   328  	}
   329  
   330  	s, ok := c.(*preScoreState)
   331  	if !ok {
   332  		return nil, fmt.Errorf("invalid PreScore state, got type %T", c)
   333  	}
   334  	return s, nil
   335  }
   336  
   337  func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
   338  	c, err := cycleState.Read(preFilterStateKey)
   339  	if err != nil {
   340  		return nil, fmt.Errorf("reading %q from cycleState: %v", preFilterStateKey, err)
   341  	}
   342  
   343  	s, ok := c.(*preFilterState)
   344  	if !ok {
   345  		return nil, fmt.Errorf("invalid PreFilter state, got type %T", c)
   346  	}
   347  	return s, nil
   348  }
   349  

View as plain text