...

Source file src/k8s.io/kubernetes/pkg/controller/statefulset/stateful_set_utils.go

Documentation: k8s.io/kubernetes/pkg/controller/statefulset

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package statefulset
    18  
    19  import (
    20  	"encoding/json"
    21  	"fmt"
    22  	"regexp"
    23  	"strconv"
    24  
    25  	apps "k8s.io/api/apps/v1"
    26  	v1 "k8s.io/api/core/v1"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/runtime"
    29  	"k8s.io/apimachinery/pkg/util/intstr"
    30  	"k8s.io/apimachinery/pkg/util/strategicpatch"
    31  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    32  	"k8s.io/client-go/kubernetes/scheme"
    33  	"k8s.io/klog/v2"
    34  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    35  	"k8s.io/kubernetes/pkg/controller"
    36  	"k8s.io/kubernetes/pkg/controller/history"
    37  	"k8s.io/kubernetes/pkg/features"
    38  )
    39  
// patchCodec is the codec used to serialize StatefulSets when building revision patches (getPatch) and when
// applying them (ApplyRevision). It is the legacy codec for the apps SchemeGroupVersion.
var patchCodec = scheme.Codecs.LegacyCodec(apps.SchemeGroupVersion)
    41  
    42  // overlappingStatefulSets sorts a list of StatefulSets by creation timestamp, using their names as a tie breaker.
    43  // Generally used to tie break between StatefulSets that have overlapping selectors.
    44  type overlappingStatefulSets []*apps.StatefulSet
    45  
    46  func (o overlappingStatefulSets) Len() int { return len(o) }
    47  
    48  func (o overlappingStatefulSets) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
    49  
    50  func (o overlappingStatefulSets) Less(i, j int) bool {
    51  	if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
    52  		return o[i].Name < o[j].Name
    53  	}
    54  	return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
    55  }
    56  
