1
16
17 package statefulset
18
19 import (
20 "context"
21 "fmt"
22
23 appsv1 "k8s.io/api/apps/v1"
24 v1 "k8s.io/api/core/v1"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 "k8s.io/apimachinery/pkg/util/wait"
27 clientset "k8s.io/client-go/kubernetes"
28 "k8s.io/kubectl/pkg/util/podutils"
29 "k8s.io/kubernetes/test/e2e/framework"
30 )
31
32
33
34 func WaitForRunning(ctx context.Context, c clientset.Interface, numPodsRunning, numPodsReady int32, ss *appsv1.StatefulSet) {
35 pollErr := wait.PollUntilContextTimeout(ctx, StatefulSetPoll, StatefulSetTimeout, true,
36 func(ctx context.Context) (bool, error) {
37 podList := GetPodList(ctx, c, ss)
38 SortStatefulPods(podList)
39 if int32(len(podList.Items)) < numPodsRunning {
40 framework.Logf("Found %d stateful pods, waiting for %d", len(podList.Items), numPodsRunning)
41 return false, nil
42 }
43 if int32(len(podList.Items)) > numPodsRunning {
44 return false, fmt.Errorf("too many pods scheduled, expected %d got %d", numPodsRunning, len(podList.Items))
45 }
46 for _, p := range podList.Items {
47 shouldBeReady := getStatefulPodOrdinal(&p) < int(numPodsReady)
48 isReady := podutils.IsPodReady(&p)
49 desiredReadiness := shouldBeReady == isReady
50 framework.Logf("Waiting for pod %v to enter %v - Ready=%v, currently %v - Ready=%v", p.Name, v1.PodRunning, shouldBeReady, p.Status.Phase, isReady)
51 if p.Status.Phase != v1.PodRunning || !desiredReadiness {
52 return false, nil
53 }
54 }
55 return true, nil
56 })
57 if pollErr != nil {
58 framework.Failf("Failed waiting for pods to enter running: %v", pollErr)
59 }
60 }
61
62
63 func WaitForState(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet, until func(*appsv1.StatefulSet, *v1.PodList) (bool, error)) {
64 pollErr := wait.PollUntilContextTimeout(ctx, StatefulSetPoll, StatefulSetTimeout, true,
65 func(ctx context.Context) (bool, error) {
66 ssGet, err := c.AppsV1().StatefulSets(ss.Namespace).Get(ctx, ss.Name, metav1.GetOptions{})
67 if err != nil {
68 return false, err
69 }
70 podList := GetPodList(ctx, c, ssGet)
71 return until(ssGet, podList)
72 })
73 if pollErr != nil {
74 framework.Failf("Failed waiting for state update: %v", pollErr)
75 }
76 }
77
78
79 func WaitForRunningAndReady(ctx context.Context, c clientset.Interface, numStatefulPods int32, ss *appsv1.StatefulSet) {
80 WaitForRunning(ctx, c, numStatefulPods, numStatefulPods, ss)
81 }
82
83
84 func WaitForPodReady(ctx context.Context, c clientset.Interface, set *appsv1.StatefulSet, podName string) (*appsv1.StatefulSet, *v1.PodList) {
85 var pods *v1.PodList
86 WaitForState(ctx, c, set, func(set2 *appsv1.StatefulSet, pods2 *v1.PodList) (bool, error) {
87 set = set2
88 pods = pods2
89 for i := range pods.Items {
90 if pods.Items[i].Name == podName {
91 return podutils.IsPodReady(&pods.Items[i]), nil
92 }
93 }
94 return false, nil
95 })
96 return set, pods
97 }
98
99
100 func WaitForStatusReadyReplicas(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet, expectedReplicas int32) {
101 framework.Logf("Waiting for statefulset status.readyReplicas updated to %d", expectedReplicas)
102
103 ns, name := ss.Namespace, ss.Name
104 pollErr := wait.PollUntilContextTimeout(ctx, StatefulSetPoll, StatefulSetTimeout, true,
105 func(ctx context.Context) (bool, error) {
106 ssGet, err := c.AppsV1().StatefulSets(ns).Get(ctx, name, metav1.GetOptions{})
107 if err != nil {
108 return false, err
109 }
110 if ssGet.Status.ObservedGeneration < ss.Generation {
111 return false, nil
112 }
113 if ssGet.Status.ReadyReplicas != expectedReplicas {
114 framework.Logf("Waiting for statefulset status.readyReplicas to become %d, currently %d", expectedReplicas, ssGet.Status.ReadyReplicas)
115 return false, nil
116 }
117 return true, nil
118 })
119 if pollErr != nil {
120 framework.Failf("Failed waiting for statefulset status.readyReplicas updated to %d: %v", expectedReplicas, pollErr)
121 }
122 }
123
124
125 func WaitForStatusAvailableReplicas(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet, expectedReplicas int32) {
126 framework.Logf("Waiting for statefulset status.AvailableReplicas updated to %d", expectedReplicas)
127
128 ns, name := ss.Namespace, ss.Name
129 pollErr := wait.PollUntilContextTimeout(ctx, StatefulSetPoll, StatefulSetTimeout, true,
130 func(ctx context.Context) (bool, error) {
131 ssGet, err := c.AppsV1().StatefulSets(ns).Get(ctx, name, metav1.GetOptions{})
132 if err != nil {
133 return false, err
134 }
135 if ssGet.Status.ObservedGeneration < ss.Generation {
136 return false, nil
137 }
138 if ssGet.Status.AvailableReplicas != expectedReplicas {
139 framework.Logf("Waiting for statefulset status.AvailableReplicas to become %d, currently %d", expectedReplicas, ssGet.Status.AvailableReplicas)
140 return false, nil
141 }
142 return true, nil
143 })
144 if pollErr != nil {
145 framework.Failf("Failed waiting for statefulset status.AvailableReplicas updated to %d: %v", expectedReplicas, pollErr)
146 }
147 }
148
149
150 func WaitForStatusReplicas(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet, expectedReplicas int32) {
151 framework.Logf("Waiting for statefulset status.replicas updated to %d", expectedReplicas)
152
153 ns, name := ss.Namespace, ss.Name
154 pollErr := wait.PollUntilContextTimeout(ctx, StatefulSetPoll, StatefulSetTimeout, true,
155 func(ctx context.Context) (bool, error) {
156 ssGet, err := c.AppsV1().StatefulSets(ns).Get(ctx, name, metav1.GetOptions{})
157 if err != nil {
158 return false, err
159 }
160 if ssGet.Status.ObservedGeneration < ss.Generation {
161 return false, nil
162 }
163 if ssGet.Status.Replicas != expectedReplicas {
164 framework.Logf("Waiting for statefulset status.replicas to become %d, currently %d", expectedReplicas, ssGet.Status.Replicas)
165 return false, nil
166 }
167 return true, nil
168 })
169 if pollErr != nil {
170 framework.Failf("Failed waiting for statefulset status.replicas updated to %d: %v", expectedReplicas, pollErr)
171 }
172 }
173
174
175 func Saturate(ctx context.Context, c clientset.Interface, ss *appsv1.StatefulSet) {
176 var i int32
177 for i = 0; i < *(ss.Spec.Replicas); i++ {
178 framework.Logf("Waiting for stateful pod at index %v to enter Running", i)
179 WaitForRunning(ctx, c, i+1, i, ss)
180 framework.Logf("Resuming stateful pod at index %v", i)
181 ResumeNextPod(ctx, c, ss)
182 }
183 }
184
View as plain text