...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread/scoring.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package podtopologyspread
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math"
    23  	"sync/atomic"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	"k8s.io/apimachinery/pkg/util/sets"
    27  	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
    28  	"k8s.io/kubernetes/pkg/scheduler/framework"
    29  )
    30  
    31  const preScoreStateKey = "PreScore" + Name
    32  const invalidScore = -1
    33  
// preScoreState is computed at PreScore and consumed by Score and NormalizeScore.
// Fields are exported for comparison during testing.
type preScoreState struct {
	// Constraints holds the soft (ScheduleAnyway) topology spread constraints
	// that apply to the incoming pod.
	Constraints []topologySpreadConstraint
	// IgnoredNodes is a set of node names which miss some Constraints[*].topologyKey.
	// Such nodes receive a zero score.
	IgnoredNodes sets.Set[string]
	// TopologyPairToPodCounts is keyed with topologyPair, and valued with the number of matching pods.
	// Values are pointers because PreScore increments them with atomic.AddInt64.
	TopologyPairToPodCounts map[topologyPair]*int64
	// TopologyNormalizingWeight is the weight we give to the counts per topology.
	// This allows the pod counts of smaller topologies to not be watered down by
	// bigger ones. Entry i is the weight for Constraints[i].
	TopologyNormalizingWeight []float64
}
    47  
