1
16
17 package apps
18
19 import (
20 "context"
21 "fmt"
22 "strings"
23 "time"
24
25 "github.com/onsi/gomega"
26
27 jsonpatch "github.com/evanphx/json-patch"
28 "github.com/onsi/ginkgo/v2"
29
30 appsv1 "k8s.io/api/apps/v1"
31 v1 "k8s.io/api/core/v1"
32 policyv1 "k8s.io/api/policy/v1"
33 apierrors "k8s.io/apimachinery/pkg/api/errors"
34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
36 "k8s.io/apimachinery/pkg/labels"
37 "k8s.io/apimachinery/pkg/runtime"
38 "k8s.io/apimachinery/pkg/types"
39 "k8s.io/apimachinery/pkg/util/intstr"
40 "k8s.io/apimachinery/pkg/util/json"
41 "k8s.io/apimachinery/pkg/util/wait"
42 "k8s.io/client-go/dynamic"
43 "k8s.io/client-go/kubernetes"
44 clientscheme "k8s.io/client-go/kubernetes/scheme"
45 "k8s.io/client-go/util/retry"
46 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
47 "k8s.io/kubernetes/test/e2e/framework"
48 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
49 imageutils "k8s.io/kubernetes/test/utils/image"
50 admissionapi "k8s.io/pod-security-admission/api"
51 )
52
53
54
55
// Shared settings for the disruption e2e tests in this file.
const (
	// defaultName is the PodDisruptionBudget name used by most tests.
	defaultName = "foo"
	// bigClusterSize is the node count from which the cases flagged
	// skipForBigClusters are skipped (they run only with at most
	// bigClusterSize-1 nodes).
	bigClusterSize = 7
	// timeout bounds the shorter polls done directly inside the test bodies.
	timeout = 60 * time.Second
	// schedulingTimeout bounds the polls in the wait/locate helpers.
	schedulingTimeout = 10 * time.Minute
)