// statefulPodRegex is a regular expression that extracts the parent StatefulSet and ordinal from the Name of a Pod.
// Capture group 1 is everything before the final "-<digits>" suffix (the parent name); capture group 2 is the
// trailing run of decimal digits (the ordinal).
var statefulPodRegex = regexp.MustCompile("(.*)-([0-9]+)$")
    59  
    60  // getParentNameAndOrdinal gets the name of pod's parent StatefulSet and pod's ordinal as extracted from its Name. If
    61  // the Pod was not created by a StatefulSet, its parent is considered to be empty string, and its ordinal is considered
    62  // to be -1.
    63  func getParentNameAndOrdinal(pod *v1.Pod) (string, int) {
    64  	parent := ""
    65  	ordinal := -1
    66  	subMatches := statefulPodRegex.FindStringSubmatch(pod.Name)
    67  	if len(subMatches) < 3 {
    68  		return parent, ordinal
    69  	}
    70  	parent = subMatches[1]
    71  	if i, err := strconv.ParseInt(subMatches[2], 10, 32); err == nil {
    72  		ordinal = int(i)
    73  	}
    74  	return parent, ordinal
    75  }
    76  
    77  // getParentName gets the name of pod's parent StatefulSet. If pod has not parent, the empty string is returned.
    78  func getParentName(pod *v1.Pod) string {
    79  	parent, _ := getParentNameAndOrdinal(pod)
    80  	return parent
    81  }
    82  
    83  // getOrdinal gets pod's ordinal. If pod has no ordinal, -1 is returned.
    84  func getOrdinal(pod *v1.Pod) int {
    85  	_, ordinal := getParentNameAndOrdinal(pod)
    86  	return ordinal
    87  }
    88  
    89  // getStartOrdinal gets the first possible ordinal (inclusive).
    90  // Returns spec.ordinals.start if spec.ordinals is set, otherwise returns 0.
    91  func getStartOrdinal(set *apps.StatefulSet) int {
    92  	if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetStartOrdinal) {
    93  		if set.Spec.Ordinals != nil {
    94  			return int(set.Spec.Ordinals.Start)
    95  		}
    96  	}
    97  	return 0
    98  }
    99  
   100  // getEndOrdinal gets the last possible ordinal (inclusive).
   101  func getEndOrdinal(set *apps.StatefulSet) int {
   102  	return getStartOrdinal(set) + int(*set.Spec.Replicas) - 1
   103  }
   104  
   105  // podInOrdinalRange returns true if the pod ordinal is within the allowed
   106  // range of ordinals that this StatefulSet is set to control.
   107  func podInOrdinalRange(pod *v1.Pod, set *apps.StatefulSet) bool {
   108  	ordinal := getOrdinal(pod)
   109  	return ordinal >= getStartOrdinal(set) && ordinal <= getEndOrdinal(set)
   110  }
   111  
   112  // getPodName gets the name of set's child Pod with an ordinal index of ordinal
   113  func getPodName(set *apps.StatefulSet, ordinal int) string {
   114  	return fmt.Sprintf("%s-%d", set.Name, ordinal)
   115  }
   116  
   117  // getPersistentVolumeClaimName gets the name of PersistentVolumeClaim for a Pod with an ordinal index of ordinal. claim
   118  // must be a PersistentVolumeClaim from set's VolumeClaims template.
   119  func getPersistentVolumeClaimName(set *apps.StatefulSet, claim *v1.PersistentVolumeClaim, ordinal int) string {
   120  	// NOTE: This name format is used by the heuristics for zone spreading in ChooseZoneForVolume
   121  	return fmt.Sprintf("%s-%s-%d", claim.Name, set.Name, ordinal)
   122  }
   123  
   124  // isMemberOf tests if pod is a member of set.
   125  func isMemberOf(set *apps.StatefulSet, pod *v1.Pod) bool {
   126  	return getParentName(pod) == set.Name
   127  }
   128  
   129  // identityMatches returns true if pod has a valid identity and network identity for a member of set.
   130  func identityMatches(set *apps.StatefulSet, pod *v1.Pod) bool {
   131  	parent, ordinal := getParentNameAndOrdinal(pod)
   132  	return ordinal >= 0 &&
   133  		set.Name == parent &&
   134  		pod.Name == getPodName(set, ordinal) &&
   135  		pod.Namespace == set.Namespace &&
   136  		pod.Labels[apps.StatefulSetPodNameLabel] == pod.Name
   137  }
   138  
   139  // storageMatches returns true if pod's Volumes cover the set of PersistentVolumeClaims
   140  func storageMatches(set *apps.StatefulSet, pod *v1.Pod) bool {
   141  	ordinal := getOrdinal(pod)
   142  	if ordinal < 0 {
   143  		return false
   144  	}
   145  	volumes := make(map[string]v1.Volume, len(pod.Spec.Volumes))
   146  	for _, volume := range pod.Spec.Volumes {
   147  		volumes[volume.Name] = volume
   148  	}
   149  	for _, claim := range set.Spec.VolumeClaimTemplates {
   150  		volume, found := volumes[claim.Name]
   151  		if !found ||
   152  			volume.VolumeSource.PersistentVolumeClaim == nil ||
   153  			volume.VolumeSource.PersistentVolumeClaim.ClaimName !=
   154  				getPersistentVolumeClaimName(set, &claim, ordinal) {
   155  			return false
   156  		}
   157  	}
   158  	return true
   159  }
   160  
   161  // getPersistentVolumeClaimPolicy returns the PVC policy for a StatefulSet, returning a retain policy if the set policy is nil.
   162  func getPersistentVolumeClaimRetentionPolicy(set *apps.StatefulSet) apps.StatefulSetPersistentVolumeClaimRetentionPolicy {
   163  	policy := apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
   164  		WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
   165  		WhenScaled:  apps.RetainPersistentVolumeClaimRetentionPolicyType,
   166  	}
   167  	if set.Spec.PersistentVolumeClaimRetentionPolicy != nil {
   168  		policy = *set.Spec.PersistentVolumeClaimRetentionPolicy
   169  	}
   170  	return policy
   171  }
   172  
