package apps

import (
	"context"
	"encoding/json"
	"fmt"
	"strconv"
	"time"

	batchv1 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	policyv1 "k8s.io/api/policy/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	utilrand "k8s.io/apimachinery/pkg/util/rand"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
	"k8s.io/client-go/util/retry"
	batchinternal "k8s.io/kubernetes/pkg/apis/batch"
	"k8s.io/kubernetes/test/e2e/framework"
	e2ejob "k8s.io/kubernetes/test/e2e/framework/job"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
	"k8s.io/kubernetes/test/e2e/scheduling"
	admissionapi "k8s.io/pod-security-admission/api"
	"k8s.io/utils/pointer"
	"k8s.io/utils/ptr"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"
)

type watchEventConfig struct {
	framework           *framework.Framework
	resourceVersion     string
	w                   *cache.ListWatch
	jobName             string
	watchEvent          watch.EventType
	extJob              *batchv1.Job
	updatedMetadataType string
	updatedKey          string
	updatedValue        string
}

var _ = SIGDescribe("Job", func() {
	f := framework.NewDefaultFramework("job")
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged

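	// Simplest case: the Job's Pods all succeed and the Job runs to completion.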
	ginkgo.It("should run a job to completion when tasks succeed", func(ctx context.Context) {
		parallelism := int32(2)
		completions := int32(4)
		backoffLimit := int32(6)

		ginkgo.By("Creating a job")
		job := e2ejob.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring job reaches completions")
		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring pods for job exist")
		pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name)
		successes := int32(0)
		for _, pod := range pods.Items {
			if pod.Status.Phase == v1.PodSucceeded {
				successes++
			}
		}
		gomega.Expect(successes).To(gomega.Equal(completions), "expected %d successful job pods, but got %d", completions, successes)
	})

	ginkgo.It("should allow to use the pod failure policy on exit code to fail the job early", func(ctx context.Context) {
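		// The "failOnce" pod fails exactly once with exit code 1, which matches
		// the FailJob rule below, so the Job is failed right away even though the
		// backoffLimit would still allow retries. The Pods are pinned to a single
		// node so that the node-local state used by "failOnce" (fail only on the
		// first run) is shared across retries.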
		ginkgo.By("Looking for a node to schedule job pod")
		node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
		framework.ExpectNoError(err)

		parallelism := int32(2)
		completions := int32(4)
		backoffLimit := int32(6)
		ginkgo.By("Creating a job")
		job := e2ejob.NewTestJobOnNode("failOnce", "pod-failure-failjob", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name)
		job.Spec.PodFailurePolicy = &batchv1.PodFailurePolicy{
			Rules: []batchv1.PodFailurePolicyRule{
				{
					Action: batchv1.PodFailurePolicyActionFailJob,
					OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
						Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
						Values:   []int32{1},
					},
				},
			},
		}
		job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring job fails")
		err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to ensure job failure in namespace: %s", f.Namespace.Name)
	})

	ginkgo.It("should allow to use the pod failure policy to not count the failure towards the backoffLimit", func(ctx context.Context) {
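		// With backoffLimit=0 a single counted Pod failure would fail the Job,
		// but the Ignore rule below excludes failures with exit code 1 (the one
		// "failOnce" failure) from the count, so the Job still completes. The
		// Pods are pinned to a single node so the "failOnce" state persists
		// across retries.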
		parallelism := int32(2)
		completions := int32(4)
		backoffLimit := int32(0)

		ginkgo.By("Looking for a node to schedule job pod")
		node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
		framework.ExpectNoError(err)

		ginkgo.By("Creating a job")
		job := e2ejob.NewTestJobOnNode("failOnce", "pod-failure-ignore", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name)
		job.Spec.PodFailurePolicy = &batchv1.PodFailurePolicy{
			Rules: []batchv1.PodFailurePolicyRule{
				{
					Action: batchv1.PodFailurePolicyActionIgnore,
					OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
						Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
						Values:   []int32{1},
					},
				},
			},
		}
		job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring job reaches completions")
		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
	})

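	// This table-driven test uses an Indexed Job. The 0-indexed Pod creates a
	// marker file on the node and then keeps running until it is evicted; the
	// other Pods fail with exit code 1 until the marker file exists and then
	// succeed. With backoffLimit=0 the Job can only complete if the pod failure
	// policy ignores both the exit-code-1 failures and the failure caused by
	// the eviction (via the DisruptionTarget condition or the 137 exit code).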
	ginkgo.DescribeTable("Using a pod failure policy to not count some failures towards the backoffLimit",
		func(ctx context.Context, policy *batchv1.PodFailurePolicy) {
			mode := batchv1.IndexedCompletion

			// backoffLimit is set to 0 so that any counted Pod failure would
			// fail the Job; only the pod failure policy under test keeps the
			// expected failures from being counted.
			parallelism := int32(2)
			completions := int32(4)
			backoffLimit := int32(0)

			ginkgo.By("Looking for a node to schedule job pods")
			node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
			framework.ExpectNoError(err)

			ginkgo.By("Creating a job")
			job := e2ejob.NewTestJobOnNode("notTerminateOnce", "pod-disruption-failure-ignore", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name)
			job.Spec.CompletionMode = &mode
			job.Spec.PodFailurePolicy = policy
			job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
			framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

			ginkgo.By("Awaiting for all non 0-indexed pods to succeed to ensure the marker file is created")
			err = e2ejob.WaitForJobPodsSucceeded(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions-1)
			framework.ExpectNoError(err, "failed to await for all non 0-indexed pods to succeed for job: %s/%s", job.Name, job.Namespace)

			ginkgo.By("Awaiting for the 0-indexed pod to be running")
			err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, 1)
			framework.ExpectNoError(err, "failed to await for the 0-indexed pod to be running for the job: %s/%s", job.Name, job.Namespace)

			pods, err := e2ejob.GetAllRunningJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
			framework.ExpectNoError(err, "failed to get running pods for the job: %s/%s", job.Name, job.Namespace)
			gomega.Expect(pods).To(gomega.HaveLen(1), "Exactly one running pod is expected")
			pod := pods[0]
			ginkgo.By(fmt.Sprintf("Evicting the running pod: %s/%s", pod.Name, pod.Namespace))
			evictTarget := &policyv1.Eviction{
				ObjectMeta: metav1.ObjectMeta{
					Name:      pod.Name,
					Namespace: pod.Namespace,
				},
			}
			err = f.ClientSet.CoreV1().Pods(pod.Namespace).EvictV1(ctx, evictTarget)
			framework.ExpectNoError(err, "failed to evict the pod: %s/%s", pod.Name, pod.Namespace)

			ginkgo.By(fmt.Sprintf("Awaiting for the pod: %s/%s to be deleted", pod.Name, pod.Namespace))
			err = e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.Name, pod.Namespace, f.Timeouts.PodDelete)
			framework.ExpectNoError(err, "failed to await for the pod to be deleted: %s/%s", pod.Name, pod.Namespace)

			ginkgo.By("Ensuring job reaches completions")
			err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
			framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
		},
		ginkgo.Entry("Ignore DisruptionTarget condition", &batchv1.PodFailurePolicy{
			Rules: []batchv1.PodFailurePolicyRule{
				{
					// Ignore failures of the non 0-indexed pods, which exit
					// with code 1 until the marker file is created.
					Action: batchv1.PodFailurePolicyActionIgnore,
					OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
						Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
						Values:   []int32{1},
					},
				},
				{
					// Ignore the failure of the evicted 0-indexed pod based on
					// its DisruptionTarget condition.
					Action: batchv1.PodFailurePolicyActionIgnore,
					OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{
						{
							Type:   v1.DisruptionTarget,
							Status: v1.ConditionTrue,
						},
					},
				},
			},
		}),
		ginkgo.Entry("Ignore exit code 137", &batchv1.PodFailurePolicy{
			Rules: []batchv1.PodFailurePolicyRule{
				{
					// Ignore both the exit-code-1 failures of the non 0-indexed
					// pods and the 137 (SIGKILL) exit code of the evicted
					// 0-indexed pod.
					Action: batchv1.PodFailurePolicyActionIgnore,
					OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
						Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
						Values:   []int32{1, 137},
					},
				},
			},
		}),
	)

	ginkgo.It("should not create pods when created in suspend state", func(ctx context.Context) {
		parallelism := int32(2)
		completions := int32(4)
		backoffLimit := int32(6)

		ginkgo.By("Creating a job with suspend=true")
		job := e2ejob.NewTestJob("succeed", "suspend-true-to-false", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		job.Spec.Suspend = pointer.BoolPtr(true)
		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Checking Job status to observe Suspended state")
		err = e2ejob.WaitForJobSuspend(ctx, f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to observe suspend state: %s", f.Namespace.Name)

		ginkgo.By("Ensuring pods aren't created for job")
		pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to list pod for a given job %s in namespace %s", job.Name, f.Namespace.Name)
		gomega.Expect(pods.Items).To(gomega.BeEmpty())

		ginkgo.By("Updating the job with suspend=false")
		job, err = f.ClientSet.BatchV1().Jobs(f.Namespace.Name).Get(ctx, job.Name, metav1.GetOptions{})
		framework.ExpectNoError(err, "failed to get job in namespace: %s", f.Namespace.Name)
		job.Spec.Suspend = pointer.BoolPtr(false)
		job, err = e2ejob.UpdateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to update job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Waiting for job to complete")
		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
	})

	ginkgo.It("should delete pods when suspended", func(ctx context.Context) {
		parallelism := int32(2)
		completions := int32(4)
		backoffLimit := int32(6)

		ginkgo.By("Creating a job with suspend=false")
		job := e2ejob.NewTestJob("notTerminate", "suspend-false-to-true", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		job.Spec.Suspend = pointer.Bool(false)
		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensure pods equal to parallelism count is attached to the job")
		err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, parallelism)
		framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name)

		ginkgo.By("Updating the job with suspend=true")
		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
			job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
			framework.ExpectNoError(err, "unable to get job %s in namespace %s", job.Name, f.Namespace.Name)
			job.Spec.Suspend = pointer.Bool(true)
			updatedJob, err := e2ejob.UpdateJob(ctx, f.ClientSet, f.Namespace.Name, job)
			if err == nil {
				job = updatedJob
			}
			return err
		})
		framework.ExpectNoError(err, "failed to update job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring pods are deleted")
		err = e2ejob.WaitForAllJobPodsGone(ctx, f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to ensure pods are deleted after suspend=true")

		ginkgo.By("Checking Job status to observe Suspended state")
		job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to retrieve latest job object")
		exists := false
		for _, c := range job.Status.Conditions {
			if c.Type == batchv1.JobSuspended {
				exists = true
				break
			}
		}
		if !exists {
			framework.Failf("Job was expected to have the Suspended condition")
		}
	})

	ginkgo.It("should recreate pods only after they have failed if pod replacement policy is set to Failed", func(ctx context.Context) {
		ginkgo.By("Creating a job")
		job := e2ejob.NewTestJob("", "pod-recreate-failed", v1.RestartPolicyNever, 1, 1, nil, 1)
		job.Spec.PodReplacementPolicy = ptr.To(batchv1.Failed)
		job.Spec.Template.Spec.Containers[0].Command = []string{"/bin/sh", "-c", `_term(){
	sleep 5
	exit 143
}
trap _term SIGTERM
while true; do
	sleep 1
done`}
		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, 1)
		framework.ExpectNoError(err, "failed to wait for job pod to become running in namespace: %s", f.Namespace.Name)

		ginkgo.By("Deleting job pod")
		pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to get pod list for job %s in namespace: %s", job.Name, f.Namespace.Name)

		framework.ExpectNoError(e2epod.DeletePodsWithGracePeriod(ctx, f.ClientSet, pods.Items, 30), "failed to delete pods in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring pod does not get recreated while it is in terminating state")
		err = e2ejob.WaitForJobState(ctx, f.ClientSet, f.Namespace.Name, job.Name, f.Timeouts.PodDelete, func(job *batchv1.Job) string {
			if job.Status.Active == 0 && job.Status.Failed == 0 && *job.Status.Terminating == 1 {
				return ""
			} else {
				return fmt.Sprintf(
					"expected job to have 0 active pods, 0 failed pods and 1 terminating pod, but got %d active pods, %d failed pods and %d terminating pods",
					job.Status.Active,
					job.Status.Failed,
					*job.Status.Terminating,
				)
			}
		})
		framework.ExpectNoError(err, "failed to ensure pod is not recreated while it is in terminating state")

		ginkgo.By("Ensuring pod gets recreated after it has failed")
		err = e2ejob.WaitForJobState(ctx, f.ClientSet, f.Namespace.Name, job.Name, f.Timeouts.PodDelete, func(job *batchv1.Job) string {
			if job.Status.Active == 1 && job.Status.Failed == 1 && *job.Status.Terminating == 0 {
				return ""
			} else {
				return fmt.Sprintf(
					"expected job to have 1 active pod, 1 failed pod and 0 terminating pods, but got %d active pods, %d failed pods and %d terminating pods",
					job.Status.Active,
					job.Status.Failed,
					*job.Status.Terminating,
				)
			}
		})
		framework.ExpectNoError(err, "failed to wait for pod to get recreated")
	})

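	// This is a Conformance test: an Indexed Job must create one Pod per
	// completion index, record the index in the completion-index annotation,
	// set the Pod hostname to <job-name>-<index>, and succeed for all indexes.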
	framework.ConformanceIt("should create pods for an Indexed job with completion indexes and specified hostname", func(ctx context.Context) {
		parallelism := int32(2)
		completions := int32(4)
		backoffLimit := int32(6)

		ginkgo.By("Creating Indexed job")
		job := e2ejob.NewTestJob("succeed", "indexed-job", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		mode := batchv1.IndexedCompletion
		job.Spec.CompletionMode = &mode
		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create indexed job in namespace %s", f.Namespace.Name)

		ginkgo.By("Ensuring job reaches completions")
		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring pods with index for job exist")
		pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name)
		succeededIndexes := sets.NewInt()
		for _, pod := range pods.Items {
			if pod.Status.Phase == v1.PodSucceeded && pod.Annotations != nil {
				ix, err := strconv.Atoi(pod.Annotations[batchv1.JobCompletionIndexAnnotation])
				framework.ExpectNoError(err, "failed obtaining completion index from pod in namespace: %s", f.Namespace.Name)
				succeededIndexes.Insert(ix)
				expectedName := fmt.Sprintf("%s-%d", job.Name, ix)
				gomega.Expect(pod.Spec.Hostname).To(gomega.Equal(expectedName), "expected completed pod with hostname %s, but got %s", expectedName, pod.Spec.Hostname)
			}
		}
		gotIndexes := succeededIndexes.List()
		wantIndexes := []int{0, 1, 2, 3}
		gomega.Expect(gotIndexes).To(gomega.Equal(wantIndexes), "expected completed indexes %s, but got %s", wantIndexes, gotIndexes)
	})

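	// This test verifies that, with backoffLimitPerIndex set, failing indexes
	// do not stop the remaining indexes from being executed: the even indexes
	// succeed while the odd indexes exhaust their per-index backoff and fail.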
	ginkgo.It("should execute all indexes despite some failing when using backoffLimitPerIndex", func(ctx context.Context) {
		parallelism := int32(2)
		completions := int32(4)
		backoffLimit := int32(6)

		ginkgo.By("Creating an indexed job with backoffLimit per index and failing pods")
		job := e2ejob.NewTestJob("failOddSucceedEven", "with-backoff-limit-per-index", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		job.Spec.BackoffLimit = nil
		job.Spec.BackoffLimitPerIndex = ptr.To[int32](1)
		mode := batchv1.IndexedCompletion
		job.Spec.CompletionMode = &mode
		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Awaiting for the job to fail as there are failed indexes")
		err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to ensure job failure in namespace: %s", f.Namespace.Name)

		ginkgo.By("Verifying the Job status fields to ensure all indexes were executed")
		job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to retrieve latest job object")
		gomega.Expect(job.Status.FailedIndexes).Should(gomega.HaveValue(gomega.Equal("1,3")))
		gomega.Expect(job.Status.CompletedIndexes).Should(gomega.Equal("0,2"))
		gomega.Expect(job.Status.Failed).Should(gomega.Equal(int32(4)))
		gomega.Expect(job.Status.Succeeded).Should(gomega.Equal(int32(2)))
	})

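	// This test verifies that the job is terminated early once the number of
	// failed indexes exceeds maxFailedIndexes, without executing the remaining
	// indexes.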
	ginkgo.It("should terminate job execution when the number of failed indexes exceeds maxFailedIndexes", func(ctx context.Context) {
		// parallelism=1 ensures that only a single pod is created before the
		// job is terminated, keeping the status assertions below deterministic.
		parallelism := int32(1)
		completions := int32(4)
		backoffLimit := int32(6)

		ginkgo.By("Creating an indexed job with backoffLimit per index and maxFailedIndexes")
		job := e2ejob.NewTestJob("fail", "with-max-failed-indexes", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		job.Spec.BackoffLimit = nil
		job.Spec.BackoffLimitPerIndex = ptr.To[int32](0)
		job.Spec.MaxFailedIndexes = ptr.To[int32](0)

		mode := batchv1.IndexedCompletion
		job.Spec.CompletionMode = &mode
		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Awaiting for the job to fail as the number of max failed indexes is exceeded")
		err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to ensure job failure in namespace: %s", f.Namespace.Name)

		ginkgo.By("Verifying the Job status fields to ensure early termination of the job")
		job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to retrieve latest job object")
		gomega.Expect(job.Status.FailedIndexes).Should(gomega.HaveValue(gomega.Equal("0")))
		gomega.Expect(job.Status.Failed).Should(gomega.Equal(int32(1)))
	})

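	// This test verifies the FailIndex action of the pod failure policy: an
	// index whose pod fails with a matching exit code is marked as failed
	// without per-index retries, while the other index still completes.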
	ginkgo.It("should mark indexes as failed when the FailIndex action is matched in podFailurePolicy", func(ctx context.Context) {
		parallelism := int32(2)
		completions := int32(2)
		backoffLimit := int32(6)

		ginkgo.By("Creating an indexed job with failing pods matching the FailIndex action")
		job := e2ejob.NewTestJob("failOddSucceedEven", "matching-fail-index-action", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		job.Spec.BackoffLimit = nil
		job.Spec.BackoffLimitPerIndex = ptr.To[int32](1)
		job.Spec.PodFailurePolicy = &batchv1.PodFailurePolicy{
			Rules: []batchv1.PodFailurePolicyRule{
				{
					Action: batchv1.PodFailurePolicyActionFailIndex,
					OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
						Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
						Values:   []int32{1},
					},
				},
			},
		}
		mode := batchv1.IndexedCompletion
		job.Spec.CompletionMode = &mode
		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Awaiting for the job to fail as all indexes are failed")
		err = e2ejob.WaitForJobFailed(f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to ensure job failure in namespace: %s", f.Namespace.Name)

		ginkgo.By("Verifying the Job status fields to ensure the upper indexes didn't execute")
		job, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to retrieve latest job object")
		gomega.Expect(job.Status.FailedIndexes).Should(gomega.HaveValue(gomega.Equal("1")))
		gomega.Expect(job.Status.CompletedIndexes).Should(gomega.Equal("0"))
		gomega.Expect(job.Status.Failed).Should(gomega.Equal(int32(1)))
		gomega.Expect(job.Status.Succeeded).Should(gomega.Equal(int32(1)))
	})

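	// This test verifies that deleting a Job also removes the Pods it owns via
	// garbage collection.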
	ginkgo.It("should remove pods when job is deleted", func(ctx context.Context) {
		parallelism := int32(2)
		completions := int32(4)
		backoffLimit := int32(6)

		ginkgo.By("Creating a job")
		job := e2ejob.NewTestJob("notTerminate", "all-pods-removed", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensure pods equal to parallelism count is attached to the job")
		err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, parallelism)
		framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name)

		ginkgo.By("Delete the job")
		err = e2eresource.DeleteResourceAndWaitForGC(ctx, f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to delete the job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensure the pods associated with the job are also deleted")
		err = e2ejob.WaitForAllJobPodsGone(ctx, f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to get PodList for job %s in namespace: %s", job.Name, f.Namespace.Name)
	})

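	// This is a Conformance test: Pods that fail once and are restarted locally
	// (restartPolicy: OnFailure) must still allow the Job to run to completion.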
	framework.ConformanceIt("should run a job to completion when tasks sometimes fail and are locally restarted", func(ctx context.Context) {
		parallelism := int32(2)
		completions := int32(4)
		backoffLimit := int32(6)

		ginkgo.By("Creating a job")
		// The "failOnce" container fails on its first run and succeeds on the
		// local restart (restartPolicy: OnFailure), so every pod eventually
		// completes successfully.
		job := e2ejob.NewTestJob("failOnce", "fail-once-local", v1.RestartPolicyOnFailure, parallelism, completions, nil, backoffLimit)
		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring job reaches completions")
		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
	})

	ginkgo.It("should run a job to completion when tasks sometimes fail and are not locally restarted", func(ctx context.Context) {
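		// With restartPolicy Never, a failed pod is replaced by a brand new pod
		// rather than restarted in place. The pods are scheduled to a single
		// node so that the node-local state used by "failOnce" survives the pod
		// being recreated, letting the replacement pods succeed.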
		ginkgo.By("Looking for a node to schedule job pod")
		node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
		framework.ExpectNoError(err)

		parallelism := int32(2)
		completions := int32(4)
		backoffLimit := int32(6)

		ginkgo.By("Creating a job")
		job := e2ejob.NewTestJobOnNode("failOnce", "fail-once-non-local", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit, node.Name)
		job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring job reaches completions")
		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, *job.Spec.Completions)
		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)
	})

	ginkgo.It("should fail when exceeds active deadline", func(ctx context.Context) {
		activeDeadlineSeconds := int64(1)
		parallelism := int32(2)
		completions := int32(4)
		backoffLimit := int32(6)

		ginkgo.By("Creating a job")
		job := e2ejob.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds, backoffLimit)
		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
		ginkgo.By("Ensuring job past active deadline")
		err = waitForJobFailure(ctx, f.ClientSet, f.Namespace.Name, job.Name, time.Duration(activeDeadlineSeconds+15)*time.Second, "DeadlineExceeded")
		framework.ExpectNoError(err, "failed to ensure job past active deadline in namespace: %s", f.Namespace.Name)
	})

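	// This is a Conformance test: deleting a running Job must succeed and the
	// Job must subsequently not be found.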
	framework.ConformanceIt("should delete a job", func(ctx context.Context) {
		parallelism := int32(2)
		completions := int32(4)
		backoffLimit := int32(6)

		ginkgo.By("Creating a job")
		job := e2ejob.NewTestJob("notTerminate", "foo", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring active pods == parallelism")
		err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, parallelism)
		framework.ExpectNoError(err, "failed to ensure active pods == parallelism in namespace: %s", f.Namespace.Name)

		ginkgo.By("delete a job")
		framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(ctx, f.ClientSet, batchinternal.Kind("Job"), f.Namespace.Name, job.Name))

		ginkgo.By("Ensuring job was deleted")
		_, err = e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
		gomega.Expect(err).To(gomega.MatchError(apierrors.IsNotFound, fmt.Sprintf("failed to ensure job %s was deleted in namespace: %s", job.Name, f.Namespace.Name)))
	})

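	// This is a Conformance test: a Job must adopt matching orphan Pods (Pods
	// whose controllerRef was removed) and release Pods whose labels no longer
	// match its selector.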
	framework.ConformanceIt("should adopt matching orphans and release non-matching pods", func(ctx context.Context) {
		parallelism := int32(2)
		completions := int32(4)
		backoffLimit := int32(6)

		ginkgo.By("Creating a job")
		job := e2ejob.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		// Save Kind since it is not populated in the object returned by Create.
		kind := job.Kind
		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
		// Restore Kind so the controllerRef comparison below can use it.
		job.Kind = kind

		ginkgo.By("Ensuring active pods == parallelism")
		err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, parallelism)
		framework.ExpectNoError(err, "failed to ensure active pods == parallelism in namespace: %s", f.Namespace.Name)

		ginkgo.By("Orphaning one of the Job's Pods")
		pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to get PodList for job %s in namespace: %s", job.Name, f.Namespace.Name)
		gomega.Expect(pods.Items).To(gomega.HaveLen(int(parallelism)))
		pod := pods.Items[0]
		e2epod.NewPodClient(f).Update(ctx, pod.Name, func(pod *v1.Pod) {
			pod.OwnerReferences = nil
		})

		ginkgo.By("Checking that the Job readopts the Pod")
		gomega.Expect(e2epod.WaitForPodCondition(ctx, f.ClientSet, pod.Namespace, pod.Name, "adopted", e2ejob.JobTimeout,
			func(pod *v1.Pod) (bool, error) {
				controllerRef := metav1.GetControllerOf(pod)
				if controllerRef == nil {
					return false, nil
				}
				if controllerRef.Kind != job.Kind || controllerRef.Name != job.Name || controllerRef.UID != job.UID {
					return false, fmt.Errorf("pod has wrong controllerRef: got %v, want %v", controllerRef, job)
				}
				return true, nil
			},
		)).To(gomega.Succeed(), "wait for pod %q to be readopted", pod.Name)

		ginkgo.By("Removing the labels from the Job's Pod")
		e2epod.NewPodClient(f).Update(ctx, pod.Name, func(pod *v1.Pod) {
			pod.Labels = nil
		})

		ginkgo.By("Checking that the Job releases the Pod")
		gomega.Expect(e2epod.WaitForPodCondition(ctx, f.ClientSet, pod.Namespace, pod.Name, "released", e2ejob.JobTimeout,
			func(pod *v1.Pod) (bool, error) {
				controllerRef := metav1.GetControllerOf(pod)
				if controllerRef != nil {
					return false, nil
				}
				return true, nil
			},
		)).To(gomega.Succeed(), "wait for pod %q to be released", pod.Name)
	})

	ginkgo.It("should fail to exceed backoffLimit", func(ctx context.Context) {
		ginkgo.By("Creating a job")
		backoff := 1
		job := e2ejob.NewTestJob("fail", "backofflimit", v1.RestartPolicyNever, 1, 1, nil, int32(backoff))
		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)
		ginkgo.By("Ensuring job exceed backofflimit")

		err = waitForJobFailure(ctx, f.ClientSet, f.Namespace.Name, job.Name, e2ejob.JobTimeout, "BackoffLimitExceeded")
		framework.ExpectNoError(err, "failed to ensure job exceed backofflimit in namespace: %s", f.Namespace.Name)

		ginkgo.By(fmt.Sprintf("Checking that %d pod created and status is failed", backoff+1))
		pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to get PodList for job %s in namespace: %s", job.Name, f.Namespace.Name)
		gomega.Expect(pods.Items).To(gomega.HaveLen(backoff + 1))
		for _, pod := range pods.Items {
			gomega.Expect(pod.Status.Phase).To(gomega.Equal(v1.PodFailed))
		}
	})

	f.It("should run a job to completion with CPU requests", f.WithSerial(), func(ctx context.Context) {
		ginkgo.By("Creating a job with CPU requests")

		testNodeName := scheduling.GetNodeThatCanRunPod(ctx, f)
		targetNode, err := f.ClientSet.CoreV1().Nodes().Get(ctx, testNodeName, metav1.GetOptions{})
		framework.ExpectNoError(err, "unable to get node object for node %v", testNodeName)

		cpu, ok := targetNode.Status.Allocatable[v1.ResourceCPU]
		if !ok {
			framework.Failf("Unable to get node's %q cpu", targetNode.Name)
		}

		cpuRequest := fmt.Sprint(int64(0.2 * float64(cpu.Value())))

		parallelism := int32(90)
		completions := int32(90)
		backoffLimit := int32(0)

		ginkgo.By("Creating a job")
		job := e2ejob.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		for i := range job.Spec.Template.Spec.Containers {
			job.Spec.Template.Spec.Containers[i].Resources = v1.ResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceCPU: resource.MustParse(cpuRequest),
				},
			}
			job.Spec.Template.Spec.NodeSelector = map[string]string{"kubernetes.io/hostname": testNodeName}
		}

		framework.Logf("Creating job %q with a node hostname selector %q with cpu request %q", job.Name, testNodeName, cpuRequest)
		job, err = e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring job reaches completions")
		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, f.Namespace.Name, job.Name, completions)
		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensuring pods for job exist")
		pods, err := e2ejob.GetJobPods(ctx, f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "failed to get pod list for job in namespace: %s", f.Namespace.Name)
		successes := int32(0)
		for _, pod := range pods.Items {
			if pod.Status.Phase == v1.PodSucceeded {
				successes++
			}
		}
		gomega.Expect(successes).To(gomega.Equal(completions), "expected %d successful job pods, but got %d", completions, successes)
	})

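	// This is a Conformance test: the Job /status subresource must support
	// get, patch, and update operations.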
	framework.ConformanceIt("should apply changes to a job status", func(ctx context.Context) {

		ns := f.Namespace.Name
		jClient := f.ClientSet.BatchV1().Jobs(ns)

		parallelism := int32(2)
		completions := int32(4)
		backoffLimit := int32(6)

		ginkgo.By("Creating a job")
		job := e2ejob.NewTestJob("notTerminate", "suspend-false-to-true", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensure pods equal to parallelism count is attached to the job")
		err = e2ejob.WaitForJobPodsRunning(ctx, f.ClientSet, f.Namespace.Name, job.Name, parallelism)
		framework.ExpectNoError(err, "failed to ensure number of pods associated with job %s is equal to parallelism count in namespace: %s", job.Name, f.Namespace.Name)

		customConditionType := batchv1.JobConditionType("CustomConditionType")

		ginkgo.By("patching /status")

		now1 := metav1.Now().Rfc3339Copy()
		jStatus := batchv1.JobStatus{
			Conditions: []batchv1.JobCondition{
				{
					Type:               customConditionType,
					Status:             v1.ConditionTrue,
					LastTransitionTime: now1,
				},
			},
		}

		jStatusJSON, err := json.Marshal(jStatus)
		framework.ExpectNoError(err)
		patchedStatus, err := jClient.Patch(ctx, job.Name, types.MergePatchType,
			[]byte(`{"metadata":{"annotations":{"patchedstatus":"true"}},"status":`+string(jStatusJSON)+`}`),
			metav1.PatchOptions{}, "status")
		framework.ExpectNoError(err)
		if condition := findConditionByType(patchedStatus.Status.Conditions, customConditionType); condition != nil {
			if !condition.LastTransitionTime.Equal(&now1) {
				framework.Failf("patched object should have the applied condition with LastTransitionTime %#v, got %#v instead", now1, condition.LastTransitionTime)
			}
		} else {
			framework.Failf("patched object does not have the required condition %v", customConditionType)
		}
		gomega.Expect(patchedStatus.Annotations).To(gomega.HaveKeyWithValue("patchedstatus", "true"), "patched object should have the applied annotation")

		ginkgo.By("updating /status")

		now2 := metav1.Now().Rfc3339Copy()
		var statusToUpdate, updatedStatus *batchv1.Job
		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
			statusToUpdate, err = jClient.Get(ctx, job.Name, metav1.GetOptions{})
			if err != nil {
				return err
			}
			if condition := findConditionByType(statusToUpdate.Status.Conditions, customConditionType); condition != nil {
				condition.LastTransitionTime = now2
			} else {
				framework.Failf("patched object does not have the required condition %v", customConditionType)
			}
			updatedStatus, err = jClient.UpdateStatus(ctx, statusToUpdate, metav1.UpdateOptions{})
			return err
		})
		framework.ExpectNoError(err)
		if condition := findConditionByType(updatedStatus.Status.Conditions, customConditionType); condition != nil {
			if !condition.LastTransitionTime.Equal(&now2) {
				framework.Failf("updated object should have the applied condition with LastTransitionTime %#v, got %#v instead", now2, condition.LastTransitionTime)
			}
		} else {
			framework.Failf("updated object does not have the required condition %v", customConditionType)
		}

		ginkgo.By("get /status")
		jResource := schema.GroupVersionResource{Group: "batch", Version: "v1", Resource: "jobs"}
		gottenStatus, err := f.DynamicClient.Resource(jResource).Namespace(ns).Get(ctx, job.Name, metav1.GetOptions{}, "status")
		framework.ExpectNoError(err)
		statusUID, _, err := unstructured.NestedFieldCopy(gottenStatus.Object, "metadata", "uid")
		framework.ExpectNoError(err)
		gomega.Expect(string(job.UID)).To(gomega.Equal(statusUID), fmt.Sprintf("job.UID: %v expected to match statusUID: %v ", job.UID, statusUID))
	})

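	// This is a Conformance test: it exercises the full lifecycle of a Job
	// through create, patch, update, list, watch, and deleteCollection, using
	// a label selector to scope every operation to the test's Job.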
	framework.ConformanceIt("should manage the lifecycle of a job", func(ctx context.Context) {
		jobName := "e2e-" + utilrand.String(5)
		label := map[string]string{"e2e-job-label": jobName}
		labelSelector := labels.SelectorFromSet(label).String()

		ns := f.Namespace.Name
		jobClient := f.ClientSet.BatchV1().Jobs(ns)

		w := &cache.ListWatch{
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				options.LabelSelector = labelSelector
				return jobClient.Watch(ctx, options)
			},
		}
		jobsList, err := jobClient.List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
		framework.ExpectNoError(err, "failed to list Job")

		parallelism := int32(2)
		completions := int32(4)
		backoffLimit := int32(6)

		ginkgo.By("Creating a suspended job")
		job := e2ejob.NewTestJob("succeed", jobName, v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		job.Labels = label
		job.Spec.Suspend = pointer.BoolPtr(true)
		job, err = e2ejob.CreateJob(ctx, f.ClientSet, ns, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", ns)

		ginkgo.By("Patching the Job")
		payload := "{\"metadata\":{\"labels\":{\"" + jobName + "\":\"patched\"}}}"
		patchedJob, err := f.ClientSet.BatchV1().Jobs(ns).Patch(ctx, jobName, types.StrategicMergePatchType, []byte(payload), metav1.PatchOptions{})
		framework.ExpectNoError(err, "failed to patch Job %s in namespace %s", jobName, ns)

		ginkgo.By("Watching for Job to be patched")
		c := watchEventConfig{
			framework:           f,
			resourceVersion:     jobsList.ResourceVersion,
			w:                   w,
			jobName:             jobName,
			watchEvent:          watch.Modified,
			extJob:              patchedJob,
			updatedMetadataType: "label",
			updatedKey:          jobName,
			updatedValue:        "patched",
		}
		waitForJobEvent(ctx, c)
		gomega.Expect(patchedJob.Labels).To(gomega.HaveKeyWithValue(jobName, "patched"), "Did not find job label for this job. Current labels: %v", patchedJob.Labels)

		ginkgo.By("Updating the job")
		var updatedJob *batchv1.Job

		err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
			patchedJob, err = jobClient.Get(ctx, jobName, metav1.GetOptions{})
			framework.ExpectNoError(err, "Unable to get job %s", jobName)
			patchedJob.Spec.Suspend = pointer.BoolPtr(false)
			if patchedJob.Annotations == nil {
				patchedJob.Annotations = map[string]string{}
			}
			patchedJob.Annotations["updated"] = "true"
			updatedJob, err = e2ejob.UpdateJob(ctx, f.ClientSet, ns, patchedJob)
			return err
		})
		framework.ExpectNoError(err, "failed to update job in namespace: %s", ns)

		ginkgo.By("Watching for Job to be updated")
		c = watchEventConfig{
			framework:           f,
			resourceVersion:     patchedJob.ResourceVersion,
			w:                   w,
			jobName:             jobName,
			watchEvent:          watch.Modified,
			extJob:              updatedJob,
			updatedMetadataType: "annotation",
			updatedKey:          "updated",
			updatedValue:        "true",
		}
		waitForJobEvent(ctx, c)
		gomega.Expect(updatedJob.Annotations).To(gomega.HaveKeyWithValue("updated", "true"), "updated Job should have the applied annotation")
		framework.Logf("Found Job annotations: %#v", patchedJob.Annotations)

		ginkgo.By("Listing all Jobs with LabelSelector")
		jobs, err := f.ClientSet.BatchV1().Jobs("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
		framework.ExpectNoError(err, "Failed to list job. %v", err)
		gomega.Expect(jobs.Items).To(gomega.HaveLen(1), "Failed to find job %v", jobName)
		testJob := jobs.Items[0]
		framework.Logf("Job: %v has labels: %v", testJob.Name, testJob.Labels)

		ginkgo.By("Waiting for job to complete")
		err = e2ejob.WaitForJobComplete(ctx, f.ClientSet, ns, jobName, completions)
		framework.ExpectNoError(err, "failed to ensure job completion in namespace: %s", ns)

		ginkgo.By("Delete a job collection with a labelselector")
		propagationPolicy := metav1.DeletePropagationBackground
		err = f.ClientSet.BatchV1().Jobs(ns).DeleteCollection(ctx, metav1.DeleteOptions{PropagationPolicy: &propagationPolicy}, metav1.ListOptions{LabelSelector: labelSelector})
		framework.ExpectNoError(err, "failed to delete job %s in namespace: %s", job.Name, ns)

		ginkgo.By("Watching for Job to be deleted")
		c = watchEventConfig{
			framework:           f,
			resourceVersion:     updatedJob.ResourceVersion,
			w:                   w,
			jobName:             jobName,
			watchEvent:          watch.Deleted,
			extJob:              &testJob,
			updatedMetadataType: "label",
			updatedKey:          "e2e-job-label",
			updatedValue:        jobName,
		}
		waitForJobEvent(ctx, c)

		ginkgo.By("Relist jobs to confirm deletion")
		jobs, err = f.ClientSet.BatchV1().Jobs("").List(ctx, metav1.ListOptions{LabelSelector: labelSelector})
		framework.ExpectNoError(err, "Failed to list job. %v", err)
		gomega.Expect(jobs.Items).To(gomega.BeEmpty(), "Found job %v", jobName)
	})

	ginkgo.It("should update the status ready field", func(ctx context.Context) {
		parallelism := int32(2)
		completions := int32(4)
		backoffLimit := int32(6)

		ginkgo.By("Creating a job with suspend=true")
		job := e2ejob.NewTestJob("notTerminate", "all-ready", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
		job.Spec.Suspend = ptr.To[bool](true)
		job, err := e2ejob.CreateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		framework.ExpectNoError(err, "failed to create job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensure the job controller updates the status.ready field")
		err = e2ejob.WaitForJobReady(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To[int32](0))
		framework.ExpectNoError(err, "failed to ensure job status ready field in namespace: %s", f.Namespace.Name)

		ginkgo.By("Updating the job with suspend=false")
		err = updateJobSuspendWithRetries(ctx, f, job, ptr.To[bool](false))
		framework.ExpectNoError(err, "failed to update job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensure the job controller updates the status.ready field")
		err = e2ejob.WaitForJobReady(ctx, f.ClientSet, f.Namespace.Name, job.Name, &parallelism)
		framework.ExpectNoError(err, "failed to ensure job status ready field in namespace: %s", f.Namespace.Name)

		ginkgo.By("Updating the job with suspend=true")
		err = updateJobSuspendWithRetries(ctx, f, job, ptr.To[bool](true))
		framework.ExpectNoError(err, "failed to update job in namespace: %s", f.Namespace.Name)

		ginkgo.By("Ensure the job controller updates the status.ready field")
		err = e2ejob.WaitForJobReady(ctx, f.ClientSet, f.Namespace.Name, job.Name, ptr.To[int32](0))
		framework.ExpectNoError(err, "failed to ensure job status ready field in namespace: %s", f.Namespace.Name)
	})
})

func updateJobSuspendWithRetries(ctx context.Context, f *framework.Framework, job *batchv1.Job, suspend *bool) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		job, err := e2ejob.GetJob(ctx, f.ClientSet, f.Namespace.Name, job.Name)
		framework.ExpectNoError(err, "unable to get job %s in namespace %s", job.Name, f.Namespace.Name)
		job.Spec.Suspend = suspend
		_, err = e2ejob.UpdateJob(ctx, f.ClientSet, f.Namespace.Name, job)
		return err
	})
}

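// waitForJobEvent waits, using the provided ListWatch, for a watch event that
// matches the expected event type, job name, and updated label or annotation
// described by the config. If the event is not observed before the timeout,
// the current state of the Job is logged.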
func waitForJobEvent(ctx context.Context, config watchEventConfig) {
	f := config.framework
	ctx, cancel := context.WithTimeout(ctx, f.Timeouts.PodStartShort)
	defer cancel()
	_, err := watchtools.Until(ctx, config.resourceVersion, config.w, func(event watch.Event) (bool, error) {
		if job, ok := event.Object.(*batchv1.Job); ok {

			var key string
			switch config.updatedMetadataType {
			case "annotation":
				key = job.Annotations[config.updatedKey]
			case "label":
				key = job.Labels[config.updatedKey]
			}

			found := job.ObjectMeta.Name == config.extJob.ObjectMeta.Name &&
				job.ObjectMeta.Namespace == f.Namespace.Name &&
				key == config.updatedValue &&
				event.Type == config.watchEvent
			if !found {
				framework.Logf("Event %v observed for Job %v in namespace %v with labels: %v and annotations: %v", event.Type, job.ObjectMeta.Name, job.ObjectMeta.Namespace, job.Labels, job.Annotations)
				return false, nil
			}
			framework.Logf("Event %v found for Job %v in namespace %v with labels: %v and annotations: %v", event.Type, job.ObjectMeta.Name, job.ObjectMeta.Namespace, job.Labels, job.Annotations)
			return found, nil
		}
		framework.Logf("Observed event: %+v", event.Object)
		return false, nil
	})
	if err != nil {
		j, _ := f.ClientSet.BatchV1().Jobs(f.Namespace.Name).Get(ctx, config.jobName, metav1.GetOptions{})
		framework.Logf("We missed the %v event. Job details: %+v", config.watchEvent, j)
	}
}

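// waitForJobFailure waits up to the given timeout for the Job named jobName in
// namespace ns to fail (the JobFailed condition becoming True) with the given
// reason, or with any reason if reason is empty.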
func waitForJobFailure(ctx context.Context, c clientset.Interface, ns, jobName string, timeout time.Duration, reason string) error {
	return wait.Poll(framework.Poll, timeout, func() (bool, error) {
		curr, err := c.BatchV1().Jobs(ns).Get(ctx, jobName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		for _, c := range curr.Status.Conditions {
			if c.Type == batchv1.JobFailed && c.Status == v1.ConditionTrue {
				if reason == "" || reason == c.Reason {
					return true, nil
				}
			}
		}
		return false, nil
	})
}

func findConditionByType(list []batchv1.JobCondition, cType batchv1.JobConditionType) *batchv1.JobCondition {
	for i := range list {
		if list[i].Type == cType {
			return &list[i]
		}
	}
	return nil
}