1
16
17 package apps
18
19 import (
20 "context"
21
22 appsv1 "k8s.io/api/apps/v1"
23 v1 "k8s.io/api/core/v1"
24 clientset "k8s.io/client-go/kubernetes"
25
26 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
27 "k8s.io/kubernetes/test/e2e/framework"
28 e2estatefulset "k8s.io/kubernetes/test/e2e/framework/statefulset"
29 )
30
31
32
33
34
35 func waitForPartitionedRollingUpdate(ctx context.Context, c clientset.Interface, set *appsv1.StatefulSet) (*appsv1.StatefulSet, *v1.PodList) {
36 var pods *v1.PodList
37 if set.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
38 framework.Failf("StatefulSet %s/%s attempt to wait for partitioned update with updateStrategy %s",
39 set.Namespace,
40 set.Name,
41 set.Spec.UpdateStrategy.Type)
42 }
43 if set.Spec.UpdateStrategy.RollingUpdate == nil || set.Spec.UpdateStrategy.RollingUpdate.Partition == nil {
44 framework.Failf("StatefulSet %s/%s attempt to wait for partitioned update with nil RollingUpdate or nil Partition",
45 set.Namespace,
46 set.Name)
47 }
48 e2estatefulset.WaitForState(ctx, c, set, func(set2 *appsv1.StatefulSet, pods2 *v1.PodList) (bool, error) {
49 set = set2
50 pods = pods2
51 partition := int(*set.Spec.UpdateStrategy.RollingUpdate.Partition)
52 if len(pods.Items) < int(*set.Spec.Replicas) {
53 return false, nil
54 }
55 if partition <= 0 && set.Status.UpdateRevision != set.Status.CurrentRevision {
56 framework.Logf("Waiting for StatefulSet %s/%s to complete update",
57 set.Namespace,
58 set.Name,
59 )
60 e2estatefulset.SortStatefulPods(pods)
61 for i := range pods.Items {
62 if pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel] != set.Status.UpdateRevision {
63 framework.Logf("Waiting for Pod %s/%s to have revision %s update revision %s",
64 pods.Items[i].Namespace,
65 pods.Items[i].Name,
66 set.Status.UpdateRevision,
67 pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel])
68 }
69 }
70 return false, nil
71 }
72 for i := int(*set.Spec.Replicas) - 1; i >= partition; i-- {
73 if pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel] != set.Status.UpdateRevision {
74 framework.Logf("Waiting for Pod %s/%s to have revision %s update revision %s",
75 pods.Items[i].Namespace,
76 pods.Items[i].Name,
77 set.Status.UpdateRevision,
78 pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel])
79 return false, nil
80 }
81 }
82 return true, nil
83 })
84 return set, pods
85 }
86
87
88
89 func waitForStatus(ctx context.Context, c clientset.Interface, set *appsv1.StatefulSet) *appsv1.StatefulSet {
90 e2estatefulset.WaitForState(ctx, c, set, func(set2 *appsv1.StatefulSet, pods *v1.PodList) (bool, error) {
91 if set2.Status.ObservedGeneration >= set.Generation {
92 set = set2
93 return true, nil
94 }
95 return false, nil
96 })
97 return set
98 }
99
100
101 func waitForPodNames(ctx context.Context, c clientset.Interface, set *appsv1.StatefulSet, expectedPodNames []string) {
102 e2estatefulset.WaitForState(ctx, c, set,
103 func(intSet *appsv1.StatefulSet, pods *v1.PodList) (bool, error) {
104 if err := expectPodNames(pods, expectedPodNames); err != nil {
105 framework.Logf("Currently %v", err)
106 return false, nil
107 }
108 return true, nil
109 })
110 }
111
112
113
114 func waitForStatusCurrentReplicas(ctx context.Context, c clientset.Interface, set *appsv1.StatefulSet, expectedReplicas int32) *appsv1.StatefulSet {
115 e2estatefulset.WaitForState(ctx, c, set, func(set2 *appsv1.StatefulSet, pods *v1.PodList) (bool, error) {
116 if set2.Status.ObservedGeneration >= set.Generation && set2.Status.CurrentReplicas == expectedReplicas {
117 set = set2
118 return true, nil
119 }
120 return false, nil
121 })
122 return set
123 }
124
125
126 func waitForPodNotReady(ctx context.Context, c clientset.Interface, set *appsv1.StatefulSet, podName string) (*appsv1.StatefulSet, *v1.PodList) {
127 var pods *v1.PodList
128 e2estatefulset.WaitForState(ctx, c, set, func(set2 *appsv1.StatefulSet, pods2 *v1.PodList) (bool, error) {
129 set = set2
130 pods = pods2
131 for i := range pods.Items {
132 if pods.Items[i].Name == podName {
133 return !podutil.IsPodReady(&pods.Items[i]), nil
134 }
135 }
136 return false, nil
137 })
138 return set, pods
139 }
140
141
142
143 func waitForRollingUpdate(ctx context.Context, c clientset.Interface, set *appsv1.StatefulSet) (*appsv1.StatefulSet, *v1.PodList) {
144 var pods *v1.PodList
145 if set.Spec.UpdateStrategy.Type != appsv1.RollingUpdateStatefulSetStrategyType {
146 framework.Failf("StatefulSet %s/%s attempt to wait for rolling update with updateStrategy %s",
147 set.Namespace,
148 set.Name,
149 set.Spec.UpdateStrategy.Type)
150 }
151 e2estatefulset.WaitForState(ctx, c, set, func(set2 *appsv1.StatefulSet, pods2 *v1.PodList) (bool, error) {
152 set = set2
153 pods = pods2
154 if len(pods.Items) < int(*set.Spec.Replicas) {
155 return false, nil
156 }
157 if set.Status.UpdateRevision != set.Status.CurrentRevision {
158 framework.Logf("Waiting for StatefulSet %s/%s to complete update",
159 set.Namespace,
160 set.Name,
161 )
162 e2estatefulset.SortStatefulPods(pods)
163 for i := range pods.Items {
164 if pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel] != set.Status.UpdateRevision {
165 framework.Logf("Waiting for Pod %s/%s to have revision %s update revision %s",
166 pods.Items[i].Namespace,
167 pods.Items[i].Name,
168 set.Status.UpdateRevision,
169 pods.Items[i].Labels[appsv1.StatefulSetRevisionLabel])
170 }
171 }
172 return false, nil
173 }
174 return true, nil
175 })
176 return set, pods
177 }
178
179
180 func waitForRunningAndNotReady(ctx context.Context, c clientset.Interface, numStatefulPods int32, ss *appsv1.StatefulSet) {
181 e2estatefulset.WaitForRunning(ctx, c, numStatefulPods, 0, ss)
182 }
183
View as plain text