...

Source file src/k8s.io/kubernetes/pkg/controller/statefulset/stateful_pod_control.go

Documentation: k8s.io/kubernetes/pkg/controller/statefulset

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package statefulset
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"strings"
    23  
    24  	apps "k8s.io/api/apps/v1"
    25  	v1 "k8s.io/api/core/v1"
    26  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	errorutils "k8s.io/apimachinery/pkg/util/errors"
    29  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    30  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    31  	clientset "k8s.io/client-go/kubernetes"
    32  	corelisters "k8s.io/client-go/listers/core/v1"
    33  	"k8s.io/client-go/tools/record"
    34  	"k8s.io/client-go/util/retry"
    35  	"k8s.io/klog/v2"
    36  	"k8s.io/kubernetes/pkg/features"
    37  )
    38  
// StatefulPodControlObjectManager abstracts the manipulation of Pods and PVCs. The real controller implements this
// with a clientset for writes and listers for reads; for tests we provide stubs.
type StatefulPodControlObjectManager interface {
	// CreatePod creates pod. It is the only method of this interface that accepts a caller-supplied context.
	CreatePod(ctx context.Context, pod *v1.Pod) error
	// GetPod returns the Pod called podName in namespace.
	GetPod(namespace, podName string) (*v1.Pod, error)
	// UpdatePod persists the given state of pod.
	UpdatePod(pod *v1.Pod) error
	// DeletePod deletes pod.
	DeletePod(pod *v1.Pod) error
	// CreateClaim creates claim.
	CreateClaim(claim *v1.PersistentVolumeClaim) error
	// GetClaim returns the PersistentVolumeClaim called claimName in namespace.
	GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error)
	// UpdateClaim persists the given state of claim.
	UpdateClaim(claim *v1.PersistentVolumeClaim) error
}
    50  
// StatefulPodControl defines the interface that StatefulSetController uses to create, update, and delete Pods,
// and to update the Status of a StatefulSet. It follows the design paradigms used for PodControl, but its
// implementation provides for PVC creation, ordered Pod creation, ordered Pod termination, and Pod identity enforcement.
// Manipulation of objects is provided through objectMgr, which allows the k8s API to be mocked out for testing.
type StatefulPodControl struct {
	// objectMgr performs every Pod and PersistentVolumeClaim read and write issued by this type.
	objectMgr StatefulPodControlObjectManager
	// recorder receives the events emitted by recordPodEvent and recordClaimEvent.
	recorder record.EventRecorder
}
    59  
    60  // NewStatefulPodControl constructs a StatefulPodControl using a realStatefulPodControlObjectManager with the given
    61  // clientset, listers and EventRecorder.
    62  func NewStatefulPodControl(
    63  	client clientset.Interface,
    64  	podLister corelisters.PodLister,
    65  	claimLister corelisters.PersistentVolumeClaimLister,
    66  	recorder record.EventRecorder,
    67  ) *StatefulPodControl {
    68  	return &StatefulPodControl{&realStatefulPodControlObjectManager{client, podLister, claimLister}, recorder}
    69  }
    70  
    71  // NewStatefulPodControlFromManager creates a StatefulPodControl using the given StatefulPodControlObjectManager and recorder.
    72  func NewStatefulPodControlFromManager(om StatefulPodControlObjectManager, recorder record.EventRecorder) *StatefulPodControl {
    73  	return &StatefulPodControl{om, recorder}
    74  }
    75  
