...
1
16
17 package scheduling
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 v1 "k8s.io/api/core/v1"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 "k8s.io/apimachinery/pkg/util/sets"
27 clientset "k8s.io/client-go/kubernetes"
28 "k8s.io/kubernetes/test/e2e/framework"
29 )
30
31 var (
32 timeout = 10 * time.Minute
33 waitTime = 2 * time.Second
34
35
36 SIGDescribe = framework.SIGDescribe("scheduling")
37 )
38
39
40 func WaitForStableCluster(c clientset.Interface, workerNodes sets.Set[string]) int {
41 startTime := time.Now()
42
43 allScheduledPods, allNotScheduledPods := getScheduledAndUnscheduledPods(c, workerNodes)
44 for len(allNotScheduledPods) != 0 {
45 time.Sleep(waitTime)
46 if startTime.Add(timeout).Before(time.Now()) {
47 framework.Logf("Timed out waiting for the following pods to schedule")
48 for _, p := range allNotScheduledPods {
49 framework.Logf("%v/%v", p.Namespace, p.Name)
50 }
51 framework.Failf("Timed out after %v waiting for stable cluster.", timeout)
52 break
53 }
54 allScheduledPods, allNotScheduledPods = getScheduledAndUnscheduledPods(c, workerNodes)
55 }
56 return len(allScheduledPods)
57 }
58
59
60 func getScheduledAndUnscheduledPods(c clientset.Interface, workerNodes sets.Set[string]) (scheduledPods, notScheduledPods []v1.Pod) {
61 pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
62 framework.ExpectNoError(err, fmt.Sprintf("listing all pods in namespace %q while waiting for stable cluster", metav1.NamespaceAll))
63
64
65 filteredPods := make([]v1.Pod, 0, len(pods.Items))
66 for _, p := range pods.Items {
67 if !podTerminated(p) {
68 filteredPods = append(filteredPods, p)
69 }
70 }
71 pods.Items = filteredPods
72 return GetPodsScheduled(workerNodes, pods)
73 }
74
75 func podTerminated(p v1.Pod) bool {
76 return p.Status.Phase == v1.PodSucceeded || p.Status.Phase == v1.PodFailed
77 }
78
View as plain text