1
16
17 package framework
18
19 import (
20 "context"
21 "fmt"
22 "sort"
23
24 v1 "k8s.io/api/core/v1"
25 "k8s.io/apimachinery/pkg/runtime"
26 corev1helpers "k8s.io/component-helpers/scheduling/corev1"
27 "k8s.io/klog/v2"
28 extenderv1 "k8s.io/kube-scheduler/extender/v1"
29 "k8s.io/kubernetes/pkg/scheduler/framework"
30 frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
31 "k8s.io/kubernetes/pkg/scheduler/util"
32 )
33
34
35 type FitPredicate func(pod *v1.Pod, node *framework.NodeInfo) *framework.Status
36
37
38 type PriorityFunc func(pod *v1.Pod, nodes []*framework.NodeInfo) (*framework.NodeScoreList, error)
39
40
41 type PriorityConfig struct {
42 Function PriorityFunc
43 Weight int64
44 }
45
46
47 func ErrorPredicateExtender(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
48 return framework.NewStatus(framework.Error, "some error")
49 }
50
51
52 func FalsePredicateExtender(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
53 return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("pod is unschedulable on the node %q", node.Node().Name))
54 }
55
56
57 func TruePredicateExtender(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
58 return framework.NewStatus(framework.Success)
59 }
60
61
62
63 func Node1PredicateExtender(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
64 if node.Node().Name == "node1" {
65 return framework.NewStatus(framework.Success)
66 }
67 return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
68 }
69
70
71
72 func Node2PredicateExtender(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
73 if node.Node().Name == "node2" {
74 return framework.NewStatus(framework.Success)
75 }
76 return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
77 }
78
79
80 func ErrorPrioritizerExtender(pod *v1.Pod, nodes []*framework.NodeInfo) (*framework.NodeScoreList, error) {
81 return &framework.NodeScoreList{}, fmt.Errorf("some error")
82 }
83
84
85
86 func Node1PrioritizerExtender(pod *v1.Pod, nodes []*framework.NodeInfo) (*framework.NodeScoreList, error) {
87 result := framework.NodeScoreList{}
88 for _, node := range nodes {
89 score := 1
90 if node.Node().Name == "node1" {
91 score = 10
92 }
93 result = append(result, framework.NodeScore{Name: node.Node().Name, Score: int64(score)})
94 }
95 return &result, nil
96 }
97
98
99
100 func Node2PrioritizerExtender(pod *v1.Pod, nodes []*framework.NodeInfo) (*framework.NodeScoreList, error) {
101 result := framework.NodeScoreList{}
102 for _, node := range nodes {
103 score := 1
104 if node.Node().Name == "node2" {
105 score = 10
106 }
107 result = append(result, framework.NodeScore{Name: node.Node().Name, Score: int64(score)})
108 }
109 return &result, nil
110 }
111
112 type node2PrioritizerPlugin struct{}
113
114
115 func NewNode2PrioritizerPlugin() frameworkruntime.PluginFactory {
116 return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
117 return &node2PrioritizerPlugin{}, nil
118 }
119 }
120
121
122 func (pl *node2PrioritizerPlugin) Name() string {
123 return "Node2Prioritizer"
124 }
125
126
127 func (pl *node2PrioritizerPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeName string) (int64, *framework.Status) {
128 score := 10
129 if nodeName == "node2" {
130 score = 100
131 }
132 return int64(score), nil
133 }
134
135
136 func (pl *node2PrioritizerPlugin) ScoreExtensions() framework.ScoreExtensions {
137 return nil
138 }
139
140
141 type FakeExtender struct {
142
143
144 ExtenderName string
145 Predicates []FitPredicate
146 Prioritizers []PriorityConfig
147 Weight int64
148 NodeCacheCapable bool
149 FilteredNodes []*framework.NodeInfo
150 UnInterested bool
151 Ignorable bool
152 Binder func() error
153
154
155 CachedNodeNameToInfo map[string]*framework.NodeInfo
156 }
157
158 const defaultFakeExtenderName = "defaultFakeExtender"
159
160
161 func (f *FakeExtender) Name() string {
162 if f.ExtenderName == "" {
163
164 return defaultFakeExtenderName
165 }
166 return f.ExtenderName
167 }
168
169
170 func (f *FakeExtender) IsIgnorable() bool {
171 return f.Ignorable
172 }
173
174
175 func (f *FakeExtender) SupportsPreemption() bool {
176
177 return true
178 }
179
180
181 func (f *FakeExtender) ProcessPreemption(
182 pod *v1.Pod,
183 nodeNameToVictims map[string]*extenderv1.Victims,
184 nodeInfos framework.NodeInfoLister,
185 ) (map[string]*extenderv1.Victims, error) {
186 nodeNameToVictimsCopy := map[string]*extenderv1.Victims{}
187
188 for k, v := range nodeNameToVictims {
189
190
191
192
193 nodeNameToVictimsCopy[k] = v
194 }
195
196
197
198 logger := klog.TODO()
199 for nodeName, victims := range nodeNameToVictimsCopy {
200
201 nodeInfo, _ := nodeInfos.Get(nodeName)
202 extenderVictimPods, extenderPDBViolations, fits, err := f.selectVictimsOnNodeByExtender(logger, pod, nodeInfo)
203 if err != nil {
204 return nil, err
205 }
206
207
208 if !fits {
209 delete(nodeNameToVictimsCopy, nodeName)
210 } else {
211
212 nodeNameToVictimsCopy[nodeName].Pods = append(victims.Pods, extenderVictimPods...)
213 nodeNameToVictimsCopy[nodeName].NumPDBViolations = victims.NumPDBViolations + int64(extenderPDBViolations)
214 }
215 }
216 return nodeNameToVictimsCopy, nil
217 }
218
219
220
221
222
223
224 func (f *FakeExtender) selectVictimsOnNodeByExtender(logger klog.Logger, pod *v1.Pod, node *framework.NodeInfo) ([]*v1.Pod, int, bool, error) {
225
226
227 if !f.NodeCacheCapable {
228 err := f.runPredicate(pod, node)
229 if err.IsSuccess() {
230 return []*v1.Pod{}, 0, true, nil
231 } else if err.IsRejected() {
232 return nil, 0, false, nil
233 } else {
234 return nil, 0, false, err.AsError()
235 }
236 }
237
238
239
240 nodeInfoCopy := f.CachedNodeNameToInfo[node.Node().Name].Snapshot()
241
242 var potentialVictims []*v1.Pod
243
244 removePod := func(rp *v1.Pod) error {
245 return nodeInfoCopy.RemovePod(logger, rp)
246 }
247 addPod := func(ap *v1.Pod) {
248 nodeInfoCopy.AddPod(ap)
249 }
250
251
252 podPriority := corev1helpers.PodPriority(pod)
253 for _, p := range nodeInfoCopy.Pods {
254 if corev1helpers.PodPriority(p.Pod) < podPriority {
255 potentialVictims = append(potentialVictims, p.Pod)
256 if err := removePod(p.Pod); err != nil {
257 return nil, 0, false, err
258 }
259 }
260 }
261 sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i], potentialVictims[j]) })
262
263
264
265 status := f.runPredicate(pod, nodeInfoCopy)
266 if status.IsSuccess() {
267
268 } else if status.IsRejected() {
269
270 return nil, 0, false, nil
271 } else {
272
273 return nil, 0, false, status.AsError()
274 }
275
276 var victims []*v1.Pod
277
278
279 numViolatingVictim := 0
280
281 reprievePod := func(p *v1.Pod) bool {
282 addPod(p)
283 status := f.runPredicate(pod, nodeInfoCopy)
284 if !status.IsSuccess() {
285 if err := removePod(p); err != nil {
286 return false
287 }
288 victims = append(victims, p)
289 }
290 return status.IsSuccess()
291 }
292
293
294
295 for _, p := range potentialVictims {
296 reprievePod(p)
297 }
298
299 return victims, numViolatingVictim, true, nil
300 }
301
302
303
304 func (f *FakeExtender) runPredicate(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
305 for _, predicate := range f.Predicates {
306 status := predicate(pod, node)
307 if !status.IsSuccess() {
308 return status
309 }
310 }
311 return framework.NewStatus(framework.Success)
312 }
313
314
315 func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*framework.NodeInfo) ([]*framework.NodeInfo, extenderv1.FailedNodesMap, extenderv1.FailedNodesMap, error) {
316 var filtered []*framework.NodeInfo
317 failedNodesMap := extenderv1.FailedNodesMap{}
318 failedAndUnresolvableMap := extenderv1.FailedNodesMap{}
319 for _, node := range nodes {
320 status := f.runPredicate(pod, node)
321 if status.IsSuccess() {
322 filtered = append(filtered, node)
323 } else if status.Code() == framework.Unschedulable {
324 failedNodesMap[node.Node().Name] = fmt.Sprintf("FakeExtender: node %q failed", node.Node().Name)
325 } else if status.Code() == framework.UnschedulableAndUnresolvable {
326 failedAndUnresolvableMap[node.Node().Name] = fmt.Sprintf("FakeExtender: node %q failed and unresolvable", node.Node().Name)
327 } else {
328 return nil, nil, nil, status.AsError()
329 }
330 }
331
332 f.FilteredNodes = filtered
333 if f.NodeCacheCapable {
334 return filtered, failedNodesMap, failedAndUnresolvableMap, nil
335 }
336 return filtered, failedNodesMap, failedAndUnresolvableMap, nil
337 }
338
339
340 func (f *FakeExtender) Prioritize(pod *v1.Pod, nodes []*framework.NodeInfo) (*extenderv1.HostPriorityList, int64, error) {
341 result := extenderv1.HostPriorityList{}
342 combinedScores := map[string]int64{}
343 for _, prioritizer := range f.Prioritizers {
344 weight := prioritizer.Weight
345 if weight == 0 {
346 continue
347 }
348 priorityFunc := prioritizer.Function
349 prioritizedList, err := priorityFunc(pod, nodes)
350 if err != nil {
351 return &extenderv1.HostPriorityList{}, 0, err
352 }
353 for _, hostEntry := range *prioritizedList {
354 combinedScores[hostEntry.Name] += hostEntry.Score * weight
355 }
356 }
357 for host, score := range combinedScores {
358 result = append(result, extenderv1.HostPriority{Host: host, Score: score})
359 }
360 return &result, f.Weight, nil
361 }
362
363
364 func (f *FakeExtender) Bind(binding *v1.Binding) error {
365 if f.Binder != nil {
366 return f.Binder()
367 }
368 if len(f.FilteredNodes) != 0 {
369 for _, node := range f.FilteredNodes {
370 if node.Node().Name == binding.Target.Name {
371 f.FilteredNodes = nil
372 return nil
373 }
374 }
375 err := fmt.Errorf("Node %v not in filtered nodes %v", binding.Target.Name, f.FilteredNodes)
376 f.FilteredNodes = nil
377 return err
378 }
379 return nil
380 }
381
382
383 func (f *FakeExtender) IsBinder() bool {
384 return true
385 }
386
387
388 func (f *FakeExtender) IsPrioritizer() bool {
389 return len(f.Prioritizers) > 0
390 }
391
392
393 func (f *FakeExtender) IsFilter() bool {
394 return len(f.Predicates) > 0
395 }
396
397
398 func (f *FakeExtender) IsInterested(pod *v1.Pod) bool {
399 return !f.UnInterested
400 }
401
402 var _ framework.Extender = &FakeExtender{}
403
View as plain text