// claimOwnerMatchesSetAndPod returns false if the ownerRefs of the claim are not set consistently with the
// PVC deletion policy for the StatefulSet, and true otherwise.
//
// The expected ownerRefs per (WhenScaled, WhenDeleted) combination are:
//   - retain/retain: neither set nor pod owns the claim.
//   - retain/delete: set owns the claim, pod does not.
//   - delete/retain: set does not own the claim; pod owns it only if the pod is outside the set's ordinal range.
//   - delete/delete: pod outside the ordinal range => pod owns the claim and set does not;
//     pod inside the ordinal range => set owns the claim and pod does not.
//
// Any other combination is logged as an error and evaluated as retain/retain.
func claimOwnerMatchesSetAndPod(logger klog.Logger, claim *v1.PersistentVolumeClaim, set *apps.StatefulSet, pod *v1.Pod) bool {
	policy := getPersistentVolumeClaimRetentionPolicy(set)
	const retain = apps.RetainPersistentVolumeClaimRetentionPolicyType
	const delete = apps.DeletePersistentVolumeClaimRetentionPolicyType
	switch {
	// default is listed first but, as with any Go switch, only runs when no case expression is true.
	// It then falls through into the retain/retain case directly below it.
	default:
		logger.Error(nil, "Unknown policy, treating as Retain", "policy", set.Spec.PersistentVolumeClaimRetentionPolicy)
		fallthrough
	case policy.WhenScaled == retain && policy.WhenDeleted == retain:
		if hasOwnerRef(claim, set) ||
			hasOwnerRef(claim, pod) {
			return false
		}
	case policy.WhenScaled == retain && policy.WhenDeleted == delete:
		if !hasOwnerRef(claim, set) ||
			hasOwnerRef(claim, pod) {
			return false
		}
	case policy.WhenScaled == delete && policy.WhenDeleted == retain:
		if hasOwnerRef(claim, set) {
			return false
		}
		// The pod ref must be present exactly when the pod has been scaled out of range.
		podScaledDown := !podInOrdinalRange(pod, set)
		if podScaledDown != hasOwnerRef(claim, pod) {
			return false
		}
	case policy.WhenScaled == delete && policy.WhenDeleted == delete:
		podScaledDown := !podInOrdinalRange(pod, set)
		// If a pod is scaled down, there should be no set ref and a pod ref;
		// if the pod is not scaled down it's the other way around.
		if podScaledDown == hasOwnerRef(claim, set) {
			return false
		}
		if podScaledDown != hasOwnerRef(claim, pod) {
			return false
		}
	}
	return true
}
   214  
