...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package interpodaffinity
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math"
    23  	"sync/atomic"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	"k8s.io/apimachinery/pkg/labels"
    27  	"k8s.io/klog/v2"
    28  	"k8s.io/kubernetes/pkg/scheduler/framework"
    29  )
    30  
    31  // preScoreStateKey is the key in CycleState to InterPodAffinity pre-computed data for Scoring.
    32  const preScoreStateKey = "PreScore" + Name
    33  
    34  type scoreMap map[string]map[string]int64
    35  
    36  // preScoreState computed at PreScore and used at Score.
    37  type preScoreState struct {
    38  	topologyScore scoreMap
    39  	podInfo       *framework.PodInfo
    40  	// A copy of the incoming pod's namespace labels.
    41  	namespaceLabels labels.Set
    42  }
    43  
    44  // Clone implements the mandatory Clone interface. We don't really copy the data since
    45  // there is no need for that.
    46  func (s *preScoreState) Clone() framework.StateData {
    47  	return s
    48  }
    49  
    50  func (m scoreMap) processTerm(term *framework.AffinityTerm, weight int32, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, multiplier int32) {
    51  	if term.Matches(pod, nsLabels) {
    52  		if tpValue, tpValueExist := node.Labels[term.TopologyKey]; tpValueExist {
    53  			if m[term.TopologyKey] == nil {
    54  				m[term.TopologyKey] = make(map[string]int64)
    55  			}
    56  			m[term.TopologyKey][tpValue] += int64(weight * multiplier)
    57  		}
    58  	}
    59  }
    60  
    61  func (m scoreMap) processTerms(terms []framework.WeightedAffinityTerm, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, multiplier int32) {
    62  	for _, term := range terms {
    63  		m.processTerm(&term.AffinityTerm, term.Weight, pod, nsLabels, node, multiplier)
    64  	}
    65  }
    66  
    67  func (m scoreMap) append(other scoreMap) {
    68  	for topology, oScores := range other {
    69  		scores := m[topology]
    70  		if scores == nil {
    71  			m[topology] = oScores
    72  			continue
    73  		}
    74  		for k, v := range oScores {
    75  			scores[k] += v
    76  		}
    77  	}
    78  }
    79  
    80  func (pl *InterPodAffinity) processExistingPod(
    81  	state *preScoreState,
    82  	existingPod *framework.PodInfo,
    83  	existingPodNodeInfo *framework.NodeInfo,
    84  	incomingPod *v1.Pod,
    85  	topoScore scoreMap,
    86  ) {
    87  	existingPodNode := existingPodNodeInfo.Node()
    88  	if len(existingPodNode.Labels) == 0 {
    89  		return
    90  	}
    91  
    92  	// For every soft pod affinity term of <pod>, if <existingPod> matches the term,
    93  	// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
    94  	// value as that of <existingPods>`s node by the term`s weight.
    95  	// Note that the incoming pod's terms have the namespaceSelector merged into the namespaces, and so
    96  	// here we don't lookup the existing pod's namespace labels, hence passing nil for nsLabels.
    97  	topoScore.processTerms(state.podInfo.PreferredAffinityTerms, existingPod.Pod, nil, existingPodNode, 1)
    98  
    99  	// For every soft pod anti-affinity term of <pod>, if <existingPod> matches the term,
   100  	// decrement <p.counts> for every node in the cluster with the same <term.TopologyKey>
   101  	// value as that of <existingPod>`s node by the term`s weight.
   102  	// Note that the incoming pod's terms have the namespaceSelector merged into the namespaces, and so
   103  	// here we don't lookup the existing pod's namespace labels, hence passing nil for nsLabels.
   104  	topoScore.processTerms(state.podInfo.PreferredAntiAffinityTerms, existingPod.Pod, nil, existingPodNode, -1)
   105  
   106  	// For every hard pod affinity term of <existingPod>, if <pod> matches the term,
   107  	// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
   108  	// value as that of <existingPod>'s node by the constant <args.hardPodAffinityWeight>
   109  	if pl.args.HardPodAffinityWeight > 0 && len(existingPodNode.Labels) != 0 {
   110  		for _, t := range existingPod.RequiredAffinityTerms {
   111  			topoScore.processTerm(&t, pl.args.HardPodAffinityWeight, incomingPod, state.namespaceLabels, existingPodNode, 1)
   112  		}
   113  	}
   114  
   115  	// For every soft pod affinity term of <existingPod>, if <pod> matches the term,
   116  	// increment <p.counts> for every node in the cluster with the same <term.TopologyKey>
   117  	// value as that of <existingPod>'s node by the term's weight.
   118  	topoScore.processTerms(existingPod.PreferredAffinityTerms, incomingPod, state.namespaceLabels, existingPodNode, 1)
   119  
   120  	// For every soft pod anti-affinity term of <existingPod>, if <pod> matches the term,
   121  	// decrement <pm.counts> for every node in the cluster with the same <term.TopologyKey>
   122  	// value as that of <existingPod>'s node by the term's weight.
   123  	topoScore.processTerms(existingPod.PreferredAntiAffinityTerms, incomingPod, state.namespaceLabels, existingPodNode, -1)
   124  }
   125  
   126  // PreScore builds and writes cycle state used by Score and NormalizeScore.
   127  func (pl *InterPodAffinity) PreScore(
   128  	pCtx context.Context,
   129  	cycleState *framework.CycleState,
   130  	pod *v1.Pod,
   131  	nodes []*framework.NodeInfo,
   132  ) *framework.Status {
   133  	if len(nodes) == 0 {
   134  		// No nodes to score.
   135  		return framework.NewStatus(framework.Skip)
   136  	}
   137  
   138  	if pl.sharedLister == nil {
   139  		return framework.NewStatus(framework.Error, "empty shared lister in InterPodAffinity PreScore")
   140  	}
   141  
   142  	affinity := pod.Spec.Affinity
   143  	hasPreferredAffinityConstraints := affinity != nil && affinity.PodAffinity != nil && len(affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution) > 0
   144  	hasPreferredAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil && len(affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution) > 0
   145  	hasConstraints := hasPreferredAffinityConstraints || hasPreferredAntiAffinityConstraints
   146  
   147  	// Optionally ignore calculating preferences of existing pods' affinity rules
   148  	// if the incoming pod has no inter-pod affinities.
   149  	if pl.args.IgnorePreferredTermsOfExistingPods && !hasConstraints {
   150  		return framework.NewStatus(framework.Skip)
   151  	}
   152  
   153  	// Unless the pod being scheduled has preferred affinity terms, we only
   154  	// need to process nodes hosting pods with affinity.
   155  	var allNodes []*framework.NodeInfo
   156  	var err error
   157  	if hasConstraints {
   158  		allNodes, err = pl.sharedLister.NodeInfos().List()
   159  		if err != nil {
   160  			return framework.AsStatus(fmt.Errorf("failed to get all nodes from shared lister: %w", err))
   161  		}
   162  	} else {
   163  		allNodes, err = pl.sharedLister.NodeInfos().HavePodsWithAffinityList()
   164  		if err != nil {
   165  			return framework.AsStatus(fmt.Errorf("failed to get pods with affinity list: %w", err))
   166  		}
   167  	}
   168  
   169  	state := &preScoreState{
   170  		topologyScore: make(map[string]map[string]int64),
   171  	}
   172  
   173  	if state.podInfo, err = framework.NewPodInfo(pod); err != nil {
   174  		// Ideally we never reach here, because errors will be caught by PreFilter
   175  		return framework.AsStatus(fmt.Errorf("failed to parse pod: %w", err))
   176  	}
   177  
   178  	for i := range state.podInfo.PreferredAffinityTerms {
   179  		if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&state.podInfo.PreferredAffinityTerms[i].AffinityTerm); err != nil {
   180  			return framework.AsStatus(fmt.Errorf("updating PreferredAffinityTerms: %w", err))
   181  		}
   182  	}
   183  	for i := range state.podInfo.PreferredAntiAffinityTerms {
   184  		if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&state.podInfo.PreferredAntiAffinityTerms[i].AffinityTerm); err != nil {
   185  			return framework.AsStatus(fmt.Errorf("updating PreferredAntiAffinityTerms: %w", err))
   186  		}
   187  	}
   188  	logger := klog.FromContext(pCtx)
   189  	state.namespaceLabels = GetNamespaceLabelsSnapshot(logger, pod.Namespace, pl.nsLister)
   190  
   191  	topoScores := make([]scoreMap, len(allNodes))
   192  	index := int32(-1)
   193  	processNode := func(i int) {
   194  		nodeInfo := allNodes[i]
   195  
   196  		// Unless the pod being scheduled has preferred affinity terms, we only
   197  		// need to process pods with affinity in the node.
   198  		podsToProcess := nodeInfo.PodsWithAffinity
   199  		if hasConstraints {
   200  			// We need to process all the pods.
   201  			podsToProcess = nodeInfo.Pods
   202  		}
   203  
   204  		topoScore := make(scoreMap)
   205  		for _, existingPod := range podsToProcess {
   206  			pl.processExistingPod(state, existingPod, nodeInfo, pod, topoScore)
   207  		}
   208  		if len(topoScore) > 0 {
   209  			topoScores[atomic.AddInt32(&index, 1)] = topoScore
   210  		}
   211  	}
   212  	pl.parallelizer.Until(pCtx, len(allNodes), processNode, pl.Name())
   213  
   214  	if index == -1 {
   215  		return framework.NewStatus(framework.Skip)
   216  	}
   217  
   218  	for i := 0; i <= int(index); i++ {
   219  		state.topologyScore.append(topoScores[i])
   220  	}
   221  
   222  	cycleState.Write(preScoreStateKey, state)
   223  	return nil
   224  }
   225  
   226  func getPreScoreState(cycleState *framework.CycleState) (*preScoreState, error) {
   227  	c, err := cycleState.Read(preScoreStateKey)
   228  	if err != nil {
   229  		return nil, fmt.Errorf("failed to read %q from cycleState: %w", preScoreStateKey, err)
   230  	}
   231  
   232  	s, ok := c.(*preScoreState)
   233  	if !ok {
   234  		return nil, fmt.Errorf("%+v  convert to interpodaffinity.preScoreState error", c)
   235  	}
   236  	return s, nil
   237  }
   238  
   239  // Score invoked at the Score extension point.
   240  // The "score" returned in this function is the sum of weights got from cycleState which have its topologyKey matching with the node's labels.
   241  // it is normalized later.
   242  // Note: the returned "score" is positive for pod-affinity, and negative for pod-antiaffinity.
   243  func (pl *InterPodAffinity) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
   244  	nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
   245  	if err != nil {
   246  		return 0, framework.AsStatus(fmt.Errorf("failed to get node %q from Snapshot: %w", nodeName, err))
   247  	}
   248  	node := nodeInfo.Node()
   249  
   250  	s, err := getPreScoreState(cycleState)
   251  	if err != nil {
   252  		return 0, framework.AsStatus(err)
   253  	}
   254  	var score int64
   255  	for tpKey, tpValues := range s.topologyScore {
   256  		if v, exist := node.Labels[tpKey]; exist {
   257  			score += tpValues[v]
   258  		}
   259  	}
   260  
   261  	return score, nil
   262  }
   263  
   264  // NormalizeScore normalizes the score for each filteredNode.
   265  func (pl *InterPodAffinity) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
   266  	s, err := getPreScoreState(cycleState)
   267  	if err != nil {
   268  		return framework.AsStatus(err)
   269  	}
   270  	if len(s.topologyScore) == 0 {
   271  		return nil
   272  	}
   273  
   274  	var minCount int64 = math.MaxInt64
   275  	var maxCount int64 = math.MinInt64
   276  	for i := range scores {
   277  		score := scores[i].Score
   278  		if score > maxCount {
   279  			maxCount = score
   280  		}
   281  		if score < minCount {
   282  			minCount = score
   283  		}
   284  	}
   285  
   286  	maxMinDiff := maxCount - minCount
   287  	for i := range scores {
   288  		fScore := float64(0)
   289  		if maxMinDiff > 0 {
   290  			fScore = float64(framework.MaxNodeScore) * (float64(scores[i].Score-minCount) / float64(maxMinDiff))
   291  		}
   292  
   293  		scores[i].Score = int64(fScore)
   294  	}
   295  
   296  	return nil
   297  }
   298  
   299  // ScoreExtensions of the Score plugin.
   300  func (pl *InterPodAffinity) ScoreExtensions() framework.ScoreExtensions {
   301  	return pl
   302  }
   303  

View as plain text