16
17 package job
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "math"
24 "sort"
25 "strconv"
26 "testing"
27 "time"
28
29 "github.com/google/go-cmp/cmp"
30 "github.com/google/go-cmp/cmp/cmpopts"
31 batch "k8s.io/api/batch/v1"
32 v1 "k8s.io/api/core/v1"
33 apiequality "k8s.io/apimachinery/pkg/api/equality"
34 apierrors "k8s.io/apimachinery/pkg/api/errors"
35 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36 "k8s.io/apimachinery/pkg/runtime/schema"
37 "k8s.io/apimachinery/pkg/types"
38 "k8s.io/apimachinery/pkg/util/rand"
39 "k8s.io/apimachinery/pkg/util/sets"
40 "k8s.io/apimachinery/pkg/util/uuid"
41 "k8s.io/apimachinery/pkg/util/wait"
42 "k8s.io/apimachinery/pkg/watch"
43 "k8s.io/apiserver/pkg/util/feature"
44 "k8s.io/client-go/informers"
45 clientset "k8s.io/client-go/kubernetes"
46 "k8s.io/client-go/kubernetes/fake"
47 restclient "k8s.io/client-go/rest"
48 core "k8s.io/client-go/testing"
49 "k8s.io/client-go/tools/cache"
50 "k8s.io/client-go/util/workqueue"
51 featuregatetesting "k8s.io/component-base/featuregate/testing"
52 metricstestutil "k8s.io/component-base/metrics/testutil"
53 "k8s.io/klog/v2"
54 "k8s.io/klog/v2/ktesting"
55 _ "k8s.io/kubernetes/pkg/apis/core/install"
56 "k8s.io/kubernetes/pkg/controller"
57 "k8s.io/kubernetes/pkg/controller/job/metrics"
58 "k8s.io/kubernetes/pkg/controller/testutil"
59 "k8s.io/kubernetes/pkg/features"
60 "k8s.io/utils/clock"
61 clocktesting "k8s.io/utils/clock/testing"
62 "k8s.io/utils/ptr"
63 )
64
65 var realClock = &clock.RealClock{}
66 var alwaysReady = func() bool { return true }
67
68 const fastSyncJobBatchPeriod = 10 * time.Millisecond
69 const fastJobApiBackoff = 10 * time.Millisecond
70 const fastRequeue = 10 * time.Millisecond
71
72
73
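// testFinishedAt is a fixed timestamp (one second past the zero time) used as
// the FinishedAt value for terminated containers in pods built by these tests.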
74 var testFinishedAt = metav1.NewTime((time.Time{}).Add(time.Second))
75
76 func newJobWithName(name string, parallelism, completions, backoffLimit int32, completionMode batch.CompletionMode) *batch.Job {
77 j := &batch.Job{
78 TypeMeta: metav1.TypeMeta{Kind: "Job"},
79 ObjectMeta: metav1.ObjectMeta{
80 Name: name,
81 UID: uuid.NewUUID(),
82 Namespace: metav1.NamespaceDefault,
83 },
84 Spec: batch.JobSpec{
85 Selector: &metav1.LabelSelector{
86 MatchLabels: map[string]string{"foo": "bar"},
87 },
88 Template: v1.PodTemplateSpec{
89 ObjectMeta: metav1.ObjectMeta{
90 Labels: map[string]string{
91 "foo": "bar",
92 },
93 },
94 Spec: v1.PodSpec{
95 Containers: []v1.Container{
96 {Image: "foo/bar"},
97 },
98 },
99 },
100 },
101 }
102 if completionMode != "" {
103 j.Spec.CompletionMode = &completionMode
104 }
105
106
107 if completions >= 0 {
108 j.Spec.Completions = &completions
109 } else {
110 j.Spec.Completions = nil
111 }
112 if parallelism >= 0 {
j.Spec.Parallelism = &parallelism
114 } else {
115 j.Spec.Parallelism = nil
116 }
117 j.Spec.BackoffLimit = &backoffLimit
118
119 return j
120 }
121
122 func newJob(parallelism, completions, backoffLimit int32, completionMode batch.CompletionMode) *batch.Job {
123 return newJobWithName("foobar", parallelism, completions, backoffLimit, completionMode)
124 }
125
126 func newControllerFromClient(ctx context.Context, t *testing.T, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*Controller, informers.SharedInformerFactory) {
127 t.Helper()
128 return newControllerFromClientWithClock(ctx, t, kubeClient, resyncPeriod, realClock)
129 }
130
131 func newControllerFromClientWithClock(ctx context.Context, t *testing.T, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, clock clock.WithTicker) (*Controller, informers.SharedInformerFactory) {
132 t.Helper()
133 sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod())
134 jm, err := newControllerWithClock(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient, clock)
135 if err != nil {
136 t.Fatalf("Error creating Job controller: %v", err)
137 }
138 jm.podControl = &controller.FakePodControl{}
139 return jm, sharedInformers
140 }
141
142 func newPod(name string, job *batch.Job) *v1.Pod {
143 return &v1.Pod{
144 ObjectMeta: metav1.ObjectMeta{
145 Name: name,
146 UID: types.UID(name),
147 Labels: job.Spec.Selector.MatchLabels,
148 Namespace: job.Namespace,
149 OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(job, controllerKind)},
150 },
151 }
152 }
153
154
155 func newPodList(count int, status v1.PodPhase, job *batch.Job) []*v1.Pod {
156 var pods []*v1.Pod
157 for i := 0; i < count; i++ {
158 newPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
159 newPod.Status = v1.PodStatus{Phase: status}
160 newPod.Status.ContainerStatuses = []v1.ContainerStatus{
161 {
162 State: v1.ContainerState{
163 Terminated: &v1.ContainerStateTerminated{
164 FinishedAt: testFinishedAt,
165 },
166 },
167 },
168 }
169 newPod.Finalizers = append(newPod.Finalizers, batch.JobTrackingFinalizer)
170 pods = append(pods, newPod)
171 }
172 return pods
173 }
174
175 func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, activePods, succeededPods, failedPods, terminatingPods, readyPods int) {
176 for _, pod := range newPodList(pendingPods, v1.PodPending, job) {
177 podIndexer.Add(pod)
178 }
179 running := newPodList(activePods, v1.PodRunning, job)
180 for i, p := range running {
181 if i >= readyPods {
182 break
183 }
184 p.Status.Conditions = append(p.Status.Conditions, v1.PodCondition{
185 Type: v1.PodReady,
186 Status: v1.ConditionTrue,
187 })
188 }
189 for _, pod := range running {
190 podIndexer.Add(pod)
191 }
192 for _, pod := range newPodList(succeededPods, v1.PodSucceeded, job) {
193 podIndexer.Add(pod)
194 }
195 for _, pod := range newPodList(failedPods, v1.PodFailed, job) {
196 podIndexer.Add(pod)
197 }
198 terminating := newPodList(terminatingPods, v1.PodRunning, job)
199 for _, p := range terminating {
200 now := metav1.Now()
201 p.DeletionTimestamp = &now
202 }
203 for _, pod := range terminating {
204 podIndexer.Add(pod)
205 }
206 }
207
208 func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status []indexPhase) {
209 for _, s := range status {
210 p := newPod(fmt.Sprintf("pod-%s", rand.String(10)), job)
211 p.Status = v1.PodStatus{Phase: s.Phase}
212 if s.Phase == v1.PodFailed || s.Phase == v1.PodSucceeded {
213 p.Status.ContainerStatuses = []v1.ContainerStatus{
214 {
215 State: v1.ContainerState{
216 Terminated: &v1.ContainerStateTerminated{
217 FinishedAt: testFinishedAt,
218 },
219 },
220 },
221 }
222 }
223 if s.Index != noIndex {
224 p.Annotations = map[string]string{
225 batch.JobCompletionIndexAnnotation: s.Index,
226 }
227 p.Spec.Hostname = fmt.Sprintf("%s-%s", job.Name, s.Index)
228 }
229 p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)
230 podIndexer.Add(p)
231 }
232 }
233
234 type jobInitialStatus struct {
235 active int
236 succeed int
237 failed int
238 startTime *time.Time
239 }
240
241 func TestControllerSyncJob(t *testing.T) {
242 _, ctx := ktesting.NewTestContext(t)
243 jobConditionComplete := batch.JobComplete
244 jobConditionFailed := batch.JobFailed
245 jobConditionSuspended := batch.JobSuspended
246 referenceTime := time.Now()
247
248 testCases := map[string]struct {
249
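// job setup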
250 parallelism int32
251 completions int32
252 backoffLimit int32
253 deleting bool
254 podLimit int
255 completionMode batch.CompletionMode
256 wasSuspended bool
257 suspend bool
258 podReplacementPolicy *batch.PodReplacementPolicy
259 podFailurePolicy *batch.PodFailurePolicy
260 initialStatus *jobInitialStatus
261 backoffRecord *backoffRecord
262 controllerTime *time.Time
263
264
265
266
267
268
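// pod setup
// When podControllerError is set, pod finalizers cannot be removed, so the
// succeeded and failed counters in the Job status are not incremented.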
269 podControllerError error
270 pendingPods int
271 activePods int
272 readyPods int
273 succeededPods int
274 failedPods int
275 terminatingPods int
276 podsWithIndexes []indexPhase
277 fakeExpectationAtCreation int32
278
279
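// expected results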
280 expectedCreations int32
281 expectedDeletions int32
282 expectedActive int32
283 expectedReady *int32
284 expectedSucceeded int32
285 expectedCompletedIdxs string
286 expectedFailed int32
287 expectedTerminating *int32
288 expectedCondition *batch.JobConditionType
289 expectedConditionStatus v1.ConditionStatus
290 expectedConditionReason string
291 expectedCreatedIndexes sets.Set[int]
292 expectedPodPatches int
293
294
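// feature gates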
295 podIndexLabelDisabled bool
296 jobPodReplacementPolicy bool
297 jobPodFailurePolicy bool
298 }{
299 "job start": {
300 parallelism: 2,
301 completions: 5,
302 backoffLimit: 6,
303 expectedCreations: 2,
304 expectedActive: 2,
305 expectedReady: ptr.To[int32](0),
306 },
307 "WQ job start": {
308 parallelism: 2,
309 completions: -1,
310 backoffLimit: 6,
311 expectedCreations: 2,
312 expectedActive: 2,
313 expectedReady: ptr.To[int32](0),
314 },
315 "pending pods": {
316 parallelism: 2,
317 completions: 5,
318 backoffLimit: 6,
319 pendingPods: 2,
320 expectedActive: 2,
321 expectedReady: ptr.To[int32](0),
322 },
323 "correct # of pods": {
324 parallelism: 3,
325 completions: 5,
326 backoffLimit: 6,
327 activePods: 3,
328 readyPods: 2,
329 expectedActive: 3,
330 expectedReady: ptr.To[int32](2),
331 },
332 "WQ job: correct # of pods": {
333 parallelism: 2,
334 completions: -1,
335 backoffLimit: 6,
336 activePods: 2,
337 expectedActive: 2,
338 expectedReady: ptr.To[int32](0),
339 },
340 "too few active pods": {
341 parallelism: 2,
342 completions: 5,
343 backoffLimit: 6,
344 activePods: 1,
345 succeededPods: 1,
346 expectedCreations: 1,
347 expectedActive: 2,
348 expectedSucceeded: 1,
349 expectedPodPatches: 1,
350 expectedReady: ptr.To[int32](0),
351 },
352 "WQ job: recreate pods when failed": {
353 parallelism: 1,
354 completions: -1,
355 backoffLimit: 6,
356 activePods: 1,
357 failedPods: 1,
358 podReplacementPolicy: podReplacementPolicy(batch.Failed),
359 jobPodReplacementPolicy: true,
360 terminatingPods: 1,
361 expectedTerminating: ptr.To[int32](1),
362 expectedReady: ptr.To[int32](0),
363
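// With podReplacementPolicy=Failed the terminating pod is not counted as
// failed, so only the failed pod is patched and counted.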
364 expectedPodPatches: 1,
365 expectedFailed: 1,
366 expectedActive: 1,
367 },
368 "WQ job: turn on PodReplacementPolicy but not set PodReplacementPolicy": {
369 parallelism: 1,
370 completions: 1,
371 backoffLimit: 6,
372 activePods: 1,
373 failedPods: 1,
374 jobPodReplacementPolicy: true,
375 expectedTerminating: ptr.To[int32](1),
376 expectedReady: ptr.To[int32](0),
377 terminatingPods: 1,
378 expectedActive: 1,
379 expectedPodPatches: 2,
380 expectedFailed: 2,
381 },
382 "WQ job: recreate pods when terminating or failed": {
383 parallelism: 1,
384 completions: -1,
385 backoffLimit: 6,
386 activePods: 1,
387 failedPods: 1,
388 podReplacementPolicy: podReplacementPolicy(batch.TerminatingOrFailed),
389 jobPodReplacementPolicy: true,
390 terminatingPods: 1,
391 expectedTerminating: ptr.To[int32](1),
392 expectedReady: ptr.To[int32](0),
393 expectedActive: 1,
394 expectedPodPatches: 2,
395 expectedFailed: 2,
396 },
397 "more terminating pods than parallelism": {
398 parallelism: 1,
399 completions: 1,
400 backoffLimit: 6,
401 activePods: 2,
402 failedPods: 0,
403 terminatingPods: 4,
404 podReplacementPolicy: podReplacementPolicy(batch.Failed),
405 jobPodReplacementPolicy: true,
406 expectedTerminating: ptr.To[int32](4),
407 expectedReady: ptr.To[int32](0),
408 expectedActive: 1,
409 expectedDeletions: 1,
410 expectedPodPatches: 1,
411 },
412 "more terminating pods than parallelism; PodFailurePolicy used": {
413
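// Only the PodFailurePolicy feature is enabled here, so status.terminating
// is not tracked and remains nil.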
414 parallelism: 1,
415 completions: 1,
416 backoffLimit: 6,
417 activePods: 2,
418 failedPods: 0,
419 terminatingPods: 4,
420 jobPodFailurePolicy: true,
421 podFailurePolicy: &batch.PodFailurePolicy{},
422 expectedTerminating: nil,
423 expectedReady: ptr.To[int32](0),
424 expectedActive: 1,
425 expectedDeletions: 1,
426 expectedPodPatches: 1,
427 },
428 "too few active pods and active back-off": {
429 parallelism: 1,
430 completions: 1,
431 backoffLimit: 6,
432 backoffRecord: &backoffRecord{
433 failuresAfterLastSuccess: 1,
434 lastFailureTime: &referenceTime,
435 },
436 initialStatus: &jobInitialStatus{
437 startTime: func() *time.Time {
438 now := time.Now()
439 return &now
440 }(),
441 },
442 activePods: 0,
443 succeededPods: 0,
444 expectedCreations: 0,
445 expectedActive: 0,
446 expectedSucceeded: 0,
447 expectedPodPatches: 0,
448 expectedReady: ptr.To[int32](0),
449 controllerTime: &referenceTime,
450 },
451 "too few active pods and no back-offs": {
452 parallelism: 1,
453 completions: 1,
454 backoffLimit: 6,
455 backoffRecord: &backoffRecord{
456 failuresAfterLastSuccess: 0,
457 lastFailureTime: &referenceTime,
458 },
459 activePods: 0,
460 succeededPods: 0,
461 expectedCreations: 1,
462 expectedActive: 1,
463 expectedSucceeded: 0,
464 expectedPodPatches: 0,
465 expectedReady: ptr.To[int32](0),
466 controllerTime: &referenceTime,
467 },
468 "too few active pods with a dynamic job": {
469 parallelism: 2,
470 completions: -1,
471 backoffLimit: 6,
472 activePods: 1,
473 expectedCreations: 1,
474 expectedActive: 2,
475 expectedReady: ptr.To[int32](0),
476 },
477 "too few active pods, with controller error": {
478 parallelism: 2,
479 completions: 5,
480 backoffLimit: 6,
481 podControllerError: fmt.Errorf("fake error"),
482 activePods: 1,
483 succeededPods: 1,
484 expectedCreations: 1,
485 expectedActive: 1,
486 expectedSucceeded: 0,
487 expectedPodPatches: 1,
488 expectedReady: ptr.To[int32](0),
489 },
490 "too many active pods": {
491 parallelism: 2,
492 completions: 5,
493 backoffLimit: 6,
494 activePods: 3,
495 expectedDeletions: 1,
496 expectedActive: 2,
497 expectedPodPatches: 1,
498 expectedReady: ptr.To[int32](0),
499 },
500 "too many active pods, with controller error": {
501 parallelism: 2,
502 completions: 5,
503 backoffLimit: 6,
504 podControllerError: fmt.Errorf("fake error"),
505 activePods: 3,
506 expectedDeletions: 0,
507 expectedPodPatches: 1,
508 expectedActive: 3,
509 expectedReady: ptr.To[int32](0),
510 },
511 "failed + succeed pods: reset backoff delay": {
512 parallelism: 2,
513 completions: 5,
514 backoffLimit: 6,
515 activePods: 1,
516 succeededPods: 1,
517 failedPods: 1,
518 expectedCreations: 1,
519 expectedActive: 2,
520 expectedSucceeded: 1,
521 expectedFailed: 1,
522 expectedPodPatches: 2,
523 expectedReady: ptr.To[int32](0),
524 },
525 "new failed pod": {
526 parallelism: 2,
527 completions: 5,
528 backoffLimit: 6,
529 activePods: 1,
530 failedPods: 1,
531 expectedCreations: 1,
532 expectedActive: 2,
533 expectedFailed: 1,
534 expectedPodPatches: 1,
535 expectedReady: ptr.To[int32](0),
536 },
537 "no new pod; possible finalizer update of failed pod": {
538 parallelism: 1,
539 completions: 1,
540 backoffLimit: 6,
541 initialStatus: &jobInitialStatus{
542 active: 1,
543 succeed: 0,
544 failed: 1,
545 },
546 activePods: 1,
547 failedPods: 0,
548 expectedCreations: 0,
549 expectedActive: 1,
550 expectedFailed: 1,
551 expectedPodPatches: 0,
552 expectedReady: ptr.To[int32](0),
553 },
554 "only new failed pod with controller error": {
555 parallelism: 2,
556 completions: 5,
557 backoffLimit: 6,
558 podControllerError: fmt.Errorf("fake error"),
559 activePods: 1,
560 failedPods: 1,
561 expectedCreations: 1,
562 expectedActive: 1,
563 expectedFailed: 0,
564 expectedPodPatches: 1,
565 expectedReady: ptr.To[int32](0),
566 },
567 "job finish": {
568 parallelism: 2,
569 completions: 5,
570 backoffLimit: 6,
571 succeededPods: 5,
572 expectedSucceeded: 5,
573 expectedCondition: &jobConditionComplete,
574 expectedConditionStatus: v1.ConditionTrue,
575 expectedPodPatches: 5,
576 expectedReady: ptr.To[int32](0),
577 },
578 "WQ job finishing": {
579 parallelism: 2,
580 completions: -1,
581 backoffLimit: 6,
582 activePods: 1,
583 succeededPods: 1,
584 expectedActive: 1,
585 expectedSucceeded: 1,
586 expectedPodPatches: 1,
587 expectedReady: ptr.To[int32](0),
588 },
589 "WQ job all finished": {
590 parallelism: 2,
591 completions: -1,
592 backoffLimit: 6,
593 succeededPods: 2,
594 expectedSucceeded: 2,
595 expectedCondition: &jobConditionComplete,
596 expectedConditionStatus: v1.ConditionTrue,
597 expectedPodPatches: 2,
598 expectedReady: ptr.To[int32](0),
599 },
600 "WQ job all finished despite one failure": {
601 parallelism: 2,
602 completions: -1,
603 backoffLimit: 6,
604 succeededPods: 1,
605 failedPods: 1,
606 expectedSucceeded: 1,
607 expectedFailed: 1,
608 expectedCondition: &jobConditionComplete,
609 expectedConditionStatus: v1.ConditionTrue,
610 expectedPodPatches: 2,
611 expectedReady: ptr.To[int32](0),
612 },
613 "more active pods than parallelism": {
614 parallelism: 2,
615 completions: 5,
616 backoffLimit: 6,
617 activePods: 10,
618 expectedDeletions: 8,
619 expectedActive: 2,
620 expectedPodPatches: 8,
621 expectedReady: ptr.To[int32](0),
622 },
623 "more active pods than remaining completions": {
624 parallelism: 3,
625 completions: 4,
626 backoffLimit: 6,
627 activePods: 3,
628 succeededPods: 2,
629 expectedDeletions: 1,
630 expectedActive: 2,
631 expectedSucceeded: 2,
632 expectedPodPatches: 3,
633 expectedReady: ptr.To[int32](0),
634 },
635 "status change": {
636 parallelism: 2,
637 completions: 5,
638 backoffLimit: 6,
639 activePods: 2,
640 succeededPods: 2,
641 expectedActive: 2,
642 expectedSucceeded: 2,
643 expectedPodPatches: 2,
644 expectedReady: ptr.To[int32](0),
645 },
646 "deleting job": {
647 parallelism: 2,
648 completions: 5,
649 backoffLimit: 6,
650 deleting: true,
651 pendingPods: 1,
652 activePods: 1,
653 succeededPods: 1,
654 expectedActive: 2,
655 expectedSucceeded: 1,
656 expectedPodPatches: 3,
657 expectedReady: ptr.To[int32](0),
658 },
659 "limited pods": {
660 parallelism: 100,
661 completions: 200,
662 backoffLimit: 6,
663 podLimit: 10,
664 expectedCreations: 10,
665 expectedActive: 10,
666 expectedReady: ptr.To[int32](0),
667 },
668 "too many job failures": {
669 parallelism: 2,
670 completions: 5,
671 deleting: true,
672 failedPods: 1,
673 expectedFailed: 1,
674 expectedCondition: &jobConditionFailed,
675 expectedConditionStatus: v1.ConditionTrue,
676 expectedConditionReason: "BackoffLimitExceeded",
677 expectedPodPatches: 1,
678 expectedReady: ptr.To[int32](0),
679 },
680 "job failures, unsatisfied expectations": {
681 parallelism: 2,
682 completions: 5,
683 deleting: true,
684 failedPods: 1,
685 fakeExpectationAtCreation: 1,
686 expectedFailed: 1,
687 expectedPodPatches: 1,
688 expectedReady: ptr.To[int32](0),
689 },
690 "indexed job start": {
691 parallelism: 2,
692 completions: 5,
693 backoffLimit: 6,
694 completionMode: batch.IndexedCompletion,
695 expectedCreations: 2,
696 expectedActive: 2,
697 expectedCreatedIndexes: sets.New(0, 1),
698 expectedReady: ptr.To[int32](0),
699 },
700 "indexed job with some pods deleted, podReplacementPolicy Failed": {
701 parallelism: 2,
702 completions: 5,
703 backoffLimit: 6,
704 completionMode: batch.IndexedCompletion,
705 expectedCreations: 1,
706 expectedActive: 1,
707 expectedCreatedIndexes: sets.New(0),
708 podReplacementPolicy: podReplacementPolicy(batch.Failed),
709 jobPodReplacementPolicy: true,
710 terminatingPods: 1,
711 expectedTerminating: ptr.To[int32](1),
712 expectedReady: ptr.To[int32](0),
713 },
714 "indexed job with some pods deleted, podReplacementPolicy TerminatingOrFailed": {
715 parallelism: 2,
716 completions: 5,
717 backoffLimit: 6,
718 completionMode: batch.IndexedCompletion,
719 expectedCreations: 2,
720 expectedActive: 2,
721 expectedCreatedIndexes: sets.New(0, 1),
722 podReplacementPolicy: podReplacementPolicy(batch.TerminatingOrFailed),
723 jobPodReplacementPolicy: true,
724 terminatingPods: 1,
725 expectedTerminating: ptr.To[int32](1),
726 expectedReady: ptr.To[int32](0),
727 expectedPodPatches: 1,
728 },
729 "indexed job completed": {
730 parallelism: 2,
731 completions: 3,
732 backoffLimit: 6,
733 completionMode: batch.IndexedCompletion,
734 podsWithIndexes: []indexPhase{
735 {"0", v1.PodSucceeded},
736 {"1", v1.PodFailed},
737 {"1", v1.PodSucceeded},
738 {"2", v1.PodSucceeded},
739 },
740 expectedSucceeded: 3,
741 expectedFailed: 1,
742 expectedCompletedIdxs: "0-2",
743 expectedCondition: &jobConditionComplete,
744 expectedConditionStatus: v1.ConditionTrue,
745 expectedPodPatches: 4,
746 expectedReady: ptr.To[int32](0),
747 },
748 "indexed job repeated completed index": {
749 parallelism: 2,
750 completions: 3,
751 backoffLimit: 6,
752 completionMode: batch.IndexedCompletion,
753 podsWithIndexes: []indexPhase{
754 {"0", v1.PodSucceeded},
755 {"1", v1.PodSucceeded},
756 {"1", v1.PodSucceeded},
757 },
758 expectedCreations: 1,
759 expectedActive: 1,
760 expectedSucceeded: 2,
761 expectedCompletedIdxs: "0,1",
762 expectedCreatedIndexes: sets.New(2),
763 expectedPodPatches: 3,
764 expectedReady: ptr.To[int32](0),
765 },
766 "indexed job some running and completed pods": {
767 parallelism: 8,
768 completions: 20,
769 backoffLimit: 6,
770 completionMode: batch.IndexedCompletion,
771 podsWithIndexes: []indexPhase{
772 {"0", v1.PodRunning},
773 {"2", v1.PodSucceeded},
774 {"3", v1.PodPending},
775 {"4", v1.PodSucceeded},
776 {"5", v1.PodSucceeded},
777 {"7", v1.PodSucceeded},
778 {"8", v1.PodSucceeded},
779 {"9", v1.PodSucceeded},
780 },
781 expectedCreations: 6,
782 expectedActive: 8,
783 expectedSucceeded: 6,
784 expectedCompletedIdxs: "2,4,5,7-9",
785 expectedCreatedIndexes: sets.New(1, 6, 10, 11, 12, 13),
786 expectedPodPatches: 6,
787 expectedReady: ptr.To[int32](0),
788 },
789 "indexed job some failed pods": {
790 parallelism: 3,
791 completions: 4,
792 backoffLimit: 6,
793 completionMode: batch.IndexedCompletion,
794 podsWithIndexes: []indexPhase{
795 {"0", v1.PodFailed},
796 {"1", v1.PodPending},
797 {"2", v1.PodFailed},
798 },
799 expectedCreations: 2,
800 expectedActive: 3,
801 expectedFailed: 2,
802 expectedCreatedIndexes: sets.New(0, 2),
803 expectedPodPatches: 2,
804 expectedReady: ptr.To[int32](0),
805 },
806 "indexed job some pods without index": {
807 parallelism: 2,
808 completions: 5,
809 backoffLimit: 6,
810 completionMode: batch.IndexedCompletion,
811 activePods: 1,
812 succeededPods: 1,
813 failedPods: 1,
814 podsWithIndexes: []indexPhase{
815 {"invalid", v1.PodRunning},
816 {"invalid", v1.PodSucceeded},
817 {"invalid", v1.PodFailed},
818 {"invalid", v1.PodPending},
819 {"0", v1.PodSucceeded},
820 {"1", v1.PodRunning},
821 {"2", v1.PodRunning},
822 },
823 expectedDeletions: 3,
824 expectedActive: 2,
825 expectedSucceeded: 1,
826 expectedFailed: 0,
827 expectedCompletedIdxs: "0",
828 expectedPodPatches: 8,
829 expectedReady: ptr.To[int32](0),
830 },
831 "indexed job repeated indexes": {
832 parallelism: 5,
833 completions: 5,
834 backoffLimit: 6,
835 completionMode: batch.IndexedCompletion,
836 succeededPods: 1,
837 failedPods: 1,
838 podsWithIndexes: []indexPhase{
839 {"invalid", v1.PodRunning},
840 {"0", v1.PodSucceeded},
841 {"1", v1.PodRunning},
842 {"2", v1.PodRunning},
843 {"2", v1.PodPending},
844 },
845 expectedCreations: 0,
846 expectedDeletions: 2,
847 expectedActive: 2,
848 expectedSucceeded: 1,
849 expectedCompletedIdxs: "0",
850 expectedPodPatches: 5,
851 expectedReady: ptr.To[int32](0),
852 },
853 "indexed job with indexes outside of range": {
854 parallelism: 2,
855 completions: 5,
856 backoffLimit: 6,
857 completionMode: batch.IndexedCompletion,
858 podsWithIndexes: []indexPhase{
859 {"0", v1.PodSucceeded},
860 {"5", v1.PodRunning},
861 {"6", v1.PodSucceeded},
862 {"7", v1.PodPending},
863 {"8", v1.PodFailed},
864 },
865 expectedCreations: 0,
866 expectedSucceeded: 1,
867 expectedDeletions: 2,
868 expectedCompletedIdxs: "0",
869 expectedActive: 0,
870 expectedFailed: 0,
871 expectedPodPatches: 5,
872 expectedReady: ptr.To[int32](0),
873 },
874 "suspending a job with satisfied expectations": {
875
876
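// Once expectations are satisfied, suspending the Job deletes all active
// pods and adds the JobSuspended condition.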
877 suspend: true,
878 parallelism: 2,
879 activePods: 2,
880 completions: 4,
881 backoffLimit: 6,
882 expectedCreations: 0,
883 expectedDeletions: 2,
884 expectedActive: 0,
885 expectedCondition: &jobConditionSuspended,
886 expectedConditionStatus: v1.ConditionTrue,
887 expectedConditionReason: "JobSuspended",
888 expectedPodPatches: 2,
889 expectedReady: ptr.To[int32](0),
890 },
891 "suspending a job with unsatisfied expectations": {
892
893
894
895
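// With unsatisfied (deletion) expectations the controller skips pod
// management, so nothing is created or deleted despite the suspend.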
896 suspend: true,
897 parallelism: 2,
898 activePods: 3,
899 fakeExpectationAtCreation: -1,
900 completions: 4,
901 backoffLimit: 6,
902 expectedCreations: 0,
903 expectedDeletions: 0,
904 expectedActive: 3,
905 expectedReady: ptr.To[int32](0),
906 },
907 "resuming a suspended job": {
908 wasSuspended: true,
909 suspend: false,
910 parallelism: 2,
911 completions: 4,
912 backoffLimit: 6,
913 expectedCreations: 2,
914 expectedDeletions: 0,
915 expectedActive: 2,
916 expectedCondition: &jobConditionSuspended,
917 expectedConditionStatus: v1.ConditionFalse,
918 expectedConditionReason: "JobResumed",
919 expectedReady: ptr.To[int32](0),
920 },
921 "suspending a deleted job": {
922
923
924
925
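// Suspending would normally delete the active pods, but because the Job is
// being deleted the controller leaves them running and only patches finalizers.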
926 suspend: true,
927 deleting: true,
928 parallelism: 2,
929 activePods: 2,
930 completions: 4,
931 backoffLimit: 6,
932 expectedCreations: 0,
933 expectedDeletions: 0,
934 expectedActive: 2,
935 expectedPodPatches: 2,
936 expectedReady: ptr.To[int32](0),
937 },
938 "indexed job with podIndexLabel feature disabled": {
939 parallelism: 2,
940 completions: 5,
941 backoffLimit: 6,
942 completionMode: batch.IndexedCompletion,
943 expectedCreations: 2,
944 expectedActive: 2,
945 expectedCreatedIndexes: sets.New(0, 1),
946 podIndexLabelDisabled: true,
947 expectedReady: ptr.To[int32](0),
948 },
949 }
950
951 for name, tc := range testCases {
952 t.Run(name, func(t *testing.T) {
953 logger, _ := ktesting.NewTestContext(t)
954 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodIndexLabel, !tc.podIndexLabelDisabled)()
955 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.jobPodReplacementPolicy)()
956 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.jobPodFailurePolicy)()
957
958 clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
959
960 var fakeClock clock.WithTicker
961 if tc.controllerTime != nil {
962 fakeClock = clocktesting.NewFakeClock(*tc.controllerTime)
963 } else {
964 fakeClock = clocktesting.NewFakeClock(time.Now())
965 }
966
967 manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock)
968 fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit}
969 manager.podControl = &fakePodControl
970 manager.podStoreSynced = alwaysReady
971 manager.jobStoreSynced = alwaysReady
972
973
974 job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, tc.completionMode)
975 job.Spec.Suspend = ptr.To(tc.suspend)
976 if tc.jobPodReplacementPolicy {
977 job.Spec.PodReplacementPolicy = tc.podReplacementPolicy
978 }
979 if tc.jobPodFailurePolicy {
980 job.Spec.PodFailurePolicy = tc.podFailurePolicy
981 }
982 if tc.initialStatus != nil {
983 startTime := metav1.Now()
984 job.Status.StartTime = &startTime
985 job.Status.Active = int32(tc.initialStatus.active)
986 job.Status.Succeeded = int32(tc.initialStatus.succeed)
987 job.Status.Failed = int32(tc.initialStatus.failed)
988 if tc.initialStatus.startTime != nil {
989 startTime := metav1.NewTime(*tc.initialStatus.startTime)
990 job.Status.StartTime = &startTime
991 }
992 }
993
994 key, err := controller.KeyFunc(job)
995 if err != nil {
996 t.Errorf("Unexpected error getting job key: %v", err)
997 }
998
999 if tc.backoffRecord != nil {
1000 tc.backoffRecord.key = key
1001 manager.podBackoffStore.updateBackoffRecord(*tc.backoffRecord)
1002 }
1003 if tc.fakeExpectationAtCreation < 0 {
1004 manager.expectations.ExpectDeletions(logger, key, int(-tc.fakeExpectationAtCreation))
1005 } else if tc.fakeExpectationAtCreation > 0 {
1006 manager.expectations.ExpectCreations(logger, key, int(tc.fakeExpectationAtCreation))
1007 }
1008 if tc.wasSuspended {
1009 job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended", realClock.Now()))
1010 }
1011 if tc.deleting {
1012 now := metav1.Now()
1013 job.DeletionTimestamp = &now
1014 }
1015 sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
1016 podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
1017 setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods, tc.terminatingPods, tc.readyPods)
1018 setPodsStatusesWithIndexes(podIndexer, job, tc.podsWithIndexes)
1019
1020 actual := job
1021 manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
1022 actual = job
1023 return job, nil
1024 }
1025
1026
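// Run a single sync of the Job; the stubbed updateStatusHandler above
// captures the resulting status in `actual`.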
1027 err = manager.syncJob(context.TODO(), testutil.GetKey(job, t))
1028
1029
1030 if tc.podControllerError != nil {
1031 if err == nil {
t.Error("Expected error syncing job when podControl returns an error")
1033 }
1034 } else if tc.podLimit != 0 && fakePodControl.CreateCallCount > tc.podLimit {
1035 if err == nil {
t.Error("Expected error syncing job when the podControl create limit is exceeded")
1037 }
1038 } else if err != nil {
1039 t.Errorf("Unexpected error when syncing jobs: %v", err)
1040 }
1041
1042 if int32(len(fakePodControl.Templates)) != tc.expectedCreations {
1043 t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.Templates))
1044 }
1045 if tc.completionMode == batch.IndexedCompletion {
1046 checkIndexedJobPods(t, &fakePodControl, tc.expectedCreatedIndexes, job.Name, tc.podIndexLabelDisabled)
1047 } else {
1048 for _, p := range fakePodControl.Templates {
1049
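// Non-indexed Jobs leave the pod template's generate name and hostname empty.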
1050 if p.GenerateName != "" {
1051 t.Errorf("Got pod generate name %s, want %s", p.GenerateName, "")
1052 }
1053 if p.Spec.Hostname != "" {
1054 t.Errorf("Got pod hostname %q, want none", p.Spec.Hostname)
1055 }
1056 }
1057 }
1058 if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions {
1059 t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", tc.expectedDeletions, len(fakePodControl.DeletePodName))
1060 }
1061
1062 if len(fakePodControl.ControllerRefs) != int(tc.expectedCreations) {
1063 t.Errorf("Unexpected number of ControllerRefs. Expected %d, saw %d\n", tc.expectedCreations, len(fakePodControl.ControllerRefs))
1064 }
1065
1066 for _, controllerRef := range fakePodControl.ControllerRefs {
1067 if got, want := controllerRef.APIVersion, "batch/v1"; got != want {
1068 t.Errorf("controllerRef.APIVersion = %q, want %q", got, want)
1069 }
1070 if got, want := controllerRef.Kind, "Job"; got != want {
1071 t.Errorf("controllerRef.Kind = %q, want %q", got, want)
1072 }
1073 if got, want := controllerRef.Name, job.Name; got != want {
1074 t.Errorf("controllerRef.Name = %q, want %q", got, want)
1075 }
1076 if got, want := controllerRef.UID, job.UID; got != want {
1077 t.Errorf("controllerRef.UID = %q, want %q", got, want)
1078 }
1079 if controllerRef.Controller == nil || *controllerRef.Controller != true {
1080 t.Errorf("controllerRef.Controller is not set to true")
1081 }
1082 }
1083
1084 if actual.Status.Active != tc.expectedActive {
1085 t.Errorf("Unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active)
1086 }
1087 if diff := cmp.Diff(tc.expectedReady, actual.Status.Ready); diff != "" {
1088 t.Errorf("Unexpected number of ready pods (-want,+got): %s", diff)
1089 }
1090 if actual.Status.Succeeded != tc.expectedSucceeded {
1091 t.Errorf("Unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded)
1092 }
1093 if diff := cmp.Diff(tc.expectedCompletedIdxs, actual.Status.CompletedIndexes); diff != "" {
1094 t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff)
1095 }
1096 if actual.Status.Failed != tc.expectedFailed {
1097 t.Errorf("Unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed)
1098 }
1099 if diff := cmp.Diff(tc.expectedTerminating, actual.Status.Terminating); diff != "" {
1100 t.Errorf("Unexpected number of terminating pods (-want,+got): %s", diff)
1101 }
1102 if actual.Status.StartTime != nil && tc.suspend {
t.Error("Unexpected non-nil .status.startTime when suspend is true")
1104 }
1105 if actual.Status.StartTime == nil && !tc.suspend {
1106 t.Error("Missing .status.startTime")
1107 }
1108
1109 if tc.expectedCondition != nil {
1110 if !getCondition(actual, *tc.expectedCondition, tc.expectedConditionStatus, tc.expectedConditionReason) {
1111 t.Errorf("Expected completion condition. Got %#v", actual.Status.Conditions)
1112 }
1113 } else {
1114 if cond := hasTrueCondition(actual); cond != nil {
1115 t.Errorf("Got condition %s, want none", *cond)
1116 }
1117 }
1118 if tc.expectedCondition == nil && tc.suspend && len(actual.Status.Conditions) != 0 {
1119 t.Errorf("Unexpected conditions %v", actual.Status.Conditions)
1120 }
1121
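// Pods are created in slow-start batches that double in size, so compute the
// maximum number of create calls the controller may have issued.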
1122 expectedLimit := 0
1123 for pass := uint8(0); expectedLimit <= tc.podLimit; pass++ {
1124 expectedLimit += controller.SlowStartInitialBatchSize << pass
1125 }
1126 if tc.podLimit > 0 && fakePodControl.CreateCallCount > expectedLimit {
t.Errorf("Unexpected number of create calls. Expected <= %d, saw %d\n", expectedLimit, fakePodControl.CreateCallCount)
1128 }
1129 if p := len(fakePodControl.Patches); p != tc.expectedPodPatches {
1130 t.Errorf("Got %d pod patches, want %d", p, tc.expectedPodPatches)
1131 }
1132 })
1133 }
1134 }
1135
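// checkIndexedJobPods verifies that pods created for an Indexed Job carry the
// expected completion indexes, the derived hostname and generate name, and,
// unless podIndexLabelDisabled is set, the completion-index label.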
1136 func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Set[int], jobName string, podIndexLabelDisabled bool) {
1137 t.Helper()
1138 gotIndexes := sets.New[int]()
1139 for _, p := range control.Templates {
1140 checkJobCompletionEnvVariable(t, &p.Spec, podIndexLabelDisabled)
1141 if !podIndexLabelDisabled {
1142 checkJobCompletionLabel(t, &p)
1143 }
1144 ix := getCompletionIndex(p.Annotations)
1145 if ix == -1 {
1146 t.Errorf("Created pod %s didn't have completion index", p.Name)
1147 } else {
1148 gotIndexes.Insert(ix)
1149 }
1150 expectedName := fmt.Sprintf("%s-%d", jobName, ix)
1151 if expectedName != p.Spec.Hostname {
1152 t.Errorf("Got pod hostname %s, want %s", p.Spec.Hostname, expectedName)
1153 }
1154 expectedName += "-"
1155 if expectedName != p.GenerateName {
1156 t.Errorf("Got pod generate name %s, want %s", p.GenerateName, expectedName)
1157 }
1158 }
1159 if diff := cmp.Diff(sets.List(wantIndexes), sets.List(gotIndexes)); diff != "" {
1160 t.Errorf("Unexpected created completion indexes (-want,+got):\n%s", diff)
1161 }
1162 }
1163
func TestGetNewFinishedPods(t *testing.T) {
1165 cases := map[string]struct {
1166 job batch.Job
1167 pods []*v1.Pod
1168 expectedRmFinalizers sets.Set[string]
1169 wantSucceeded int32
1170 wantFailed int32
1171 }{
1172 "some counted": {
1173 job: batch.Job{
1174 Status: batch.JobStatus{
1175 Succeeded: 2,
1176 Failed: 1,
1177 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1178 },
1179 },
1180 pods: []*v1.Pod{
1181 buildPod().uid("a").phase(v1.PodSucceeded).Pod,
1182 buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1183 buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1184 buildPod().uid("d").phase(v1.PodFailed).Pod,
1185 buildPod().uid("e").phase(v1.PodFailed).trackingFinalizer().Pod,
1186 buildPod().uid("f").phase(v1.PodRunning).Pod,
1187 },
1188 wantSucceeded: 4,
1189 wantFailed: 2,
1190 },
1191 "some uncounted": {
1192 job: batch.Job{
1193 Status: batch.JobStatus{
1194 Succeeded: 1,
1195 Failed: 1,
1196 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1197 Succeeded: []types.UID{"a", "c"},
1198 Failed: []types.UID{"e", "f"},
1199 },
1200 },
1201 },
1202 pods: []*v1.Pod{
1203 buildPod().uid("a").phase(v1.PodSucceeded).Pod,
1204 buildPod().uid("b").phase(v1.PodSucceeded).Pod,
1205 buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1206 buildPod().uid("d").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1207 buildPod().uid("e").phase(v1.PodFailed).Pod,
1208 buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod,
1209 buildPod().uid("g").phase(v1.PodFailed).trackingFinalizer().Pod,
1210 },
1211 wantSucceeded: 4,
1212 wantFailed: 4,
1213 },
1214 "with expected removed finalizers": {
1215 job: batch.Job{
1216 Status: batch.JobStatus{
1217 Succeeded: 2,
1218 Failed: 2,
1219 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1220 Succeeded: []types.UID{"a"},
1221 Failed: []types.UID{"d"},
1222 },
1223 },
1224 },
1225 expectedRmFinalizers: sets.New("b", "f"),
1226 pods: []*v1.Pod{
1227 buildPod().uid("a").phase(v1.PodSucceeded).Pod,
1228 buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1229 buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1230 buildPod().uid("d").phase(v1.PodFailed).Pod,
1231 buildPod().uid("e").phase(v1.PodFailed).trackingFinalizer().Pod,
1232 buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod,
1233 buildPod().uid("g").phase(v1.PodFailed).trackingFinalizer().Pod,
1234 },
1235 wantSucceeded: 4,
1236 wantFailed: 5,
1237 },
1238 "deleted pods": {
1239 job: batch.Job{
1240 Status: batch.JobStatus{
1241 Succeeded: 1,
1242 Failed: 1,
1243 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1244 },
1245 },
1246 pods: []*v1.Pod{
1247 buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().deletionTimestamp().Pod,
1248 buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().deletionTimestamp().Pod,
1249 buildPod().uid("c").phase(v1.PodRunning).trackingFinalizer().deletionTimestamp().Pod,
1250 buildPod().uid("d").phase(v1.PodPending).trackingFinalizer().deletionTimestamp().Pod,
1251 buildPod().uid("e").phase(v1.PodRunning).deletionTimestamp().Pod,
1252 buildPod().uid("f").phase(v1.PodPending).deletionTimestamp().Pod,
1253 },
1254 wantSucceeded: 2,
1255 wantFailed: 4,
1256 },
1257 }
1258 for name, tc := range cases {
1259 t.Run(name, func(t *testing.T) {
1260 uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
1261 jobCtx := &syncJobCtx{job: &tc.job, pods: tc.pods, uncounted: uncounted, expectedRmFinalizers: tc.expectedRmFinalizers}
1262 succeededPods, failedPods := getNewFinishedPods(jobCtx)
1263 succeeded := int32(len(succeededPods)) + tc.job.Status.Succeeded + int32(len(uncounted.succeeded))
1264 failed := int32(len(failedPods)) + tc.job.Status.Failed + int32(len(uncounted.failed))
1265 if succeeded != tc.wantSucceeded {
t.Errorf("getNewFinishedPods reports %d succeeded pods, want %d", succeeded, tc.wantSucceeded)
1267 }
1268 if failed != tc.wantFailed {
t.Errorf("getNewFinishedPods reports %d failed pods, want %d", failed, tc.wantFailed)
1270 }
1271 })
1272 }
1273 }
1274
1275 func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
1276 logger, ctx := ktesting.NewTestContext(t)
1277 fakeClock := clocktesting.NewFakeClock(time.Now())
1278 now := fakeClock.Now()
1279 minuteAgo := now.Add(-time.Minute)
1280 completedCond := newCondition(batch.JobComplete, v1.ConditionTrue, "", "", now)
1281 succeededCond := newCondition(batch.JobSuccessCriteriaMet, v1.ConditionTrue, "", "", minuteAgo)
1282 failedCond := newCondition(batch.JobFailed, v1.ConditionTrue, "", "", now)
1283 indexedCompletion := batch.IndexedCompletion
1284 mockErr := errors.New("mock error")
1285 cases := map[string]struct {
1286 job batch.Job
1287 pods []*v1.Pod
1288 finishedCond *batch.JobCondition
1289 expectedRmFinalizers sets.Set[string]
1290 needsFlush bool
1291 statusUpdateErr error
1292 podControlErr error
1293 wantErr error
1294 wantRmFinalizers int
1295 wantStatusUpdates []batch.JobStatus
1296 wantSucceededPodsMetric int
1297 wantFailedPodsMetric int
1298
1299
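// feature gates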
1300 enableJobBackoffLimitPerIndex bool
1301 enableJobSuccessPolicy bool
1302 }{
1303 "no updates": {},
1304 "new active": {
1305 job: batch.Job{
1306 Status: batch.JobStatus{
1307 Active: 1,
1308 },
1309 },
1310 needsFlush: true,
1311 wantStatusUpdates: []batch.JobStatus{
1312 {
1313 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1314 Active: 1,
1315 },
1316 },
1317 },
1318 "track finished pods": {
1319 pods: []*v1.Pod{
1320 buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1321 buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod,
1322 buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().deletionTimestamp().Pod,
1323 buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().deletionTimestamp().Pod,
1324 buildPod().uid("e").phase(v1.PodPending).trackingFinalizer().deletionTimestamp().Pod,
1325 buildPod().phase(v1.PodPending).trackingFinalizer().Pod,
1326 buildPod().phase(v1.PodRunning).trackingFinalizer().Pod,
1327 },
1328 wantRmFinalizers: 5,
1329 wantStatusUpdates: []batch.JobStatus{
1330 {
1331 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1332 Succeeded: []types.UID{"a", "c"},
1333 Failed: []types.UID{"b", "d", "e"},
1334 },
1335 },
1336 {
1337 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1338 Succeeded: 2,
1339 Failed: 3,
1340 },
1341 },
1342 wantSucceededPodsMetric: 2,
1343 wantFailedPodsMetric: 3,
1344 },
1345 "past and new finished pods": {
1346 job: batch.Job{
1347 Status: batch.JobStatus{
1348 Active: 1,
1349 Succeeded: 2,
1350 Failed: 3,
1351 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1352 Succeeded: []types.UID{"a", "e"},
1353 Failed: []types.UID{"b", "f"},
1354 },
1355 },
1356 },
1357 pods: []*v1.Pod{
1358 buildPod().uid("e").phase(v1.PodSucceeded).Pod,
1359 buildPod().phase(v1.PodFailed).Pod,
1360 buildPod().phase(v1.PodPending).Pod,
1361 buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1362 buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod,
1363 buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1364 buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod,
1365 },
1366 wantRmFinalizers: 4,
1367 wantStatusUpdates: []batch.JobStatus{
1368 {
1369 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1370 Succeeded: []types.UID{"a", "c"},
1371 Failed: []types.UID{"b", "d"},
1372 },
1373 Active: 1,
1374 Succeeded: 3,
1375 Failed: 4,
1376 },
1377 {
1378 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1379 Active: 1,
1380 Succeeded: 5,
1381 Failed: 6,
1382 },
1383 },
1384 wantSucceededPodsMetric: 3,
1385 wantFailedPodsMetric: 3,
1386 },
1387 "expecting removed finalizers": {
1388 job: batch.Job{
1389 Status: batch.JobStatus{
1390 Succeeded: 2,
1391 Failed: 3,
1392 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1393 Succeeded: []types.UID{"a", "g"},
1394 Failed: []types.UID{"b", "h"},
1395 },
1396 },
1397 },
1398 expectedRmFinalizers: sets.New("c", "d", "g", "h"),
1399 pods: []*v1.Pod{
1400 buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1401 buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod,
1402 buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1403 buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod,
1404 buildPod().uid("e").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1405 buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod,
1406 buildPod().uid("g").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1407 buildPod().uid("h").phase(v1.PodFailed).trackingFinalizer().Pod,
1408 },
1409 wantRmFinalizers: 4,
1410 wantStatusUpdates: []batch.JobStatus{
1411 {
1412 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1413 Succeeded: []types.UID{"a", "e"},
1414 Failed: []types.UID{"b", "f"},
1415 },
1416 Succeeded: 3,
1417 Failed: 4,
1418 },
1419 {
1420 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1421 Succeeded: 5,
1422 Failed: 6,
1423 },
1424 },
1425 wantSucceededPodsMetric: 3,
1426 wantFailedPodsMetric: 3,
1427 },
1428 "succeeding job by JobSuccessPolicy": {
1429 pods: []*v1.Pod{
1430 buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1431 buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod,
1432 buildPod().uid("c").phase(v1.PodPending).trackingFinalizer().Pod,
1433 },
1434 finishedCond: succeededCond,
1435 wantRmFinalizers: 3,
1436 wantStatusUpdates: []batch.JobStatus{
1437 {
1438 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1439 Succeeded: []types.UID{"a"},
1440 Failed: []types.UID{"b"},
1441 },
1442 Conditions: []batch.JobCondition{*succeededCond},
1443 },
1444 {
1445 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1446 Succeeded: 1,
1447 Failed: 1,
1448 Conditions: []batch.JobCondition{*succeededCond, *completedCond},
1449 CompletionTime: ptr.To(metav1.NewTime(now)),
1450 },
1451 },
1452 wantSucceededPodsMetric: 1,
1453 wantFailedPodsMetric: 1,
1454 enableJobSuccessPolicy: true,
1455 },
1456 "completing job": {
1457 pods: []*v1.Pod{
1458 buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1459 buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod,
1460 },
1461 finishedCond: completedCond,
1462 wantRmFinalizers: 2,
1463 wantStatusUpdates: []batch.JobStatus{
1464 {
1465 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1466 Succeeded: []types.UID{"a"},
1467 Failed: []types.UID{"b"},
1468 },
1469 },
1470 {
1471 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1472 Succeeded: 1,
1473 Failed: 1,
1474 Conditions: []batch.JobCondition{*completedCond},
1475 CompletionTime: &completedCond.LastTransitionTime,
1476 },
1477 },
1478 wantSucceededPodsMetric: 1,
1479 wantFailedPodsMetric: 1,
1480 },
1481 "failing job": {
1482 pods: []*v1.Pod{
1483 buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1484 buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod,
1485 buildPod().uid("c").phase(v1.PodRunning).trackingFinalizer().Pod,
1486 },
1487 finishedCond: failedCond,
1488
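// The running pod "c" is counted as failed because the Job already has a
// Failed condition, so all of its pods are being terminated.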
1489 wantRmFinalizers: 3,
1490 wantStatusUpdates: []batch.JobStatus{
1491 {
1492 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1493 Succeeded: []types.UID{"a"},
1494 Failed: []types.UID{"b", "c"},
1495 },
1496 },
1497 {
1498 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1499 Succeeded: 1,
1500 Failed: 2,
1501 Conditions: []batch.JobCondition{*failedCond},
1502 },
1503 },
1504 wantSucceededPodsMetric: 1,
1505 wantFailedPodsMetric: 2,
1506 },
1507 "deleted job": {
1508 job: batch.Job{
1509 ObjectMeta: metav1.ObjectMeta{
1510 DeletionTimestamp: &metav1.Time{},
1511 },
1512 Status: batch.JobStatus{
1513 Active: 1,
1514 },
1515 },
1516 pods: []*v1.Pod{
1517 buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1518 buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod,
1519 buildPod().phase(v1.PodRunning).trackingFinalizer().Pod,
1520 },
1521
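// The running pod only loses its finalizer; it is not counted as failed
// because the Job itself is being deleted rather than failing.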
1522 wantRmFinalizers: 3,
1523 wantStatusUpdates: []batch.JobStatus{
1524 {
1525 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1526 Succeeded: []types.UID{"a"},
1527 Failed: []types.UID{"b"},
1528 },
1529 Active: 1,
1530 },
1531 {
1532 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1533 Active: 1,
1534 Succeeded: 1,
1535 Failed: 1,
1536 },
1537 },
1538 wantSucceededPodsMetric: 1,
1539 wantFailedPodsMetric: 1,
1540 },
1541 "status update error": {
1542 pods: []*v1.Pod{
1543 buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1544 buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod,
1545 },
1546 statusUpdateErr: mockErr,
1547 wantErr: mockErr,
1548 wantStatusUpdates: []batch.JobStatus{
1549 {
1550 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1551 Succeeded: []types.UID{"a"},
1552 Failed: []types.UID{"b"},
1553 },
1554 },
1555 },
1556 },
1557 "pod patch errors": {
1558 pods: []*v1.Pod{
1559 buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1560 buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod,
1561 },
1562 podControlErr: mockErr,
1563 wantErr: mockErr,
1564 wantRmFinalizers: 2,
1565 wantStatusUpdates: []batch.JobStatus{
1566 {
1567 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1568 Succeeded: []types.UID{"a"},
1569 Failed: []types.UID{"b"},
1570 },
1571 },
1572 },
1573 },
1574 "pod patch errors with partial success": {
1575 job: batch.Job{
1576 Status: batch.JobStatus{
1577 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1578 Succeeded: []types.UID{"a"},
1579 Failed: []types.UID{"b"},
1580 },
1581 },
1582 },
1583 pods: []*v1.Pod{
1584 buildPod().uid("a").phase(v1.PodSucceeded).Pod,
1585 buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1586 buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod,
1587 },
1588 podControlErr: mockErr,
1589 wantErr: mockErr,
1590 wantRmFinalizers: 2,
1591 wantStatusUpdates: []batch.JobStatus{
1592 {
1593 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1594 Succeeded: []types.UID{"c"},
1595 Failed: []types.UID{"d"},
1596 },
1597 Succeeded: 1,
1598 Failed: 1,
1599 },
1600 },
1601 },
1602 "indexed job new successful pods": {
1603 job: batch.Job{
1604 Spec: batch.JobSpec{
1605 CompletionMode: &indexedCompletion,
1606 Completions: ptr.To[int32](6),
1607 },
1608 Status: batch.JobStatus{
1609 Active: 1,
1610 },
1611 },
1612 pods: []*v1.Pod{
1613 buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("1").Pod,
1614 buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("3").Pod,
1615 buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("3").Pod,
1616 buildPod().phase(v1.PodRunning).trackingFinalizer().index("5").Pod,
1617 buildPod().phase(v1.PodSucceeded).trackingFinalizer().Pod,
1618 },
1619 wantRmFinalizers: 4,
1620 wantStatusUpdates: []batch.JobStatus{
1621 {
1622 Active: 1,
1623 Succeeded: 2,
1624 CompletedIndexes: "1,3",
1625 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1626 },
1627 },
1628 wantSucceededPodsMetric: 2,
1629 },
1630 "indexed job prev successful pods outside current completions index range with no new succeeded pods": {
1631 job: batch.Job{
1632 Spec: batch.JobSpec{
1633 CompletionMode: &indexedCompletion,
1634 Completions: ptr.To[int32](2),
1635 Parallelism: ptr.To[int32](2),
1636 },
1637 Status: batch.JobStatus{
1638 Active: 2,
1639 Succeeded: 1,
1640 CompletedIndexes: "3",
1641 },
1642 },
1643 pods: []*v1.Pod{
1644 buildPod().phase(v1.PodRunning).trackingFinalizer().index("0").Pod,
1645 buildPod().phase(v1.PodRunning).trackingFinalizer().index("1").Pod,
1646 },
1647 wantRmFinalizers: 0,
1648 wantStatusUpdates: []batch.JobStatus{
1649 {
1650 Active: 2,
1651 Succeeded: 0,
1652 CompletedIndexes: "",
1653 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1654 },
1655 },
1656 },
1657 "indexed job prev successful pods outside current completions index range with new succeeded pods in range": {
1658 job: batch.Job{
1659 Spec: batch.JobSpec{
1660 CompletionMode: &indexedCompletion,
1661 Completions: ptr.To[int32](2),
1662 Parallelism: ptr.To[int32](2),
1663 },
1664 Status: batch.JobStatus{
1665 Active: 2,
1666 Succeeded: 1,
1667 CompletedIndexes: "3",
1668 },
1669 },
1670 pods: []*v1.Pod{
1671 buildPod().phase(v1.PodRunning).trackingFinalizer().index("0").Pod,
1672 buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("1").Pod,
1673 },
1674 wantRmFinalizers: 1,
1675 wantStatusUpdates: []batch.JobStatus{
1676 {
1677 Active: 2,
1678 Succeeded: 1,
1679 CompletedIndexes: "1",
1680 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1681 },
1682 },
1683 wantSucceededPodsMetric: 1,
1684 },
1685 "indexed job new failed pods": {
1686 job: batch.Job{
1687 Spec: batch.JobSpec{
1688 CompletionMode: &indexedCompletion,
1689 Completions: ptr.To[int32](6),
1690 },
1691 Status: batch.JobStatus{
1692 Active: 1,
1693 },
1694 },
1695 pods: []*v1.Pod{
1696 buildPod().uid("a").phase(v1.PodFailed).trackingFinalizer().index("1").Pod,
1697 buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().index("3").Pod,
1698 buildPod().uid("c").phase(v1.PodFailed).trackingFinalizer().index("3").Pod,
1699 buildPod().uid("d").phase(v1.PodRunning).trackingFinalizer().index("5").Pod,
1700 buildPod().phase(v1.PodFailed).trackingFinalizer().Pod,
1701 },
1702 wantRmFinalizers: 4,
1703 wantStatusUpdates: []batch.JobStatus{
1704 {
1705 Active: 1,
1706 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1707 Failed: []types.UID{"a", "b", "c"},
1708 },
1709 },
1710 {
1711 Active: 1,
1712 Failed: 3,
1713 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1714 },
1715 },
1716 wantFailedPodsMetric: 3,
1717 },
1718 "indexed job past and new pods": {
1719 job: batch.Job{
1720 Spec: batch.JobSpec{
1721 CompletionMode: &indexedCompletion,
1722 Completions: ptr.To[int32](7),
1723 },
1724 Status: batch.JobStatus{
1725 Failed: 2,
1726 Succeeded: 5,
1727 CompletedIndexes: "0-2,4,6,7",
1728 },
1729 },
1730 pods: []*v1.Pod{
1731 buildPod().phase(v1.PodSucceeded).index("0").Pod,
1732 buildPod().phase(v1.PodFailed).index("1").Pod,
1733 buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("1").Pod,
1734 buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("3").Pod,
1735 buildPod().uid("a").phase(v1.PodFailed).trackingFinalizer().index("2").Pod,
1736 buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().index("5").Pod,
1737 },
1738 wantRmFinalizers: 4,
1739 wantStatusUpdates: []batch.JobStatus{
1740 {
1741 Succeeded: 6,
1742 Failed: 2,
1743 CompletedIndexes: "0-4,6",
1744 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1745 Failed: []types.UID{"a", "b"},
1746 },
1747 },
1748 {
1749 Succeeded: 6,
1750 Failed: 4,
1751 CompletedIndexes: "0-4,6",
1752 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1753 },
1754 },
1755 wantSucceededPodsMetric: 1,
1756 wantFailedPodsMetric: 2,
1757 },
1758 "too many finished": {
1759 job: batch.Job{
1760 Status: batch.JobStatus{
1761 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1762 Failed: []types.UID{"a", "b"},
1763 },
1764 },
1765 },
1766 pods: func() []*v1.Pod {
1767 pods := make([]*v1.Pod, 500)
1768 for i := range pods {
1769 pods[i] = buildPod().uid(strconv.Itoa(i)).phase(v1.PodSucceeded).trackingFinalizer().Pod
1770 }
1771 pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod)
1772 return pods
1773 }(),
1774 wantRmFinalizers: 499,
1775 wantStatusUpdates: []batch.JobStatus{
1776 {
1777 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1778 Succeeded: func() []types.UID {
1779 uids := make([]types.UID, 499)
1780 for i := range uids {
1781 uids[i] = types.UID(strconv.Itoa(i))
1782 }
1783 return uids
1784 }(),
1785 Failed: []types.UID{"b"},
1786 },
1787 Failed: 1,
1788 },
1789 {
1790 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1791 Failed: []types.UID{"b"},
1792 },
1793 Succeeded: 499,
1794 Failed: 1,
1795 },
1796 },
1797 wantSucceededPodsMetric: 499,
1798 wantFailedPodsMetric: 1,
1799 },
1800 "too many indexed finished": {
1801 job: batch.Job{
1802 Spec: batch.JobSpec{
1803 CompletionMode: &indexedCompletion,
1804 Completions: ptr.To[int32](501),
1805 },
1806 },
1807 pods: func() []*v1.Pod {
1808 pods := make([]*v1.Pod, 501)
1809 for i := range pods {
1810 pods[i] = buildPod().uid(strconv.Itoa(i)).index(strconv.Itoa(i)).phase(v1.PodSucceeded).trackingFinalizer().Pod
1811 }
1812 return pods
1813 }(),
1814 wantRmFinalizers: 500,
1815 wantStatusUpdates: []batch.JobStatus{
1816 {
1817 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1818 CompletedIndexes: "0-499",
1819 Succeeded: 500,
1820 },
1821 },
1822 wantSucceededPodsMetric: 500,
1823 },
1824 "pod flips from failed to succeeded": {
1825 job: batch.Job{
1826 Spec: batch.JobSpec{
1827 Completions: ptr.To[int32](2),
1828 Parallelism: ptr.To[int32](2),
1829 },
1830 Status: batch.JobStatus{
1831 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1832 Failed: []types.UID{"a", "b"},
1833 },
1834 },
1835 },
1836 pods: []*v1.Pod{
1837 buildPod().uid("a").phase(v1.PodFailed).trackingFinalizer().Pod,
1838 buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod,
1839 },
1840 finishedCond: failedCond,
1841 wantRmFinalizers: 2,
1842 wantStatusUpdates: []batch.JobStatus{
1843 {
1844 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1845 Failed: 2,
1846 Conditions: []batch.JobCondition{*failedCond},
1847 },
1848 },
1849 wantFailedPodsMetric: 2,
1850 },
1851 "indexed job with a failed pod with delayed finalizer removal; the pod is not counted": {
1852 enableJobBackoffLimitPerIndex: true,
1853 job: batch.Job{
1854 Spec: batch.JobSpec{
1855 CompletionMode: &indexedCompletion,
1856 Completions: ptr.To[int32](6),
1857 BackoffLimitPerIndex: ptr.To[int32](1),
1858 },
1859 },
1860 pods: []*v1.Pod{
1861 buildPod().uid("a").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().index("1").Pod,
1862 },
1863 wantStatusUpdates: []batch.JobStatus{
1864 {
1865 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1866 FailedIndexes: ptr.To(""),
1867 },
1868 },
1869 },
1870 "indexed job with a failed pod which is recreated by a running pod; the pod is counted": {
1871 enableJobBackoffLimitPerIndex: true,
1872 job: batch.Job{
1873 Spec: batch.JobSpec{
1874 CompletionMode: &indexedCompletion,
1875 Completions: ptr.To[int32](6),
1876 BackoffLimitPerIndex: ptr.To[int32](1),
1877 },
1878 Status: batch.JobStatus{
1879 Active: 1,
1880 },
1881 },
1882 pods: []*v1.Pod{
1883 buildPod().uid("a1").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().index("1").Pod,
1884 buildPod().uid("a2").phase(v1.PodRunning).indexFailureCount("1").trackingFinalizer().index("1").Pod,
1885 },
1886 wantRmFinalizers: 1,
1887 wantStatusUpdates: []batch.JobStatus{
1888 {
1889 Active: 1,
1890 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1891 Failed: []types.UID{"a1"},
1892 },
1893 FailedIndexes: ptr.To(""),
1894 },
1895 {
1896 Active: 1,
1897 Failed: 1,
1898 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1899 FailedIndexes: ptr.To(""),
1900 },
1901 },
1902 wantFailedPodsMetric: 1,
1903 },
1904 "indexed job with a failed pod for a failed index; the pod is counted": {
1905 enableJobBackoffLimitPerIndex: true,
1906 job: batch.Job{
1907 Spec: batch.JobSpec{
1908 CompletionMode: &indexedCompletion,
1909 Completions: ptr.To[int32](6),
1910 BackoffLimitPerIndex: ptr.To[int32](1),
1911 },
1912 },
1913 pods: []*v1.Pod{
1914 buildPod().uid("a").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().index("1").Pod,
1915 },
1916 wantRmFinalizers: 1,
1917 wantStatusUpdates: []batch.JobStatus{
1918 {
1919 FailedIndexes: ptr.To("1"),
1920 UncountedTerminatedPods: &batch.UncountedTerminatedPods{
1921 Failed: []types.UID{"a"},
1922 },
1923 },
1924 {
1925 Failed: 1,
1926 FailedIndexes: ptr.To("1"),
1927 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
1928 },
1929 },
1930 wantFailedPodsMetric: 1,
1931 },
1932 }
1933 for name, tc := range cases {
1934 t.Run(name, func(t *testing.T) {
1935 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex)()
1936 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy)()
1937
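// Build a controller around fake clients and a fake pod control, recording every status update issued during the sync.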
1938 clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
1939 manager, _ := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock)
1940 fakePodControl := controller.FakePodControl{Err: tc.podControlErr}
1941 metrics.JobPodsFinished.Reset()
1942 manager.podControl = &fakePodControl
1943 var statusUpdates []batch.JobStatus
1944 manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
1945 statusUpdates = append(statusUpdates, *job.Status.DeepCopy())
1946 return job, tc.statusUpdateErr
1947 }
1948 job := tc.job.DeepCopy()
1949 if job.Status.UncountedTerminatedPods == nil {
1950 job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
1951 }
1952 jobCtx := &syncJobCtx{
1953 job: job,
1954 pods: tc.pods,
1955 uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
1956 expectedRmFinalizers: tc.expectedRmFinalizers,
1957 finishedCondition: tc.finishedCond,
1958 }
1959 if isIndexedJob(job) {
1960 jobCtx.succeededIndexes = parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions))
1961 if tc.enableJobBackoffLimitPerIndex && job.Spec.BackoffLimitPerIndex != nil {
1962 jobCtx.failedIndexes = calculateFailedIndexes(logger, job, tc.pods)
1963 jobCtx.activePods = controller.FilterActivePods(logger, tc.pods)
1964 jobCtx.podsWithDelayedDeletionPerIndex = getPodsWithDelayedDeletionPerIndex(logger, jobCtx)
1965 }
1966 }
1967
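// Run the finalizer-tracking sync and verify the returned error, the recorded status updates, the removed finalizers, and the job_pods_finished_total metric values.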
1968 err := manager.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, tc.needsFlush)
1969 if !errors.Is(err, tc.wantErr) {
1970 t.Errorf("Got error %v, want %v", err, tc.wantErr)
1971 }
1972 if diff := cmp.Diff(tc.wantStatusUpdates, statusUpdates,
1973 cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" {
1974 t.Errorf("Unexpected status updates (-want,+got):\n%s", diff)
1975 }
1976 rmFinalizers := len(fakePodControl.Patches)
1977 if rmFinalizers != tc.wantRmFinalizers {
1978 t.Errorf("Removed %d finalizers, want %d", rmFinalizers, tc.wantRmFinalizers)
1979 }
1980 if tc.wantErr == nil {
1981 completionMode := completionModeStr(job)
1982 v, err := metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded))
1983 if err != nil {
1984 t.Fatalf("Obtaining succeeded job_pods_finished_total: %v", err)
1985 }
1986 if float64(tc.wantSucceededPodsMetric) != v {
1987 t.Errorf("Metric reports %.0f succeeded pods, want %d", v, tc.wantSucceededPodsMetric)
1988 }
1989 v, err = metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed))
1990 if err != nil {
1991 t.Fatalf("Obtaining failed job_pods_finished_total: %v", err)
1992 }
1993 if float64(tc.wantFailedPodsMetric) != v {
1994 t.Errorf("Metric reports %.0f failed pods, want %d", v, tc.wantFailedPodsMetric)
1995 }
1996 }
1997 })
1998 }
1999 }
2000
2001
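// TestSyncJobPastDeadline verifies how syncJob handles Jobs whose activeDeadlineSeconds has elapsed: remaining active pods are deleted, counters are updated, and the expected terminal (or Suspended) condition is set.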
2002 func TestSyncJobPastDeadline(t *testing.T) {
2003 _, ctx := ktesting.NewTestContext(t)
2004 testCases := map[string]struct {
2005
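// Job setup.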
2006 parallelism int32
2007 completions int32
2008 activeDeadlineSeconds int64
2009 startTime int64
2010 backoffLimit int32
2011 suspend bool
2012
2013
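// Pod counts fed into the informer cache.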
2014 activePods int
2015 succeededPods int
2016 failedPods int
2017
2018
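// Expectations.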
2019 expectedDeletions int32
2020 expectedActive int32
2021 expectedSucceeded int32
2022 expectedFailed int32
2023 expectedCondition batch.JobConditionType
2024 expectedConditionReason string
2025 }{
2026 "activeDeadlineSeconds less than single pod execution": {
2027 parallelism: 1,
2028 completions: 1,
2029 activeDeadlineSeconds: 10,
2030 startTime: 15,
2031 backoffLimit: 6,
2032 activePods: 1,
2033 expectedDeletions: 1,
2034 expectedFailed: 1,
2035 expectedCondition: batch.JobFailed,
2036 expectedConditionReason: batch.JobReasonDeadlineExceeded,
2037 },
2038 "activeDeadlineSeconds bigger than single pod execution": {
2039 parallelism: 1,
2040 completions: 2,
2041 activeDeadlineSeconds: 10,
2042 startTime: 15,
2043 backoffLimit: 6,
2044 activePods: 1,
2045 succeededPods: 1,
2046 expectedDeletions: 1,
2047 expectedSucceeded: 1,
2048 expectedFailed: 1,
2049 expectedCondition: batch.JobFailed,
2050 expectedConditionReason: batch.JobReasonDeadlineExceeded,
2051 },
2052 "activeDeadlineSeconds times-out before any pod starts": {
2053 parallelism: 1,
2054 completions: 1,
2055 activeDeadlineSeconds: 10,
2056 startTime: 10,
2057 backoffLimit: 6,
2058 expectedCondition: batch.JobFailed,
2059 expectedConditionReason: batch.JobReasonDeadlineExceeded,
2060 },
2061 "activeDeadlineSeconds with backofflimit reach": {
2062 parallelism: 1,
2063 completions: 1,
2064 activeDeadlineSeconds: 1,
2065 startTime: 10,
2066 failedPods: 1,
2067 expectedFailed: 1,
2068 expectedCondition: batch.JobFailed,
2069 expectedConditionReason: batch.JobReasonBackoffLimitExceeded,
2070 },
2071 "activeDeadlineSeconds is not triggered when Job is suspended": {
2072 suspend: true,
2073 parallelism: 1,
2074 completions: 2,
2075 activeDeadlineSeconds: 10,
2076 startTime: 15,
2077 backoffLimit: 6,
2078 expectedCondition: batch.JobSuspended,
2079 expectedConditionReason: "JobSuspended",
2080 },
2081 }
2082
2083 for name, tc := range testCases {
2084 t.Run(name, func(t *testing.T) {
2085
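// Job manager setup.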
2086 clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
2087 manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientSet, controller.NoResyncPeriodFunc)
2088 fakePodControl := controller.FakePodControl{}
2089 manager.podControl = &fakePodControl
2090 manager.podStoreSynced = alwaysReady
2091 manager.jobStoreSynced = alwaysReady
2092 var actual *batch.Job
2093 manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
2094 actual = job
2095 return job, nil
2096 }
2097
2098
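// Job and pod setup.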
2099 job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion)
2100 job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds
2101 job.Spec.Suspend = ptr.To(tc.suspend)
2102 start := metav1.Unix(metav1.Now().Time.Unix()-tc.startTime, 0)
2103 job.Status.StartTime = &start
2104 sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
2105 podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
2106 setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods, 0, 0)
2107
2108
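// Run the sync.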
2109 err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
2110 if err != nil {
2111 t.Errorf("Unexpected error when syncing jobs %v", err)
2112 }
2113
2114 if len(fakePodControl.Templates) != 0 {
2115 t.Errorf("Unexpected number of creates. Expected 0, saw %d\n", len(fakePodControl.Templates))
2116 }
2117 if int32(len(fakePodControl.DeletePodName)) != tc.expectedDeletions {
2118 t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", tc.expectedDeletions, len(fakePodControl.DeletePodName))
2119 }
2120
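// Validate status counters.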
2121 if actual.Status.Active != tc.expectedActive {
2122 t.Errorf("Unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active)
2123 }
2124 if actual.Status.Succeeded != tc.expectedSucceeded {
2125 t.Errorf("Unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded)
2126 }
2127 if actual.Status.Failed != tc.expectedFailed {
2128 t.Errorf("Unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed)
2129 }
2130 if actual.Status.StartTime == nil {
2131 t.Error("Missing .status.startTime")
2132 }
2133
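// Validate the terminal condition.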
2134 if !getCondition(actual, tc.expectedCondition, v1.ConditionTrue, tc.expectedConditionReason) {
2135 t.Errorf("Expected fail condition. Got %#v", actual.Status.Conditions)
2136 }
2137 })
2138 }
2139 }
2140
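// getCondition reports whether the job has a condition of the given type with the given status and reason.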
2141 func getCondition(job *batch.Job, condition batch.JobConditionType, status v1.ConditionStatus, reason string) bool {
2142 for _, v := range job.Status.Conditions {
2143 if v.Type == condition && v.Status == status && v.Reason == reason {
2144 return true
2145 }
2146 }
2147 return false
2148 }
2149
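// hasTrueCondition returns the type of the first job condition with status True, or nil if there is none.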
2150 func hasTrueCondition(job *batch.Job) *batch.JobConditionType {
2151 for _, v := range job.Status.Conditions {
2152 if v.Status == v1.ConditionTrue {
2153 return &v.Type
2154 }
2155 }
2156 return nil
2157 }
2158
2159
2160
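// TestPastDeadlineJobFinished runs the controller against a fake clock and checks that a Job with activeDeadlineSeconds gets a startTime and is eventually marked Failed with reason DeadlineExceeded, but not before the deadline has actually elapsed.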
2161 func TestPastDeadlineJobFinished(t *testing.T) {
2162 _, ctx := ktesting.NewTestContext(t)
2163 clientset := fake.NewSimpleClientset()
2164 fakeClock := clocktesting.NewFakeClock(time.Now().Truncate(time.Second))
2165 manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
2166 manager.podStoreSynced = alwaysReady
2167 manager.jobStoreSynced = alwaysReady
2168 manager.expectations = FakeJobExpectations{
2169 controller.NewControllerExpectations(), true, func() {
2170 },
2171 }
2172 ctx, cancel := context.WithCancel(context.Background())
2173 defer cancel()
2174 sharedInformerFactory.Start(ctx.Done())
2175 sharedInformerFactory.WaitForCacheSync(ctx.Done())
2176
2177 go manager.Run(ctx, 1)
2178
2179 tests := []struct {
2180 name string
2181 setStartTime bool
2182 jobName string
2183 }{
2184 {
2185 name: "New job created without start time being set",
2186 setStartTime: false,
2187 jobName: "job1",
2188 },
2189 {
2190 name: "New job created with start time being set",
2191 setStartTime: true,
2192 jobName: "job2",
2193 },
2194 }
2195 for _, tc := range tests {
2196 t.Run(tc.name, func(t *testing.T) {
2197 job := newJobWithName(tc.jobName, 1, 1, 6, batch.NonIndexedCompletion)
2198 job.Spec.ActiveDeadlineSeconds = ptr.To[int64](1)
2199 if tc.setStartTime {
2200 start := metav1.NewTime(fakeClock.Now())
2201 job.Status.StartTime = &start
2202 }
2203
2204 _, err := clientset.BatchV1().Jobs(job.GetNamespace()).Create(ctx, job, metav1.CreateOptions{})
2205 if err != nil {
2206 t.Errorf("Could not create Job: %v", err)
2207 }
2208
2209 var j *batch.Job
2210 err = wait.PollUntilContextTimeout(ctx, 200*time.Microsecond, 3*time.Second, true, func(ctx context.Context) (done bool, err error) {
2211 j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{})
2212 if err != nil {
2213 return false, err
2214 }
2215 return j.Status.StartTime != nil, nil
2216 })
2217 if err != nil {
2218 t.Errorf("Job failed to ensure that start time was set: %v", err)
2219 }
2220 err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 3*time.Second, false, func(ctx context.Context) (done bool, err error) {
2221 j, err = clientset.BatchV1().Jobs(metav1.NamespaceDefault).Get(ctx, job.GetName(), metav1.GetOptions{})
2222 if err != nil {
2223 return false, nil
2224 }
2225 if getCondition(j, batch.JobFailed, v1.ConditionTrue, batch.JobReasonDeadlineExceeded) {
2226 if manager.clock.Since(j.Status.StartTime.Time) < time.Duration(*j.Spec.ActiveDeadlineSeconds)*time.Second {
2227 return true, errors.New("Job contains DeadlineExceeded condition earlier than expected")
2228 }
2229 return true, nil
2230 }
2231 manager.clock.Sleep(100 * time.Millisecond)
2232 return false, nil
2233 })
2234 if err != nil {
2235 t.Errorf("Job failed to enforce activeDeadlineSeconds configuration. Expected condition with Reason 'DeadlineExceeded' was not found in %v", j.Status)
2236 }
2237 })
2238 }
2239 }
2240
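// TestSingleJobFailedCondition verifies that a Job past its deadline ends up with exactly one JobFailed condition whose status is True, even when a stale JobFailed condition with status False is already present.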
2241 func TestSingleJobFailedCondition(t *testing.T) {
2242 _, ctx := ktesting.NewTestContext(t)
2243 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
2244 manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
2245 fakePodControl := controller.FakePodControl{}
2246 manager.podControl = &fakePodControl
2247 manager.podStoreSynced = alwaysReady
2248 manager.jobStoreSynced = alwaysReady
2249 var actual *batch.Job
2250 manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
2251 actual = job
2252 return job, nil
2253 }
2254
2255 job := newJob(1, 1, 6, batch.NonIndexedCompletion)
2256 job.Spec.ActiveDeadlineSeconds = ptr.To[int64](10)
2257 start := metav1.Unix(metav1.Now().Time.Unix()-15, 0)
2258 job.Status.StartTime = &start
2259 job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobFailed, v1.ConditionFalse, "DeadlineExceeded", "Job was active longer than specified deadline", realClock.Now()))
2260 sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
2261 err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
2262 if err != nil {
2263 t.Errorf("Unexpected error when syncing jobs %v", err)
2264 }
2265 if len(fakePodControl.DeletePodName) != 0 {
2266 t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
2267 }
2268 if actual == nil {
2269 t.Fatal("Expected job modification")
2270 }
2271 failedConditions := getConditionsByType(actual.Status.Conditions, batch.JobFailed)
2272 if len(failedConditions) != 1 {
2273 t.Fatalf("Unexpected number of failed conditions: expected 1, got %d", len(failedConditions))
2274 }
2275 if failedConditions[0].Status != v1.ConditionTrue {
2276 t.Errorf("Unexpected status for the failed condition. Expected: %v, saw %v\n", v1.ConditionTrue, failedConditions[0].Status)
2277 }
2278
2279 }
2280
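// TestSyncJobComplete verifies that syncing an already completed Job keeps its single Complete condition intact.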
2281 func TestSyncJobComplete(t *testing.T) {
2282 _, ctx := ktesting.NewTestContext(t)
2283 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
2284 manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
2285 fakePodControl := controller.FakePodControl{}
2286 manager.podControl = &fakePodControl
2287 manager.podStoreSynced = alwaysReady
2288 manager.jobStoreSynced = alwaysReady
2289
2290 job := newJob(1, 1, 6, batch.NonIndexedCompletion)
2291 job.Status.Conditions = append(job.Status.Conditions, *newCondition(batch.JobComplete, v1.ConditionTrue, "", "", realClock.Now()))
2292 sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
2293 err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
2294 if err != nil {
2295 t.Fatalf("Unexpected error when syncing jobs %v", err)
2296 }
2297 actual, err := manager.jobLister.Jobs(job.Namespace).Get(job.Name)
2298 if err != nil {
2299 t.Fatalf("Unexpected error when trying to get job from the store: %v", err)
2300 }
2301
2302 if got, expected := len(actual.Status.Conditions), 1; got != expected {
2303 t.Fatalf("Unexpected job status conditions amount; expected %d, got %d", expected, got)
2304 }
2305 }
2306
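// TestSyncJobDeleted verifies that syncing a Job key that is no longer in the informer cache neither creates nor deletes pods.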
2307 func TestSyncJobDeleted(t *testing.T) {
2308 _, ctx := ktesting.NewTestContext(t)
2309 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
2310 manager, _ := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
2311 fakePodControl := controller.FakePodControl{}
2312 manager.podControl = &fakePodControl
2313 manager.podStoreSynced = alwaysReady
2314 manager.jobStoreSynced = alwaysReady
2315 manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
2316 return job, nil
2317 }
2318 job := newJob(2, 2, 6, batch.NonIndexedCompletion)
2319 err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
2320 if err != nil {
2321 t.Errorf("Unexpected error when syncing jobs %v", err)
2322 }
2323 if len(fakePodControl.Templates) != 0 {
2324 t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
2325 }
2326 if len(fakePodControl.DeletePodName) != 0 {
2327 t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
2328 }
2329 }
2330
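// TestSyncJobWhenManagedBy verifies that the controller only reconciles Jobs it owns: with the JobManagedBy feature enabled and spec.managedBy pointing at a custom controller, the status must stay unchanged; otherwise the Job is synced as usual.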
2331 func TestSyncJobWhenManagedBy(t *testing.T) {
2332 _, ctx := ktesting.NewTestContext(t)
2333 now := metav1.Now()
2334 baseJob := batch.Job{
2335 TypeMeta: metav1.TypeMeta{Kind: "Job"},
2336 ObjectMeta: metav1.ObjectMeta{
2337 Name: "foobar",
2338 Namespace: metav1.NamespaceDefault,
2339 },
2340 Spec: batch.JobSpec{
2341 Template: v1.PodTemplateSpec{
2342 ObjectMeta: metav1.ObjectMeta{
2343 Labels: map[string]string{
2344 "foo": "bar",
2345 },
2346 },
2347 Spec: v1.PodSpec{
2348 Containers: []v1.Container{
2349 {Image: "foo/bar"},
2350 },
2351 },
2352 },
2353 Parallelism: ptr.To[int32](2),
2354 Completions: ptr.To[int32](2),
2355 BackoffLimit: ptr.To[int32](6),
2356 },
2357 Status: batch.JobStatus{
2358 Active: 1,
2359 Ready: ptr.To[int32](1),
2360 StartTime: &now,
2361 },
2362 }
2363
2364 testCases := map[string]struct {
2365 enableJobManagedBy bool
2366 job batch.Job
2367 wantStatus batch.JobStatus
2368 }{
2369 "job with custom value of managedBy; feature enabled; the status is unchanged": {
2370 enableJobManagedBy: true,
2371 job: func() batch.Job {
2372 job := baseJob.DeepCopy()
2373 job.Spec.ManagedBy = ptr.To("custom-managed-by")
2374 return *job
2375 }(),
2376 wantStatus: baseJob.Status,
2377 },
2378 "job with well known value of the managedBy; feature enabled; the status is updated": {
2379 enableJobManagedBy: true,
2380 job: func() batch.Job {
2381 job := baseJob.DeepCopy()
2382 job.Spec.ManagedBy = ptr.To(batch.JobControllerName)
2383 return *job
2384 }(),
2385 wantStatus: batch.JobStatus{
2386 Active: 2,
2387 Ready: ptr.To[int32](0),
2388 StartTime: &now,
2389 Terminating: ptr.To[int32](0),
2390 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
2391 },
2392 },
2393 "job with custom value of managedBy; feature disabled; the status is updated": {
2394 job: func() batch.Job {
2395 job := baseJob.DeepCopy()
2396 job.Spec.ManagedBy = ptr.To("custom-managed-by")
2397 return *job
2398 }(),
2399 wantStatus: batch.JobStatus{
2400 Active: 2,
2401 Ready: ptr.To[int32](0),
2402 StartTime: &now,
2403 Terminating: ptr.To[int32](0),
2404 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
2405 },
2406 },
2407 "job without the managedBy; feature enabled; the status is updated": {
2408 enableJobManagedBy: true,
2409 job: baseJob,
2410 wantStatus: batch.JobStatus{
2411 Active: 2,
2412 Ready: ptr.To[int32](0),
2413 StartTime: &now,
2414 Terminating: ptr.To[int32](0),
2415 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
2416 },
2417 },
2418 }
2419 for name, tc := range testCases {
2420 t.Run(name, func(t *testing.T) {
2421 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, tc.enableJobManagedBy)()
2422
2423 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
2424 manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
2425 fakePodControl := controller.FakePodControl{}
2426 manager.podControl = &fakePodControl
2427 manager.podStoreSynced = alwaysReady
2428 manager.jobStoreSynced = alwaysReady
2429 job := &tc.job
2430
2431 actual := job
2432 manager.updateStatusHandler = func(_ context.Context, job *batch.Job) (*batch.Job, error) {
2433 actual = job
2434 return job, nil
2435 }
2436 if err := sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job); err != nil {
2437 t.Fatalf("error %v while adding the %v job to the index", err, klog.KObj(job))
2438 }
2439
2440 if err := manager.syncJob(ctx, testutil.GetKey(job, t)); err != nil {
2441 t.Fatalf("error %v while reconciling the job %v", err, testutil.GetKey(job, t))
2442 }
2443
2444 if diff := cmp.Diff(tc.wantStatus, actual.Status); diff != "" {
2445 t.Errorf("Unexpected job status (-want,+got):\n%s", diff)
2446 }
2447 })
2448 }
2449 }
2450
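// TestSyncJobWithJobPodFailurePolicy exercises pod failure policy handling in syncJob: Ignore, Count and FailJob rules matched on container exit codes and pod conditions, and the resulting Job conditions and pod counters.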
2451 func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
2452 _, ctx := ktesting.NewTestContext(t)
2453 now := metav1.Now()
2454 indexedCompletionMode := batch.IndexedCompletion
2455 validObjectMeta := metav1.ObjectMeta{
2456 Name: "foobar",
2457 UID: uuid.NewUUID(),
2458 Namespace: metav1.NamespaceDefault,
2459 }
2460 validSelector := &metav1.LabelSelector{
2461 MatchLabels: map[string]string{"foo": "bar"},
2462 }
2463 validTemplate := v1.PodTemplateSpec{
2464 ObjectMeta: metav1.ObjectMeta{
2465 Labels: map[string]string{
2466 "foo": "bar",
2467 },
2468 },
2469 Spec: v1.PodSpec{
2470 Containers: []v1.Container{
2471 {Image: "foo/bar"},
2472 },
2473 },
2474 }
2475
2476 onExitCodeRules := []batch.PodFailurePolicyRule{
2477 {
2478 Action: batch.PodFailurePolicyActionIgnore,
2479 OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
2480 Operator: batch.PodFailurePolicyOnExitCodesOpIn,
2481 Values: []int32{1, 2, 3},
2482 },
2483 },
2484 {
2485 Action: batch.PodFailurePolicyActionFailJob,
2486 OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
2487 Operator: batch.PodFailurePolicyOnExitCodesOpIn,
2488 Values: []int32{5, 6, 7},
2489 },
2490 },
2491 }
2492
2493 testCases := map[string]struct {
2494 enableJobPodFailurePolicy bool
2495 enablePodDisruptionConditions bool
2496 enableJobPodReplacementPolicy bool
2497 job batch.Job
2498 pods []v1.Pod
2499 wantConditions *[]batch.JobCondition
2500 wantStatusFailed int32
2501 wantStatusActive int32
2502 wantStatusSucceeded int32
2503 wantStatusTerminating *int32
2504 }{
2505 "default handling for pod failure if the container matching the exit codes does not match the containerName restriction": {
2506 enableJobPodFailurePolicy: true,
2507 job: batch.Job{
2508 TypeMeta: metav1.TypeMeta{Kind: "Job"},
2509 ObjectMeta: validObjectMeta,
2510 Spec: batch.JobSpec{
2511 Selector: validSelector,
2512 Template: validTemplate,
2513 Parallelism: ptr.To[int32](1),
2514 Completions: ptr.To[int32](1),
2515 BackoffLimit: ptr.To[int32](6),
2516 PodFailurePolicy: &batch.PodFailurePolicy{
2517 Rules: []batch.PodFailurePolicyRule{
2518 {
2519 Action: batch.PodFailurePolicyActionIgnore,
2520 OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
2521 ContainerName: ptr.To("main-container"),
2522 Operator: batch.PodFailurePolicyOnExitCodesOpIn,
2523 Values: []int32{1, 2, 3},
2524 },
2525 },
2526 {
2527 Action: batch.PodFailurePolicyActionFailJob,
2528 OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
2529 ContainerName: ptr.To("main-container"),
2530 Operator: batch.PodFailurePolicyOnExitCodesOpIn,
2531 Values: []int32{5, 6, 7},
2532 },
2533 },
2534 },
2535 },
2536 },
2537 },
2538 pods: []v1.Pod{
2539 {
2540 Status: v1.PodStatus{
2541 Phase: v1.PodFailed,
2542 ContainerStatuses: []v1.ContainerStatus{
2543 {
2544 Name: "monitoring-container",
2545 State: v1.ContainerState{
2546 Terminated: &v1.ContainerStateTerminated{
2547 ExitCode: 5,
2548 },
2549 },
2550 },
2551 {
2552 Name: "main-container",
2553 State: v1.ContainerState{
2554 Terminated: &v1.ContainerStateTerminated{
2555 ExitCode: 42,
2556 FinishedAt: testFinishedAt,
2557 },
2558 },
2559 },
2560 },
2561 },
2562 },
2563 },
2564 wantConditions: nil,
2565 wantStatusActive: 1,
2566 wantStatusSucceeded: 0,
2567 wantStatusFailed: 1,
2568 },
2569 "running pod should not result in job fail based on OnExitCodes": {
2570 enableJobPodFailurePolicy: true,
2571 job: batch.Job{
2572 TypeMeta: metav1.TypeMeta{Kind: "Job"},
2573 ObjectMeta: validObjectMeta,
2574 Spec: batch.JobSpec{
2575 Selector: validSelector,
2576 Template: validTemplate,
2577 Parallelism: ptr.To[int32](1),
2578 Completions: ptr.To[int32](1),
2579 BackoffLimit: ptr.To[int32](6),
2580 PodFailurePolicy: &batch.PodFailurePolicy{
2581 Rules: onExitCodeRules,
2582 },
2583 },
2584 },
2585 pods: []v1.Pod{
2586 {
2587 Status: v1.PodStatus{
2588 Phase: v1.PodRunning,
2589 ContainerStatuses: []v1.ContainerStatus{
2590 {
2591 Name: "main-container",
2592 State: v1.ContainerState{
2593 Terminated: &v1.ContainerStateTerminated{
2594 ExitCode: 5,
2595 },
2596 },
2597 },
2598 },
2599 },
2600 },
2601 },
2602 wantConditions: nil,
2603 wantStatusActive: 1,
2604 wantStatusFailed: 0,
2605 wantStatusSucceeded: 0,
2606 },
2607 "fail job based on OnExitCodes": {
2608 enableJobPodFailurePolicy: true,
2609 job: batch.Job{
2610 TypeMeta: metav1.TypeMeta{Kind: "Job"},
2611 ObjectMeta: validObjectMeta,
2612 Spec: batch.JobSpec{
2613 Selector: validSelector,
2614 Template: validTemplate,
2615 Parallelism: ptr.To[int32](1),
2616 Completions: ptr.To[int32](1),
2617 BackoffLimit: ptr.To[int32](6),
2618 PodFailurePolicy: &batch.PodFailurePolicy{
2619 Rules: onExitCodeRules,
2620 },
2621 },
2622 },
2623 pods: []v1.Pod{
2624 {
2625 Status: v1.PodStatus{
2626 Phase: v1.PodFailed,
2627 ContainerStatuses: []v1.ContainerStatus{
2628 {
2629 Name: "main-container",
2630 State: v1.ContainerState{
2631 Terminated: &v1.ContainerStateTerminated{
2632 ExitCode: 5,
2633 },
2634 },
2635 },
2636 },
2637 },
2638 },
2639 },
2640 wantConditions: &[]batch.JobCondition{
2641 {
2642 Type: batch.JobFailed,
2643 Status: v1.ConditionTrue,
2644 Reason: batch.JobReasonPodFailurePolicy,
2645 Message: "Container main-container for pod default/mypod-0 failed with exit code 5 matching FailJob rule at index 1",
2646 },
2647 },
2648 wantStatusActive: 0,
2649 wantStatusFailed: 1,
2650 wantStatusSucceeded: 0,
2651 },
2652 "job marked already as failure target with failed pod": {
2653 enableJobPodFailurePolicy: true,
2654 job: batch.Job{
2655 TypeMeta: metav1.TypeMeta{Kind: "Job"},
2656 ObjectMeta: validObjectMeta,
2657 Spec: batch.JobSpec{
2658 Selector: validSelector,
2659 Template: validTemplate,
2660 Parallelism: ptr.To[int32](1),
2661 Completions: ptr.To[int32](1),
2662 BackoffLimit: ptr.To[int32](6),
2663 PodFailurePolicy: &batch.PodFailurePolicy{
2664 Rules: onExitCodeRules,
2665 },
2666 },
2667 Status: batch.JobStatus{
2668 Conditions: []batch.JobCondition{
2669 {
2670 Type: batch.JobFailureTarget,
2671 Status: v1.ConditionTrue,
2672 Reason: batch.JobReasonPodFailurePolicy,
2673 Message: "Container main-container for pod default/mypod-0 failed with exit code 5 matching FailJob rule at index 1",
2674 },
2675 },
2676 },
2677 },
2678 pods: []v1.Pod{
2679 {
2680 Status: v1.PodStatus{
2681 Phase: v1.PodFailed,
2682 ContainerStatuses: []v1.ContainerStatus{
2683 {
2684 Name: "main-container",
2685 State: v1.ContainerState{
2686 Terminated: &v1.ContainerStateTerminated{
2687 ExitCode: 5,
2688 },
2689 },
2690 },
2691 },
2692 },
2693 },
2694 },
2695 wantConditions: &[]batch.JobCondition{
2696 {
2697 Type: batch.JobFailed,
2698 Status: v1.ConditionTrue,
2699 Reason: batch.JobReasonPodFailurePolicy,
2700 Message: "Container main-container for pod default/mypod-0 failed with exit code 5 matching FailJob rule at index 1",
2701 },
2702 },
2703 wantStatusActive: 0,
2704 wantStatusFailed: 1,
2705 wantStatusSucceeded: 0,
2706 },
2707 "job marked already as failure target with failed pod, message based on already deleted pod": {
2708 enableJobPodFailurePolicy: true,
2709 job: batch.Job{
2710 TypeMeta: metav1.TypeMeta{Kind: "Job"},
2711 ObjectMeta: validObjectMeta,
2712 Spec: batch.JobSpec{
2713 Selector: validSelector,
2714 Template: validTemplate,
2715 Parallelism: ptr.To[int32](1),
2716 Completions: ptr.To[int32](1),
2717 BackoffLimit: ptr.To[int32](6),
2718 PodFailurePolicy: &batch.PodFailurePolicy{
2719 Rules: onExitCodeRules,
2720 },
2721 },
2722 Status: batch.JobStatus{
2723 Conditions: []batch.JobCondition{
2724 {
2725 Type: batch.JobFailureTarget,
2726 Status: v1.ConditionTrue,
2727 Reason: batch.JobReasonPodFailurePolicy,
2728 Message: "Container main-container for pod default/already-deleted-pod failed with exit code 5 matching FailJob rule at index 1",
2729 },
2730 },
2731 },
2732 },
2733 pods: []v1.Pod{
2734 {
2735 Status: v1.PodStatus{
2736 Phase: v1.PodFailed,
2737 ContainerStatuses: []v1.ContainerStatus{
2738 {
2739 Name: "main-container",
2740 State: v1.ContainerState{
2741 Terminated: &v1.ContainerStateTerminated{
2742 ExitCode: 5,
2743 },
2744 },
2745 },
2746 },
2747 },
2748 },
2749 },
2750 wantConditions: &[]batch.JobCondition{
2751 {
2752 Type: batch.JobFailed,
2753 Status: v1.ConditionTrue,
2754 Reason: batch.JobReasonPodFailurePolicy,
2755 Message: "Container main-container for pod default/already-deleted-pod failed with exit code 5 matching FailJob rule at index 1",
2756 },
2757 },
2758 wantStatusActive: 0,
2759 wantStatusFailed: 1,
2760 wantStatusSucceeded: 0,
2761 },
2762 "default handling for a failed pod when the feature is disabled even, despite matching rule": {
2763 enableJobPodFailurePolicy: false,
2764 job: batch.Job{
2765 TypeMeta: metav1.TypeMeta{Kind: "Job"},
2766 ObjectMeta: validObjectMeta,
2767 Spec: batch.JobSpec{
2768 Selector: validSelector,
2769 Template: validTemplate,
2770 Parallelism: ptr.To[int32](1),
2771 Completions: ptr.To[int32](1),
2772 BackoffLimit: ptr.To[int32](6),
2773 PodFailurePolicy: &batch.PodFailurePolicy{
2774 Rules: onExitCodeRules,
2775 },
2776 },
2777 },
2778 pods: []v1.Pod{
2779 {
2780 Status: v1.PodStatus{
2781 Phase: v1.PodFailed,
2782 ContainerStatuses: []v1.ContainerStatus{
2783 {
2784 Name: "main-container",
2785 State: v1.ContainerState{
2786 Terminated: &v1.ContainerStateTerminated{
2787 ExitCode: 5,
2788 FinishedAt: testFinishedAt,
2789 },
2790 },
2791 },
2792 },
2793 },
2794 },
2795 },
2796 wantConditions: nil,
2797 wantStatusActive: 1,
2798 wantStatusFailed: 1,
2799 wantStatusSucceeded: 0,
2800 },
2801 "fail job with multiple pods": {
2802 enableJobPodFailurePolicy: true,
2803 job: batch.Job{
2804 TypeMeta: metav1.TypeMeta{Kind: "Job"},
2805 ObjectMeta: validObjectMeta,
2806 Spec: batch.JobSpec{
2807 Selector: validSelector,
2808 Template: validTemplate,
2809 Parallelism: ptr.To[int32](2),
2810 Completions: ptr.To[int32](2),
2811 BackoffLimit: ptr.To[int32](6),
2812 PodFailurePolicy: &batch.PodFailurePolicy{
2813 Rules: onExitCodeRules,
2814 },
2815 },
2816 },
2817 pods: []v1.Pod{
2818 {
2819 Status: v1.PodStatus{
2820 Phase: v1.PodRunning,
2821 },
2822 },
2823 {
2824 Status: v1.PodStatus{
2825 Phase: v1.PodFailed,
2826 ContainerStatuses: []v1.ContainerStatus{
2827 {
2828 Name: "main-container",
2829 State: v1.ContainerState{
2830 Terminated: &v1.ContainerStateTerminated{
2831 ExitCode: 5,
2832 },
2833 },
2834 },
2835 },
2836 },
2837 },
2838 },
2839 wantConditions: &[]batch.JobCondition{
2840 {
2841 Type: batch.JobFailed,
2842 Status: v1.ConditionTrue,
2843 Reason: batch.JobReasonPodFailurePolicy,
2844 Message: "Container main-container for pod default/mypod-1 failed with exit code 5 matching FailJob rule at index 1",
2845 },
2846 },
2847 wantStatusActive: 0,
2848 wantStatusFailed: 2,
2849 wantStatusSucceeded: 0,
2850 },
2851 "fail indexed job based on OnExitCodes": {
2852 enableJobPodFailurePolicy: true,
2853 job: batch.Job{
2854 TypeMeta: metav1.TypeMeta{Kind: "Job"},
2855 ObjectMeta: validObjectMeta,
2856 Spec: batch.JobSpec{
2857 Selector: validSelector,
2858 Template: validTemplate,
2859 CompletionMode: &indexedCompletionMode,
2860 Parallelism: ptr.To[int32](1),
2861 Completions: ptr.To[int32](1),
2862 BackoffLimit: ptr.To[int32](6),
2863 PodFailurePolicy: &batch.PodFailurePolicy{
2864 Rules: onExitCodeRules,
2865 },
2866 },
2867 },
2868 pods: []v1.Pod{
2869 {
2870 Status: v1.PodStatus{
2871 Phase: v1.PodFailed,
2872 ContainerStatuses: []v1.ContainerStatus{
2873 {
2874 Name: "main-container",
2875 State: v1.ContainerState{
2876 Terminated: &v1.ContainerStateTerminated{
2877 ExitCode: 5,
2878 },
2879 },
2880 },
2881 },
2882 },
2883 },
2884 },
2885 wantConditions: &[]batch.JobCondition{
2886 {
2887 Type: batch.JobFailed,
2888 Status: v1.ConditionTrue,
2889 Reason: batch.JobReasonPodFailurePolicy,
2890 Message: "Container main-container for pod default/mypod-0 failed with exit code 5 matching FailJob rule at index 1",
2891 },
2892 },
2893 wantStatusActive: 0,
2894 wantStatusFailed: 1,
2895 wantStatusSucceeded: 0,
2896 },
2897 "fail job based on OnExitCodes with NotIn operator": {
2898 enableJobPodFailurePolicy: true,
2899 job: batch.Job{
2900 TypeMeta: metav1.TypeMeta{Kind: "Job"},
2901 ObjectMeta: validObjectMeta,
2902 Spec: batch.JobSpec{
2903 Selector: validSelector,
2904 Template: validTemplate,
2905 Parallelism: ptr.To[int32](1),
2906 Completions: ptr.To[int32](1),
2907 BackoffLimit: ptr.To[int32](6),
2908 PodFailurePolicy: &batch.PodFailurePolicy{
2909 Rules: []batch.PodFailurePolicyRule{
2910 {
2911 Action: batch.PodFailurePolicyActionFailJob,
2912 OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
2913 Operator: batch.PodFailurePolicyOnExitCodesOpNotIn,
2914 Values: []int32{5, 6, 7},
2915 },
2916 },
2917 },
2918 },
2919 },
2920 },
2921 pods: []v1.Pod{
2922 {
2923 Status: v1.PodStatus{
2924 Phase: v1.PodFailed,
2925 ContainerStatuses: []v1.ContainerStatus{
2926 {
2927 Name: "main-container",
2928 State: v1.ContainerState{
2929 Terminated: &v1.ContainerStateTerminated{
2930 ExitCode: 42,
2931 },
2932 },
2933 },
2934 },
2935 },
2936 },
2937 },
2938 wantConditions: &[]batch.JobCondition{
2939 {
2940 Type: batch.JobFailed,
2941 Status: v1.ConditionTrue,
2942 Reason: batch.JobReasonPodFailurePolicy,
2943 Message: "Container main-container for pod default/mypod-0 failed with exit code 42 matching FailJob rule at index 0",
2944 },
2945 },
2946 wantStatusActive: 0,
2947 wantStatusFailed: 1,
2948 wantStatusSucceeded: 0,
2949 },
2950 "default handling job based on OnExitCodes with NotIn operator": {
2951 enableJobPodFailurePolicy: true,
2952 job: batch.Job{
2953 TypeMeta: metav1.TypeMeta{Kind: "Job"},
2954 ObjectMeta: validObjectMeta,
2955 Spec: batch.JobSpec{
2956 Selector: validSelector,
2957 Template: validTemplate,
2958 Parallelism: ptr.To[int32](1),
2959 Completions: ptr.To[int32](1),
2960 BackoffLimit: ptr.To[int32](6),
2961 PodFailurePolicy: &batch.PodFailurePolicy{
2962 Rules: []batch.PodFailurePolicyRule{
2963 {
2964 Action: batch.PodFailurePolicyActionFailJob,
2965 OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
2966 Operator: batch.PodFailurePolicyOnExitCodesOpNotIn,
2967 Values: []int32{5, 6, 7},
2968 },
2969 },
2970 },
2971 },
2972 },
2973 },
2974 pods: []v1.Pod{
2975 {
2976 Status: v1.PodStatus{
2977 Phase: v1.PodFailed,
2978 ContainerStatuses: []v1.ContainerStatus{
2979 {
2980 Name: "main-container",
2981 State: v1.ContainerState{
2982 Terminated: &v1.ContainerStateTerminated{
2983 ExitCode: 5,
2984 FinishedAt: testFinishedAt,
2985 },
2986 },
2987 },
2988 },
2989 },
2990 },
2991 },
2992 wantConditions: nil,
2993 wantStatusActive: 1,
2994 wantStatusFailed: 1,
2995 wantStatusSucceeded: 0,
2996 },
2997 "fail job based on OnExitCodes for InitContainer": {
2998 enableJobPodFailurePolicy: true,
2999 job: batch.Job{
3000 TypeMeta: metav1.TypeMeta{Kind: "Job"},
3001 ObjectMeta: validObjectMeta,
3002 Spec: batch.JobSpec{
3003 Selector: validSelector,
3004 Template: validTemplate,
3005 Parallelism: ptr.To[int32](1),
3006 Completions: ptr.To[int32](1),
3007 BackoffLimit: ptr.To[int32](6),
3008 PodFailurePolicy: &batch.PodFailurePolicy{
3009 Rules: onExitCodeRules,
3010 },
3011 },
3012 },
3013 pods: []v1.Pod{
3014 {
3015 Status: v1.PodStatus{
3016 Phase: v1.PodFailed,
3017 InitContainerStatuses: []v1.ContainerStatus{
3018 {
3019 Name: "init-container",
3020 State: v1.ContainerState{
3021 Terminated: &v1.ContainerStateTerminated{
3022 ExitCode: 5,
3023 },
3024 },
3025 },
3026 },
3027 ContainerStatuses: []v1.ContainerStatus{
3028 {
3029 Name: "main-container",
3030 State: v1.ContainerState{
3031 Terminated: &v1.ContainerStateTerminated{
3032 ExitCode: 143,
3033 },
3034 },
3035 },
3036 },
3037 },
3038 },
3039 },
3040 wantConditions: &[]batch.JobCondition{
3041 {
3042 Type: batch.JobFailed,
3043 Status: v1.ConditionTrue,
3044 Reason: batch.JobReasonPodFailurePolicy,
3045 Message: "Container init-container for pod default/mypod-0 failed with exit code 5 matching FailJob rule at index 1",
3046 },
3047 },
3048 wantStatusActive: 0,
3049 wantStatusFailed: 1,
3050 wantStatusSucceeded: 0,
3051 },
3052 "ignore pod failure; both rules are matching, the first is executed only": {
3053 enableJobPodFailurePolicy: true,
3054 job: batch.Job{
3055 TypeMeta: metav1.TypeMeta{Kind: "Job"},
3056 ObjectMeta: validObjectMeta,
3057 Spec: batch.JobSpec{
3058 Selector: validSelector,
3059 Template: validTemplate,
3060 Parallelism: ptr.To[int32](1),
3061 Completions: ptr.To[int32](1),
3062 BackoffLimit: ptr.To[int32](0),
3063 PodFailurePolicy: &batch.PodFailurePolicy{
3064 Rules: onExitCodeRules,
3065 },
3066 },
3067 },
3068 pods: []v1.Pod{
3069 {
3070 Status: v1.PodStatus{
3071 Phase: v1.PodFailed,
3072 ContainerStatuses: []v1.ContainerStatus{
3073 {
3074 Name: "container1",
3075 State: v1.ContainerState{
3076 Terminated: &v1.ContainerStateTerminated{
3077 ExitCode: 2,
3078 },
3079 },
3080 },
3081 {
3082 Name: "container2",
3083 State: v1.ContainerState{
3084 Terminated: &v1.ContainerStateTerminated{
3085 ExitCode: 6,
3086 },
3087 },
3088 },
3089 },
3090 },
3091 },
3092 },
3093 wantConditions: nil,
3094 wantStatusActive: 1,
3095 wantStatusFailed: 0,
3096 wantStatusSucceeded: 0,
3097 },
3098 "ignore pod failure based on OnExitCodes": {
3099 enableJobPodFailurePolicy: true,
3100 job: batch.Job{
3101 TypeMeta: metav1.TypeMeta{Kind: "Job"},
3102 ObjectMeta: validObjectMeta,
3103 Spec: batch.JobSpec{
3104 Selector: validSelector,
3105 Template: validTemplate,
3106 Parallelism: ptr.To[int32](1),
3107 Completions: ptr.To[int32](1),
3108 BackoffLimit: ptr.To[int32](0),
3109 PodFailurePolicy: &batch.PodFailurePolicy{
3110 Rules: onExitCodeRules,
3111 },
3112 },
3113 },
3114 pods: []v1.Pod{
3115 {
3116 Status: v1.PodStatus{
3117 Phase: v1.PodFailed,
3118 ContainerStatuses: []v1.ContainerStatus{
3119 {
3120 State: v1.ContainerState{
3121 Terminated: &v1.ContainerStateTerminated{
3122 ExitCode: 1,
3123 },
3124 },
3125 },
3126 },
3127 },
3128 },
3129 },
3130 wantConditions: nil,
3131 wantStatusActive: 1,
3132 wantStatusFailed: 0,
3133 wantStatusSucceeded: 0,
3134 },
3135 "default job based on OnExitCodes": {
3136 enableJobPodFailurePolicy: true,
3137 job: batch.Job{
3138 TypeMeta: metav1.TypeMeta{Kind: "Job"},
3139 ObjectMeta: validObjectMeta,
3140 Spec: batch.JobSpec{
3141 Selector: validSelector,
3142 Template: validTemplate,
3143 Parallelism: ptr.To[int32](1),
3144 Completions: ptr.To[int32](1),
3145 BackoffLimit: ptr.To[int32](0),
3146 PodFailurePolicy: &batch.PodFailurePolicy{
3147 Rules: onExitCodeRules,
3148 },
3149 },
3150 },
3151 pods: []v1.Pod{
3152 {
3153 Status: v1.PodStatus{
3154 Phase: v1.PodFailed,
3155 ContainerStatuses: []v1.ContainerStatus{
3156 {
3157 State: v1.ContainerState{
3158 Terminated: &v1.ContainerStateTerminated{
3159 ExitCode: 10,
3160 },
3161 },
3162 },
3163 },
3164 },
3165 },
3166 },
3167 wantConditions: &[]batch.JobCondition{
3168 {
3169 Type: batch.JobFailed,
3170 Status: v1.ConditionTrue,
3171 Reason: batch.JobReasonBackoffLimitExceeded,
3172 Message: "Job has reached the specified backoff limit",
3173 },
3174 },
3175 wantStatusActive: 0,
3176 wantStatusFailed: 1,
3177 wantStatusSucceeded: 0,
3178 },
3179 "count pod failure based on OnExitCodes; both rules are matching, the first is executed only": {
3180 enableJobPodFailurePolicy: true,
3181 job: batch.Job{
3182 TypeMeta: metav1.TypeMeta{Kind: "Job"},
3183 ObjectMeta: validObjectMeta,
3184 Spec: batch.JobSpec{
3185 Selector: validSelector,
3186 Template: validTemplate,
3187 Parallelism: ptr.To[int32](1),
3188 Completions: ptr.To[int32](1),
3189 BackoffLimit: ptr.To[int32](6),
3190 PodFailurePolicy: &batch.PodFailurePolicy{
3191 Rules: []batch.PodFailurePolicyRule{
3192 {
3193 Action: batch.PodFailurePolicyActionCount,
3194 OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
3195 Operator: batch.PodFailurePolicyOnExitCodesOpIn,
3196 Values: []int32{1, 2},
3197 },
3198 },
3199 {
3200 Action: batch.PodFailurePolicyActionIgnore,
3201 OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
3202 Operator: batch.PodFailurePolicyOnExitCodesOpIn,
3203 Values: []int32{2, 3},
3204 },
3205 },
3206 },
3207 },
3208 },
3209 },
3210 pods: []v1.Pod{
3211 {
3212 Status: v1.PodStatus{
3213 Phase: v1.PodFailed,
3214 ContainerStatuses: []v1.ContainerStatus{
3215 {
3216 State: v1.ContainerState{
3217 Terminated: &v1.ContainerStateTerminated{
3218 ExitCode: 2,
3219 FinishedAt: testFinishedAt,
3220 },
3221 },
3222 },
3223 },
3224 },
3225 },
3226 },
3227 wantConditions: nil,
3228 wantStatusActive: 1,
3229 wantStatusFailed: 1,
3230 wantStatusSucceeded: 0,
3231 },
3232 "count pod failure based on OnPodConditions; both rules are matching, the first is executed only": {
3233 enableJobPodFailurePolicy: true,
3234 job: batch.Job{
3235 TypeMeta: metav1.TypeMeta{Kind: "Job"},
3236 ObjectMeta: validObjectMeta,
3237 Spec: batch.JobSpec{
3238 Selector: validSelector,
3239 Template: validTemplate,
3240 Parallelism: ptr.To[int32](1),
3241 Completions: ptr.To[int32](1),
3242 BackoffLimit: ptr.To[int32](6),
3243 PodFailurePolicy: &batch.PodFailurePolicy{
3244 Rules: []batch.PodFailurePolicyRule{
3245 {
3246 Action: batch.PodFailurePolicyActionCount,
3247 OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
3248 {
3249 Type: v1.PodConditionType("ResourceLimitExceeded"),
3250 Status: v1.ConditionTrue,
3251 },
3252 },
3253 },
3254 {
3255 Action: batch.PodFailurePolicyActionIgnore,
3256 OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
3257 {
3258 Type: v1.DisruptionTarget,
3259 Status: v1.ConditionTrue,
3260 },
3261 },
3262 },
3263 },
3264 },
3265 },
3266 },
3267 pods: []v1.Pod{
3268 {
3269 Status: v1.PodStatus{
3270 Phase: v1.PodFailed,
3271 Conditions: []v1.PodCondition{
3272 {
3273 Type: v1.PodConditionType("ResourceLimitExceeded"),
3274 Status: v1.ConditionTrue,
3275 },
3276 {
3277 Type: v1.DisruptionTarget,
3278 Status: v1.ConditionTrue,
3279 },
3280 },
3281 ContainerStatuses: []v1.ContainerStatus{
3282 {
3283 State: v1.ContainerState{
3284 Terminated: &v1.ContainerStateTerminated{
3285 FinishedAt: testFinishedAt,
3286 },
3287 },
3288 },
3289 },
3290 },
3291 },
3292 },
3293 wantConditions: nil,
3294 wantStatusActive: 1,
3295 wantStatusFailed: 1,
3296 wantStatusSucceeded: 0,
3297 },
3298 "ignore pod failure based on OnPodConditions": {
3299 enableJobPodFailurePolicy: true,
3300 job: batch.Job{
3301 TypeMeta: metav1.TypeMeta{Kind: "Job"},
3302 ObjectMeta: validObjectMeta,
3303 Spec: batch.JobSpec{
3304 Selector: validSelector,
3305 Template: validTemplate,
3306 Parallelism: ptr.To[int32](1),
3307 Completions: ptr.To[int32](1),
3308 BackoffLimit: ptr.To[int32](0),
3309 PodFailurePolicy: &batch.PodFailurePolicy{
3310 Rules: []batch.PodFailurePolicyRule{
3311 {
3312 Action: batch.PodFailurePolicyActionIgnore,
3313 OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
3314 {
3315 Type: v1.DisruptionTarget,
3316 Status: v1.ConditionTrue,
3317 },
3318 },
3319 },
3320 },
3321 },
3322 },
3323 },
3324 pods: []v1.Pod{
3325 {
3326 Status: v1.PodStatus{
3327 Phase: v1.PodFailed,
3328 Conditions: []v1.PodCondition{
3329 {
3330 Type: v1.DisruptionTarget,
3331 Status: v1.ConditionTrue,
3332 },
3333 },
3334 },
3335 },
3336 },
3337 wantConditions: nil,
3338 wantStatusActive: 1,
3339 wantStatusFailed: 0,
3340 wantStatusSucceeded: 0,
3341 },
3342 "ignore pod failure based on OnPodConditions, ignored failures delays pod recreation": {
3343 enableJobPodFailurePolicy: true,
3344 job: batch.Job{
3345 TypeMeta: metav1.TypeMeta{Kind: "Job"},
3346 ObjectMeta: validObjectMeta,
3347 Spec: batch.JobSpec{
3348 Selector: validSelector,
3349 Template: validTemplate,
3350 Parallelism: ptr.To[int32](1),
3351 Completions: ptr.To[int32](1),
3352 BackoffLimit: ptr.To[int32](0),
3353 PodFailurePolicy: &batch.PodFailurePolicy{
3354 Rules: []batch.PodFailurePolicyRule{
3355 {
3356 Action: batch.PodFailurePolicyActionIgnore,
3357 OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
3358 {
3359 Type: v1.DisruptionTarget,
3360 Status: v1.ConditionTrue,
3361 },
3362 },
3363 },
3364 },
3365 },
3366 },
3367 },
3368 pods: []v1.Pod{
3369 {
3370 ObjectMeta: metav1.ObjectMeta{
3371 DeletionTimestamp: &now,
3372 },
3373 Status: v1.PodStatus{
3374 Phase: v1.PodFailed,
3375 Conditions: []v1.PodCondition{
3376 {
3377 Type: v1.DisruptionTarget,
3378 Status: v1.ConditionTrue,
3379 },
3380 },
3381 },
3382 },
3383 },
3384 wantConditions: nil,
3385 wantStatusActive: 0,
3386 wantStatusFailed: 0,
3387 wantStatusSucceeded: 0,
3388 },
3389 "fail job based on OnPodConditions": {
3390 enableJobPodFailurePolicy: true,
3391 job: batch.Job{
3392 TypeMeta: metav1.TypeMeta{Kind: "Job"},
3393 ObjectMeta: validObjectMeta,
3394 Spec: batch.JobSpec{
3395 Selector: validSelector,
3396 Template: validTemplate,
3397 Parallelism: ptr.To[int32](1),
3398 Completions: ptr.To[int32](1),
3399 BackoffLimit: ptr.To[int32](6),
3400 PodFailurePolicy: &batch.PodFailurePolicy{
3401 Rules: []batch.PodFailurePolicyRule{
3402 {
3403 Action: batch.PodFailurePolicyActionFailJob,
3404 OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
3405 {
3406 Type: v1.DisruptionTarget,
3407 Status: v1.ConditionTrue,
3408 },
3409 },
3410 },
3411 },
3412 },
3413 },
3414 },
3415 pods: []v1.Pod{
3416 {
3417 Status: v1.PodStatus{
3418 Phase: v1.PodFailed,
3419 Conditions: []v1.PodCondition{
3420 {
3421 Type: v1.DisruptionTarget,
3422 Status: v1.ConditionTrue,
3423 },
3424 },
3425 },
3426 },
3427 },
3428 wantConditions: &[]batch.JobCondition{
3429 {
3430 Type: batch.JobFailed,
3431 Status: v1.ConditionTrue,
3432 Reason: batch.JobReasonPodFailurePolicy,
3433 Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0",
3434 },
3435 },
3436 wantStatusActive: 0,
3437 wantStatusFailed: 1,
3438 wantStatusSucceeded: 0,
3439 },
3440 "terminating Pod considered failed when PodDisruptionConditions is disabled": {
3441 enableJobPodFailurePolicy: true,
3442 job: batch.Job{
3443 TypeMeta: metav1.TypeMeta{Kind: "Job"},
3444 ObjectMeta: validObjectMeta,
3445 Spec: batch.JobSpec{
3446 Parallelism: ptr.To[int32](1),
3447 Selector: validSelector,
3448 Template: validTemplate,
3449 BackoffLimit: ptr.To[int32](0),
3450 PodFailurePolicy: &batch.PodFailurePolicy{
3451 Rules: []batch.PodFailurePolicyRule{
3452 {
3453 Action: batch.PodFailurePolicyActionCount,
3454 OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
3455 {
3456 Type: v1.DisruptionTarget,
3457 Status: v1.ConditionTrue,
3458 },
3459 },
3460 },
3461 },
3462 },
3463 },
3464 },
3465 pods: []v1.Pod{
3466 {
3467 ObjectMeta: metav1.ObjectMeta{
3468 DeletionTimestamp: &now,
3469 },
3470 },
3471 },
3472 },
3473 "terminating Pod not considered failed when PodDisruptionConditions is enabled": {
3474 enableJobPodFailurePolicy: true,
3475 enablePodDisruptionConditions: true,
3476 job: batch.Job{
3477 TypeMeta: metav1.TypeMeta{Kind: "Job"},
3478 ObjectMeta: validObjectMeta,
3479 Spec: batch.JobSpec{
3480 Parallelism: ptr.To[int32](1),
3481 Selector: validSelector,
3482 Template: validTemplate,
3483 BackoffLimit: ptr.To[int32](0),
3484 PodFailurePolicy: &batch.PodFailurePolicy{
3485 Rules: []batch.PodFailurePolicyRule{
3486 {
3487 Action: batch.PodFailurePolicyActionCount,
3488 OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
3489 {
3490 Type: v1.DisruptionTarget,
3491 Status: v1.ConditionTrue,
3492 },
3493 },
3494 },
3495 },
3496 },
3497 },
3498 },
3499 pods: []v1.Pod{
3500 {
3501 ObjectMeta: metav1.ObjectMeta{
3502 DeletionTimestamp: &now,
3503 },
3504 Status: v1.PodStatus{
3505 Phase: v1.PodRunning,
3506 },
3507 },
3508 },
3509 },
3510 }
3511 for name, tc := range testCases {
3512 t.Run(name, func(t *testing.T) {
3513 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
3514 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)()
3515 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy)()
3516
3517 if tc.job.Spec.PodReplacementPolicy == nil {
3518 tc.job.Spec.PodReplacementPolicy = podReplacementPolicy(batch.Failed)
3519 }
3520 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
3521 manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
3522 fakePodControl := controller.FakePodControl{}
3523 manager.podControl = &fakePodControl
3524 manager.podStoreSynced = alwaysReady
3525 manager.jobStoreSynced = alwaysReady
3526 job := &tc.job
3527
3528 actual := job
3529 manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
3530 actual = job
3531 return job, nil
3532 }
3533 sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
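// Seed the informer cache with pods named mypod-<i>; for Indexed Jobs the completion index annotation is set as well, and every pod carries the tracking finalizer.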
3534 for i, pod := range tc.pods {
3535 pod := pod
3536 pb := podBuilder{Pod: &pod}.name(fmt.Sprintf("mypod-%d", i)).job(job)
3537 if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion {
3538 pb.index(fmt.Sprintf("%v", i))
3539 }
3540 pb = pb.trackingFinalizer()
3541 sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pb.Pod)
3542 }
3543
3544 manager.syncJob(context.TODO(), testutil.GetKey(job, t))
3545
3546 if tc.wantConditions != nil {
3547 for _, wantCondition := range *tc.wantConditions {
3548 conditions := getConditionsByType(actual.Status.Conditions, wantCondition.Type)
3549 if len(conditions) != 1 {
3550 t.Fatalf("Expected a single completion condition. Got %#v for type: %q", conditions, wantCondition.Type)
3551 }
3552 condition := *conditions[0]
3553 if diff := cmp.Diff(wantCondition, condition, cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" {
3554 t.Errorf("Unexpected job condition (-want,+got):\n%s", diff)
3555 }
3556 }
3557 } else {
3558 if cond := hasTrueCondition(actual); cond != nil {
3559 t.Errorf("Got condition %s, want none", *cond)
3560 }
3561 }
3562
3563 if actual.Status.Active != tc.wantStatusActive {
3564 t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.wantStatusActive, actual.Status.Active)
3565 }
3566 if actual.Status.Succeeded != tc.wantStatusSucceeded {
3567 t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.wantStatusSucceeded, actual.Status.Succeeded)
3568 }
3569 if actual.Status.Failed != tc.wantStatusFailed {
3570 t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.wantStatusFailed, actual.Status.Failed)
3571 }
3572 if ptr.Deref(actual.Status.Terminating, 0) != ptr.Deref(tc.wantStatusTerminating, 0) {
3573 t.Errorf("unexpected number of terminating pods. Expected %d, saw %d\n", ptr.Deref(tc.wantStatusTerminating, 0), ptr.Deref(actual.Status.Terminating, 0))
3574 }
3575 })
3576 }
3577 }
3578
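// TestSyncJobWithJobSuccessPolicy exercises successPolicy handling: once a rule in spec.successPolicy is satisfied, the Job gets the SuccessCriteriaMet (and Complete) conditions regardless of failed or still-running indexes.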
3579 func TestSyncJobWithJobSuccessPolicy(t *testing.T) {
3580 now := time.Now()
3581 validTypeMeta := metav1.TypeMeta{
3582 APIVersion: batch.SchemeGroupVersion.String(),
3583 Kind: "Job",
3584 }
3585 validObjectMeta := metav1.ObjectMeta{
3586 Name: "foobar",
3587 UID: uuid.NewUUID(),
3588 Namespace: metav1.NamespaceDefault,
3589 }
3590 validSelector := &metav1.LabelSelector{
3591 MatchLabels: map[string]string{"foo": "bar"},
3592 }
3593 validTemplate := v1.PodTemplateSpec{
3594 ObjectMeta: metav1.ObjectMeta{
3595 Labels: map[string]string{
3596 "foo": "bar",
3597 },
3598 },
3599 Spec: v1.PodSpec{
3600 Containers: []v1.Container{
3601 {Image: "foobar"},
3602 },
3603 },
3604 }
3605
3606 testCases := map[string]struct {
3607 enableJobFailurePolicy bool
3608 enableBackoffLimitPerIndex bool
3609 enableJobSuccessPolicy bool
3610 job batch.Job
3611 pods []v1.Pod
3612 wantStatus batch.JobStatus
3613 }{
3614 "job with successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and some indexes fail": {
3615 enableJobSuccessPolicy: true,
3616 job: batch.Job{
3617 TypeMeta: validTypeMeta,
3618 ObjectMeta: validObjectMeta,
3619 Spec: batch.JobSpec{
3620 Selector: validSelector,
3621 Template: validTemplate,
3622 CompletionMode: completionModePtr(batch.IndexedCompletion),
3623 Parallelism: ptr.To[int32](3),
3624 Completions: ptr.To[int32](3),
3625 BackoffLimit: ptr.To[int32](math.MaxInt32),
3626 SuccessPolicy: &batch.SuccessPolicy{
3627 Rules: []batch.SuccessPolicyRule{{
3628 SucceededIndexes: ptr.To("0,1"),
3629 SucceededCount: ptr.To[int32](1),
3630 }},
3631 },
3632 },
3633 },
3634 pods: []v1.Pod{
3635 *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod,
3636 *buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod,
3637 *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
3638 *buildPod().uid("c").index("2").phase(v1.PodRunning).trackingFinalizer().Pod,
3639 },
3640 wantStatus: batch.JobStatus{
3641 Failed: 1,
3642 Succeeded: 1,
3643 Terminating: ptr.To[int32](0),
3644 CompletedIndexes: "1",
3645 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
3646 Conditions: []batch.JobCondition{
3647 {
3648 Type: batch.JobSuccessCriteriaMet,
3649 Status: v1.ConditionTrue,
3650 Reason: batch.JobReasonSuccessPolicy,
3651 Message: "Matched rules at index 0",
3652 },
3653 {
3654 Type: batch.JobComplete,
3655 Status: v1.ConditionTrue,
3656 Reason: batch.JobReasonSuccessPolicy,
3657 Message: "Matched rules at index 0",
3658 },
3659 },
3660 },
3661 },
3662 "job with podFailurePolicy and successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet to podFailurePolicy": {
3663 enableJobSuccessPolicy: true,
3664 enableJobFailurePolicy: true,
3665 job: batch.Job{
3666 TypeMeta: validTypeMeta,
3667 ObjectMeta: validObjectMeta,
3668 Spec: batch.JobSpec{
3669 Selector: validSelector,
3670 Template: validTemplate,
3671 CompletionMode: completionModePtr(batch.IndexedCompletion),
3672 Parallelism: ptr.To[int32](2),
3673 Completions: ptr.To[int32](2),
3674 BackoffLimit: ptr.To[int32](math.MaxInt32),
3675 SuccessPolicy: &batch.SuccessPolicy{
3676 Rules: []batch.SuccessPolicyRule{{
3677 SucceededIndexes: ptr.To("0,1"),
3678 SucceededCount: ptr.To[int32](1),
3679 }},
3680 },
3681 PodFailurePolicy: &batch.PodFailurePolicy{
3682 Rules: []batch.PodFailurePolicyRule{{
3683 Action: batch.PodFailurePolicyActionFailJob,
3684 OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{{
3685 Type: v1.DisruptionTarget,
3686 Status: v1.ConditionTrue,
3687 }},
3688 }},
3689 },
3690 },
3691 },
3692 pods: []v1.Pod{
3693 *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod,
3694 *buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod,
3695 *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
3696 },
3697 wantStatus: batch.JobStatus{
3698 Failed: 1,
3699 Succeeded: 1,
3700 Terminating: ptr.To[int32](0),
3701 CompletedIndexes: "1",
3702 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
3703 Conditions: []batch.JobCondition{
3704 {
3705 Type: batch.JobSuccessCriteriaMet,
3706 Status: v1.ConditionTrue,
3707 Reason: batch.JobReasonSuccessPolicy,
3708 Message: "Matched rules at index 0",
3709 },
3710 {
3711 Type: batch.JobComplete,
3712 Status: v1.ConditionTrue,
3713 Reason: batch.JobReasonSuccessPolicy,
3714 Message: "Matched rules at index 0",
3715 },
3716 },
3717 },
3718 },
3719 "job with backoffLimitPerIndex and successPolicy; job has SuccessCriteriaMet condition if job meets to successPolicy and doesn't meet backoffLimitPerIndex": {
3720 enableJobSuccessPolicy: true,
3721 enableBackoffLimitPerIndex: true,
3722 job: batch.Job{
3723 TypeMeta: validTypeMeta,
3724 ObjectMeta: validObjectMeta,
3725 Spec: batch.JobSpec{
3726 Selector: validSelector,
3727 Template: validTemplate,
3728 CompletionMode: completionModePtr(batch.IndexedCompletion),
3729 Parallelism: ptr.To[int32](2),
3730 Completions: ptr.To[int32](2),
3731 BackoffLimit: ptr.To[int32](math.MaxInt32),
3732 BackoffLimitPerIndex: ptr.To[int32](2),
3733 SuccessPolicy: &batch.SuccessPolicy{
3734 Rules: []batch.SuccessPolicyRule{{
3735 SucceededIndexes: ptr.To("0,1"),
3736 SucceededCount: ptr.To[int32](1),
3737 }},
3738 },
3739 },
3740 },
3741 pods: []v1.Pod{
3742 *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod,
3743 *buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod,
3744 *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
3745 },
3746 wantStatus: batch.JobStatus{
3747 Failed: 1,
3748 Succeeded: 1,
3749 Terminating: ptr.To[int32](0),
3750 CompletedIndexes: "1",
3751 FailedIndexes: ptr.To(""),
3752 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
3753 Conditions: []batch.JobCondition{
3754 {
3755 Type: batch.JobSuccessCriteriaMet,
3756 Status: v1.ConditionTrue,
3757 Reason: batch.JobReasonSuccessPolicy,
3758 Message: "Matched rules at index 0",
3759 },
3760 {
3761 Type: batch.JobComplete,
3762 Status: v1.ConditionTrue,
3763 Reason: batch.JobReasonSuccessPolicy,
3764 Message: "Matched rules at index 0",
3765 },
3766 },
3767 },
3768 },
3769 "job with successPolicy; job has both Complete and SuccessCriteriaMet condition when job meets to successPolicy and all pods have been already removed": {
3770 enableJobSuccessPolicy: true,
3771 job: batch.Job{
3772 TypeMeta: validTypeMeta,
3773 ObjectMeta: validObjectMeta,
3774 Spec: batch.JobSpec{
3775 Selector: validSelector,
3776 Template: validTemplate,
3777 CompletionMode: completionModePtr(batch.IndexedCompletion),
3778 Parallelism: ptr.To[int32](2),
3779 Completions: ptr.To[int32](2),
3780 BackoffLimit: ptr.To[int32](math.MaxInt32),
3781 BackoffLimitPerIndex: ptr.To[int32](2),
3782 SuccessPolicy: &batch.SuccessPolicy{
3783 Rules: []batch.SuccessPolicyRule{{
3784 SucceededIndexes: ptr.To("0,1"),
3785 SucceededCount: ptr.To[int32](1),
3786 }},
3787 },
3788 },
3789 Status: batch.JobStatus{
3790 Conditions: []batch.JobCondition{
3791 {
3792 Type: batch.JobSuccessCriteriaMet,
3793 Status: v1.ConditionTrue,
3794 Reason: batch.JobReasonSuccessPolicy,
3795 Message: "Matched rules at index 0",
3796 },
3797 },
3798 },
3799 },
3800 pods: []v1.Pod{
3801 *buildPod().uid("a").index("0").phase(v1.PodFailed).trackingFinalizer().Pod,
3802 *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
3803 },
3804 wantStatus: batch.JobStatus{
3805 Failed: 1,
3806 Succeeded: 1,
3807 Terminating: ptr.To[int32](0),
3808 CompletedIndexes: "1",
3809 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
3810 Conditions: []batch.JobCondition{
3811 {
3812 Type: batch.JobSuccessCriteriaMet,
3813 Status: v1.ConditionTrue,
3814 Reason: batch.JobReasonSuccessPolicy,
3815 Message: "Matched rules at index 0",
3816 },
3817 {
3818 Type: batch.JobComplete,
3819 Status: v1.ConditionTrue,
3820 Reason: batch.JobReasonSuccessPolicy,
3821 Message: "Matched rules at index 0",
3822 },
3823 },
3824 },
3825 },
3826
3827
3828 // The following case documents the current behavior: once the success
3829 // policy is met, the controller adds the Complete condition to the Job
3830 // even though some of its pods are still running.
3831 "job with successPolicy; job has SuccessCriteriaMet and Complete condition when job meets to successPolicy and some pods still are running": {
3832 enableJobSuccessPolicy: true,
3833 job: batch.Job{
3834 TypeMeta: validTypeMeta,
3835 ObjectMeta: validObjectMeta,
3836 Spec: batch.JobSpec{
3837 Selector: validSelector,
3838 Template: validTemplate,
3839 CompletionMode: completionModePtr(batch.IndexedCompletion),
3840 Parallelism: ptr.To[int32](3),
3841 Completions: ptr.To[int32](3),
3842 BackoffLimit: ptr.To[int32](math.MaxInt32),
3843 BackoffLimitPerIndex: ptr.To[int32](3),
3844 SuccessPolicy: &batch.SuccessPolicy{
3845 Rules: []batch.SuccessPolicyRule{{
3846 SucceededIndexes: ptr.To("0,1"),
3847 SucceededCount: ptr.To[int32](1),
3848 }},
3849 },
3850 },
3851 Status: batch.JobStatus{
3852 Conditions: []batch.JobCondition{
3853 {
3854 Type: batch.JobSuccessCriteriaMet,
3855 Status: v1.ConditionTrue,
3856 Reason: batch.JobReasonSuccessPolicy,
3857 Message: "Matched rules at index 0",
3858 },
3859 },
3860 },
3861 },
3862 pods: []v1.Pod{
3863 *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod,
3864 *buildPod().uid("a2").index("1").phase(v1.PodRunning).trackingFinalizer().Pod,
3865 *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
3866 *buildPod().uid("c").index("2").phase(v1.PodRunning).trackingFinalizer().Pod,
3867 },
3868 wantStatus: batch.JobStatus{
3869 Failed: 1,
3870 Succeeded: 1,
3871 Terminating: ptr.To[int32](0),
3872 CompletedIndexes: "1",
3873 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
3874 Conditions: []batch.JobCondition{
3875 {
3876 Type: batch.JobSuccessCriteriaMet,
3877 Status: v1.ConditionTrue,
3878 Reason: batch.JobReasonSuccessPolicy,
3879 Message: "Matched rules at index 0",
3880 },
3881 {
3882 Type: batch.JobComplete,
3883 Status: v1.ConditionTrue,
3884 Reason: batch.JobReasonSuccessPolicy,
3885 Message: "Matched rules at index 0",
3886 },
3887 },
3888 },
3889 },
3890 "job with successPolicy and podFailurePolicy; job has a failed condition when job meets to both successPolicy and podFailurePolicy": {
3891 enableJobSuccessPolicy: true,
3892 enableJobFailurePolicy: true,
3893 job: batch.Job{
3894 TypeMeta: validTypeMeta,
3895 ObjectMeta: validObjectMeta,
3896 Spec: batch.JobSpec{
3897 Selector: validSelector,
3898 Template: validTemplate,
3899 CompletionMode: completionModePtr(batch.IndexedCompletion),
3900 Parallelism: ptr.To[int32](2),
3901 Completions: ptr.To[int32](2),
3902 BackoffLimit: ptr.To[int32](math.MaxInt32),
3903 SuccessPolicy: &batch.SuccessPolicy{
3904 Rules: []batch.SuccessPolicyRule{{
3905 SucceededIndexes: ptr.To("0,1"),
3906 SucceededCount: ptr.To[int32](1),
3907 }},
3908 },
3909 PodFailurePolicy: &batch.PodFailurePolicy{
3910 Rules: []batch.PodFailurePolicyRule{{
3911 Action: batch.PodFailurePolicyActionFailJob,
3912 OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{{
3913 Type: v1.DisruptionTarget,
3914 Status: v1.ConditionTrue,
3915 }},
3916 }},
3917 },
3918 },
3919 },
3920 pods: []v1.Pod{
3921 *buildPod().uid("a1").index("0").status(v1.PodStatus{
3922 Phase: v1.PodFailed,
3923 Conditions: []v1.PodCondition{{
3924 Type: v1.DisruptionTarget,
3925 Status: v1.ConditionTrue,
3926 }},
3927 }).trackingFinalizer().Pod,
3928 *buildPod().uid("a2").index("0").phase(v1.PodRunning).trackingFinalizer().Pod,
3929 *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
3930 },
3931 wantStatus: batch.JobStatus{
3932 Failed: 2,
3933 Succeeded: 1,
3934 Terminating: ptr.To[int32](0),
3935 CompletedIndexes: "1",
3936 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
3937 Conditions: []batch.JobCondition{
3938 {
3939 Type: batch.JobFailureTarget,
3940 Status: v1.ConditionTrue,
3941 Reason: batch.JobReasonPodFailurePolicy,
3942 Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0",
3943 },
3944 {
3945 Type: batch.JobFailed,
3946 Status: v1.ConditionTrue,
3947 Reason: batch.JobReasonPodFailurePolicy,
3948 Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0",
3949 },
3950 },
3951 },
3952 },
3953 "job with successPolicy and backoffLimitPerIndex; job has a failed condition when job meets to both successPolicy and backoffLimitPerIndex": {
3954 enableJobSuccessPolicy: true,
3955 enableBackoffLimitPerIndex: true,
3956 job: batch.Job{
3957 TypeMeta: validTypeMeta,
3958 ObjectMeta: validObjectMeta,
3959 Spec: batch.JobSpec{
3960 Selector: validSelector,
3961 Template: validTemplate,
3962 CompletionMode: completionModePtr(batch.IndexedCompletion),
3963 Parallelism: ptr.To[int32](2),
3964 Completions: ptr.To[int32](2),
3965 BackoffLimit: ptr.To[int32](math.MaxInt32),
3966 BackoffLimitPerIndex: ptr.To[int32](1),
3967 SuccessPolicy: &batch.SuccessPolicy{
3968 Rules: []batch.SuccessPolicyRule{{
3969 SucceededIndexes: ptr.To("0,1"),
3970 SucceededCount: ptr.To[int32](1),
3971 }},
3972 },
3973 },
3974 },
3975 pods: []v1.Pod{
3976 *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod,
3977 *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
3978 },
3979 wantStatus: batch.JobStatus{
3980 Failed: 1,
3981 Succeeded: 1,
3982 Terminating: ptr.To[int32](0),
3983 CompletedIndexes: "1",
3984 FailedIndexes: ptr.To("0"),
3985 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
3986 Conditions: []batch.JobCondition{
3987 {
3988 Type: batch.JobFailed,
3989 Status: v1.ConditionTrue,
3990 Reason: batch.JobReasonFailedIndexes,
3991 Message: "Job has failed indexes",
3992 },
3993 },
3994 },
3995 },
3996 "job with successPolicy and backoffLimit; job has a failed condition when job meets to both successPolicy and backoffLimit": {
3997 enableJobSuccessPolicy: true,
3998 job: batch.Job{
3999 TypeMeta: validTypeMeta,
4000 ObjectMeta: validObjectMeta,
4001 Spec: batch.JobSpec{
4002 Selector: validSelector,
4003 Template: validTemplate,
4004 CompletionMode: completionModePtr(batch.IndexedCompletion),
4005 Parallelism: ptr.To[int32](2),
4006 Completions: ptr.To[int32](2),
4007 BackoffLimit: ptr.To[int32](1),
4008 SuccessPolicy: &batch.SuccessPolicy{
4009 Rules: []batch.SuccessPolicyRule{{
4010 SucceededIndexes: ptr.To("0,1"),
4011 SucceededCount: ptr.To[int32](1),
4012 }},
4013 },
4014 },
4015 },
4016 pods: []v1.Pod{
4017 *buildPod().uid("a1").index("0").phase(v1.PodFailed).trackingFinalizer().Pod,
4018 *buildPod().uid("a2").index("0").phase(v1.PodFailed).trackingFinalizer().Pod,
4019 *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
4020 },
4021 wantStatus: batch.JobStatus{
4022 Failed: 2,
4023 Succeeded: 1,
4024 Terminating: ptr.To[int32](0),
4025 CompletedIndexes: "1",
4026 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4027 Conditions: []batch.JobCondition{
4028 {
4029 Type: batch.JobFailed,
4030 Status: v1.ConditionTrue,
4031 Reason: batch.JobReasonBackoffLimitExceeded,
4032 Message: "Job has reached the specified backoff limit",
4033 },
4034 },
4035 },
4036 },
4037 "job with successPolicy and podFailurePolicy; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets podFailurePolicy": {
4038 enableJobSuccessPolicy: true,
4039 enableJobFailurePolicy: true,
4040 job: batch.Job{
4041 TypeMeta: validTypeMeta,
4042 ObjectMeta: validObjectMeta,
4043 Spec: batch.JobSpec{
4044 Selector: validSelector,
4045 Template: validTemplate,
4046 CompletionMode: completionModePtr(batch.IndexedCompletion),
4047 Parallelism: ptr.To[int32](2),
4048 Completions: ptr.To[int32](2),
4049 BackoffLimit: ptr.To[int32](math.MaxInt32),
4050 SuccessPolicy: &batch.SuccessPolicy{
4051 Rules: []batch.SuccessPolicyRule{{
4052 SucceededCount: ptr.To[int32](1),
4053 }},
4054 },
4055 PodFailurePolicy: &batch.PodFailurePolicy{
4056 Rules: []batch.PodFailurePolicyRule{{
4057 Action: batch.PodFailurePolicyActionFailJob,
4058 OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{{
4059 Type: v1.DisruptionTarget,
4060 Status: v1.ConditionTrue,
4061 }},
4062 }},
4063 },
4064 },
4065 Status: batch.JobStatus{
4066 Failed: 0,
4067 Succeeded: 1,
4068 Terminating: ptr.To[int32](0),
4069 CompletedIndexes: "1",
4070 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4071 Conditions: []batch.JobCondition{
4072 {
4073 Type: batch.JobSuccessCriteriaMet,
4074 Status: v1.ConditionTrue,
4075 Reason: batch.JobReasonSuccessPolicy,
4076 Message: "Matched rules at index 0",
4077 },
4078 },
4079 },
4080 },
4081 pods: []v1.Pod{
4082 *buildPod().uid("a").index("0").status(v1.PodStatus{
4083 Phase: v1.PodFailed,
4084 Conditions: []v1.PodCondition{{
4085 Type: v1.DisruptionTarget,
4086 Status: v1.ConditionTrue,
4087 }},
4088 }).trackingFinalizer().Pod,
4089 *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
4090 },
4091 wantStatus: batch.JobStatus{
4092 Failed: 1,
4093 Succeeded: 1,
4094 Terminating: ptr.To[int32](0),
4095 CompletedIndexes: "1",
4096 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4097 Conditions: []batch.JobCondition{
4098 {
4099 Type: batch.JobSuccessCriteriaMet,
4100 Status: v1.ConditionTrue,
4101 Reason: batch.JobReasonSuccessPolicy,
4102 Message: "Matched rules at index 0",
4103 },
4104 {
4105 Type: batch.JobComplete,
4106 Status: v1.ConditionTrue,
4107 Reason: batch.JobReasonSuccessPolicy,
4108 Message: "Matched rules at index 0",
4109 },
4110 },
4111 },
4112 },
4113 "job with successPolicy and backoffLimitPerIndex; job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meet backoffLimitPerIndex": {
4114 enableJobSuccessPolicy: true,
4115 enableBackoffLimitPerIndex: true,
4116 job: batch.Job{
4117 TypeMeta: validTypeMeta,
4118 ObjectMeta: validObjectMeta,
4119 Spec: batch.JobSpec{
4120 Selector: validSelector,
4121 Template: validTemplate,
4122 CompletionMode: completionModePtr(batch.IndexedCompletion),
4123 Parallelism: ptr.To[int32](2),
4124 Completions: ptr.To[int32](2),
4125 BackoffLimit: ptr.To[int32](math.MaxInt32),
4126 BackoffLimitPerIndex: ptr.To[int32](1),
4127 SuccessPolicy: &batch.SuccessPolicy{
4128 Rules: []batch.SuccessPolicyRule{{
4129 SucceededIndexes: ptr.To("1"),
4130 }},
4131 },
4132 },
4133 Status: batch.JobStatus{
4134 Failed: 0,
4135 Succeeded: 1,
4136 Terminating: ptr.To[int32](0),
4137 CompletedIndexes: "1",
4138 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4139 Conditions: []batch.JobCondition{
4140 {
4141 Type: batch.JobSuccessCriteriaMet,
4142 Status: v1.ConditionTrue,
4143 Reason: batch.JobReasonSuccessPolicy,
4144 Message: "Matched rules at index 0",
4145 },
4146 },
4147 },
4148 },
4149 pods: []v1.Pod{
4150 *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod,
4151 *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
4152 },
4153 wantStatus: batch.JobStatus{
4154 Failed: 1,
4155 Succeeded: 1,
4156 Terminating: ptr.To[int32](0),
4157 CompletedIndexes: "1",
4158 FailedIndexes: ptr.To("0"),
4159 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4160 Conditions: []batch.JobCondition{
4161 {
4162 Type: batch.JobSuccessCriteriaMet,
4163 Status: v1.ConditionTrue,
4164 Reason: batch.JobReasonSuccessPolicy,
4165 Message: "Matched rules at index 0",
4166 },
4167 {
4168 Type: batch.JobComplete,
4169 Status: v1.ConditionTrue,
4170 Reason: batch.JobReasonSuccessPolicy,
4171 Message: "Matched rules at index 0",
4172 },
4173 },
4174 },
4175 },
4176 "job with successPolicy and backoffLimit: job with SuccessCriteriaMet has never been transitioned to FailureTarget and Failed even if job meets backoffLimit": {
4177 enableJobSuccessPolicy: true,
4178 job: batch.Job{
4179 TypeMeta: validTypeMeta,
4180 ObjectMeta: validObjectMeta,
4181 Spec: batch.JobSpec{
4182 Selector: validSelector,
4183 Template: validTemplate,
4184 CompletionMode: completionModePtr(batch.IndexedCompletion),
4185 Parallelism: ptr.To[int32](2),
4186 Completions: ptr.To[int32](2),
4187 BackoffLimit: ptr.To[int32](1),
4188 SuccessPolicy: &batch.SuccessPolicy{
4189 Rules: []batch.SuccessPolicyRule{{
4190 SucceededIndexes: ptr.To("0,1"),
4191 SucceededCount: ptr.To[int32](1),
4192 }},
4193 },
4194 },
4195 Status: batch.JobStatus{
4196 Failed: 0,
4197 Succeeded: 1,
4198 Terminating: ptr.To[int32](0),
4199 CompletedIndexes: "1",
4200 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4201 Conditions: []batch.JobCondition{
4202 {
4203 Type: batch.JobSuccessCriteriaMet,
4204 Status: v1.ConditionTrue,
4205 Reason: batch.JobReasonSuccessPolicy,
4206 Message: "Matched rules at index 0",
4207 },
4208 },
4209 },
4210 },
4211 pods: []v1.Pod{
4212 *buildPod().uid("a").index("0").phase(v1.PodFailed).trackingFinalizer().Pod,
4213 *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
4214 },
4215 wantStatus: batch.JobStatus{
4216 Failed: 1,
4217 Succeeded: 1,
4218 Terminating: ptr.To[int32](0),
4219 CompletedIndexes: "1",
4220 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4221 Conditions: []batch.JobCondition{
4222 {
4223 Type: batch.JobSuccessCriteriaMet,
4224 Status: v1.ConditionTrue,
4225 Reason: batch.JobReasonSuccessPolicy,
4226 Message: "Matched rules at index 0",
4227 },
4228 {
4229 Type: batch.JobComplete,
4230 Status: v1.ConditionTrue,
4231 Reason: batch.JobReasonSuccessPolicy,
4232 Message: "Matched rules at index 0",
4233 },
4234 },
4235 },
4236 },
4237 "job with successPolicy and podFailureTarget; job with FailureTarget has never been transitioned to SuccessCriteriaMet even if job meets successPolicy": {
4238 enableJobSuccessPolicy: true,
4239 enableJobFailurePolicy: true,
4240 job: batch.Job{
4241 TypeMeta: validTypeMeta,
4242 ObjectMeta: validObjectMeta,
4243 Spec: batch.JobSpec{
4244 Selector: validSelector,
4245 Template: validTemplate,
4246 CompletionMode: completionModePtr(batch.IndexedCompletion),
4247 Parallelism: ptr.To[int32](2),
4248 Completions: ptr.To[int32](2),
4249 BackoffLimit: ptr.To[int32](math.MaxInt32),
4250 SuccessPolicy: &batch.SuccessPolicy{
4251 Rules: []batch.SuccessPolicyRule{{
4252 SucceededIndexes: ptr.To("0,1"),
4253 SucceededCount: ptr.To[int32](1),
4254 }},
4255 },
4256 PodFailurePolicy: &batch.PodFailurePolicy{
4257 Rules: []batch.PodFailurePolicyRule{{
4258 Action: batch.PodFailurePolicyActionFailJob,
4259 OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{{
4260 Type: v1.DisruptionTarget,
4261 Status: v1.ConditionTrue,
4262 }},
4263 }},
4264 },
4265 },
4266 Status: batch.JobStatus{
4267 Failed: 1,
4268 Succeeded: 0,
4269 Terminating: ptr.To[int32](0),
4270 CompletedIndexes: "1",
4271 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4272 Conditions: []batch.JobCondition{
4273 {
4274 Type: batch.JobFailureTarget,
4275 Status: v1.ConditionTrue,
4276 Reason: batch.JobReasonPodFailurePolicy,
4277 Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0",
4278 },
4279 },
4280 },
4281 },
4282 pods: []v1.Pod{
4283 *buildPod().uid("a").index("0").status(v1.PodStatus{
4284 Phase: v1.PodFailed,
4285 Conditions: []v1.PodCondition{{
4286 Type: v1.DisruptionTarget,
4287 Status: v1.ConditionTrue,
4288 }},
4289 }).Pod,
4290 *buildPod().uid("b").index("1").phase(v1.PodSucceeded).trackingFinalizer().Pod,
4291 },
4292 wantStatus: batch.JobStatus{
4293 Failed: 1,
4294 Succeeded: 1,
4295 Terminating: ptr.To[int32](0),
4296 CompletedIndexes: "1",
4297 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4298 Conditions: []batch.JobCondition{
4299 {
4300 Type: batch.JobFailureTarget,
4301 Status: v1.ConditionTrue,
4302 Reason: batch.JobReasonPodFailurePolicy,
4303 Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0",
4304 },
4305 {
4306 Type: batch.JobFailed,
4307 Status: v1.ConditionTrue,
4308 Reason: batch.JobReasonPodFailurePolicy,
4309 Message: "Pod default/mypod-0 has condition DisruptionTarget matching FailJob rule at index 0",
4310 },
4311 },
4312 },
4313 },
4314 }
4315 for name, tc := range testCases {
4316 t.Run(name, func(t *testing.T) {
4317 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobFailurePolicy)()
4318 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableBackoffLimitPerIndex)()
4319 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy)()
4320
4321 clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
4322 fakeClock := clocktesting.NewFakeClock(now)
4323 _, ctx := ktesting.NewTestContext(t)
4324 manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock)
4325 manager.podControl = &controller.FakePodControl{}
4326 manager.podStoreSynced = alwaysReady
4327 manager.jobStoreSynced = alwaysReady
4328 job := &tc.job
4329
4330 actual := job
4331 manager.updateStatusHandler = func(_ context.Context, j *batch.Job) (*batch.Job, error) {
4332 actual = j
4333 return j, nil
4334 }
4335 if err := sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job); err != nil {
4336 t.Fatalf("Failed to add the Job %q to sharedInformer: %v", klog.KObj(job), err)
4337 }
4338 for i, pod := range tc.pods {
4339 pb := podBuilder{Pod: pod.DeepCopy()}.name(fmt.Sprintf("mypod-%d", i)).job(job)
4340 if isIndexedJob(job) {
4341 pb.index(strconv.Itoa(getCompletionIndex(pod.Annotations)))
4342 }
4343 if err := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pb.Pod); err != nil {
4344 t.Fatalf("Failed to add the Pod %q to sharedInformer: %v", klog.KObj(pb.Pod), err)
4345 }
4346 }
4347
4348 if err := manager.syncJob(ctx, testutil.GetKey(job, t)); err != nil {
4349 t.Fatalf("Failed to complete syncJob: %v", err)
4350 }
4351
4352 if diff := cmp.Diff(tc.wantStatus, actual.Status,
4353 cmpopts.IgnoreFields(batch.JobStatus{}, "StartTime", "CompletionTime", "Ready"),
4354 cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" {
4355 t.Errorf("Unexpectd Job status (-want,+got):\n%s", diff)
4356 }
4357 })
4358 }
4359 }
4360
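// TestSyncJobWithJobBackoffLimitPerIndex exercises syncJob status computation
// for Indexed Jobs using backoffLimitPerIndex: per-index retries, the
// FailIndex/FailJob/Ignore pod failure policy actions, maxFailedIndexes, and
// clearing of failedIndexes when the feature gate is disabled.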
4361 func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
4362 _, ctx := ktesting.NewTestContext(t)
4363 now := time.Now()
4364 validObjectMeta := metav1.ObjectMeta{
4365 Name: "foobar",
4366 UID: uuid.NewUUID(),
4367 Namespace: metav1.NamespaceDefault,
4368 }
4369 validSelector := &metav1.LabelSelector{
4370 MatchLabels: map[string]string{"foo": "bar"},
4371 }
4372 validTemplate := v1.PodTemplateSpec{
4373 ObjectMeta: metav1.ObjectMeta{
4374 Labels: map[string]string{
4375 "foo": "bar",
4376 },
4377 },
4378 Spec: v1.PodSpec{
4379 Containers: []v1.Container{
4380 {Image: "foo/bar"},
4381 },
4382 },
4383 }
4384
4385 testCases := map[string]struct {
4386 enableJobBackoffLimitPerIndex bool
4387 enableJobPodFailurePolicy bool
4388 job batch.Job
4389 pods []v1.Pod
4390 wantStatus batch.JobStatus
4391 }{
4392 "successful job after a single failure within index": {
4393 enableJobBackoffLimitPerIndex: true,
4394 job: batch.Job{
4395 TypeMeta: metav1.TypeMeta{Kind: "Job"},
4396 ObjectMeta: validObjectMeta,
4397 Spec: batch.JobSpec{
4398 Selector: validSelector,
4399 Template: validTemplate,
4400 Parallelism: ptr.To[int32](2),
4401 Completions: ptr.To[int32](2),
4402 BackoffLimit: ptr.To[int32](math.MaxInt32),
4403 CompletionMode: completionModePtr(batch.IndexedCompletion),
4404 BackoffLimitPerIndex: ptr.To[int32](1),
4405 },
4406 },
4407 pods: []v1.Pod{
4408 *buildPod().uid("a1").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod,
4409 *buildPod().uid("a2").index("0").phase(v1.PodSucceeded).indexFailureCount("1").trackingFinalizer().Pod,
4410 *buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod,
4411 },
4412 wantStatus: batch.JobStatus{
4413 Failed: 1,
4414 Succeeded: 2,
4415 Terminating: ptr.To[int32](0),
4416 CompletedIndexes: "0,1",
4417 FailedIndexes: ptr.To(""),
4418 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4419 Conditions: []batch.JobCondition{
4420 {
4421 Type: batch.JobComplete,
4422 Status: v1.ConditionTrue,
4423 },
4424 },
4425 },
4426 },
4427 "single failed pod, not counted as the replacement pod creation is delayed": {
4428 enableJobBackoffLimitPerIndex: true,
4429 job: batch.Job{
4430 TypeMeta: metav1.TypeMeta{Kind: "Job"},
4431 ObjectMeta: validObjectMeta,
4432 Spec: batch.JobSpec{
4433 Selector: validSelector,
4434 Template: validTemplate,
4435 Parallelism: ptr.To[int32](2),
4436 Completions: ptr.To[int32](2),
4437 BackoffLimit: ptr.To[int32](math.MaxInt32),
4438 CompletionMode: completionModePtr(batch.IndexedCompletion),
4439 BackoffLimitPerIndex: ptr.To[int32](1),
4440 },
4441 },
4442 pods: []v1.Pod{
4443 *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod,
4444 },
4445 wantStatus: batch.JobStatus{
4446 Active: 2,
4447 Terminating: ptr.To[int32](0),
4448 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4449 FailedIndexes: ptr.To(""),
4450 },
4451 },
4452 "single failed pod replaced already": {
4453 enableJobBackoffLimitPerIndex: true,
4454 job: batch.Job{
4455 TypeMeta: metav1.TypeMeta{Kind: "Job"},
4456 ObjectMeta: validObjectMeta,
4457 Spec: batch.JobSpec{
4458 Selector: validSelector,
4459 Template: validTemplate,
4460 Parallelism: ptr.To[int32](2),
4461 Completions: ptr.To[int32](2),
4462 BackoffLimit: ptr.To[int32](math.MaxInt32),
4463 CompletionMode: completionModePtr(batch.IndexedCompletion),
4464 BackoffLimitPerIndex: ptr.To[int32](1),
4465 },
4466 },
4467 pods: []v1.Pod{
4468 *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod,
4469 *buildPod().uid("b").index("0").phase(v1.PodPending).indexFailureCount("1").trackingFinalizer().Pod,
4470 },
4471 wantStatus: batch.JobStatus{
4472 Active: 2,
4473 Failed: 1,
4474 Terminating: ptr.To[int32](0),
4475 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4476 FailedIndexes: ptr.To(""),
4477 },
4478 },
4479 "single failed index due to exceeding the backoff limit per index, the job continues": {
4480 enableJobBackoffLimitPerIndex: true,
4481 job: batch.Job{
4482 TypeMeta: metav1.TypeMeta{Kind: "Job"},
4483 ObjectMeta: validObjectMeta,
4484 Spec: batch.JobSpec{
4485 Selector: validSelector,
4486 Template: validTemplate,
4487 Parallelism: ptr.To[int32](2),
4488 Completions: ptr.To[int32](2),
4489 BackoffLimit: ptr.To[int32](math.MaxInt32),
4490 CompletionMode: completionModePtr(batch.IndexedCompletion),
4491 BackoffLimitPerIndex: ptr.To[int32](1),
4492 },
4493 },
4494 pods: []v1.Pod{
4495 *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod,
4496 },
4497 wantStatus: batch.JobStatus{
4498 Active: 1,
4499 Failed: 1,
4500 FailedIndexes: ptr.To("0"),
4501 Terminating: ptr.To[int32](0),
4502 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4503 },
4504 },
4505 "single failed index due to FailIndex action, the job continues": {
4506 enableJobBackoffLimitPerIndex: true,
4507 enableJobPodFailurePolicy: true,
4508 job: batch.Job{
4509 TypeMeta: metav1.TypeMeta{Kind: "Job"},
4510 ObjectMeta: validObjectMeta,
4511 Spec: batch.JobSpec{
4512 Selector: validSelector,
4513 Template: validTemplate,
4514 Parallelism: ptr.To[int32](2),
4515 Completions: ptr.To[int32](2),
4516 BackoffLimit: ptr.To[int32](math.MaxInt32),
4517 CompletionMode: completionModePtr(batch.IndexedCompletion),
4518 BackoffLimitPerIndex: ptr.To[int32](1),
4519 PodFailurePolicy: &batch.PodFailurePolicy{
4520 Rules: []batch.PodFailurePolicyRule{
4521 {
4522 Action: batch.PodFailurePolicyActionFailIndex,
4523 OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
4524 Operator: batch.PodFailurePolicyOnExitCodesOpIn,
4525 Values: []int32{3},
4526 },
4527 },
4528 },
4529 },
4530 },
4531 },
4532 pods: []v1.Pod{
4533 *buildPod().uid("a").index("0").status(v1.PodStatus{
4534 Phase: v1.PodFailed,
4535 ContainerStatuses: []v1.ContainerStatus{
4536 {
4537 State: v1.ContainerState{
4538 Terminated: &v1.ContainerStateTerminated{
4539 ExitCode: 3,
4540 },
4541 },
4542 },
4543 },
4544 }).indexFailureCount("0").trackingFinalizer().Pod,
4545 },
4546 wantStatus: batch.JobStatus{
4547 Active: 1,
4548 Failed: 1,
4549 FailedIndexes: ptr.To("0"),
4550 Terminating: ptr.To[int32](0),
4551 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4552 },
4553 },
4554 "job failed index due to FailJob action": {
4555 enableJobBackoffLimitPerIndex: true,
4556 enableJobPodFailurePolicy: true,
4557 job: batch.Job{
4558 TypeMeta: metav1.TypeMeta{Kind: "Job"},
4559 ObjectMeta: validObjectMeta,
4560 Spec: batch.JobSpec{
4561 Selector: validSelector,
4562 Template: validTemplate,
4563 Parallelism: ptr.To[int32](2),
4564 Completions: ptr.To[int32](2),
4565 BackoffLimit: ptr.To[int32](6),
4566 CompletionMode: completionModePtr(batch.IndexedCompletion),
4567 BackoffLimitPerIndex: ptr.To[int32](1),
4568 PodFailurePolicy: &batch.PodFailurePolicy{
4569 Rules: []batch.PodFailurePolicyRule{
4570 {
4571 Action: batch.PodFailurePolicyActionFailJob,
4572 OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
4573 Operator: batch.PodFailurePolicyOnExitCodesOpIn,
4574 Values: []int32{3},
4575 },
4576 },
4577 },
4578 },
4579 },
4580 },
4581 pods: []v1.Pod{
4582 *buildPod().uid("a").index("0").status(v1.PodStatus{
4583 Phase: v1.PodFailed,
4584 ContainerStatuses: []v1.ContainerStatus{
4585 {
4586 Name: "x",
4587 State: v1.ContainerState{
4588 Terminated: &v1.ContainerStateTerminated{
4589 ExitCode: 3,
4590 },
4591 },
4592 },
4593 },
4594 }).indexFailureCount("0").trackingFinalizer().Pod,
4595 },
4596 wantStatus: batch.JobStatus{
4597 Active: 0,
4598 Failed: 1,
4599 FailedIndexes: ptr.To(""),
4600 Terminating: ptr.To[int32](0),
4601 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4602 Conditions: []batch.JobCondition{
4603 {
4604 Type: batch.JobFailureTarget,
4605 Status: v1.ConditionTrue,
4606 Reason: batch.JobReasonPodFailurePolicy,
4607 Message: "Container x for pod default/mypod-0 failed with exit code 3 matching FailJob rule at index 0",
4608 },
4609 {
4610 Type: batch.JobFailed,
4611 Status: v1.ConditionTrue,
4612 Reason: batch.JobReasonPodFailurePolicy,
4613 Message: "Container x for pod default/mypod-0 failed with exit code 3 matching FailJob rule at index 0",
4614 },
4615 },
4616 },
4617 },
4618 "job pod failure ignored due to matching Ignore action": {
4619 enableJobBackoffLimitPerIndex: true,
4620 enableJobPodFailurePolicy: true,
4621 job: batch.Job{
4622 TypeMeta: metav1.TypeMeta{Kind: "Job"},
4623 ObjectMeta: validObjectMeta,
4624 Spec: batch.JobSpec{
4625 Selector: validSelector,
4626 Template: validTemplate,
4627 Parallelism: ptr.To[int32](2),
4628 Completions: ptr.To[int32](2),
4629 BackoffLimit: ptr.To[int32](6),
4630 CompletionMode: completionModePtr(batch.IndexedCompletion),
4631 BackoffLimitPerIndex: ptr.To[int32](1),
4632 PodFailurePolicy: &batch.PodFailurePolicy{
4633 Rules: []batch.PodFailurePolicyRule{
4634 {
4635 Action: batch.PodFailurePolicyActionIgnore,
4636 OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
4637 Operator: batch.PodFailurePolicyOnExitCodesOpIn,
4638 Values: []int32{3},
4639 },
4640 },
4641 },
4642 },
4643 },
4644 },
4645 pods: []v1.Pod{
4646 *buildPod().uid("a").index("0").status(v1.PodStatus{
4647 Phase: v1.PodFailed,
4648 ContainerStatuses: []v1.ContainerStatus{
4649 {
4650 Name: "x",
4651 State: v1.ContainerState{
4652 Terminated: &v1.ContainerStateTerminated{
4653 ExitCode: 3,
4654 },
4655 },
4656 },
4657 },
4658 }).indexFailureCount("0").trackingFinalizer().Pod,
4659 },
4660 wantStatus: batch.JobStatus{
4661 Active: 2,
4662 Failed: 0,
4663 FailedIndexes: ptr.To(""),
4664 Terminating: ptr.To[int32](0),
4665 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4666 },
4667 },
4668 "job failed due to exceeding backoffLimit before backoffLimitPerIndex": {
4669 enableJobBackoffLimitPerIndex: true,
4670 job: batch.Job{
4671 TypeMeta: metav1.TypeMeta{Kind: "Job"},
4672 ObjectMeta: validObjectMeta,
4673 Spec: batch.JobSpec{
4674 Selector: validSelector,
4675 Template: validTemplate,
4676 Parallelism: ptr.To[int32](2),
4677 Completions: ptr.To[int32](2),
4678 BackoffLimit: ptr.To[int32](1),
4679 CompletionMode: completionModePtr(batch.IndexedCompletion),
4680 BackoffLimitPerIndex: ptr.To[int32](1),
4681 },
4682 },
4683 pods: []v1.Pod{
4684 *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod,
4685 *buildPod().uid("b").index("1").phase(v1.PodFailed).indexFailureCount("0").trackingFinalizer().Pod,
4686 },
4687 wantStatus: batch.JobStatus{
4688 Failed: 2,
4689 Succeeded: 0,
4690 FailedIndexes: ptr.To(""),
4691 Terminating: ptr.To[int32](0),
4692 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4693 Conditions: []batch.JobCondition{
4694 {
4695 Type: batch.JobFailed,
4696 Status: v1.ConditionTrue,
4697 Reason: batch.JobReasonBackoffLimitExceeded,
4698 Message: "Job has reached the specified backoff limit",
4699 },
4700 },
4701 },
4702 },
4703 "job failed due to failed indexes": {
4704 enableJobBackoffLimitPerIndex: true,
4705 job: batch.Job{
4706 TypeMeta: metav1.TypeMeta{Kind: "Job"},
4707 ObjectMeta: validObjectMeta,
4708 Spec: batch.JobSpec{
4709 Selector: validSelector,
4710 Template: validTemplate,
4711 Parallelism: ptr.To[int32](2),
4712 Completions: ptr.To[int32](2),
4713 BackoffLimit: ptr.To[int32](math.MaxInt32),
4714 CompletionMode: completionModePtr(batch.IndexedCompletion),
4715 BackoffLimitPerIndex: ptr.To[int32](1),
4716 },
4717 },
4718 pods: []v1.Pod{
4719 *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod,
4720 *buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod,
4721 },
4722 wantStatus: batch.JobStatus{
4723 Failed: 1,
4724 Succeeded: 1,
4725 Terminating: ptr.To[int32](0),
4726 FailedIndexes: ptr.To("0"),
4727 CompletedIndexes: "1",
4728 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4729 Conditions: []batch.JobCondition{
4730 {
4731 Type: batch.JobFailed,
4732 Status: v1.ConditionTrue,
4733 Reason: batch.JobReasonFailedIndexes,
4734 Message: "Job has failed indexes",
4735 },
4736 },
4737 },
4738 },
4739 "job failed due to exceeding max failed indexes": {
4740 enableJobBackoffLimitPerIndex: true,
4741 job: batch.Job{
4742 TypeMeta: metav1.TypeMeta{Kind: "Job"},
4743 ObjectMeta: validObjectMeta,
4744 Spec: batch.JobSpec{
4745 Selector: validSelector,
4746 Template: validTemplate,
4747 Parallelism: ptr.To[int32](4),
4748 Completions: ptr.To[int32](4),
4749 BackoffLimit: ptr.To[int32](math.MaxInt32),
4750 CompletionMode: completionModePtr(batch.IndexedCompletion),
4751 BackoffLimitPerIndex: ptr.To[int32](1),
4752 MaxFailedIndexes: ptr.To[int32](1),
4753 },
4754 },
4755 pods: []v1.Pod{
4756 *buildPod().uid("a").index("0").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod,
4757 *buildPod().uid("b").index("1").phase(v1.PodSucceeded).indexFailureCount("0").trackingFinalizer().Pod,
4758 *buildPod().uid("c").index("2").phase(v1.PodFailed).indexFailureCount("1").trackingFinalizer().Pod,
4759 *buildPod().uid("d").index("3").phase(v1.PodRunning).indexFailureCount("0").trackingFinalizer().Pod,
4760 },
4761 wantStatus: batch.JobStatus{
4762 Failed: 3,
4763 Succeeded: 1,
4764 Terminating: ptr.To[int32](0),
4765 FailedIndexes: ptr.To("0,2"),
4766 CompletedIndexes: "1",
4767 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4768 Conditions: []batch.JobCondition{
4769 {
4770 Type: batch.JobFailed,
4771 Status: v1.ConditionTrue,
4772 Reason: batch.JobReasonMaxFailedIndexesExceeded,
4773 Message: "Job has exceeded the specified maximal number of failed indexes",
4774 },
4775 },
4776 },
4777 },
4778 "job with finished indexes; failedIndexes are cleaned when JobBackoffLimitPerIndex disabled": {
4779 enableJobBackoffLimitPerIndex: false,
4780 job: batch.Job{
4781 TypeMeta: metav1.TypeMeta{Kind: "Job"},
4782 ObjectMeta: validObjectMeta,
4783 Spec: batch.JobSpec{
4784 Selector: validSelector,
4785 Template: validTemplate,
4786 Parallelism: ptr.To[int32](3),
4787 Completions: ptr.To[int32](3),
4788 BackoffLimit: ptr.To[int32](math.MaxInt32),
4789 CompletionMode: completionModePtr(batch.IndexedCompletion),
4790 BackoffLimitPerIndex: ptr.To[int32](1),
4791 },
4792 Status: batch.JobStatus{
4793 FailedIndexes: ptr.To("0"),
4794 CompletedIndexes: "1",
4795 },
4796 },
4797 pods: []v1.Pod{
4798 *buildPod().uid("c").index("2").phase(v1.PodPending).indexFailureCount("1").trackingFinalizer().Pod,
4799 },
4800 wantStatus: batch.JobStatus{
4801 Active: 2,
4802 Succeeded: 1,
4803 Terminating: ptr.To[int32](0),
4804 CompletedIndexes: "1",
4805 UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
4806 },
4807 },
4808 }
4809 for name, tc := range testCases {
4810 t.Run(name, func(t *testing.T) {
4811 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex)()
4812 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
4813 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
4814 fakeClock := clocktesting.NewFakeClock(now)
4815 manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
4816 fakePodControl := controller.FakePodControl{}
4817 manager.podControl = &fakePodControl
4818 manager.podStoreSynced = alwaysReady
4819 manager.jobStoreSynced = alwaysReady
4820 job := &tc.job
4821
4822 actual := job
4823 manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
4824 actual = job
4825 return job, nil
4826 }
4827 sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
4828 for i, pod := range tc.pods {
4829 pod := pod
4830 pb := podBuilder{Pod: &pod}.name(fmt.Sprintf("mypod-%d", i)).job(job)
4831 if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion {
4832 pb.index(fmt.Sprintf("%v", getCompletionIndex(pod.Annotations)))
4833 }
4834 pb = pb.trackingFinalizer()
4835 sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pb.Pod)
4836 }
4837
4838 manager.syncJob(context.TODO(), testutil.GetKey(job, t))
4839
4840
4841 if diff := cmp.Diff(tc.wantStatus, actual.Status,
4842 cmpopts.IgnoreFields(batch.JobStatus{}, "StartTime", "CompletionTime", "Ready"),
4843 cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" {
4844 t.Errorf("unexpected job status. Diff: %s\n", diff)
4845 }
4846 })
4847 }
4848 }
4849
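// TestSyncJobUpdateRequeue verifies that a failing status update (generic or
// conflict error) causes the Job key to be requeued, while a successful update
// leaves the queue empty once the API backoff has elapsed.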
4850 func TestSyncJobUpdateRequeue(t *testing.T) {
4851 _, ctx := ktesting.NewTestContext(t)
4852 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
4853 cases := map[string]struct {
4854 updateErr error
4855 wantRequeued bool
4856 }{
4857 "no error": {
4858 wantRequeued: false,
4859 },
4860 "generic error": {
4861 updateErr: fmt.Errorf("update error"),
4862 wantRequeued: true,
4863 },
4864 "conflict error": {
4865 updateErr: apierrors.NewConflict(schema.GroupResource{}, "", nil),
4866 wantRequeued: true,
4867 },
4868 }
4869 for name, tc := range cases {
4870 t.Run(name, func(t *testing.T) {
4871 t.Cleanup(setDurationDuringTest(&DefaultJobApiBackOff, fastJobApiBackoff))
4872 fakeClient := clocktesting.NewFakeClock(time.Now())
4873 manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClient)
4874 fakePodControl := controller.FakePodControl{}
4875 manager.podControl = &fakePodControl
4876 manager.podStoreSynced = alwaysReady
4877 manager.jobStoreSynced = alwaysReady
4878 manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
4879 return job, tc.updateErr
4880 }
4881 job := newJob(2, 2, 6, batch.NonIndexedCompletion)
4882 sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
4883 manager.queue.Add(testutil.GetKey(job, t))
4884 manager.processNextWorkItem(context.TODO())
4885 if tc.wantRequeued {
4886 verifyEmptyQueueAndAwaitForQueueLen(ctx, t, manager, 1)
4887 } else {
4888
4889 // Advance the fake clock past the API backoff and give the worker a
4890 // moment to process the item, then verify that the Job key was not
4891 // requeued.
4892 manager.clock.Sleep(fastJobApiBackoff)
4893 time.Sleep(time.Millisecond)
4894 verifyEmptyQueue(ctx, t, manager)
4895 }
4896 })
4897 }
4898 }
4899
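// TestUpdateJobRequeue verifies that a spec update (generation bump) enqueues
// the Job immediately, while a status-only update does not.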
4900 func TestUpdateJobRequeue(t *testing.T) {
4901 logger, ctx := ktesting.NewTestContext(t)
4902 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
4903 cases := map[string]struct {
4904 oldJob *batch.Job
4905 updateFn func(job *batch.Job)
4906 wantRequeuedImmediately bool
4907 }{
4908 "spec update": {
4909 oldJob: newJob(1, 1, 1, batch.IndexedCompletion),
4910 updateFn: func(job *batch.Job) {
4911 job.Spec.Suspend = ptr.To(false)
4912 job.Generation++
4913 },
4914 wantRequeuedImmediately: true,
4915 },
4916 "status update": {
4917 oldJob: newJob(1, 1, 1, batch.IndexedCompletion),
4918 updateFn: func(job *batch.Job) {
4919 job.Status.StartTime = &metav1.Time{Time: time.Now()}
4920 },
4921 wantRequeuedImmediately: false,
4922 },
4923 }
4924 for name, tc := range cases {
4925 t.Run(name, func(t *testing.T) {
4926 manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
4927 manager.podStoreSynced = alwaysReady
4928 manager.jobStoreSynced = alwaysReady
4929
4930 sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.oldJob)
4931 newJob := tc.oldJob.DeepCopy()
4932 if tc.updateFn != nil {
4933 tc.updateFn(newJob)
4934 }
4935 manager.updateJob(logger, tc.oldJob, newJob)
4936 gotRequeuedImmediately := manager.queue.Len() > 0
4937 if tc.wantRequeuedImmediately != gotRequeuedImmediately {
4938 t.Fatalf("Want immediate requeue: %v, got immediate requeue: %v", tc.wantRequeuedImmediately, gotRequeuedImmediately)
4939 }
4940 })
4941 }
4942 }
4943
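// TestGetPodCreationInfoForIndependentIndexes verifies which indexes can get a
// replacement pod immediately, and how long the remaining indexes must wait
// for the pod failure backoff, based on the deletion timestamps of the pods
// with delayed deletion per index.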
4944 func TestGetPodCreationInfoForIndependentIndexes(t *testing.T) {
4945 logger, ctx := ktesting.NewTestContext(t)
4946 now := time.Now()
4947 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
4948 cases := map[string]struct {
4949 indexesToAdd []int
4950 podsWithDelayedDeletionPerIndex map[int]*v1.Pod
4951 wantIndexesToAdd []int
4952 wantRemainingTime time.Duration
4953 }{
4954 "simple index creation": {
4955 indexesToAdd: []int{1, 3},
4956 wantIndexesToAdd: []int{1, 3},
4957 },
4958 "subset of indexes can be recreated now": {
4959 indexesToAdd: []int{1, 3},
4960 podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{
4961 1: buildPod().indexFailureCount("0").index("1").customDeletionTimestamp(now).Pod,
4962 },
4963 wantIndexesToAdd: []int{3},
4964 },
4965 "subset of indexes can be recreated now as the pods failed long time ago": {
4966 indexesToAdd: []int{1, 3},
4967 podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{
4968 1: buildPod().indexFailureCount("0").customDeletionTimestamp(now).Pod,
4969 3: buildPod().indexFailureCount("0").customDeletionTimestamp(now.Add(-DefaultJobPodFailureBackOff)).Pod,
4970 },
4971 wantIndexesToAdd: []int{3},
4972 },
4973 "no indexes can be recreated now, need to wait default pod failure backoff": {
4974 indexesToAdd: []int{1, 2, 3},
4975 podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{
4976 1: buildPod().indexFailureCount("1").customDeletionTimestamp(now).Pod,
4977 2: buildPod().indexFailureCount("0").customDeletionTimestamp(now).Pod,
4978 3: buildPod().indexFailureCount("2").customDeletionTimestamp(now).Pod,
4979 },
4980 wantRemainingTime: DefaultJobPodFailureBackOff,
4981 },
4982 "no indexes can be recreated now, need to wait but 1s already passed": {
4983 indexesToAdd: []int{1, 2, 3},
4984 podsWithDelayedDeletionPerIndex: map[int]*v1.Pod{
4985 1: buildPod().indexFailureCount("1").customDeletionTimestamp(now.Add(-time.Second)).Pod,
4986 2: buildPod().indexFailureCount("0").customDeletionTimestamp(now.Add(-time.Second)).Pod,
4987 3: buildPod().indexFailureCount("2").customDeletionTimestamp(now.Add(-time.Second)).Pod,
4988 },
4989 wantRemainingTime: DefaultJobPodFailureBackOff - time.Second,
4990 },
4991 }
4992 for name, tc := range cases {
4993 t.Run(name, func(t *testing.T) {
4994 fakeClock := clocktesting.NewFakeClock(now)
4995 manager, _ := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
4996 gotIndexesToAdd, gotRemainingTime := manager.getPodCreationInfoForIndependentIndexes(logger, tc.indexesToAdd, tc.podsWithDelayedDeletionPerIndex)
4997 if diff := cmp.Diff(tc.wantIndexesToAdd, gotIndexesToAdd); diff != "" {
4998 t.Fatalf("Unexpected indexes to add: %s", diff)
4999 }
5000 if diff := cmp.Diff(tc.wantRemainingTime, gotRemainingTime); diff != "" {
5001 t.Fatalf("Unexpected remaining time: %s", diff)
5002 }
5003 })
5004 }
5005 }
5006
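// TestJobPodLookup verifies that getPodJobs matches a pod to a Job only when
// the pod's labels satisfy the Job's selector and both are in the same
// namespace.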
5007 func TestJobPodLookup(t *testing.T) {
5008 _, ctx := ktesting.NewTestContext(t)
5009 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
5010 manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
5011 manager.podStoreSynced = alwaysReady
5012 manager.jobStoreSynced = alwaysReady
5013 testCases := []struct {
5014 job *batch.Job
5015 pod *v1.Pod
5016
5017 expectedName string
5018 }{
5019 // A pod without labels matches no Job.
5020 {
5021 job: &batch.Job{
5022 ObjectMeta: metav1.ObjectMeta{Name: "basic"},
5023 },
5024 pod: &v1.Pod{
5025 ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: metav1.NamespaceAll},
5026 },
5027 expectedName: "",
5028 },
5029 // Matching labels but a different namespace: no match.
5030 {
5031 job: &batch.Job{
5032 ObjectMeta: metav1.ObjectMeta{Name: "foo"},
5033 Spec: batch.JobSpec{
5034 Selector: &metav1.LabelSelector{
5035 MatchLabels: map[string]string{"foo": "bar"},
5036 },
5037 },
5038 },
5039 pod: &v1.Pod{
5040 ObjectMeta: metav1.ObjectMeta{
5041 Name: "foo2",
5042 Namespace: "ns",
5043 Labels: map[string]string{"foo": "bar"},
5044 },
5045 },
5046 expectedName: "",
5047 },
5048 // Matching labels and namespace: the Job is returned.
5049 {
5050 job: &batch.Job{
5051 ObjectMeta: metav1.ObjectMeta{Name: "bar", Namespace: "ns"},
5052 Spec: batch.JobSpec{
5053 Selector: &metav1.LabelSelector{
5054 MatchExpressions: []metav1.LabelSelectorRequirement{
5055 {
5056 Key: "foo",
5057 Operator: metav1.LabelSelectorOpIn,
5058 Values: []string{"bar"},
5059 },
5060 },
5061 },
5062 },
5063 },
5064 pod: &v1.Pod{
5065 ObjectMeta: metav1.ObjectMeta{
5066 Name: "foo3",
5067 Namespace: "ns",
5068 Labels: map[string]string{"foo": "bar"},
5069 },
5070 },
5071 expectedName: "bar",
5072 },
5073 }
5074 for _, tc := range testCases {
5075 sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.job)
5076 if jobs := manager.getPodJobs(tc.pod); len(jobs) > 0 {
5077 if got, want := len(jobs), 1; got != want {
5078 t.Errorf("len(jobs) = %v, want %v", got, want)
5079 }
5080 job := jobs[0]
5081 if tc.expectedName != job.Name {
5082 t.Errorf("Got job %+v expected %+v", job.Name, tc.expectedName)
5083 }
5084 } else if tc.expectedName != "" {
5085 t.Errorf("Expected a job %v pod %v, found none", tc.expectedName, tc.pod.Name)
5086 }
5087 }
5088 }
5089
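// TestGetPodsForJob verifies pod selection for a Job, including adoption of
// matching orphans, releasing pods whose labels no longer match, skipping
// adoption when the Job is being deleted, and which pods end up carrying the
// Job tracking finalizer.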
5090 func TestGetPodsForJob(t *testing.T) {
5091 _, ctx := ktesting.NewTestContext(t)
5092 job := newJob(1, 1, 6, batch.NonIndexedCompletion)
5093 job.Name = "test_job"
5094 otherJob := newJob(1, 1, 6, batch.NonIndexedCompletion)
5095 otherJob.Name = "other_job"
5096 cases := map[string]struct {
5097 jobDeleted bool
5098 jobDeletedInCache bool
5099 pods []*v1.Pod
5100 wantPods []string
5101 wantPodsFinalizer []string
5102 }{
5103 "only matching": {
5104 pods: []*v1.Pod{
5105 buildPod().name("pod1").job(job).trackingFinalizer().Pod,
5106 buildPod().name("pod2").job(otherJob).Pod,
5107 buildPod().name("pod3").ns(job.Namespace).Pod,
5108 buildPod().name("pod4").job(job).Pod,
5109 },
5110 wantPods: []string{"pod1", "pod4"},
5111 wantPodsFinalizer: []string{"pod1"},
5112 },
5113 "adopt": {
5114 pods: []*v1.Pod{
5115 buildPod().name("pod1").job(job).Pod,
5116 buildPod().name("pod2").job(job).clearOwner().Pod,
5117 buildPod().name("pod3").job(otherJob).Pod,
5118 },
5119 wantPods: []string{"pod1", "pod2"},
5120 wantPodsFinalizer: []string{"pod2"},
5121 },
5122 "no adopt when deleting": {
5123 jobDeleted: true,
5124 jobDeletedInCache: true,
5125 pods: []*v1.Pod{
5126 buildPod().name("pod1").job(job).Pod,
5127 buildPod().name("pod2").job(job).clearOwner().Pod,
5128 },
5129 wantPods: []string{"pod1"},
5130 },
5131 "no adopt when deleting race": {
5132 jobDeleted: true,
5133 pods: []*v1.Pod{
5134 buildPod().name("pod1").job(job).Pod,
5135 buildPod().name("pod2").job(job).clearOwner().Pod,
5136 },
5137 wantPods: []string{"pod1"},
5138 },
5139 "release": {
5140 pods: []*v1.Pod{
5141 buildPod().name("pod1").job(job).Pod,
5142 buildPod().name("pod2").job(job).clearLabels().Pod,
5143 },
5144 wantPods: []string{"pod1"},
5145 },
5146 }
5147 for name, tc := range cases {
5148 t.Run(name, func(t *testing.T) {
5149 job := job.DeepCopy()
5150 if tc.jobDeleted {
5151 job.DeletionTimestamp = &metav1.Time{}
5152 }
5153 clientSet := fake.NewSimpleClientset(job, otherJob)
5154 jm, informer := newControllerFromClient(ctx, t, clientSet, controller.NoResyncPeriodFunc)
5155 jm.podStoreSynced = alwaysReady
5156 jm.jobStoreSynced = alwaysReady
5157 cachedJob := job.DeepCopy()
5158 if tc.jobDeletedInCache {
5159 cachedJob.DeletionTimestamp = &metav1.Time{}
5160 }
5161 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(cachedJob)
5162 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(otherJob)
5163 for _, p := range tc.pods {
5164 informer.Core().V1().Pods().Informer().GetIndexer().Add(p)
5165 }
5166
5167 pods, err := jm.getPodsForJob(context.TODO(), job)
5168 if err != nil {
5169 t.Fatalf("getPodsForJob() error: %v", err)
5170 }
5171 got := make([]string, len(pods))
5172 var gotFinalizer []string
5173 for i, p := range pods {
5174 got[i] = p.Name
5175 if hasJobTrackingFinalizer(p) {
5176 gotFinalizer = append(gotFinalizer, p.Name)
5177 }
5178 }
5179 sort.Strings(got)
5180 if diff := cmp.Diff(tc.wantPods, got); diff != "" {
5181 t.Errorf("getPodsForJob() returned (-want,+got):\n%s", diff)
5182 }
5183 sort.Strings(gotFinalizer)
5184 if diff := cmp.Diff(tc.wantPodsFinalizer, gotFinalizer); diff != "" {
5185 t.Errorf("Pods with finalizers (-want,+got):\n%s", diff)
5186 }
5187 })
5188 }
5189 }
5190
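// TestAddPod verifies that adding a pod enqueues the key of its controlling Job.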
5191 func TestAddPod(t *testing.T) {
5192 t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
5193 _, ctx := ktesting.NewTestContext(t)
5194 logger := klog.FromContext(ctx)
5195
5196 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
5197 fakeClock := clocktesting.NewFakeClock(time.Now())
5198 jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
5199 jm.podStoreSynced = alwaysReady
5200 jm.jobStoreSynced = alwaysReady
5201
5202 job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5203 job1.Name = "job1"
5204 job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5205 job2.Name = "job2"
5206 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
5207 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
5208
5209 pod1 := newPod("pod1", job1)
5210 pod2 := newPod("pod2", job2)
5211 informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
5212 informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
5213
5214 jm.addPod(logger, pod1)
5215 verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1)
5216 key, done := jm.queue.Get()
5217 if key == nil || done {
5218 t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
5219 }
5220 expectedKey, _ := controller.KeyFunc(job1)
5221 if got, want := key.(string), expectedKey; got != want {
5222 t.Errorf("queue.Get() = %v, want %v", got, want)
5223 }
5224
5225 jm.addPod(logger, pod2)
5226 verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1)
5227 key, done = jm.queue.Get()
5228 if key == nil || done {
5229 t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
5230 }
5231 expectedKey, _ = controller.KeyFunc(job2)
5232 if got, want := key.(string), expectedKey; got != want {
5233 t.Errorf("queue.Get() = %v, want %v", got, want)
5234 }
5235 }
5236
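// TestAddPodOrphan verifies that adding an orphan pod enqueues every Job whose
// selector matches the pod's labels.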
5237 func TestAddPodOrphan(t *testing.T) {
5238 t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
5239 logger, ctx := ktesting.NewTestContext(t)
5240 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
5241 fakeClock := clocktesting.NewFakeClock(time.Now())
5242 jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
5243 jm.podStoreSynced = alwaysReady
5244 jm.jobStoreSynced = alwaysReady
5245
5246 job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5247 job1.Name = "job1"
5248 job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5249 job2.Name = "job2"
5250 job3 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5251 job3.Name = "job3"
5252 job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
5253 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
5254 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
5255 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3)
5256
5257 pod1 := newPod("pod1", job1)
5258 // Make the pod an orphan; every Job with a matching selector should be enqueued.
5259 pod1.OwnerReferences = nil
5260 informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
5261
5262 jm.addPod(logger, pod1)
5263 verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2)
5264 }
5265
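// TestUpdatePod verifies that a pod update enqueues the key of the pod's
// controlling Job.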
5266 func TestUpdatePod(t *testing.T) {
5267 t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
5268 _, ctx := ktesting.NewTestContext(t)
5269 logger := klog.FromContext(ctx)
5270 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
5271 fakeClock := clocktesting.NewFakeClock(time.Now())
5272 jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
5273 jm.podStoreSynced = alwaysReady
5274 jm.jobStoreSynced = alwaysReady
5275
5276 job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5277 job1.Name = "job1"
5278 job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5279 job2.Name = "job2"
5280 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
5281 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
5282
5283 pod1 := newPod("pod1", job1)
5284 pod2 := newPod("pod2", job2)
5285 informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
5286 informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
5287
5288 prev := *pod1
5289 bumpResourceVersion(pod1)
5290 jm.updatePod(logger, &prev, pod1)
5291 verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1)
5292 key, done := jm.queue.Get()
5293 if key == nil || done {
5294 t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
5295 }
5296 expectedKey, _ := controller.KeyFunc(job1)
5297 if got, want := key.(string), expectedKey; got != want {
5298 t.Errorf("queue.Get() = %v, want %v", got, want)
5299 }
5300
5301 prev = *pod2
5302 bumpResourceVersion(pod2)
5303 jm.updatePod(logger, &prev, pod2)
5304 verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1)
5305 key, done = jm.queue.Get()
5306 if key == nil || done {
5307 t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
5308 }
5309 expectedKey, _ = controller.KeyFunc(job2)
5310 if got, want := key.(string), expectedKey; got != want {
5311 t.Errorf("queue.Get() = %v, want %v", got, want)
5312 }
5313 }
5314
5315 func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
5316 t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
5317 logger, ctx := ktesting.NewTestContext(t)
5318 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
5319 fakeClock := clocktesting.NewFakeClock(time.Now())
5320 jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
5321 jm.podStoreSynced = alwaysReady
5322 jm.jobStoreSynced = alwaysReady
5323
5324 job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5325 job1.Name = "job1"
5326 job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5327 job2.Name = "job2"
5328 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
5329 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
5330
5331 pod1 := newPod("pod1", job1)
5332 pod1.OwnerReferences = nil
5333 informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
5334
5335 // Labels changed on the orphan pod. Expect both matching jobs to be enqueued.
5336 prev := *pod1
5337 prev.Labels = map[string]string{"foo2": "bar2"}
5338 bumpResourceVersion(pod1)
5339 jm.updatePod(logger, &prev, pod1)
5340 verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2)
5341 }
5342
5343 func TestUpdatePodChangeControllerRef(t *testing.T) {
5344 t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
5345 _, ctx := ktesting.NewTestContext(t)
5346 logger := klog.FromContext(ctx)
5347 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
5348 fakeClock := clocktesting.NewFakeClock(time.Now())
5349 jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
5350 jm.podStoreSynced = alwaysReady
5351 jm.jobStoreSynced = alwaysReady
5352
5353 job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5354 job1.Name = "job1"
5355 job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5356 job2.Name = "job2"
5357 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
5358 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
5359
5360 pod1 := newPod("pod1", job1)
5361 informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
5362
5363 // The ControllerRef changed from job2 to job1. Expect both the old and the new job to be enqueued.
5364 prev := *pod1
5365 prev.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(job2, controllerKind)}
5366 bumpResourceVersion(pod1)
5367 jm.updatePod(logger, &prev, pod1)
5368 verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2)
5369 }
5370
5371 func TestUpdatePodRelease(t *testing.T) {
5372 t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
5373 _, ctx := ktesting.NewTestContext(t)
5374 logger := klog.FromContext(ctx)
5375 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
5376 fakeClock := clocktesting.NewFakeClock(time.Now())
5377 jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
5378 jm.podStoreSynced = alwaysReady
5379 jm.jobStoreSynced = alwaysReady
5380
5381 job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5382 job1.Name = "job1"
5383 job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5384 job2.Name = "job2"
5385 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
5386 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
5387
5388 pod1 := newPod("pod1", job1)
5389 informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
5390
5391 // The pod was released from job1. Expect all jobs with matching selectors to be enqueued.
5392 prev := *pod1
5393 pod1.OwnerReferences = nil
5394 bumpResourceVersion(pod1)
5395 jm.updatePod(logger, &prev, pod1)
5396 verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2)
5397 }
5398
5399 func TestDeletePod(t *testing.T) {
5400 t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
5401 logger, ctx := ktesting.NewTestContext(t)
5402 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
5403 fakeClock := clocktesting.NewFakeClock(time.Now())
5404 jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
5405 jm.podStoreSynced = alwaysReady
5406 jm.jobStoreSynced = alwaysReady
5407
5408 job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5409 job1.Name = "job1"
5410 job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5411 job2.Name = "job2"
5412 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
5413 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
5414
5415 pod1 := newPod("pod1", job1)
5416 pod2 := newPod("pod2", job2)
5417 informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
5418 informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
5419
5420 jm.deletePod(logger, pod1, true)
5421 verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1)
5422 key, done := jm.queue.Get()
5423 if key == nil || done {
5424 t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
5425 }
5426 expectedKey, _ := controller.KeyFunc(job1)
5427 if got, want := key.(string), expectedKey; got != want {
5428 t.Errorf("queue.Get() = %v, want %v", got, want)
5429 }
5430
5431 jm.deletePod(logger, pod2, true)
5432 verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1)
5433 key, done = jm.queue.Get()
5434 if key == nil || done {
5435 t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
5436 }
5437 expectedKey, _ = controller.KeyFunc(job2)
5438 if got, want := key.(string), expectedKey; got != want {
5439 t.Errorf("queue.Get() = %v, want %v", got, want)
5440 }
5441 }
5442
5443 func TestDeletePodOrphan(t *testing.T) {
5444 // Use a zero batch period so an enqueue, if any happened, would be visible immediately.
5445 t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, 0))
5446 logger, ctx := ktesting.NewTestContext(t)
5447 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
5448 jm, informer := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
5449 jm.podStoreSynced = alwaysReady
5450 jm.jobStoreSynced = alwaysReady
5451
5452 job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5453 job1.Name = "job1"
5454 job2 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5455 job2.Name = "job2"
5456 job3 := newJob(1, 1, 6, batch.NonIndexedCompletion)
5457 job3.Name = "job3"
5458 job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
5459 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
5460 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
5461 informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job3)
5462
5463 pod1 := newPod("pod1", job1)
5464 pod1.OwnerReferences = nil
5465 informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
5466
5467 jm.deletePod(logger, pod1, true)
5468 if got, want := jm.queue.Len(), 0; got != want {
5469 t.Fatalf("queue.Len() = %v, want %v", got, want)
5470 }
5471 }
5472
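// FakeJobExpectations wraps ControllerExpectations so tests can force
// SatisfiedExpectations to return a fixed value and run a hook when it is called.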
5473 type FakeJobExpectations struct {
5474 *controller.ControllerExpectations
5475 satisfied bool
5476 expSatisfied func()
5477 }
5478
5479 func (fe FakeJobExpectations) SatisfiedExpectations(logger klog.Logger, controllerKey string) bool {
5480 fe.expSatisfied()
5481 return fe.satisfied
5482 }
5483
5484 // TestSyncJobExpectations verifies that a pod which appears in the store while
5485 // expectations are being checked does not cause the controller to create or delete pods.
5486 func TestSyncJobExpectations(t *testing.T) {
5487 _, ctx := ktesting.NewTestContext(t)
5488 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
5489 manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
5490 fakePodControl := controller.FakePodControl{}
5491 manager.podControl = &fakePodControl
5492 manager.podStoreSynced = alwaysReady
5493 manager.jobStoreSynced = alwaysReady
5494 manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
5495 return job, nil
5496 }
5497
5498 job := newJob(2, 2, 6, batch.NonIndexedCompletion)
5499 sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
5500 pods := newPodList(2, v1.PodPending, job)
5501 podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
5502 podIndexer.Add(pods[0])
5503
5504 manager.expectations = FakeJobExpectations{
5505 controller.NewControllerExpectations(), true, func() {
5506 // If active pods were counted before checking expectations, the controller
5507 // would create a new pod because it does not yet see this one, even though
5508 // its expectations are already fulfilled.
5509 podIndexer.Add(pods[1])
5510 },
5511 }
5512 manager.syncJob(context.TODO(), testutil.GetKey(job, t))
5513 if len(fakePodControl.Templates) != 0 {
5514 t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
5515 }
5516 if len(fakePodControl.DeletePodName) != 0 {
5517 t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
5518 }
5519 }
5520
5521 func TestWatchJobs(t *testing.T) {
5522 _, ctx := ktesting.NewTestContext(t)
5523 clientset := fake.NewSimpleClientset()
5524 fakeWatch := watch.NewFake()
5525 clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil))
5526 manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
5527 manager.podStoreSynced = alwaysReady
5528 manager.jobStoreSynced = alwaysReady
5529
5530 var testJob batch.Job
5531 received := make(chan struct{})
5532
5533 // The job update sent through the fakeWatch should make it into the workqueue and
5534 // then to the syncHandler, which validates the job and closes the received channel.
5535 manager.syncHandler = func(ctx context.Context, key string) error {
5536 defer close(received)
5537 ns, name, err := cache.SplitMetaNamespaceKey(key)
5538 if err != nil {
5539 t.Errorf("Error getting namespace/name from key %v: %v", key, err)
5540 }
5541 job, err := manager.jobLister.Jobs(ns).Get(name)
5542 if err != nil || job == nil {
5543 t.Errorf("Expected to find job under key %v: %v", key, err)
5544 return nil
5545 }
5546 if !apiequality.Semantic.DeepDerivative(*job, testJob) {
5547 t.Errorf("Expected %#v, but got %#v", testJob, *job)
5548 }
5549 return nil
5550 }
5551
5552 // Start only the job watcher and the workqueue, send a watch event, and make sure it hits the sync method.
5553 stopCh := make(chan struct{})
5554 defer close(stopCh)
5555 sharedInformerFactory.Start(stopCh)
5556 go manager.Run(context.TODO(), 1)
5557
5558 // Send a new job and check that it reaches the syncHandler.
5559 testJob.Namespace = "bar"
5560 testJob.Name = "foo"
5561 fakeWatch.Add(&testJob)
5562 t.Log("Waiting for job to reach syncHandler")
5563 <-received
5564 }
5565
5566 func TestWatchPods(t *testing.T) {
5567 _, ctx := ktesting.NewTestContext(t)
5568 testJob := newJob(2, 2, 6, batch.NonIndexedCompletion)
5569 clientset := fake.NewSimpleClientset(testJob)
5570 fakeWatch := watch.NewFake()
5571 clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
5572 manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
5573 manager.podStoreSynced = alwaysReady
5574 manager.jobStoreSynced = alwaysReady
5575
5576 // Put the job into the store.
5577 sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(testJob)
5578 received := make(chan struct{})
5579
5580 // The pod update sent through the fakeWatch should resolve to the owning job and reach the syncHandler.
5581 manager.syncHandler = func(ctx context.Context, key string) error {
5582 ns, name, err := cache.SplitMetaNamespaceKey(key)
5583 if err != nil {
5584 t.Errorf("Error getting namespace/name from key %v: %v", key, err)
5585 }
5586 job, err := manager.jobLister.Jobs(ns).Get(name)
5587 if err != nil {
5588 t.Errorf("Expected to find job under key %v: %v", key, err)
5589 }
5590 if !apiequality.Semantic.DeepDerivative(job, testJob) {
5591 t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job)
5592 close(received)
5593 return nil
5594 }
5595 close(received)
5596 return nil
5597 }
5598
5599 // Start only the pod watcher and the workqueue, send a watch event, and make sure it hits the sync method for the right job.
5600 stopCh := make(chan struct{})
5601 defer close(stopCh)
5602 go sharedInformerFactory.Core().V1().Pods().Informer().Run(stopCh)
5603 go manager.Run(context.TODO(), 1)
5604
5605 pods := newPodList(1, v1.PodRunning, testJob)
5606 testPod := pods[0]
5607 testPod.Status.Phase = v1.PodFailed
5608 fakeWatch.Add(testPod)
5609
5610 t.Log("Waiting for pod to reach syncHandler")
5611 <-received
5612 }
5613
5614 func TestWatchOrphanPods(t *testing.T) {
5615 _, ctx := ktesting.NewTestContext(t)
5616 clientset := fake.NewSimpleClientset()
5617 sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
5618 manager, err := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
5619 if err != nil {
5620 t.Fatalf("Error creating Job controller: %v", err)
5621 }
5622 manager.podStoreSynced = alwaysReady
5623 manager.jobStoreSynced = alwaysReady
5624
5625 stopCh := make(chan struct{})
5626 defer close(stopCh)
5627 podInformer := sharedInformers.Core().V1().Pods().Informer()
5628 go podInformer.Run(stopCh)
5629 cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
5630 go manager.Run(context.TODO(), 1)
5631
5632 // In every case the controller should remove the tracking finalizer from the deleted pod, whether or not the owning job exists in the cache.
5633 cases := map[string]struct {
5634 job *batch.Job
5635 inCache bool
5636 }{
5637 "job_does_not_exist": {
5638 job: newJob(2, 2, 6, batch.NonIndexedCompletion),
5639 },
5640 "orphan": {},
5641 "job_finished": {
5642 job: func() *batch.Job {
5643 j := newJob(2, 2, 6, batch.NonIndexedCompletion)
5644 j.Status.Conditions = append(j.Status.Conditions, batch.JobCondition{
5645 Type: batch.JobComplete,
5646 Status: v1.ConditionTrue,
5647 })
5648 return j
5649 }(),
5650 inCache: true,
5651 },
5652 }
5653 for name, tc := range cases {
5654 t.Run(name, func(t *testing.T) {
5655 if tc.inCache {
5656 if err := sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(tc.job); err != nil {
5657 t.Fatalf("Failed to insert job in index: %v", err)
5658 }
5659 t.Cleanup(func() {
5660 sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Delete(tc.job)
5661 })
5662 }
5663
5664 podBuilder := buildPod().name(name).deletionTimestamp().trackingFinalizer()
5665 if tc.job != nil {
5666 podBuilder = podBuilder.job(tc.job)
5667 }
5668 orphanPod := podBuilder.Pod
5669 orphanPod, err := clientset.CoreV1().Pods("default").Create(context.Background(), orphanPod, metav1.CreateOptions{})
5670 if err != nil {
5671 t.Fatalf("Creating orphan pod: %v", err)
5672 }
5673
5674 if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
5675 p, err := clientset.CoreV1().Pods(orphanPod.Namespace).Get(context.Background(), orphanPod.Name, metav1.GetOptions{})
5676 if err != nil {
5677 return false, err
5678 }
5679 return !hasJobTrackingFinalizer(p), nil
5680 }); err != nil {
5681 t.Errorf("Waiting for Pod to get the finalizer removed: %v", err)
5682 }
5683 })
5684 }
5685 }
5686
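// bumpResourceVersion increments the object's resource version so that
// updatePod sees the old and new pods as distinct versions.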
5687 func bumpResourceVersion(obj metav1.Object) {
5688 ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32)
5689 obj.SetResourceVersion(strconv.FormatInt(ver+1, 10))
5690 }
5691
5692 func TestJobApiBackoffReset(t *testing.T) {
5693 t.Cleanup(setDurationDuringTest(&DefaultJobApiBackOff, fastJobApiBackoff))
5694 _, ctx := ktesting.NewTestContext(t)
5695
5696 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
5697 fakeClock := clocktesting.NewFakeClock(time.Now())
5698 manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
5699 fakePodControl := controller.FakePodControl{}
5700 manager.podControl = &fakePodControl
5701 manager.podStoreSynced = alwaysReady
5702 manager.jobStoreSynced = alwaysReady
5703 manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
5704 return job, nil
5705 }
5706
5707 job := newJob(1, 1, 2, batch.NonIndexedCompletion)
5708 key := testutil.GetKey(job, t)
5709 sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
5710
5711 // A pod control error makes the sync fail and the key get requeued with a retry.
5712 fakePodControl.Err = errors.New("Controller error")
5713 manager.queue.Add(key)
5714 manager.processNextWorkItem(context.TODO())
5715 retries := manager.queue.NumRequeues(key)
5716 if retries != 1 {
5717 t.Fatalf("%s: expected exactly 1 retry, got %d", job.Name, retries)
5718 }
5719
5720 awaitForQueueLen(ctx, t, manager, 1)
5721
5722 // With the error cleared, a successful sync empties the queue.
5723 fakePodControl.Err = nil
5724 manager.processNextWorkItem(context.TODO())
5725 verifyEmptyQueue(ctx, t, manager)
5726 }
5727
5728 var _ workqueue.RateLimitingInterface = &fakeRateLimitingQueue{}
5729
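// fakeRateLimitingQueue is a minimal workqueue.RateLimitingInterface that
// records the item and delay passed to AddAfter, so tests can assert on the
// backoff the controller requested.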
5730 type fakeRateLimitingQueue struct {
5731 workqueue.Interface
5732 requeues int
5733 item interface{}
5734 duration time.Duration
5735 }
5736
5737 func (f *fakeRateLimitingQueue) AddRateLimited(item interface{}) {}
5738 func (f *fakeRateLimitingQueue) Forget(item interface{}) {
5739 f.requeues = 0
5740 }
5741 func (f *fakeRateLimitingQueue) NumRequeues(item interface{}) int {
5742 return f.requeues
5743 }
5744 func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duration) {
5745 f.item = item
5746 f.duration = duration
5747 }
5748
5749 func TestJobBackoff(t *testing.T) {
5750 _, ctx := ktesting.NewTestContext(t)
5751 logger := klog.FromContext(ctx)
5752 job := newJob(1, 1, 1, batch.NonIndexedCompletion)
5753 oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
5754 oldPod.ResourceVersion = "1"
5755 newPod := oldPod.DeepCopy()
5756 newPod.ResourceVersion = "2"
5757
5758 testCases := map[string]struct {
5759 requeues int
5760 oldPodPhase v1.PodPhase
5761 phase v1.PodPhase
5762 wantBackoff time.Duration
5763 }{
5764 "failure with pod updates batching": {
5765 requeues: 0,
5766 phase: v1.PodFailed,
5767 wantBackoff: syncJobBatchPeriod,
5768 },
5769 }
5770
5771 for name, tc := range testCases {
5772 t.Run(name, func(t *testing.T) {
5773 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
5774 manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
5775 fakePodControl := controller.FakePodControl{}
5776 manager.podControl = &fakePodControl
5777 manager.podStoreSynced = alwaysReady
5778 manager.jobStoreSynced = alwaysReady
5779 queue := &fakeRateLimitingQueue{}
5780 manager.queue = queue
5781 sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
5782
5783 queue.requeues = tc.requeues
5784 newPod.Status.Phase = tc.phase
5785 oldPod.Status.Phase = v1.PodRunning
5786 if tc.oldPodPhase != "" {
5787 oldPod.Status.Phase = tc.oldPodPhase
5788 }
5789 manager.updatePod(logger, oldPod, newPod)
5790 if queue.duration != tc.wantBackoff {
5791 t.Errorf("unexpected backoff %v, expected %v", queue.duration, tc.wantBackoff)
5792 }
5793 })
5794 }
5795 }
5796
5797 func TestJobBackoffForOnFailure(t *testing.T) {
5798 _, ctx := ktesting.NewTestContext(t)
5799 jobConditionComplete := batch.JobComplete
5800 jobConditionFailed := batch.JobFailed
5801 jobConditionSuspended := batch.JobSuspended
5802
5803 testCases := map[string]struct {
5804 // job setup
5805 parallelism int32
5806 completions int32
5807 backoffLimit int32
5808 suspend bool
5809
5810 // pod setup
5811 restartCounts []int32
5812 podPhase v1.PodPhase
5813
5814 // expectations
5815 expectedActive int32
5816 expectedSucceeded int32
5817 expectedFailed int32
5818 expectedCondition *batch.JobConditionType
5819 expectedConditionReason string
5820 }{
5821 "backoffLimit 0 should have 1 pod active": {
5822 1, 1, 0,
5823 false, []int32{0}, v1.PodRunning,
5824 1, 0, 0, nil, "",
5825 },
5826 "backoffLimit 1 with restartCount 0 should have 1 pod active": {
5827 1, 1, 1,
5828 false, []int32{0}, v1.PodRunning,
5829 1, 0, 0, nil, "",
5830 },
5831 "backoffLimit 1 with restartCount 1 and podRunning should have 0 pod active": {
5832 1, 1, 1,
5833 false, []int32{1}, v1.PodRunning,
5834 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
5835 },
5836 "backoffLimit 1 with restartCount 1 and podPending should have 0 pod active": {
5837 1, 1, 1,
5838 false, []int32{1}, v1.PodPending,
5839 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
5840 },
5841 "too many job failures with podRunning - single pod": {
5842 1, 5, 2,
5843 false, []int32{2}, v1.PodRunning,
5844 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
5845 },
5846 "too many job failures with podPending - single pod": {
5847 1, 5, 2,
5848 false, []int32{2}, v1.PodPending,
5849 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
5850 },
5851 "too many job failures with podRunning - multiple pods": {
5852 2, 5, 2,
5853 false, []int32{1, 1}, v1.PodRunning,
5854 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
5855 },
5856 "too many job failures with podPending - multiple pods": {
5857 2, 5, 2,
5858 false, []int32{1, 1}, v1.PodPending,
5859 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
5860 },
5861 "not enough failures": {
5862 2, 5, 3,
5863 false, []int32{1, 1}, v1.PodRunning,
5864 2, 0, 0, nil, "",
5865 },
5866 "suspending a job": {
5867 2, 4, 6,
5868 true, []int32{1, 1}, v1.PodRunning,
5869 0, 0, 0, &jobConditionSuspended, "JobSuspended",
5870 },
5871 "finshed job": {
5872 2, 4, 6,
5873 true, []int32{1, 1, 2, 0}, v1.PodSucceeded,
5874 0, 4, 0, &jobConditionComplete, "",
5875 },
5876 }
5877
5878 for name, tc := range testCases {
5879 t.Run(name, func(t *testing.T) {
5880 // job manager setup
5881 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
5882 manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
5883 fakePodControl := controller.FakePodControl{}
5884 manager.podControl = &fakePodControl
5885 manager.podStoreSynced = alwaysReady
5886 manager.jobStoreSynced = alwaysReady
5887 var actual *batch.Job
5888 manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
5889 actual = job
5890 return job, nil
5891 }
5892
5893 // job & pods setup
5894 job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion)
5895 job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure
5896 job.Spec.Suspend = ptr.To(tc.suspend)
5897 sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
5898 podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
5899 for i, pod := range newPodList(len(tc.restartCounts), tc.podPhase, job) {
5900 pod.Status.ContainerStatuses = []v1.ContainerStatus{{RestartCount: tc.restartCounts[i]}}
5901 podIndexer.Add(pod)
5902 }
5903
5904 // run
5905 err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
5906
5907 if err != nil {
5908 t.Errorf("unexpected error syncing job. Got %#v", err)
5909 }
5910 // validate status
5911 if actual.Status.Active != tc.expectedActive {
5912 t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active)
5913 }
5914 if actual.Status.Succeeded != tc.expectedSucceeded {
5915 t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded)
5916 }
5917 if actual.Status.Failed != tc.expectedFailed {
5918 t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed)
5919 }
5920 // validate conditions
5921 if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, v1.ConditionTrue, tc.expectedConditionReason) {
5922 t.Errorf("expected completion condition. Got %#v", actual.Status.Conditions)
5923 }
5924 })
5925 }
5926 }
5927
5928 func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
5929 _, ctx := ktesting.NewTestContext(t)
5930 jobConditionFailed := batch.JobFailed
5931
5932 testCases := map[string]struct {
5933 // job setup
5934 parallelism int32
5935 completions int32
5936 backoffLimit int32
5937
5938 // pod setup
5939 activePodsPhase v1.PodPhase
5940 activePods int
5941 failedPods int
5942
5943 // expectations
5944 expectedActive int32
5945 expectedSucceeded int32
5946 expectedFailed int32
5947 expectedCondition *batch.JobConditionType
5948 expectedConditionReason string
5949 }{
5950 "not enough failures with backoffLimit 0 - single pod": {
5951 1, 1, 0,
5952 v1.PodRunning, 1, 0,
5953 1, 0, 0, nil, "",
5954 },
5955 "not enough failures with backoffLimit 1 - single pod": {
5956 1, 1, 1,
5957 "", 0, 1,
5958 1, 0, 1, nil, "",
5959 },
5960 "too many failures with backoffLimit 1 - single pod": {
5961 1, 1, 1,
5962 "", 0, 2,
5963 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
5964 },
5965 "not enough failures with backoffLimit 6 - multiple pods": {
5966 2, 2, 6,
5967 v1.PodRunning, 1, 6,
5968 2, 0, 6, nil, "",
5969 },
5970 "too many failures with backoffLimit 6 - multiple pods": {
5971 2, 2, 6,
5972 "", 0, 7,
5973 0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded",
5974 },
5975 }
5976
5977 for name, tc := range testCases {
5978 t.Run(name, func(t *testing.T) {
5979 // job manager setup
5980 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
5981 manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
5982 fakePodControl := controller.FakePodControl{}
5983 manager.podControl = &fakePodControl
5984 manager.podStoreSynced = alwaysReady
5985 manager.jobStoreSynced = alwaysReady
5986 var actual *batch.Job
5987 manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
5988 actual = job
5989 return job, nil
5990 }
5991
5992 // job & pods setup
5993 job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion)
5994 job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyNever
5995 sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
5996 podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
5997 for _, pod := range newPodList(tc.failedPods, v1.PodFailed, job) {
5998 pod.Status.ContainerStatuses = []v1.ContainerStatus{{State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{
5999 FinishedAt: testFinishedAt,
6000 }}}}
6001 podIndexer.Add(pod)
6002 }
6003 for _, pod := range newPodList(tc.activePods, tc.activePodsPhase, job) {
6004 podIndexer.Add(pod)
6005 }
6006
6007 // run
6008 err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
6009 if err != nil {
6010 t.Fatalf("unexpected error syncing job: %#v\n", err)
6011 }
6012 // validate status
6013 if actual.Status.Active != tc.expectedActive {
6014 t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active)
6015 }
6016 if actual.Status.Succeeded != tc.expectedSucceeded {
6017 t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded)
6018 }
6019 if actual.Status.Failed != tc.expectedFailed {
6020 t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed)
6021 }
6022 // validate conditions
6023 if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, v1.ConditionTrue, tc.expectedConditionReason) {
6024 t.Errorf("expected completion condition. Got %#v", actual.Status.Conditions)
6025 }
6026 })
6027 }
6028 }
6029
6030 func TestEnsureJobConditions(t *testing.T) {
6031 testCases := []struct {
6032 name string
6033 haveList []batch.JobCondition
6034 wantType batch.JobConditionType
6035 wantStatus v1.ConditionStatus
6036 wantReason string
6037 expectList []batch.JobCondition
6038 expectUpdate bool
6039 }{
6040 {
6041 name: "append true condition",
6042 haveList: []batch.JobCondition{},
6043 wantType: batch.JobSuspended,
6044 wantStatus: v1.ConditionTrue,
6045 wantReason: "foo",
6046 expectList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())},
6047 expectUpdate: true,
6048 },
6049 {
6050 name: "append false condition",
6051 haveList: []batch.JobCondition{},
6052 wantType: batch.JobSuspended,
6053 wantStatus: v1.ConditionFalse,
6054 wantReason: "foo",
6055 expectList: []batch.JobCondition{},
6056 expectUpdate: false,
6057 },
6058 {
6059 name: "update true condition reason",
6060 haveList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())},
6061 wantType: batch.JobSuspended,
6062 wantStatus: v1.ConditionTrue,
6063 wantReason: "bar",
6064 expectList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "bar", "", realClock.Now())},
6065 expectUpdate: true,
6066 },
6067 {
6068 name: "update true condition status",
6069 haveList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())},
6070 wantType: batch.JobSuspended,
6071 wantStatus: v1.ConditionFalse,
6072 wantReason: "foo",
6073 expectList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionFalse, "foo", "", realClock.Now())},
6074 expectUpdate: true,
6075 },
6076 {
6077 name: "update false condition status",
6078 haveList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionFalse, "foo", "", realClock.Now())},
6079 wantType: batch.JobSuspended,
6080 wantStatus: v1.ConditionTrue,
6081 wantReason: "foo",
6082 expectList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())},
6083 expectUpdate: true,
6084 },
6085 {
6086 name: "condition already exists",
6087 haveList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())},
6088 wantType: batch.JobSuspended,
6089 wantStatus: v1.ConditionTrue,
6090 wantReason: "foo",
6091 expectList: []batch.JobCondition{*newCondition(batch.JobSuspended, v1.ConditionTrue, "foo", "", realClock.Now())},
6092 expectUpdate: false,
6093 },
6094 }
6095 for _, tc := range testCases {
6096 t.Run(tc.name, func(t *testing.T) {
6097 gotList, isUpdated := ensureJobConditionStatus(tc.haveList, tc.wantType, tc.wantStatus, tc.wantReason, "", realClock.Now())
6098 if isUpdated != tc.expectUpdate {
6099 t.Errorf("Got isUpdated=%v, want %v", isUpdated, tc.expectUpdate)
6100 }
6101 if len(gotList) != len(tc.expectList) {
6102 t.Errorf("got a list of length %d, want %d", len(gotList), len(tc.expectList))
6103 }
6104 if diff := cmp.Diff(tc.expectList, gotList, cmpopts.IgnoreFields(batch.JobCondition{}, "LastProbeTime", "LastTransitionTime")); diff != "" {
6105 t.Errorf("Unexpected JobCondition list: (-want,+got):\n%s", diff)
6106 }
6107 })
6108 }
6109 }
6110
6111 func TestFinalizersRemovedExpectations(t *testing.T) {
6112 _, ctx := ktesting.NewTestContext(t)
6113 clientset := fake.NewSimpleClientset()
6114 sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
6115 manager, err := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
6116 if err != nil {
6117 t.Fatalf("Error creating Job controller: %v", err)
6118 }
6119 manager.podStoreSynced = alwaysReady
6120 manager.jobStoreSynced = alwaysReady
6121 manager.podControl = &controller.FakePodControl{Err: errors.New("fake pod controller error")}
6122 manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
6123 return job, nil
6124 }
6125
6126 job := newJob(2, 2, 6, batch.NonIndexedCompletion)
6127 sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
6128 pods := append(newPodList(2, v1.PodSucceeded, job), newPodList(2, v1.PodFailed, job)...)
6129 podInformer := sharedInformers.Core().V1().Pods().Informer()
6130 podIndexer := podInformer.GetIndexer()
6131 uids := sets.New[string]()
6132 for i := range pods {
6133 clientset.Tracker().Add(pods[i])
6134 podIndexer.Add(pods[i])
6135 uids.Insert(string(pods[i].UID))
6136 }
6137 jobKey := testutil.GetKey(job, t)
6138
6139 manager.syncJob(context.TODO(), jobKey)
6140 gotExpectedUIDs := manager.finalizerExpectations.getExpectedUIDs(jobKey)
6141 if len(gotExpectedUIDs) != 0 {
6142 t.Errorf("Got unwanted expectations for removed finalizers after first syncJob with client failures:\n%s", sets.List(gotExpectedUIDs))
6143 }
6144
6145 // Clear the error so the next sync can patch the pods and record finalizer-removal expectations.
6146 manager.podControl.(*controller.FakePodControl).Err = nil
6147 manager.syncJob(context.TODO(), jobKey)
6148 gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey)
6149 if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" {
6150 t.Errorf("Different expectations for removed finalizers after syncJob (-want,+got):\n%s", diff)
6151 }
6152
6153 stopCh := make(chan struct{})
6154 defer close(stopCh)
6155 go sharedInformers.Core().V1().Pods().Informer().Run(stopCh)
6156 cache.WaitForCacheSync(stopCh, podInformer.HasSynced)
6157
6158 // The recorded expectations should survive the pod informer cache sync.
6159 gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey)
6160 if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" {
6161 t.Errorf("Different expectations for removed finalizers after syncJob and cacheSync (-want,+got):\n%s", diff)
6162 }
6163
6164
6165 // Modify the pods in different ways: remove finalizers, set deletion timestamps, delete.
6166 podsResource := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
6167
6168 update := pods[0].DeepCopy()
6169 update.Finalizers = nil
6170 update.ResourceVersion = "1"
6171 err = clientset.Tracker().Update(podsResource, update, update.Namespace)
6172 if err != nil {
6173 t.Errorf("Removing finalizer: %v", err)
6174 }
6175
6176 update = pods[1].DeepCopy()
6177 update.Finalizers = nil
6178 update.DeletionTimestamp = &metav1.Time{Time: time.Now()}
6179 update.ResourceVersion = "1"
6180 err = clientset.Tracker().Update(podsResource, update, update.Namespace)
6181 if err != nil {
6182 t.Errorf("Removing finalizer and setting deletion timestamp: %v", err)
6183 }
6184
6185 // pods[2] only gets a deletion timestamp; its finalizer stays in place.
6186 update = pods[2].DeepCopy()
6187 update.DeletionTimestamp = &metav1.Time{Time: time.Now()}
6188 update.ResourceVersion = "1"
6189 err = clientset.Tracker().Update(podsResource, update, update.Namespace)
6190 if err != nil {
6191 t.Errorf("Setting deletion timestamp: %v", err)
6192 }
6193
6194 err = clientset.Tracker().Delete(podsResource, pods[3].Namespace, pods[3].Name)
6195 if err != nil {
6196 t.Errorf("Deleting pod that had finalizer: %v", err)
6197 }
6198 // Only pods[2] still carries the finalizer, so only its UID should remain expected.
6199 uids = sets.New(string(pods[2].UID))
6200 var diff string
6201 if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
6202 gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey)
6203 diff = cmp.Diff(uids, gotExpectedUIDs)
6204 return diff == "", nil
6205 }); err != nil {
6206 t.Errorf("Timeout waiting for expectations (-want, +got):\n%s", diff)
6207 }
6208 }
6209
6210 func TestFinalizerCleanup(t *testing.T) {
6211 _, ctx := ktesting.NewTestContext(t)
6212 ctx, cancel := context.WithCancel(ctx)
6213 defer cancel()
6214
6215 clientset := fake.NewSimpleClientset()
6216 sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
6217 manager, err := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
6218 if err != nil {
6219 t.Fatalf("Error creating Job controller: %v", err)
6220 }
6221 manager.podStoreSynced = alwaysReady
6222 manager.jobStoreSynced = alwaysReady
6223
6224 // Start the informers and wait for their caches to sync.
6225 sharedInformers.Start(ctx.Done())
6226 sharedInformers.WaitForCacheSync(ctx.Done())
6227
6228 // Run the controller with zero sync workers so the syncJob handler does not remove the pod finalizer.
6229 go manager.Run(ctx, 0)
6230
6231 // Create a Job.
6232 job := newJob(1, 1, 1, batch.NonIndexedCompletion)
6233 job, err = clientset.BatchV1().Jobs(job.GetNamespace()).Create(ctx, job, metav1.CreateOptions{})
6234 if err != nil {
6235 t.Fatalf("Creating job: %v", err)
6236 }
6237
6238 // Await for the Job to appear in the jobLister.
6239 if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
6240 job, _ := manager.jobLister.Jobs(job.GetNamespace()).Get(job.Name)
6241 return job != nil, nil
6242 }); err != nil {
6243 t.Fatalf("Waiting for Job object to appear in jobLister: %v", err)
6244 }
6245
6246 // Create a Pod with the job tracking finalizer.
6247 pod := newPod("test-pod", job)
6248 pod.Finalizers = append(pod.Finalizers, batch.JobTrackingFinalizer)
6249 pod, err = clientset.CoreV1().Pods(pod.GetNamespace()).Create(ctx, pod, metav1.CreateOptions{})
6250 if err != nil {
6251 t.Fatalf("Creating pod: %v", err)
6252 }
6253
6254
6255 // Await for the Pod to appear in the podLister to make sure the controller
6256 // has observed it before the Job is marked as finished.
6257 if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
6258 pod, _ := manager.podStore.Pods(pod.GetNamespace()).Get(pod.Name)
6259 return pod != nil, nil
6260 }); err != nil {
6261 t.Fatalf("Waiting for Pod to appear in podLister: %v", err)
6262 }
6263
6264 // Mark the Job as finished.
6265 job.Status.Conditions = append(job.Status.Conditions, batch.JobCondition{
6266 Type: batch.JobComplete,
6267 Status: v1.ConditionTrue,
6268 })
6269 _, err = clientset.BatchV1().Jobs(job.GetNamespace()).UpdateStatus(ctx, job, metav1.UpdateOptions{})
6270 if err != nil {
6271 t.Fatalf("Updating job status: %v", err)
6272 }
6273
6274
6275 // Verify the pod's tracking finalizer is removed once the Job is finished, even though no sync workers are running.
6276 if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
6277 p, err := clientset.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
6278 if err != nil {
6279 return false, err
6280 }
6281 return !hasJobTrackingFinalizer(p), nil
6282 }); err != nil {
6283 t.Errorf("Waiting for Pod to get the finalizer removed: %v", err)
6284 }
6285
6286 }
6287
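// checkJobCompletionLabel fails the test if the pod template does not carry
// the completion index label (keyed by the JobCompletionIndexAnnotation constant).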
6288 func checkJobCompletionLabel(t *testing.T, p *v1.PodTemplateSpec) {
6289 t.Helper()
6290 labels := p.GetLabels()
6291 if labels == nil || labels[batch.JobCompletionIndexAnnotation] == "" {
6292 t.Errorf("missing expected pod label %s", batch.JobCompletionIndexAnnotation)
6293 }
6294 }
6295
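// checkJobCompletionEnvVariable verifies that every container and init container
// gets the JOB_COMPLETION_INDEX env var, sourced from the completion index label,
// or from the annotation when the pod index label feature is disabled.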
6296 func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec, podIndexLabelDisabled bool) {
6297 t.Helper()
6298 var fieldPath string
6299 if podIndexLabelDisabled {
6300 fieldPath = fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation)
6301 } else {
6302 fieldPath = fmt.Sprintf("metadata.labels['%s']", batch.JobCompletionIndexAnnotation)
6303 }
6304 want := []v1.EnvVar{
6305 {
6306 Name: "JOB_COMPLETION_INDEX",
6307 ValueFrom: &v1.EnvVarSource{
6308 FieldRef: &v1.ObjectFieldSelector{
6309 FieldPath: fieldPath,
6310 },
6311 },
6312 },
6313 }
6314 for _, c := range spec.InitContainers {
6315 if diff := cmp.Diff(want, c.Env); diff != "" {
6316 t.Errorf("Unexpected Env in container %s (-want,+got):\n%s", c.Name, diff)
6317 }
6318 }
6319 for _, c := range spec.Containers {
6320 if diff := cmp.Diff(want, c.Env); diff != "" {
6321 t.Errorf("Unexpected Env in container %s (-want,+got):\n%s", c.Name, diff)
6322 }
6323 }
6324 }
6325
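// podReplacementPolicy returns a pointer to the given PodReplacementPolicy value.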
6326 func podReplacementPolicy(m batch.PodReplacementPolicy) *batch.PodReplacementPolicy {
6327 return &m
6328 }
6329
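// verifyEmptyQueueAndAwaitForQueueLen asserts the work queue is still empty
// (the enqueue is delayed by the batch period) and then waits for it to reach
// wantQueueLen.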
6330 func verifyEmptyQueueAndAwaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) {
6331 t.Helper()
6332 verifyEmptyQueue(ctx, t, jm)
6333 awaitForQueueLen(ctx, t, jm, wantQueueLen)
6334 }
6335
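// awaitForQueueLen polls until the work queue reaches wantQueueLen, stepping
// the controller's clock by fastRequeue between attempts so delayed additions
// can fire.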
6336 func awaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) {
6337 t.Helper()
6338 verifyEmptyQueue(ctx, t, jm)
6339 if err := wait.PollUntilContextTimeout(ctx, fastRequeue, time.Second, true, func(ctx context.Context) (bool, error) {
6340 if requeued := jm.queue.Len() == wantQueueLen; requeued {
6341 return true, nil
6342 }
6343 jm.clock.Sleep(fastRequeue)
6344 return false, nil
6345 }); err != nil {
6346 t.Errorf("Failed to await for expected queue.Len(). want %v, got: %v", wantQueueLen, jm.queue.Len())
6347 }
6348 }
6349
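// verifyEmptyQueue fails the test if the work queue already contains items.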
6350 func verifyEmptyQueue(ctx context.Context, t *testing.T, jm *Controller) {
6351 t.Helper()
6352 if jm.queue.Len() > 0 {
6353 t.Errorf("Unexpected queue.Len(). Want: %d, got: %d", 0, jm.queue.Len())
6354 }
6355 }
6356
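// podBuilder provides a fluent helper for constructing test Pods.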
6357 type podBuilder struct {
6358 *v1.Pod
6359 }
6360
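// buildPod returns a podBuilder seeded with a random UID.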
6361 func buildPod() podBuilder {
6362 return podBuilder{Pod: &v1.Pod{
6363 ObjectMeta: metav1.ObjectMeta{
6364 UID: types.UID(rand.String(5)),
6365 },
6366 }}
6367 }
6368
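// getConditionsByType returns pointers to all conditions in list with the given type.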
6369 func getConditionsByType(list []batch.JobCondition, cType batch.JobConditionType) []*batch.JobCondition {
6370 var result []*batch.JobCondition
6371 for i := range list {
6372 if list[i].Type == cType {
6373 result = append(result, &list[i])
6374 }
6375 }
6376 return result
6377 }
6378
6379 func (pb podBuilder) name(n string) podBuilder {
6380 pb.Name = n
6381 return pb
6382 }
6383
6384 func (pb podBuilder) ns(n string) podBuilder {
6385 pb.Namespace = n
6386 return pb
6387 }
6388
6389 func (pb podBuilder) uid(u string) podBuilder {
6390 pb.UID = types.UID(u)
6391 return pb
6392 }
6393
6394 func (pb podBuilder) job(j *batch.Job) podBuilder {
6395 pb.Labels = j.Spec.Selector.MatchLabels
6396 pb.Namespace = j.Namespace
6397 pb.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(j, controllerKind)}
6398 return pb
6399 }
6400
6401 func (pb podBuilder) clearOwner() podBuilder {
6402 pb.OwnerReferences = nil
6403 return pb
6404 }
6405
6406 func (pb podBuilder) clearLabels() podBuilder {
6407 pb.Labels = nil
6408 return pb
6409 }
6410
6411 func (pb podBuilder) index(ix string) podBuilder {
6412 return pb.annotation(batch.JobCompletionIndexAnnotation, ix)
6413 }
6414
6415 func (pb podBuilder) indexFailureCount(count string) podBuilder {
6416 return pb.annotation(batch.JobIndexFailureCountAnnotation, count)
6417 }
6418
6419 func (pb podBuilder) indexIgnoredFailureCount(count string) podBuilder {
6420 return pb.annotation(batch.JobIndexIgnoredFailureCountAnnotation, count)
6421 }
6422
6423 func (pb podBuilder) annotation(key, value string) podBuilder {
6424 if pb.Annotations == nil {
6425 pb.Annotations = make(map[string]string)
6426 }
6427 pb.Annotations[key] = value
6428 return pb
6429 }
6430
6431 func (pb podBuilder) status(s v1.PodStatus) podBuilder {
6432 pb.Status = s
6433 return pb
6434 }
6435
6436 func (pb podBuilder) phase(p v1.PodPhase) podBuilder {
6437 pb.Status.Phase = p
6438 return pb
6439 }
6440
6441 func (pb podBuilder) trackingFinalizer() podBuilder {
6442 for _, f := range pb.Finalizers {
6443 if f == batch.JobTrackingFinalizer {
6444 return pb
6445 }
6446 }
6447 pb.Finalizers = append(pb.Finalizers, batch.JobTrackingFinalizer)
6448 return pb
6449 }
6450
6451 func (pb podBuilder) deletionTimestamp() podBuilder {
6452 pb.DeletionTimestamp = &metav1.Time{}
6453 return pb
6454 }
6455
6456 func (pb podBuilder) customDeletionTimestamp(t time.Time) podBuilder {
6457 pb.DeletionTimestamp = &metav1.Time{Time: t}
6458 return pb
6459 }
6460
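// completionModePtr returns a pointer to the given CompletionMode value.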
6461 func completionModePtr(m batch.CompletionMode) *batch.CompletionMode {
6462 return &m
6463 }
6464
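// setDurationDuringTest overrides a package-level duration for the duration of
// a test and returns a cleanup function that restores the original value.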
6465 func setDurationDuringTest(val *time.Duration, newVal time.Duration) func() {
6466 origVal := *val
6467 *val = newVal
6468 return func() {
6469 *val = origVal
6470 }
6471 }
6472