1
16
17 package podtopologyspread
18
19 import (
20 "context"
21 "fmt"
22 "math"
23 "sync/atomic"
24
25 v1 "k8s.io/api/core/v1"
26 "k8s.io/apimachinery/pkg/util/sets"
27 "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
28 "k8s.io/kubernetes/pkg/scheduler/framework"
29 )
30
31 const preScoreStateKey = "PreScore" + Name
32 const invalidScore = -1
33
34
35
// preScoreState is the data PreScore computes and writes into the CycleState
// for Score and NormalizeScore to read.
type preScoreState struct {
	// Constraints are the topology spread constraints obtained with
	// v1.ScheduleAnyway, either from the pod's own constraints or, when the
	// pod declares none, from the plugin's defaults.
	Constraints []topologySpreadConstraint
	// IgnoredNodes holds the names of candidate nodes that were skipped
	// because they lack a required topology label (only populated when all
	// topologies are required). Score returns 0 for them and NormalizeScore
	// sets their final score to 0.
	IgnoredNodes sets.Set[string]
	// TopologyPairToPodCounts maps each non-hostname topology pair seen among
	// the candidate nodes to a counter of matching pods in that domain.
	// Entries are only added in initPreScoreState; PreScore then increments
	// the pointed-to counters with atomic.AddInt64, and Score reads them.
	TopologyPairToPodCounts map[topologyPair]*int64
	// TopologyNormalizingWeight holds one weight per entry of Constraints
	// (same index), computed by topologyNormalizingWeight from the number of
	// domains for that constraint.
	TopologyNormalizingWeight []float64
}
47
48
49
// Clone implements framework.StateData. It returns the receiver itself rather
// than a copy, so the original and the "clone" share the same data. Within
// this file the state is only read (by Score and NormalizeScore) after
// PreScore has written it to the CycleState.
func (s *preScoreState) Clone() framework.StateData {
	return s
}
53
54
55
56
57
58
59 func (pl *PodTopologySpread) initPreScoreState(s *preScoreState, pod *v1.Pod, filteredNodes []*framework.NodeInfo, requireAllTopologies bool) error {
60 var err error
61 if len(pod.Spec.TopologySpreadConstraints) > 0 {
62 s.Constraints, err = pl.filterTopologySpreadConstraints(
63 pod.Spec.TopologySpreadConstraints,
64 pod.Labels,
65 v1.ScheduleAnyway,
66 )
67 if err != nil {
68 return fmt.Errorf("obtaining pod's soft topology spread constraints: %w", err)
69 }
70 } else {
71 s.Constraints, err = pl.buildDefaultConstraints(pod, v1.ScheduleAnyway)
72 if err != nil {
73 return fmt.Errorf("setting default soft topology spread constraints: %w", err)
74 }
75 }
76 if len(s.Constraints) == 0 {
77 return nil
78 }
79 topoSize := make([]int, len(s.Constraints))
80 for _, node := range filteredNodes {
81 if requireAllTopologies && !nodeLabelsMatchSpreadConstraints(node.Node().Labels, s.Constraints) {
82
83
84 s.IgnoredNodes.Insert(node.Node().Name)
85 continue
86 }
87 for i, constraint := range s.Constraints {
88
89 if constraint.TopologyKey == v1.LabelHostname {
90 continue
91 }
92 pair := topologyPair{key: constraint.TopologyKey, value: node.Node().Labels[constraint.TopologyKey]}
93 if s.TopologyPairToPodCounts[pair] == nil {
94 s.TopologyPairToPodCounts[pair] = new(int64)
95 topoSize[i]++
96 }
97 }
98 }
99
100 s.TopologyNormalizingWeight = make([]float64, len(s.Constraints))
101 for i, c := range s.Constraints {
102 sz := topoSize[i]
103 if c.TopologyKey == v1.LabelHostname {
104 sz = len(filteredNodes) - len(s.IgnoredNodes)
105 }
106 s.TopologyNormalizingWeight[i] = topologyNormalizingWeight(sz)
107 }
108 return nil
109 }
110
111
112 func (pl *PodTopologySpread) PreScore(
113 ctx context.Context,
114 cycleState *framework.CycleState,
115 pod *v1.Pod,
116 filteredNodes []*framework.NodeInfo,
117 ) *framework.Status {
118 allNodes, err := pl.sharedLister.NodeInfos().List()
119 if err != nil {
120 return framework.AsStatus(fmt.Errorf("getting all nodes: %w", err))
121 }
122
123 if len(allNodes) == 0 {
124
125 return framework.NewStatus(framework.Skip)
126 }
127
128 state := &preScoreState{
129 IgnoredNodes: sets.New[string](),
130 TopologyPairToPodCounts: make(map[topologyPair]*int64),
131 }
132
133
134
135 requireAllTopologies := len(pod.Spec.TopologySpreadConstraints) > 0 || !pl.systemDefaulted
136 err = pl.initPreScoreState(state, pod, filteredNodes, requireAllTopologies)
137 if err != nil {
138 return framework.AsStatus(fmt.Errorf("calculating preScoreState: %w", err))
139 }
140
141
142 if len(state.Constraints) == 0 {
143 return framework.NewStatus(framework.Skip)
144 }
145
146
147 requiredNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(pod)
148 processAllNode := func(i int) {
149 nodeInfo := allNodes[i]
150 node := nodeInfo.Node()
151
152 if !pl.enableNodeInclusionPolicyInPodTopologySpread {
153
154 if match, _ := requiredNodeAffinity.Match(node); !match {
155 return
156 }
157 }
158
159
160 if requireAllTopologies && !nodeLabelsMatchSpreadConstraints(node.Labels, state.Constraints) {
161 return
162 }
163
164 for _, c := range state.Constraints {
165 if pl.enableNodeInclusionPolicyInPodTopologySpread &&
166 !c.matchNodeInclusionPolicies(pod, node, requiredNodeAffinity) {
167 continue
168 }
169
170 pair := topologyPair{key: c.TopologyKey, value: node.Labels[c.TopologyKey]}
171
172
173
174 tpCount := state.TopologyPairToPodCounts[pair]
175 if tpCount == nil {
176 continue
177 }
178 count := countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace)
179 atomic.AddInt64(tpCount, int64(count))
180 }
181 }
182 pl.parallelizer.Until(ctx, len(allNodes), processAllNode, pl.Name())
183
184 cycleState.Write(preScoreStateKey, state)
185 return nil
186 }
187
188
189
190
191 func (pl *PodTopologySpread) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
192 nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
193 if err != nil {
194 return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
195 }
196
197 node := nodeInfo.Node()
198 s, err := getPreScoreState(cycleState)
199 if err != nil {
200 return 0, framework.AsStatus(err)
201 }
202
203
204 if s.IgnoredNodes.Has(node.Name) {
205 return 0, nil
206 }
207
208
209
210 var score float64
211 for i, c := range s.Constraints {
212 if tpVal, ok := node.Labels[c.TopologyKey]; ok {
213 var cnt int64
214 if c.TopologyKey == v1.LabelHostname {
215 cnt = int64(countPodsMatchSelector(nodeInfo.Pods, c.Selector, pod.Namespace))
216 } else {
217 pair := topologyPair{key: c.TopologyKey, value: tpVal}
218 cnt = *s.TopologyPairToPodCounts[pair]
219 }
220 score += scoreForCount(cnt, c.MaxSkew, s.TopologyNormalizingWeight[i])
221 }
222 }
223 return int64(math.Round(score)), nil
224 }
225
226
227 func (pl *PodTopologySpread) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
228 s, err := getPreScoreState(cycleState)
229 if err != nil {
230 return framework.AsStatus(err)
231 }
232 if s == nil {
233 return nil
234 }
235
236
237 var minScore int64 = math.MaxInt64
238 var maxScore int64
239 for i, score := range scores {
240
241 if s.IgnoredNodes.Has(score.Name) {
242 scores[i].Score = invalidScore
243 continue
244 }
245 if score.Score < minScore {
246 minScore = score.Score
247 }
248 if score.Score > maxScore {
249 maxScore = score.Score
250 }
251 }
252
253 for i := range scores {
254 if scores[i].Score == invalidScore {
255 scores[i].Score = 0
256 continue
257 }
258 if maxScore == 0 {
259 scores[i].Score = framework.MaxNodeScore
260 continue
261 }
262 s := scores[i].Score
263 scores[i].Score = framework.MaxNodeScore * (maxScore + minScore - s) / maxScore
264 }
265 return nil
266 }
267
268
// ScoreExtensions returns the plugin itself as its score extensions. This
// exposes the NormalizeScore method defined on PodTopologySpread.
func (pl *PodTopologySpread) ScoreExtensions() framework.ScoreExtensions {
	return pl
}
272
273 func getPreScoreState(cycleState *framework.CycleState) (*preScoreState, error) {
274 c, err := cycleState.Read(preScoreStateKey)
275 if err != nil {
276 return nil, fmt.Errorf("error reading %q from cycleState: %w", preScoreStateKey, err)
277 }
278
279 s, ok := c.(*preScoreState)
280 if !ok {
281 return nil, fmt.Errorf("%+v convert to podtopologyspread.preScoreState error", c)
282 }
283 return s, nil
284 }
285
286
287
288
289
290
291
292
293
294
// topologyNormalizingWeight returns ln(size + 2), the weight applied to the
// pod counts of one constraint. size is the number of domains for that
// constraint: distinct topology values among the candidate nodes, or the
// number of non-ignored candidate nodes for hostname constraints.
//
// Shifting by 2 keeps the weight strictly positive even when size is 0
// (ln 2 ≈ 0.693), and the logarithm keeps the weight growing slowly as the
// number of domains increases.
func topologyNormalizingWeight(size int) float64 {
	shifted := size + 2
	return math.Log(float64(shifted))
}
298
299
300
301
302
// scoreForCount returns the raw score contribution of one constraint:
//
//	cnt*tpWeight + (maxSkew - 1)
//
// cnt is the number of matching pods in the node's domain and tpWeight is the
// constraint's normalizing weight. The (maxSkew - 1) offset is added to every
// node that carries the constraint's label. Once NormalizeScore rescales by
// the maximum raw score, a larger maxSkew therefore shrinks the relative
// difference between domains. The subtraction is done in int32 before
// conversion.
func scoreForCount(cnt int64, maxSkew int32, tpWeight float64) float64 {
	skewOffset := float64(maxSkew - 1)
	weighted := float64(cnt) * tpWeight
	return weighted + skewOffset
}
306
View as plain text