package noderesources

import (
	"context"
	"fmt"
	"strings"

	"github.com/google/go-cmp/cmp"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/api/v1/resource"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	"k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
)

var _ framework.PreFilterPlugin = &Fit{}
var _ framework.FilterPlugin = &Fit{}
var _ framework.EnqueueExtensions = &Fit{}
var _ framework.PreScorePlugin = &Fit{}
var _ framework.ScorePlugin = &Fit{}

const (
	// Name is the name of the plugin used in the plugin registry and configurations.
	Name = names.NodeResourcesFit

	// preFilterStateKey is the key in CycleState to NodeResourcesFit pre-computed data.
	// Using the name of the plugin will likely help us avoid collisions with other plugins.
	preFilterStateKey = "PreFilter" + Name

	// preScoreStateKey is the key in CycleState to NodeResourcesFit pre-computed data for Scoring.
	preScoreStateKey = "PreScore" + Name
)

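// nodeResourceStrategyTypeMap maps each scoring strategy to the constructor of its scorer.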
var nodeResourceStrategyTypeMap = map[config.ScoringStrategyType]scorer{
	config.LeastAllocated: func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer {
		resources := args.ScoringStrategy.Resources
		return &resourceAllocationScorer{
			Name:      string(config.LeastAllocated),
			scorer:    leastResourceScorer(resources),
			resources: resources,
		}
	},
	config.MostAllocated: func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer {
		resources := args.ScoringStrategy.Resources
		return &resourceAllocationScorer{
			Name:      string(config.MostAllocated),
			scorer:    mostResourceScorer(resources),
			resources: resources,
		}
	},
	config.RequestedToCapacityRatio: func(args *config.NodeResourcesFitArgs) *resourceAllocationScorer {
		resources := args.ScoringStrategy.Resources
		return &resourceAllocationScorer{
			Name:      string(config.RequestedToCapacityRatio),
			scorer:    requestedToCapacityRatioScorer(resources, args.ScoringStrategy.RequestedToCapacityRatio.Shape),
			resources: resources,
		}
	},
}

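// Fit is a plugin that checks if a node has sufficient resources.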
type Fit struct {
	ignoredResources                sets.Set[string]
	ignoredResourceGroups           sets.Set[string]
	enableInPlacePodVerticalScaling bool
	enableSidecarContainers         bool
	handle                          framework.Handle
	resourceAllocationScorer
}

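// ScoreExtensions of the Score plugin.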
func (f *Fit) ScoreExtensions() framework.ScoreExtensions {
	return nil
}

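// preFilterState is computed at PreFilter and used at Filter.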
type preFilterState struct {
	framework.Resource
}

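// Clone the prefilter state.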
func (s *preFilterState) Clone() framework.StateData {
	return s
}

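// preScoreState is computed at PreScore and used at Score.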
type preScoreState struct {
	// podRequests stores the pod's requests for each resource configured in
	// resourceAllocationScorer.resources, in the same order.
	podRequests []int64
}

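// Clone implements the mandatory Clone interface. We don't really copy the data since
// there is no need for that.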
func (s *preScoreState) Clone() framework.StateData {
	return s
}

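// PreScore calculates the incoming pod's resource requests and writes them to the cycle state used at Score.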
func (f *Fit) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*framework.NodeInfo) *framework.Status {
	state := &preScoreState{
		podRequests: f.calculatePodResourceRequestList(pod, f.resources),
	}
	cycleState.Write(preScoreStateKey, state)
	return nil
}

func getPreScoreState(cycleState *framework.CycleState) (*preScoreState, error) {
	c, err := cycleState.Read(preScoreStateKey)
	if err != nil {
		return nil, fmt.Errorf("reading %q from cycleState: %w", preScoreStateKey, err)
	}

	s, ok := c.(*preScoreState)
	if !ok {
		return nil, fmt.Errorf("invalid PreScore state, got type %T", c)
	}
	return s, nil
}

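// Name returns name of the plugin. It is used in logs, etc.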
func (f *Fit) Name() string {
	return Name
}

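// NewFit initializes a new plugin and returns it.
//
// The scoring strategy is selected via NodeResourcesFitArgs. As a rough sketch (field
// layout per the kubescheduler.config.k8s.io API; adjust to your cluster), a profile
// selecting LeastAllocated might look like:
//
//	apiVersion: kubescheduler.config.k8s.io/v1
//	kind: KubeSchedulerConfiguration
//	profiles:
//	- pluginConfig:
//	  - name: NodeResourcesFit
//	    args:
//	      scoringStrategy:
//	        type: LeastAllocated
//	        resources:
//	        - name: cpu
//	          weight: 1
//	        - name: memory
//	          weight: 1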
func NewFit(_ context.Context, plArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) {
	args, ok := plArgs.(*config.NodeResourcesFitArgs)
	if !ok {
		return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", plArgs)
	}
	if err := validation.ValidateNodeResourcesFitArgs(nil, args); err != nil {
		return nil, err
	}

	if args.ScoringStrategy == nil {
		return nil, fmt.Errorf("scoring strategy not specified")
	}

	strategy := args.ScoringStrategy.Type
	scorePlugin, exists := nodeResourceStrategyTypeMap[strategy]
	if !exists {
		return nil, fmt.Errorf("scoring strategy %s is not supported", strategy)
	}

	return &Fit{
		ignoredResources:                sets.New(args.IgnoredResources...),
		ignoredResourceGroups:           sets.New(args.IgnoredResourceGroups...),
		enableInPlacePodVerticalScaling: fts.EnableInPlacePodVerticalScaling,
		enableSidecarContainers:         fts.EnableSidecarContainers,
		handle:                          h,
		resourceAllocationScorer:        *scorePlugin(args),
	}, nil
}

// computePodResourceRequest returns a framework.Resource that covers the largest
// width in each resource dimension. Because init-containers run sequentially, we collect
// the max in each dimension iteratively. In contrast, we sum the resource vectors for
// regular containers since they run simultaneously.
//
// # The resources defined for Overhead should be added to the calculated Resource request sum
//
// Example:
//
// Pod:
//
//	InitContainers
//	  IC1:
//	    CPU: 2
//	    Memory: 1G
//	  IC2:
//	    CPU: 2
//	    Memory: 3G
//	Containers
//	  C1:
//	    CPU: 2
//	    Memory: 1G
//	  C2:
//	    CPU: 1
//	    Memory: 1G
//
// Result: CPU: 3, Memory: 3G
func computePodResourceRequest(pod *v1.Pod) *preFilterState {
	// The pod hasn't been scheduled yet, so we don't need to worry about InPlacePodVerticalScalingEnabled.
	reqs := resource.PodRequests(pod, resource.PodResourcesOptions{})
	result := &preFilterState{}
	result.SetMaxResource(reqs)
	return result
}

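// PreFilter invoked at the prefilter extension point.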
func (f *Fit) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
	if !f.enableSidecarContainers && hasRestartableInitContainer(pod) {
		// The scheduler will calculate resource usage for a Pod containing
		// restartable init containers that is equal to or more than what the kubelet
		// requires to run the Pod, so there will be no overbooking. However, to
		// avoid the inconsistency in resource calculation between the scheduler
		// and the older (before v1.28) kubelet, make the Pod unschedulable.
		return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "Pod has a restartable init container and the SidecarContainers feature is disabled")
	}
	cycleState.Write(preFilterStateKey, computePodResourceRequest(pod))
	return nil, nil
}

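// PreFilterExtensions returns prefilter extensions, pod add and remove.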
func (f *Fit) PreFilterExtensions() framework.PreFilterExtensions {
	return nil
}

func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
	c, err := cycleState.Read(preFilterStateKey)
	if err != nil {
		// preFilterState doesn't exist, likely PreFilter wasn't invoked.
		return nil, fmt.Errorf("error reading %q from cycleState: %w", preFilterStateKey, err)
	}

	s, ok := c.(*preFilterState)
	if !ok {
		return nil, fmt.Errorf("invalid PreFilter state, got type %T", c)
	}
	return s, nil
}

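// EventsToRegister returns the possible events that may make a Pod
// failed by this plugin schedulable.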
func (f *Fit) EventsToRegister() []framework.ClusterEventWithHint {
	podActionType := framework.Delete
	if f.enableInPlacePodVerticalScaling {
		// If InPlacePodVerticalScaling (KEP 1287) is enabled, a Pod update may free up
		// resources that make other Pods schedulable, so register for Pod updates too.
		podActionType |= framework.Update
	}
	return []framework.ClusterEventWithHint{
		{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: podActionType}, QueueingHintFn: f.isSchedulableAfterPodChange},
		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Update}, QueueingHintFn: f.isSchedulableAfterNodeChange},
	}
}

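// isSchedulableAfterPodChange is invoked whenever a pod is deleted or updated. It checks
// whether that change could make a previously unschedulable pod schedulable.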
func (f *Fit) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	originalPod, modifiedPod, err := schedutil.As[*v1.Pod](oldObj, newObj)
	if err != nil {
		return framework.Queue, err
	}

	if modifiedPod == nil {
		if originalPod.Spec.NodeName == "" {
			logger.V(5).Info("the deleted pod was unscheduled and it wouldn't make the unscheduled pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(originalPod))
			return framework.QueueSkip, nil
		}
		logger.V(5).Info("another scheduled pod was deleted, and it may make the unscheduled pod schedulable", "pod", klog.KObj(pod), "deletedPod", klog.KObj(originalPod))
		return framework.Queue, nil
	}

	if !f.enableInPlacePodVerticalScaling {
		// If InPlacePodVerticalScaling (KEP 1287) is disabled, a pod update cannot free up any resources.
		logger.V(5).Info("another pod was modified, but InPlacePodVerticalScaling is disabled, so it doesn't make the unscheduled pod schedulable", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
		return framework.QueueSkip, nil
	}

	// Modifications may or may not be relevant. We only care about modifications that
	// reduce the other pod's resource request, for a resource that is also requested
	// by the pod we are trying to schedule.
	if !f.isResourceScaleDown(pod, originalPod, modifiedPod) {
		if loggerV := logger.V(10); loggerV.Enabled() {
			// Log more information.
			loggerV.Info("another Pod got modified, but the modification isn't related to the resource request", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod), "diff", cmp.Diff(originalPod, modifiedPod))
		} else {
			logger.V(5).Info("another Pod got modified, but the modification isn't related to the resource request", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
		}
		return framework.QueueSkip, nil
	}

	logger.V(5).Info("the max request resources of another scheduled pod got reduced and it may make the unscheduled pod schedulable", "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
	return framework.Queue, nil
}

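// isResourceScaleDown returns true if the modification of the other (scheduled) pod reduced
// its maximum resource request for at least one resource that the target pod also requests.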
func (f *Fit) isResourceScaleDown(targetPod, originalOtherPod, modifiedOtherPod *v1.Pod) bool {
	if modifiedOtherPod.Spec.NodeName == "" {
		// A modification to an unscheduled pod doesn't free up any resources.
		return false
	}

	// The other pod was scheduled, so the modification may free up some resources.
	originalMaxResourceReq, modifiedMaxResourceReq := &framework.Resource{}, &framework.Resource{}
	originalMaxResourceReq.SetMaxResource(resource.PodRequests(originalOtherPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling}))
	modifiedMaxResourceReq.SetMaxResource(resource.PodRequests(modifiedOtherPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling}))

	// Check whether the modified pod's resource request is less than the original pod's,
	// for any resource the target pod requests.
	podRequests := resource.PodRequests(targetPod, resource.PodResourcesOptions{InPlacePodVerticalScalingEnabled: f.enableInPlacePodVerticalScaling})
	for rName, rValue := range podRequests {
		if rValue.IsZero() {
			// We only care about the resources requested by the pod we are trying to schedule.
			continue
		}
		switch rName {
		case v1.ResourceCPU:
			if originalMaxResourceReq.MilliCPU > modifiedMaxResourceReq.MilliCPU {
				return true
			}
		case v1.ResourceMemory:
			if originalMaxResourceReq.Memory > modifiedMaxResourceReq.Memory {
				return true
			}
		case v1.ResourceEphemeralStorage:
			if originalMaxResourceReq.EphemeralStorage > modifiedMaxResourceReq.EphemeralStorage {
				return true
			}
		default:
			if schedutil.IsScalarResourceName(rName) && originalMaxResourceReq.ScalarResources[rName] > modifiedMaxResourceReq.ScalarResources[rName] {
				return true
			}
		}
	}
	return false
}

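// isSchedulableAfterNodeChange is invoked whenever a node is added or changed. It checks
// whether that change could make a previously unschedulable pod schedulable.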
func (f *Fit) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	_, modifiedNode, err := schedutil.As[*v1.Node](oldObj, newObj)
	if err != nil {
		return framework.Queue, err
	}

	if isFit(pod, modifiedNode) {
		logger.V(5).Info("node was updated, and may fit with the pod's resource requests", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
		return framework.Queue, nil
	}

	logger.V(5).Info("node was created or updated, but it doesn't have enough resource(s) to accommodate this pod", "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
	return framework.QueueSkip, nil
}

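// isFit checks if the pod fits the node. If the node is nil, it returns false.
// Unlike Filter, it checks against the node's whole allocatable capacity, since the
// freshly built NodeInfo contains no pods.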
func isFit(pod *v1.Pod, node *v1.Node) bool {
	if node == nil {
		return false
	}
	nodeInfo := framework.NewNodeInfo()
	nodeInfo.SetNode(node)
	return len(Fits(pod, nodeInfo)) == 0
}

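// Filter invoked at the filter extension point.
// It checks if a node has sufficient resources, such as cpu, memory, gpu, and opaque int
// resources, to run a pod, and collects the list of insufficient resources; if that list
// is empty, the node has all the resources requested by the pod.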
func (f *Fit) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	s, err := getPreFilterState(cycleState)
	if err != nil {
		return framework.AsStatus(err)
	}

	insufficientResources := fitsRequest(s, nodeInfo, f.ignoredResources, f.ignoredResourceGroups)

	if len(insufficientResources) != 0 {
		// We will keep all failure reasons.
		failureReasons := make([]string, 0, len(insufficientResources))
		for i := range insufficientResources {
			failureReasons = append(failureReasons, insufficientResources[i].Reason)
		}
		return framework.NewStatus(framework.Unschedulable, failureReasons...)
	}
	return nil
}

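// hasRestartableInitContainer returns true if the pod has any restartable init
// (sidecar) container, i.e. an init container with restartPolicy: Always.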
func hasRestartableInitContainer(pod *v1.Pod) bool {
	for _, c := range pod.Spec.InitContainers {
		if c.RestartPolicy != nil && *c.RestartPolicy == v1.ContainerRestartPolicyAlways {
			return true
		}
	}
	return false
}

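// InsufficientResource describes what kind of resource limit was hit and caused the pod to not fit the node.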
type InsufficientResource struct {
	ResourceName v1.ResourceName
	// We explicitly have a parameter for reason to avoid formatting a message on the fly
	// for common resources, which is expensive for cluster autoscaler simulations.
	Reason    string
	Requested int64
	Used      int64
	Capacity  int64
}

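// Fits checks if the node has enough resources to host the pod.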
func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) []InsufficientResource {
	return fitsRequest(computePodResourceRequest(pod), nodeInfo, nil, nil)
}

func fitsRequest(podRequest *preFilterState, nodeInfo *framework.NodeInfo, ignoredExtendedResources, ignoredResourceGroups sets.Set[string]) []InsufficientResource {
	insufficientResources := make([]InsufficientResource, 0, 4)

	allowedPodNumber := nodeInfo.Allocatable.AllowedPodNumber
	if len(nodeInfo.Pods)+1 > allowedPodNumber {
		insufficientResources = append(insufficientResources, InsufficientResource{
			ResourceName: v1.ResourcePods,
			Reason:       "Too many pods",
			Requested:    1,
			Used:         int64(len(nodeInfo.Pods)),
			Capacity:     int64(allowedPodNumber),
		})
	}

	if podRequest.MilliCPU == 0 &&
		podRequest.Memory == 0 &&
		podRequest.EphemeralStorage == 0 &&
		len(podRequest.ScalarResources) == 0 {
		return insufficientResources
	}

	if podRequest.MilliCPU > 0 && podRequest.MilliCPU > (nodeInfo.Allocatable.MilliCPU-nodeInfo.Requested.MilliCPU) {
		insufficientResources = append(insufficientResources, InsufficientResource{
			ResourceName: v1.ResourceCPU,
			Reason:       "Insufficient cpu",
			Requested:    podRequest.MilliCPU,
			Used:         nodeInfo.Requested.MilliCPU,
			Capacity:     nodeInfo.Allocatable.MilliCPU,
		})
	}
	if podRequest.Memory > 0 && podRequest.Memory > (nodeInfo.Allocatable.Memory-nodeInfo.Requested.Memory) {
		insufficientResources = append(insufficientResources, InsufficientResource{
			ResourceName: v1.ResourceMemory,
			Reason:       "Insufficient memory",
			Requested:    podRequest.Memory,
			Used:         nodeInfo.Requested.Memory,
			Capacity:     nodeInfo.Allocatable.Memory,
		})
	}
	if podRequest.EphemeralStorage > 0 &&
		podRequest.EphemeralStorage > (nodeInfo.Allocatable.EphemeralStorage-nodeInfo.Requested.EphemeralStorage) {
		insufficientResources = append(insufficientResources, InsufficientResource{
			ResourceName: v1.ResourceEphemeralStorage,
			Reason:       "Insufficient ephemeral-storage",
			Requested:    podRequest.EphemeralStorage,
			Used:         nodeInfo.Requested.EphemeralStorage,
			Capacity:     nodeInfo.Allocatable.EphemeralStorage,
		})
	}

	for rName, rQuant := range podRequest.ScalarResources {
		// Skip in case request quantity is zero.
		if rQuant == 0 {
			continue
		}

		if v1helper.IsExtendedResourceName(rName) {
			// If this resource is one of the extended resources that should be ignored, we will skip checking it.
			// rName is guaranteed to have a slash due to format validation, so this won't panic.
			var rNamePrefix string
			if ignoredResourceGroups.Len() > 0 {
				rNamePrefix = strings.Split(string(rName), "/")[0]
			}
			if ignoredExtendedResources.Has(string(rName)) || ignoredResourceGroups.Has(rNamePrefix) {
				continue
			}
		}

		if rQuant > (nodeInfo.Allocatable.ScalarResources[rName] - nodeInfo.Requested.ScalarResources[rName]) {
			insufficientResources = append(insufficientResources, InsufficientResource{
				ResourceName: rName,
				Reason:       fmt.Sprintf("Insufficient %v", rName),
				Requested:    podRequest.ScalarResources[rName],
				Used:         nodeInfo.Requested.ScalarResources[rName],
				Capacity:     nodeInfo.Allocatable.ScalarResources[rName],
			})
		}
	}

	return insufficientResources
}

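// Score invoked at the Score extension point.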
func (f *Fit) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
	nodeInfo, err := f.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
	if err != nil {
		return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
	}

	s, err := getPreScoreState(state)
	if err != nil {
		// Fall back to computing the requests here if the PreScore state is missing.
		s = &preScoreState{
			podRequests: f.calculatePodResourceRequestList(pod, f.resources),
		}
	}

	return f.score(ctx, pod, nodeInfo, s.podRequests)
}