1
16
17 package volumerestrictions
18
19 import (
20 "context"
21 "fmt"
22
23 v1 "k8s.io/api/core/v1"
24 apierrors "k8s.io/apimachinery/pkg/api/errors"
25 "k8s.io/apimachinery/pkg/runtime"
26 "k8s.io/apimachinery/pkg/util/sets"
27 corelisters "k8s.io/client-go/listers/core/v1"
28 v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
29 "k8s.io/kubernetes/pkg/scheduler/framework"
30 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
31 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
32 )
33
34
// VolumeRestrictions is a scheduler plugin that rejects nodes on which the
// pod's GCE PD, AWS EBS, RBD or iSCSI volumes would conflict with volumes of
// pods already running there, and rejects pods whose ReadWriteOncePod PVCs are
// already in use.
type VolumeRestrictions struct {
	// pvcLister resolves the PersistentVolumeClaims referenced by a pod.
	pvcLister corelisters.PersistentVolumeClaimLister
	// sharedLister exposes the scheduler snapshot; its StorageInfos are used
	// to check whether a PVC is already used by pods.
	sharedLister framework.SharedLister
}
39
40 var _ framework.PreFilterPlugin = &VolumeRestrictions{}
41 var _ framework.FilterPlugin = &VolumeRestrictions{}
42 var _ framework.EnqueueExtensions = &VolumeRestrictions{}
43 var _ framework.StateData = &preFilterState{}
44
const (
	// Name is the plugin name, taken from the shared scheduler plugin names package.
	Name = names.VolumeRestrictions

	// preFilterStateKey is the CycleState key under which PreFilter stores its
	// preFilterState for Filter and the PreFilterExtensions.
	preFilterStateKey = "PreFilter" + Name

	// ErrReasonDiskConflict is the Unschedulable reason Filter returns when one
	// of the pod's GCE PD / AWS EBS / RBD / iSCSI volumes conflicts with a
	// volume of a pod already on the node.
	ErrReasonDiskConflict = "node(s) had no available disk"

	// ErrReasonReadWriteOncePodConflict is the Unschedulable reason returned when
	// a ReadWriteOncePod PVC used by the pod is already in use by another pod.
	ErrReasonReadWriteOncePodConflict = "node has pod using PersistentVolumeClaim with the same name and ReadWriteOncePod access mode"
)
57
58
// preFilterState is computed by PreFilter and stored in the CycleState, where it
// is read by Filter and adjusted by the AddPod/RemovePod PreFilterExtensions.
type preFilterState struct {
	// readWriteOncePodPVCs holds the names of the pod's PVCs that have the
	// ReadWriteOncePod access mode. The names are relative to the pod's namespace.
	readWriteOncePodPVCs sets.Set[string]
	// conflictingPVCRefCount counts those PVCs that the snapshot reports as
	// already used by pods; Filter rejects the pod while it is greater than zero.
	conflictingPVCRefCount int
}
65
66 func (s *preFilterState) updateWithPod(podInfo *framework.PodInfo, multiplier int) {
67 s.conflictingPVCRefCount += multiplier * s.conflictingPVCRefCountForPod(podInfo)
68 }
69
70 func (s *preFilterState) conflictingPVCRefCountForPod(podInfo *framework.PodInfo) int {
71 conflicts := 0
72 for _, volume := range podInfo.Pod.Spec.Volumes {
73 if volume.PersistentVolumeClaim == nil {
74 continue
75 }
76 if s.readWriteOncePodPVCs.Has(volume.PersistentVolumeClaim.ClaimName) {
77 conflicts += 1
78 }
79 }
80 return conflicts
81 }
82
83
84 func (s *preFilterState) Clone() framework.StateData {
85 if s == nil {
86 return nil
87 }
88 return &preFilterState{
89 readWriteOncePodPVCs: s.readWriteOncePodPVCs,
90 conflictingPVCRefCount: s.conflictingPVCRefCount,
91 }
92 }
93
94
95 func (pl *VolumeRestrictions) Name() string {
96 return Name
97 }
98
99 func isVolumeConflict(volume *v1.Volume, pod *v1.Pod) bool {
100 for _, existingVolume := range pod.Spec.Volumes {
101
102 if volume.GCEPersistentDisk != nil && existingVolume.GCEPersistentDisk != nil {
103 disk, existingDisk := volume.GCEPersistentDisk, existingVolume.GCEPersistentDisk
104 if disk.PDName == existingDisk.PDName && !(disk.ReadOnly && existingDisk.ReadOnly) {
105 return true
106 }
107 }
108
109 if volume.AWSElasticBlockStore != nil && existingVolume.AWSElasticBlockStore != nil {
110 if volume.AWSElasticBlockStore.VolumeID == existingVolume.AWSElasticBlockStore.VolumeID {
111 return true
112 }
113 }
114
115 if volume.ISCSI != nil && existingVolume.ISCSI != nil {
116 iqn := volume.ISCSI.IQN
117 eiqn := existingVolume.ISCSI.IQN
118
119
120
121 if iqn == eiqn && !(volume.ISCSI.ReadOnly && existingVolume.ISCSI.ReadOnly) {
122 return true
123 }
124 }
125
126 if volume.RBD != nil && existingVolume.RBD != nil {
127 mon, pool, image := volume.RBD.CephMonitors, volume.RBD.RBDPool, volume.RBD.RBDImage
128 emon, epool, eimage := existingVolume.RBD.CephMonitors, existingVolume.RBD.RBDPool, existingVolume.RBD.RBDImage
129
130
131
132 if haveOverlap(mon, emon) && pool == epool && image == eimage && !(volume.RBD.ReadOnly && existingVolume.RBD.ReadOnly) {
133 return true
134 }
135 }
136 }
137
138 return false
139 }
140
141
// haveOverlap reports whether a1 and a2 share at least one element.
// The shorter slice is indexed into a lookup table so it stays small.
func haveOverlap(a1, a2 []string) bool {
	small, large := a1, a2
	if len(small) > len(large) {
		small, large = large, small
	}

	index := make(map[string]struct{}, len(small))
	for _, s := range small {
		index[s] = struct{}{}
	}

	for _, candidate := range large {
		if _, found := index[candidate]; found {
			return true
		}
	}
	return false
}
155
156
157 func needsRestrictionsCheck(v v1.Volume) bool {
158 return v.GCEPersistentDisk != nil || v.AWSElasticBlockStore != nil || v.RBD != nil || v.ISCSI != nil
159 }
160
161
162 func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
163 needsCheck := false
164 for i := range pod.Spec.Volumes {
165 if needsRestrictionsCheck(pod.Spec.Volumes[i]) {
166 needsCheck = true
167 break
168 }
169 }
170
171 pvcs, err := pl.readWriteOncePodPVCsForPod(ctx, pod)
172 if err != nil {
173 if apierrors.IsNotFound(err) {
174 return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
175 }
176 return nil, framework.AsStatus(err)
177 }
178
179 s, err := pl.calPreFilterState(ctx, pod, pvcs)
180 if err != nil {
181 return nil, framework.AsStatus(err)
182 }
183
184 if !needsCheck && s.conflictingPVCRefCount == 0 {
185 return nil, framework.NewStatus(framework.Skip)
186 }
187 cycleState.Write(preFilterStateKey, s)
188 return nil, nil
189 }
190
191
192 func (pl *VolumeRestrictions) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
193 state, err := getPreFilterState(cycleState)
194 if err != nil {
195 return framework.AsStatus(err)
196 }
197 state.updateWithPod(podInfoToAdd, 1)
198 return nil
199 }
200
201
202 func (pl *VolumeRestrictions) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
203 state, err := getPreFilterState(cycleState)
204 if err != nil {
205 return framework.AsStatus(err)
206 }
207 state.updateWithPod(podInfoToRemove, -1)
208 return nil
209 }
210
211 func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
212 c, err := cycleState.Read(preFilterStateKey)
213 if err != nil {
214
215 return nil, fmt.Errorf("cannot read %q from cycleState", preFilterStateKey)
216 }
217
218 s, ok := c.(*preFilterState)
219 if !ok {
220 return nil, fmt.Errorf("%+v convert to volumerestrictions.state error", c)
221 }
222 return s, nil
223 }
224
225
226
227 func (pl *VolumeRestrictions) calPreFilterState(ctx context.Context, pod *v1.Pod, pvcs sets.Set[string]) (*preFilterState, error) {
228 conflictingPVCRefCount := 0
229 for pvc := range pvcs {
230 key := framework.GetNamespacedName(pod.Namespace, pvc)
231 if pl.sharedLister.StorageInfos().IsPVCUsedByPods(key) {
232
233 conflictingPVCRefCount += 1
234 }
235 }
236 return &preFilterState{
237 readWriteOncePodPVCs: pvcs,
238 conflictingPVCRefCount: conflictingPVCRefCount,
239 }, nil
240 }
241
242 func (pl *VolumeRestrictions) readWriteOncePodPVCsForPod(ctx context.Context, pod *v1.Pod) (sets.Set[string], error) {
243 pvcs := sets.New[string]()
244 for _, volume := range pod.Spec.Volumes {
245 if volume.PersistentVolumeClaim == nil {
246 continue
247 }
248
249 pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
250 if err != nil {
251 return nil, err
252 }
253
254 if !v1helper.ContainsAccessMode(pvc.Spec.AccessModes, v1.ReadWriteOncePod) {
255 continue
256 }
257 pvcs.Insert(pvc.Name)
258 }
259 return pvcs, nil
260 }
261
262
263
264 func satisfyVolumeConflicts(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool {
265 for i := range pod.Spec.Volumes {
266 v := pod.Spec.Volumes[i]
267 if !needsRestrictionsCheck(v) {
268 continue
269 }
270 for _, ev := range nodeInfo.Pods {
271 if isVolumeConflict(&v, ev.Pod) {
272 return false
273 }
274 }
275 }
276 return true
277 }
278
279
280 func satisfyReadWriteOncePod(ctx context.Context, state *preFilterState) *framework.Status {
281 if state == nil {
282 return nil
283 }
284 if state.conflictingPVCRefCount > 0 {
285 return framework.NewStatus(framework.Unschedulable, ErrReasonReadWriteOncePodConflict)
286 }
287 return nil
288 }
289
290
291 func (pl *VolumeRestrictions) PreFilterExtensions() framework.PreFilterExtensions {
292 return pl
293 }
294
295
296
297
298
299
300
301
302
303
304
305
306 func (pl *VolumeRestrictions) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
307 if !satisfyVolumeConflicts(pod, nodeInfo) {
308 return framework.NewStatus(framework.Unschedulable, ErrReasonDiskConflict)
309 }
310 state, err := getPreFilterState(cycleState)
311 if err != nil {
312 return framework.AsStatus(err)
313 }
314 return satisfyReadWriteOncePod(ctx, state)
315 }
316
317
318
319 func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHint {
320 return []framework.ClusterEventWithHint{
321
322
323
324 {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
325
326 {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}},
327
328
329 {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}},
330 }
331 }
332
333
334 func New(_ context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
335 informerFactory := handle.SharedInformerFactory()
336 pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
337 sharedLister := handle.SnapshotSharedLister()
338
339 return &VolumeRestrictions{
340 pvcLister: pvcLister,
341 sharedLister: sharedLister,
342 }, nil
343 }
344
View as plain text