1
16
17 package podtopologyspread
18
19 import (
20 v1 "k8s.io/api/core/v1"
21 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22 "k8s.io/apimachinery/pkg/labels"
23 v1helper "k8s.io/component-helpers/scheduling/corev1"
24 "k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
25 "k8s.io/kubernetes/pkg/scheduler/framework"
26 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
27 "k8s.io/utils/ptr"
28 )
29
// topologyPair is a single (key, value) pair of strings. It is a comparable
// value type, so it can be compared with == and used as a map key.
type topologyPair struct {
	key, value string
}
34
35
36
37
38 type topologySpreadConstraint struct {
39 MaxSkew int32
40 TopologyKey string
41 Selector labels.Selector
42 MinDomains int32
43 NodeAffinityPolicy v1.NodeInclusionPolicy
44 NodeTaintsPolicy v1.NodeInclusionPolicy
45 }
46
47 func (tsc *topologySpreadConstraint) matchNodeInclusionPolicies(pod *v1.Pod, node *v1.Node, require nodeaffinity.RequiredNodeAffinity) bool {
48 if tsc.NodeAffinityPolicy == v1.NodeInclusionPolicyHonor {
49
50 if match, _ := require.Match(node); !match {
51 return false
52 }
53 }
54
55 if tsc.NodeTaintsPolicy == v1.NodeInclusionPolicyHonor {
56 if _, untolerated := v1helper.FindMatchingUntoleratedTaint(node.Spec.Taints, pod.Spec.Tolerations, helper.DoNotScheduleTaintsFilterFunc()); untolerated {
57 return false
58 }
59 }
60 return true
61 }
62
63
64
65
66 func (pl *PodTopologySpread) buildDefaultConstraints(p *v1.Pod, action v1.UnsatisfiableConstraintAction) ([]topologySpreadConstraint, error) {
67 constraints, err := pl.filterTopologySpreadConstraints(pl.defaultConstraints, p.Labels, action)
68 if err != nil || len(constraints) == 0 {
69 return nil, err
70 }
71 selector := helper.DefaultSelector(p, pl.services, pl.replicationCtrls, pl.replicaSets, pl.statefulSets)
72 if selector.Empty() {
73 return nil, nil
74 }
75 for i := range constraints {
76 constraints[i].Selector = selector
77 }
78 return constraints, nil
79 }
80
81
82 func nodeLabelsMatchSpreadConstraints(nodeLabels map[string]string, constraints []topologySpreadConstraint) bool {
83 for _, c := range constraints {
84 if _, ok := nodeLabels[c.TopologyKey]; !ok {
85 return false
86 }
87 }
88 return true
89 }
90
91 func (pl *PodTopologySpread) filterTopologySpreadConstraints(constraints []v1.TopologySpreadConstraint, podLabels map[string]string, action v1.UnsatisfiableConstraintAction) ([]topologySpreadConstraint, error) {
92 var result []topologySpreadConstraint
93 for _, c := range constraints {
94 if c.WhenUnsatisfiable == action {
95 selector, err := metav1.LabelSelectorAsSelector(c.LabelSelector)
96 if err != nil {
97 return nil, err
98 }
99
100 if pl.enableMatchLabelKeysInPodTopologySpread && len(c.MatchLabelKeys) > 0 {
101 matchLabels := make(labels.Set)
102 for _, labelKey := range c.MatchLabelKeys {
103 if value, ok := podLabels[labelKey]; ok {
104 matchLabels[labelKey] = value
105 }
106 }
107 if len(matchLabels) > 0 {
108 selector = mergeLabelSetWithSelector(matchLabels, selector)
109 }
110 }
111
112 tsc := topologySpreadConstraint{
113 MaxSkew: c.MaxSkew,
114 TopologyKey: c.TopologyKey,
115 Selector: selector,
116 MinDomains: ptr.Deref(c.MinDomains, 1),
117 NodeAffinityPolicy: v1.NodeInclusionPolicyHonor,
118 NodeTaintsPolicy: v1.NodeInclusionPolicyIgnore,
119 }
120 if pl.enableNodeInclusionPolicyInPodTopologySpread {
121 if c.NodeAffinityPolicy != nil {
122 tsc.NodeAffinityPolicy = *c.NodeAffinityPolicy
123 }
124 if c.NodeTaintsPolicy != nil {
125 tsc.NodeTaintsPolicy = *c.NodeTaintsPolicy
126 }
127 }
128 result = append(result, tsc)
129 }
130 }
131 return result, nil
132 }
133
134 func mergeLabelSetWithSelector(matchLabels labels.Set, s labels.Selector) labels.Selector {
135 mergedSelector := labels.SelectorFromSet(matchLabels)
136
137 requirements, ok := s.Requirements()
138 if !ok {
139 return s
140 }
141
142 for _, r := range requirements {
143 mergedSelector = mergedSelector.Add(r)
144 }
145
146 return mergedSelector
147 }
148
149 func countPodsMatchSelector(podInfos []*framework.PodInfo, selector labels.Selector, ns string) int {
150 if selector.Empty() {
151 return 0
152 }
153 count := 0
154 for _, p := range podInfos {
155
156 if p.Pod.DeletionTimestamp != nil || p.Pod.Namespace != ns {
157 continue
158 }
159 if selector.Matches(labels.Set(p.Pod.Labels)) {
160 count++
161 }
162 }
163 return count
164 }
165
166
167 func podLabelsMatchSpreadConstraints(constraints []topologySpreadConstraint, labels labels.Set) bool {
168 for _, c := range constraints {
169 if c.Selector.Matches(labels) {
170 return true
171 }
172 }
173 return false
174 }
175
View as plain text