/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package statefulset

import (
	"context"
	"fmt"
	"strings"

	apps "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	errorutils "k8s.io/apimachinery/pkg/util/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	clientset "k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/retry"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/features"
)

// StatefulPodControlObjectManager abstracts the manipulation of Pods and PVCs. The real controller implements this
// with a clientset for writes and listers for reads; for tests we provide stubs.
type StatefulPodControlObjectManager interface {
	// CreatePod creates pod, using ctx for the request.
	CreatePod(ctx context.Context, pod *v1.Pod) error
	// GetPod returns the Pod identified by namespace and podName.
	GetPod(namespace, podName string) (*v1.Pod, error)
	// UpdatePod writes the given pod back.
	UpdatePod(pod *v1.Pod) error
	// DeletePod deletes the given pod.
	DeletePod(pod *v1.Pod) error
	// CreateClaim creates the given PersistentVolumeClaim.
	CreateClaim(claim *v1.PersistentVolumeClaim) error
	// GetClaim returns the PersistentVolumeClaim identified by namespace and claimName.
	GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error)
	// UpdateClaim writes the given PersistentVolumeClaim back.
	UpdateClaim(claim *v1.PersistentVolumeClaim) error
}

// StatefulPodControl defines the interface that StatefulSetController uses to create, update, and delete Pods,
// and to update the Status of a StatefulSet.
It follows the design paradigms used for PodControl, but its // implementation provides for PVC creation, ordered Pod creation, ordered Pod termination, and Pod identity enforcement. // Manipulation of objects is provided through objectMgr, which allows the k8s API to be mocked out for testing. type StatefulPodControl struct { objectMgr StatefulPodControlObjectManager recorder record.EventRecorder } // NewStatefulPodControl constructs a StatefulPodControl using a realStatefulPodControlObjectManager with the given // clientset, listers and EventRecorder. func NewStatefulPodControl( client clientset.Interface, podLister corelisters.PodLister, claimLister corelisters.PersistentVolumeClaimLister, recorder record.EventRecorder, ) *StatefulPodControl { return &StatefulPodControl{&realStatefulPodControlObjectManager{client, podLister, claimLister}, recorder} } // NewStatefulPodControlFromManager creates a StatefulPodControl using the given StatefulPodControlObjectManager and recorder. func NewStatefulPodControlFromManager(om StatefulPodControlObjectManager, recorder record.EventRecorder) *StatefulPodControl { return &StatefulPodControl{om, recorder} } // realStatefulPodControlObjectManager uses a clientset.Interface and listers. 
type realStatefulPodControlObjectManager struct { client clientset.Interface podLister corelisters.PodLister claimLister corelisters.PersistentVolumeClaimLister } func (om *realStatefulPodControlObjectManager) CreatePod(ctx context.Context, pod *v1.Pod) error { _, err := om.client.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}) return err } func (om *realStatefulPodControlObjectManager) GetPod(namespace, podName string) (*v1.Pod, error) { return om.podLister.Pods(namespace).Get(podName) } func (om *realStatefulPodControlObjectManager) UpdatePod(pod *v1.Pod) error { _, err := om.client.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{}) return err } func (om *realStatefulPodControlObjectManager) DeletePod(pod *v1.Pod) error { return om.client.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) } func (om *realStatefulPodControlObjectManager) CreateClaim(claim *v1.PersistentVolumeClaim) error { _, err := om.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Create(context.TODO(), claim, metav1.CreateOptions{}) return err } func (om *realStatefulPodControlObjectManager) GetClaim(namespace, claimName string) (*v1.PersistentVolumeClaim, error) { return om.claimLister.PersistentVolumeClaims(namespace).Get(claimName) } func (om *realStatefulPodControlObjectManager) UpdateClaim(claim *v1.PersistentVolumeClaim) error { _, err := om.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(context.TODO(), claim, metav1.UpdateOptions{}) return err } func (spc *StatefulPodControl) CreateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error { // Create the Pod's PVCs prior to creating the Pod if err := spc.createPersistentVolumeClaims(set, pod); err != nil { spc.recordPodEvent("create", set, pod, err) return err } // If we created the PVCs attempt to create the Pod err := spc.objectMgr.CreatePod(ctx, pod) // sink already exists errors if 
apierrors.IsAlreadyExists(err) { return err } if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { // Set PVC policy as much as is possible at this point. if err := spc.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil { spc.recordPodEvent("update", set, pod, err) return err } } spc.recordPodEvent("create", set, pod, err) return err } func (spc *StatefulPodControl) UpdateStatefulPod(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error { attemptedUpdate := false err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { // assume the Pod is consistent consistent := true // if the Pod does not conform to its identity, update the identity and dirty the Pod if !identityMatches(set, pod) { updateIdentity(set, pod) consistent = false } // if the Pod does not conform to the StatefulSet's storage requirements, update the Pod's PVC's, // dirty the Pod, and create any missing PVCs if !storageMatches(set, pod) { updateStorage(set, pod) consistent = false if err := spc.createPersistentVolumeClaims(set, pod); err != nil { spc.recordPodEvent("update", set, pod, err) return err } } if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { // if the Pod's PVCs are not consistent with the StatefulSet's PVC deletion policy, update the PVC // and dirty the pod. 
if match, err := spc.ClaimsMatchRetentionPolicy(ctx, set, pod); err != nil { spc.recordPodEvent("update", set, pod, err) return err } else if !match { if err := spc.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil { spc.recordPodEvent("update", set, pod, err) return err } consistent = false } } // if the Pod is not dirty, do nothing if consistent { return nil } attemptedUpdate = true // commit the update, retrying on conflicts updateErr := spc.objectMgr.UpdatePod(pod) if updateErr == nil { return nil } if updated, err := spc.objectMgr.GetPod(set.Namespace, pod.Name); err == nil { // make a copy so we don't mutate the shared cache pod = updated.DeepCopy() } else { utilruntime.HandleError(fmt.Errorf("error getting updated Pod %s/%s: %w", set.Namespace, pod.Name, err)) } return updateErr }) if attemptedUpdate { spc.recordPodEvent("update", set, pod, err) } return err } func (spc *StatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { err := spc.objectMgr.DeletePod(pod) spc.recordPodEvent("delete", set, pod, err) return err } // ClaimsMatchRetentionPolicy returns false if the PVCs for pod are not consistent with set's PVC deletion policy. // An error is returned if something is not consistent. This is expected if the pod is being otherwise updated, // but a problem otherwise (see usage of this method in UpdateStatefulPod). 
func (spc *StatefulPodControl) ClaimsMatchRetentionPolicy(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) (bool, error) { logger := klog.FromContext(ctx) ordinal := getOrdinal(pod) templates := set.Spec.VolumeClaimTemplates for i := range templates { claimName := getPersistentVolumeClaimName(set, &templates[i], ordinal) claim, err := spc.objectMgr.GetClaim(set.Namespace, claimName) switch { case apierrors.IsNotFound(err): klog.FromContext(ctx).V(4).Info("Expected claim missing, continuing to pick up in next iteration", "PVC", klog.KObj(claim)) case err != nil: return false, fmt.Errorf("Could not retrieve claim %s for %s when checking PVC deletion policy", claimName, pod.Name) default: if !claimOwnerMatchesSetAndPod(logger, claim, set, pod) { return false, nil } } } return true, nil } // UpdatePodClaimForRetentionPolicy updates the PVCs used by pod to match the PVC deletion policy of set. func (spc *StatefulPodControl) UpdatePodClaimForRetentionPolicy(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error { logger := klog.FromContext(ctx) ordinal := getOrdinal(pod) templates := set.Spec.VolumeClaimTemplates for i := range templates { claimName := getPersistentVolumeClaimName(set, &templates[i], ordinal) claim, err := spc.objectMgr.GetClaim(set.Namespace, claimName) switch { case apierrors.IsNotFound(err): logger.V(4).Info("Expected claim missing, continuing to pick up in next iteration", "PVC", klog.KObj(claim)) case err != nil: return fmt.Errorf("Could not retrieve claim %s not found for %s when checking PVC deletion policy: %w", claimName, pod.Name, err) default: if !claimOwnerMatchesSetAndPod(logger, claim, set, pod) { claim = claim.DeepCopy() // Make a copy so we don't mutate the shared cache. 
needsUpdate := updateClaimOwnerRefForSetAndPod(logger, claim, set, pod) if needsUpdate { err := spc.objectMgr.UpdateClaim(claim) if err != nil { return fmt.Errorf("Could not update claim %s for delete policy ownerRefs: %w", claimName, err) } } } } } return nil } // PodClaimIsStale returns true for a stale PVC that should block pod creation. If the scaling // policy is deletion, and a PVC has an ownerRef that does not match the pod, the PVC is stale. This // includes pods whose UID has not been created. func (spc *StatefulPodControl) PodClaimIsStale(set *apps.StatefulSet, pod *v1.Pod) (bool, error) { policy := getPersistentVolumeClaimRetentionPolicy(set) if policy.WhenScaled == apps.RetainPersistentVolumeClaimRetentionPolicyType { // PVCs are meant to be reused and so can't be stale. return false, nil } for _, claim := range getPersistentVolumeClaims(set, pod) { pvc, err := spc.objectMgr.GetClaim(claim.Namespace, claim.Name) switch { case apierrors.IsNotFound(err): // If the claim doesn't exist yet, it can't be stale. continue case err != nil: return false, err case err == nil: // A claim is stale if it doesn't match the pod's UID, including if the pod has no UID. if hasStaleOwnerRef(pvc, pod) { return true, nil } } } return false, nil } // recordPodEvent records an event for verb applied to a Pod in a StatefulSet. If err is nil the generated event will // have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a reason of v1.EventTypeWarning. 
func (spc *StatefulPodControl) recordPodEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, err error) { if err == nil { reason := fmt.Sprintf("Successful%s", strings.Title(verb)) message := fmt.Sprintf("%s Pod %s in StatefulSet %s successful", strings.ToLower(verb), pod.Name, set.Name) spc.recorder.Event(set, v1.EventTypeNormal, reason, message) } else { reason := fmt.Sprintf("Failed%s", strings.Title(verb)) message := fmt.Sprintf("%s Pod %s in StatefulSet %s failed error: %s", strings.ToLower(verb), pod.Name, set.Name, err) spc.recorder.Event(set, v1.EventTypeWarning, reason, message) } } // recordClaimEvent records an event for verb applied to the PersistentVolumeClaim of a Pod in a StatefulSet. If err is // nil the generated event will have a reason of v1.EventTypeNormal. If err is not nil the generated event will have a // reason of v1.EventTypeWarning. func (spc *StatefulPodControl) recordClaimEvent(verb string, set *apps.StatefulSet, pod *v1.Pod, claim *v1.PersistentVolumeClaim, err error) { if err == nil { reason := fmt.Sprintf("Successful%s", strings.Title(verb)) message := fmt.Sprintf("%s Claim %s Pod %s in StatefulSet %s success", strings.ToLower(verb), claim.Name, pod.Name, set.Name) spc.recorder.Event(set, v1.EventTypeNormal, reason, message) } else { reason := fmt.Sprintf("Failed%s", strings.Title(verb)) message := fmt.Sprintf("%s Claim %s for Pod %s in StatefulSet %s failed error: %s", strings.ToLower(verb), claim.Name, pod.Name, set.Name, err) spc.recorder.Event(set, v1.EventTypeWarning, reason, message) } } // createMissingPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, and updates its retention policy func (spc *StatefulPodControl) createMissingPersistentVolumeClaims(ctx context.Context, set *apps.StatefulSet, pod *v1.Pod) error { if err := spc.createPersistentVolumeClaims(set, pod); err != nil { return err } if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetAutoDeletePVC) { // Set PVC policy as 
much as is possible at this point. if err := spc.UpdatePodClaimForRetentionPolicy(ctx, set, pod); err != nil { spc.recordPodEvent("update", set, pod, err) return err } } return nil } // createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of // set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method // may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with // set's Spec. func (spc *StatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error { var errs []error for _, claim := range getPersistentVolumeClaims(set, pod) { pvc, err := spc.objectMgr.GetClaim(claim.Namespace, claim.Name) switch { case apierrors.IsNotFound(err): err := spc.objectMgr.CreateClaim(&claim) if err != nil { errs = append(errs, fmt.Errorf("failed to create PVC %s: %s", claim.Name, err)) } if err == nil || !apierrors.IsAlreadyExists(err) { spc.recordClaimEvent("create", set, pod, &claim, err) } case err != nil: errs = append(errs, fmt.Errorf("failed to retrieve PVC %s: %s", claim.Name, err)) spc.recordClaimEvent("create", set, pod, &claim, err) default: if pvc.DeletionTimestamp != nil { errs = append(errs, fmt.Errorf("pvc %s is being deleted", claim.Name)) } } // TODO: Check resource requirements and accessmodes, update if necessary } return errorutils.NewAggregate(errs) }