1
16
17 package job
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 batchv1 "k8s.io/api/batch/v1"
25 v1 "k8s.io/api/core/v1"
26 apierrors "k8s.io/apimachinery/pkg/api/errors"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/util/wait"
29 clientset "k8s.io/client-go/kubernetes"
30 "k8s.io/kubernetes/test/e2e/framework"
31 "k8s.io/kubernetes/test/utils/format"
32 "k8s.io/utils/ptr"
33 )
34
35
36
37
38
39
// JobState inspects a Job and reports whether it is in the desired state.
// An empty string means the Job matches. Any other string is treated as a
// short explanation of why it does not match; WaitForJobState appends a dump
// of the Job object to that explanation when building its failure message,
// so the string itself should stay brief.
type JobState func(job *batchv1.Job) string
41
42
43
44 func WaitForJobPodsRunning(ctx context.Context, c clientset.Interface, ns, jobName string, expectedCount int32) error {
45 return waitForJobPodsInPhase(ctx, c, ns, jobName, expectedCount, v1.PodRunning)
46 }
47
48
49 func WaitForJobPodsSucceeded(ctx context.Context, c clientset.Interface, ns, jobName string, expectedCount int32) error {
50 return waitForJobPodsInPhase(ctx, c, ns, jobName, expectedCount, v1.PodSucceeded)
51 }
52
53
54 func waitForJobPodsInPhase(ctx context.Context, c clientset.Interface, ns, jobName string, expectedCount int32, phase v1.PodPhase) error {
55 return wait.PollWithContext(ctx, framework.Poll, JobTimeout, func(ctx context.Context) (bool, error) {
56 pods, err := GetJobPods(ctx, c, ns, jobName)
57 if err != nil {
58 return false, err
59 }
60 count := int32(0)
61 for _, p := range pods.Items {
62 if p.Status.Phase == phase {
63 count++
64 }
65 }
66 return count == expectedCount, nil
67 })
68 }
69
70
71 func WaitForJobComplete(ctx context.Context, c clientset.Interface, ns, jobName string, completions int32) error {
72 return wait.PollWithContext(ctx, framework.Poll, JobTimeout, func(ctx context.Context) (bool, error) {
73 curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{})
74 if err != nil {
75 return false, err
76 }
77 return curr.Status.Succeeded == completions, nil
78 })
79 }
80
81
82 func WaitForJobReady(ctx context.Context, c clientset.Interface, ns, jobName string, ready *int32) error {
83 return WaitForJobState(ctx, c, ns, jobName, JobTimeout, func(job *batchv1.Job) string {
84 if ptr.Equal(ready, job.Status.Ready) {
85 return ""
86 }
87 return "job does not match intended ready status"
88 })
89 }
90
91
92 func WaitForJobSuspend(ctx context.Context, c clientset.Interface, ns, jobName string) error {
93 return WaitForJobState(ctx, c, ns, jobName, JobTimeout, func(job *batchv1.Job) string {
94 for _, c := range job.Status.Conditions {
95 if c.Type == batchv1.JobSuspended && c.Status == v1.ConditionTrue {
96 return ""
97 }
98 }
99 return "job should be suspended"
100 })
101 }
102
103
104 func WaitForJobFailed(c clientset.Interface, ns, jobName string) error {
105 return wait.PollImmediate(framework.Poll, JobTimeout, func() (bool, error) {
106 curr, err := c.BatchV1().Jobs(ns).Get(context.TODO(), jobName, metav1.GetOptions{})
107 if err != nil {
108 return false, err
109 }
110
111 return isJobFailed(curr), nil
112 })
113 }
114
115 func isJobFailed(j *batchv1.Job) bool {
116 for _, c := range j.Status.Conditions {
117 if (c.Type == batchv1.JobFailed) && c.Status == v1.ConditionTrue {
118 return true
119 }
120 }
121 return false
122 }
123
124
125 func WaitForJobFinish(ctx context.Context, c clientset.Interface, ns, jobName string) error {
126 return wait.PollUntilContextTimeout(ctx, framework.Poll, JobTimeout, true, func(ctx context.Context) (bool, error) {
127 curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{})
128 if err != nil {
129 return false, err
130 }
131
132 return isJobFinished(curr), nil
133 })
134 }
135
136 func isJobFinished(j *batchv1.Job) bool {
137 for _, c := range j.Status.Conditions {
138 if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == v1.ConditionTrue {
139 return true
140 }
141 }
142
143 return false
144 }
145
146
147 func WaitForJobGone(ctx context.Context, c clientset.Interface, ns, jobName string, timeout time.Duration) error {
148 return wait.PollWithContext(ctx, framework.Poll, timeout, func(ctx context.Context) (bool, error) {
149 _, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{})
150 if apierrors.IsNotFound(err) {
151 return true, nil
152 }
153 return false, err
154 })
155 }
156
157
158
159 func WaitForAllJobPodsGone(ctx context.Context, c clientset.Interface, ns, jobName string) error {
160 return wait.PollUntilContextTimeout(ctx, framework.Poll, JobTimeout, true, func(ctx context.Context) (bool, error) {
161 pods, err := GetJobPods(ctx, c, ns, jobName)
162 if err != nil {
163 return false, err
164 }
165 return len(pods.Items) == 0, nil
166 })
167 }
168
169
170
171 func WaitForJobState(ctx context.Context, c clientset.Interface, ns, jobName string, timeout time.Duration, state JobState) error {
172 return framework.Gomega().
173 Eventually(ctx, framework.RetryNotFound(framework.GetObject(c.BatchV1().Jobs(ns).Get, jobName, metav1.GetOptions{}))).
174 WithTimeout(timeout).
175 Should(framework.MakeMatcher(func(job *batchv1.Job) (func() string, error) {
176 matches := state(job)
177 if matches == "" {
178 return nil, nil
179 }
180 return func() string {
181 return fmt.Sprintf("%v\n%s", matches, format.Object(job, 1))
182 }, nil
183 }))
184 }
185
View as plain text