1
16
17 package podtopologyspread
18
19 import (
20 "context"
21 "fmt"
22 "reflect"
23
24 v1 "k8s.io/api/core/v1"
25 "k8s.io/apimachinery/pkg/labels"
26 "k8s.io/apimachinery/pkg/runtime"
27 "k8s.io/client-go/informers"
28 appslisters "k8s.io/client-go/listers/apps/v1"
29 corelisters "k8s.io/client-go/listers/core/v1"
30 "k8s.io/klog/v2"
31 "k8s.io/kubernetes/pkg/scheduler/apis/config"
32 "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
33 "k8s.io/kubernetes/pkg/scheduler/framework"
34 "k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
35 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
36 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
37 "k8s.io/kubernetes/pkg/scheduler/util"
38 )
39
const (
	// ErrReasonConstraintsNotMatch is the reason string for node(s) that
	// didn't match the pod's topology spread constraints.
	ErrReasonConstraintsNotMatch = "node(s) didn't match pod topology spread constraints"
	// ErrReasonNodeLabelNotMatch is ErrReasonConstraintsNotMatch qualified
	// with the detail that the node is missing a label the constraints require.
	ErrReasonNodeLabelNotMatch = ErrReasonConstraintsNotMatch + " (missing required label)"
)
46
47 var systemDefaultConstraints = []v1.TopologySpreadConstraint{
48 {
49 TopologyKey: v1.LabelHostname,
50 WhenUnsatisfiable: v1.ScheduleAnyway,
51 MaxSkew: 3,
52 },
53 {
54 TopologyKey: v1.LabelTopologyZone,
55 WhenUnsatisfiable: v1.ScheduleAnyway,
56 MaxSkew: 5,
57 },
58 }
59
60
61 type PodTopologySpread struct {
62 systemDefaulted bool
63 parallelizer parallelize.Parallelizer
64 defaultConstraints []v1.TopologySpreadConstraint
65 sharedLister framework.SharedLister
66 services corelisters.ServiceLister
67 replicationCtrls corelisters.ReplicationControllerLister
68 replicaSets appslisters.ReplicaSetLister
69 statefulSets appslisters.StatefulSetLister
70 enableNodeInclusionPolicyInPodTopologySpread bool
71 enableMatchLabelKeysInPodTopologySpread bool
72 }
73
74 var _ framework.PreFilterPlugin = &PodTopologySpread{}
75 var _ framework.FilterPlugin = &PodTopologySpread{}
76 var _ framework.PreScorePlugin = &PodTopologySpread{}
77 var _ framework.ScorePlugin = &PodTopologySpread{}
78 var _ framework.EnqueueExtensions = &PodTopologySpread{}
79
80
81 const Name = names.PodTopologySpread
82
83
84 func (pl *PodTopologySpread) Name() string {
85 return Name
86 }
87
88
89 func New(_ context.Context, plArgs runtime.Object, h framework.Handle, fts feature.Features) (framework.Plugin, error) {
90 if h.SnapshotSharedLister() == nil {
91 return nil, fmt.Errorf("SnapshotSharedlister is nil")
92 }
93 args, err := getArgs(plArgs)
94 if err != nil {
95 return nil, err
96 }
97 if err := validation.ValidatePodTopologySpreadArgs(nil, &args); err != nil {
98 return nil, err
99 }
100 pl := &PodTopologySpread{
101 parallelizer: h.Parallelizer(),
102 sharedLister: h.SnapshotSharedLister(),
103 defaultConstraints: args.DefaultConstraints,
104 enableNodeInclusionPolicyInPodTopologySpread: fts.EnableNodeInclusionPolicyInPodTopologySpread,
105 enableMatchLabelKeysInPodTopologySpread: fts.EnableMatchLabelKeysInPodTopologySpread,
106 }
107 if args.DefaultingType == config.SystemDefaulting {
108 pl.defaultConstraints = systemDefaultConstraints
109 pl.systemDefaulted = true
110 }
111 if len(pl.defaultConstraints) != 0 {
112 if h.SharedInformerFactory() == nil {
113 return nil, fmt.Errorf("SharedInformerFactory is nil")
114 }
115 pl.setListers(h.SharedInformerFactory())
116 }
117 return pl, nil
118 }
119
120 func getArgs(obj runtime.Object) (config.PodTopologySpreadArgs, error) {
121 ptr, ok := obj.(*config.PodTopologySpreadArgs)
122 if !ok {
123 return config.PodTopologySpreadArgs{}, fmt.Errorf("want args to be of type PodTopologySpreadArgs, got %T", obj)
124 }
125 return *ptr, nil
126 }
127
128 func (pl *PodTopologySpread) setListers(factory informers.SharedInformerFactory) {
129 pl.services = factory.Core().V1().Services().Lister()
130 pl.replicationCtrls = factory.Core().V1().ReplicationControllers().Lister()
131 pl.replicaSets = factory.Apps().V1().ReplicaSets().Lister()
132 pl.statefulSets = factory.Apps().V1().StatefulSets().Lister()
133 }
134
135
136
137 func (pl *PodTopologySpread) EventsToRegister() []framework.ClusterEventWithHint {
138 return []framework.ClusterEventWithHint{
139
140
141
142
143
144
145
146 {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.All}, QueueingHintFn: pl.isSchedulableAfterPodChange},
147
148
149
150
151
152
153
154
155
156
157
158 {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.Delete | framework.UpdateNodeLabel | framework.UpdateNodeTaint}, QueueingHintFn: pl.isSchedulableAfterNodeChange},
159 }
160 }
161
162 func involvedInTopologySpreading(incomingPod, podWithSpreading *v1.Pod) bool {
163 return incomingPod.Spec.NodeName != "" && incomingPod.Namespace == podWithSpreading.Namespace
164 }
165
166 func (pl *PodTopologySpread) isSchedulableAfterPodChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
167 originalPod, modifiedPod, err := util.As[*v1.Pod](oldObj, newObj)
168 if err != nil {
169 return framework.Queue, err
170 }
171
172 if (modifiedPod != nil && !involvedInTopologySpreading(modifiedPod, pod)) || (originalPod != nil && !involvedInTopologySpreading(originalPod, pod)) {
173 logger.V(5).Info("the added/updated/deleted pod is unscheduled or has different namespace with target pod, so it doesn't make the target pod schedulable",
174 "pod", klog.KObj(pod), "originalPod", klog.KObj(originalPod))
175 return framework.QueueSkip, nil
176 }
177
178 constraints, err := pl.getConstraints(pod)
179 if err != nil {
180 return framework.Queue, err
181 }
182
183
184 if modifiedPod != nil && originalPod != nil {
185 if reflect.DeepEqual(modifiedPod.Labels, originalPod.Labels) {
186 logger.V(5).Info("the updated pod is unscheduled or has no updated labels or has different namespace with target pod, so it doesn't make the target pod schedulable",
187 "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
188 return framework.QueueSkip, nil
189 }
190 for _, c := range constraints {
191 if c.Selector.Matches(labels.Set(originalPod.Labels)) != c.Selector.Matches(labels.Set(modifiedPod.Labels)) {
192
193
194 logger.V(5).Info("a scheduled pod's label was updated and it makes the updated pod match or unmatch the pod's topology spread constraints",
195 "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
196 return framework.Queue, nil
197 }
198 }
199
200 logger.V(5).Info("a scheduled pod's label was updated, but it's a change unrelated to the pod's topology spread constraints",
201 "pod", klog.KObj(pod), "modifiedPod", klog.KObj(modifiedPod))
202 return framework.QueueSkip, nil
203 }
204
205
206 if modifiedPod != nil {
207 if podLabelsMatchSpreadConstraints(constraints, modifiedPod.Labels) {
208 logger.V(5).Info("a scheduled pod was created and it matches with the pod's topology spread constraints",
209 "pod", klog.KObj(pod), "createdPod", klog.KObj(modifiedPod))
210 return framework.Queue, nil
211 }
212 logger.V(5).Info("a scheduled pod was created, but it doesn't matches with the pod's topology spread constraints",
213 "pod", klog.KObj(pod), "createdPod", klog.KObj(modifiedPod))
214 return framework.QueueSkip, nil
215 }
216
217
218 if podLabelsMatchSpreadConstraints(constraints, originalPod.Labels) {
219 logger.V(5).Info("a scheduled pod which matches with the pod's topology spread constraints was deleted, and the pod may be schedulable now",
220 "pod", klog.KObj(pod), "deletedPod", klog.KObj(originalPod))
221 return framework.Queue, nil
222 }
223 logger.V(5).Info("a scheduled pod was deleted, but it's unrelated to the pod's topology spread constraints",
224 "pod", klog.KObj(pod), "deletedPod", klog.KObj(originalPod))
225
226 return framework.QueueSkip, nil
227 }
228
229
230
231 func (pl *PodTopologySpread) getConstraints(pod *v1.Pod) ([]topologySpreadConstraint, error) {
232 var constraints []topologySpreadConstraint
233 var err error
234 if len(pod.Spec.TopologySpreadConstraints) > 0 {
235
236
237 constraints, err = pl.filterTopologySpreadConstraints(
238 pod.Spec.TopologySpreadConstraints,
239 pod.Labels,
240 v1.DoNotSchedule,
241 )
242 if err != nil {
243 return nil, fmt.Errorf("obtaining pod's hard topology spread constraints: %w", err)
244 }
245 } else {
246 constraints, err = pl.buildDefaultConstraints(pod, v1.DoNotSchedule)
247 if err != nil {
248 return nil, fmt.Errorf("setting default hard topology spread constraints: %w", err)
249 }
250 }
251 return constraints, nil
252 }
253
254 func (pl *PodTopologySpread) isSchedulableAfterNodeChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
255 originalNode, modifiedNode, err := util.As[*v1.Node](oldObj, newObj)
256 if err != nil {
257 return framework.Queue, err
258 }
259
260 constraints, err := pl.getConstraints(pod)
261 if err != nil {
262 return framework.Queue, err
263 }
264
265
266
267
268
269 if modifiedNode != nil {
270 if !nodeLabelsMatchSpreadConstraints(modifiedNode.Labels, constraints) {
271 logger.V(5).Info("the created/updated node doesn't match pod topology spread constraints",
272 "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
273 return framework.QueueSkip, nil
274 }
275 logger.V(5).Info("node that match topology spread constraints was created/updated, and the pod may be schedulable now",
276 "pod", klog.KObj(pod), "node", klog.KObj(modifiedNode))
277 return framework.Queue, nil
278 }
279
280
281 if !nodeLabelsMatchSpreadConstraints(originalNode.Labels, constraints) {
282 logger.V(5).Info("the deleted node doesn't match pod topology spread constraints", "pod", klog.KObj(pod), "node", klog.KObj(originalNode))
283 return framework.QueueSkip, nil
284 }
285 logger.V(5).Info("node that match topology spread constraints was deleted, and the pod may be schedulable now",
286 "pod", klog.KObj(pod), "node", klog.KObj(originalNode))
287 return framework.Queue, nil
288 }
289
View as plain text