    17  package statefulset
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"reflect"
    23  	"regexp"
    24  	"sort"
    25  	"strconv"
    27  	appsv1 "k8s.io/api/apps/v1"
    28  	v1 "k8s.io/api/core/v1"
    29  	"k8s.io/apimachinery/pkg/api/resource"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	clientset "k8s.io/client-go/kubernetes"
    32  	"k8s.io/kubectl/pkg/util/podutils"
    33  	"k8s.io/kubernetes/test/e2e/framework"
    34  	e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
    35  	imageutils "k8s.io/kubernetes/test/utils/image"
    36  	"k8s.io/utils/pointer"
    37  )
    39  // NewStatefulSet creates a new Webserver StatefulSet for testing. The StatefulSet is named name, is in namespace ns,
    40  // statefulPodsMounts are the mounts that will be backed by PVs. podsMounts are the mounts that are mounted directly
    41  // to the Pod. labels are the labels that will be usd for the StatefulSet selector.
    42  func NewStatefulSet(name, ns, governingSvcName string, replicas int32, statefulPodMounts []v1.VolumeMount, podMounts []v1.VolumeMount, labels map[string]string) *appsv1.StatefulSet {
    43  	mounts := append(statefulPodMounts, podMounts...)
    44  	claims := []v1.PersistentVolumeClaim{}
    45  	for _, m := range statefulPodMounts {
    46  		claims = append(claims, NewStatefulSetPVC(m.Name))
    47  	}
    49  	vols := []v1.Volume{}
    50  	for _, m := range podMounts {
    51  		vols = append(vols, v1.Volume{
    52  			Name: m.Name,
    53  			VolumeSource: v1.VolumeSource{
    54  				HostPath: &v1.HostPathVolumeSource{
    55  					Path: fmt.Sprintf("/tmp/%v", m.Name),
    56  				},
    57  			},
    58  		})
    59  	}
    61  	return &appsv1.StatefulSet{
    62  		TypeMeta: metav1.TypeMeta{
    63  			Kind:       "StatefulSet",
    64  			APIVersion: "apps/v1",
    65  		},
    66  		ObjectMeta: metav1.ObjectMeta{
    67  			Name:      name,
    68  			Namespace: ns,
    69  		},
    70  		Spec: appsv1.StatefulSetSpec{
    71  			Selector: &metav1.LabelSelector{
    72  				MatchLabels: labels,
    73  			},
    74  			Replicas: pointer.Int32(replicas),
    75  			Template: v1.PodTemplateSpec{
    76  				ObjectMeta: metav1.ObjectMeta{
    77  					Labels:      labels,
    78  					Annotations: map[string]string{},
    79  				},
    80  				Spec: v1.PodSpec{
    81  					Containers: []v1.Container{
    82  						{
    83  							Name:         "webserver",
    84  							Image:        imageutils.GetE2EImage(imageutils.Httpd),
    85  							VolumeMounts: mounts,
    86  						},
    87  					},
    88  					Volumes: vols,
    89  				},
    90  			},
    91  			UpdateStrategy:       appsv1.StatefulSetUpdateStrategy{Type: appsv1.RollingUpdateStatefulSetStrategyType},
    92  			VolumeClaimTemplates: claims,
    93  			ServiceName:          governingSvcName,
    94  		},
    95  	}
    96  }
    98  // NewStatefulSetPVC returns a PersistentVolumeClaim named name, for testing StatefulSets.
    99  func NewStatefulSetPVC(name string) v1.PersistentVolumeClaim {
   100  	return v1.PersistentVolumeClaim{
   101  		ObjectMeta: metav1.ObjectMeta{
   102  			Name: name,
   103  		},
   104  		Spec: v1.PersistentVolumeClaimSpec{
   105  			AccessModes: []v1.PersistentVolumeAccessMode{
   106  				v1.ReadWriteOnce,
   107  			},
   108  			Resources: v1.VolumeResourceRequirements{
   109  				Requests: v1.ResourceList{
   110  					v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI),
   111  				},
   112  			},
   113  		},
   114  	}
   115  }
   117  func hasPauseProbe(pod *v1.Pod) bool {
   118  	probe := pod.Spec.Containers[0].ReadinessProbe
   119  	return probe != nil && reflect.DeepEqual(probe.Exec.Command, pauseProbe.Exec.Command)
   120  }
   122  var pauseProbe = &v1.Probe{
   123  	ProbeHandler: v1.ProbeHandler{
   124  		Exec: &v1.ExecAction{Command: []string{"test", "-f", "/data/statefulset-continue"}},
   125  	},
   126  	PeriodSeconds:    1,
   127  	SuccessThreshold: 1,
   128  	FailureThreshold: 1,
   129  }
   131  type statefulPodsByOrdinal []v1.Pod
   133  func (sp statefulPodsByOrdinal) Len() int {
   134  	return len(sp)
   135  }
   137  func (sp statefulPodsByOrdinal) Swap(i, j int) {
   138  	sp[i], sp[j] = sp[j], sp[i]
   139  }
   141  func (sp statefulPodsByOrdinal) Less(i, j int) bool {
   142  	return getStatefulPodOrdinal(&sp[i]) < getStatefulPodOrdinal(&sp[j])
   143  }
   145  // PauseNewPods adds an always-failing ReadinessProbe to the StatefulSet PodTemplate.
   146  // This causes all newly-created Pods to stay Unready until they are manually resumed
   147  // with ResumeNextPod().
   148  // Note that this cannot be used together with SetHTTPProbe().
   149  func PauseNewPods(ss *appsv1.StatefulSet) {
   150  	ss.Spec.Template.Spec.Containers[0].ReadinessProbe = pauseProbe
   151  }
   153  // ResumeNextPod allows the next Pod in the StatefulSet to continue by removing the ReadinessProbe
   154  // added by PauseNewPods(), if it's still there.
   155  // It fails the test if it finds any pods that are not in phase Running,
   156  // or if it finds more than one paused Pod existing at the same time.
   157  // This is a no-op if there are no paused pods.
   158  func ResumeNextPod(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet) {
   159  	podList := GetPodList(ctx, c, ss)
   160  	resumedPod := ""
   161  	for _, pod := range podList.Items {
   162  		if pod.Status.Phase != v1.PodRunning {
   163  			framework.Failf("Found pod in phase %q, cannot resume", pod.Status.Phase)
   164  		}
   165  		if podutils.IsPodReady(&pod) || !hasPauseProbe(&pod) {
   166  			continue
   167  		}
   168  		if resumedPod != "" {
   169  			framework.Failf("Found multiple paused stateful pods: %v and %v", pod.Name, resumedPod)
   170  		}
   171  		_, err := e2epodoutput.RunHostCmdWithRetries(pod.Namespace, pod.Name, "dd if=/dev/zero of=/data/statefulset-continue bs=1 count=1 conv=fsync", StatefulSetPoll, StatefulPodTimeout)
   172  		framework.ExpectNoError(err)
   173  		framework.Logf("Resumed pod %v", pod.Name)
   174  		resumedPod = pod.Name
   175  	}
   176  }
   178  // SortStatefulPods sorts pods by their ordinals
   179  func SortStatefulPods(pods *v1.PodList) {
   180  	sort.Sort(statefulPodsByOrdinal(pods.Items))
   181  }
   183  var statefulPodRegex = regexp.MustCompile("(.*)-([0-9]+)$")
   185  func getStatefulPodOrdinal(pod *v1.Pod) int {
   186  	ordinal := -1
   187  	subMatches := statefulPodRegex.FindStringSubmatch(pod.Name)
   188  	if len(subMatches) < 3 {
   189  		return ordinal
   190  	}
   191  	if i, err := strconv.ParseInt(subMatches[2], 10, 32); err == nil {
   192  		ordinal = int(i)
   193  	}
   194  	return ordinal
   195  }

