1
16
17 package statefulset
18
19 import (
20 "context"
21 "fmt"
22 "k8s.io/kubernetes/test/utils/ktesting"
23 "sync"
24 "testing"
25 "time"
26
27 appsv1 "k8s.io/api/apps/v1"
28 v1 "k8s.io/api/core/v1"
29 "k8s.io/apimachinery/pkg/api/resource"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/labels"
32 "k8s.io/apimachinery/pkg/util/wait"
33 "k8s.io/apiserver/pkg/admission"
34 "k8s.io/client-go/informers"
35 clientset "k8s.io/client-go/kubernetes"
36 typedappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
37 typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
38 restclient "k8s.io/client-go/rest"
39 "k8s.io/client-go/util/retry"
40 kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
41 api "k8s.io/kubernetes/pkg/apis/core"
42
43
44 "k8s.io/kubernetes/pkg/controller/statefulset"
45 "k8s.io/kubernetes/test/integration/framework"
46 )
47
// pollInterval is how often the polling helpers in this file re-check their
// condition.
const pollInterval = 100 * time.Millisecond

// pollTimeout is the longest the polling helpers in this file wait before
// giving up.
const pollTimeout = time.Minute
52
// labelMap returns a freshly allocated label set containing the single pair
// foo=bar. Each call yields its own map, so callers may mutate the result.
func labelMap() map[string]string {
	lbls := make(map[string]string, 1)
	lbls["foo"] = "bar"
	return lbls
}
56
57
58 func newHeadlessService(namespace string) *v1.Service {
59 return &v1.Service{
60 TypeMeta: metav1.TypeMeta{
61 Kind: "Service",
62 APIVersion: "v1",
63 },
64 ObjectMeta: metav1.ObjectMeta{
65 Namespace: namespace,
66 Name: "fake-service-name",
67 },
68 Spec: v1.ServiceSpec{
69 ClusterIP: "None",
70 Ports: []v1.ServicePort{
71 {Port: 80, Name: "http", Protocol: "TCP"},
72 },
73 Selector: labelMap(),
74 },
75 }
76 }
77
78
79 func newSTS(name, namespace string, replicas int) *appsv1.StatefulSet {
80 replicasCopy := int32(replicas)
81 return &appsv1.StatefulSet{
82 TypeMeta: metav1.TypeMeta{
83 Kind: "StatefulSet",
84 APIVersion: "apps/v1",
85 },
86 ObjectMeta: metav1.ObjectMeta{
87 Namespace: namespace,
88 Name: name,
89 },
90 Spec: appsv1.StatefulSetSpec{
91 PodManagementPolicy: appsv1.ParallelPodManagement,
92 Replicas: &replicasCopy,
93 Selector: &metav1.LabelSelector{
94 MatchLabels: labelMap(),
95 },
96 Template: v1.PodTemplateSpec{
97 ObjectMeta: metav1.ObjectMeta{
98 Labels: labelMap(),
99 },
100 Spec: v1.PodSpec{
101 Containers: []v1.Container{
102 {
103 Name: "fake-name",
104 Image: "fakeimage",
105 VolumeMounts: []v1.VolumeMount{
106 {Name: "datadir", MountPath: "/data/"},
107 {Name: "home", MountPath: "/home"},
108 },
109 },
110 },
111 Volumes: []v1.Volume{
112 {
113 Name: "datadir",
114 VolumeSource: v1.VolumeSource{
115 PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
116 ClaimName: "fake-pvc-name",
117 },
118 },
119 },
120 {
121 Name: "home",
122 VolumeSource: v1.VolumeSource{
123 HostPath: &v1.HostPathVolumeSource{
124 Path: fmt.Sprintf("/tmp/%v", "home"),
125 },
126 },
127 },
128 },
129 },
130 },
131 ServiceName: "fake-service-name",
132 UpdateStrategy: appsv1.StatefulSetUpdateStrategy{
133 Type: appsv1.RollingUpdateStatefulSetStrategyType,
134 },
135 VolumeClaimTemplates: []v1.PersistentVolumeClaim{
136
137 newStatefulSetPVC("fake-pvc-name"),
138 },
139 },
140 }
141 }
142
143 func newStatefulSetPVC(name string) v1.PersistentVolumeClaim {
144 return v1.PersistentVolumeClaim{
145 ObjectMeta: metav1.ObjectMeta{
146 Name: name,
147 Annotations: map[string]string{
148 "volume.alpha.kubernetes.io/storage-class": "anything",
149 },
150 },
151 Spec: v1.PersistentVolumeClaimSpec{
152 AccessModes: []v1.PersistentVolumeAccessMode{
153 v1.ReadWriteOnce,
154 },
155 Resources: v1.VolumeResourceRequirements{
156 Requests: v1.ResourceList{
157 v1.ResourceStorage: *resource.NewQuantity(1, resource.BinarySI),
158 },
159 },
160 },
161 }
162 }
163
164
165 func scSetup(t *testing.T) (context.Context, kubeapiservertesting.TearDownFunc, *statefulset.StatefulSetController, informers.SharedInformerFactory, clientset.Interface) {
166 tCtx := ktesting.Init(t)
167
168 server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
169
170 config := restclient.CopyConfig(server.ClientConfig)
171 clientSet, err := clientset.NewForConfig(config)
172 if err != nil {
173 t.Fatalf("error in create clientset: %v", err)
174 }
175 resyncPeriod := 12 * time.Hour
176 informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-informers")), resyncPeriod)
177
178 sc := statefulset.NewStatefulSetController(
179 tCtx,
180 informers.Core().V1().Pods(),
181 informers.Apps().V1().StatefulSets(),
182 informers.Core().V1().PersistentVolumeClaims(),
183 informers.Apps().V1().ControllerRevisions(),
184 clientset.NewForConfigOrDie(restclient.AddUserAgent(config, "statefulset-controller")),
185 )
186
187 teardown := func() {
188 tCtx.Cancel("tearing down controller")
189 server.TearDownFn()
190 }
191 return tCtx, teardown, sc, informers, clientSet
192 }
193
194
195 func runControllerAndInformers(ctx context.Context, sc *statefulset.StatefulSetController, informers informers.SharedInformerFactory) context.CancelFunc {
196 ctx, cancel := context.WithCancel(ctx)
197 informers.Start(ctx.Done())
198 go sc.Run(ctx, 5)
199 return cancel
200 }
201
202 func createHeadlessService(t *testing.T, clientSet clientset.Interface, headlessService *v1.Service) {
203 _, err := clientSet.CoreV1().Services(headlessService.Namespace).Create(context.TODO(), headlessService, metav1.CreateOptions{})
204 if err != nil {
205 t.Fatalf("failed creating headless service: %v", err)
206 }
207 }
208
209 func createSTSs(t *testing.T, clientSet clientset.Interface, stss []*appsv1.StatefulSet) []*appsv1.StatefulSet {
210 var createdSTSs []*appsv1.StatefulSet
211 for _, sts := range stss {
212 createdSTS, err := clientSet.AppsV1().StatefulSets(sts.Namespace).Create(context.TODO(), sts, metav1.CreateOptions{})
213 if err != nil {
214 t.Fatalf("failed to create sts %s: %v", sts.Name, err)
215 }
216 createdSTSs = append(createdSTSs, createdSTS)
217 }
218 return createdSTSs
219 }
220
221 func createPods(t *testing.T, clientSet clientset.Interface, pods []*v1.Pod) []*v1.Pod {
222 var createdPods []*v1.Pod
223 for _, pod := range pods {
224 createdPod, err := clientSet.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
225 if err != nil {
226 t.Fatalf("failed to create pod %s: %v", pod.Name, err)
227 }
228 createdPods = append(createdPods, createdPod)
229 }
230
231 return createdPods
232 }
233
234 func createSTSsPods(t *testing.T, clientSet clientset.Interface, stss []*appsv1.StatefulSet, pods []*v1.Pod) ([]*appsv1.StatefulSet, []*v1.Pod) {
235 return createSTSs(t, clientSet, stss), createPods(t, clientSet, pods)
236 }
237
238
239 func waitSTSStable(t *testing.T, clientSet clientset.Interface, sts *appsv1.StatefulSet) {
240 stsClient := clientSet.AppsV1().StatefulSets(sts.Namespace)
241 desiredGeneration := sts.Generation
242 if err := wait.PollImmediate(pollInterval, pollTimeout, func() (bool, error) {
243 newSTS, err := stsClient.Get(context.TODO(), sts.Name, metav1.GetOptions{})
244 if err != nil {
245 return false, err
246 }
247 return newSTS.Status.Replicas == *newSTS.Spec.Replicas && newSTS.Status.ObservedGeneration >= desiredGeneration, nil
248 }); err != nil {
249 t.Fatalf("failed to verify .Status.Replicas is equal to .Spec.Replicas for sts %s: %v", sts.Name, err)
250 }
251 }
252
253 func updatePod(t *testing.T, podClient typedv1.PodInterface, podName string, updateFunc func(*v1.Pod)) *v1.Pod {
254 var pod *v1.Pod
255 if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
256 newPod, err := podClient.Get(context.TODO(), podName, metav1.GetOptions{})
257 if err != nil {
258 return err
259 }
260 updateFunc(newPod)
261 pod, err = podClient.Update(context.TODO(), newPod, metav1.UpdateOptions{})
262 return err
263 }); err != nil {
264 t.Fatalf("failed to update pod %s: %v", podName, err)
265 }
266 return pod
267 }
268
269 func updatePodStatus(t *testing.T, podClient typedv1.PodInterface, podName string, updateStatusFunc func(*v1.Pod)) *v1.Pod {
270 var pod *v1.Pod
271 if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
272 newPod, err := podClient.Get(context.TODO(), podName, metav1.GetOptions{})
273 if err != nil {
274 return err
275 }
276 updateStatusFunc(newPod)
277 pod, err = podClient.UpdateStatus(context.TODO(), newPod, metav1.UpdateOptions{})
278 return err
279 }); err != nil {
280 t.Fatalf("failed to update status of pod %s: %v", podName, err)
281 }
282 return pod
283 }
284
285 func getPods(t *testing.T, podClient typedv1.PodInterface, labelMap map[string]string) *v1.PodList {
286 podSelector := labels.Set(labelMap).AsSelector()
287 options := metav1.ListOptions{LabelSelector: podSelector.String()}
288 pods, err := podClient.List(context.TODO(), options)
289 if err != nil {
290 t.Fatalf("failed obtaining a list of pods that match the pod labels %v: %v", labelMap, err)
291 }
292 if pods == nil {
293 t.Fatalf("obtained a nil list of pods")
294 }
295 return pods
296 }
297
298 func getStatefulSetPVCs(t *testing.T, pvcClient typedv1.PersistentVolumeClaimInterface, sts *appsv1.StatefulSet) []*v1.PersistentVolumeClaim {
299 pvcs := []*v1.PersistentVolumeClaim{}
300 for i := int32(0); i < *sts.Spec.Replicas; i++ {
301 pvcName := fmt.Sprintf("%s-%s-%d", sts.Spec.VolumeClaimTemplates[0].Name, sts.Name, i)
302 pvc, err := pvcClient.Get(context.TODO(), pvcName, metav1.GetOptions{})
303 if err != nil {
304 t.Fatalf("failed to get PVC %s: %v", pvcName, err)
305 }
306 pvcs = append(pvcs, pvc)
307 }
308 return pvcs
309 }
310
311 func verifyOwnerRef(t *testing.T, pvc *v1.PersistentVolumeClaim, kind string, expected bool) {
312 found := false
313 for _, ref := range pvc.GetOwnerReferences() {
314 if ref.Kind == kind {
315 if expected {
316 found = true
317 } else {
318 t.Fatalf("Found %s ref but expected none for PVC %s", kind, pvc.Name)
319 }
320 }
321 }
322 if expected && !found {
323 t.Fatalf("Expected %s ref but found none for PVC %s", kind, pvc.Name)
324 }
325 }
326
327 func updateSTS(t *testing.T, stsClient typedappsv1.StatefulSetInterface, stsName string, updateFunc func(*appsv1.StatefulSet)) *appsv1.StatefulSet {
328 var sts *appsv1.StatefulSet
329 if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
330 newSTS, err := stsClient.Get(context.TODO(), stsName, metav1.GetOptions{})
331 if err != nil {
332 return err
333 }
334 updateFunc(newSTS)
335 sts, err = stsClient.Update(context.TODO(), newSTS, metav1.UpdateOptions{})
336 return err
337 }); err != nil {
338 t.Fatalf("failed to update sts %s: %v", stsName, err)
339 }
340 return sts
341 }
342
343
344 func scaleSTS(t *testing.T, c clientset.Interface, sts *appsv1.StatefulSet, replicas int32) {
345 stsClient := c.AppsV1().StatefulSets(sts.Namespace)
346 if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
347 newSTS, err := stsClient.Get(context.TODO(), sts.Name, metav1.GetOptions{})
348 if err != nil {
349 return err
350 }
351 *newSTS.Spec.Replicas = replicas
352 sts, err = stsClient.Update(context.TODO(), newSTS, metav1.UpdateOptions{})
353 return err
354 }); err != nil {
355 t.Fatalf("failed to update .Spec.Replicas to %d for sts %s: %v", replicas, sts.Name, err)
356 }
357 waitSTSStable(t, c, sts)
358 }
359
360 var _ admission.ValidationInterface = &fakePodFailAdmission{}
361
362 type fakePodFailAdmission struct {
363 lock sync.Mutex
364 limitedPodNumber int
365 succeedPodsCount int
366 }
367
368 func (f *fakePodFailAdmission) Handles(operation admission.Operation) bool {
369 return operation == admission.Create
370 }
371
372 func (f *fakePodFailAdmission) Validate(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces) (err error) {
373 if attr.GetKind().GroupKind() != api.Kind("Pod") {
374 return nil
375 }
376
377 f.lock.Lock()
378 defer f.lock.Unlock()
379
380 if f.succeedPodsCount >= f.limitedPodNumber {
381 return fmt.Errorf("fakePodFailAdmission error")
382 }
383 f.succeedPodsCount++
384 return nil
385 }
386
View as plain text