1
16
17 package nodevolumelimits
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "os"
24 "regexp"
25 "strconv"
26
27 v1 "k8s.io/api/core/v1"
28 storage "k8s.io/api/storage/v1"
29 apierrors "k8s.io/apimachinery/pkg/api/errors"
30 "k8s.io/apimachinery/pkg/runtime"
31 "k8s.io/apimachinery/pkg/util/rand"
32 "k8s.io/apimachinery/pkg/util/sets"
33 "k8s.io/client-go/informers"
34 corelisters "k8s.io/client-go/listers/core/v1"
35 storagelisters "k8s.io/client-go/listers/storage/v1"
36 "k8s.io/component-helpers/storage/ephemeral"
37 csilibplugins "k8s.io/csi-translation-lib/plugins"
38 "k8s.io/klog/v2"
39 "k8s.io/kubernetes/pkg/scheduler/framework"
40 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
41 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
42 volumeutil "k8s.io/kubernetes/pkg/volume/util"
43 )
44
// Fallback attach limits returned by getMaxVolumeFunc when neither the
// KUBE_MAX_PD_VOLS environment variable nor the node's allocatable volume
// limits provide a value.
const (
	// defaultMaxGCEPDVolumes is the fallback volume limit for the GCE PD filter.
	defaultMaxGCEPDVolumes = 16
	// defaultMaxAzureDiskVolumes is the fallback volume limit for the Azure Disk filter.
	defaultMaxAzureDiskVolumes = 16
)

// Filter type identifiers accepted by newNonCSILimits and getMaxVolumeFunc.
const (
	// ebsVolumeFilterType selects ebsVolumeFilter.
	ebsVolumeFilterType = "EBS"
	// gcePDVolumeFilterType selects gcePDVolumeFilter.
	gcePDVolumeFilterType = "GCE"
	// azureDiskVolumeFilterType selects azureDiskVolumeFilter.
	azureDiskVolumeFilterType = "AzureDisk"
	// cinderVolumeFilterType selects cinderVolumeFilter.
	cinderVolumeFilterType = "Cinder"
)

