1
16
17 package volumezone
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23
24 v1 "k8s.io/api/core/v1"
25 storage "k8s.io/api/storage/v1"
26 apierrors "k8s.io/apimachinery/pkg/api/errors"
27 "k8s.io/apimachinery/pkg/runtime"
28 "k8s.io/apimachinery/pkg/util/sets"
29 corelisters "k8s.io/client-go/listers/core/v1"
30 storagelisters "k8s.io/client-go/listers/storage/v1"
31 volumehelpers "k8s.io/cloud-provider/volume/helpers"
32 storagehelpers "k8s.io/component-helpers/storage/volume"
33 "k8s.io/klog/v2"
34 "k8s.io/kubernetes/pkg/scheduler/framework"
35 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
36 )
37
38
39 type VolumeZone struct {
40 pvLister corelisters.PersistentVolumeLister
41 pvcLister corelisters.PersistentVolumeClaimLister
42 scLister storagelisters.StorageClassLister
43 }
44
45 var _ framework.FilterPlugin = &VolumeZone{}
46 var _ framework.PreFilterPlugin = &VolumeZone{}
47 var _ framework.EnqueueExtensions = &VolumeZone{}
48
49 const (
50
51 Name = names.VolumeZone
52
53 preFilterStateKey framework.StateKey = "PreFilter" + Name
54
55
56 ErrReasonConflict = "node(s) had no available volume zone"
57 )
58
59
60 type pvTopology struct {
61 pvName string
62 key string
63 values sets.Set[string]
64 }
65
66
67
68
69 type stateData struct {
70
71
72 podPVTopologies []pvTopology
73 }
74
75 func (d *stateData) Clone() framework.StateData {
76 return d
77 }
78
79 var topologyLabels = []string{
80 v1.LabelFailureDomainBetaZone,
81 v1.LabelFailureDomainBetaRegion,
82 v1.LabelTopologyZone,
83 v1.LabelTopologyRegion,
84 }
85
86 func translateToGALabel(label string) string {
87 if label == v1.LabelFailureDomainBetaRegion {
88 return v1.LabelTopologyRegion
89 }
90 if label == v1.LabelFailureDomainBetaZone {
91 return v1.LabelTopologyZone
92 }
93 return label
94 }
95
96
97 func (pl *VolumeZone) Name() string {
98 return Name
99 }
100
101
102
103
104
105
106
107 func (pl *VolumeZone) PreFilter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
108 podPVTopologies, status := pl.getPVbyPod(ctx, pod)
109 if !status.IsSuccess() {
110 return nil, status
111 }
112 if len(podPVTopologies) == 0 {
113 return nil, framework.NewStatus(framework.Skip)
114 }
115 cs.Write(preFilterStateKey, &stateData{podPVTopologies: podPVTopologies})
116 return nil, nil
117 }
118
119 func (pl *VolumeZone) getPVbyPod(ctx context.Context, pod *v1.Pod) ([]pvTopology, *framework.Status) {
120 logger := klog.FromContext(ctx)
121 podPVTopologies := make([]pvTopology, 0)
122
123 for i := range pod.Spec.Volumes {
124 volume := pod.Spec.Volumes[i]
125 if volume.PersistentVolumeClaim == nil {
126 continue
127 }
128 pvcName := volume.PersistentVolumeClaim.ClaimName
129 if pvcName == "" {
130 return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no name")
131 }
132 pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
133 if s := getErrorAsStatus(err); !s.IsSuccess() {
134 return nil, s
135 }
136
137 pvName := pvc.Spec.VolumeName
138 if pvName == "" {
139 scName := storagehelpers.GetPersistentVolumeClaimClass(pvc)
140 if len(scName) == 0 {
141 return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolumeClaim had no pv name and storageClass name")
142 }
143
144 class, err := pl.scLister.Get(scName)
145 if s := getErrorAsStatus(err); !s.IsSuccess() {
146 return nil, s
147 }
148 if class.VolumeBindingMode == nil {
149 return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("VolumeBindingMode not set for StorageClass %q", scName))
150 }
151 if *class.VolumeBindingMode == storage.VolumeBindingWaitForFirstConsumer {
152
153 continue
154 }
155
156 return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "PersistentVolume had no name")
157 }
158
159 pv, err := pl.pvLister.Get(pvName)
160 if s := getErrorAsStatus(err); !s.IsSuccess() {
161 return nil, s
162 }
163
164 for _, key := range topologyLabels {
165 if value, ok := pv.ObjectMeta.Labels[key]; ok {
166 volumeVSet, err := volumehelpers.LabelZonesToSet(value)
167 if err != nil {
168 logger.Info("Failed to parse label, ignoring the label", "label", fmt.Sprintf("%s:%s", key, value), "err", err)
169 continue
170 }
171 podPVTopologies = append(podPVTopologies, pvTopology{
172 pvName: pv.Name,
173 key: key,
174 values: sets.Set[string](volumeVSet),
175 })
176 }
177 }
178 }
179 return podPVTopologies, nil
180 }
181
182
183 func (pl *VolumeZone) PreFilterExtensions() framework.PreFilterExtensions {
184 return nil
185 }
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203 func (pl *VolumeZone) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
204 logger := klog.FromContext(ctx)
205
206
207 if len(pod.Spec.Volumes) == 0 {
208 return nil
209 }
210 var podPVTopologies []pvTopology
211 state, err := getStateData(cs)
212 if err != nil {
213
214 var status *framework.Status
215 podPVTopologies, status = pl.getPVbyPod(ctx, pod)
216 if !status.IsSuccess() {
217 return status
218 }
219 } else {
220 podPVTopologies = state.podPVTopologies
221 }
222
223 node := nodeInfo.Node()
224 hasAnyNodeConstraint := false
225 for _, topologyLabel := range topologyLabels {
226 if _, ok := node.Labels[topologyLabel]; ok {
227 hasAnyNodeConstraint = true
228 break
229 }
230 }
231
232 if !hasAnyNodeConstraint {
233
234
235 return nil
236 }
237
238 for _, pvTopology := range podPVTopologies {
239 v, ok := node.Labels[pvTopology.key]
240 if !ok {
241
242 v, ok = node.Labels[translateToGALabel(pvTopology.key)]
243 }
244 if !ok || !pvTopology.values.Has(v) {
245 logger.V(10).Info("Won't schedule pod onto node due to volume (mismatch on label key)", "pod", klog.KObj(pod), "node", klog.KObj(node), "PV", klog.KRef("", pvTopology.pvName), "PVLabelKey", pvTopology.key)
246 return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonConflict)
247 }
248 }
249
250 return nil
251 }
252
253 func getStateData(cs *framework.CycleState) (*stateData, error) {
254 state, err := cs.Read(preFilterStateKey)
255 if err != nil {
256 return nil, err
257 }
258 s, ok := state.(*stateData)
259 if !ok {
260 return nil, errors.New("unable to convert state into stateData")
261 }
262 return s, nil
263 }
264
265 func getErrorAsStatus(err error) *framework.Status {
266 if err != nil {
267 if apierrors.IsNotFound(err) {
268 return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
269 }
270 return framework.AsStatus(err)
271 }
272 return nil
273 }
274
275
276
277 func (pl *VolumeZone) EventsToRegister() []framework.ClusterEventWithHint {
278 return []framework.ClusterEventWithHint{
279
280
281 {Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add}},
282
283
284
285
286
287
288
289
290
291
292 {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},
293
294
295 {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}},
296
297 {Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}},
298 }
299 }
300
301
302 func New(_ context.Context, _ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
303 informerFactory := handle.SharedInformerFactory()
304 pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
305 pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
306 scLister := informerFactory.Storage().V1().StorageClasses().Lister()
307 return &VolumeZone{
308 pvLister,
309 pvcLister,
310 scLister,
311 }, nil
312 }
313
View as plain text