1
16
17 package util
18
19 import (
20 "context"
21 "encoding/json"
22 "fmt"
23
24 v1 "k8s.io/api/core/v1"
25 "k8s.io/apimachinery/pkg/api/meta"
26 "k8s.io/apimachinery/pkg/api/resource"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
29 "k8s.io/apimachinery/pkg/types"
30 "k8s.io/apimachinery/pkg/util/strategicpatch"
31 utilfeature "k8s.io/apiserver/pkg/util/feature"
32 clientset "k8s.io/client-go/kubernetes"
33 "k8s.io/kubernetes/pkg/features"
34 "k8s.io/kubernetes/pkg/volume"
35 volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
36 "k8s.io/mount-utils"
37 )
38
var (
	// knownResizeConditions is the set of PVC condition types that
	// MergeResizeConditionOnPVC treats as resize-related. Conditions whose
	// type is not a key of this map are carried over unchanged by the merge.
	knownResizeConditions = map[v1.PersistentVolumeClaimConditionType]bool{
		v1.PersistentVolumeClaimFileSystemResizePending: true,
		v1.PersistentVolumeClaimResizing:                true,
	}

	// AnnPreResizeCapacity is the PV annotation key under which
	// AddAnnPreResizeCapacity stores the string form of the capacity passed
	// to it, and which DeleteAnnPreResizeCapacity removes again.
	AnnPreResizeCapacity = "volume.alpha.kubernetes.io/pre-resize-capacity"
)
51
// resizeProcessStatus is bookkeeping used by MergeResizeConditionOnPVC: it
// pairs a requested resize condition with a flag recording whether that
// condition has already been matched against an existing PVC condition.
//
// NOTE: the field order matters, MergeResizeConditionOnPVC builds values of
// this type with a positional composite literal.
type resizeProcessStatus struct {
	// condition is the resize condition the caller asked to merge in.
	condition v1.PersistentVolumeClaimCondition
	// processed is set once an existing condition of the same type was seen.
	processed bool
}
56
57
58 func UpdatePVSize(
59 pv *v1.PersistentVolume,
60 newSize resource.Quantity,
61 kubeClient clientset.Interface) (*v1.PersistentVolume, error) {
62 pvClone := pv.DeepCopy()
63 pvClone.Spec.Capacity[v1.ResourceStorage] = newSize
64
65 return PatchPV(pv, pvClone, kubeClient)
66 }
67
68
69 func AddAnnPreResizeCapacity(
70 pv *v1.PersistentVolume,
71 oldCapacity resource.Quantity,
72 kubeClient clientset.Interface) error {
73
74 if metav1.HasAnnotation(pv.ObjectMeta, AnnPreResizeCapacity) {
75 return nil
76 }
77
78 pvClone := pv.DeepCopy()
79 if pvClone.ObjectMeta.Annotations == nil {
80 pvClone.ObjectMeta.Annotations = make(map[string]string)
81 }
82 pvClone.ObjectMeta.Annotations[AnnPreResizeCapacity] = oldCapacity.String()
83
84 _, err := PatchPV(pv, pvClone, kubeClient)
85 return err
86 }
87
88
89 func DeleteAnnPreResizeCapacity(
90 pv *v1.PersistentVolume,
91 kubeClient clientset.Interface) error {
92
93 if !metav1.HasAnnotation(pv.ObjectMeta, AnnPreResizeCapacity) {
94 return nil
95 }
96 pvClone := pv.DeepCopy()
97 delete(pvClone.ObjectMeta.Annotations, AnnPreResizeCapacity)
98 _, err := PatchPV(pv, pvClone, kubeClient)
99 return err
100 }
101
102
103 func PatchPV(
104 oldPV *v1.PersistentVolume,
105 newPV *v1.PersistentVolume,
106 kubeClient clientset.Interface) (*v1.PersistentVolume, error) {
107 oldData, err := json.Marshal(oldPV)
108 if err != nil {
109 return oldPV, fmt.Errorf("unexpected error marshaling old PV %q with error : %v", oldPV.Name, err)
110 }
111
112 newData, err := json.Marshal(newPV)
113 if err != nil {
114 return oldPV, fmt.Errorf("unexpected error marshaling new PV %q with error : %v", newPV.Name, err)
115 }
116
117 patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPV)
118 if err != nil {
119 return oldPV, fmt.Errorf("error Creating two way merge patch for PV %q with error : %v", oldPV.Name, err)
120 }
121
122 updatedPV, err := kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), oldPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
123 if err != nil {
124 return oldPV, fmt.Errorf("error Patching PV %q with error : %v", oldPV.Name, err)
125 }
126 return updatedPV, nil
127 }
128
129
130
131 func MarkResizeInProgressWithResizer(
132 pvc *v1.PersistentVolumeClaim,
133 resizerName string,
134 kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
135
136 progressCondition := v1.PersistentVolumeClaimCondition{
137 Type: v1.PersistentVolumeClaimResizing,
138 Status: v1.ConditionTrue,
139 LastTransitionTime: metav1.Now(),
140 }
141 conditions := []v1.PersistentVolumeClaimCondition{progressCondition}
142 newPVC := pvc.DeepCopy()
143 newPVC = MergeResizeConditionOnPVC(newPVC, conditions)
144 newPVC = setResizer(newPVC, resizerName)
145 return PatchPVCStatus(pvc , newPVC, kubeClient)
146 }
147
148 func MarkControllerReisizeInProgress(pvc *v1.PersistentVolumeClaim, resizerName string, newSize resource.Quantity, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
149
150 progressCondition := v1.PersistentVolumeClaimCondition{
151 Type: v1.PersistentVolumeClaimResizing,
152 Status: v1.ConditionTrue,
153 LastTransitionTime: metav1.Now(),
154 }
155 conditions := []v1.PersistentVolumeClaimCondition{progressCondition}
156 newPVC := pvc.DeepCopy()
157 newPVC = MergeResizeConditionOnPVC(newPVC, conditions)
158 newPVC = mergeStorageResourceStatus(newPVC, v1.PersistentVolumeClaimControllerResizeInProgress)
159 newPVC = mergeStorageAllocatedResources(newPVC, newSize)
160 newPVC = setResizer(newPVC, resizerName)
161 return PatchPVCStatus(pvc , newPVC, kubeClient)
162 }
163
164
165 func SetClaimResizer(
166 pvc *v1.PersistentVolumeClaim,
167 resizerName string,
168 kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
169 newPVC := pvc.DeepCopy()
170 newPVC = setResizer(newPVC, resizerName)
171 return PatchPVCStatus(pvc , newPVC, kubeClient)
172 }
173
174 func setResizer(pvc *v1.PersistentVolumeClaim, resizerName string) *v1.PersistentVolumeClaim {
175 if val, ok := pvc.Annotations[volumetypes.VolumeResizerKey]; ok && val == resizerName {
176 return pvc
177 }
178 metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, volumetypes.VolumeResizerKey, resizerName)
179 return pvc
180 }
181
182
183 func MarkForFSResize(
184 pvc *v1.PersistentVolumeClaim,
185 kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
186 pvcCondition := v1.PersistentVolumeClaimCondition{
187 Type: v1.PersistentVolumeClaimFileSystemResizePending,
188 Status: v1.ConditionTrue,
189 LastTransitionTime: metav1.Now(),
190 Message: "Waiting for user to (re-)start a pod to finish file system resize of volume on node.",
191 }
192 conditions := []v1.PersistentVolumeClaimCondition{pvcCondition}
193 newPVC := pvc.DeepCopy()
194
195 if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) {
196 newPVC = mergeStorageResourceStatus(newPVC, v1.PersistentVolumeClaimNodeResizePending)
197 }
198
199 newPVC = MergeResizeConditionOnPVC(newPVC, conditions)
200 updatedPVC, err := PatchPVCStatus(pvc , newPVC, kubeClient)
201 return updatedPVC, err
202 }
203
204
205 func MarkResizeFinished(
206 pvc *v1.PersistentVolumeClaim,
207 newSize resource.Quantity,
208 kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
209 return MarkFSResizeFinished(pvc, newSize, kubeClient)
210 }
211
212
213 func MarkFSResizeFinished(
214 pvc *v1.PersistentVolumeClaim,
215 newSize resource.Quantity,
216 kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
217 newPVC := pvc.DeepCopy()
218
219 newPVC.Status.Capacity[v1.ResourceStorage] = newSize
220
221
222 if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) {
223 allocatedResourceStatusMap := newPVC.Status.AllocatedResourceStatuses
224 delete(allocatedResourceStatusMap, v1.ResourceStorage)
225 if len(allocatedResourceStatusMap) == 0 {
226 newPVC.Status.AllocatedResourceStatuses = nil
227 } else {
228 newPVC.Status.AllocatedResourceStatuses = allocatedResourceStatusMap
229 }
230 }
231
232 newPVC = MergeResizeConditionOnPVC(newPVC, []v1.PersistentVolumeClaimCondition{})
233 updatedPVC, err := PatchPVCStatus(pvc , newPVC, kubeClient)
234 return updatedPVC, err
235 }
236
237
238
239 func MarkNodeExpansionFailed(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
240 newPVC := pvc.DeepCopy()
241 newPVC = mergeStorageResourceStatus(newPVC, v1.PersistentVolumeClaimNodeResizeFailed)
242
243 patchBytes, err := createPVCPatch(pvc, newPVC, false )
244 if err != nil {
245 return pvc, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", pvc.Name, err)
246 }
247
248 updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).
249 Patch(context.TODO(), pvc.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
250 if updateErr != nil {
251 return pvc, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", pvc.Name, updateErr)
252 }
253 return updatedClaim, nil
254 }
255
256
257 func MarkNodeExpansionInProgress(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
258 newPVC := pvc.DeepCopy()
259 newPVC = mergeStorageResourceStatus(newPVC, v1.PersistentVolumeClaimNodeResizeInProgress)
260 updatedPVC, err := PatchPVCStatus(pvc , newPVC, kubeClient)
261 return updatedPVC, err
262 }
263
264
265
266
267
268 func PatchPVCStatus(
269 oldPVC *v1.PersistentVolumeClaim,
270 newPVC *v1.PersistentVolumeClaim,
271 kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
272 patchBytes, err := createPVCPatch(oldPVC, newPVC, true )
273 if err != nil {
274 return oldPVC, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, err)
275 }
276
277 updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
278 Patch(context.TODO(), oldPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
279 if updateErr != nil {
280 return oldPVC, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, updateErr)
281 }
282 return updatedClaim, nil
283 }
284
285 func createPVCPatch(
286 oldPVC *v1.PersistentVolumeClaim,
287 newPVC *v1.PersistentVolumeClaim, addResourceVersionCheck bool) ([]byte, error) {
288 oldData, err := json.Marshal(oldPVC)
289 if err != nil {
290 return nil, fmt.Errorf("failed to marshal old data: %v", err)
291 }
292
293 newData, err := json.Marshal(newPVC)
294 if err != nil {
295 return nil, fmt.Errorf("failed to marshal new data: %v", err)
296 }
297
298 patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPVC)
299 if err != nil {
300 return nil, fmt.Errorf("failed to create 2 way merge patch: %v", err)
301 }
302
303 if addResourceVersionCheck {
304 patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion)
305 if err != nil {
306 return nil, fmt.Errorf("failed to add resource version: %v", err)
307 }
308 }
309
310 return patchBytes, nil
311 }
312
313 func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, error) {
314 var patchMap map[string]interface{}
315 err := json.Unmarshal(patchBytes, &patchMap)
316 if err != nil {
317 return nil, fmt.Errorf("error unmarshalling patch: %v", err)
318 }
319 u := unstructured.Unstructured{Object: patchMap}
320 a, err := meta.Accessor(&u)
321 if err != nil {
322 return nil, fmt.Errorf("error creating accessor: %v", err)
323 }
324 a.SetResourceVersion(resourceVersion)
325 versionBytes, err := json.Marshal(patchMap)
326 if err != nil {
327 return nil, fmt.Errorf("error marshalling json patch: %v", err)
328 }
329 return versionBytes, nil
330 }
331
332
333
334 func MergeResizeConditionOnPVC(
335 pvc *v1.PersistentVolumeClaim,
336 resizeConditions []v1.PersistentVolumeClaimCondition) *v1.PersistentVolumeClaim {
337 resizeConditionMap := map[v1.PersistentVolumeClaimConditionType]*resizeProcessStatus{}
338
339 for _, condition := range resizeConditions {
340 resizeConditionMap[condition.Type] = &resizeProcessStatus{condition, false}
341 }
342
343 oldConditions := pvc.Status.Conditions
344 newConditions := []v1.PersistentVolumeClaimCondition{}
345 for _, condition := range oldConditions {
346
347 if _, ok := knownResizeConditions[condition.Type]; !ok {
348 newConditions = append(newConditions, condition)
349 continue
350 }
351
352 if newCondition, ok := resizeConditionMap[condition.Type]; ok {
353 if newCondition.condition.Status != condition.Status {
354 newConditions = append(newConditions, newCondition.condition)
355 } else {
356 newConditions = append(newConditions, condition)
357 }
358 newCondition.processed = true
359 }
360 }
361
362
363 for _, newCondition := range resizeConditionMap {
364 if !newCondition.processed {
365 newConditions = append(newConditions, newCondition.condition)
366 }
367 }
368 pvc.Status.Conditions = newConditions
369 return pvc
370 }
371
372 func mergeStorageResourceStatus(pvc *v1.PersistentVolumeClaim, status v1.ClaimResourceStatus) *v1.PersistentVolumeClaim {
373 allocatedResourceStatusMap := pvc.Status.AllocatedResourceStatuses
374 if allocatedResourceStatusMap == nil {
375 pvc.Status.AllocatedResourceStatuses = map[v1.ResourceName]v1.ClaimResourceStatus{
376 v1.ResourceStorage: status,
377 }
378 return pvc
379 }
380 allocatedResourceStatusMap[v1.ResourceStorage] = status
381 pvc.Status.AllocatedResourceStatuses = allocatedResourceStatusMap
382 return pvc
383 }
384
385 func mergeStorageAllocatedResources(pvc *v1.PersistentVolumeClaim, size resource.Quantity) *v1.PersistentVolumeClaim {
386 allocatedResourcesMap := pvc.Status.AllocatedResources
387 if allocatedResourcesMap == nil {
388 pvc.Status.AllocatedResources = map[v1.ResourceName]resource.Quantity{
389 v1.ResourceStorage: size,
390 }
391 return pvc
392 }
393 allocatedResourcesMap[v1.ResourceStorage] = size
394 pvc.Status.AllocatedResources = allocatedResourcesMap
395 return pvc
396 }
397
398
399 func GenericResizeFS(host volume.VolumeHost, pluginName, devicePath, deviceMountPath string) (bool, error) {
400 resizer := mount.NewResizeFs(host.GetExec(pluginName))
401 return resizer.Resize(devicePath, deviceMountPath)
402 }
403
View as plain text