/* Copyright 2019 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package statefulset import ( "context" "fmt" "reflect" "regexp" "sort" "strconv" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/kubectl/pkg/util/podutils" "k8s.io/kubernetes/test/e2e/framework" e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output" imageutils "k8s.io/kubernetes/test/utils/image" "k8s.io/utils/pointer" ) // NewStatefulSet creates a new Webserver StatefulSet for testing. The StatefulSet is named name, is in namespace ns, // statefulPodsMounts are the mounts that will be backed by PVs. podsMounts are the mounts that are mounted directly // to the Pod. labels are the labels that will be usd for the StatefulSet selector. func NewStatefulSet(name, ns, governingSvcName string, replicas int32, statefulPodMounts []v1.VolumeMount, podMounts []v1.VolumeMount, labels map[string]string) *appsv1.StatefulSet { mounts := append(statefulPodMounts, podMounts...) claims := []v1.PersistentVolumeClaim{} for _, m := range statefulPodMounts { claims = append(claims, NewStatefulSetPVC(m.Name)) } vols := []v1.Volume{} for _, m := range podMounts { vols = append(vols, v1.Volume{ Name: m.Name, VolumeSource: v1.VolumeSource{ HostPath: &v1.HostPathVolumeSource{ Path: fmt.Sprintf("/tmp/%v", m.Name), }, }, }) } return &appsv1.StatefulSet{ TypeMeta: metav1.TypeMeta{ Kind: "StatefulSet", APIVersion: "apps/v1", }, ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: ns, }, Spec: appsv1.StatefulSetSpec{ Selector: &metav1.LabelSelector{ MatchLabels: labels, }, Replicas: pointer.Int32(replicas), Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: labels, Annotations: map[string]string{}, }, Spec: v1.PodSpec{ Containers: []v1.Container{ { Name: "webserver", Image: imageutils.GetE2EImage(imageutils.Httpd), VolumeMounts: mounts, }, }, Volumes: vols, }, }, UpdateStrategy: appsv1.StatefulSetUpdateStrategy{Type: appsv1.RollingUpdateStatefulSetStrategyType}, VolumeClaimTemplates: claims, ServiceName: governingSvcName, }, } } // NewStatefulSetPVC returns a PersistentVolumeClaim named name, for testing StatefulSets. func NewStatefulSetPVC(name string) v1.PersistentVolumeClaim { return v1.PersistentVolumeClaim{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, Spec: v1.PersistentVolumeClaimSpec{ AccessModes: []v1.PersistentVolumeAccessMode{ v1.ReadWriteOnce, }, Resources: v1.VolumeResourceRequirements{ Requests: v1.ResourceList{ v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI), }, }, }, } } func hasPauseProbe(pod *v1.Pod) bool { probe := pod.Spec.Containers[0].ReadinessProbe return probe != nil && reflect.DeepEqual(probe.Exec.Command, pauseProbe.Exec.Command) } var pauseProbe = &v1.Probe{ ProbeHandler: v1.ProbeHandler{ Exec: &v1.ExecAction{Command: []string{"test", "-f", "/data/statefulset-continue"}}, }, PeriodSeconds: 1, SuccessThreshold: 1, FailureThreshold: 1, } type statefulPodsByOrdinal []v1.Pod func (sp statefulPodsByOrdinal) Len() int { return len(sp) } func (sp statefulPodsByOrdinal) Swap(i, j int) { sp[i], sp[j] = sp[j], sp[i] } func (sp statefulPodsByOrdinal) Less(i, j int) bool { return getStatefulPodOrdinal(&sp[i]) < getStatefulPodOrdinal(&sp[j]) } // PauseNewPods adds an always-failing ReadinessProbe to the StatefulSet PodTemplate. // This causes all newly-created Pods to stay Unready until they are manually resumed // with ResumeNextPod(). // Note that this cannot be used together with SetHTTPProbe(). func PauseNewPods(ss *appsv1.StatefulSet) { ss.Spec.Template.Spec.Containers[0].ReadinessProbe = pauseProbe } // ResumeNextPod allows the next Pod in the StatefulSet to continue by removing the ReadinessProbe // added by PauseNewPods(), if it's still there. // It fails the test if it finds any pods that are not in phase Running, // or if it finds more than one paused Pod existing at the same time. // This is a no-op if there are no paused pods. func ResumeNextPod(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet) { podList := GetPodList(ctx, c, ss) resumedPod := "" for _, pod := range podList.Items { if pod.Status.Phase != v1.PodRunning { framework.Failf("Found pod in phase %q, cannot resume", pod.Status.Phase) } if podutils.IsPodReady(&pod) || !hasPauseProbe(&pod) { continue } if resumedPod != "" { framework.Failf("Found multiple paused stateful pods: %v and %v", pod.Name, resumedPod) } _, err := e2epodoutput.RunHostCmdWithRetries(pod.Namespace, pod.Name, "dd if=/dev/zero of=/data/statefulset-continue bs=1 count=1 conv=fsync", StatefulSetPoll, StatefulPodTimeout) framework.ExpectNoError(err) framework.Logf("Resumed pod %v", pod.Name) resumedPod = pod.Name } } // SortStatefulPods sorts pods by their ordinals func SortStatefulPods(pods *v1.PodList) { sort.Sort(statefulPodsByOrdinal(pods.Items)) } var statefulPodRegex = regexp.MustCompile("(.*)-([0-9]+)$") func getStatefulPodOrdinal(pod *v1.Pod) int { ordinal := -1 subMatches := statefulPodRegex.FindStringSubmatch(pod.Name) if len(subMatches) < 3 { return ordinal } if i, err := strconv.ParseInt(subMatches[2], 10, 32); err == nil { ordinal = int(i) } return ordinal }