package scheduler

import (
	"container/heap"
	"context"
	"errors"
	"fmt"
	"math/rand"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
	extenderv1 "k8s.io/kube-scheduler/extender/v1"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/apis/core/validation"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
	"k8s.io/kubernetes/pkg/scheduler/util"
	utiltrace "k8s.io/utils/trace"
)

const (
	// pluginMetricsSamplePercent is the percentage of plugin metrics to be sampled.
	pluginMetricsSamplePercent = 10
	// minFeasibleNodesToFind is the minimum number of nodes that would be scored
	// in each scheduling cycle. This is a semi-arbitrary value to ensure that a
	// certain minimum of nodes are checked for feasibility. This in turn helps
	// ensure a minimum level of spreading.
	minFeasibleNodesToFind = 100
	// minFeasibleNodesPercentageToFind is the minimum percentage of nodes that
	// would be scored in each scheduling cycle. This is a semi-arbitrary value
	// to ensure that a certain minimum of nodes are checked for feasibility.
	// This in turn helps ensure a minimum level of spreading.
	minFeasibleNodesPercentageToFind = 5
	// numberOfHighestScoredNodesToReport is the number of node scores
	// to be included in ScheduleResult.
	numberOfHighestScoredNodesToReport = 3
)

// ScheduleOne does the entire scheduling workflow for a single pod. It is
// serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) ScheduleOne(ctx context.Context) {
	logger := klog.FromContext(ctx)
	podInfo, err := sched.NextPod(logger)
	if err != nil {
		logger.Error(err, "Error while retrieving next pod from scheduling queue")
		return
	}
	// pod could be nil when schedulerQueue is closed up.
	if podInfo == nil || podInfo.Pod == nil {
		return
	}

	pod := podInfo.Pod
	// TODO(knelasevero): Remove duplicated keys from log entry calls
	// When contextualized logging hits GA
	// https://github.com/kubernetes/kubernetes/issues/111672
	logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod))
	ctx = klog.NewContext(ctx, logger)
	logger.V(4).Info("About to try and schedule pod", "pod", klog.KObj(pod))

	fwk, err := sched.frameworkForPod(pod)
	if err != nil {
		// This shouldn't happen, because we only accept for scheduling the pods
		// which specify a scheduler name that matches one of the profiles.
		logger.Error(err, "Error occurred")
		return
	}
	if sched.skipPodSchedule(ctx, fwk, pod) {
		return
	}

	logger.V(3).Info("Attempting to schedule pod", "pod", klog.KObj(pod))

	// Synchronously attempt to find a fit for the pod.
	start := time.Now()
	state := framework.NewCycleState()
	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)

	// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
	podsToActivate := framework.NewPodsToActivate()
	state.Write(framework.PodsToActivateKey, podsToActivate)

	schedulingCycleCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
	if !status.IsSuccess() {
		sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, status, scheduleResult.nominatingInfo, start)
		return
	}

	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	go func() {
		bindingCycleCtx, cancel := context.WithCancel(ctx)
		defer cancel()

		metrics.Goroutines.WithLabelValues(metrics.Binding).Inc()
		defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec()

		status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
		if !status.IsSuccess() {
			sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status)
			return
		}
		// Usually, DonePod is called inside the scheduling queue, but here we
		// need to call it explicitly because this Pod won't go back to the
		// scheduling queue.
		sched.SchedulingQueue.Done(assumedPodInfo.Pod.UID)
	}()
}

var clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""}

// schedulingCycle tries to schedule a single Pod.
func (sched *Scheduler) schedulingCycle(
	ctx context.Context,
	state *framework.CycleState,
	fwk framework.Framework,
	podInfo *framework.QueuedPodInfo,
	start time.Time,
	podsToActivate *framework.PodsToActivate,
) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
	logger := klog.FromContext(ctx)
	pod := podInfo.Pod
	scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
	if err != nil {
		defer func() {
			metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
		}()
		if err == ErrNoNodesAvailable {
			status := framework.NewStatus(framework.UnschedulableAndUnresolvable).WithError(err)
			return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, status
		}

		fitError, ok := err.(*framework.FitError)
		if !ok {
			logger.Error(err, "Error selecting node for pod", "pod", klog.KObj(pod))
			return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, framework.AsStatus(err)
		}

		// SchedulePod() may have failed because the pod would not fit on any host, so we try to
		// preempt, with the expectation that the next time the pod is tried for scheduling it
		// will fit due to the preemption. It is also possible that a different pod will schedule
		// into the resources that were preempted, but this is harmless.

		if !fwk.HasPostFilterPlugins() {
			logger.V(3).Info("No PostFilter plugins are registered, so no preemption will be performed")
			return ScheduleResult{}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)
		}

		// Run PostFilter plugins to attempt to make the pod schedulable in a future scheduling cycle.
		result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
		msg := status.Message()
		fitError.Diagnosis.PostFilterMsg = msg
		if status.Code() == framework.Error {
			logger.Error(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
		} else {
			logger.V(5).Info("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
		}

		var nominatingInfo *framework.NominatingInfo
		if result != nil {
			nominatingInfo = result.NominatingInfo
		}
		return ScheduleResult{nominatingInfo: nominatingInfo}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)
	}

	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))

	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
	// This allows us to keep scheduling without waiting on binding to occur.
	assumedPodInfo := podInfo.DeepCopy()
	assumedPod := assumedPodInfo.Pod
	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
	err = sched.assume(logger, assumedPod, scheduleResult.SuggestedHost)
	if err != nil {
		// This is most probably result of a BUG in retrying logic.
		// We report an error here so that pod scheduling can be retried.
		// This relies on the fact that Error will check if the pod has been bound
		// to a node and if so will not add it back to the unscheduled pods queue
		// (otherwise this would cause an infinite loop).
		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.AsStatus(err)
	}

	// Run the Reserve method of reserve plugins.
	if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
		// trigger un-reserve to clean up state associated with the reserved Pod
		fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
		if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
			logger.Error(forgetErr, "Scheduler cache ForgetPod failed")
		}

		if sts.IsRejected() {
			fitErr := &framework.FitError{
				NumAllNodes: 1,
				Pod:         pod,
				Diagnosis: framework.Diagnosis{
					NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: sts},
				},
			}
			fitErr.Diagnosis.AddPluginStatus(sts)
			return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(sts.Code()).WithError(fitErr)
		}
		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, sts
	}

	// Run "permit" plugins.
	runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
	if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
		// trigger un-reserve to clean up state associated with the reserved Pod
		fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
		if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
			logger.Error(forgetErr, "Scheduler cache ForgetPod failed")
		}

		if runPermitStatus.IsRejected() {
			fitErr := &framework.FitError{
				NumAllNodes: 1,
				Pod:         pod,
				Diagnosis: framework.Diagnosis{
					NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: runPermitStatus},
				},
			}
			fitErr.Diagnosis.AddPluginStatus(runPermitStatus)
			return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(runPermitStatus.Code()).WithError(fitErr)
		}

		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, runPermitStatus
	}

	// At the end of a successful scheduling cycle, pop and move up Pods if needed.
	if len(podsToActivate.Map) != 0 {
		sched.SchedulingQueue.Activate(logger, podsToActivate.Map)
		// Clear the entries after activation.
		podsToActivate.Map = make(map[string]*v1.Pod)
	}

	return scheduleResult, assumedPodInfo, nil
}

// bindingCycle tries to bind an assumed Pod.
func (sched *Scheduler) bindingCycle(
	ctx context.Context,
	state *framework.CycleState,
	fwk framework.Framework,
	scheduleResult ScheduleResult,
	assumedPodInfo *framework.QueuedPodInfo,
	start time.Time,
	podsToActivate *framework.PodsToActivate) *framework.Status {
	logger := klog.FromContext(ctx)

	assumedPod := assumedPodInfo.Pod

	// Run "permit" plugins.
	if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
		if status.IsRejected() {
			fitErr := &framework.FitError{
				NumAllNodes: 1,
				Pod:         assumedPodInfo.Pod,
				Diagnosis: framework.Diagnosis{
					NodeToStatusMap:      framework.NodeToStatusMap{scheduleResult.SuggestedHost: status},
					UnschedulablePlugins: sets.New(status.Plugin()),
				},
			}
			return framework.NewStatus(status.Code()).WithError(fitErr)
		}
		return status
	}

	// Run "prebind" plugins.
	if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() {
		return status
	}

	// Run "bind" plugins.
	if status := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state); !status.IsSuccess() {
		return status
	}

	// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
	logger.V(2).Info("Successfully bound pod to node", "pod", klog.KObj(assumedPod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
	metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
	metrics.PodSchedulingAttempts.Observe(float64(assumedPodInfo.Attempts))
	if assumedPodInfo.InitialAttemptTimestamp != nil {
		metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(*assumedPodInfo.InitialAttemptTimestamp))
		metrics.PodSchedulingSLIDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(*assumedPodInfo.InitialAttemptTimestamp))
	}
	// Run "postbind" plugins.
	fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)

	// At the end of a successful binding cycle, move up Pods if needed.
	if len(podsToActivate.Map) != 0 {
		sched.SchedulingQueue.Activate(logger, podsToActivate.Map)
		// Unlike in schedulingCycle(), we don't bother clearing the entries here,
		// because the cycleState (and this map) is discarded when the binding
		// cycle finishes.
	}

	return nil
}

func (sched *Scheduler) handleBindingCycleError(
	ctx context.Context,
	state *framework.CycleState,
	fwk framework.Framework,
	podInfo *framework.QueuedPodInfo,
	start time.Time,
	scheduleResult ScheduleResult,
	status *framework.Status) {
	logger := klog.FromContext(ctx)

	assumedPod := podInfo.Pod
	// trigger un-reserve plugins to clean up state associated with the reserved Pod
	fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
	if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
		logger.Error(forgetErr, "scheduler cache ForgetPod failed")
	} else {
		// "Forget"ing an assumed Pod in binding cycle is equivalent to DeletePod,
		// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
		//
		// Avoid moving the assumed Pod itself as it's always Unschedulable.
		// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
		// update `q.moveRequest` and thus move the assumed pod to backoffQ anyways.
		if status.IsRejected() {
			defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, assumedPod, nil, func(pod *v1.Pod) bool {
				return assumedPod.UID != pod.UID
			})
		} else {
			sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, assumedPod, nil, nil)
		}
	}

	sched.FailureHandler(ctx, fwk, podInfo, status, clearNominatedNode, start)
}

func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
	fwk, ok := sched.Profiles[pod.Spec.SchedulerName]
	if !ok {
		return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
	}
	return fwk, nil
}

// skipPodSchedule returns true if we could skip scheduling the pod for specified cases.
func (sched *Scheduler) skipPodSchedule(ctx context.Context, fwk framework.Framework, pod *v1.Pod) bool {
	// Case 1: pod is being deleted.
	if pod.DeletionTimestamp != nil {
		fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
		klog.FromContext(ctx).V(3).Info("Skip schedule deleting pod", "pod", klog.KObj(pod))
		return true
	}

	// Case 2: pod that has been assumed could be skipped.
	// An assumed pod can be added again to the scheduling queue if it got an update event
	// during its previous scheduling cycle but before getting assumed.
	isAssumed, err := sched.Cache.IsAssumedPod(pod)
	if err != nil {
		// TODO(91633): pass ctx into a revised HandleError
		utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
		return false
	}
	return isAssumed
}

// schedulePod tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError with reasons.
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
	defer trace.LogIfLong(100 * time.Millisecond)
	if err := sched.Cache.UpdateSnapshot(klog.FromContext(ctx), sched.nodeInfoSnapshot); err != nil {
		return result, err
	}
	trace.Step("Snapshotting scheduler cache and node infos done")

	if sched.nodeInfoSnapshot.NumNodes() == 0 {
		return result, ErrNoNodesAvailable
	}

	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
	if err != nil {
		return result, err
	}
	trace.Step("Computing predicates done")

	if len(feasibleNodes) == 0 {
		return result, &framework.FitError{
			Pod:         pod,
			NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
			Diagnosis:   diagnosis,
		}
	}

	// When only one node after predicate, just use it.
	if len(feasibleNodes) == 1 {
		return ScheduleResult{
			SuggestedHost:  feasibleNodes[0].Node().Name,
			EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
			FeasibleNodes:  1,
		}, nil
	}

	priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
	if err != nil {
		return result, err
	}

	host, _, err := selectHost(priorityList, numberOfHighestScoredNodesToReport)
	trace.Step("Prioritizing done")

	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
		FeasibleNodes:  len(feasibleNodes),
	}, err
}

// Filters the nodes to find the ones that fit the pod based on the framework
// filter plugins and filter extenders.
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*framework.NodeInfo, framework.Diagnosis, error) {
	logger := klog.FromContext(ctx)

	allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
	if err != nil {
		return nil, framework.Diagnosis{
			NodeToStatusMap: make(framework.NodeToStatusMap),
		}, err
	}

	diagnosis := framework.Diagnosis{
		NodeToStatusMap: make(framework.NodeToStatusMap, len(allNodes)),
	}
	// Run "prefilter" plugins.
	preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
	if !s.IsSuccess() {
		if !s.IsRejected() {
			return nil, diagnosis, s.AsError()
		}

		// All nodes in NodeToStatusMap will have the same status so that they can be handled in the preemption.
		// Some non trivial refactoring is needed to avoid this copy.
		for _, n := range allNodes {
			diagnosis.NodeToStatusMap[n.Node().Name] = s
		}

		// Record the messages from PreFilter in Diagnosis.PreFilterMsg.
		msg := s.Message()
		diagnosis.PreFilterMsg = msg
		logger.V(5).Info("Status after running PreFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
		diagnosis.AddPluginStatus(s)
		return nil, diagnosis, nil
	}

	// "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
	// This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
	if len(pod.Status.NominatedNodeName) > 0 {
		feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
		if err != nil {
			logger.Error(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
		}
		// Nominated node passes all the filters, scheduler is good to assign this node to the pod.
		if len(feasibleNodes) != 0 {
			return feasibleNodes, diagnosis, nil
		}
	}

	nodes := allNodes
	if !preRes.AllNodes() {
		nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
		for _, n := range allNodes {
			if !preRes.NodeNames.Has(n.Node().Name) {
				// Nodes filtered out by the PreFilter result are treated as rejected
				// via UnschedulableAndUnresolvable, and recorded in NodeToStatusMap
				// so that they won't be considered as candidates in the preemption.
				diagnosis.NodeToStatusMap[n.Node().Name] = framework.NewStatus(framework.UnschedulableAndUnresolvable, "node is filtered out by the prefilter result")
				continue
			}
			nodes = append(nodes, n)
		}
	}
	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, &diagnosis, nodes)
	// always try to update the sched.nextStartNodeIndex regardless of whether an error has occurred
	// this is helpful to make sure that all the nodes have a chance to be searched
	processedNodes := len(feasibleNodes) + len(diagnosis.NodeToStatusMap)
	sched.nextStartNodeIndex = (sched.nextStartNodeIndex + processedNodes) % len(nodes)
	if err != nil {
		return nil, diagnosis, err
	}

	feasibleNodesAfterExtender, err := findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
	if err != nil {
		return nil, diagnosis, err
	}
	if len(feasibleNodesAfterExtender) != len(feasibleNodes) {
		// Extenders filtered out some nodes.
		//
		// Extender doesn't support any kind of requeueing feature like EnqueueExtensions in the scheduling framework.
		// When Extenders reject some Nodes and the pod ends up being unschedulable,
		// we put framework.ExtenderName to pInfo.UnschedulablePlugins.
		// This Pod will be requeued from unschedulable pod pool to activeQ/backoffQ
		// by any kind of cluster events.
		// https://github.com/kubernetes/kubernetes/issues/122019
		if diagnosis.UnschedulablePlugins == nil {
			diagnosis.UnschedulablePlugins = sets.New[string]()
		}
		diagnosis.UnschedulablePlugins.Insert(framework.ExtenderName)
	}

	return feasibleNodesAfterExtender, diagnosis, nil
}

func (sched *Scheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*framework.NodeInfo, error) {
	nnn := pod.Status.NominatedNodeName
	nodeInfo, err := sched.nodeInfoSnapshot.Get(nnn)
	if err != nil {
		return nil, err
	}
	node := []*framework.NodeInfo{nodeInfo}
	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, &diagnosis, node)
	if err != nil {
		return nil, err
	}

	feasibleNodes, err = findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
	if err != nil {
		return nil, err
	}

	return feasibleNodes, nil
}

// hasScoring checks if scoring nodes is configured.
func (sched *Scheduler) hasScoring(fwk framework.Framework) bool {
	if fwk.HasScorePlugins() {
		return true
	}
	for _, extender := range sched.Extenders {
		if extender.IsPrioritizer() {
			return true
		}
	}
	return false
}

// hasExtenderFilters checks if any extenders filter nodes.
func (sched *Scheduler) hasExtenderFilters() bool {
	for _, extender := range sched.Extenders {
		if extender.IsFilter() {
			return true
		}
	}
	return false
}

// findNodesThatPassFilters finds the nodes that fit the filter plugins.
func (sched *Scheduler) findNodesThatPassFilters(
	ctx context.Context,
	fwk framework.Framework,
	state *framework.CycleState,
	pod *v1.Pod,
	diagnosis *framework.Diagnosis,
	nodes []*framework.NodeInfo) ([]*framework.NodeInfo, error) {
	numAllNodes := len(nodes)
	numNodesToFind := sched.numFeasibleNodesToFind(fwk.PercentageOfNodesToScore(), int32(numAllNodes))
	if !sched.hasExtenderFilters() && !sched.hasScoring(fwk) {
		numNodesToFind = 1
	}

	// Create feasible list with enough space to avoid growing it
	// and allow assigning.
	feasibleNodes := make([]*framework.NodeInfo, numNodesToFind)

	if !fwk.HasFilterPlugins() {
		for i := range feasibleNodes {
			feasibleNodes[i] = nodes[(sched.nextStartNodeIndex+i)%numAllNodes]
		}
		return feasibleNodes, nil
	}

	errCh := parallelize.NewErrorChannel()
	var feasibleNodesLen int32
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	type nodeStatus struct {
		node   string
		status *framework.Status
	}
	result := make([]*nodeStatus, numAllNodes)
	checkNode := func(i int) {
		// We check the nodes starting from where we left off in the previous scheduling cycle,
		// this is to make sure all nodes have the same chance of being examined across pods.
		nodeInfo := nodes[(sched.nextStartNodeIndex+i)%numAllNodes]
		status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
		if status.Code() == framework.Error {
			errCh.SendErrorWithCancel(status.AsError(), cancel)
			return
		}
		if status.IsSuccess() {
			length := atomic.AddInt32(&feasibleNodesLen, 1)
			if length > numNodesToFind {
				cancel()
				atomic.AddInt32(&feasibleNodesLen, -1)
			} else {
				feasibleNodes[length-1] = nodeInfo
			}
		} else {
			result[i] = &nodeStatus{node: nodeInfo.Node().Name, status: status}
		}
	}

	beginCheckNode := time.Now()
	statusCode := framework.Success
	defer func() {
		// We record Filter extension point latency here instead of inside
		// framework.RunFilterPlugins, because the latter is called from many
		// goroutines in parallel; logically this is the latency of the whole
		// Filter extension point.
		metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))
	}()

	// Stops searching for more nodes once the configured number of feasible nodes
	// are found.
	fwk.Parallelizer().Until(ctx, numAllNodes, checkNode, metrics.Filter)
	feasibleNodes = feasibleNodes[:feasibleNodesLen]
	for _, item := range result {
		if item == nil {
			continue
		}
		diagnosis.NodeToStatusMap[item.node] = item.status
		diagnosis.AddPluginStatus(item.status)
	}
	if err := errCh.ReceiveError(); err != nil {
		statusCode = framework.Error
		return feasibleNodes, err
	}
	return feasibleNodes, nil
}
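
// Editor's note (illustrative, not part of the upstream source): the
// round-robin offset used by checkNode means that with 10 nodes,
// nextStartNodeIndex == 7, and numNodesToFind == 5, the nodes are visited in
// the order nodes[7], nodes[8], nodes[9], nodes[0], nodes[1], ...; the modulo
// wrap ensures every node eventually gets examined across scheduling cycles
// even though each cycle may stop its search early.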

// numFeasibleNodesToFind returns the number of feasible nodes that once found,
// the scheduler stops its search for more feasible nodes.
func (sched *Scheduler) numFeasibleNodesToFind(percentageOfNodesToScore *int32, numAllNodes int32) (numNodes int32) {
	if numAllNodes < minFeasibleNodesToFind {
		return numAllNodes
	}

	// Use profile percentageOfNodesToScore if it's set. Otherwise, use global percentageOfNodesToScore.
	var percentage int32
	if percentageOfNodesToScore != nil {
		percentage = *percentageOfNodesToScore
	} else {
		percentage = sched.percentageOfNodesToScore
	}

	if percentage == 0 {
		percentage = int32(50) - numAllNodes/125
		if percentage < minFeasibleNodesPercentageToFind {
			percentage = minFeasibleNodesPercentageToFind
		}
	}

	numNodes = numAllNodes * percentage / 100
	if numNodes < minFeasibleNodesToFind {
		return minFeasibleNodesToFind
	}

	return numNodes
}
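
// Worked example (illustrative, not part of the upstream source): with the
// adaptive default (percentage == 0) and a 5000-node cluster, the percentage
// becomes 50 - 5000/125 = 10, so the search stops after
// 5000 * 10 / 100 = 500 feasible nodes. For 50000 nodes the computed value
// 50 - 50000/125 = -350 is clamped to minFeasibleNodesPercentageToFind (5),
// yielding 2500 nodes. Clusters with fewer than minFeasibleNodesToFind (100)
// nodes are always searched exhaustively.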

func findNodesThatPassExtenders(ctx context.Context, extenders []framework.Extender, pod *v1.Pod, feasibleNodes []*framework.NodeInfo, statuses framework.NodeToStatusMap) ([]*framework.NodeInfo, error) {
	logger := klog.FromContext(ctx)
	// Extenders are called sequentially.
	// Nodes in original feasibleNodes can be excluded in one extender, and pass on to the next
	// extender in a decreasing manner.
	for _, extender := range extenders {
		if len(feasibleNodes) == 0 {
			break
		}
		if !extender.IsInterested(pod) {
			continue
		}

		// Status of failed nodes in failedAndUnresolvableMap will be added or overwritten in <statuses>,
		// so that the scheduler framework can respect the UnschedulableAndUnresolvable status for
		// particular nodes, and this may eventually improve preemption efficiency.
		// Note: users are recommended to configure the extenders that may return UnschedulableAndUnresolvable
		// status ahead of others.
		feasibleList, failedMap, failedAndUnresolvableMap, err := extender.Filter(pod, feasibleNodes)
		if err != nil {
			if extender.IsIgnorable() {
				logger.Info("Skipping extender as it returned error and has ignorable flag set", "extender", extender, "err", err)
				continue
			}
			return nil, err
		}

		for failedNodeName, failedMsg := range failedAndUnresolvableMap {
			var aggregatedReasons []string
			if _, found := statuses[failedNodeName]; found {
				aggregatedReasons = statuses[failedNodeName].Reasons()
			}
			aggregatedReasons = append(aggregatedReasons, failedMsg)
			statuses[failedNodeName] = framework.NewStatus(framework.UnschedulableAndUnresolvable, aggregatedReasons...)
		}

		for failedNodeName, failedMsg := range failedMap {
			if _, found := failedAndUnresolvableMap[failedNodeName]; found {
				// failedAndUnresolvableMap takes precedence over failedMap
				// note that this only happens if the extender returns the node in both maps
				continue
			}
			if _, found := statuses[failedNodeName]; !found {
				statuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg)
			} else {
				statuses[failedNodeName].AppendReason(failedMsg)
			}
		}

		feasibleNodes = feasibleList
	}
	return feasibleNodes, nil
}

// prioritizeNodes prioritizes the nodes by running the score plugins,
// which return a score for each node from the call to RunScorePlugins().
// The scores from each plugin are added together to make the score for that node, then
// any extenders are run as well.
// All scores are finally combined (added) to get the total weighted scores of all nodes.
func prioritizeNodes(
	ctx context.Context,
	extenders []framework.Extender,
	fwk framework.Framework,
	state *framework.CycleState,
	pod *v1.Pod,
	nodes []*framework.NodeInfo,
) ([]framework.NodePluginScores, error) {
	logger := klog.FromContext(ctx)

	// If no priority configs are provided, then all nodes will have a score of one.
	// This is required to generate the priority list in the required format.
	if len(extenders) == 0 && !fwk.HasScorePlugins() {
		result := make([]framework.NodePluginScores, 0, len(nodes))
		for i := range nodes {
			result = append(result, framework.NodePluginScores{
				Name:       nodes[i].Node().Name,
				TotalScore: 1,
			})
		}
		return result, nil
	}

	// Run PreScore plugins.
	preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
	if !preScoreStatus.IsSuccess() {
		return nil, preScoreStatus.AsError()
	}

	// Run the Score plugins.
	nodesScores, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
	if !scoreStatus.IsSuccess() {
		return nil, scoreStatus.AsError()
	}

	// Additional details logged at level 10 if enabled.
	loggerVTen := logger.V(10)
	if loggerVTen.Enabled() {
		for _, nodeScore := range nodesScores {
			for _, pluginScore := range nodeScore.Scores {
				loggerVTen.Info("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", pluginScore.Name, "node", nodeScore.Name, "score", pluginScore.Score)
			}
		}
	}

	if len(extenders) != 0 && nodes != nil {
		// allNodeExtendersScores has all extenders scores for all nodes.
		// It is keyed with node name.
		allNodeExtendersScores := make(map[string]*framework.NodePluginScores, len(nodes))
		var mu sync.Mutex
		var wg sync.WaitGroup
		for i := range extenders {
			if !extenders[i].IsInterested(pod) {
				continue
			}
			wg.Add(1)
			go func(extIndex int) {
				metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
				defer func() {
					metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
					wg.Done()
				}()
				prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
				if err != nil {
					// Prioritize errors are ignored. Let k8s/other extenders determine the priorities.
					logger.V(5).Info("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", extenders[extIndex].Name())
					return
				}
				mu.Lock()
				defer mu.Unlock()
				for i := range *prioritizedList {
					nodename := (*prioritizedList)[i].Host
					score := (*prioritizedList)[i].Score
					if loggerVTen.Enabled() {
						loggerVTen.Info("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", nodename, "score", score)
					}

					// MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
					// therefore we need to scale the score returned by extenders to the score range used by the scheduler.
					finalscore := score * weight * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)

					if allNodeExtendersScores[nodename] == nil {
						allNodeExtendersScores[nodename] = &framework.NodePluginScores{
							Name:   nodename,
							Scores: make([]framework.PluginScore, 0, len(extenders)),
						}
					}
					allNodeExtendersScores[nodename].Scores = append(allNodeExtendersScores[nodename].Scores, framework.PluginScore{
						Name:  extenders[extIndex].Name(),
						Score: finalscore,
					})
					allNodeExtendersScores[nodename].TotalScore += finalscore
				}
			}(i)
		}
		// wait for all go routines to finish
		wg.Wait()
		for i := range nodesScores {
			if score, ok := allNodeExtendersScores[nodes[i].Node().Name]; ok {
				nodesScores[i].Scores = append(nodesScores[i].Scores, score.Scores...)
				nodesScores[i].TotalScore += score.TotalScore
			}
		}
	}

	if loggerVTen.Enabled() {
		for i := range nodesScores {
			loggerVTen.Info("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", nodesScores[i].Name, "score", nodesScores[i].TotalScore)
		}
	}
	return nodesScores, nil
}
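
// Scaling note (illustrative): at the time of writing, framework.MaxNodeScore
// is 100 and extenderv1.MaxExtenderPriority is 10, so an extender score of 5
// with weight 2 contributes 5 * 2 * (100/10) = 100 to the node's total score,
// putting extender scores on the same scale as plugin scores.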

var errEmptyPriorityList = errors.New("empty priorityList")

// selectHost takes a prioritized list of nodes and then picks one
// in a reservoir sampling manner from the nodes that had the highest score.
// It also returns the top {count} Nodes,
// and the top of the list will be always the selected host.
func selectHost(nodeScoreList []framework.NodePluginScores, count int) (string, []framework.NodePluginScores, error) {
	if len(nodeScoreList) == 0 {
		return "", nil, errEmptyPriorityList
	}

	var h nodeScoreHeap = nodeScoreList
	heap.Init(&h)
	cntOfMaxScore := 1
	selectedIndex := 0
	// The top of the heap is the node with the highest score.
	sortedNodeScoreList := make([]framework.NodePluginScores, 0, count)
	sortedNodeScoreList = append(sortedNodeScoreList, heap.Pop(&h).(framework.NodePluginScores))

	// This for-loop continues until all Nodes with the highest score get checked
	// for a reservoir sampling, and sortedNodeScoreList has (count) elements.
	for ns := heap.Pop(&h).(framework.NodePluginScores); ; ns = heap.Pop(&h).(framework.NodePluginScores) {
		if ns.TotalScore != sortedNodeScoreList[0].TotalScore && len(sortedNodeScoreList) == count {
			break
		}

		if ns.TotalScore == sortedNodeScoreList[0].TotalScore {
			cntOfMaxScore++
			if rand.Intn(cntOfMaxScore) == 0 {
				// Replace the candidate with probability of 1/cntOfMaxScore
				selectedIndex = cntOfMaxScore - 1
			}
		}

		sortedNodeScoreList = append(sortedNodeScoreList, ns)

		if h.Len() == 0 {
			break
		}
	}

	if selectedIndex != 0 {
		// replace the first one with selected one
		previous := sortedNodeScoreList[0]
		sortedNodeScoreList[0] = sortedNodeScoreList[selectedIndex]
		sortedNodeScoreList[selectedIndex] = previous
	}

	if len(sortedNodeScoreList) > count {
		sortedNodeScoreList = sortedNodeScoreList[:count]
	}

	return sortedNodeScoreList[0].Name, sortedNodeScoreList, nil
}
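
// Sampling note (illustrative): selectHost implements reservoir sampling over
// the nodes tied for the top score. The k-th equally-scored node popped from
// the heap replaces the current pick with probability 1/k (rand.Intn(k) == 0),
// so after k ties the j-th candidate has survived with probability
// (1/j) * (j/(j+1)) * ... * ((k-1)/k) = 1/k, i.e. the winner is uniform among
// all top-scored nodes.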

// nodeScoreHeap is a heap of framework.NodePluginScores.
type nodeScoreHeap []framework.NodePluginScores

// nodeScoreHeap implements heap.Interface.
var _ heap.Interface = &nodeScoreHeap{}

func (h nodeScoreHeap) Len() int           { return len(h) }
func (h nodeScoreHeap) Less(i, j int) bool { return h[i].TotalScore > h[j].TotalScore }
func (h nodeScoreHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *nodeScoreHeap) Push(x interface{}) {
	*h = append(*h, x.(framework.NodePluginScores))
}

func (h *nodeScoreHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[0 : n-1]
	return x
}
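
// The sketch below is a hypothetical helper (exampleSelectTop is not part of
// the scheduler) showing how the heap orders candidates: because Less compares
// TotalScore with ">", heap.Pop repeatedly yields the remaining node with the
// highest score. Note that it reorders the slice passed in.
func exampleSelectTop(scores []framework.NodePluginScores) []string {
	var h nodeScoreHeap = scores
	heap.Init(&h)
	names := make([]string, 0, h.Len())
	for h.Len() > 0 {
		// Each Pop returns the highest-scored remaining entry.
		names = append(names, heap.Pop(&h).(framework.NodePluginScores).Name)
	}
	return names // node names in descending TotalScore order
}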

// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
// assume modifies `assumed`.
func (sched *Scheduler) assume(logger klog.Logger, assumed *v1.Pod, host string) error {
	// Optimistically assume that the binding will succeed and send it to apiserver
	// in the background.
	// If the binding fails, scheduler will release resources allocated to assumed pod
	// immediately.
	assumed.Spec.NodeName = host

	if err := sched.Cache.AssumePod(logger, assumed); err != nil {
		logger.Error(err, "Scheduler cache AssumePod failed")
		return err
	}
	// if "assumed" is a nominated pod, we should remove it from internal cache
	if sched.SchedulingQueue != nil {
		sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
	}

	return nil
}

// bind binds a pod to a given node defined in a binding object.
// The precedence for binding is: (1) extenders and (2) framework plugins.
// We expect this to run asynchronously, so we handle binding metrics internally.
func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (status *framework.Status) {
	logger := klog.FromContext(ctx)
	defer func() {
		sched.finishBinding(logger, fwk, assumed, targetNode, status)
	}()

	bound, err := sched.extendersBinding(logger, assumed, targetNode)
	if bound {
		return framework.AsStatus(err)
	}
	return fwk.RunBindPlugins(ctx, state, assumed, targetNode)
}

// TODO(#87159): Move this to a Plugin.
func (sched *Scheduler) extendersBinding(logger klog.Logger, pod *v1.Pod, node string) (bool, error) {
	for _, extender := range sched.Extenders {
		if !extender.IsBinder() || !extender.IsInterested(pod) {
			continue
		}
		err := extender.Bind(&v1.Binding{
			ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
			Target:     v1.ObjectReference{Kind: "Node", Name: node},
		})
		if err != nil && extender.IsIgnorable() {
			logger.Info("Skipping extender in bind as it returned error and has ignorable flag set", "extender", extender, "err", err)
			continue
		}
		return true, err
	}
	return false, nil
}

func (sched *Scheduler) finishBinding(logger klog.Logger, fwk framework.Framework, assumed *v1.Pod, targetNode string, status *framework.Status) {
	if finErr := sched.Cache.FinishBinding(logger, assumed); finErr != nil {
		logger.Error(finErr, "Scheduler cache FinishBinding failed")
	}
	if !status.IsSuccess() {
		logger.V(1).Info("Failed to bind pod", "pod", klog.KObj(assumed))
		return
	}

	fwk.EventRecorder().Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
}

func getAttemptsLabel(p *framework.QueuedPodInfo) string {
	// We breakdown the pod scheduling duration by attempts capped to a limit
	// to avoid ending up with a high cardinality metric.
	if p.Attempts >= 15 {
		return "15+"
	}
	return strconv.Itoa(p.Attempts)
}

// handleSchedulingFailure records an event for the pod that indicates the
// pod has failed to schedule. Also, update the pod condition and nominated node name if set.
func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time) {
	calledDone := false
	defer func() {
		if !calledDone {
			// Basically, AddUnschedulableIfNotPresent calls DonePod internally.
			// But, AddUnschedulableIfNotPresent isn't called in some corner cases.
			// Here, we call DonePod explicitly to avoid leaking the pod.
			sched.SchedulingQueue.Done(podInfo.Pod.UID)
		}
	}()

	logger := klog.FromContext(ctx)
	reason := v1.PodReasonSchedulerError
	if status.IsRejected() {
		reason = v1.PodReasonUnschedulable
	}

	switch reason {
	case v1.PodReasonUnschedulable:
		metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
	case v1.PodReasonSchedulerError:
		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
	}

	pod := podInfo.Pod
	err := status.AsError()
	errMsg := status.Message()

	if err == ErrNoNodesAvailable {
		logger.V(2).Info("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
	} else if fitError, ok := err.(*framework.FitError); ok {
		// Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
		podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
		podInfo.PendingPlugins = fitError.Diagnosis.PendingPlugins
		logger.V(2).Info("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", errMsg)
	} else {
		logger.Error(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod))
	}

	// Check if the Pod exists in informer cache.
	podLister := fwk.SharedInformerFactory().Core().V1().Pods().Lister()
	cachedPod, e := podLister.Pods(pod.Namespace).Get(pod.Name)
	if e != nil {
		logger.Info("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", e)
		// We need to call DonePod here because we don't call AddUnschedulableIfNotPresent in this case.
	} else {
		// In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
		// It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
		if len(cachedPod.Spec.NodeName) != 0 {
			logger.Info("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
			// We need to call DonePod here because we don't call AddUnschedulableIfNotPresent in this case.
		} else {
			// As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here.
			// ignore this err since apiserver doesn't properly validate affinity terms
			// and we can't fix the validation for backwards compatibility.
			podInfo.PodInfo, _ = framework.NewPodInfo(cachedPod.DeepCopy())
			if err := sched.SchedulingQueue.AddUnschedulableIfNotPresent(logger, podInfo, sched.SchedulingQueue.SchedulingCycle()); err != nil {
				logger.Error(err, "Error occurred")
			}
			calledDone = true
		}
	}

	// Update the scheduling queue with the nominated Pod information. Without
	// this, there would be a race condition between the next scheduling cycle
	// and the time the scheduler receives a Pod Update for the nominated pod.
	// Here we check for nil only for tests.
	if sched.SchedulingQueue != nil {
		logger := klog.FromContext(ctx)
		sched.SchedulingQueue.AddNominatedPod(logger, podInfo.PodInfo, nominatingInfo)
	}

	if err == nil {
		// Only tests can reach here.
		return
	}

	msg := truncateMessage(errMsg)
	fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
	if err := updatePod(ctx, sched.client, pod, &v1.PodCondition{
		Type:    v1.PodScheduled,
		Status:  v1.ConditionFalse,
		Reason:  reason,
		Message: errMsg,
	}, nominatingInfo); err != nil {
		klog.FromContext(ctx).Error(err, "Error updating pod", "pod", klog.KObj(pod))
	}
}

// truncateMessage truncates a message if it hits the NoteLengthLimit.
func truncateMessage(message string) string {
	max := validation.NoteLengthLimit
	if len(message) <= max {
		return message
	}
	suffix := " ..."
	return message[:max-len(suffix)] + suffix
}
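
// Example (illustrative): assuming validation.NoteLengthLimit is 1024 bytes,
// a 2000-byte message is cut to its first 1020 bytes plus " ...", keeping the
// returned string exactly at the limit.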

func updatePod(ctx context.Context, client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) error {
	logger := klog.FromContext(ctx)
	logger.V(3).Info("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
	podStatusCopy := pod.Status.DeepCopy()
	// NominatedNodeName is updated only if we are trying to overwrite it and the
	// value differs from the existing one.
	nnnNeedsUpdate := nominatingInfo.Mode() == framework.ModeOverride && pod.Status.NominatedNodeName != nominatingInfo.NominatedNodeName
	if !podutil.UpdatePodCondition(podStatusCopy, condition) && !nnnNeedsUpdate {
		return nil
	}
	if nnnNeedsUpdate {
		podStatusCopy.NominatedNodeName = nominatingInfo.NominatedNodeName
	}
	return util.PatchPodStatus(ctx, client, pod, podStatusCopy)
}
1129