17 package framework
18
19 import (
20 "errors"
21 "fmt"
22 "sort"
23 "strings"
24 "sync/atomic"
25 "time"
26
27 v1 "k8s.io/api/core/v1"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/labels"
30 utilerrors "k8s.io/apimachinery/pkg/util/errors"
31 "k8s.io/apimachinery/pkg/util/sets"
32 utilfeature "k8s.io/apiserver/pkg/util/feature"
33 "k8s.io/klog/v2"
34
35 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
36 resourcehelper "k8s.io/kubernetes/pkg/api/v1/resource"
37 "k8s.io/kubernetes/pkg/features"
38 schedutil "k8s.io/kubernetes/pkg/scheduler/util"
39 )
40
// generation is the package-global monotonically increasing counter backing
// NodeInfo.Generation; it is only ever advanced atomically via nextGeneration.
var generation int64
42
43
44
// ActionType is an integer bitmask describing how an object changed.
// Values may be OR'ed together to express several kinds of change at once.
type ActionType int64

// Constants for ActionTypes.
const (
	Add ActionType = 1 << iota
	Delete

	// Node-update subtypes: one bit per aspect of a Node that can change.
	UpdateNodeAllocatable
	UpdateNodeLabel
	UpdateNodeTaint
	UpdateNodeCondition
	UpdateNodeAnnotation

	// All sets every bit declared above (1<<iota - 1 at this point in the
	// iota sequence), i.e. it matches any action.
	All ActionType = 1<<iota - 1

	// Update is the union of all the node-update subtypes.
	Update = UpdateNodeAllocatable | UpdateNodeLabel | UpdateNodeTaint | UpdateNodeCondition | UpdateNodeAnnotation
)
63
64
// GVK names the kind of cluster object a ClusterEvent refers to
// (a simplified group/version/kind string).
type GVK string

// Constants for GVKs.
const (
	// Pod events.
	Pod GVK = "Pod"

	// Node and storage-related kinds; the storage kinds carry their API
	// group prefix to disambiguate them.
	Node                  GVK = "Node"
	PersistentVolume      GVK = "PersistentVolume"
	PersistentVolumeClaim GVK = "PersistentVolumeClaim"
	CSINode               GVK = "storage.k8s.io/CSINode"
	CSIDriver             GVK = "storage.k8s.io/CSIDriver"
	CSIStorageCapacity    GVK = "storage.k8s.io/CSIStorageCapacity"
	StorageClass          GVK = "storage.k8s.io/StorageClass"
	// Dynamic-resource-allocation kinds.
	PodSchedulingContext    GVK = "PodSchedulingContext"
	ResourceClaim           GVK = "ResourceClaim"
	ResourceClass           GVK = "ResourceClass"
	ResourceClaimParameters GVK = "ResourceClaimParameters"
	ResourceClassParameters GVK = "ResourceClassParameters"

	// WildCard matches any resource kind; see ClusterEvent.IsWildCard and
	// UnrollWildCardResource for how it is expanded.
	WildCard GVK = "*"
)
107
// ClusterEventWithHint pairs a ClusterEvent with an optional QueueingHintFn
// used to decide whether that event could make a rejected pod schedulable.
type ClusterEventWithHint struct {
	Event ClusterEvent

	// QueueingHintFn is executed when the above Event happens.
	// NOTE(review): a nil hint presumably means "always requeue" — confirm
	// against the scheduling-queue implementation.
	QueueingHintFn QueueingHintFn
}
117
118
119
120
121
122
123
124
125
126
127
128
// QueueingHintFn decides whether an event (described by oldObj/newObj — either
// may be nil for pure add/delete events, TODO confirm) could make pod
// schedulable, returning a QueueingHint and an error when the hint cannot be
// computed.
type QueueingHintFn func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (QueueingHint, error)
130
// QueueingHint is the answer a QueueingHintFn gives the scheduling queue:
// skip the event, or requeue the pod because of it.
type QueueingHint int

const (
	// QueueSkip: the event is irrelevant to the pod; do not requeue.
	QueueSkip QueueingHint = iota

	// Queue: the event may make the pod schedulable; requeue it.
	Queue
)

