1
16
17 package statefulset
18
19 import (
20 "context"
21 "fmt"
22 "reflect"
23 "regexp"
24 "sort"
25 "strconv"
26
27 appsv1 "k8s.io/api/apps/v1"
28 v1 "k8s.io/api/core/v1"
29 "k8s.io/apimachinery/pkg/api/resource"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 clientset "k8s.io/client-go/kubernetes"
32 "k8s.io/kubectl/pkg/util/podutils"
33 "k8s.io/kubernetes/test/e2e/framework"
34 e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
35 imageutils "k8s.io/kubernetes/test/utils/image"
36 "k8s.io/utils/pointer"
37 )
38
39
40
41
42 func NewStatefulSet(name, ns, governingSvcName string, replicas int32, statefulPodMounts []v1.VolumeMount, podMounts []v1.VolumeMount, labels map[string]string) *appsv1.StatefulSet {
43 mounts := append(statefulPodMounts, podMounts...)
44 claims := []v1.PersistentVolumeClaim{}
45 for _, m := range statefulPodMounts {
46 claims = append(claims, NewStatefulSetPVC(m.Name))
47 }
48
49 vols := []v1.Volume{}
50 for _, m := range podMounts {
51 vols = append(vols, v1.Volume{
52 Name: m.Name,
53 VolumeSource: v1.VolumeSource{
54 HostPath: &v1.HostPathVolumeSource{
55 Path: fmt.Sprintf("/tmp/%v", m.Name),
56 },
57 },
58 })
59 }
60
61 return &appsv1.StatefulSet{
62 TypeMeta: metav1.TypeMeta{
63 Kind: "StatefulSet",
64 APIVersion: "apps/v1",
65 },
66 ObjectMeta: metav1.ObjectMeta{
67 Name: name,
68 Namespace: ns,
69 },
70 Spec: appsv1.StatefulSetSpec{
71 Selector: &metav1.LabelSelector{
72 MatchLabels: labels,
73 },
74 Replicas: pointer.Int32(replicas),
75 Template: v1.PodTemplateSpec{
76 ObjectMeta: metav1.ObjectMeta{
77 Labels: labels,
78 Annotations: map[string]string{},
79 },
80 Spec: v1.PodSpec{
81 Containers: []v1.Container{
82 {
83 Name: "webserver",
84 Image: imageutils.GetE2EImage(imageutils.Httpd),
85 VolumeMounts: mounts,
86 },
87 },
88 Volumes: vols,
89 },
90 },
91 UpdateStrategy: appsv1.StatefulSetUpdateStrategy{Type: appsv1.RollingUpdateStatefulSetStrategyType},
92 VolumeClaimTemplates: claims,
93 ServiceName: governingSvcName,
94 },
95 }
96 }
97
98
99 func NewStatefulSetPVC(name string) v1.PersistentVolumeClaim {
100 return v1.PersistentVolumeClaim{
101 ObjectMeta: metav1.ObjectMeta{
102 Name: name,
103 },
104 Spec: v1.PersistentVolumeClaimSpec{
105 AccessModes: []v1.PersistentVolumeAccessMode{
106 v1.ReadWriteOnce,
107 },
108 Resources: v1.VolumeResourceRequirements{
109 Requests: v1.ResourceList{
110 v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI),
111 },
112 },
113 },
114 }
115 }
116
117 func hasPauseProbe(pod *v1.Pod) bool {
118 probe := pod.Spec.Containers[0].ReadinessProbe
119 return probe != nil && reflect.DeepEqual(probe.Exec.Command, pauseProbe.Exec.Command)
120 }
121
122 var pauseProbe = &v1.Probe{
123 ProbeHandler: v1.ProbeHandler{
124 Exec: &v1.ExecAction{Command: []string{"test", "-f", "/data/statefulset-continue"}},
125 },
126 PeriodSeconds: 1,
127 SuccessThreshold: 1,
128 FailureThreshold: 1,
129 }
130
131 type statefulPodsByOrdinal []v1.Pod
132
133 func (sp statefulPodsByOrdinal) Len() int {
134 return len(sp)
135 }
136
137 func (sp statefulPodsByOrdinal) Swap(i, j int) {
138 sp[i], sp[j] = sp[j], sp[i]
139 }
140
141 func (sp statefulPodsByOrdinal) Less(i, j int) bool {
142 return getStatefulPodOrdinal(&sp[i]) < getStatefulPodOrdinal(&sp[j])
143 }
144
145
146
147
148
149 func PauseNewPods(ss *appsv1.StatefulSet) {
150 ss.Spec.Template.Spec.Containers[0].ReadinessProbe = pauseProbe
151 }
152
153
154
155
156
157
158 func ResumeNextPod(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet) {
159 podList := GetPodList(ctx, c, ss)
160 resumedPod := ""
161 for _, pod := range podList.Items {
162 if pod.Status.Phase != v1.PodRunning {
163 framework.Failf("Found pod in phase %q, cannot resume", pod.Status.Phase)
164 }
165 if podutils.IsPodReady(&pod) || !hasPauseProbe(&pod) {
166 continue
167 }
168 if resumedPod != "" {
169 framework.Failf("Found multiple paused stateful pods: %v and %v", pod.Name, resumedPod)
170 }
171 _, err := e2epodoutput.RunHostCmdWithRetries(pod.Namespace, pod.Name, "dd if=/dev/zero of=/data/statefulset-continue bs=1 count=1 conv=fsync", StatefulSetPoll, StatefulPodTimeout)
172 framework.ExpectNoError(err)
173 framework.Logf("Resumed pod %v", pod.Name)
174 resumedPod = pod.Name
175 }
176 }
177
178
179 func SortStatefulPods(pods *v1.PodList) {
180 sort.Sort(statefulPodsByOrdinal(pods.Items))
181 }
182
// statefulPodRegex splits a pod name of the form "<prefix>-<digits>" into the
// prefix (group 1) and the trailing numeric ordinal (group 2).
var statefulPodRegex = regexp.MustCompile(`(.*)-([0-9]+)$`)
184
185 func getStatefulPodOrdinal(pod *v1.Pod) int {
186 ordinal := -1
187 subMatches := statefulPodRegex.FindStringSubmatch(pod.Name)
188 if len(subMatches) < 3 {
189 return ordinal
190 }
191 if i, err := strconv.ParseInt(subMatches[2], 10, 32); err == nil {
192 ordinal = int(i)
193 }
194 return ordinal
195 }
196
View as plain text