1
16
17 package volumebinding
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "sync"
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 apierrors "k8s.io/apimachinery/pkg/api/errors"
28 "k8s.io/apimachinery/pkg/runtime"
29 corelisters "k8s.io/client-go/listers/core/v1"
30 "k8s.io/component-helpers/storage/ephemeral"
31 "k8s.io/klog/v2"
32 "k8s.io/kubernetes/pkg/scheduler/apis/config"
33 "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
34 "k8s.io/kubernetes/pkg/scheduler/framework"
35 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
36 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
37 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
38 )
39
const (
	// stateKey is the CycleState key under which this plugin stores its
	// stateData; it is the plugin name.
	stateKey framework.StateKey = Name

	// maxUtilization is not referenced in the visible part of this file.
	// NOTE(review): presumably the upper bound (percent) of the utilization
	// scale used by the capacity scorer — confirm against the scorer code.
	maxUtilization = 100
)
45
46
47
48
// stateData is the per-scheduling-cycle state of the VolumeBinding plugin.
// PreFilter writes a pointer to it into the CycleState; later extension
// points fetch that pointer via getStateData and mutate it in place.
type stateData struct {
	// allBound is set by Reserve and checked by PreBind, which returns
	// immediately when it is true.
	allBound bool

	// podVolumesByNode maps a node name to the PodVolumes that Filter found
	// for that node. PreFilter allocates the map and Filter populates it.
	podVolumesByNode map[string]*PodVolumes

	// podVolumeClaims holds the pod's claims as collected in PreFilter and
	// passed to Binder.FindPodVolumes in Filter.
	podVolumeClaims *PodVolumeClaims

	// hasStaticBindings becomes true once Filter sees a node whose PodVolumes
	// contain at least one static binding. PreScore returns Skip when it is
	// still false.
	hasStaticBindings bool

	// Mutex guards the writes Filter makes to podVolumesByNode and
	// hasStaticBindings.
	sync.Mutex
}
61
// Clone implements framework.StateData. It returns the receiver itself, not a
// copy, so the "cloned" state shares all fields (including the mutex) with
// the original.
func (d *stateData) Clone() framework.StateData {
	return d
}
65
66
67
68
// VolumeBinding is the scheduler plugin that takes a pod's volumes into
// account during scheduling. It implements the PreFilter, Filter, PreScore,
// Score, Reserve and PreBind extension points.
type VolumeBinding struct {
	// Binder performs the actual volume lookups, assumptions and bindings.
	Binder SchedulerVolumeBinder

	// PVCLister is used to look up the PersistentVolumeClaims a pod references.
	PVCLister corelisters.PersistentVolumeClaimLister

	// scorer converts per-storage-class resources into a node score. It is nil
	// unless New was called with EnableVolumeCapacityPriority set.
	scorer volumeCapacityScorer

	// fts is the feature set the plugin was constructed with.
	fts feature.Features
}
75
76 var _ framework.PreFilterPlugin = &VolumeBinding{}
77 var _ framework.FilterPlugin = &VolumeBinding{}
78 var _ framework.ReservePlugin = &VolumeBinding{}
79 var _ framework.PreBindPlugin = &VolumeBinding{}
80 var _ framework.PreScorePlugin = &VolumeBinding{}
81 var _ framework.ScorePlugin = &VolumeBinding{}
82 var _ framework.EnqueueExtensions = &VolumeBinding{}
83
84
// Name is the name of this plugin, taken from names.VolumeBinding. It is also
// used as the CycleState key for the plugin's state.
const Name = names.VolumeBinding
86
87
// Name returns the name of the plugin.
func (pl *VolumeBinding) Name() string {
	return Name
}
91
92
93
94 func (pl *VolumeBinding) EventsToRegister() []framework.ClusterEventWithHint {
95 events := []framework.ClusterEventWithHint{
96
97
98
99 {Event: framework.ClusterEvent{Resource: framework.StorageClass, ActionType: framework.Add | framework.Update}},
100
101 {Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}},
102 {Event: framework.ClusterEvent{Resource: framework.PersistentVolume, ActionType: framework.Add | framework.Update}},
103
104
105
106
107
108
109
110
111
112
113
114
115 {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},
116
117 {Event: framework.ClusterEvent{Resource: framework.CSINode, ActionType: framework.Add | framework.Update}},
118
119
120 {Event: framework.ClusterEvent{Resource: framework.CSIDriver, ActionType: framework.Add | framework.Update}},
121 {Event: framework.ClusterEvent{Resource: framework.CSIStorageCapacity, ActionType: framework.Add | framework.Update}},
122 }
123 return events
124 }
125
126
127
128
129 func (pl *VolumeBinding) podHasPVCs(pod *v1.Pod) (bool, error) {
130 hasPVC := false
131 for _, vol := range pod.Spec.Volumes {
132 var pvcName string
133 isEphemeral := false
134 switch {
135 case vol.PersistentVolumeClaim != nil:
136 pvcName = vol.PersistentVolumeClaim.ClaimName
137 case vol.Ephemeral != nil:
138 pvcName = ephemeral.VolumeClaimName(pod, &vol)
139 isEphemeral = true
140 default:
141
142 continue
143 }
144 hasPVC = true
145 pvc, err := pl.PVCLister.PersistentVolumeClaims(pod.Namespace).Get(pvcName)
146 if err != nil {
147
148
149
150 if isEphemeral && apierrors.IsNotFound(err) {
151 err = fmt.Errorf("waiting for ephemeral volume controller to create the persistentvolumeclaim %q", pvcName)
152 }
153 return hasPVC, err
154 }
155
156 if pvc.Status.Phase == v1.ClaimLost {
157 return hasPVC, fmt.Errorf("persistentvolumeclaim %q bound to non-existent persistentvolume %q", pvc.Name, pvc.Spec.VolumeName)
158 }
159
160 if pvc.DeletionTimestamp != nil {
161 return hasPVC, fmt.Errorf("persistentvolumeclaim %q is being deleted", pvc.Name)
162 }
163
164 if isEphemeral {
165 if err := ephemeral.VolumeIsForPod(pod, pvc); err != nil {
166 return hasPVC, err
167 }
168 }
169 }
170 return hasPVC, nil
171 }
172
173
174
175
176 func (pl *VolumeBinding) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
177 logger := klog.FromContext(ctx)
178
179 if hasPVC, err := pl.podHasPVCs(pod); err != nil {
180 return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
181 } else if !hasPVC {
182 state.Write(stateKey, &stateData{})
183 return nil, framework.NewStatus(framework.Skip)
184 }
185 podVolumeClaims, err := pl.Binder.GetPodVolumeClaims(logger, pod)
186 if err != nil {
187 return nil, framework.AsStatus(err)
188 }
189 if len(podVolumeClaims.unboundClaimsImmediate) > 0 {
190
191
192
193 status := framework.NewStatus(framework.UnschedulableAndUnresolvable)
194 status.AppendReason("pod has unbound immediate PersistentVolumeClaims")
195 return nil, status
196 }
197
198 var result *framework.PreFilterResult
199 if eligibleNodes := pl.Binder.GetEligibleNodes(logger, podVolumeClaims.boundClaims); eligibleNodes != nil {
200 result = &framework.PreFilterResult{
201 NodeNames: eligibleNodes,
202 }
203 }
204
205 state.Write(stateKey, &stateData{
206 podVolumesByNode: make(map[string]*PodVolumes),
207 podVolumeClaims: &PodVolumeClaims{
208 boundClaims: podVolumeClaims.boundClaims,
209 unboundClaimsDelayBinding: podVolumeClaims.unboundClaimsDelayBinding,
210 unboundVolumesDelayBinding: podVolumeClaims.unboundVolumesDelayBinding,
211 },
212 })
213 return result, nil
214 }
215
216
// PreFilterExtensions returns nil: this plugin provides no prefilter
// extensions.
func (pl *VolumeBinding) PreFilterExtensions() framework.PreFilterExtensions {
	return nil
}
220
221 func getStateData(cs *framework.CycleState) (*stateData, error) {
222 state, err := cs.Read(stateKey)
223 if err != nil {
224 return nil, err
225 }
226 s, ok := state.(*stateData)
227 if !ok {
228 return nil, errors.New("unable to convert state into stateData")
229 }
230 return s, nil
231 }
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248 func (pl *VolumeBinding) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
249 logger := klog.FromContext(ctx)
250 node := nodeInfo.Node()
251
252 state, err := getStateData(cs)
253 if err != nil {
254 return framework.AsStatus(err)
255 }
256
257 podVolumes, reasons, err := pl.Binder.FindPodVolumes(logger, pod, state.podVolumeClaims, node)
258
259 if err != nil {
260 return framework.AsStatus(err)
261 }
262
263 if len(reasons) > 0 {
264 status := framework.NewStatus(framework.UnschedulableAndUnresolvable)
265 for _, reason := range reasons {
266 status.AppendReason(string(reason))
267 }
268 return status
269 }
270
271
272 state.Lock()
273 state.podVolumesByNode[node.Name] = podVolumes
274 state.hasStaticBindings = state.hasStaticBindings || (podVolumes != nil && len(podVolumes.StaticBindings) > 0)
275 state.Unlock()
276 return nil
277 }
278
279
280 func (pl *VolumeBinding) PreScore(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodes []*framework.NodeInfo) *framework.Status {
281 if pl.scorer == nil {
282 return framework.NewStatus(framework.Skip)
283 }
284 state, err := getStateData(cs)
285 if err != nil {
286 return framework.AsStatus(err)
287 }
288 if state.hasStaticBindings {
289 return nil
290 }
291 return framework.NewStatus(framework.Skip)
292 }
293
294
295 func (pl *VolumeBinding) Score(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
296 if pl.scorer == nil {
297 return 0, nil
298 }
299 state, err := getStateData(cs)
300 if err != nil {
301 return 0, framework.AsStatus(err)
302 }
303 podVolumes, ok := state.podVolumesByNode[nodeName]
304 if !ok {
305 return 0, nil
306 }
307
308 classResources := make(classResourceMap)
309 for _, staticBinding := range podVolumes.StaticBindings {
310 class := staticBinding.StorageClassName()
311 storageResource := staticBinding.StorageResource()
312 if _, ok := classResources[class]; !ok {
313 classResources[class] = &StorageResource{
314 Requested: 0,
315 Capacity: 0,
316 }
317 }
318 classResources[class].Requested += storageResource.Requested
319 classResources[class].Capacity += storageResource.Capacity
320 }
321 return pl.scorer(classResources), nil
322 }
323
324
// ScoreExtensions returns nil: this plugin provides no score extensions.
func (pl *VolumeBinding) ScoreExtensions() framework.ScoreExtensions {
	return nil
}
328
329
330 func (pl *VolumeBinding) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
331 state, err := getStateData(cs)
332 if err != nil {
333 return framework.AsStatus(err)
334 }
335
336 podVolumes, ok := state.podVolumesByNode[nodeName]
337 if ok {
338 allBound, err := pl.Binder.AssumePodVolumes(klog.FromContext(ctx), pod, nodeName, podVolumes)
339 if err != nil {
340 return framework.AsStatus(err)
341 }
342 state.allBound = allBound
343 } else {
344
345 state.allBound = true
346 }
347 return nil
348 }
349
350
351
352
353
354
355 func (pl *VolumeBinding) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
356 s, err := getStateData(cs)
357 if err != nil {
358 return framework.AsStatus(err)
359 }
360 if s.allBound {
361
362 return nil
363 }
364
365 podVolumes, ok := s.podVolumesByNode[nodeName]
366 if !ok {
367 return framework.AsStatus(fmt.Errorf("no pod volumes found for node %q", nodeName))
368 }
369 logger := klog.FromContext(ctx)
370 logger.V(5).Info("Trying to bind volumes for pod", "pod", klog.KObj(pod))
371 err = pl.Binder.BindPodVolumes(ctx, pod, podVolumes)
372 if err != nil {
373 logger.V(5).Info("Failed to bind volumes for pod", "pod", klog.KObj(pod), "err", err)
374 return framework.AsStatus(err)
375 }
376 logger.V(5).Info("Success binding volumes for pod", "pod", klog.KObj(pod))
377 return nil
378 }
379
380
381
382 func (pl *VolumeBinding) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
383 s, err := getStateData(cs)
384 if err != nil {
385 return
386 }
387
388 podVolumes, ok := s.podVolumesByNode[nodeName]
389 if !ok {
390 return
391 }
392 pl.Binder.RevertAssumedPodVolumes(podVolumes)
393 }
394
395
396 func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
397 args, ok := plArgs.(*config.VolumeBindingArgs)
398 if !ok {
399 return nil, fmt.Errorf("want args to be of type VolumeBindingArgs, got %T", plArgs)
400 }
401 if err := validation.ValidateVolumeBindingArgsWithOptions(nil, args, validation.VolumeBindingArgsValidationOptions{
402 AllowVolumeCapacityPriority: fts.EnableVolumeCapacityPriority,
403 }); err != nil {
404 return nil, err
405 }
406 podInformer := fh.SharedInformerFactory().Core().V1().Pods()
407 nodeInformer := fh.SharedInformerFactory().Core().V1().Nodes()
408 pvcInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumeClaims()
409 pvInformer := fh.SharedInformerFactory().Core().V1().PersistentVolumes()
410 storageClassInformer := fh.SharedInformerFactory().Storage().V1().StorageClasses()
411 csiNodeInformer := fh.SharedInformerFactory().Storage().V1().CSINodes()
412 capacityCheck := CapacityCheck{
413 CSIDriverInformer: fh.SharedInformerFactory().Storage().V1().CSIDrivers(),
414 CSIStorageCapacityInformer: fh.SharedInformerFactory().Storage().V1().CSIStorageCapacities(),
415 }
416 binder := NewVolumeBinder(klog.FromContext(ctx), fh.ClientSet(), podInformer, nodeInformer, csiNodeInformer, pvcInformer, pvInformer, storageClassInformer, capacityCheck, time.Duration(args.BindTimeoutSeconds)*time.Second)
417
418
419 var scorer volumeCapacityScorer
420 if fts.EnableVolumeCapacityPriority {
421 shape := make(helper.FunctionShape, 0, len(args.Shape))
422 for _, point := range args.Shape {
423 shape = append(shape, helper.FunctionShapePoint{
424 Utilization: int64(point.Utilization),
425 Score: int64(point.Score) * (framework.MaxNodeScore / config.MaxCustomPriorityScore),
426 })
427 }
428 scorer = buildScorerFunction(shape)
429 }
430 return &VolumeBinding{
431 Binder: binder,
432 PVCLister: pvcInformer.Lister(),
433 scorer: scorer,
434 fts: fts,
435 }, nil
436 }
437
View as plain text