// defaultLabels is the label set passed as the PDB object's own labels by
// most tests.
var defaultLabels = map[string]string{"foo": "bar"}
64
65 var _ = SIGDescribe("DisruptionController", func() {
66 f := framework.NewDefaultFramework("disruption")
67 f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
68 var ns string
69 var cs kubernetes.Interface
70 var dc dynamic.Interface
71
72 ginkgo.BeforeEach(func() {
73 cs = f.ClientSet
74 ns = f.Namespace.Name
75 dc = f.DynamicClient
76 })
77
78 ginkgo.Context("Listing PodDisruptionBudgets for all namespaces", func() {
79 anotherFramework := framework.NewDefaultFramework("disruption-2")
80 anotherFramework.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
81
82
87 framework.ConformanceIt("should list and delete a collection of PodDisruptionBudgets", func(ctx context.Context) {
88 specialLabels := map[string]string{"foo_pdb": "bar_pdb"}
89 labelSelector := labels.SelectorFromSet(specialLabels).String()
90 createPDBMinAvailableOrDie(ctx, cs, ns, defaultName, intstr.FromInt32(2), specialLabels)
91 createPDBMinAvailableOrDie(ctx, cs, ns, "foo2", intstr.FromString("1%"), specialLabels)
92 createPDBMinAvailableOrDie(ctx, anotherFramework.ClientSet, anotherFramework.Namespace.Name, "foo3", intstr.FromInt32(2), specialLabels)
93
94 ginkgo.By("listing a collection of PDBs across all namespaces")
95 listPDBs(ctx, cs, metav1.NamespaceAll, labelSelector, 3, []string{defaultName, "foo2", "foo3"})
96
97 ginkgo.By("listing a collection of PDBs in namespace " + ns)
98 listPDBs(ctx, cs, ns, labelSelector, 2, []string{defaultName, "foo2"})
99 deletePDBCollection(ctx, cs, ns)
100 })
101 })
102
103
108 framework.ConformanceIt("should create a PodDisruptionBudget", func(ctx context.Context) {
109 ginkgo.By("creating the pdb")
110 createPDBMinAvailableOrDie(ctx, cs, ns, defaultName, intstr.FromString("1%"), defaultLabels)
111
112 ginkgo.By("updating the pdb")
113 updatedPDB := updatePDBOrDie(ctx, cs, ns, defaultName, func(pdb *policyv1.PodDisruptionBudget) *policyv1.PodDisruptionBudget {
114 newMinAvailable := intstr.FromString("2%")
115 pdb.Spec.MinAvailable = &newMinAvailable
116 return pdb
117 }, cs.PolicyV1().PodDisruptionBudgets(ns).Update)
118 gomega.Expect(updatedPDB.Spec.MinAvailable.String()).To(gomega.Equal("2%"))
119
120 ginkgo.By("patching the pdb")
121 patchedPDB := patchPDBOrDie(ctx, cs, dc, ns, defaultName, func(old *policyv1.PodDisruptionBudget) (bytes []byte, err error) {
122 newBytes, err := json.Marshal(map[string]interface{}{
123 "spec": map[string]interface{}{
124 "minAvailable": "3%",
125 },
126 })
127 framework.ExpectNoError(err, "failed to marshal JSON for new data")
128 return newBytes, nil
129 })
130 gomega.Expect(patchedPDB.Spec.MinAvailable.String()).To(gomega.Equal("3%"))
131
132 deletePDBOrDie(ctx, cs, ns, defaultName)
133 })
134
135
141 framework.ConformanceIt("should observe PodDisruptionBudget status updated", func(ctx context.Context) {
142 createPDBMinAvailableOrDie(ctx, cs, ns, defaultName, intstr.FromInt32(1), defaultLabels)
143
144 createPodsOrDie(ctx, cs, ns, 3)
145 waitForPodsOrDie(ctx, cs, ns, 3)
146
147
148
149 err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, func(ctx context.Context) (bool, error) {
150 pdb, err := cs.PolicyV1().PodDisruptionBudgets(ns).Get(ctx, defaultName, metav1.GetOptions{})
151 if err != nil {
152 return false, err
153 }
154 return pdb.Status.DisruptionsAllowed > 0, nil
155 })
156 framework.ExpectNoError(err)
157 })
158
159
164 framework.ConformanceIt("should update/patch PodDisruptionBudget status", func(ctx context.Context) {
165 createPDBMinAvailableOrDie(ctx, cs, ns, defaultName, intstr.FromInt32(1), defaultLabels)
166
167 ginkgo.By("Updating PodDisruptionBudget status")
168
169
170 createPodsOrDie(ctx, cs, ns, 1)
171 waitForPodsOrDie(ctx, cs, ns, 1)
172 pod, _ := locateRunningPod(ctx, cs, ns)
173 updatePDBOrDie(ctx, cs, ns, defaultName, func(old *policyv1.PodDisruptionBudget) *policyv1.PodDisruptionBudget {
174 old.Status.DisruptedPods = make(map[string]metav1.Time)
175 old.Status.DisruptedPods[pod.Name] = metav1.NewTime(time.Now())
176 return old
177 }, cs.PolicyV1().PodDisruptionBudgets(ns).UpdateStatus)
178
179 updated := getPDBStatusOrDie(ctx, dc, ns, defaultName)
180 gomega.Expect(updated.Status.DisruptedPods).To(gomega.HaveKey(pod.Name), "Expecting the DisruptedPods have %s", pod.Name)
181
182 ginkgo.By("Patching PodDisruptionBudget status")
183 patched := patchPDBOrDie(ctx, cs, dc, ns, defaultName, func(old *policyv1.PodDisruptionBudget) (bytes []byte, err error) {
184 oldBytes, err := json.Marshal(old)
185 framework.ExpectNoError(err, "failed to marshal JSON for old data")
186 old.Status.DisruptedPods = make(map[string]metav1.Time)
187 newBytes, err := json.Marshal(old)
188 framework.ExpectNoError(err, "failed to marshal JSON for new data")
189 return jsonpatch.CreateMergePatch(oldBytes, newBytes)
190 }, "status")
191 gomega.Expect(patched.Status.DisruptedPods).To(gomega.BeEmpty(), "Expecting the PodDisruptionBudget's be empty")
192 })
193
194
195 ginkgo.It("should observe that the PodDisruptionBudget status is not updated for unmanaged pods",
196 func(ctx context.Context) {
197 createPDBMinAvailableOrDie(ctx, cs, ns, defaultName, intstr.FromInt32(1), defaultLabels)
198
199 createPodsOrDie(ctx, cs, ns, 3)
200 waitForPodsOrDie(ctx, cs, ns, 3)
201
202
203 gomega.Consistently(ctx, func(ctx context.Context) (bool, error) {
204 pdb, err := cs.PolicyV1().PodDisruptionBudgets(ns).Get(ctx, defaultName, metav1.GetOptions{})
205 if err != nil {
206 return false, err
207 }
208 return isPDBErroring(pdb), nil
209 }, 1*time.Minute, 1*time.Second).ShouldNot(gomega.BeTrue(), "pod shouldn't error for "+
210 "unmanaged pod")
211 })
212
213 evictionCases := []struct {
214 description string
215 minAvailable intstr.IntOrString
216 maxUnavailable intstr.IntOrString
217 podCount int
218 replicaSetSize int32
219 shouldDeny bool
220 exclusive bool
221 skipForBigClusters bool
222 }{
223 {
224 description: "no PDB",
225 minAvailable: intstr.FromString(""),
226 maxUnavailable: intstr.FromString(""),
227 podCount: 1,
228 shouldDeny: false,
229 }, {
230 description: "too few pods, absolute",
231 minAvailable: intstr.FromInt32(2),
232 maxUnavailable: intstr.FromString(""),
233 podCount: 2,
234 shouldDeny: true,
235 }, {
236 description: "enough pods, absolute",
237 minAvailable: intstr.FromInt32(2),
238 maxUnavailable: intstr.FromString(""),
239 podCount: 3,
240 shouldDeny: false,
241 }, {
242 description: "enough pods, replicaSet, percentage",
243 minAvailable: intstr.FromString("90%"),
244 maxUnavailable: intstr.FromString(""),
245 replicaSetSize: 10,
246 exclusive: false,
247 shouldDeny: false,
248 }, {
249 description: "too few pods, replicaSet, percentage",
250 minAvailable: intstr.FromString("90%"),
251 maxUnavailable: intstr.FromString(""),
252 replicaSetSize: 10,
253 exclusive: true,
254 shouldDeny: true,
255
256 skipForBigClusters: true,
257 },
258 {
259 description: "maxUnavailable allow single eviction, percentage",
260 minAvailable: intstr.FromString(""),
261 maxUnavailable: intstr.FromString("10%"),
262 replicaSetSize: 10,
263 exclusive: false,
264 shouldDeny: false,
265 },
266 {
267 description: "maxUnavailable deny evictions, integer",
268 minAvailable: intstr.FromString(""),
269 maxUnavailable: intstr.FromInt32(1),
270 replicaSetSize: 10,
271 exclusive: true,
272 shouldDeny: true,
273
274 skipForBigClusters: true,
275 },
276 }
277 for i := range evictionCases {
278 c := evictionCases[i]
279 expectation := "should allow an eviction"
280 if c.shouldDeny {
281 expectation = "should not allow an eviction"
282 }
283
284
285
286 args := []interface{}{fmt.Sprintf("evictions: %s => %s", c.description, expectation)}
287 if c.exclusive {
288 args = append(args, framework.WithSerial())
289 }
290 f.It(append(args, func(ctx context.Context) {
291 if c.skipForBigClusters {
292 e2eskipper.SkipUnlessNodeCountIsAtMost(bigClusterSize - 1)
293 }
294 createPodsOrDie(ctx, cs, ns, c.podCount)
295 if c.replicaSetSize > 0 {
296 createReplicaSetOrDie(ctx, cs, ns, c.replicaSetSize, c.exclusive)
297 }
298
299 if c.minAvailable.String() != "" {
300 createPDBMinAvailableOrDie(ctx, cs, ns, defaultName, c.minAvailable, defaultLabels)
301 }
302
303 if c.maxUnavailable.String() != "" {
304 createPDBMaxUnavailableOrDie(ctx, cs, ns, defaultName, c.maxUnavailable)
305 }
306
307
308 pod, err := locateRunningPod(ctx, cs, ns)
309 framework.ExpectNoError(err)
310
311 e := &policyv1.Eviction{
312 ObjectMeta: metav1.ObjectMeta{
313 Name: pod.Name,
314 Namespace: ns,
315 },
316 }
317
318 if c.shouldDeny {
319 err = cs.CoreV1().Pods(ns).EvictV1(ctx, e)
320 gomega.Expect(err).To(gomega.MatchError(func(err error) bool {
321 return apierrors.HasStatusCause(err, policyv1.DisruptionBudgetCause)
322 }, "pod eviction should fail with DisruptionBudget cause"))
323 } else {
324
325
326
327 waitForPodsOrDie(ctx, cs, ns, c.podCount+int(c.replicaSetSize))
328
329
330
331 err = wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, func(ctx context.Context) (bool, error) {
332 err = cs.CoreV1().Pods(ns).EvictV1(ctx, e)
333 if err != nil {
334 return false, nil
335 }
336 return true, nil
337 })
338 framework.ExpectNoError(err)
339 }
340 })...)
341 }
342
343
348 framework.ConformanceIt("should block an eviction until the PDB is updated to allow it", func(ctx context.Context) {
349 ginkgo.By("Creating a pdb that targets all three pods in a test replica set")
350 createPDBMinAvailableOrDie(ctx, cs, ns, defaultName, intstr.FromInt32(3), defaultLabels)
351 createReplicaSetOrDie(ctx, cs, ns, 3, false)
352
353 ginkgo.By("First trying to evict a pod which shouldn't be evictable")
354 waitForPodsOrDie(ctx, cs, ns, 3)
355
356 pod, err := locateRunningPod(ctx, cs, ns)
357 framework.ExpectNoError(err)
358 e := &policyv1.Eviction{
359 ObjectMeta: metav1.ObjectMeta{
360 Name: pod.Name,
361 Namespace: ns,
362 },
363 }
364 err = cs.CoreV1().Pods(ns).EvictV1(ctx, e)
365 gomega.Expect(err).To(gomega.MatchError(func(err error) bool {
366 return apierrors.HasStatusCause(err, policyv1.DisruptionBudgetCause)
367 }, fmt.Sprintf("pod eviction should fail with DisruptionBudget cause. The error was \"%v\"\n", err)))
368
369 ginkgo.By("Updating the pdb to allow a pod to be evicted")
370 updatePDBOrDie(ctx, cs, ns, defaultName, func(pdb *policyv1.PodDisruptionBudget) *policyv1.PodDisruptionBudget {
371 newMinAvailable := intstr.FromInt32(2)
372 pdb.Spec.MinAvailable = &newMinAvailable
373 return pdb
374 }, cs.PolicyV1().PodDisruptionBudgets(ns).Update)
375
376 ginkgo.By("Trying to evict the same pod we tried earlier which should now be evictable")
377 waitForPodsOrDie(ctx, cs, ns, 3)
378 waitForPdbToObserveHealthyPods(ctx, cs, ns, 3)
379 err = cs.CoreV1().Pods(ns).EvictV1(ctx, e)
380 framework.ExpectNoError(err)
381
382 ginkgo.By("Patching the pdb to disallow a pod to be evicted")
383 patchPDBOrDie(ctx, cs, dc, ns, defaultName, func(old *policyv1.PodDisruptionBudget) (bytes []byte, err error) {
384 oldData, err := json.Marshal(old)
385 framework.ExpectNoError(err, "failed to marshal JSON for old data")
386 old.Spec.MinAvailable = nil
387 maxUnavailable := intstr.FromInt32(0)
388 old.Spec.MaxUnavailable = &maxUnavailable
389 newData, err := json.Marshal(old)
390 framework.ExpectNoError(err, "failed to marshal JSON for new data")
391 return jsonpatch.CreateMergePatch(oldData, newData)
392 })
393
394 waitForPodsOrDie(ctx, cs, ns, 3)
395 pod, err = locateRunningPod(ctx, cs, ns)
396 framework.ExpectNoError(err)
397 e = &policyv1.Eviction{
398 ObjectMeta: metav1.ObjectMeta{
399 Name: pod.Name,
400 Namespace: ns,
401 },
402 }
403 err = cs.CoreV1().Pods(ns).EvictV1(ctx, e)
404 gomega.Expect(err).To(gomega.MatchError(func(err error) bool {
405 return apierrors.HasStatusCause(err, policyv1.DisruptionBudgetCause)
406 }, fmt.Sprintf("pod eviction should fail with DisruptionBudget cause. The error was \"%v\"\n", err)))
407
408 ginkgo.By("Deleting the pdb to allow a pod to be evicted")
409 deletePDBOrDie(ctx, cs, ns, defaultName)
410
411 ginkgo.By("Trying to evict the same pod we tried earlier which should now be evictable")
412 waitForPodsOrDie(ctx, cs, ns, 3)
413 err = cs.CoreV1().Pods(ns).EvictV1(ctx, e)
414 framework.ExpectNoError(err)
415 })
416
417 })
418
419 func createPDBMinAvailableOrDie(ctx context.Context, cs kubernetes.Interface, ns string, name string, minAvailable intstr.IntOrString, labels map[string]string) {
420 pdb := policyv1.PodDisruptionBudget{
421 ObjectMeta: metav1.ObjectMeta{
422 Name: name,
423 Namespace: ns,
424 Labels: labels,
425 },
426 Spec: policyv1.PodDisruptionBudgetSpec{
427 Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
428 MinAvailable: &minAvailable,
429 },
430 }
431 _, err := cs.PolicyV1().PodDisruptionBudgets(ns).Create(ctx, &pdb, metav1.CreateOptions{})
432 framework.ExpectNoError(err, "Waiting for the pdb to be created with minAvailable %d in namespace %s", minAvailable.IntVal, ns)
433 waitForPdbToBeProcessed(ctx, cs, ns, name)
434 }
435
436 func createPDBMaxUnavailableOrDie(ctx context.Context, cs kubernetes.Interface, ns string, name string, maxUnavailable intstr.IntOrString) {
437 pdb := policyv1.PodDisruptionBudget{
438 ObjectMeta: metav1.ObjectMeta{
439 Name: name,
440 Namespace: ns,
441 },
442 Spec: policyv1.PodDisruptionBudgetSpec{
443 Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
444 MaxUnavailable: &maxUnavailable,
445 },
446 }
447 _, err := cs.PolicyV1().PodDisruptionBudgets(ns).Create(ctx, &pdb, metav1.CreateOptions{})
448 framework.ExpectNoError(err, "Waiting for the pdb to be created with maxUnavailable %d in namespace %s", maxUnavailable.IntVal, ns)
449 waitForPdbToBeProcessed(ctx, cs, ns, name)
450 }
451
452 type updateFunc func(pdb *policyv1.PodDisruptionBudget) *policyv1.PodDisruptionBudget
453 type updateRestAPI func(ctx context.Context, podDisruptionBudget *policyv1.PodDisruptionBudget, opts metav1.UpdateOptions) (*policyv1.PodDisruptionBudget, error)
454 type patchFunc func(pdb *policyv1.PodDisruptionBudget) ([]byte, error)
455
456 func updatePDBOrDie(ctx context.Context, cs kubernetes.Interface, ns string, name string, f updateFunc, api updateRestAPI) (updated *policyv1.PodDisruptionBudget) {
457 err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
458 old, err := cs.PolicyV1().PodDisruptionBudgets(ns).Get(ctx, name, metav1.GetOptions{})
459 if err != nil {
460 return err
461 }
462 old = f(old)
463 if updated, err = api(ctx, old, metav1.UpdateOptions{}); err != nil {
464 return err
465 }
466 return nil
467 })
468
469 framework.ExpectNoError(err, "Waiting for the PDB update to be processed in namespace %s", ns)
470 waitForPdbToBeProcessed(ctx, cs, ns, name)
471 return updated
472 }
473
474 func patchPDBOrDie(ctx context.Context, cs kubernetes.Interface, dc dynamic.Interface, ns string, name string, f patchFunc, subresources ...string) (updated *policyv1.PodDisruptionBudget) {
475 err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
476 old := getPDBStatusOrDie(ctx, dc, ns, name)
477 patchBytes, err := f(old)
478 framework.ExpectNoError(err)
479 if updated, err = cs.PolicyV1().PodDisruptionBudgets(ns).Patch(ctx, old.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, subresources...); err != nil {
480 return err
481 }
482 framework.ExpectNoError(err)
483 return nil
484 })
485
486 framework.ExpectNoError(err, "Waiting for the pdb update to be processed in namespace %s", ns)
487 waitForPdbToBeProcessed(ctx, cs, ns, name)
488 return updated
489 }
490
491 func deletePDBOrDie(ctx context.Context, cs kubernetes.Interface, ns string, name string) {
492 err := cs.PolicyV1().PodDisruptionBudgets(ns).Delete(ctx, name, metav1.DeleteOptions{})
493 framework.ExpectNoError(err, "Deleting pdb in namespace %s", ns)
494 waitForPdbToBeDeleted(ctx, cs, ns, name)
495 }
496
497 func listPDBs(ctx context.Context, cs kubernetes.Interface, ns string, labelSelector string, count int, expectedPDBNames []string) {
498 pdbList, err := cs.PolicyV1().PodDisruptionBudgets(ns).List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
499 framework.ExpectNoError(err, "Listing PDB set in namespace %s", ns)
500 gomega.Expect(pdbList.Items).To(gomega.HaveLen(count), "Expecting %d PDBs returned in namespace %s", count, ns)
501
502 pdbNames := make([]string, 0)
503 for _, item := range pdbList.Items {
504 pdbNames = append(pdbNames, item.Name)
505 }
506 gomega.Expect(pdbNames).To(gomega.ConsistOf(expectedPDBNames), "Expecting returned PDBs '%s' in namespace %s", expectedPDBNames, ns)
507 }
508
509 func deletePDBCollection(ctx context.Context, cs kubernetes.Interface, ns string) {
510 ginkgo.By("deleting a collection of PDBs")
511 err := cs.PolicyV1().PodDisruptionBudgets(ns).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{})
512 framework.ExpectNoError(err, "Deleting PDB set in namespace %s", ns)
513
514 waitForPDBCollectionToBeDeleted(ctx, cs, ns)
515 }
516
517 func waitForPDBCollectionToBeDeleted(ctx context.Context, cs kubernetes.Interface, ns string) {
518 ginkgo.By("Waiting for the PDB collection to be deleted")
519 err := wait.PollUntilContextTimeout(ctx, framework.Poll, schedulingTimeout, true, func(ctx context.Context) (bool, error) {
520 pdbList, err := cs.PolicyV1().PodDisruptionBudgets(ns).List(ctx, metav1.ListOptions{})
521 if err != nil {
522 return false, err
523 }
524 if len(pdbList.Items) != 0 {
525 return false, nil
526 }
527 return true, nil
528 })
529 framework.ExpectNoError(err, "Waiting for the PDB collection to be deleted in namespace %s", ns)
530 }
531
532 func createPodsOrDie(ctx context.Context, cs kubernetes.Interface, ns string, n int) {
533 for i := 0; i < n; i++ {
534 pod := &v1.Pod{
535 ObjectMeta: metav1.ObjectMeta{
536 Name: fmt.Sprintf("pod-%d", i),
537 Namespace: ns,
538 Labels: map[string]string{"foo": "bar"},
539 },
540 Spec: v1.PodSpec{
541 Containers: []v1.Container{
542 {
543 Name: "donothing",
544 Image: imageutils.GetPauseImageName(),
545 },
546 },
547 RestartPolicy: v1.RestartPolicyAlways,
548 },
549 }
550
551 _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
552 framework.ExpectNoError(err, "Creating pod %q in namespace %q", pod.Name, ns)
553 }
554 }
555
556 func waitForPodsOrDie(ctx context.Context, cs kubernetes.Interface, ns string, n int) {
557 ginkgo.By("Waiting for all pods to be running")
558 err := wait.PollUntilContextTimeout(ctx, framework.Poll, schedulingTimeout, true, func(ctx context.Context) (bool, error) {
559 pods, err := cs.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{LabelSelector: "foo=bar"})
560 if err != nil {
561 return false, err
562 }
563 if pods == nil {
564 return false, fmt.Errorf("pods is nil")
565 }
566 if len(pods.Items) < n {
567 framework.Logf("pods: %v < %v", len(pods.Items), n)
568 return false, nil
569 }
570 ready := 0
571 for i := range pods.Items {
572 pod := pods.Items[i]
573 if podutil.IsPodReady(&pod) && pod.ObjectMeta.DeletionTimestamp.IsZero() {
574 ready++
575 }
576 }
577 if ready < n {
578 framework.Logf("running pods: %v < %v", ready, n)
579 return false, nil
580 }
581 return true, nil
582 })
583 framework.ExpectNoError(err, "Waiting for pods in namespace %q to be ready", ns)
584 }
585
586 func createReplicaSetOrDie(ctx context.Context, cs kubernetes.Interface, ns string, size int32, exclusive bool) {
587 container := v1.Container{
588 Name: "donothing",
589 Image: imageutils.GetPauseImageName(),
590 }
591 if exclusive {
592 container.Ports = []v1.ContainerPort{
593 {HostPort: 5555, ContainerPort: 5555},
594 }
595 }
596
597 rs := &appsv1.ReplicaSet{
598 ObjectMeta: metav1.ObjectMeta{
599 Name: "rs",
600 Namespace: ns,
601 },
602 Spec: appsv1.ReplicaSetSpec{
603 Replicas: &size,
604 Selector: &metav1.LabelSelector{
605 MatchLabels: map[string]string{"foo": "bar"},
606 },
607 Template: v1.PodTemplateSpec{
608 ObjectMeta: metav1.ObjectMeta{
609 Labels: map[string]string{"foo": "bar"},
610 },
611 Spec: v1.PodSpec{
612 Containers: []v1.Container{container},
613 },
614 },
615 },
616 }
617
618 _, err := cs.AppsV1().ReplicaSets(ns).Create(ctx, rs, metav1.CreateOptions{})
619 framework.ExpectNoError(err, "Creating replica set %q in namespace %q", rs.Name, ns)
620 }
621
622 func locateRunningPod(ctx context.Context, cs kubernetes.Interface, ns string) (pod *v1.Pod, err error) {
623 ginkgo.By("locating a running pod")
624 err = wait.PollUntilContextTimeout(ctx, framework.Poll, schedulingTimeout, true, func(ctx context.Context) (bool, error) {
625 podList, err := cs.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
626 if err != nil {
627 return false, err
628 }
629
630 for i := range podList.Items {
631 p := podList.Items[i]
632 if podutil.IsPodReady(&p) && p.ObjectMeta.DeletionTimestamp.IsZero() {
633 pod = &p
634 return true, nil
635 }
636 }
637
638 return false, nil
639 })
640 return pod, err
641 }
642
643 func waitForPdbToBeProcessed(ctx context.Context, cs kubernetes.Interface, ns string, name string) {
644 ginkgo.By("Waiting for the pdb to be processed")
645 err := wait.PollUntilContextTimeout(ctx, framework.Poll, schedulingTimeout, true, func(ctx context.Context) (bool, error) {
646 pdb, err := cs.PolicyV1().PodDisruptionBudgets(ns).Get(ctx, name, metav1.GetOptions{})
647 if err != nil {
648 return false, err
649 }
650 if pdb.Status.ObservedGeneration < pdb.Generation {
651 return false, nil
652 }
653 return true, nil
654 })
655 framework.ExpectNoError(err, "Waiting for the pdb to be processed in namespace %s", ns)
656 }
657
658 func waitForPdbToBeDeleted(ctx context.Context, cs kubernetes.Interface, ns string, name string) {
659 ginkgo.By("Waiting for the pdb to be deleted")
660 err := wait.PollUntilContextTimeout(ctx, framework.Poll, schedulingTimeout, true, func(ctx context.Context) (bool, error) {
661 _, err := cs.PolicyV1().PodDisruptionBudgets(ns).Get(ctx, name, metav1.GetOptions{})
662 if apierrors.IsNotFound(err) {
663 return true, nil
664 }
665 if err != nil {
666 return false, err
667 }
668 return false, nil
669 })
670 framework.ExpectNoError(err, "Waiting for the pdb to be deleted in namespace %s", ns)
671 }
672
673 func waitForPdbToObserveHealthyPods(ctx context.Context, cs kubernetes.Interface, ns string, healthyCount int32) {
674 ginkgo.By("Waiting for the pdb to observed all healthy pods")
675 err := wait.PollUntilContextTimeout(ctx, framework.Poll, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
676 pdb, err := cs.PolicyV1().PodDisruptionBudgets(ns).Get(ctx, "foo", metav1.GetOptions{})
677 if err != nil {
678 return false, err
679 }
680 if pdb.Status.CurrentHealthy != healthyCount {
681 return false, nil
682 }
683 return true, nil
684 })
685 framework.ExpectNoError(err, "Waiting for the pdb in namespace %s to observed %d healthy pods", ns, healthyCount)
686 }
687
688 func getPDBStatusOrDie(ctx context.Context, dc dynamic.Interface, ns string, name string) *policyv1.PodDisruptionBudget {
689 pdbStatusResource := policyv1.SchemeGroupVersion.WithResource("poddisruptionbudgets")
690 unstruct, err := dc.Resource(pdbStatusResource).Namespace(ns).Get(ctx, name, metav1.GetOptions{}, "status")
691 framework.ExpectNoError(err)
692 pdb, err := unstructuredToPDB(unstruct)
693 framework.ExpectNoError(err, "Getting the status of the pdb %s in namespace %s", name, ns)
694 return pdb
695 }
696
697 func unstructuredToPDB(obj *unstructured.Unstructured) (*policyv1.PodDisruptionBudget, error) {
698 json, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
699 if err != nil {
700 return nil, err
701 }
702 pdb := &policyv1.PodDisruptionBudget{}
703 err = runtime.DecodeInto(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), json, pdb)
704 pdb.Kind = ""
705 pdb.APIVersion = ""
706 return pdb, err
707 }
708
709
710 func isPDBErroring(pdb *policyv1.PodDisruptionBudget) bool {
711 hasFailed := false
712 for _, condition := range pdb.Status.Conditions {
713 if strings.Contains(condition.Reason, "SyncFailed") &&
714 strings.Contains(condition.Message, "found no controller ref for pod") {
715 hasFailed = true
716 }
717 }
718 return hasFailed
719 }
720
View as plain text