// updateClaimOwnerRefForSetAndPod updates the ownerRefs for the claim according to the deletion policy of
// the StatefulSet. Returns true if the claim was changed and should be updated and false otherwise.
//
// Only the in-memory claim object is modified; set and pod are not changed. The ownerRefs applied per
// (WhenScaled, WhenDeleted) combination are:
//   - retain/retain: remove both the set ref and the pod ref.
//   - retain/delete: add the set ref, remove the pod ref.
//   - delete/retain: remove the set ref; add the pod ref if the pod is outside the set's ordinal range,
//     otherwise remove it.
//   - delete/delete: pod outside the ordinal range => remove the set ref and add the pod ref;
//     pod inside the ordinal range => add the set ref and remove the pod ref.
//
// Any other combination is logged as an error and handled as retain/retain.
func updateClaimOwnerRefForSetAndPod(logger klog.Logger, claim *v1.PersistentVolumeClaim, set *apps.StatefulSet, pod *v1.Pod) bool {
	needsUpdate := false
	// Sometimes the version and kind are not set {pod,set}.TypeMeta. These are necessary for the ownerRef.
	// This is the case both in real clusters and the unittests.
	// TODO: there must be a better way to do this other than hardcoding the pod version?
	updateMeta := func(tm *metav1.TypeMeta, kind string) {
		if tm.APIVersion == "" {
			if kind == "StatefulSet" {
				tm.APIVersion = "apps/v1"
			} else {
				tm.APIVersion = "v1"
			}
		}
		if tm.Kind == "" {
			tm.Kind = kind
		}
	}
	// Work on copies of the TypeMeta values so that filling in defaults does not modify pod or set.
	podMeta := pod.TypeMeta
	updateMeta(&podMeta, "Pod")
	setMeta := set.TypeMeta
	updateMeta(&setMeta, "StatefulSet")
	policy := getPersistentVolumeClaimRetentionPolicy(set)
	const retain = apps.RetainPersistentVolumeClaimRetentionPolicyType
	const delete = apps.DeletePersistentVolumeClaimRetentionPolicyType
	switch {
	// default is listed first but only runs when no case expression is true; it then falls through
	// into the retain/retain case directly below it.
	default:
		logger.Error(nil, "Unknown policy, treating as Retain", "policy", set.Spec.PersistentVolumeClaimRetentionPolicy)
		fallthrough
	case policy.WhenScaled == retain && policy.WhenDeleted == retain:
		// The helper call is on the left of || so it always runs, even once needsUpdate is already true.
		needsUpdate = removeOwnerRef(claim, set) || needsUpdate
		needsUpdate = removeOwnerRef(claim, pod) || needsUpdate
	case policy.WhenScaled == retain && policy.WhenDeleted == delete:
		needsUpdate = setOwnerRef(claim, set, &setMeta) || needsUpdate
		needsUpdate = removeOwnerRef(claim, pod) || needsUpdate
	case policy.WhenScaled == delete && policy.WhenDeleted == retain:
		needsUpdate = removeOwnerRef(claim, set) || needsUpdate
		podScaledDown := !podInOrdinalRange(pod, set)
		if podScaledDown {
			needsUpdate = setOwnerRef(claim, pod, &podMeta) || needsUpdate
		}
		if !podScaledDown {
			needsUpdate = removeOwnerRef(claim, pod) || needsUpdate
		}
	case policy.WhenScaled == delete && policy.WhenDeleted == delete:
		podScaledDown := !podInOrdinalRange(pod, set)
		if podScaledDown {
			needsUpdate = removeOwnerRef(claim, set) || needsUpdate
			needsUpdate = setOwnerRef(claim, pod, &podMeta) || needsUpdate
		}
		if !podScaledDown {
			needsUpdate = setOwnerRef(claim, set, &setMeta) || needsUpdate
			needsUpdate = removeOwnerRef(claim, pod) || needsUpdate
		}
	}
	return needsUpdate
}
   273  
   274  // hasOwnerRef returns true if target has an ownerRef to owner.
   275  func hasOwnerRef(target, owner metav1.Object) bool {
   276  	ownerUID := owner.GetUID()
   277  	for _, ownerRef := range target.GetOwnerReferences() {
   278  		if ownerRef.UID == ownerUID {
   279  			return true
   280  		}
   281  	}
   282  	return false
   283  }
   284  
   285  // hasStaleOwnerRef returns true if target has a ref to owner that appears to be stale.
   286  func hasStaleOwnerRef(target, owner metav1.Object) bool {
   287  	for _, ownerRef := range target.GetOwnerReferences() {
   288  		if ownerRef.Name == owner.GetName() && ownerRef.UID != owner.GetUID() {
   289  			return true
   290  		}
   291  	}
   292  	return false
   293  }
   294  
   295  // setOwnerRef adds owner to the ownerRefs of target, if necessary. Returns true if target needs to be
   296  // updated and false otherwise.
   297  func setOwnerRef(target, owner metav1.Object, ownerType *metav1.TypeMeta) bool {
   298  	if hasOwnerRef(target, owner) {
   299  		return false
   300  	}
   301  	ownerRefs := append(
   302  		target.GetOwnerReferences(),
   303  		metav1.OwnerReference{
   304  			APIVersion: ownerType.APIVersion,
   305  			Kind:       ownerType.Kind,
   306  			Name:       owner.GetName(),
   307  			UID:        owner.GetUID(),
   308  		})
   309  	target.SetOwnerReferences(ownerRefs)
   310  	return true
   311  }
   312  
   313  // removeOwnerRef removes owner from the ownerRefs of target, if necessary. Returns true if target needs
   314  // to be updated and false otherwise.
   315  func removeOwnerRef(target, owner metav1.Object) bool {
   316  	if !hasOwnerRef(target, owner) {
   317  		return false
   318  	}
   319  	ownerUID := owner.GetUID()
   320  	oldRefs := target.GetOwnerReferences()
   321  	newRefs := make([]metav1.OwnerReference, len(oldRefs)-1)
   322  	skip := 0
   323  	for i := range oldRefs {
   324  		if oldRefs[i].UID == ownerUID {
   325  			skip = -1
   326  		} else {
   327  			newRefs[i+skip] = oldRefs[i]
   328  		}
   329  	}
   330  	target.SetOwnerReferences(newRefs)
   331  	return true
   332  }
   333  
   334  // getPersistentVolumeClaims gets a map of PersistentVolumeClaims to their template names, as defined in set. The
   335  // returned PersistentVolumeClaims are each constructed with a the name specific to the Pod. This name is determined
   336  // by getPersistentVolumeClaimName.
   337  func getPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) map[string]v1.PersistentVolumeClaim {
   338  	ordinal := getOrdinal(pod)
   339  	templates := set.Spec.VolumeClaimTemplates
   340  	claims := make(map[string]v1.PersistentVolumeClaim, len(templates))
   341  	for i := range templates {
   342  		claim := templates[i].DeepCopy()
   343  		claim.Name = getPersistentVolumeClaimName(set, claim, ordinal)
   344  		claim.Namespace = set.Namespace
   345  		if claim.Labels != nil {
   346  			for key, value := range set.Spec.Selector.MatchLabels {
   347  				claim.Labels[key] = value
   348  			}
   349  		} else {
   350  			claim.Labels = set.Spec.Selector.MatchLabels
   351  		}
   352  		claims[templates[i].Name] = *claim
   353  	}
   354  	return claims
   355  }
   356  
   357  // updateStorage updates pod's Volumes to conform with the PersistentVolumeClaim of set's templates. If pod has
   358  // conflicting local Volumes these are replaced with Volumes that conform to the set's templates.
   359  func updateStorage(set *apps.StatefulSet, pod *v1.Pod) {
   360  	currentVolumes := pod.Spec.Volumes
   361  	claims := getPersistentVolumeClaims(set, pod)
   362  	newVolumes := make([]v1.Volume, 0, len(claims))
   363  	for name, claim := range claims {
   364  		newVolumes = append(newVolumes, v1.Volume{
   365  			Name: name,
   366  			VolumeSource: v1.VolumeSource{
   367  				PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
   368  					ClaimName: claim.Name,
   369  					// TODO: Use source definition to set this value when we have one.
   370  					ReadOnly: false,
   371  				},
   372  			},
   373  		})
   374  	}
   375  	for i := range currentVolumes {
   376  		if _, ok := claims[currentVolumes[i].Name]; !ok {
   377  			newVolumes = append(newVolumes, currentVolumes[i])
   378  		}
   379  	}
   380  	pod.Spec.Volumes = newVolumes
   381  }
   382  
   383  func initIdentity(set *apps.StatefulSet, pod *v1.Pod) {
   384  	updateIdentity(set, pod)
   385  	// Set these immutable fields only on initial Pod creation, not updates.
   386  	pod.Spec.Hostname = pod.Name
   387  	pod.Spec.Subdomain = set.Spec.ServiceName
   388  }
   389  
   390  // updateIdentity updates pod's name, hostname, and subdomain, and StatefulSetPodNameLabel to conform to set's name
   391  // and headless service.
   392  func updateIdentity(set *apps.StatefulSet, pod *v1.Pod) {
   393  	ordinal := getOrdinal(pod)
   394  	pod.Name = getPodName(set, ordinal)
   395  	pod.Namespace = set.Namespace
   396  	if pod.Labels == nil {
   397  		pod.Labels = make(map[string]string)
   398  	}
   399  	pod.Labels[apps.StatefulSetPodNameLabel] = pod.Name
   400  	if utilfeature.DefaultFeatureGate.Enabled(features.PodIndexLabel) {
   401  		pod.Labels[apps.PodIndexLabel] = strconv.Itoa(ordinal)
   402  	}
   403  }
   404  
   405  // isRunningAndReady returns true if pod is in the PodRunning Phase, if it has a condition of PodReady.
   406  func isRunningAndReady(pod *v1.Pod) bool {
   407  	return pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod)
   408  }
   409  
   410  func isRunningAndAvailable(pod *v1.Pod, minReadySeconds int32) bool {
   411  	return podutil.IsPodAvailable(pod, minReadySeconds, metav1.Now())
   412  }
   413  
   414  // isCreated returns true if pod has been created and is maintained by the API server
   415  func isCreated(pod *v1.Pod) bool {
   416  	return pod.Status.Phase != ""
   417  }
   418  
   419  // isPending returns true if pod has a Phase of PodPending
   420  func isPending(pod *v1.Pod) bool {
   421  	return pod.Status.Phase == v1.PodPending
   422  }
   423  
   424  // isFailed returns true if pod has a Phase of PodFailed
   425  func isFailed(pod *v1.Pod) bool {
   426  	return pod.Status.Phase == v1.PodFailed
   427  }
   428  
   429  // isSucceeded returns true if pod has a Phase of PodSucceeded
   430  func isSucceeded(pod *v1.Pod) bool {
   431  	return pod.Status.Phase == v1.PodSucceeded
   432  }
   433  
   434  // isTerminating returns true if pod's DeletionTimestamp has been set
   435  func isTerminating(pod *v1.Pod) bool {
   436  	return pod.DeletionTimestamp != nil
   437  }
   438  
   439  // isHealthy returns true if pod is running and ready and has not been terminated
   440  func isHealthy(pod *v1.Pod) bool {
   441  	return isRunningAndReady(pod) && !isTerminating(pod)
   442  }
   443  
   444  // allowsBurst is true if the alpha burst annotation is set.
   445  func allowsBurst(set *apps.StatefulSet) bool {
   446  	return set.Spec.PodManagementPolicy == apps.ParallelPodManagement
   447  }
   448  
   449  // setPodRevision sets the revision of Pod to revision by adding the StatefulSetRevisionLabel
   450  func setPodRevision(pod *v1.Pod, revision string) {
   451  	if pod.Labels == nil {
   452  		pod.Labels = make(map[string]string)
   453  	}
   454  	pod.Labels[apps.StatefulSetRevisionLabel] = revision
   455  }
   456  
   457  // getPodRevision gets the revision of Pod by inspecting the StatefulSetRevisionLabel. If pod has no revision the empty
   458  // string is returned.
   459  func getPodRevision(pod *v1.Pod) string {
   460  	if pod.Labels == nil {
   461  		return ""
   462  	}
   463  	return pod.Labels[apps.StatefulSetRevisionLabel]
   464  }
   465  
   466  // newStatefulSetPod returns a new Pod conforming to the set's Spec with an identity generated from ordinal.
   467  func newStatefulSetPod(set *apps.StatefulSet, ordinal int) *v1.Pod {
   468  	pod, _ := controller.GetPodFromTemplate(&set.Spec.Template, set, metav1.NewControllerRef(set, controllerKind))
   469  	pod.Name = getPodName(set, ordinal)
   470  	initIdentity(set, pod)
   471  	updateStorage(set, pod)
   472  	return pod
   473  }
   474  