// Clone implements the mandatory Clone interface. It returns the receiver
// itself rather than a copy, since there is no need to copy the data.
func (s *preScoreState) Clone() framework.StateData {
	return s
}
    53  
    54  // initPreScoreState iterates "filteredNodes" to filter out the nodes which
    55  // don't have required topologyKey(s), and initialize:
    56  // 1) s.TopologyPairToPodCounts: keyed with both eligible topology pair and node names.
    57  // 2) s.IgnoredNodes: the set of nodes that shouldn't be scored.
    58  // 3) s.TopologyNormalizingWeight: The weight to be given to each constraint based on the number of values in a topology.
    59  func (pl *PodTopologySpread) initPreScoreState(s *preScoreState, pod *v1.Pod, filteredNodes []*framework.NodeInfo, requireAllTopologies bool) error {
    60  	var err error
    61  	if len(pod.Spec.TopologySpreadConstraints) > 0 {
    62  		s.Constraints, err = pl.filterTopologySpreadConstraints(
    63  			pod.Spec.TopologySpreadConstraints,
    64  			pod.Labels,
    65  			v1.ScheduleAnyway,
    66  		)
    67  		if err != nil {
    68  			return fmt.Errorf("obtaining pod's soft topology spread constraints: %w", err)
    69  		}
    70  	} else {
    71  		s.Constraints, err = pl.buildDefaultConstraints(pod, v1.ScheduleAnyway)
    72  		if err != nil {
    73  			return fmt.Errorf("setting default soft topology spread constraints: %w", err)
    74  		}
    75  	}
    76  	if len(s.Constraints) == 0 {
    77  		return nil
    78  	}
    79  	topoSize := make([]int, len(s.Constraints))
    80  	for _, node := range filteredNodes {
    81  		if requireAllTopologies && !nodeLabelsMatchSpreadConstraints(node.Node().Labels, s.Constraints) {
    82  			// Nodes which don't have all required topologyKeys present are ignored
    83  			// when scoring later.
    84  			s.IgnoredNodes.Insert(node.Node().Name)
    85  			continue
    86  		}
    87  		for i, constraint := range s.Constraints {
    88  			// per-node counts are calculated during Score.
    89  			if constraint.TopologyKey == v1.LabelHostname {
    90  				continue
    91  			}
    92  			pair := topologyPair{key: constraint.TopologyKey, value: node.Node().Labels[constraint.TopologyKey]}
    93  			if s.TopologyPairToPodCounts[pair] == nil {
    94  				s.TopologyPairToPodCounts[pair] = new(int64)
    95  				topoSize[i]++
    96  			}
    97  		}
    98  	}
    99  
   100  	s.TopologyNormalizingWeight = make([]float64, len(s.Constraints))
   101  	for i, c := range s.Constraints {
   102  		sz := topoSize[i]
   103  		if c.TopologyKey == v1.LabelHostname {
   104  			sz = len(filteredNodes) - len(s.IgnoredNodes)
   105  		}
   106  		s.TopologyNormalizingWeight[i] = topologyNormalizingWeight(sz)
   107  	}
   108  	return nil
   109  }
   110  
   111  // PreScore builds and writes cycle state used by Score and NormalizeScore.
   112  func (pl *PodTopologySpread) PreScore(
   113  	ctx context.Context,
   114  	cycleState *framework.CycleState,
   115  	pod *v1.Pod,
   116  	filteredNodes []*framework.NodeInfo,
   117  ) *framework.Status {
   118  	allNodes, err := pl.sharedLister.NodeInfos().List()
   119  	if err != nil {
   120  		return framework.AsStatus(fmt.Errorf("getting all nodes: %w", err))
   121  	}
   122  
   123  	if len(allNodes) == 0 {
   124  		// No need to score.
   125  		return framework.NewStatus(framework.Skip)
   126  	}
   127  
   128  	state := &preScoreState{
   129  		IgnoredNodes:            sets.New[string](),
   130  		TopologyPairToPodCounts: make(map[topologyPair]*int64),
   131  	}
   132  	// Only require that nodes have all the topology labels if using
   133  	// non-system-default spreading rules. This allows nodes that don't have a
   134  	// zone label to still have hostname spreading.
   135  	requireAllTopologies := len(pod.Spec.TopologySpreadConstraints) > 0 || !pl.systemDefaulted
   136  	err = pl.initPreScoreState(state, pod, filteredNodes, requireAllTopologies)
   137  	if err != nil {
   138  		return framework.AsStatus(fmt.Errorf("calculating preScoreState: %w", err))
   139  	}
   140  
   141  	// return Skip if incoming pod doesn't have soft topology spread Constraints.
   142  	if len(state.Constraints) == 0 {
   143  		return framework.NewStatus(framework.Skip)
   144  	}
   145  
   146  	// Ignore parsing errors for backwards compatibility.
   147  	requiredNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(pod)
   148  	processAllNode := func(i int) {
   149  		nodeInfo := allNodes[i]
   150  		node := nodeInfo.Node()
   151  
   152  		if !pl.enableNodeInclusionPolicyInPodTopologySpread {
   153  			// `node` should satisfy incoming pod's NodeSelector/NodeAffinity
   154  			if match, _ := requiredNodeAffinity.Match(node); !match {
   155  				return
   156  			}
   157  		}
   158  
   159  		// All topologyKeys need to be present in `node`
   160  		if requireAllTopologies && !nodeLabelsMatchSpreadConstraints(node.Labels, state.Constraints) {
   161  			return
   162  		}
   163  
   164  		for _, c := range state.Constraints {
   165  			if pl.enableNodeInclusionPolicyInPodTopologySpread &&
   166  				!c.matchNodeInclusionPolicies(pod, node, requiredNodeAffinity) {
   167  				continue
   168  			}
   169  
   170  			pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
   171  			// If current topology pair is not associated with any candidate node,
   172  			// continue to avoid unnecessary calculation.
   173  			// Per-node counts are also skipped, as they are done during Score.
   174  			tpCount := state.TopologyPairToPodCounts[pair]
   175  			if tpCount == nil {
   176  				continue
   177  			}
   178  			count := countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace)
   179  			atomic.AddInt64(tpCount, int64(count))
   180  		}
   181  	}
   182  	pl.parallelizer.Until(ctx, len(allNodes), processAllNode, pl.Name())
   183  
   184  	cycleState.Write(preScoreStateKey, state)
   185  	return nil
   186  }
   187  
   188  // Score invoked at the Score extension point.
   189  // The "score" returned in this function is the matching number of pods on the `nodeName`,
   190  // it is normalized later.
   191  func (pl *PodTopologySpread) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
   192  	nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
   193  	if err != nil {
   194  		return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
   195  	}
   196  
   197  	node := nodeInfo.Node()
   198  	s, err := getPreScoreState(cycleState)
   199  	if err != nil {
   200  		return 0, framework.AsStatus(err)
   201  	}
   202  
   203  	// Return if the node is not qualified.
   204  	if s.IgnoredNodes.Has(node.Name) {
   205  		return 0, nil
   206  	}
   207  
   208  	// For each present <pair>, current node gets a credit of <matchSum>.
   209  	// And we sum up <matchSum> and return it as this node's score.
   210  	var score float64
   211  	for i, c := range s.Constraints {
   212  		if tpVal, ok := node.Labels[c.TopologyKey]; ok {
   213  			var cnt int64
   214  			if c.TopologyKey == v1.LabelHostname {
   215  				cnt = int64(countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace))
   216  			} else {
   217  				pair := topologyPair{key: c.TopologyKey, value: tpVal}
   218  				cnt = *s.TopologyPairToPodCounts[pair]
   219  			}
   220  			score += scoreForCount(cnt, c.MaxSkew, s.TopologyNormalizingWeight[i])
   221  		}
   222  	}
   223  	return int64(math.Round(score)), nil
   224  }
   225  
   226  // NormalizeScore invoked after scoring all nodes.
   227  func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
   228  	s, err := getPreScoreState(cycleState)
   229  	if err != nil {
   230  		return framework.AsStatus(err)
   231  	}
   232  	if s == nil {
   233  		return nil
   234  	}
   235  
   236  	// Calculate <minScore> and <maxScore>
   237  	var minScore int64 = math.MaxInt64
   238  	var maxScore int64
   239  	for i, score := range scores {
   240  		// it's mandatory to check if <score.Name> is present in m.IgnoredNodes
   241  		if s.IgnoredNodes.Has(score.Name) {
   242  			scores[i].Score = invalidScore
   243  			continue
   244  		}
   245  		if score.Score < minScore {
   246  			minScore = score.Score
   247  		}
   248  		if score.Score > maxScore {
   249  			maxScore = score.Score
   250  		}
   251  	}
   252  
   253  	for i := range scores {
   254  		if scores[i].Score == invalidScore {
   255  			scores[i].Score = 0
   256  			continue
   257  		}
   258  		if maxScore == 0 {
   259  			scores[i].Score = framework.MaxNodeScore
   260  			continue
   261  		}
   262  		s := scores[i].Score
   263  		scores[i].Score = framework.MaxNodeScore * (maxScore + minScore - s) / maxScore
   264  	}
   265  	return nil
   266  }
   267  
