...

Source file src/k8s.io/kubernetes/pkg/volume/util/resize_util.go

Documentation: k8s.io/kubernetes/pkg/volume/util

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package util
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"fmt"
    23  
    24  	v1 "k8s.io/api/core/v1"
    25  	"k8s.io/apimachinery/pkg/api/meta"
    26  	"k8s.io/apimachinery/pkg/api/resource"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    29  	"k8s.io/apimachinery/pkg/types"
    30  	"k8s.io/apimachinery/pkg/util/strategicpatch"
    31  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    32  	clientset "k8s.io/client-go/kubernetes"
    33  	"k8s.io/kubernetes/pkg/features"
    34  	"k8s.io/kubernetes/pkg/volume"
    35  	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
    36  	"k8s.io/mount-utils"
    37  )
    38  
    39  var (
    40  	knownResizeConditions = map[v1.PersistentVolumeClaimConditionType]bool{
    41  		v1.PersistentVolumeClaimFileSystemResizePending: true,
    42  		v1.PersistentVolumeClaimResizing:                true,
    43  	}
    44  
    45  	// AnnPreResizeCapacity annotation is added to a PV when expanding volume.
    46  	// Its value is status capacity of the PVC prior to the volume expansion
    47  	// Its value will be set by the external-resizer when it deems that filesystem resize is required after resizing volume.
    48  	// Its value will be used by pv_controller to determine pvc's status capacity when binding pvc and pv.
    49  	AnnPreResizeCapacity = "volume.alpha.kubernetes.io/pre-resize-capacity"
    50  )
    51  
    52  type resizeProcessStatus struct {
    53  	condition v1.PersistentVolumeClaimCondition
    54  	processed bool
    55  }
    56  
    57  // UpdatePVSize updates just pv size after cloudprovider resizing is successful
    58  func UpdatePVSize(
    59  	pv *v1.PersistentVolume,
    60  	newSize resource.Quantity,
    61  	kubeClient clientset.Interface) (*v1.PersistentVolume, error) {
    62  	pvClone := pv.DeepCopy()
    63  	pvClone.Spec.Capacity[v1.ResourceStorage] = newSize
    64  
    65  	return PatchPV(pv, pvClone, kubeClient)
    66  }
    67  
    68  // AddAnnPreResizeCapacity adds volume.alpha.kubernetes.io/pre-resize-capacity from the pv
    69  func AddAnnPreResizeCapacity(
    70  	pv *v1.PersistentVolume,
    71  	oldCapacity resource.Quantity,
    72  	kubeClient clientset.Interface) error {
    73  	// if the pv already has a resize annotation skip the process
    74  	if metav1.HasAnnotation(pv.ObjectMeta, AnnPreResizeCapacity) {
    75  		return nil
    76  	}
    77  
    78  	pvClone := pv.DeepCopy()
    79  	if pvClone.ObjectMeta.Annotations == nil {
    80  		pvClone.ObjectMeta.Annotations = make(map[string]string)
    81  	}
    82  	pvClone.ObjectMeta.Annotations[AnnPreResizeCapacity] = oldCapacity.String()
    83  
    84  	_, err := PatchPV(pv, pvClone, kubeClient)
    85  	return err
    86  }
    87  
    88  // DeleteAnnPreResizeCapacity deletes volume.alpha.kubernetes.io/pre-resize-capacity from the pv
    89  func DeleteAnnPreResizeCapacity(
    90  	pv *v1.PersistentVolume,
    91  	kubeClient clientset.Interface) error {
    92  	// if the pv does not have a resize annotation skip the entire process
    93  	if !metav1.HasAnnotation(pv.ObjectMeta, AnnPreResizeCapacity) {
    94  		return nil
    95  	}
    96  	pvClone := pv.DeepCopy()
    97  	delete(pvClone.ObjectMeta.Annotations, AnnPreResizeCapacity)
    98  	_, err := PatchPV(pv, pvClone, kubeClient)
    99  	return err
   100  }
   101  
   102  // PatchPV creates and executes a patch for pv
   103  func PatchPV(
   104  	oldPV *v1.PersistentVolume,
   105  	newPV *v1.PersistentVolume,
   106  	kubeClient clientset.Interface) (*v1.PersistentVolume, error) {
   107  	oldData, err := json.Marshal(oldPV)
   108  	if err != nil {
   109  		return oldPV, fmt.Errorf("unexpected error marshaling old PV %q with error : %v", oldPV.Name, err)
   110  	}
   111  
   112  	newData, err := json.Marshal(newPV)
   113  	if err != nil {
   114  		return oldPV, fmt.Errorf("unexpected error marshaling new PV %q with error : %v", newPV.Name, err)
   115  	}
   116  
   117  	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPV)
   118  	if err != nil {
   119  		return oldPV, fmt.Errorf("error Creating two way merge patch for PV %q with error : %v", oldPV.Name, err)
   120  	}
   121  
   122  	updatedPV, err := kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), oldPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
   123  	if err != nil {
   124  		return oldPV, fmt.Errorf("error Patching PV %q with error : %v", oldPV.Name, err)
   125  	}
   126  	return updatedPV, nil
   127  }
   128  
   129  // MarkResizeInProgressWithResizer marks cloudprovider resizing as in progress
   130  // and also annotates the PVC with the name of the resizer.
   131  func MarkResizeInProgressWithResizer(
   132  	pvc *v1.PersistentVolumeClaim,
   133  	resizerName string,
   134  	kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
   135  	// Mark PVC as Resize Started
   136  	progressCondition := v1.PersistentVolumeClaimCondition{
   137  		Type:               v1.PersistentVolumeClaimResizing,
   138  		Status:             v1.ConditionTrue,
   139  		LastTransitionTime: metav1.Now(),
   140  	}
   141  	conditions := []v1.PersistentVolumeClaimCondition{progressCondition}
   142  	newPVC := pvc.DeepCopy()
   143  	newPVC = MergeResizeConditionOnPVC(newPVC, conditions)
   144  	newPVC = setResizer(newPVC, resizerName)
   145  	return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
   146  }
   147  
   148  func MarkControllerReisizeInProgress(pvc *v1.PersistentVolumeClaim, resizerName string, newSize resource.Quantity, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
   149  	// Mark PVC as Resize Started
   150  	progressCondition := v1.PersistentVolumeClaimCondition{
   151  		Type:               v1.PersistentVolumeClaimResizing,
   152  		Status:             v1.ConditionTrue,
   153  		LastTransitionTime: metav1.Now(),
   154  	}
   155  	conditions := []v1.PersistentVolumeClaimCondition{progressCondition}
   156  	newPVC := pvc.DeepCopy()
   157  	newPVC = MergeResizeConditionOnPVC(newPVC, conditions)
   158  	newPVC = mergeStorageResourceStatus(newPVC, v1.PersistentVolumeClaimControllerResizeInProgress)
   159  	newPVC = mergeStorageAllocatedResources(newPVC, newSize)
   160  	newPVC = setResizer(newPVC, resizerName)
   161  	return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
   162  }
   163  
   164  // SetClaimResizer sets resizer annotation on PVC
   165  func SetClaimResizer(
   166  	pvc *v1.PersistentVolumeClaim,
   167  	resizerName string,
   168  	kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
   169  	newPVC := pvc.DeepCopy()
   170  	newPVC = setResizer(newPVC, resizerName)
   171  	return PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
   172  }
   173  
   174  func setResizer(pvc *v1.PersistentVolumeClaim, resizerName string) *v1.PersistentVolumeClaim {
   175  	if val, ok := pvc.Annotations[volumetypes.VolumeResizerKey]; ok && val == resizerName {
   176  		return pvc
   177  	}
   178  	metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, volumetypes.VolumeResizerKey, resizerName)
   179  	return pvc
   180  }
   181  
   182  // MarkForFSResize marks file system resizing as pending
   183  func MarkForFSResize(
   184  	pvc *v1.PersistentVolumeClaim,
   185  	kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
   186  	pvcCondition := v1.PersistentVolumeClaimCondition{
   187  		Type:               v1.PersistentVolumeClaimFileSystemResizePending,
   188  		Status:             v1.ConditionTrue,
   189  		LastTransitionTime: metav1.Now(),
   190  		Message:            "Waiting for user to (re-)start a pod to finish file system resize of volume on node.",
   191  	}
   192  	conditions := []v1.PersistentVolumeClaimCondition{pvcCondition}
   193  	newPVC := pvc.DeepCopy()
   194  
   195  	if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) {
   196  		newPVC = mergeStorageResourceStatus(newPVC, v1.PersistentVolumeClaimNodeResizePending)
   197  	}
   198  
   199  	newPVC = MergeResizeConditionOnPVC(newPVC, conditions)
   200  	updatedPVC, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
   201  	return updatedPVC, err
   202  }
   203  
   204  // MarkResizeFinished marks all resizing as done
   205  func MarkResizeFinished(
   206  	pvc *v1.PersistentVolumeClaim,
   207  	newSize resource.Quantity,
   208  	kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
   209  	return MarkFSResizeFinished(pvc, newSize, kubeClient)
   210  }
   211  
   212  // MarkFSResizeFinished marks file system resizing as done
   213  func MarkFSResizeFinished(
   214  	pvc *v1.PersistentVolumeClaim,
   215  	newSize resource.Quantity,
   216  	kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
   217  	newPVC := pvc.DeepCopy()
   218  
   219  	newPVC.Status.Capacity[v1.ResourceStorage] = newSize
   220  
   221  	// if RecoverVolumeExpansionFailure is enabled, we need to reset ResizeStatus back to nil
   222  	if utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure) {
   223  		allocatedResourceStatusMap := newPVC.Status.AllocatedResourceStatuses
   224  		delete(allocatedResourceStatusMap, v1.ResourceStorage)
   225  		if len(allocatedResourceStatusMap) == 0 {
   226  			newPVC.Status.AllocatedResourceStatuses = nil
   227  		} else {
   228  			newPVC.Status.AllocatedResourceStatuses = allocatedResourceStatusMap
   229  		}
   230  	}
   231  
   232  	newPVC = MergeResizeConditionOnPVC(newPVC, []v1.PersistentVolumeClaimCondition{})
   233  	updatedPVC, err := PatchPVCStatus(pvc /*oldPVC*/, newPVC, kubeClient)
   234  	return updatedPVC, err
   235  }
   236  
   237  // MarkNodeExpansionFailed marks a PVC for node expansion as failed. Kubelet should not retry expansion
   238  // of volumes which are in failed state.
   239  func MarkNodeExpansionFailed(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
   240  	newPVC := pvc.DeepCopy()
   241  	newPVC = mergeStorageResourceStatus(newPVC, v1.PersistentVolumeClaimNodeResizeFailed)
   242  
   243  	patchBytes, err := createPVCPatch(pvc, newPVC, false /* addResourceVersionCheck */)
   244  	if err != nil {
   245  		return pvc, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", pvc.Name, err)
   246  	}
   247  
   248  	updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).
   249  		Patch(context.TODO(), pvc.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
   250  	if updateErr != nil {
   251  		return pvc, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", pvc.Name, updateErr)
   252  	}
   253  	return updatedClaim, nil
   254  }
   255  
   256  // MarkNodeExpansionInProgress marks pvc expansion in progress on node
   257  func MarkNodeExpansionInProgress(pvc *v1.PersistentVolumeClaim, kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
   258  	newPVC := pvc.DeepCopy()
   259  	newPVC = mergeStorageResourceStatus(newPVC, v1.PersistentVolumeClaimNodeResizeInProgress)
   260  	updatedPVC, err := PatchPVCStatus(pvc /* oldPVC */, newPVC, kubeClient)
   261  	return updatedPVC, err
   262  }
   263  
   264  // PatchPVCStatus updates PVC status using PATCH verb
   265  // Don't use Update because this can be called from kubelet and if kubelet has an older client its
   266  // Updates will overwrite new fields. And to avoid writing to a stale object, add ResourceVersion
   267  // to the patch so that Patch will fail if the patch's RV != actual up-to-date RV like Update would
   268  func PatchPVCStatus(
   269  	oldPVC *v1.PersistentVolumeClaim,
   270  	newPVC *v1.PersistentVolumeClaim,
   271  	kubeClient clientset.Interface) (*v1.PersistentVolumeClaim, error) {
   272  	patchBytes, err := createPVCPatch(oldPVC, newPVC, true /* addResourceVersionCheck */)
   273  	if err != nil {
   274  		return oldPVC, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, err)
   275  	}
   276  
   277  	updatedClaim, updateErr := kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
   278  		Patch(context.TODO(), oldPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
   279  	if updateErr != nil {
   280  		return oldPVC, fmt.Errorf("patchPVCStatus failed to patch PVC %q: %v", oldPVC.Name, updateErr)
   281  	}
   282  	return updatedClaim, nil
   283  }
   284  
   285  func createPVCPatch(
   286  	oldPVC *v1.PersistentVolumeClaim,
   287  	newPVC *v1.PersistentVolumeClaim, addResourceVersionCheck bool) ([]byte, error) {
   288  	oldData, err := json.Marshal(oldPVC)
   289  	if err != nil {
   290  		return nil, fmt.Errorf("failed to marshal old data: %v", err)
   291  	}
   292  
   293  	newData, err := json.Marshal(newPVC)
   294  	if err != nil {
   295  		return nil, fmt.Errorf("failed to marshal new data: %v", err)
   296  	}
   297  
   298  	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, oldPVC)
   299  	if err != nil {
   300  		return nil, fmt.Errorf("failed to create 2 way merge patch: %v", err)
   301  	}
   302  
   303  	if addResourceVersionCheck {
   304  		patchBytes, err = addResourceVersion(patchBytes, oldPVC.ResourceVersion)
   305  		if err != nil {
   306  			return nil, fmt.Errorf("failed to add resource version: %v", err)
   307  		}
   308  	}
   309  
   310  	return patchBytes, nil
   311  }
   312  
   313  func addResourceVersion(patchBytes []byte, resourceVersion string) ([]byte, error) {
   314  	var patchMap map[string]interface{}
   315  	err := json.Unmarshal(patchBytes, &patchMap)
   316  	if err != nil {
   317  		return nil, fmt.Errorf("error unmarshalling patch: %v", err)
   318  	}
   319  	u := unstructured.Unstructured{Object: patchMap}
   320  	a, err := meta.Accessor(&u)
   321  	if err != nil {
   322  		return nil, fmt.Errorf("error creating accessor: %v", err)
   323  	}
   324  	a.SetResourceVersion(resourceVersion)
   325  	versionBytes, err := json.Marshal(patchMap)
   326  	if err != nil {
   327  		return nil, fmt.Errorf("error marshalling json patch: %v", err)
   328  	}
   329  	return versionBytes, nil
   330  }
   331  
   332  // MergeResizeConditionOnPVC updates pvc with requested resize conditions
   333  // leaving other conditions untouched.
   334  func MergeResizeConditionOnPVC(
   335  	pvc *v1.PersistentVolumeClaim,
   336  	resizeConditions []v1.PersistentVolumeClaimCondition) *v1.PersistentVolumeClaim {
   337  	resizeConditionMap := map[v1.PersistentVolumeClaimConditionType]*resizeProcessStatus{}
   338  
   339  	for _, condition := range resizeConditions {
   340  		resizeConditionMap[condition.Type] = &resizeProcessStatus{condition, false}
   341  	}
   342  
   343  	oldConditions := pvc.Status.Conditions
   344  	newConditions := []v1.PersistentVolumeClaimCondition{}
   345  	for _, condition := range oldConditions {
   346  		// If Condition is of not resize type, we keep it.
   347  		if _, ok := knownResizeConditions[condition.Type]; !ok {
   348  			newConditions = append(newConditions, condition)
   349  			continue
   350  		}
   351  
   352  		if newCondition, ok := resizeConditionMap[condition.Type]; ok {
   353  			if newCondition.condition.Status != condition.Status {
   354  				newConditions = append(newConditions, newCondition.condition)
   355  			} else {
   356  				newConditions = append(newConditions, condition)
   357  			}
   358  			newCondition.processed = true
   359  		}
   360  	}
   361  
   362  	// append all unprocessed conditions
   363  	for _, newCondition := range resizeConditionMap {
   364  		if !newCondition.processed {
   365  			newConditions = append(newConditions, newCondition.condition)
   366  		}
   367  	}
   368  	pvc.Status.Conditions = newConditions
   369  	return pvc
   370  }
   371  
   372  func mergeStorageResourceStatus(pvc *v1.PersistentVolumeClaim, status v1.ClaimResourceStatus) *v1.PersistentVolumeClaim {
   373  	allocatedResourceStatusMap := pvc.Status.AllocatedResourceStatuses
   374  	if allocatedResourceStatusMap == nil {
   375  		pvc.Status.AllocatedResourceStatuses = map[v1.ResourceName]v1.ClaimResourceStatus{
   376  			v1.ResourceStorage: status,
   377  		}
   378  		return pvc
   379  	}
   380  	allocatedResourceStatusMap[v1.ResourceStorage] = status
   381  	pvc.Status.AllocatedResourceStatuses = allocatedResourceStatusMap
   382  	return pvc
   383  }
   384  
   385  func mergeStorageAllocatedResources(pvc *v1.PersistentVolumeClaim, size resource.Quantity) *v1.PersistentVolumeClaim {
   386  	allocatedResourcesMap := pvc.Status.AllocatedResources
   387  	if allocatedResourcesMap == nil {
   388  		pvc.Status.AllocatedResources = map[v1.ResourceName]resource.Quantity{
   389  			v1.ResourceStorage: size,
   390  		}
   391  		return pvc
   392  	}
   393  	allocatedResourcesMap[v1.ResourceStorage] = size
   394  	pvc.Status.AllocatedResources = allocatedResourcesMap
   395  	return pvc
   396  }
   397  
   398  // GenericResizeFS : call generic filesystem resizer for plugins that don't have any special filesystem resize requirements
   399  func GenericResizeFS(host volume.VolumeHost, pluginName, devicePath, deviceMountPath string) (bool, error) {
   400  	resizer := mount.NewResizeFs(host.GetExec(pluginName))
   401  	return resizer.Resize(devicePath, deviceMountPath)
   402  }
   403  

View as plain text