// newVersionedStatefulSetPod creates a new Pod for a StatefulSet. currentSet is the representation of the set at the
// current revision. updateSet is the representation of the set at the updateRevision. currentRevision is the name of
// the current revision. updateRevision is the name of the update revision. ordinal is the ordinal of the Pod.
//
// The Pod is built from currentSet and labelled with currentRevision when either
//   - the update strategy type is RollingUpdate, no RollingUpdate parameters are set, and ordinal is below
//     start ordinal + Status.CurrentReplicas; or
//   - RollingUpdate parameters are set and ordinal is below start ordinal + Partition.
//
// Otherwise it is built from updateSet and labelled with updateRevision. No error is returned.
//
// NOTE(review): because && binds tighter than ||, the strategy-type check applies only to the first
// alternative; the partition alternative is evaluated regardless of the strategy type. Confirm this is
// intended (it is harmless if RollingUpdate parameters can only be set together with the RollingUpdate type).
// When RollingUpdate is non-nil its Partition pointer is dereferenced unconditionally.
func newVersionedStatefulSetPod(currentSet, updateSet *apps.StatefulSet, currentRevision, updateRevision string, ordinal int) *v1.Pod {
	if currentSet.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType &&
		(currentSet.Spec.UpdateStrategy.RollingUpdate == nil && ordinal < (getStartOrdinal(currentSet)+int(currentSet.Status.CurrentReplicas))) ||
		(currentSet.Spec.UpdateStrategy.RollingUpdate != nil && ordinal < (getStartOrdinal(currentSet)+int(*currentSet.Spec.UpdateStrategy.RollingUpdate.Partition))) {
		pod := newStatefulSetPod(currentSet, ordinal)
		setPodRevision(pod, currentRevision)
		return pod
	}
	pod := newStatefulSetPod(updateSet, ordinal)
	setPodRevision(pod, updateRevision)
	return pod
}
   491  
   492  // getPatch returns a strategic merge patch that can be applied to restore a StatefulSet to a
   493  // previous version. If the returned error is nil the patch is valid. The current state that we save is just the
   494  // PodSpecTemplate. We can modify this later to encompass more state (or less) and remain compatible with previously
   495  // recorded patches.
   496  func getPatch(set *apps.StatefulSet) ([]byte, error) {
   497  	data, err := runtime.Encode(patchCodec, set)
   498  	if err != nil {
   499  		return nil, err
   500  	}
   501  	var raw map[string]interface{}
   502  	err = json.Unmarshal(data, &raw)
   503  	if err != nil {
   504  		return nil, err
   505  	}
   506  	objCopy := make(map[string]interface{})
   507  	specCopy := make(map[string]interface{})
   508  	spec := raw["spec"].(map[string]interface{})
   509  	template := spec["template"].(map[string]interface{})
   510  	specCopy["template"] = template
   511  	template["$patch"] = "replace"
   512  	objCopy["spec"] = specCopy
   513  	patch, err := json.Marshal(objCopy)
   514  	return patch, err
   515  }
   516  
   517  // newRevision creates a new ControllerRevision containing a patch that reapplies the target state of set.
   518  // The Revision of the returned ControllerRevision is set to revision. If the returned error is nil, the returned
   519  // ControllerRevision is valid. StatefulSet revisions are stored as patches that re-apply the current state of set
   520  // to a new StatefulSet using a strategic merge patch to replace the saved state of the new StatefulSet.
   521  func newRevision(set *apps.StatefulSet, revision int64, collisionCount *int32) (*apps.ControllerRevision, error) {
   522  	patch, err := getPatch(set)
   523  	if err != nil {
   524  		return nil, err
   525  	}
   526  	cr, err := history.NewControllerRevision(set,
   527  		controllerKind,
   528  		set.Spec.Template.Labels,
   529  		runtime.RawExtension{Raw: patch},
   530  		revision,
   531  		collisionCount)
   532  	if err != nil {
   533  		return nil, err
   534  	}
   535  	if cr.ObjectMeta.Annotations == nil {
   536  		cr.ObjectMeta.Annotations = make(map[string]string)
   537  	}
   538  	for key, value := range set.Annotations {
   539  		cr.ObjectMeta.Annotations[key] = value
   540  	}
   541  	return cr, nil
   542  }
   543  
   544  // ApplyRevision returns a new StatefulSet constructed by restoring the state in revision to set. If the returned error
   545  // is nil, the returned StatefulSet is valid.
   546  func ApplyRevision(set *apps.StatefulSet, revision *apps.ControllerRevision) (*apps.StatefulSet, error) {
   547  	clone := set.DeepCopy()
   548  	patched, err := strategicpatch.StrategicMergePatch([]byte(runtime.EncodeOrDie(patchCodec, clone)), revision.Data.Raw, clone)
   549  	if err != nil {
   550  		return nil, err
   551  	}
   552  	restoredSet := &apps.StatefulSet{}
   553  	err = json.Unmarshal(patched, restoredSet)
   554  	if err != nil {
   555  		return nil, err
   556  	}
   557  	return restoredSet, nil
   558  }
   559  
   560  // nextRevision finds the next valid revision number based on revisions. If the length of revisions
   561  // is 0 this is 1. Otherwise, it is 1 greater than the largest revision's Revision. This method
   562  // assumes that revisions has been sorted by Revision.
   563  func nextRevision(revisions []*apps.ControllerRevision) int64 {
   564  	count := len(revisions)
   565  	if count <= 0 {
   566  		return 1
   567  	}
   568  	return revisions[count-1].Revision + 1
   569  }
   570  
   571  // inconsistentStatus returns true if the ObservedGeneration of status is greater than set's
   572  // Generation or if any of the status's fields do not match those of set's status.
   573  func inconsistentStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) bool {
   574  	return status.ObservedGeneration > set.Status.ObservedGeneration ||
   575  		status.Replicas != set.Status.Replicas ||
   576  		status.CurrentReplicas != set.Status.CurrentReplicas ||
   577  		status.ReadyReplicas != set.Status.ReadyReplicas ||
   578  		status.UpdatedReplicas != set.Status.UpdatedReplicas ||
   579  		status.CurrentRevision != set.Status.CurrentRevision ||
   580  		status.AvailableReplicas != set.Status.AvailableReplicas ||
   581  		status.UpdateRevision != set.Status.UpdateRevision
   582  }
   583  
   584  // completeRollingUpdate completes a rolling update when all of set's replica Pods have been updated
   585  // to the updateRevision. status's currentRevision is set to updateRevision and its' updateRevision
   586  // is set to the empty string. status's currentReplicas is set to updateReplicas and its updateReplicas
   587  // are set to 0.
   588  func completeRollingUpdate(set *apps.StatefulSet, status *apps.StatefulSetStatus) {
   589  	if set.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType &&
   590  		status.UpdatedReplicas == *set.Spec.Replicas &&
   591  		status.ReadyReplicas == *set.Spec.Replicas &&
   592  		status.Replicas == *set.Spec.Replicas {
   593  		status.CurrentReplicas = status.UpdatedReplicas
   594  		status.CurrentRevision = status.UpdateRevision
   595  	}
   596  }
   597  
   598  // ascendingOrdinal is a sort.Interface that Sorts a list of Pods based on the ordinals extracted
   599  // from the Pod. Pod's that have not been constructed by StatefulSet's have an ordinal of -1, and are therefore pushed
   600  // to the front of the list.
   601  type ascendingOrdinal []*v1.Pod
   602  
   603  func (ao ascendingOrdinal) Len() int {
   604  	return len(ao)
   605  }
   606  
   607  func (ao ascendingOrdinal) Swap(i, j int) {
   608  	ao[i], ao[j] = ao[j], ao[i]
   609  }
   610  
   611  func (ao ascendingOrdinal) Less(i, j int) bool {
   612  	return getOrdinal(ao[i]) < getOrdinal(ao[j])
   613  }
   614  
   615  // descendingOrdinal is a sort.Interface that Sorts a list of Pods based on the ordinals extracted
   616  // from the Pod. Pod's that have not been constructed by StatefulSet's have an ordinal of -1, and are therefore pushed
   617  // to the end of the list.
   618  type descendingOrdinal []*v1.Pod
   619  
   620  func (do descendingOrdinal) Len() int {
   621  	return len(do)
   622  }
   623  
   624  func (do descendingOrdinal) Swap(i, j int) {
   625  	do[i], do[j] = do[j], do[i]
   626  }
   627  
   628  func (do descendingOrdinal) Less(i, j int) bool {
   629  	return getOrdinal(do[i]) > getOrdinal(do[j])
   630  }
   631  
   632  // getStatefulSetMaxUnavailable calculates the real maxUnavailable number according to the replica count
   633  // and maxUnavailable from rollingUpdateStrategy. The number defaults to 1 if the maxUnavailable field is
   634  // not set, and it will be round down to at least 1 if the maxUnavailable value is a percentage.
   635  // Note that API validation has already guaranteed the maxUnavailable field to be >1 if it is an integer
   636  // or 0% < value <= 100% if it is a percentage, so we don't have to consider other cases.
   637  func getStatefulSetMaxUnavailable(maxUnavailable *intstr.IntOrString, replicaCount int) (int, error) {
   638  	maxUnavailableNum, err := intstr.GetScaledValueFromIntOrPercent(intstr.ValueOrDefault(maxUnavailable, intstr.FromInt32(1)), replicaCount, false)
   639  	if err != nil {
   640  		return 0, err
   641  	}
   642  	// maxUnavailable might be zero for small percentage with round down.
   643  	// So we have to enforce it not to be less than 1.
   644  	if maxUnavailableNum < 1 {
   645  		maxUnavailableNum = 1
   646  	}
   647  	return maxUnavailableNum, nil
   648  }
   649  

View as plain text