// realStatefulPodControlObjectManager uses a clientset.Interface and listers.
// Its methods send creates, updates and deletes through client and serve Get calls from the listers.
type realStatefulPodControlObjectManager struct {
	// client is used for Pod and PersistentVolumeClaim create/update/delete calls.
	client clientset.Interface
	// podLister serves GetPod.
	podLister corelisters.PodLister
	// claimLister serves GetClaim.
	claimLister corelisters.PersistentVolumeClaimLister
}
    82  
    83  func (om *realStatefulPodControlObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error {
    84  	_, err := om.client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
    85  	return err
    86  }
    87  
    88  func (om *realStatefulPodControlObjectManager) GetPod(namespace, podName string) (*v1.Pod, error) {
    89  	return om.podLister.Pods(namespace).Get(podName)
    90  }
    91  
    92  func (om *realStatefulPodControlObjectManager) UpdatePod(pod *v1.Pod) error {
    93  	_, err := om.client.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
    94  	return err
    95  }
    96  
    97  func (om *realStatefulPodControlObjectManager) DeletePod(pod *v1.Pod) error {
    98  	return om.client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
    99  }
   100  
   101  func (om *realStatefulPodControlObjectManager) CreateClaim(claim *v1.PersistentVolumeClaim) error {
   102  	_, err := om.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Create(context.TODO(), claim, metav1.CreateOptions{})
   103  	return err
   104  }
   105  
   106  func (om *realStatefulPodControlObjectManager) GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error) {
   107  	return om.claimLister.PersistentVolumeClaims(namespace).Get(claimName)
   108  }
   109  
   110  func (om *realStatefulPodControlObjectManager) UpdateClaim(claim *v1.PersistentVolumeClaim) error {
   111  	_, err := om.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(context.TODO(), claim, metav1.UpdateOptions{})
   112  	return err
   113  }
   114  
   115  func (spc *StatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error {
   116  	// Create the Pod's PVCs prior to creating the Pod
   117  	if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
   118  		spc.recordPodEvent("create", set, pod, err)
   119  		return err
   120  	}
   121  	// If we created the PVCs attempt to create the Pod
   122  	err := spc.objectMgr.CreatePod(ctx, pod)
   123  	// sink already exists errors
   124  	if apierrors.IsAlreadyExists(err) {
   125  		return err
   126  	}
   127  	if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
   128  		// Set PVC policy as much as is possible at this point.
   129  		if err := spc.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil {
   130  			spc.recordPodEvent("update", set, pod, err)
   131  			return err
   132  		}
   133  	}
   134  	spc.recordPodEvent("create", set, pod, err)
   135  	return err
   136  }
   137  