// ScoreExtensions of the Score plugin. The plugin returns itself, as it
// provides the NormalizeScore method.
func (pl *PodTopologySpread) ScoreExtensions() framework.ScoreExtensions {
	return pl
}
   272  
   273  func getPreScoreState(cycleState *framework.CycleState) (*preScoreState, error) {
   274  	c, err := cycleState.Read(preScoreStateKey)
   275  	if err != nil {
   276  		return nil, fmt.Errorf("error reading %q from cycleState: %w", preScoreStateKey, err)
   277  	}
   278  
   279  	s, ok := c.(*preScoreState)
   280  	if !ok {
   281  		return nil, fmt.Errorf("%+v  convert to podtopologyspread.preScoreState error", c)
   282  	}
   283  	return s, nil
   284  }
   285  
   286  // topologyNormalizingWeight calculates the weight for the topology, based on
   287  // the number of values that exist for a topology.
   288  // Since <size> is at least 1 (all nodes that passed the Filters are in the
   289  // same topology), and k8s supports 5k nodes, the result is in the interval
   290  // <1.09, 8.52>.
   291  //
   292  // Note: <size> could also be zero when no nodes have the required topologies,
   293  // however we don't care about topology weight in this case as we return a 0
   294  // score for all nodes.
   295  func topologyNormalizingWeight(size int) float64 {
   296  	return math.Log(float64(size + 2))
   297  }
   298  
   299  // scoreForCount calculates the score based on number of matching pods in a
   300  // topology domain, the constraint's maxSkew and the topology weight.
   301  // `maxSkew-1` is added to the score so that differences between topology
   302  // domains get watered down, controlling the tolerance of the score to skews.
   303  func scoreForCount(cnt int64, maxSkew int32, tpWeight float64) float64 {
   304  	return float64(cnt)*tpWeight + float64(maxSkew-1)
   305  }
   306  

View as plain text