// String returns the constant's name, or "" for an unknown value.
func (s QueueingHint) String() string {
	if s == QueueSkip {
		return "QueueSkip"
	}
	if s == Queue {
		return "Queue"
	}
	return ""
}
151
152
153
154
// ClusterEvent abstracts a state change of a cluster object: which kind of
// object (Resource) and which kind of change (ActionType bitmask).
type ClusterEvent struct {
	Resource   GVK
	ActionType ActionType
	// Label is a human-readable tag for the event — presumably used in
	// logs/metrics; confirm usage at call sites.
	Label string
}
160
161
// IsWildCard reports whether ce is the special {WildCard, All} event, which
// Match treats as matching every incoming event.
func (ce ClusterEvent) IsWildCard() bool {
	return ce.Resource == WildCard && ce.ActionType == All
}
165
166
167
168
169
170
171
172
173 func (ce ClusterEvent) Match(event ClusterEvent) bool {
174 return ce.IsWildCard() || (ce.Resource == WildCard || ce.Resource == event.Resource) && ce.ActionType&event.ActionType != 0
175 }
176
177 func UnrollWildCardResource() []ClusterEventWithHint {
178 return []ClusterEventWithHint{
179 {Event: ClusterEvent{Resource: Pod, ActionType: All}},
180 {Event: ClusterEvent{Resource: Node, ActionType: All}},
181 {Event: ClusterEvent{Resource: PersistentVolume, ActionType: All}},
182 {Event: ClusterEvent{Resource: PersistentVolumeClaim, ActionType: All}},
183 {Event: ClusterEvent{Resource: CSINode, ActionType: All}},
184 {Event: ClusterEvent{Resource: CSIDriver, ActionType: All}},
185 {Event: ClusterEvent{Resource: CSIStorageCapacity, ActionType: All}},
186 {Event: ClusterEvent{Resource: StorageClass, ActionType: All}},
187 {Event: ClusterEvent{Resource: PodSchedulingContext, ActionType: All}},
188 {Event: ClusterEvent{Resource: ResourceClaim, ActionType: All}},
189 {Event: ClusterEvent{Resource: ResourceClass, ActionType: All}},
190 {Event: ClusterEvent{Resource: ResourceClaimParameters, ActionType: All}},
191 {Event: ClusterEvent{Resource: ResourceClassParameters, ActionType: All}},
192 }
193 }
194
195
196
197
// QueuedPodInfo is a PodInfo wrapper carrying scheduling-queue lifecycle data.
type QueuedPodInfo struct {
	*PodInfo

	// Timestamp is when the pod was (re-)added to the queue.
	Timestamp time.Time

	// Attempts counts scheduling attempts so far.
	Attempts int

	// InitialAttemptTimestamp is when the pod was first attempted;
	// NOTE(review): nil presumably means "not attempted yet" — confirm at callers.
	InitialAttemptTimestamp *time.Time

	// UnschedulablePlugins is the set of plugin names that rejected the pod.
	UnschedulablePlugins sets.Set[string]

	// PendingPlugins is the set of plugin names that returned Pending.
	PendingPlugins sets.Set[string]

	// Gated marks the pod as currently held back from scheduling.
	Gated bool
}
218
219
220 func (pqi *QueuedPodInfo) DeepCopy() *QueuedPodInfo {
221 return &QueuedPodInfo{
222 PodInfo: pqi.PodInfo.DeepCopy(),
223 Timestamp: pqi.Timestamp,
224 Attempts: pqi.Attempts,
225 InitialAttemptTimestamp: pqi.InitialAttemptTimestamp,
226 UnschedulablePlugins: pqi.UnschedulablePlugins.Clone(),
227 Gated: pqi.Gated,
228 }
229 }
230
231
232
233
// PodInfo is a wrapper for a pod with additional pre-computed information to
// accelerate processing, such as parsed (anti-)affinity terms.
type PodInfo struct {
	Pod                        *v1.Pod
	RequiredAffinityTerms      []AffinityTerm
	RequiredAntiAffinityTerms  []AffinityTerm
	PreferredAffinityTerms     []WeightedAffinityTerm
	PreferredAntiAffinityTerms []WeightedAffinityTerm
}
241
242
243 func (pi *PodInfo) DeepCopy() *PodInfo {
244 return &PodInfo{
245 Pod: pi.Pod.DeepCopy(),
246 RequiredAffinityTerms: pi.RequiredAffinityTerms,
247 RequiredAntiAffinityTerms: pi.RequiredAntiAffinityTerms,
248 PreferredAffinityTerms: pi.PreferredAffinityTerms,
249 PreferredAntiAffinityTerms: pi.PreferredAntiAffinityTerms,
250 }
251 }
252
253
254
// Update sets pi.Pod to pod and re-derives the cached affinity terms.
// Fast path: if pod has the same UID as the current pi.Pod, only the pointer
// is refreshed and the existing terms are kept (same UID implies the same,
// immutable affinity spec). Parse errors are accumulated and returned as an
// aggregate; fields are still assigned with whatever parsed successfully.
func (pi *PodInfo) Update(pod *v1.Pod) error {
	if pod != nil && pi.Pod != nil && pi.Pod.UID == pod.UID {
		// PodInfo includes immutable information; it is safe to just swap
		// the pod pointer for the exact same pod.
		pi.Pod = pod
		return nil
	}
	// Collect the preferred (weighted) terms from the spec, if any.
	var preferredAffinityTerms []v1.WeightedPodAffinityTerm
	var preferredAntiAffinityTerms []v1.WeightedPodAffinityTerm
	if affinity := pod.Spec.Affinity; affinity != nil {
		if a := affinity.PodAffinity; a != nil {
			preferredAffinityTerms = a.PreferredDuringSchedulingIgnoredDuringExecution
		}
		if a := affinity.PodAntiAffinity; a != nil {
			preferredAntiAffinityTerms = a.PreferredDuringSchedulingIgnoredDuringExecution
		}
	}

	// Parse all four groups, collecting (not short-circuiting on) errors.
	var parseErrs []error
	requiredAffinityTerms, err := getAffinityTerms(pod, getPodAffinityTerms(pod.Spec.Affinity))
	if err != nil {
		parseErrs = append(parseErrs, fmt.Errorf("requiredAffinityTerms: %w", err))
	}
	requiredAntiAffinityTerms, err := getAffinityTerms(pod,
		getPodAntiAffinityTerms(pod.Spec.Affinity))
	if err != nil {
		parseErrs = append(parseErrs, fmt.Errorf("requiredAntiAffinityTerms: %w", err))
	}
	weightedAffinityTerms, err := getWeightedAffinityTerms(pod, preferredAffinityTerms)
	if err != nil {
		parseErrs = append(parseErrs, fmt.Errorf("preferredAffinityTerms: %w", err))
	}
	weightedAntiAffinityTerms, err := getWeightedAffinityTerms(pod, preferredAntiAffinityTerms)
	if err != nil {
		parseErrs = append(parseErrs, fmt.Errorf("preferredAntiAffinityTerms: %w", err))
	}

	pi.Pod = pod
	pi.RequiredAffinityTerms = requiredAffinityTerms
	pi.RequiredAntiAffinityTerms = requiredAntiAffinityTerms
	pi.PreferredAffinityTerms = weightedAffinityTerms
	pi.PreferredAntiAffinityTerms = weightedAntiAffinityTerms
	return utilerrors.NewAggregate(parseErrs)
}
300
301
// AffinityTerm is a processed version of v1.PodAffinityTerm with the label
// selectors pre-compiled.
type AffinityTerm struct {
	Namespaces        sets.Set[string]
	Selector          labels.Selector
	TopologyKey       string
	NamespaceSelector labels.Selector
}
308
309
310 func (at *AffinityTerm) Matches(pod *v1.Pod, nsLabels labels.Set) bool {
311 if at.Namespaces.Has(pod.Namespace) || at.NamespaceSelector.Matches(nsLabels) {
312 return at.Selector.Matches(labels.Set(pod.Labels))
313 }
314 return false
315 }
316
317
// WeightedAffinityTerm is a "processed" representation of
// v1.WeightedPodAffinityTerm: an AffinityTerm plus its preference weight.
type WeightedAffinityTerm struct {
	AffinityTerm
	Weight int32
}
322
323
// ExtenderName is a pseudo plugin name used to attribute statuses produced by
// scheduler extenders (which are not real in-tree plugins).
const ExtenderName = "Extender"
325
326
// Diagnosis records why a pod failed to schedule in one attempt.
type Diagnosis struct {
	// NodeToStatusMap holds the per-node filtering status.
	NodeToStatusMap NodeToStatusMap

	// UnschedulablePlugins are plugins that rejected the pod (see AddPluginStatus).
	UnschedulablePlugins sets.Set[string]

	// PendingPlugins are plugins that returned a Pending status.
	PendingPlugins sets.Set[string]

	// PreFilterMsg is the failure message from the PreFilter stage, if any.
	PreFilterMsg string

	// PostFilterMsg is the message from the PostFilter stage, if any.
	PostFilterMsg string
}
341
342
// FitError describes a pod that could not be scheduled onto any node.
type FitError struct {
	Pod         *v1.Pod
	NumAllNodes int
	Diagnosis   Diagnosis
}

