1
16
17 package nodevolumelimits
18
19 import (
20 "context"
21 "fmt"
22
23 v1 "k8s.io/api/core/v1"
24 storagev1 "k8s.io/api/storage/v1"
25 apierrors "k8s.io/apimachinery/pkg/api/errors"
26 "k8s.io/apimachinery/pkg/runtime"
27 "k8s.io/apimachinery/pkg/util/rand"
28 corelisters "k8s.io/client-go/listers/core/v1"
29 storagelisters "k8s.io/client-go/listers/storage/v1"
30 ephemeral "k8s.io/component-helpers/storage/ephemeral"
31 storagehelpers "k8s.io/component-helpers/storage/volume"
32 csitrans "k8s.io/csi-translation-lib"
33 "k8s.io/klog/v2"
34 "k8s.io/kubernetes/pkg/scheduler/framework"
35 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
36 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
37 volumeutil "k8s.io/kubernetes/pkg/volume/util"
38 )
39
40
41
// InTreeToCSITranslator lists the translation operations this plugin relies on
// to decide whether an in-tree volume is migratable to CSI and to translate it.
// NewCSI stores the value returned by csitrans.New() behind this interface.
type InTreeToCSITranslator interface {
	// IsPVMigratable is consulted for PVs that have no CSI volume source.
	IsPVMigratable(pv *v1.PersistentVolume) bool
	// IsInlineMigratable is consulted for volumes declared inline in a pod spec.
	IsInlineMigratable(vol *v1.Volume) bool
	// IsMigratableIntreePluginByName is consulted with a StorageClass provisioner name.
	IsMigratableIntreePluginByName(inTreePluginName string) bool
	// GetInTreePluginNameFromSpec is called in this file with exactly one of
	// pv or vol set and the other nil.
	GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (string, error)
	// GetCSINameFromInTreeName maps an in-tree plugin name to a CSI driver name.
	GetCSINameFromInTreeName(pluginName string) (string, error)
	// TranslateInTreePVToCSI returns a PV whose CSI source is read by the caller.
	TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
	// TranslateInTreeInlineVolumeToCSI returns a PV built from an inline volume.
	TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error)
}
51
52
// CSILimits is a scheduler plugin that compares the number of CSI volumes a
// node would have attached against that node's per-driver volume limits.
type CSILimits struct {
	// Listers used to resolve a pod volume down to a driver name and volume handle.
	csiNodeLister storagelisters.CSINodeLister
	pvLister      corelisters.PersistentVolumeLister
	pvcLister     corelisters.PersistentVolumeClaimLister
	scLister      storagelisters.StorageClassLister

	// randomVolumeIDPrefix prefixes the volume handles that are generated from
	// the PVC namespace and name when no PV can be resolved for a claim.
	// NewCSI sets it to a random 32-character string.
	randomVolumeIDPrefix string

	// translator answers in-tree to CSI migration questions.
	translator InTreeToCSITranslator
}
63
64 var _ framework.PreFilterPlugin = &CSILimits{}
65 var _ framework.FilterPlugin = &CSILimits{}
66 var _ framework.EnqueueExtensions = &CSILimits{}
67
68
// CSIName is the name under which this plugin is registered; it is the value
// returned by (*CSILimits).Name.
const CSIName = names.NodeVolumeLimits
70
71
// Name returns the name of the plugin, CSIName.
func (pl *CSILimits) Name() string {
	return CSIName
}
75
76
77
78 func (pl *CSILimits) EventsToRegister() []framework.ClusterEventWithHint {
79 return []framework.ClusterEventWithHint{
80
81
82 {Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add}},
83 {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
84 {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add}},
85 }
86 }
87
88
89
90
91 func (pl *CSILimits) PreFilter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
92 volumes := pod.Spec.Volumes
93 for i := range volumes {
94 vol := &volumes[i]
95 if vol.PersistentVolumeClaim != nil || vol.Ephemeral != nil || pl.translator.IsInlineMigratable(vol) {
96 return nil, nil
97 }
98 }
99
100 return nil, framework.NewStatus(framework.Skip)
101 }
102
103
// PreFilterExtensions returns nil: this plugin provides no pre-filter extensions.
func (pl *CSILimits) PreFilterExtensions() framework.PreFilterExtensions {
	return nil
}
107
108
109 func (pl *CSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
110
111 if len(pod.Spec.Volumes) == 0 {
112 return nil
113 }
114
115 node := nodeInfo.Node()
116
117 logger := klog.FromContext(ctx)
118
119
120 csiNode, err := pl.csiNodeLister.Get(node.Name)
121 if err != nil {
122
123 logger.V(5).Info("Could not get a CSINode object for the node", "node", klog.KObj(node), "err", err)
124 }
125
126 newVolumes := make(map[string]string)
127 if err := pl.filterAttachableVolumes(logger, pod, csiNode, true , newVolumes); err != nil {
128 if apierrors.IsNotFound(err) {
129
130 return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
131 }
132 return framework.AsStatus(err)
133 }
134
135
136 if len(newVolumes) == 0 {
137 return nil
138 }
139
140
141 nodeVolumeLimits := getVolumeLimits(nodeInfo, csiNode)
142 if len(nodeVolumeLimits) == 0 {
143 return nil
144 }
145
146 attachedVolumes := make(map[string]string)
147 for _, existingPod := range nodeInfo.Pods {
148 if err := pl.filterAttachableVolumes(logger, existingPod.Pod, csiNode, false , attachedVolumes); err != nil {
149 return framework.AsStatus(err)
150 }
151 }
152
153 attachedVolumeCount := map[string]int{}
154 for volumeUniqueName, volumeLimitKey := range attachedVolumes {
155
156 delete(newVolumes, volumeUniqueName)
157 attachedVolumeCount[volumeLimitKey]++
158 }
159
160 newVolumeCount := map[string]int{}
161 for _, volumeLimitKey := range newVolumes {
162 newVolumeCount[volumeLimitKey]++
163 }
164
165 for volumeLimitKey, count := range newVolumeCount {
166 maxVolumeLimit, ok := nodeVolumeLimits[v1.ResourceName(volumeLimitKey)]
167 if ok {
168 currentVolumeCount := attachedVolumeCount[volumeLimitKey]
169 logger.V(5).Info("Found plugin volume limits", "node", node.Name, "volumeLimitKey", volumeLimitKey,
170 "maxLimits", maxVolumeLimit, "currentVolumeCount", currentVolumeCount, "newVolumeCount", count,
171 "pod", klog.KObj(pod))
172 if currentVolumeCount+count > int(maxVolumeLimit) {
173 return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)
174 }
175 }
176 }
177
178 return nil
179 }
180
181 func (pl *CSILimits) filterAttachableVolumes(
182 logger klog.Logger, pod *v1.Pod, csiNode *storagev1.CSINode, newPod bool, result map[string]string) error {
183 for _, vol := range pod.Spec.Volumes {
184 pvcName := ""
185 isEphemeral := false
186 switch {
187 case vol.PersistentVolumeClaim != nil:
188
189 pvcName = vol.PersistentVolumeClaim.ClaimName
190 case vol.Ephemeral != nil:
191
192
193
194
195 pvcName = ephemeral.VolumeClaimName(pod, &vol)
196 isEphemeral = true
197 default:
198
199
200
201
202
203 if err := pl.checkAttachableInlineVolume(logger, &vol, csiNode, pod, result); err != nil {
204 return err
205 }
206
207 continue
208 }
209
210 if pvcName == "" {
211 return fmt.Errorf("PersistentVolumeClaim had no name")
212 }
213
214 pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
215
216 if err != nil {
217 if newPod {
218
219
220
221 return fmt.Errorf("looking up PVC %s/%s: %w", pod.Namespace, pvcName, err)
222 }
223
224
225 logger.V(5).Info("Unable to look up PVC info", "pod", klog.KObj(pod), "PVC", klog.KRef(pod.Namespace, pvcName))
226 continue
227 }
228
229
230 if isEphemeral {
231 if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil {
232 return err
233 }
234 }
235
236 driverName, volumeHandle := pl.getCSIDriverInfo(logger, csiNode, pvc)
237 if driverName == "" || volumeHandle == "" {
238 logger.V(5).Info("Could not find a CSI driver name or volume handle, not counting volume")
239 continue
240 }
241
242 volumeUniqueName := fmt.Sprintf("%s/%s", driverName, volumeHandle)
243 volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
244 result[volumeUniqueName] = volumeLimitKey
245 }
246 return nil
247 }
248
249
250
251 func (pl *CSILimits) checkAttachableInlineVolume(logger klog.Logger, vol *v1.Volume, csiNode *storagev1.CSINode,
252 pod *v1.Pod, result map[string]string) error {
253 if !pl.translator.IsInlineMigratable(vol) {
254 return nil
255 }
256
257 inTreeProvisionerName, err := pl.translator.GetInTreePluginNameFromSpec(nil, vol)
258 if err != nil {
259 return fmt.Errorf("looking up provisioner name for volume %s: %w", vol.Name, err)
260 }
261 if !isCSIMigrationOn(csiNode, inTreeProvisionerName) {
262 csiNodeName := ""
263 if csiNode != nil {
264 csiNodeName = csiNode.Name
265 }
266 logger.V(5).Info("CSI Migration is not enabled for provisioner", "provisioner", inTreeProvisionerName,
267 "pod", klog.KObj(pod), "csiNode", csiNodeName)
268 return nil
269 }
270
271 translatedPV, err := pl.translator.TranslateInTreeInlineVolumeToCSI(vol, pod.Namespace)
272 if err != nil || translatedPV == nil {
273 return fmt.Errorf("converting volume(%s) from inline to csi: %w", vol.Name, err)
274 }
275 driverName, err := pl.translator.GetCSINameFromInTreeName(inTreeProvisionerName)
276 if err != nil {
277 return fmt.Errorf("looking up CSI driver name for provisioner %s: %w", inTreeProvisionerName, err)
278 }
279
280
281 if translatedPV.Spec.PersistentVolumeSource.CSI == nil {
282 return nil
283 }
284 volumeUniqueName := fmt.Sprintf("%s/%s", driverName, translatedPV.Spec.PersistentVolumeSource.CSI.VolumeHandle)
285 volumeLimitKey := volumeutil.GetCSIAttachLimitKey(driverName)
286 result[volumeUniqueName] = volumeLimitKey
287 return nil
288 }
289
290
291
292
293 func (pl *CSILimits) getCSIDriverInfo(logger klog.Logger, csiNode *storagev1.CSINode, pvc *v1.PersistentVolumeClaim) (string, string) {
294 pvName := pvc.Spec.VolumeName
295
296 if pvName == "" {
297 logger.V(5).Info("Persistent volume had no name for claim", "PVC", klog.KObj(pvc))
298 return pl.getCSIDriverInfoFromSC(logger, csiNode, pvc)
299 }
300
301 pv, err := pl.pvLister.Get(pvName)
302 if err != nil {
303 logger.V(5).Info("Unable to look up PV info for PVC and PV", "PVC", klog.KObj(pvc), "PV", klog.KRef("", pvName))
304
305
306
307 return pl.getCSIDriverInfoFromSC(logger, csiNode, pvc)
308 }
309
310 csiSource := pv.Spec.PersistentVolumeSource.CSI
311 if csiSource == nil {
312
313 if !pl.translator.IsPVMigratable(pv) {
314 return "", ""
315 }
316
317 pluginName, err := pl.translator.GetInTreePluginNameFromSpec(pv, nil)
318 if err != nil {
319 logger.V(5).Info("Unable to look up plugin name from PV spec", "err", err)
320 return "", ""
321 }
322
323 if !isCSIMigrationOn(csiNode, pluginName) {
324 logger.V(5).Info("CSI Migration of plugin is not enabled", "plugin", pluginName)
325 return "", ""
326 }
327
328 csiPV, err := pl.translator.TranslateInTreePVToCSI(pv)
329 if err != nil {
330 logger.V(5).Info("Unable to translate in-tree volume to CSI", "err", err)
331 return "", ""
332 }
333
334 if csiPV.Spec.PersistentVolumeSource.CSI == nil {
335 logger.V(5).Info("Unable to get a valid volume source for translated PV", "PV", pvName)
336 return "", ""
337 }
338
339 csiSource = csiPV.Spec.PersistentVolumeSource.CSI
340 }
341
342 return csiSource.Driver, csiSource.VolumeHandle
343 }
344
345
346 func (pl *CSILimits) getCSIDriverInfoFromSC(logger klog.Logger, csiNode *storagev1.CSINode, pvc *v1.PersistentVolumeClaim) (string, string) {
347 namespace := pvc.Namespace
348 pvcName := pvc.Name
349 scName := storagehelpers.GetPersistentVolumeClaimClass(pvc)
350
351
352
353 if scName == "" {
354 logger.V(5).Info("PVC has no StorageClass", "PVC", klog.KObj(pvc))
355 return "", ""
356 }
357
358 storageClass, err := pl.scLister.Get(scName)
359 if err != nil {
360 logger.V(5).Info("Could not get StorageClass for PVC", "PVC", klog.KObj(pvc), "err", err)
361 return "", ""
362 }
363
364
365
366
367 volumeHandle := fmt.Sprintf("%s-%s/%s", pl.randomVolumeIDPrefix, namespace, pvcName)
368
369 provisioner := storageClass.Provisioner
370 if pl.translator.IsMigratableIntreePluginByName(provisioner) {
371 if !isCSIMigrationOn(csiNode, provisioner) {
372 logger.V(5).Info("CSI Migration of provisioner is not enabled", "provisioner", provisioner)
373 return "", ""
374 }
375
376 driverName, err := pl.translator.GetCSINameFromInTreeName(provisioner)
377 if err != nil {
378 logger.V(5).Info("Unable to look up driver name from provisioner name", "provisioner", provisioner, "err", err)
379 return "", ""
380 }
381 return driverName, volumeHandle
382 }
383
384 return provisioner, volumeHandle
385 }
386
387
388 func NewCSI(_ context.Context, _ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
389 informerFactory := handle.SharedInformerFactory()
390 pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()
391 pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
392 csiNodesLister := informerFactory.Storage().V1().CSINodes().Lister()
393 scLister := informerFactory.Storage().V1().StorageClasses().Lister()
394 csiTranslator := csitrans.New()
395
396 return &CSILimits{
397 csiNodeLister: csiNodesLister,
398 pvLister: pvLister,
399 pvcLister: pvcLister,
400 scLister: scLister,
401 randomVolumeIDPrefix: rand.String(32),
402 translator: csiTranslator,
403 }, nil
404 }
405
406 func getVolumeLimits(nodeInfo *framework.NodeInfo, csiNode *storagev1.CSINode) map[v1.ResourceName]int64 {
407
408 nodeVolumeLimits := volumeLimits(nodeInfo)
409 if csiNode != nil {
410 for i := range csiNode.Spec.Drivers {
411 d := csiNode.Spec.Drivers[i]
412 if d.Allocatable != nil && d.Allocatable.Count != nil {
413
414 k := v1.ResourceName(volumeutil.GetCSIAttachLimitKey(d.Name))
415 nodeVolumeLimits[k] = int64(*d.Allocatable.Count)
416 }
417 }
418 }
419 return nodeVolumeLimits
420 }
421
View as plain text