1
16
17 package interpodaffinity
18
19 import (
20 "context"
21 "fmt"
22 "math"
23 "sync/atomic"
24
25 v1 "k8s.io/api/core/v1"
26 "k8s.io/apimachinery/pkg/labels"
27 "k8s.io/klog/v2"
28 "k8s.io/kubernetes/pkg/scheduler/framework"
29 )
30
31
// preScoreStateKey is the CycleState key under which PreScore writes the
// preScoreState that Score and NormalizeScore read back.
const preScoreStateKey = "PreScore" + Name
33
// scoreMap accumulates weights per topology domain. The outer key is a term's
// topology key, the inner key is a node's label value for that key, and the
// value is the signed sum of the weights of the terms that matched.
type scoreMap map[string]map[string]int64
35
36
// preScoreState is the data PreScore computes and stores in CycleState for use
// by Score and NormalizeScore.
type preScoreState struct {
	// topologyScore is the merged score map across all nodes PreScore processed.
	topologyScore scoreMap
	// podInfo is the parsed form of the incoming pod, built by framework.NewPodInfo.
	podInfo *framework.PodInfo
	// namespaceLabels is the result of GetNamespaceLabelsSnapshot for the
	// incoming pod's namespace.
	namespaceLabels labels.Set
}
43
44
45
46 func (s *preScoreState) Clone() framework.StateData {
47 return s
48 }
49
50 func (m scoreMap) processTerm(term *framework.AffinityTerm, weight int32, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, multiplier int32) {
51 if term.Matches(pod, nsLabels) {
52 if tpValue, tpValueExist := node.Labels[term.TopologyKey]; tpValueExist {
53 if m[term.TopologyKey] == nil {
54 m[term.TopologyKey] = make(map[string]int64)
55 }
56 m[term.TopologyKey][tpValue] += int64(weight * multiplier)
57 }
58 }
59 }
60
61 func (m scoreMap) processTerms(terms []framework.WeightedAffinityTerm, pod *v1.Pod, nsLabels labels.Set, node *v1.Node, multiplier int32) {
62 for _, term := range terms {
63 m.processTerm(&term.AffinityTerm, term.Weight, pod, nsLabels, node, multiplier)
64 }
65 }
66
67 func (m scoreMap) append(other scoreMap) {
68 for topology, oScores := range other {
69 scores := m[topology]
70 if scores == nil {
71 m[topology] = oScores
72 continue
73 }
74 for k, v := range oScores {
75 scores[k] += v
76 }
77 }
78 }
79
80 func (pl *InterPodAffinity) processExistingPod(
81 state *preScoreState,
82 existingPod *framework.PodInfo,
83 existingPodNodeInfo *framework.NodeInfo,
84 incomingPod *v1.Pod,
85 topoScore scoreMap,
86 ) {
87 existingPodNode := existingPodNodeInfo.Node()
88 if len(existingPodNode.Labels) == 0 {
89 return
90 }
91
92
93
94
95
96
97 topoScore.processTerms(state.podInfo.PreferredAffinityTerms, existingPod.Pod, nil, existingPodNode, 1)
98
99
100
101
102
103
104 topoScore.processTerms(state.podInfo.PreferredAntiAffinityTerms, existingPod.Pod, nil, existingPodNode, -1)
105
106
107
108
109 if pl.args.HardPodAffinityWeight > 0 && len(existingPodNode.Labels) != 0 {
110 for _, t := range existingPod.RequiredAffinityTerms {
111 topoScore.processTerm(&t, pl.args.HardPodAffinityWeight, incomingPod, state.namespaceLabels, existingPodNode, 1)
112 }
113 }
114
115
116
117
118 topoScore.processTerms(existingPod.PreferredAffinityTerms, incomingPod, state.namespaceLabels, existingPodNode, 1)
119
120
121
122
123 topoScore.processTerms(existingPod.PreferredAntiAffinityTerms, incomingPod, state.namespaceLabels, existingPodNode, -1)
124 }
125
126
127 func (pl *InterPodAffinity) PreScore(
128 pCtx context.Context,
129 cycleState *framework.CycleState,
130 pod *v1.Pod,
131 nodes []*framework.NodeInfo,
132 ) *framework.Status {
133 if len(nodes) == 0 {
134
135 return framework.NewStatus(framework.Skip)
136 }
137
138 if pl.sharedLister == nil {
139 return framework.NewStatus(framework.Error, "empty shared lister in InterPodAffinity PreScore")
140 }
141
142 affinity := pod.Spec.Affinity
143 hasPreferredAffinityConstraints := affinity != nil && affinity.PodAffinity != nil && len(affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution) > 0
144 hasPreferredAntiAffinityConstraints := affinity != nil && affinity.PodAntiAffinity != nil && len(affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution) > 0
145 hasConstraints := hasPreferredAffinityConstraints || hasPreferredAntiAffinityConstraints
146
147
148
149 if pl.args.IgnorePreferredTermsOfExistingPods && !hasConstraints {
150 return framework.NewStatus(framework.Skip)
151 }
152
153
154
155 var allNodes []*framework.NodeInfo
156 var err error
157 if hasConstraints {
158 allNodes, err = pl.sharedLister.NodeInfos().List()
159 if err != nil {
160 return framework.AsStatus(fmt.Errorf("failed to get all nodes from shared lister: %w", err))
161 }
162 } else {
163 allNodes, err = pl.sharedLister.NodeInfos().HavePodsWithAffinityList()
164 if err != nil {
165 return framework.AsStatus(fmt.Errorf("failed to get pods with affinity list: %w", err))
166 }
167 }
168
169 state := &preScoreState{
170 topologyScore: make(map[string]map[string]int64),
171 }
172
173 if state.podInfo, err = framework.NewPodInfo(pod); err != nil {
174
175 return framework.AsStatus(fmt.Errorf("failed to parse pod: %w", err))
176 }
177
178 for i := range state.podInfo.PreferredAffinityTerms {
179 if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&state.podInfo.PreferredAffinityTerms[i].AffinityTerm); err != nil {
180 return framework.AsStatus(fmt.Errorf("updating PreferredAffinityTerms: %w", err))
181 }
182 }
183 for i := range state.podInfo.PreferredAntiAffinityTerms {
184 if err := pl.mergeAffinityTermNamespacesIfNotEmpty(&state.podInfo.PreferredAntiAffinityTerms[i].AffinityTerm); err != nil {
185 return framework.AsStatus(fmt.Errorf("updating PreferredAntiAffinityTerms: %w", err))
186 }
187 }
188 logger := klog.FromContext(pCtx)
189 state.namespaceLabels = GetNamespaceLabelsSnapshot(logger, pod.Namespace, pl.nsLister)
190
191 topoScores := make([]scoreMap, len(allNodes))
192 index := int32(-1)
193 processNode := func(i int) {
194 nodeInfo := allNodes[i]
195
196
197
198 podsToProcess := nodeInfo.PodsWithAffinity
199 if hasConstraints {
200
201 podsToProcess = nodeInfo.Pods
202 }
203
204 topoScore := make(scoreMap)
205 for _, existingPod := range podsToProcess {
206 pl.processExistingPod(state, existingPod, nodeInfo, pod, topoScore)
207 }
208 if len(topoScore) > 0 {
209 topoScores[atomic.AddInt32(&index, 1)] = topoScore
210 }
211 }
212 pl.parallelizer.Until(pCtx, len(allNodes), processNode, pl.Name())
213
214 if index == -1 {
215 return framework.NewStatus(framework.Skip)
216 }
217
218 for i := 0; i <= int(index); i++ {
219 state.topologyScore.append(topoScores[i])
220 }
221
222 cycleState.Write(preScoreStateKey, state)
223 return nil
224 }
225
226 func getPreScoreState(cycleState *framework.CycleState) (*preScoreState, error) {
227 c, err := cycleState.Read(preScoreStateKey)
228 if err != nil {
229 return nil, fmt.Errorf("failed to read %q from cycleState: %w", preScoreStateKey, err)
230 }
231
232 s, ok := c.(*preScoreState)
233 if !ok {
234 return nil, fmt.Errorf("%+v convert to interpodaffinity.preScoreState error", c)
235 }
236 return s, nil
237 }
238
239
240
241
242
243 func (pl *InterPodAffinity) Score(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
244 nodeInfo, err := pl.sharedLister.NodeInfos().Get(nodeName)
245 if err != nil {
246 return 0, framework.AsStatus(fmt.Errorf("failed to get node %q from Snapshot: %w", nodeName, err))
247 }
248 node := nodeInfo.Node()
249
250 s, err := getPreScoreState(cycleState)
251 if err != nil {
252 return 0, framework.AsStatus(err)
253 }
254 var score int64
255 for tpKey, tpValues := range s.topologyScore {
256 if v, exist := node.Labels[tpKey]; exist {
257 score += tpValues[v]
258 }
259 }
260
261 return score, nil
262 }
263
264
265 func (pl *InterPodAffinity) NormalizeScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, scores framework.NodeScoreList) *framework.Status {
266 s, err := getPreScoreState(cycleState)
267 if err != nil {
268 return framework.AsStatus(err)
269 }
270 if len(s.topologyScore) == 0 {
271 return nil
272 }
273
274 var minCount int64 = math.MaxInt64
275 var maxCount int64 = math.MinInt64
276 for i := range scores {
277 score := scores[i].Score
278 if score > maxCount {
279 maxCount = score
280 }
281 if score < minCount {
282 minCount = score
283 }
284 }
285
286 maxMinDiff := maxCount - minCount
287 for i := range scores {
288 fScore := float64(0)
289 if maxMinDiff > 0 {
290 fScore = float64(framework.MaxNodeScore) * (float64(scores[i].Score-minCount) / float64(maxMinDiff))
291 }
292
293 scores[i].Score = int64(fScore)
294 }
295
296 return nil
297 }
298
299
300 func (pl *InterPodAffinity) ScoreExtensions() framework.ScoreExtensions {
301 return pl
302 }
303
View as plain text