const (
	// NoNodeAvailableMsg is the format prefix of FitError.Error; %v is the
	// total node count.
	NoNodeAvailableMsg = "0/%v nodes are available"
)
353
354 func (d *Diagnosis) AddPluginStatus(sts *Status) {
355 if sts.Plugin() == "" {
356 return
357 }
358 if sts.IsRejected() {
359 if d.UnschedulablePlugins == nil {
360 d.UnschedulablePlugins = sets.New[string]()
361 }
362 d.UnschedulablePlugins.Insert(sts.Plugin())
363 }
364 if sts.Code() == Pending {
365 if d.PendingPlugins == nil {
366 d.PendingPlugins = sets.New[string]()
367 }
368 d.PendingPlugins.Insert(sts.Plugin())
369 }
370 }
371
372
373
// Error builds the user-facing "0/N nodes are available: ..." message.
// If PreFilter produced a message it is used verbatim (per-node reasons are
// redundant then); otherwise the per-node filter reasons are aggregated into
// a sorted histogram ("<count> <reason>, ..."). Any PostFilter message is
// appended last.
func (f *FitError) Error() string {
	reasonMsg := fmt.Sprintf(NoNodeAvailableMsg+":", f.NumAllNodes)
	preFilterMsg := f.Diagnosis.PreFilterMsg
	if preFilterMsg != "" {
		// PreFilter failed on all nodes at once; its message subsumes the
		// per-node detail.
		reasonMsg += fmt.Sprintf(" %v.", preFilterMsg)
	}

	if preFilterMsg == "" {
		// No PreFilter message: summarize per-node filter failures as a
		// histogram of reason strings.
		reasons := make(map[string]int)
		for _, status := range f.Diagnosis.NodeToStatusMap {
			for _, reason := range status.Reasons() {
				reasons[reason]++
			}
		}

		// Sort for deterministic output (map iteration order is random).
		sortReasonsHistogram := func() []string {
			var reasonStrings []string
			for k, v := range reasons {
				reasonStrings = append(reasonStrings, fmt.Sprintf("%v %v", v, k))
			}
			sort.Strings(reasonStrings)
			return reasonStrings
		}
		sortedFilterMsg := sortReasonsHistogram()
		if len(sortedFilterMsg) != 0 {
			reasonMsg += fmt.Sprintf(" %v.", strings.Join(sortedFilterMsg, ", "))
		}
	}

	// PostFilter (e.g. preemption outcome) message goes last, unterminated
	// to preserve the exact legacy format.
	postFilterMsg := f.Diagnosis.PostFilterMsg
	if postFilterMsg != "" {
		reasonMsg += fmt.Sprintf(" %v", postFilterMsg)
	}
	return reasonMsg
}
420
// newAffinityTerm converts a v1.PodAffinityTerm into the internal AffinityTerm,
// pre-compiling both the pod label selector and the namespace selector.
// Returns an error if either selector fails to parse.
func newAffinityTerm(pod *v1.Pod, term *v1.PodAffinityTerm) (*AffinityTerm, error) {
	selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
	if err != nil {
		return nil, err
	}

	namespaces := getNamespacesFromPodAffinityTerm(pod, term)
	nsSelector, err := metav1.LabelSelectorAsSelector(term.NamespaceSelector)
	if err != nil {
		return nil, err
	}

	return &AffinityTerm{Namespaces: namespaces, Selector: selector, TopologyKey: term.TopologyKey, NamespaceSelector: nsSelector}, nil
}
435
436
437
438 func getAffinityTerms(pod *v1.Pod, v1Terms []v1.PodAffinityTerm) ([]AffinityTerm, error) {
439 if v1Terms == nil {
440 return nil, nil
441 }
442
443 var terms []AffinityTerm
444 for i := range v1Terms {
445 t, err := newAffinityTerm(pod, &v1Terms[i])
446 if err != nil {
447
448 return nil, err
449 }
450 terms = append(terms, *t)
451 }
452 return terms, nil
453 }
454
455
456 func getWeightedAffinityTerms(pod *v1.Pod, v1Terms []v1.WeightedPodAffinityTerm) ([]WeightedAffinityTerm, error) {
457 if v1Terms == nil {
458 return nil, nil
459 }
460
461 var terms []WeightedAffinityTerm
462 for i := range v1Terms {
463 t, err := newAffinityTerm(pod, &v1Terms[i].PodAffinityTerm)
464 if err != nil {
465
466 return nil, err
467 }
468 terms = append(terms, WeightedAffinityTerm{AffinityTerm: *t, Weight: v1Terms[i].Weight})
469 }
470 return terms, nil
471 }
472
473
474 func NewPodInfo(pod *v1.Pod) (*PodInfo, error) {
475 pInfo := &PodInfo{}
476 err := pInfo.Update(pod)
477 return pInfo, err
478 }
479
480 func getPodAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) {
481 if affinity != nil && affinity.PodAffinity != nil {
482 if len(affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
483 terms = affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution
484 }
485
486
487
488
489 }
490 return terms
491 }
492
493 func getPodAntiAffinityTerms(affinity *v1.Affinity) (terms []v1.PodAffinityTerm) {
494 if affinity != nil && affinity.PodAntiAffinity != nil {
495 if len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0 {
496 terms = affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution
497 }
498
499
500
501
502 }
503 return terms
504 }
505
506
507
508 func getNamespacesFromPodAffinityTerm(pod *v1.Pod, podAffinityTerm *v1.PodAffinityTerm) sets.Set[string] {
509 names := sets.Set[string]{}
510 if len(podAffinityTerm.Namespaces) == 0 && podAffinityTerm.NamespaceSelector == nil {
511 names.Insert(pod.Namespace)
512 } else {
513 names.Insert(podAffinityTerm.Namespaces...)
514 }
515 return names
516 }
517
518
// ImageStateSummary summarizes the cached state of one container image.
type ImageStateSummary struct {
	// Size of the image in bytes.
	Size int64

	// NumNodes is how many nodes have the image.
	NumNodes int

	// Nodes is the set of node names that have the image; dropped by
	// Snapshot, which keeps only the count.
	Nodes sets.Set[string]
}
529
530
// Snapshot returns a flattened copy: Size plus the current node count.
// The Nodes set itself is not carried over — only its length is retained.
func (iss *ImageStateSummary) Snapshot() *ImageStateSummary {
	return &ImageStateSummary{
		Size:     iss.Size,
		NumNodes: iss.Nodes.Len(),
	}
}
537
538
// NodeInfo is the scheduler's aggregated view of a node: the node object plus
// incrementally maintained information about the pods assigned to it.
type NodeInfo struct {
	// Overall node information.
	node *v1.Node

	// Pods running on the node.
	Pods []*PodInfo

	// The subset of pods declaring affinity or anti-affinity.
	PodsWithAffinity []*PodInfo

	// The subset of pods declaring required anti-affinity.
	PodsWithRequiredAntiAffinity []*PodInfo

	// Ports allocated on the node (host IP -> set of protocol/port).
	UsedPorts HostPortInfo

	// Total requested resources of all pods on this node.
	Requested *Resource

	// Like Requested, but containers with no CPU/memory request are charged
	// default non-zero values (see calculateResource).
	NonZeroRequested *Resource

	// Allocatable resources, taken from node.Status.Allocatable in SetNode.
	Allocatable *Resource

	// ImageStates maps image name to its summary on this node.
	ImageStates map[string]*ImageStateSummary

	// PVCRefCounts maps "namespace/claimName" to the number of pods on this
	// node referencing that PVC (maintained by updatePVCRefCounts).
	PVCRefCounts map[string]int

	// Generation is bumped via nextGeneration on every mutation, letting
	// consumers cheaply detect staleness.
	Generation int64
}
580
581
582
583
584
// nextGeneration atomically increments and returns the global generation
// counter, yielding a value unique across all NodeInfo mutations.
func nextGeneration() int64 {
	return atomic.AddInt64(&generation, 1)
}
588
589
// Resource is a collection of compute resources tracked by the scheduler.
type Resource struct {
	MilliCPU         int64
	Memory           int64
	EphemeralStorage int64

	// AllowedPodNumber mirrors v1.ResourcePods (max pods on a node).
	AllowedPodNumber int

	// ScalarResources holds all other scalar/extended resources; lazily
	// allocated by SetScalar.
	ScalarResources map[v1.ResourceName]int64
}
600
601
602 func NewResource(rl v1.ResourceList) *Resource {
603 r := &Resource{}
604 r.Add(rl)
605 return r
606 }
607
608
// Add accumulates the quantities in rl into r. CPU is added in milli-units;
// everything else in base units. Unknown names are only recorded when they
// qualify as scalar resource names. A nil receiver is a no-op.
func (r *Resource) Add(rl v1.ResourceList) {
	if r == nil {
		return
	}

	for rName, rQuant := range rl {
		switch rName {
		case v1.ResourceCPU:
			r.MilliCPU += rQuant.MilliValue()
		case v1.ResourceMemory:
			r.Memory += rQuant.Value()
		case v1.ResourcePods:
			r.AllowedPodNumber += int(rQuant.Value())
		case v1.ResourceEphemeralStorage:
			r.EphemeralStorage += rQuant.Value()
		default:
			if schedutil.IsScalarResourceName(rName) {
				r.AddScalar(rName, rQuant.Value())
			}
		}
	}
}
631
632
633 func (r *Resource) Clone() *Resource {
634 res := &Resource{
635 MilliCPU: r.MilliCPU,
636 Memory: r.Memory,
637 AllowedPodNumber: r.AllowedPodNumber,
638 EphemeralStorage: r.EphemeralStorage,
639 }
640 if r.ScalarResources != nil {
641 res.ScalarResources = make(map[v1.ResourceName]int64, len(r.ScalarResources))
642 for k, v := range r.ScalarResources {
643 res.ScalarResources[k] = v
644 }
645 }
646 return res
647 }
648
649
// AddScalar adds quantity to the named scalar resource (creating the entry,
// and the map, if needed via SetScalar).
func (r *Resource) AddScalar(name v1.ResourceName, quantity int64) {
	r.SetScalar(name, r.ScalarResources[name]+quantity)
}
653
654
// SetScalar sets the named scalar resource to quantity, lazily allocating the
// ScalarResources map on first use.
func (r *Resource) SetScalar(name v1.ResourceName, quantity int64) {
	if r.ScalarResources == nil {
		r.ScalarResources = map[v1.ResourceName]int64{}
	}
	r.ScalarResources[name] = quantity
}
662
663
// SetMaxResource raises each field of r to the maximum of its current value
// and the corresponding quantity in rl (element-wise max, not a sum).
// A nil receiver is a no-op.
func (r *Resource) SetMaxResource(rl v1.ResourceList) {
	if r == nil {
		return
	}

	for rName, rQuantity := range rl {
		switch rName {
		case v1.ResourceMemory:
			r.Memory = max(r.Memory, rQuantity.Value())
		case v1.ResourceCPU:
			r.MilliCPU = max(r.MilliCPU, rQuantity.MilliValue())
		case v1.ResourceEphemeralStorage:
			r.EphemeralStorage = max(r.EphemeralStorage, rQuantity.Value())
		default:
			if schedutil.IsScalarResourceName(rName) {
				r.SetScalar(rName, max(r.ScalarResources[rName], rQuantity.Value()))
			}
		}
	}
}
684
685
686
687
688 func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
689 ni := &NodeInfo{
690 Requested: &Resource{},
691 NonZeroRequested: &Resource{},
692 Allocatable: &Resource{},
693 Generation: nextGeneration(),
694 UsedPorts: make(HostPortInfo),
695 ImageStates: make(map[string]*ImageStateSummary),
696 PVCRefCounts: make(map[string]int),
697 }
698 for _, pod := range pods {
699 ni.AddPod(pod)
700 }
701 return ni
702 }
703
704
// Node returns the wrapped node object; safe to call on a nil receiver.
func (n *NodeInfo) Node() *v1.Node {
	if n == nil {
		return nil
	}
	return n.node
}
711
712
// Snapshot returns a copy of this NodeInfo. Maps, the UsedPorts table, and
// Resource totals are copied so the snapshot is isolated from later mutations;
// the pod slices are copied shallowly (the *PodInfo pointers are shared), and
// the node pointer itself is shared.
func (n *NodeInfo) Snapshot() *NodeInfo {
	clone := &NodeInfo{
		node:             n.node,
		Requested:        n.Requested.Clone(),
		NonZeroRequested: n.NonZeroRequested.Clone(),
		Allocatable:      n.Allocatable.Clone(),
		UsedPorts:        make(HostPortInfo),
		ImageStates:      make(map[string]*ImageStateSummary),
		PVCRefCounts:     make(map[string]int),
		Generation:       n.Generation,
	}
	if len(n.Pods) > 0 {
		clone.Pods = append([]*PodInfo(nil), n.Pods...)
	}
	if len(n.UsedPorts) > 0 {
		// Deep-copy the two-level port map so Add/Remove on the original
		// cannot affect the snapshot.
		for ip, portMap := range n.UsedPorts {
			clone.UsedPorts[ip] = make(map[ProtocolPort]struct{})
			for protocolPort, v := range portMap {
				clone.UsedPorts[ip][protocolPort] = v
			}
		}
	}
	if len(n.PodsWithAffinity) > 0 {
		clone.PodsWithAffinity = append([]*PodInfo(nil), n.PodsWithAffinity...)
	}
	if len(n.PodsWithRequiredAntiAffinity) > 0 {
		clone.PodsWithRequiredAntiAffinity = append([]*PodInfo(nil), n.PodsWithRequiredAntiAffinity...)
	}
	if len(n.ImageStates) > 0 {
		state := make(map[string]*ImageStateSummary, len(n.ImageStates))
		for imageName, imageState := range n.ImageStates {
			state[imageName] = imageState.Snapshot()
		}
		clone.ImageStates = state
	}
	for key, value := range n.PVCRefCounts {
		clone.PVCRefCounts[key] = value
	}
	return clone
}
755
756
757 func (n *NodeInfo) String() string {
758 podKeys := make([]string, len(n.Pods))
759 for i, p := range n.Pods {
760 podKeys[i] = p.Pod.Name
761 }
762 return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v, NonZeroRequest: %#v, UsedPort: %#v, AllocatableResource:%#v}",
763 podKeys, n.Requested, n.NonZeroRequested, n.UsedPorts, n.Allocatable)
764 }
765
766
767
// AddPodInfo adds pod information to this NodeInfo: appends it to Pods and to
// the affinity index slices when applicable, then updates the aggregated
// resource/port/PVC bookkeeping via update. Prefer this over AddPod when a
// PodInfo has already been computed.
func (n *NodeInfo) AddPodInfo(podInfo *PodInfo) {
	n.Pods = append(n.Pods, podInfo)
	if podWithAffinity(podInfo.Pod) {
		n.PodsWithAffinity = append(n.PodsWithAffinity, podInfo)
	}
	if podWithRequiredAntiAffinity(podInfo.Pod) {
		n.PodsWithRequiredAntiAffinity = append(n.PodsWithRequiredAntiAffinity, podInfo)
	}
	n.update(podInfo.Pod, 1)
}
778
779
// AddPod is a wrapper around AddPodInfo that builds the PodInfo first.
func (n *NodeInfo) AddPod(pod *v1.Pod) {
	// The parse error from NewPodInfo is deliberately ignored here: pods with
	// unparsable affinity terms are still tracked, just without those terms
	// indexed. NOTE(review): confirm this matches upstream intent.
	podInfo, _ := NewPodInfo(pod)
	n.AddPodInfo(podInfo)
}
786
787 func podWithAffinity(p *v1.Pod) bool {
788 affinity := p.Spec.Affinity
789 return affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil)
790 }
791
792 func podWithRequiredAntiAffinity(p *v1.Pod) bool {
793 affinity := p.Spec.Affinity
794 return affinity != nil && affinity.PodAntiAffinity != nil &&
795 len(affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution) != 0
796 }
797
// removeFromSlice removes the pod whose cache key equals k from s using
// swap-with-last deletion (element order is NOT preserved, and the input
// slice is mutated). It returns the resulting slice — nil when it becomes
// empty — and whether a pod was removed. Pods whose key cannot be computed
// are logged and skipped.
func removeFromSlice(logger klog.Logger, s []*PodInfo, k string) ([]*PodInfo, bool) {
	var removed bool
	for i := range s {
		tmpKey, err := GetPodKey(s[i].Pod)
		if err != nil {
			logger.Error(err, "Cannot get pod key", "pod", klog.KObj(s[i].Pod))
			continue
		}
		if k == tmpKey {
			// Delete the element at i by moving the last element into its slot.
			s[i] = s[len(s)-1]
			s = s[:len(s)-1]
			removed = true
			break
		}
	}

	// Normalize an emptied slice to nil so callers can test against nil.
	if len(s) == 0 {
		return nil, removed
	}
	return s, removed
}
820
821
// RemovePod subtracts pod information from this NodeInfo: removes it from the
// affinity index slices and from Pods, and reverses the aggregated resource /
// port / PVC bookkeeping. Returns an error if the pod is not tracked here.
func (n *NodeInfo) RemovePod(logger klog.Logger, pod *v1.Pod) error {
	k, err := GetPodKey(pod)
	if err != nil {
		return err
	}
	if podWithAffinity(pod) {
		n.PodsWithAffinity, _ = removeFromSlice(logger, n.PodsWithAffinity, k)
	}
	if podWithRequiredAntiAffinity(pod) {
		n.PodsWithRequiredAntiAffinity, _ = removeFromSlice(logger, n.PodsWithRequiredAntiAffinity, k)
	}

	// Only roll back the aggregated bookkeeping when the pod was actually in Pods.
	var removed bool
	if n.Pods, removed = removeFromSlice(logger, n.Pods, k); removed {
		n.update(pod, -1)
		return nil
	}
	return fmt.Errorf("no corresponding pod %s in pods of node %s", pod.Name, n.node.Name)
}
841
842
843
// update adjusts the cached totals for adding (sign=+1) or removing (sign=-1)
// pod, refreshes the port and PVC bookkeeping, and bumps Generation.
func (n *NodeInfo) update(pod *v1.Pod, sign int64) {
	res, non0CPU, non0Mem := calculateResource(pod)
	n.Requested.MilliCPU += sign * res.MilliCPU
	n.Requested.Memory += sign * res.Memory
	n.Requested.EphemeralStorage += sign * res.EphemeralStorage
	// Lazily allocate the scalar map only when this pod actually has scalars.
	if n.Requested.ScalarResources == nil && len(res.ScalarResources) > 0 {
		n.Requested.ScalarResources = map[v1.ResourceName]int64{}
	}
	for rName, rQuant := range res.ScalarResources {
		n.Requested.ScalarResources[rName] += sign * rQuant
	}
	n.NonZeroRequested.MilliCPU += sign * non0CPU
	n.NonZeroRequested.Memory += sign * non0Mem

	// sign > 0 means add; anything else removes.
	n.updateUsedPorts(pod, sign > 0)
	n.updatePVCRefCounts(pod, sign > 0)

	n.Generation = nextGeneration()
}
864
// calculateResource computes the pod's effective resource requests, returning
// the aggregate Resource plus "non-zero" CPU (milli) and memory totals in
// which containers that request nothing are charged scheduler defaults (see
// schedutil.GetNonzeroRequests). Regular containers are summed; init
// containers contribute the max of their individual requests (they run one at
// a time); pod overhead is added when declared.
func calculateResource(pod *v1.Pod) (Resource, int64, int64) {
	var non0InitCPU, non0InitMem int64
	var non0CPU, non0Mem int64
	requests := resourcehelper.PodRequests(pod, resourcehelper.PodResourcesOptions{
		InPlacePodVerticalScalingEnabled: utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
		ContainerFn: func(requests v1.ResourceList, containerType podutil.ContainerType) {
			non0CPUReq, non0MemReq := schedutil.GetNonzeroRequests(&requests)
			switch containerType {
			case podutil.Containers:
				non0CPU += non0CPUReq
				non0Mem += non0MemReq
			case podutil.InitContainers:
				// Init containers run sequentially, so only the largest
				// request matters.
				non0InitCPU = max(non0InitCPU, non0CPUReq)
				non0InitMem = max(non0InitMem, non0MemReq)
			}
		},
	})

	// The pod's effective floor is the larger of (sum of containers) and
	// (max init container).
	non0CPU = max(non0CPU, non0InitCPU)
	non0Mem = max(non0Mem, non0InitMem)

	// Pod overhead (e.g. sandbox cost) is charged on top when declared.
	if pod.Spec.Overhead != nil {
		if _, found := pod.Spec.Overhead[v1.ResourceCPU]; found {
			non0CPU += pod.Spec.Overhead.Cpu().MilliValue()
		}

		if _, found := pod.Spec.Overhead[v1.ResourceMemory]; found {
			non0Mem += pod.Spec.Overhead.Memory().Value()
		}
	}
	var res Resource
	res.Add(requests)
	return res, non0CPU, non0Mem
}
901
902
903 func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) {
904 for _, container := range pod.Spec.Containers {
905 for _, podPort := range container.Ports {
906 if add {
907 n.UsedPorts.Add(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
908 } else {
909 n.UsedPorts.Remove(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
910 }
911 }
912 }
913 }
914
915
916 func (n *NodeInfo) updatePVCRefCounts(pod *v1.Pod, add bool) {
917 for _, v := range pod.Spec.Volumes {
918 if v.PersistentVolumeClaim == nil {
919 continue
920 }
921
922 key := GetNamespacedName(pod.Namespace, v.PersistentVolumeClaim.ClaimName)
923 if add {
924 n.PVCRefCounts[key] += 1
925 } else {
926 n.PVCRefCounts[key] -= 1
927 if n.PVCRefCounts[key] <= 0 {
928 delete(n.PVCRefCounts, key)
929 }
930 }
931 }
932 }
933
934
// SetNode sets the overall node information, refreshes Allocatable from the
// node status, and bumps Generation.
func (n *NodeInfo) SetNode(node *v1.Node) {
	n.node = node
	n.Allocatable = NewResource(node.Status.Allocatable)
	n.Generation = nextGeneration()
}
940
941
// RemoveNode clears only the node object and bumps Generation; the pod
// bookkeeping is intentionally kept (the pods may still be assigned here).
func (n *NodeInfo) RemoveNode() {
	n.node = nil
	n.Generation = nextGeneration()
}
946
947
948 func GetPodKey(pod *v1.Pod) (string, error) {
949 uid := string(pod.UID)
950 if len(uid) == 0 {
951 return "", errors.New("cannot get cache key for pod with empty UID")
952 }
953 return uid, nil
954 }
955
956
// GetNamespacedName returns the "<namespace>/<name>" key for an object.
// Plain concatenation replaces fmt.Sprintf: this runs on hot paths
// (per-volume in updatePVCRefCounts) and Sprintf boxes both arguments.
func GetNamespacedName(namespace, name string) string {
	return namespace + "/" + name
}
960
961
// DefaultBindAllHostIP is the wildcard bind address; sanitize substitutes it
// for an empty host IP, and CheckConflict treats it as colliding with any IP.
const DefaultBindAllHostIP = "0.0.0.0"
963
964
// ProtocolPort is a protocol/port pair used as the key of HostPortInfo.
type ProtocolPort struct {
	Protocol string
	Port     int32
}
969
970
971 func NewProtocolPort(protocol string, port int32) *ProtocolPort {
972 pp := &ProtocolPort{
973 Protocol: protocol,
974 Port: port,
975 }
976
977 if len(pp.Protocol) == 0 {
978 pp.Protocol = string(v1.ProtocolTCP)
979 }
980
981 return pp
982 }
983
984
// HostPortInfo maps a host IP to the set of protocol/port pairs in use on it.
type HostPortInfo map[string]map[ProtocolPort]struct{}
986
987
988 func (h HostPortInfo) Add(ip, protocol string, port int32) {
989 if port <= 0 {
990 return
991 }
992
993 h.sanitize(&ip, &protocol)
994
995 pp := NewProtocolPort(protocol, port)
996 if _, ok := h[ip]; !ok {
997 h[ip] = map[ProtocolPort]struct{}{
998 *pp: {},
999 }
1000 return
1001 }
1002
1003 h[ip][*pp] = struct{}{}
1004 }
1005
1006
1007 func (h HostPortInfo) Remove(ip, protocol string, port int32) {
1008 if port <= 0 {
1009 return
1010 }
1011
1012 h.sanitize(&ip, &protocol)
1013
1014 pp := NewProtocolPort(protocol, port)
1015 if m, ok := h[ip]; ok {
1016 delete(m, *pp)
1017 if len(h[ip]) == 0 {
1018 delete(h, ip)
1019 }
1020 }
1021 }
1022
1023
1024 func (h HostPortInfo) Len() int {
1025 length := 0
1026 for _, m := range h {
1027 length += len(m)
1028 }
1029 return length
1030 }
1031
1032
1033
// CheckConflict reports whether adding (ip, protocol, port) would collide
// with an existing entry. The wildcard address 0.0.0.0 conflicts with the
// same protocol/port on every IP, so the check is symmetric: a wildcard
// candidate scans all IPs, and a specific candidate also checks the wildcard
// bucket. Non-positive ports never conflict.
func (h HostPortInfo) CheckConflict(ip, protocol string, port int32) bool {
	if port <= 0 {
		return false
	}

	h.sanitize(&ip, &protocol)

	pp := NewProtocolPort(protocol, port)

	// Wildcard candidate: collide with this protocol/port on any IP.
	if ip == DefaultBindAllHostIP {
		for _, m := range h {
			if _, ok := m[*pp]; ok {
				return true
			}
		}
		return false
	}

	// Specific candidate: collide with the wildcard bucket or its own IP.
	for _, key := range []string{DefaultBindAllHostIP, ip} {
		if m, ok := h[key]; ok {
			if _, ok2 := m[*pp]; ok2 {
				return true
			}
		}
	}

	return false
}
1064
1065
1066 func (h HostPortInfo) sanitize(ip, protocol *string) {
1067 if len(*ip) == 0 {
1068 *ip = DefaultBindAllHostIP
1069 }
1070 if len(*protocol) == 0 {
1071 *protocol = string(v1.ProtocolTCP)
1072 }
1073 }
1074