// UpdateStatefulPod brings pod in line with set and, if anything had to change, writes the Pod back through the
// object manager.
//
// The work runs inside retry.RetryOnConflict with retry.DefaultBackoff. Each attempt checks the Pod's identity and
// storage against set (mutating pod in place where they differ and creating any missing PVCs) and, when the
// StatefulSetAutoDeletePVC feature gate is enabled, checks and updates the PVCs against the retention policy.
// If nothing was inconsistent the attempt returns nil without calling UpdatePod. A failure while creating or
// checking/updating PVCs records an "update" event and is returned from the attempt.
//
// An "update" event for the final result is recorded only if at least one attempt reached UpdatePod. The returned
// error is whatever retry.RetryOnConflict returns.
func (spc *StatefulPodControl) UpdateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error {
	// attemptedUpdate is set once any attempt gets as far as calling UpdatePod; it gates the final event.
	attemptedUpdate := false
	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		// assume the Pod is consistent
		consistent := true
		// if the Pod does not conform to its identity, update the identity and dirty the Pod
		if !identityMatches(set, pod) {
			updateIdentity(set, pod)
			consistent = false
		}
		// if the Pod does not conform to the StatefulSet's storage requirements, update the Pod's PVC's,
		// dirty the Pod, and create any missing PVCs
		if !storageMatches(set, pod) {
			updateStorage(set, pod)
			consistent = false
			if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
				spc.recordPodEvent("update", set, pod, err)
				return err
			}
		}
		if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
			// if the Pod's PVCs are not consistent with the StatefulSet's PVC deletion policy, update the PVC
			// and dirty the pod.
			if match, err := spc.ClaimsMatchRetentionPolicy(ctx, set, pod); err != nil {
				spc.recordPodEvent("update", set, pod, err)
				return err
			} else if !match {
				if err := spc.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil {
					spc.recordPodEvent("update", set, pod, err)
					return err
				}
				consistent = false
			}
		}

		// if the Pod is not dirty, do nothing
		if consistent {
			return nil
		}

		attemptedUpdate = true
		// commit the update, retrying on conflicts

		updateErr := spc.objectMgr.UpdatePod(pod)
		if updateErr == nil {
			return nil
		}

		// The write failed: re-read the Pod so that a subsequent attempt (and the final event) works from the
		// object manager's current view. This reassigns the captured pod parameter.
		if updated, err := spc.objectMgr.GetPod(set.Namespace, pod.Name); err == nil {
			// make a copy so we don't mutate the shared cache
			pod = updated.DeepCopy()
		} else {
			// The re-read failure is only reported; the original update error is still what gets returned.
			utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s: %w", set.Namespace, pod.Name, err))
		}

		return updateErr
	})
	if attemptedUpdate {
		spc.recordPodEvent("update", set, pod, err)
	}
	return err
}
   200  
   201  func (spc *StatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
   202  	err := spc.objectMgr.DeletePod(pod)
   203  	spc.recordPodEvent("delete", set, pod, err)
   204  	return err
   205  }
   206  
   207  // ClaimsMatchRetentionPolicy returns false if the PVCs for pod are not consistent with set's PVC deletion policy.
   208  // An error is returned if something is not consistent. This is expected if the pod is being otherwise updated,
   209  // but a problem otherwise (see usage of this method in UpdateStatefulPod).
   210  func (spc *StatefulPodControl) ClaimsMatchRetentionPolicy(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) (bool, error) {
   211  	logger := klog.FromContext(ctx)
   212  	ordinal := getOrdinal(pod)
   213  	templates := set.Spec.VolumeClaimTemplates
   214  	for i := range templates {
   215  		claimName := getPersistentVolumeClaimName(set, &templates[i], ordinal)
   216  		claim, err := spc.objectMgr.GetClaim(set.Namespace, claimName)
   217  		switch {
   218  		case apierrors.IsNotFound(err):
   219  			klog.FromContext(ctx).V(4).Info("Expected claim missing, continuing to pick up in next iteration", "PVC", klog.KObj(claim))
   220  		case err != nil:
   221  			return false, fmt.Errorf("Could not retrieve claim %s for %s when checking PVC deletion policy", claimName, pod.Name)
   222  		default:
   223  			if !claimOwnerMatchesSetAndPod(logger, claim, set, pod) {
   224  				return false, nil
   225  			}
   226  		}
   227  	}
   228  	return true, nil
   229  }
   230  
   231  // UpdatePodClaimForRetentionPolicy updates the PVCs used by pod to match the PVC deletion policy of set.
   232  func (spc *StatefulPodControl) UpdatePodClaimForRetentionPolicy(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error {
   233  	logger := klog.FromContext(ctx)
   234  	ordinal := getOrdinal(pod)
   235  	templates := set.Spec.VolumeClaimTemplates
   236  	for i := range templates {
   237  		claimName := getPersistentVolumeClaimName(set, &templates[i], ordinal)
   238  		claim, err := spc.objectMgr.GetClaim(set.Namespace, claimName)
   239  		switch {
   240  		case apierrors.IsNotFound(err):
   241  			logger.V(4).Info("Expected claim missing, continuing to pick up in next iteration", "PVC", klog.KObj(claim))
   242  		case err != nil:
   243  			return fmt.Errorf("Could not retrieve claim %s not found for %s when checking PVC deletion policy: %w", claimName, pod.Name, err)
   244  		default:
   245  			if !claimOwnerMatchesSetAndPod(logger, claim, set, pod) {
   246  				claim = claim.DeepCopy() // Make a copy so we don't mutate the shared cache.
   247  				needsUpdate := updateClaimOwnerRefForSetAndPod(logger, claim, set, pod)
   248  				if needsUpdate {
   249  					err := spc.objectMgr.UpdateClaim(claim)
   250  					if err != nil {
   251  						return fmt.Errorf("Could not update claim %s for delete policy ownerRefs: %w", claimName, err)
   252  					}
   253  				}
   254  			}
   255  		}
   256  	}
   257  	return nil
   258  }
   259  
   260  // PodClaimIsStale returns true for a stale PVC that should block pod creation. If the scaling
   261  // policy is deletion, and a PVC has an ownerRef that does not match the pod, the PVC is stale. This
   262  // includes pods whose UID has not been created.
   263  func (spc *StatefulPodControl) PodClaimIsStale(set *apps.StatefulSet, pod *v1.Pod) (bool, error) {
   264  	policy := getPersistentVolumeClaimRetentionPolicy(set)
   265  	if policy.WhenScaled == apps.RetainPersistentVolumeClaimRetentionPolicyType {
   266  		// PVCs are meant to be reused and so can't be stale.
   267  		return false, nil
   268  	}
   269  	for _, claim := range getPersistentVolumeClaims(set, pod) {
   270  		pvc, err := spc.objectMgr.GetClaim(claim.Namespace, claim.Name)
   271  		switch {
   272  		case apierrors.IsNotFound(err):
   273  			// If the claim doesn't exist yet, it can't be stale.
   274  			continue
   275  		case err != nil:
   276  			return false, err
   277  		case err == nil:
   278  			// A claim is stale if it doesn't match the pod's UID, including if the pod has no UID.
   279  			if hasStaleOwnerRef(pvc, pod) {
   280  				return true, nil
   281  			}
   282  		}
   283  	}
   284  	return false, nil
   285  }
   286  
   287  // recordPodEvent records an event for verb applied to a Pod in a StatefulSet. If err is nil the generated event will
   288  // have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a reason of v1.EventTypeWarning.
   289  func (spc *StatefulPodControl) recordPodEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, err error) {
   290  	if err == nil {
   291  		reason := fmt.Sprintf("Successful%s", strings.Title(verb))
   292  		message := fmt.Sprintf("%s Pod %s in StatefulSet %s successful",
   293  			strings.ToLower(verb), pod.Name, set.Name)
   294  		spc.recorder.Event(set, v1.EventTypeNormal, reason, message)
   295  	} else {
   296  		reason := fmt.Sprintf("Failed%s", strings.Title(verb))
   297  		message := fmt.Sprintf("%s Pod %s in StatefulSet %s failed error: %s",
   298  			strings.ToLower(verb), pod.Name, set.Name, err)
   299  		spc.recorder.Event(set, v1.EventTypeWarning, reason, message)
   300  	}
   301  }
   302  
   303  // recordClaimEvent records an event for verb applied to the PersistentVolumeClaim of a Pod in a StatefulSet. If err is
   304  // nil the generated event will have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a
   305  // reason of v1.EventTypeWarning.
   306  func (spc *StatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim, err error) {
   307  	if err == nil {
   308  		reason := fmt.Sprintf("Successful%s", strings.Title(verb))
   309  		message := fmt.Sprintf("%s Claim %s Pod %s in StatefulSet %s success",
   310  			strings.ToLower(verb), claim.Name, pod.Name, set.Name)
   311  		spc.recorder.Event(set, v1.EventTypeNormal, reason, message)
   312  	} else {
   313  		reason := fmt.Sprintf("Failed%s", strings.Title(verb))
   314  		message := fmt.Sprintf("%s Claim %s for Pod %s in StatefulSet %s failed error: %s",
   315  			strings.ToLower(verb), claim.Name, pod.Name, set.Name, err)
   316  		spc.recorder.Event(set, v1.EventTypeWarning, reason, message)
   317  	}
   318  }
   319  
   320  // createMissingPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, and updates its retention policy
   321  func (spc *StatefulPodControl) createMissingPersistentVolumeClaims(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error {
   322  	if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
   323  		return err
   324  	}
   325  
   326  	if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) {
   327  		// Set PVC policy as much as is possible at this point.
   328  		if err := spc.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil {
   329  			spc.recordPodEvent("update", set, pod, err)
   330  			return err
   331  		}
   332  	}
   333  	return nil
   334  }
   335  
   336  // createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of
   337  // set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method
   338  // may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with
   339  // set's Spec.
   340  func (spc *StatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
   341  	var errs []error
   342  	for _, claim := range getPersistentVolumeClaims(set, pod) {
   343  		pvc, err := spc.objectMgr.GetClaim(claim.Namespace, claim.Name)
   344  		switch {
   345  		case apierrors.IsNotFound(err):
   346  			err := spc.objectMgr.CreateClaim(&claim)
   347  			if err != nil {
   348  				errs = append(errs, fmt.Errorf("failed to create PVC %s: %s", claim.Name, err))
   349  			}
   350  			if err == nil || !apierrors.IsAlreadyExists(err) {
   351  				spc.recordClaimEvent("create", set, pod, &claim, err)
   352  			}
   353  		case err != nil:
   354  			errs = append(errs, fmt.Errorf("failed to retrieve PVC %s: %s", claim.Name, err))
   355  			spc.recordClaimEvent("create", set, pod, &claim, err)
   356  		default:
   357  			if pvc.DeletionTimestamp != nil {
   358  				errs = append(errs, fmt.Errorf("pvc %s is being deleted", claim.Name))
   359  			}
   360  		}
   361  		// TODO: Check resource requirements and accessmodes, update if necessary
   362  	}
   363  	return errorutils.NewAggregate(errs)
   364  }
   365  

View as plain text