1
16
17 package statefulset
18
19 import (
20 "context"
21 "fmt"
22 "path/filepath"
23 "strings"
24 "time"
25
26 appsv1 "k8s.io/api/apps/v1"
27 v1 "k8s.io/api/core/v1"
28 apierrors "k8s.io/apimachinery/pkg/api/errors"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/labels"
31 "k8s.io/apimachinery/pkg/util/sets"
32 "k8s.io/apimachinery/pkg/util/wait"
33 clientset "k8s.io/client-go/kubernetes"
34 "k8s.io/kubectl/pkg/util/podutils"
35 "k8s.io/kubernetes/test/e2e/framework"
36 e2emanifest "k8s.io/kubernetes/test/e2e/framework/manifest"
37 e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
38 )
39
40
41 func CreateStatefulSet(ctx context.Context, c clientset.Interface, manifestPath, ns string) *appsv1.StatefulSet {
42 mkpath := func(file string) string {
43 return filepath.Join(manifestPath, file)
44 }
45
46 framework.Logf("Parsing statefulset from %v", mkpath("statefulset.yaml"))
47 ss, err := e2emanifest.StatefulSetFromManifest(mkpath("statefulset.yaml"), ns)
48 framework.ExpectNoError(err)
49 framework.Logf("Parsing service from %v", mkpath("service.yaml"))
50 svc, err := e2emanifest.SvcFromManifest(mkpath("service.yaml"))
51 framework.ExpectNoError(err)
52
53 framework.Logf(fmt.Sprintf("creating " + ss.Name + " service"))
54 _, err = c.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{})
55 framework.ExpectNoError(err)
56
57 framework.Logf(fmt.Sprintf("creating statefulset %v/%v with %d replicas and selector %+v", ss.Namespace, ss.Name, *(ss.Spec.Replicas), ss.Spec.Selector))
58 _, err = c.AppsV1().StatefulSets(ns).Create(ctx, ss, metav1.CreateOptions{})
59 framework.ExpectNoError(err)
60 WaitForRunningAndReady(ctx, c, *ss.Spec.Replicas, ss)
61 return ss
62 }
63
64
65 func GetPodList(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet) *v1.PodList {
66 selector, err := metav1.LabelSelectorAsSelector(ss.Spec.Selector)
67 framework.ExpectNoError(err)
68 podList, err := c.CoreV1().Pods(ss.Namespace).List(ctx, metav1.ListOptions{LabelSelector: selector.String()})
69 framework.ExpectNoError(err)
70 return podList
71 }
72
73
74 func DeleteAllStatefulSets(ctx context.Context, c clientset.Interface, ns string) {
75 ssList, err := c.AppsV1().StatefulSets(ns).List(ctx, metav1.ListOptions{LabelSelector: labels.Everything().String()})
76 framework.ExpectNoError(err)
77
78
79
80 errList := []string{}
81 for i := range ssList.Items {
82 ss := &ssList.Items[i]
83 var err error
84 if ss, err = Scale(ctx, c, ss, 0); err != nil {
85 errList = append(errList, fmt.Sprintf("%v", err))
86 }
87 WaitForStatusReplicas(ctx, c, ss, 0)
88 framework.Logf("Deleting statefulset %v", ss.Name)
89
90
91 if err := c.AppsV1().StatefulSets(ss.Namespace).Delete(ctx, ss.Name, metav1.DeleteOptions{OrphanDependents: new(bool)}); err != nil {
92 errList = append(errList, fmt.Sprintf("%v", err))
93 }
94 }
95
96
97 pvNames := sets.NewString()
98
99 pvcPollErr := wait.PollUntilContextTimeout(ctx, StatefulSetPoll, StatefulSetTimeout, true, func(ctx context.Context) (bool, error) {
100 pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(ctx, metav1.ListOptions{LabelSelector: labels.Everything().String()})
101 if err != nil {
102 framework.Logf("WARNING: Failed to list pvcs, retrying %v", err)
103 return false, nil
104 }
105 for _, pvc := range pvcList.Items {
106 pvNames.Insert(pvc.Spec.VolumeName)
107
108 framework.Logf("Deleting pvc: %v with volume %v", pvc.Name, pvc.Spec.VolumeName)
109 if err := c.CoreV1().PersistentVolumeClaims(ns).Delete(ctx, pvc.Name, metav1.DeleteOptions{}); err != nil {
110 return false, nil
111 }
112 }
113 return true, nil
114 })
115 if pvcPollErr != nil {
116 errList = append(errList, fmt.Sprintf("Timeout waiting for pvc deletion."))
117 }
118
119 pollErr := wait.PollUntilContextTimeout(ctx, StatefulSetPoll, StatefulSetTimeout, true, func(ctx context.Context) (bool, error) {
120 pvList, err := c.CoreV1().PersistentVolumes().List(ctx, metav1.ListOptions{LabelSelector: labels.Everything().String()})
121 if err != nil {
122 framework.Logf("WARNING: Failed to list pvs, retrying %v", err)
123 return false, nil
124 }
125 waitingFor := []string{}
126 for _, pv := range pvList.Items {
127 if pvNames.Has(pv.Name) {
128 waitingFor = append(waitingFor, fmt.Sprintf("%v: %+v", pv.Name, pv.Status))
129 }
130 }
131 if len(waitingFor) == 0 {
132 return true, nil
133 }
134 framework.Logf("Still waiting for pvs of statefulset to disappear:\n%v", strings.Join(waitingFor, "\n"))
135 return false, nil
136 })
137 if pollErr != nil {
138 errList = append(errList, fmt.Sprintf("Timeout waiting for pv provisioner to delete pvs, this might mean the test leaked pvs."))
139 }
140 if len(errList) != 0 {
141 framework.ExpectNoError(fmt.Errorf("%v", strings.Join(errList, "\n")))
142 }
143 }
144
145
146 func Scale(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet, count int32) (*appsv1.StatefulSet, error) {
147 name := ss.Name
148 ns := ss.Namespace
149
150 framework.Logf("Scaling statefulset %s to %d", name, count)
151 ss = update(ctx, c, ns, name, count)
152
153 var statefulPodList *v1.PodList
154 pollErr := wait.PollUntilContextTimeout(ctx, StatefulSetPoll, StatefulSetTimeout, true, func(ctx context.Context) (bool, error) {
155 statefulPodList = GetPodList(ctx, c, ss)
156 if int32(len(statefulPodList.Items)) == count {
157 return true, nil
158 }
159 return false, nil
160 })
161 if pollErr != nil {
162 unhealthy := []string{}
163 for _, statefulPod := range statefulPodList.Items {
164 delTs, phase, readiness := statefulPod.DeletionTimestamp, statefulPod.Status.Phase, podutils.IsPodReady(&statefulPod)
165 if delTs != nil || phase != v1.PodRunning || !readiness {
166 unhealthy = append(unhealthy, fmt.Sprintf("%v: deletion %v, phase %v, readiness %v", statefulPod.Name, delTs, phase, readiness))
167 }
168 }
169 return ss, fmt.Errorf("Failed to scale statefulset to %d in %v. Remaining pods:\n%v", count, StatefulSetTimeout, unhealthy)
170 }
171 return ss, nil
172 }
173
174
175 func UpdateReplicas(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet, count int32) {
176 update(ctx, c, ss.Namespace, ss.Name, count)
177 }
178
179
180 func Restart(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet) {
181 oldReplicas := *(ss.Spec.Replicas)
182 ss, err := Scale(ctx, c, ss, 0)
183 framework.ExpectNoError(err)
184
185
186
187 WaitForStatusReplicas(ctx, c, ss, 0)
188 update(ctx, c, ss.Namespace, ss.Name, oldReplicas)
189 }
190
191
192 func CheckHostname(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet) error {
193 cmd := "printf $(hostname)"
194 podList := GetPodList(ctx, c, ss)
195 for _, statefulPod := range podList.Items {
196 hostname, err := e2epodoutput.RunHostCmdWithRetries(statefulPod.Namespace, statefulPod.Name, cmd, StatefulSetPoll, StatefulPodTimeout)
197 if err != nil {
198 return err
199 }
200 if hostname != statefulPod.Name {
201 return fmt.Errorf("unexpected hostname (%s) and stateful pod name (%s) not equal", hostname, statefulPod.Name)
202 }
203 }
204 return nil
205 }
206
207
208 func CheckMount(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet, mountPath string) error {
209 for _, cmd := range []string{
210
211 fmt.Sprintf("ls -idlh %v", mountPath),
212
213 fmt.Sprintf("find %v", mountPath),
214
215 fmt.Sprintf("touch %v", filepath.Join(mountPath, fmt.Sprintf("%v", time.Now().UnixNano()))),
216 } {
217 if err := ExecInStatefulPods(ctx, c, ss, cmd); err != nil {
218 return fmt.Errorf("failed to execute %v, error: %w", cmd, err)
219 }
220 }
221 return nil
222 }
223
224
225 func CheckServiceName(ss *appsv1.StatefulSet, expectedServiceName string) error {
226 framework.Logf("Checking if statefulset spec.serviceName is %s", expectedServiceName)
227
228 if expectedServiceName != ss.Spec.ServiceName {
229 return fmt.Errorf("wrong service name governing statefulset. Expected %s got %s",
230 expectedServiceName, ss.Spec.ServiceName)
231 }
232
233 return nil
234 }
235
236
237 func ExecInStatefulPods(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet, cmd string) error {
238 podList := GetPodList(ctx, c, ss)
239 for _, statefulPod := range podList.Items {
240 stdout, err := e2epodoutput.RunHostCmdWithRetries(statefulPod.Namespace, statefulPod.Name, cmd, StatefulSetPoll, StatefulPodTimeout)
241 framework.Logf("stdout of %v on %v: %v", cmd, statefulPod.Name, stdout)
242 if err != nil {
243 return err
244 }
245 }
246 return nil
247 }
248
249
250 func update(ctx context.Context, c clientset.Interface, ns, name string, replicas int32) *appsv1.StatefulSet {
251 for i := 0; i < 3; i++ {
252 ss, err := c.AppsV1().StatefulSets(ns).Get(ctx, name, metav1.GetOptions{})
253 if err != nil {
254 framework.Failf("failed to get statefulset %q: %v", name, err)
255 }
256 *(ss.Spec.Replicas) = replicas
257 ss, err = c.AppsV1().StatefulSets(ns).Update(ctx, ss, metav1.UpdateOptions{})
258 if err == nil {
259 return ss
260 }
261 if !apierrors.IsConflict(err) && !apierrors.IsServerTimeout(err) {
262 framework.Failf("failed to update statefulset %q: %v", name, err)
263 }
264 }
265 framework.Failf("too many retries draining statefulset %q", name)
266 return nil
267 }
268
View as plain text