1
16
17 package statefulset
18
19 import (
20 "encoding/json"
21 "fmt"
22 "regexp"
23 "strconv"
24
25 apps "k8s.io/api/apps/v1"
26 v1 "k8s.io/api/core/v1"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/runtime"
29 "k8s.io/apimachinery/pkg/util/intstr"
30 "k8s.io/apimachinery/pkg/util/strategicpatch"
31 utilfeature "k8s.io/apiserver/pkg/util/feature"
32 "k8s.io/client-go/kubernetes/scheme"
33 "k8s.io/klog/v2"
34 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
35 "k8s.io/kubernetes/pkg/controller"
36 "k8s.io/kubernetes/pkg/controller/history"
37 "k8s.io/kubernetes/pkg/features"
38 )
39
40 var patchCodec = scheme.Codecs.LegacyCodec(apps.SchemeGroupVersion)
41
42
43
44 type overlappingStatefulSets []*apps.StatefulSet
45
46 func (o overlappingStatefulSets) Len() int { return len(o) }
47
48 func (o overlappingStatefulSets) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
49
50 func (o overlappingStatefulSets) Less(i, j int) bool {
51 if o[i].CreationTimestamp.Equal(&o[j].CreationTimestamp) {
52 return o[i].Name < o[j].Name
53 }
54 return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
55 }
56
57
// statefulPodRegex matches names of the form "<parent>-<ordinal>". Group 1 is
// the (greedy) parent name and group 2 the trailing run of decimal digits.
var statefulPodRegex = regexp.MustCompile(`(.*)-([0-9]+)$`)
59
60
61
62
63 func getParentNameAndOrdinal(pod *v1.Pod) (string, int) {
64 parent := ""
65 ordinal := -1
66 subMatches := statefulPodRegex.FindStringSubmatch(pod.Name)
67 if len(subMatches) < 3 {
68 return parent, ordinal
69 }
70 parent = subMatches[1]
71 if i, err := strconv.ParseInt(subMatches[2], 10, 32); err == nil {
72 ordinal = int(i)
73 }
74 return parent, ordinal
75 }
76
77
78 func getParentName(pod *v1.Pod) string {
79 parent, _ := getParentNameAndOrdinal(pod)
80 return parent
81 }
82
83
84 func getOrdinal(pod *v1.Pod) int {
85 _, ordinal := getParentNameAndOrdinal(pod)
86 return ordinal
87 }
88
89
90
91 func getStartOrdinal(set *apps.StatefulSet) int {
92 if utilfeature.DefaultFeatureGate.Enabled(features.StatefulSetStartOrdinal) {
93 if set.Spec.Ordinals != nil {
94 return int(set.Spec.Ordinals.Start)
95 }
96 }
97 return 0
98 }
99
100
101 func getEndOrdinal(set *apps.StatefulSet) int {
102 return getStartOrdinal(set) + int(*set.Spec.Replicas) - 1
103 }
104
105
106
107 func podInOrdinalRange(pod *v1.Pod, set *apps.StatefulSet) bool {
108 ordinal := getOrdinal(pod)
109 return ordinal >= getStartOrdinal(set) && ordinal <= getEndOrdinal(set)
110 }
111
112
113 func getPodName(set *apps.StatefulSet, ordinal int) string {
114 return fmt.Sprintf("%s-%d", set.Name, ordinal)
115 }
116
117
118
119 func getPersistentVolumeClaimName(set *apps.StatefulSet, claim *v1.PersistentVolumeClaim, ordinal int) string {
120
121 return fmt.Sprintf("%s-%s-%d", claim.Name, set.Name, ordinal)
122 }
123
124
125 func isMemberOf(set *apps.StatefulSet, pod *v1.Pod) bool {
126 return getParentName(pod) == set.Name
127 }
128
129
130 func identityMatches(set *apps.StatefulSet, pod *v1.Pod) bool {
131 parent, ordinal := getParentNameAndOrdinal(pod)
132 return ordinal >= 0 &&
133 set.Name == parent &&
134 pod.Name == getPodName(set, ordinal) &&
135 pod.Namespace == set.Namespace &&
136 pod.Labels[apps.StatefulSetPodNameLabel] == pod.Name
137 }
138
139
140 func storageMatches(set *apps.StatefulSet, pod *v1.Pod) bool {
141 ordinal := getOrdinal(pod)
142 if ordinal < 0 {
143 return false
144 }
145 volumes := make(map[string]v1.Volume, len(pod.Spec.Volumes))
146 for _, volume := range pod.Spec.Volumes {
147 volumes[volume.Name] = volume
148 }
149 for _, claim := range set.Spec.VolumeClaimTemplates {
150 volume, found := volumes[claim.Name]
151 if !found ||
152 volume.VolumeSource.PersistentVolumeClaim == nil ||
153 volume.VolumeSource.PersistentVolumeClaim.ClaimName !=
154 getPersistentVolumeClaimName(set, &claim, ordinal) {
155 return false
156 }
157 }
158 return true
159 }
160
161
162 func getPersistentVolumeClaimRetentionPolicy(set *apps.StatefulSet) apps.StatefulSetPersistentVolumeClaimRetentionPolicy {
163 policy := apps.StatefulSetPersistentVolumeClaimRetentionPolicy{
164 WhenDeleted: apps.RetainPersistentVolumeClaimRetentionPolicyType,
165 WhenScaled: apps.RetainPersistentVolumeClaimRetentionPolicyType,
166 }
167 if set.Spec.PersistentVolumeClaimRetentionPolicy != nil {
168 policy = *set.Spec.PersistentVolumeClaimRetentionPolicy
169 }
170 return policy
171 }
172
173
174
175 func claimOwnerMatchesSetAndPod(logger klog.Logger, claim *v1.PersistentVolumeClaim, set *apps.StatefulSet, pod *v1.Pod) bool {
176 policy := getPersistentVolumeClaimRetentionPolicy(set)
177 const retain = apps.RetainPersistentVolumeClaimRetentionPolicyType
178 const delete = apps.DeletePersistentVolumeClaimRetentionPolicyType
179 switch {
180 default:
181 logger.Error(nil, "Unknown policy, treating as Retain", "policy", set.Spec.PersistentVolumeClaimRetentionPolicy)
182 fallthrough
183 case policy.WhenScaled == retain && policy.WhenDeleted == retain:
184 if hasOwnerRef(claim, set) ||
185 hasOwnerRef(claim, pod) {
186 return false
187 }
188 case policy.WhenScaled == retain && policy.WhenDeleted == delete:
189 if !hasOwnerRef(claim, set) ||
190 hasOwnerRef(claim, pod) {
191 return false
192 }
193 case policy.WhenScaled == delete && policy.WhenDeleted == retain:
194 if hasOwnerRef(claim, set) {
195 return false
196 }
197 podScaledDown := !podInOrdinalRange(pod, set)
198 if podScaledDown != hasOwnerRef(claim, pod) {
199 return false
200 }
201 case policy.WhenScaled == delete && policy.WhenDeleted == delete:
202 podScaledDown := !podInOrdinalRange(pod, set)
203
204
205 if podScaledDown == hasOwnerRef(claim, set) {
206 return false
207 }
208 if podScaledDown != hasOwnerRef(claim, pod) {
209 return false
210 }
211 }
212 return true
213 }
214
215
216
217 func updateClaimOwnerRefForSetAndPod(logger klog.Logger, claim *v1.PersistentVolumeClaim, set *apps.StatefulSet, pod *v1.Pod) bool {
218 needsUpdate := false
219
220
221
222 updateMeta := func(tm *metav1.TypeMeta, kind string) {
223 if tm.APIVersion == "" {
224 if kind == "StatefulSet" {
225 tm.APIVersion = "apps/v1"
226 } else {
227 tm.APIVersion = "v1"
228 }
229 }
230 if tm.Kind == "" {
231 tm.Kind = kind
232 }
233 }
234 podMeta := pod.TypeMeta
235 updateMeta(&podMeta, "Pod")
236 setMeta := set.TypeMeta
237 updateMeta(&setMeta, "StatefulSet")
238 policy := getPersistentVolumeClaimRetentionPolicy(set)
239 const retain = apps.RetainPersistentVolumeClaimRetentionPolicyType
240 const delete = apps.DeletePersistentVolumeClaimRetentionPolicyType
241 switch {
242 default:
243 logger.Error(nil, "Unknown policy, treating as Retain", "policy", set.Spec.PersistentVolumeClaimRetentionPolicy)
244 fallthrough
245 case policy.WhenScaled == retain && policy.WhenDeleted == retain:
246 needsUpdate = removeOwnerRef(claim, set) || needsUpdate
247 needsUpdate = removeOwnerRef(claim, pod) || needsUpdate
248 case policy.WhenScaled == retain && policy.WhenDeleted == delete:
249 needsUpdate = setOwnerRef(claim, set, &setMeta) || needsUpdate
250 needsUpdate = removeOwnerRef(claim, pod) || needsUpdate
251 case policy.WhenScaled == delete && policy.WhenDeleted == retain:
252 needsUpdate = removeOwnerRef(claim, set) || needsUpdate
253 podScaledDown := !podInOrdinalRange(pod, set)
254 if podScaledDown {
255 needsUpdate = setOwnerRef(claim, pod, &podMeta) || needsUpdate
256 }
257 if !podScaledDown {
258 needsUpdate = removeOwnerRef(claim, pod) || needsUpdate
259 }
260 case policy.WhenScaled == delete && policy.WhenDeleted == delete:
261 podScaledDown := !podInOrdinalRange(pod, set)
262 if podScaledDown {
263 needsUpdate = removeOwnerRef(claim, set) || needsUpdate
264 needsUpdate = setOwnerRef(claim, pod, &podMeta) || needsUpdate
265 }
266 if !podScaledDown {
267 needsUpdate = setOwnerRef(claim, set, &setMeta) || needsUpdate
268 needsUpdate = removeOwnerRef(claim, pod) || needsUpdate
269 }
270 }
271 return needsUpdate
272 }
273
274
275 func hasOwnerRef(target, owner metav1.Object) bool {
276 ownerUID := owner.GetUID()
277 for _, ownerRef := range target.GetOwnerReferences() {
278 if ownerRef.UID == ownerUID {
279 return true
280 }
281 }
282 return false
283 }
284
285
286 func hasStaleOwnerRef(target, owner metav1.Object) bool {
287 for _, ownerRef := range target.GetOwnerReferences() {
288 if ownerRef.Name == owner.GetName() && ownerRef.UID != owner.GetUID() {
289 return true
290 }
291 }
292 return false
293 }
294
295
296
297 func setOwnerRef(target, owner metav1.Object, ownerType *metav1.TypeMeta) bool {
298 if hasOwnerRef(target, owner) {
299 return false
300 }
301 ownerRefs := append(
302 target.GetOwnerReferences(),
303 metav1.OwnerReference{
304 APIVersion: ownerType.APIVersion,
305 Kind: ownerType.Kind,
306 Name: owner.GetName(),
307 UID: owner.GetUID(),
308 })
309 target.SetOwnerReferences(ownerRefs)
310 return true
311 }
312
313
314
315 func removeOwnerRef(target, owner metav1.Object) bool {
316 if !hasOwnerRef(target, owner) {
317 return false
318 }
319 ownerUID := owner.GetUID()
320 oldRefs := target.GetOwnerReferences()
321 newRefs := make([]metav1.OwnerReference, len(oldRefs)-1)
322 skip := 0
323 for i := range oldRefs {
324 if oldRefs[i].UID == ownerUID {
325 skip = -1
326 } else {
327 newRefs[i+skip] = oldRefs[i]
328 }
329 }
330 target.SetOwnerReferences(newRefs)
331 return true
332 }
333
334
335
336
337 func getPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) map[string]v1.PersistentVolumeClaim {
338 ordinal := getOrdinal(pod)
339 templates := set.Spec.VolumeClaimTemplates
340 claims := make(map[string]v1.PersistentVolumeClaim, len(templates))
341 for i := range templates {
342 claim := templates[i].DeepCopy()
343 claim.Name = getPersistentVolumeClaimName(set, claim, ordinal)
344 claim.Namespace = set.Namespace
345 if claim.Labels != nil {
346 for key, value := range set.Spec.Selector.MatchLabels {
347 claim.Labels[key] = value
348 }
349 } else {
350 claim.Labels = set.Spec.Selector.MatchLabels
351 }
352 claims[templates[i].Name] = *claim
353 }
354 return claims
355 }
356
357
358
359 func updateStorage(set *apps.StatefulSet, pod *v1.Pod) {
360 currentVolumes := pod.Spec.Volumes
361 claims := getPersistentVolumeClaims(set, pod)
362 newVolumes := make([]v1.Volume, 0, len(claims))
363 for name, claim := range claims {
364 newVolumes = append(newVolumes, v1.Volume{
365 Name: name,
366 VolumeSource: v1.VolumeSource{
367 PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
368 ClaimName: claim.Name,
369
370 ReadOnly: false,
371 },
372 },
373 })
374 }
375 for i := range currentVolumes {
376 if _, ok := claims[currentVolumes[i].Name]; !ok {
377 newVolumes = append(newVolumes, currentVolumes[i])
378 }
379 }
380 pod.Spec.Volumes = newVolumes
381 }
382
383 func initIdentity(set *apps.StatefulSet, pod *v1.Pod) {
384 updateIdentity(set, pod)
385
386 pod.Spec.Hostname = pod.Name
387 pod.Spec.Subdomain = set.Spec.ServiceName
388 }
389
390
391
392 func updateIdentity(set *apps.StatefulSet, pod *v1.Pod) {
393 ordinal := getOrdinal(pod)
394 pod.Name = getPodName(set, ordinal)
395 pod.Namespace = set.Namespace
396 if pod.Labels == nil {
397 pod.Labels = make(map[string]string)
398 }
399 pod.Labels[apps.StatefulSetPodNameLabel] = pod.Name
400 if utilfeature.DefaultFeatureGate.Enabled(features.PodIndexLabel) {
401 pod.Labels[apps.PodIndexLabel] = strconv.Itoa(ordinal)
402 }
403 }
404
405
406 func isRunningAndReady(pod *v1.Pod) bool {
407 return pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod)
408 }
409
410 func isRunningAndAvailable(pod *v1.Pod, minReadySeconds int32) bool {
411 return podutil.IsPodAvailable(pod, minReadySeconds, metav1.Now())
412 }
413
414
415 func isCreated(pod *v1.Pod) bool {
416 return pod.Status.Phase != ""
417 }
418
419
420 func isPending(pod *v1.Pod) bool {
421 return pod.Status.Phase == v1.PodPending
422 }
423
424
425 func isFailed(pod *v1.Pod) bool {
426 return pod.Status.Phase == v1.PodFailed
427 }
428
429
430 func isSucceeded(pod *v1.Pod) bool {
431 return pod.Status.Phase == v1.PodSucceeded
432 }
433
434
435 func isTerminating(pod *v1.Pod) bool {
436 return pod.DeletionTimestamp != nil
437 }
438
439
440 func isHealthy(pod *v1.Pod) bool {
441 return isRunningAndReady(pod) && !isTerminating(pod)
442 }
443
444
445 func allowsBurst(set *apps.StatefulSet) bool {
446 return set.Spec.PodManagementPolicy == apps.ParallelPodManagement
447 }
448
449
450 func setPodRevision(pod *v1.Pod, revision string) {
451 if pod.Labels == nil {
452 pod.Labels = make(map[string]string)
453 }
454 pod.Labels[apps.StatefulSetRevisionLabel] = revision
455 }
456
457
458
459 func getPodRevision(pod *v1.Pod) string {
460 if pod.Labels == nil {
461 return ""
462 }
463 return pod.Labels[apps.StatefulSetRevisionLabel]
464 }
465
466
467 func newStatefulSetPod(set *apps.StatefulSet, ordinal int) *v1.Pod {
468 pod, _ := controller.GetPodFromTemplate(&set.Spec.Template, set, metav1.NewControllerRef(set, controllerKind))
469 pod.Name = getPodName(set, ordinal)
470 initIdentity(set, pod)
471 updateStorage(set, pod)
472 return pod
473 }
474
475
476
477
478
479 func newVersionedStatefulSetPod(currentSet, updateSet *apps.StatefulSet, currentRevision, updateRevision string, ordinal int) *v1.Pod {
480 if currentSet.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType &&
481 (currentSet.Spec.UpdateStrategy.RollingUpdate == nil && ordinal < (getStartOrdinal(currentSet)+int(currentSet.Status.CurrentReplicas))) ||
482 (currentSet.Spec.UpdateStrategy.RollingUpdate != nil && ordinal < (getStartOrdinal(currentSet)+int(*currentSet.Spec.UpdateStrategy.RollingUpdate.Partition))) {
483 pod := newStatefulSetPod(currentSet, ordinal)
484 setPodRevision(pod, currentRevision)
485 return pod
486 }
487 pod := newStatefulSetPod(updateSet, ordinal)
488 setPodRevision(pod, updateRevision)
489 return pod
490 }
491
492
493
494
495
496 func getPatch(set *apps.StatefulSet) ([]byte, error) {
497 data, err := runtime.Encode(patchCodec, set)
498 if err != nil {
499 return nil, err
500 }
501 var raw map[string]interface{}
502 err = json.Unmarshal(data, &raw)
503 if err != nil {
504 return nil, err
505 }
506 objCopy := make(map[string]interface{})
507 specCopy := make(map[string]interface{})
508 spec := raw["spec"].(map[string]interface{})
509 template := spec["template"].(map[string]interface{})
510 specCopy["template"] = template
511 template["$patch"] = "replace"
512 objCopy["spec"] = specCopy
513 patch, err := json.Marshal(objCopy)
514 return patch, err
515 }
516
517
518
519
520
521 func newRevision(set *apps.StatefulSet, revision int64, collisionCount *int32) (*apps.ControllerRevision, error) {
522 patch, err := getPatch(set)
523 if err != nil {
524 return nil, err
525 }
526 cr, err := history.NewControllerRevision(set,
527 controllerKind,
528 set.Spec.Template.Labels,
529 runtime.RawExtension{Raw: patch},
530 revision,
531 collisionCount)
532 if err != nil {
533 return nil, err
534 }
535 if cr.ObjectMeta.Annotations == nil {
536 cr.ObjectMeta.Annotations = make(map[string]string)
537 }
538 for key, value := range set.Annotations {
539 cr.ObjectMeta.Annotations[key] = value
540 }
541 return cr, nil
542 }
543
544
545
546 func ApplyRevision(set *apps.StatefulSet, revision *apps.ControllerRevision) (*apps.StatefulSet, error) {
547 clone := set.DeepCopy()
548 patched, err := strategicpatch.StrategicMergePatch([]byte(runtime.EncodeOrDie(patchCodec, clone)), revision.Data.Raw, clone)
549 if err != nil {
550 return nil, err
551 }
552 restoredSet := &apps.StatefulSet{}
553 err = json.Unmarshal(patched, restoredSet)
554 if err != nil {
555 return nil, err
556 }
557 return restoredSet, nil
558 }
559
560
561
562
563 func nextRevision(revisions []*apps.ControllerRevision) int64 {
564 count := len(revisions)
565 if count <= 0 {
566 return 1
567 }
568 return revisions[count-1].Revision + 1
569 }
570
571
572
573 func inconsistentStatus(set *apps.StatefulSet, status *apps.StatefulSetStatus) bool {
574 return status.ObservedGeneration > set.Status.ObservedGeneration ||
575 status.Replicas != set.Status.Replicas ||
576 status.CurrentReplicas != set.Status.CurrentReplicas ||
577 status.ReadyReplicas != set.Status.ReadyReplicas ||
578 status.UpdatedReplicas != set.Status.UpdatedReplicas ||
579 status.CurrentRevision != set.Status.CurrentRevision ||
580 status.AvailableReplicas != set.Status.AvailableReplicas ||
581 status.UpdateRevision != set.Status.UpdateRevision
582 }
583
584
585
586
587
588 func completeRollingUpdate(set *apps.StatefulSet, status *apps.StatefulSetStatus) {
589 if set.Spec.UpdateStrategy.Type == apps.RollingUpdateStatefulSetStrategyType &&
590 status.UpdatedReplicas == *set.Spec.Replicas &&
591 status.ReadyReplicas == *set.Spec.Replicas &&
592 status.Replicas == *set.Spec.Replicas {
593 status.CurrentReplicas = status.UpdatedReplicas
594 status.CurrentRevision = status.UpdateRevision
595 }
596 }
597
598
599
600
601 type ascendingOrdinal []*v1.Pod
602
603 func (ao ascendingOrdinal) Len() int {
604 return len(ao)
605 }
606
607 func (ao ascendingOrdinal) Swap(i, j int) {
608 ao[i], ao[j] = ao[j], ao[i]
609 }
610
611 func (ao ascendingOrdinal) Less(i, j int) bool {
612 return getOrdinal(ao[i]) < getOrdinal(ao[j])
613 }
614
615
616
617
618 type descendingOrdinal []*v1.Pod
619
620 func (do descendingOrdinal) Len() int {
621 return len(do)
622 }
623
624 func (do descendingOrdinal) Swap(i, j int) {
625 do[i], do[j] = do[j], do[i]
626 }
627
628 func (do descendingOrdinal) Less(i, j int) bool {
629 return getOrdinal(do[i]) > getOrdinal(do[j])
630 }
631
632
633
634
635
636
637 func getStatefulSetMaxUnavailable(maxUnavailable *intstr.IntOrString, replicaCount int) (int, error) {
638 maxUnavailableNum, err := intstr.GetScaledValueFromIntOrPercent(intstr.ValueOrDefault(maxUnavailable, intstr.FromInt32(1)), replicaCount, false)
639 if err != nil {
640 return 0, err
641 }
642
643
644 if maxUnavailableNum < 1 {
645 maxUnavailableNum = 1
646 }
647 return maxUnavailableNum, nil
648 }
649
View as plain text