const (
	// ErrReasonMaxVolumeCountExceeded is the status reason reported by Filter
	// when a pod would push a node over its volume limit.
	ErrReasonMaxVolumeCountExceeded = "node(s) exceed max volume count"

	// KubeMaxPDVols is the name of the environment variable that overrides
	// the maximum number of volumes (see getMaxVolLimitFromEnv).
	KubeMaxPDVols = "KUBE_MAX_PD_VOLS"
)
69
70
71 const AzureDiskName = names.AzureDiskLimits
72
73
74 func NewAzureDisk(ctx context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
75 informerFactory := handle.SharedInformerFactory()
76 return newNonCSILimitsWithInformerFactory(ctx, azureDiskVolumeFilterType, informerFactory, fts), nil
77 }
78
79
80 const CinderName = names.CinderLimits
81
82
83 func NewCinder(ctx context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
84 informerFactory := handle.SharedInformerFactory()
85 return newNonCSILimitsWithInformerFactory(ctx, cinderVolumeFilterType, informerFactory, fts), nil
86 }
87
88
89 const EBSName = names.EBSLimits
90
91
92 func NewEBS(ctx context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
93 informerFactory := handle.SharedInformerFactory()
94 return newNonCSILimitsWithInformerFactory(ctx, ebsVolumeFilterType, informerFactory, fts), nil
95 }
96
97
98 const GCEPDName = names.GCEPDLimits
99
100
101 func NewGCEPD(ctx context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
102 informerFactory := handle.SharedInformerFactory()
103 return newNonCSILimitsWithInformerFactory(ctx, gcePDVolumeFilterType, informerFactory, fts), nil
104 }
105
106
107 type nonCSILimits struct {
108 name string
109 filter VolumeFilter
110 volumeLimitKey v1.ResourceName
111 maxVolumeFunc func(node *v1.Node) int
112 csiNodeLister storagelisters.CSINodeLister
113 pvLister corelisters.PersistentVolumeLister
114 pvcLister corelisters.PersistentVolumeClaimLister
115 scLister storagelisters.StorageClassLister
116
117
118
119
120 randomVolumeIDPrefix string
121 }
122
123 var _ framework.PreFilterPlugin = &nonCSILimits{}
124 var _ framework.FilterPlugin = &nonCSILimits{}
125 var _ framework.EnqueueExtensions = &nonCSILimits{}
126
127
128 func newNonCSILimitsWithInformerFactory(
129 ctx context.Context,
130 filterName string,
131 informerFactory informers.SharedInformerFactory,
132 fts feature.Features,
133 ) framework.Plugin {
134 pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
135 pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
136 csiNodesLister := informerFactory.Storage().V1().CSINodes().Lister()
137 scLister := informerFactory.Storage().V1().StorageClasses().Lister()
138
139 return newNonCSILimits(ctx, filterName, csiNodesLister, scLister, pvLister, pvcLister, fts)
140 }
141
142
143
144
145
146
147
148
149
150
151
152 func newNonCSILimits(
153 ctx context.Context,
154 filterName string,
155 csiNodeLister storagelisters.CSINodeLister,
156 scLister storagelisters.StorageClassLister,
157 pvLister corelisters.PersistentVolumeLister,
158 pvcLister corelisters.PersistentVolumeClaimLister,
159 fts feature.Features,
160 ) framework.Plugin {
161 logger := klog.FromContext(ctx)
162 var filter VolumeFilter
163 var volumeLimitKey v1.ResourceName
164 var name string
165
166 switch filterName {
167 case ebsVolumeFilterType:
168 name = EBSName
169 filter = ebsVolumeFilter
170 volumeLimitKey = v1.ResourceName(volumeutil.EBSVolumeLimitKey)
171 case gcePDVolumeFilterType:
172 name = GCEPDName
173 filter = gcePDVolumeFilter
174 volumeLimitKey = v1.ResourceName(volumeutil.GCEVolumeLimitKey)
175 case azureDiskVolumeFilterType:
176 name = AzureDiskName
177 filter = azureDiskVolumeFilter
178 volumeLimitKey = v1.ResourceName(volumeutil.AzureVolumeLimitKey)
179 case cinderVolumeFilterType:
180 name = CinderName
181 filter = cinderVolumeFilter
182 volumeLimitKey = v1.ResourceName(volumeutil.CinderVolumeLimitKey)
183 default:
184 logger.Error(errors.New("wrong filterName"), "Cannot create nonCSILimits plugin")
185 return nil
186 }
187 pl := &nonCSILimits{
188 name: name,
189 filter: filter,
190 volumeLimitKey: volumeLimitKey,
191 maxVolumeFunc: getMaxVolumeFunc(logger, filterName),
192 csiNodeLister: csiNodeLister,
193 pvLister: pvLister,
194 pvcLister: pvcLister,
195 scLister: scLister,
196 randomVolumeIDPrefix: rand.String(32),
197 }
198
199 return pl
200 }
201
202
203 func (pl *nonCSILimits) Name() string {
204 return pl.name
205 }
206
207
208
209 func (pl *nonCSILimits) EventsToRegister() []framework.ClusterEventWithHint {
210 return []framework.ClusterEventWithHint{
211 {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}},
212 {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
213 {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}},
214 }
215 }
216
217
218
219
220 func (pl *nonCSILimits) PreFilter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
221 volumes := pod.Spec.Volumes
222 for i := range volumes {
223 vol := &volumes[i]
224 _, ok := pl.filter.FilterVolume(vol)
225 if ok || vol.PersistentVolumeClaim != nil || vol.Ephemeral != nil {
226 return nil, nil
227 }
228 }
229
230 return nil, framework.NewStatus(framework.Skip)
231 }
232
233
234 func (pl *nonCSILimits) PreFilterExtensions() framework.PreFilterExtensions {
235 return nil
236 }
237
238
239 func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
240
241
242 if len(pod.Spec.Volumes) == 0 {
243 return nil
244 }
245
246 logger := klog.FromContext(ctx)
247 newVolumes := sets.New[string]()
248 if err := pl.filterVolumes(logger, pod, true , newVolumes); err != nil {
249 if apierrors.IsNotFound(err) {
250
251 return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
252 }
253 return framework.AsStatus(err)
254 }
255
256
257 if len(newVolumes) == 0 {
258 return nil
259 }
260
261 node := nodeInfo.Node()
262
263 var csiNode *storage.CSINode
264 var err error
265 if pl.csiNodeLister != nil {
266 csiNode, err = pl.csiNodeLister.Get(node.Name)
267 if err != nil {
268
269
270 logger.V(5).Info("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
271 }
272 }
273
274
275 if pl.filter.IsMigrated(csiNode) {
276 return nil
277 }
278
279
280 existingVolumes := sets.New[string]()
281 for _, existingPod := range nodeInfo.Pods {
282 if err := pl.filterVolumes(logger, existingPod.Pod, false , existingVolumes); err != nil {
283 return framework.AsStatus(err)
284 }
285 }
286 numExistingVolumes := len(existingVolumes)
287
288
289 for k := range existingVolumes {
290 delete(newVolumes, k)
291 }
292
293 numNewVolumes := len(newVolumes)
294 maxAttachLimit := pl.maxVolumeFunc(node)
295 volumeLimits := volumeLimits(nodeInfo)
296 if maxAttachLimitFromAllocatable, ok := volumeLimits[pl.volumeLimitKey]; ok {
297 maxAttachLimit = int(maxAttachLimitFromAllocatable)
298 }
299
300 if numExistingVolumes+numNewVolumes > maxAttachLimit {
301 return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)
302 }
303 return nil
304 }
305
306 func (pl *nonCSILimits) filterVolumes(logger klog.Logger, pod *v1.Pod, newPod bool, filteredVolumes sets.Set[string]) error {
307 volumes := pod.Spec.Volumes
308 for i := range volumes {
309 vol := &volumes[i]
310 if id, ok := pl.filter.FilterVolume(vol); ok {
311 filteredVolumes.Insert(id)
312 continue
313 }
314
315 pvcName := ""
316 isEphemeral := false
317 switch {
318 case vol.PersistentVolumeClaim != nil:
319 pvcName = vol.PersistentVolumeClaim.ClaimName
320 case vol.Ephemeral != nil:
321
322
323
324
325 pvcName = ephemeral.VolumeClaimName(pod, vol)
326 isEphemeral = true
327 default:
328 continue
329 }
330 if pvcName == "" {
331 return fmt.Errorf("PersistentVolumeClaim had no name")
332 }
333
334
335
336
337 pvID := fmt.Sprintf("%s-%s/%s", pl.randomVolumeIDPrefix, pod.Namespace, pvcName)
338
339 pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
340 if err != nil {
341 if newPod {
342
343
344
345 return fmt.Errorf("looking up PVC %s/%s: %w", pod.Namespace, pvcName, err)
346 }
347
348
349 logger.V(4).Info("Unable to look up PVC info, assuming PVC doesn't match predicate when counting limits", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName), "err", err)
350 continue
351 }
352
353
354 if isEphemeral {
355 if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil {
356 return err
357 }
358 }
359
360 pvName := pvc.Spec.VolumeName
361 if pvName == "" {
362
363
364
365
366 if pl.matchProvisioner(pvc) {
367 logger.V(4).Info("PVC is not bound, assuming PVC matches predicate when counting limits", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName))
368 filteredVolumes.Insert(pvID)
369 }
370 continue
371 }
372
373 pv, err := pl.pvLister.Get(pvName)
374 if err != nil {
375
376
377 if pl.matchProvisioner(pvc) {
378 logger.V(4).Info("Unable to look up PV, assuming PV matches predicate when counting limits", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName), "PV", klog.KRef("", pvName), "err", err)
379 filteredVolumes.Insert(pvID)
380 }
381 continue
382 }
383
384 if id, ok := pl.filter.FilterPersistentVolume(pv); ok {
385 filteredVolumes.Insert(id)
386 }
387 }
388
389 return nil
390 }
391
392
393 func (pl *nonCSILimits) matchProvisioner(pvc *v1.PersistentVolumeClaim) bool {
394 if pvc.Spec.StorageClassName == nil {
395 return false
396 }
397
398 storageClass, err := pl.scLister.Get(*pvc.Spec.StorageClassName)
399 if err != nil || storageClass == nil {
400 return false
401 }
402
403 return pl.filter.MatchProvisioner(storageClass)
404 }
405
406
407 func getMaxVolLimitFromEnv(logger klog.Logger) int {
408 if rawMaxVols := os.Getenv(KubeMaxPDVols); rawMaxVols != "" {
409 if parsedMaxVols, err := strconv.Atoi(rawMaxVols); err != nil {
410 logger.Error(err, "Unable to parse maximum PD volumes value, using default")
411 } else if parsedMaxVols <= 0 {
412 logger.Error(errors.New("maximum PD volumes is negative"), "Unable to parse maximum PD volumes value, using default")
413 } else {
414 return parsedMaxVols
415 }
416 }
417
418 return -1
419 }
420
421
422 type VolumeFilter struct {
423
424 FilterVolume func(vol *v1.Volume) (id string, relevant bool)
425 FilterPersistentVolume func(pv *v1.PersistentVolume) (id string, relevant bool)
426
427 MatchProvisioner func(sc *storage.StorageClass) (relevant bool)
428
429 IsMigrated func(csiNode *storage.CSINode) bool
430 }
431
432
433 var ebsVolumeFilter = VolumeFilter{
434 FilterVolume: func(vol *v1.Volume) (string, bool) {
435 if vol.AWSElasticBlockStore != nil {
436 return vol.AWSElasticBlockStore.VolumeID, true
437 }
438 return "", false
439 },
440
441 FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
442 if pv.Spec.AWSElasticBlockStore != nil {
443 return pv.Spec.AWSElasticBlockStore.VolumeID, true
444 }
445 return "", false
446 },
447
448 MatchProvisioner: func(sc *storage.StorageClass) bool {
449 return sc.Provisioner == csilibplugins.AWSEBSInTreePluginName
450 },
451
452 IsMigrated: func(csiNode *storage.CSINode) bool {
453 return isCSIMigrationOn(csiNode, csilibplugins.AWSEBSInTreePluginName)
454 },
455 }
456
457
458 var gcePDVolumeFilter = VolumeFilter{
459 FilterVolume: func(vol *v1.Volume) (string, bool) {
460 if vol.GCEPersistentDisk != nil {
461 return vol.GCEPersistentDisk.PDName, true
462 }
463 return "", false
464 },
465
466 FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
467 if pv.Spec.GCEPersistentDisk != nil {
468 return pv.Spec.GCEPersistentDisk.PDName, true
469 }
470 return "", false
471 },
472
473 MatchProvisioner: func(sc *storage.StorageClass) bool {
474 return sc.Provisioner == csilibplugins.GCEPDInTreePluginName
475 },
476
477 IsMigrated: func(csiNode *storage.CSINode) bool {
478 return isCSIMigrationOn(csiNode, csilibplugins.GCEPDInTreePluginName)
479 },
480 }
481
482
483 var azureDiskVolumeFilter = VolumeFilter{
484 FilterVolume: func(vol *v1.Volume) (string, bool) {
485 if vol.AzureDisk != nil {
486 return vol.AzureDisk.DiskName, true
487 }
488 return "", false
489 },
490
491 FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
492 if pv.Spec.AzureDisk != nil {
493 return pv.Spec.AzureDisk.DiskName, true
494 }
495 return "", false
496 },
497
498 MatchProvisioner: func(sc *storage.StorageClass) bool {
499 return sc.Provisioner == csilibplugins.AzureDiskInTreePluginName
500 },
501
502 IsMigrated: func(csiNode *storage.CSINode) bool {
503 return isCSIMigrationOn(csiNode, csilibplugins.AzureDiskInTreePluginName)
504 },
505 }
506
507
508
509 var cinderVolumeFilter = VolumeFilter{
510 FilterVolume: func(vol *v1.Volume) (string, bool) {
511 if vol.Cinder != nil {
512 return vol.Cinder.VolumeID, true
513 }
514 return "", false
515 },
516
517 FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
518 if pv.Spec.Cinder != nil {
519 return pv.Spec.Cinder.VolumeID, true
520 }
521 return "", false
522 },
523
524 MatchProvisioner: func(sc *storage.StorageClass) bool {
525 return sc.Provisioner == csilibplugins.CinderInTreePluginName
526 },
527
528 IsMigrated: func(csiNode *storage.CSINode) bool {
529 return isCSIMigrationOn(csiNode, csilibplugins.CinderInTreePluginName)
530 },
531 }
532
533 func getMaxVolumeFunc(logger klog.Logger, filterName string) func(node *v1.Node) int {
534 return func(node *v1.Node) int {
535 maxVolumesFromEnv := getMaxVolLimitFromEnv(logger)
536 if maxVolumesFromEnv > 0 {
537 return maxVolumesFromEnv
538 }
539
540 var nodeInstanceType string
541 for k, v := range node.ObjectMeta.Labels {
542 if k == v1.LabelInstanceType || k == v1.LabelInstanceTypeStable {
543 nodeInstanceType = v
544 break
545 }
546 }
547 switch filterName {
548 case ebsVolumeFilterType:
549 return getMaxEBSVolume(nodeInstanceType)
550 case gcePDVolumeFilterType:
551 return defaultMaxGCEPDVolumes
552 case azureDiskVolumeFilterType:
553 return defaultMaxAzureDiskVolumes
554 case cinderVolumeFilterType:
555 return volumeutil.DefaultMaxCinderVolumes
556 default:
557 return -1
558 }
559 }
560 }
561
562 func getMaxEBSVolume(nodeInstanceType string) int {
563 if ok, _ := regexp.MatchString(volumeutil.EBSNitroLimitRegex, nodeInstanceType); ok {
564 return volumeutil.DefaultMaxEBSNitroVolumeLimit
565 }
566 return volumeutil.DefaultMaxEBSVolumes
567 }
568
View as plain text