1
16
17 package job
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "sort"
24 "strconv"
25 "strings"
26 "sync"
27 "sync/atomic"
28 "testing"
29 "time"
30
31 "github.com/google/go-cmp/cmp"
32 batchv1 "k8s.io/api/batch/v1"
33 v1 "k8s.io/api/core/v1"
34 eventsv1 "k8s.io/api/events/v1"
35 apierrors "k8s.io/apimachinery/pkg/api/errors"
36 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37 "k8s.io/apimachinery/pkg/runtime/schema"
38 "k8s.io/apimachinery/pkg/types"
39 "k8s.io/apimachinery/pkg/util/sets"
40 "k8s.io/apimachinery/pkg/util/validation/field"
41 "k8s.io/apimachinery/pkg/util/wait"
42 "k8s.io/apimachinery/pkg/watch"
43 "k8s.io/apiserver/pkg/util/feature"
44 "k8s.io/client-go/informers"
45 clientset "k8s.io/client-go/kubernetes"
46 typedv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
47 restclient "k8s.io/client-go/rest"
48 "k8s.io/client-go/tools/record"
49 "k8s.io/client-go/util/retry"
50 featuregatetesting "k8s.io/component-base/featuregate/testing"
51 basemetrics "k8s.io/component-base/metrics"
52 "k8s.io/component-base/metrics/testutil"
53 "k8s.io/klog/v2"
54 kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
55 podutil "k8s.io/kubernetes/pkg/api/v1/pod"
56 "k8s.io/kubernetes/pkg/controller"
57 jobcontroller "k8s.io/kubernetes/pkg/controller/job"
58 "k8s.io/kubernetes/pkg/controller/job/metrics"
59 "k8s.io/kubernetes/pkg/features"
60 "k8s.io/kubernetes/test/integration/framework"
61 "k8s.io/kubernetes/test/integration/util"
62 "k8s.io/utils/ptr"
63 )
64
// waitInterval is the generic polling interval used by the tests when
// waiting for job or pod state transitions.
const waitInterval = time.Second

// fastPodFailureBackoff overrides jobcontroller.DefaultJobPodFailureBackOff
// in tests (via setDurationDuringTest) so pod-failure retries happen quickly.
const fastPodFailureBackoff = 100 * time.Millisecond

// sleepDurationForControllerLatency is a short delay presumably used to give
// the controller time to observe events — usage not visible in this chunk;
// TODO(review): confirm against the callers.
const sleepDurationForControllerLatency = 100 * time.Millisecond
73
// metricLabelsWithValue pairs the label values identifying one series of a
// counter metric with the integer value the tests expect that series to hold.
type metricLabelsWithValue struct {
	// Labels are the label values passed to CounterVec.WithLabelValues.
	Labels []string
	// Value is the expected counter value for that label combination.
	Value int
}
78
79 func validateCounterMetric(ctx context.Context, t *testing.T, counterVec *basemetrics.CounterVec, wantMetric metricLabelsWithValue) {
80 t.Helper()
81 var cmpErr error
82 err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
83 cmpErr = nil
84 value, err := testutil.GetCounterMetricValue(counterVec.WithLabelValues(wantMetric.Labels...))
85 if err != nil {
86 return true, fmt.Errorf("collecting the %q metric: %q", counterVec.Name, err)
87 }
88 if wantMetric.Value != int(value) {
89 cmpErr = fmt.Errorf("Unexpected metric delta for %q metric with labels %q. want: %v, got: %v", counterVec.Name, wantMetric.Labels, wantMetric.Value, int(value))
90 return false, nil
91 }
92 return true, nil
93 })
94 if err != nil {
95 t.Errorf("Failed waiting for expected metric: %q", err)
96 }
97 if cmpErr != nil {
98 t.Error(cmpErr)
99 }
100 }
101
102 func validateTerminatedPodsTrackingFinalizerMetric(ctx context.Context, t *testing.T, want int) {
103 validateCounterMetric(ctx, t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{
104 Value: want,
105 Labels: []string{metrics.Add},
106 })
107 validateCounterMetric(ctx, t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{
108 Value: want,
109 Labels: []string{metrics.Delete},
110 })
111 }
112
113
114
115
116
117
118
119
120
// TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart verifies
// that a job with a FailJob pod-failure-policy rule still transitions to
// Failed when the matching failed pod is deleted while the job controller is
// down, i.e. the controller must reconstruct the outcome after restart
// without orphaning finalizers.
func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testing.T) {
	count := 3
	job := batchv1.Job{
		Spec: batchv1.JobSpec{
			Template: v1.PodTemplateSpec{
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:                     "main-container",
							Image:                    "foo",
							ImagePullPolicy:          v1.PullIfNotPresent,
							TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
						},
					},
				},
			},
			Parallelism: ptr.To(int32(count)),
			Completions: ptr.To(int32(count)),
			PodFailurePolicy: &batchv1.PodFailurePolicy{
				Rules: []batchv1.PodFailurePolicyRule{
					{
						// Exit code 5 in any container fails the whole job.
						Action: batchv1.PodFailurePolicyActionFailJob,
						OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
							Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
							Values:   []int32{5},
						},
					},
				},
			},
		},
	}
	// Pod status crafted to match the FailJob rule above (exit code 5).
	podStatusMatchingOnExitCodesTerminateRule := v1.PodStatus{
		Phase: v1.PodFailed,
		ContainerStatuses: []v1.ContainerStatus{
			{
				Name: "main-container",
				State: v1.ContainerState{
					Terminated: &v1.ContainerStateTerminated{
						ExitCode: 5,
					},
				},
			},
		},
	}
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, true)()
	closeFn, restConfig, cs, ns := setup(t, "simple")
	defer closeFn()

	// Start the controller heavily throttled (QPS/Burst = 1) so that it has
	// observed the pod failure but is slow to finish the remaining work,
	// widening the window the test needs.
	restConfig.QPS = 1
	restConfig.Burst = 1
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	defer func() {
		cancel()
	}()
	resetMetrics()
	// Restore fast client settings for the test's own API calls.
	restConfig.QPS = 200
	restConfig.Burst = 200

	jobObj, err := createJobWithDefaults(ctx, cs, ns.Name, &job)
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
	validateJobPodsStatus(ctx, t, cs, jobObj, podsByStatus{
		Active:      count,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})

	jobPods, err := getJobPods(ctx, t, cs, jobObj, func(s v1.PodStatus) bool {
		return (s.Phase == v1.PodPending || s.Phase == v1.PodRunning)
	})
	if err != nil {
		t.Fatalf("Failed to list Job Pods: %v", err)
	}

	failedIndex := 1
	wg := sync.WaitGroup{}
	wg.Add(1)

	// Watch concurrently for the controller to strip the tracking finalizer
	// from the pod we are about to fail; that is the signal that the
	// controller has recorded the failure.
	go func(ctx context.Context) {
		err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, time.Minute, true, func(ctx context.Context) (bool, error) {
			failedPodUpdated, err := cs.CoreV1().Pods(jobObj.Namespace).Get(ctx, jobPods[failedIndex].Name, metav1.GetOptions{})
			if err != nil {
				return true, err
			}
			if len(failedPodUpdated.Finalizers) == 0 {
				return true, nil
			}
			return false, nil
		})
		if err != nil {
			t.Logf("Failed awaiting for the finalizer removal for pod %v", klog.KObj(jobPods[failedIndex]))
		}
		wg.Done()
	}(ctx)

	// Fail one pod with the exit code matching the FailJob rule, then wait
	// until its finalizer is removed by the controller.
	failedPod := jobPods[failedIndex]
	updatedPod := failedPod.DeepCopy()
	updatedPod.Status = podStatusMatchingOnExitCodesTerminateRule
	_, err = updatePodStatuses(ctx, cs, []v1.Pod{*updatedPod})
	if err != nil {
		t.Fatalf("Failed to update pod statuses %q for pods of job %q", err, klog.KObj(jobObj))
	}
	wg.Wait()

	t.Logf("Finalizer is removed for the failed pod %q. Shutting down the controller.", klog.KObj(failedPod))

	// Stop the controller before the failed pod is deleted.
	cancel()

	// Delete the failed pod while no controller is running, using a fresh
	// context since the controller's context was just cancelled.
	ctx, cancel = context.WithCancel(context.Background())
	err = cs.CoreV1().Pods(failedPod.Namespace).Delete(ctx, failedPod.Name, metav1.DeleteOptions{GracePeriodSeconds: ptr.To[int64](0)})
	if err != nil {
		t.Fatalf("Error: '%v' while deleting pod: '%v'", err, klog.KObj(failedPod))
	}
	t.Logf("The failed pod %q is deleted", klog.KObj(failedPod))
	cancel()

	// Restart the controller; it must still conclude the job failed even
	// though the triggering pod no longer exists.
	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)

	validateJobFailed(ctx, t, cs, jobObj)
	validateNoOrphanPodsWithFinalizers(ctx, t, cs, jobObj)
}
254
255
256
// TestJobPodFailurePolicy exercises the pod-failure-policy rules (Ignore on
// pod conditions, Count and FailJob on exit codes) against single-pod jobs,
// with the JobPodFailurePolicy feature gate enabled or disabled, optionally
// restarting the controller between the pod failure and the verification.
func TestJobPodFailurePolicy(t *testing.T) {
	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
	job := batchv1.Job{
		Spec: batchv1.JobSpec{
			Template: v1.PodTemplateSpec{
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:                     "main-container",
							Image:                    "foo",
							ImagePullPolicy:          v1.PullIfNotPresent,
							TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
						},
					},
				},
			},
			// Rules are evaluated in order: Ignore disruptions, Count exit
			// code 10, FailJob on exit codes 5-7.
			PodFailurePolicy: &batchv1.PodFailurePolicy{
				Rules: []batchv1.PodFailurePolicyRule{
					{
						Action: batchv1.PodFailurePolicyActionIgnore,
						OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{
							{
								Type: v1.DisruptionTarget,
							},
						},
					},
					{
						Action: batchv1.PodFailurePolicyActionCount,
						OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
							Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
							Values:   []int32{10},
						},
					},
					{
						Action: batchv1.PodFailurePolicyActionFailJob,
						OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
							Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
							Values:   []int32{5, 6, 7},
						},
					},
				},
			},
		},
	}
	// Matches the FailJob rule (exit code 5).
	podStatusMatchingOnExitCodesTerminateRule := v1.PodStatus{
		Phase: v1.PodFailed,
		ContainerStatuses: []v1.ContainerStatus{
			{
				Name: "main-container",
				State: v1.ContainerState{
					Terminated: &v1.ContainerStateTerminated{
						ExitCode: 5,
					},
				},
			},
		},
	}
	// Matches the Count rule (exit code 10).
	podStatusMatchingOnExitCodesCountRule := v1.PodStatus{
		Phase: v1.PodFailed,
		ContainerStatuses: []v1.ContainerStatus{
			{
				Name: "main-container",
				State: v1.ContainerState{
					Terminated: &v1.ContainerStateTerminated{
						ExitCode: 10,
					},
				},
			},
		},
	}
	// Matches the Ignore rule (DisruptionTarget condition).
	podStatusMatchingOnPodConditionsIgnoreRule := v1.PodStatus{
		Phase: v1.PodFailed,
		Conditions: []v1.PodCondition{
			{
				Type:   v1.DisruptionTarget,
				Status: v1.ConditionTrue,
			},
		},
	}
	// Matches none of the rules (exit code 0, no conditions).
	podStatusNotMatchingAnyRule := v1.PodStatus{
		Phase: v1.PodFailed,
		ContainerStatuses: []v1.ContainerStatus{
			{
				State: v1.ContainerState{
					Terminated: &v1.ContainerStateTerminated{},
				},
			},
		},
	}
	testCases := map[string]struct {
		enableJobPodFailurePolicy                bool
		restartController                        bool
		job                                      batchv1.Job
		podStatus                                v1.PodStatus
		wantActive                               int
		wantFailed                               int
		wantJobConditionType                     batchv1.JobConditionType
		wantJobFinishedMetric                    metricLabelsWithValue
		wantPodFailuresHandledByPolicyRuleMetric *metricLabelsWithValue
	}{
		"pod status matching the configured FailJob rule on exit codes; job terminated when JobPodFailurePolicy enabled": {
			enableJobPodFailurePolicy: true,
			job:                       job,
			podStatus:                 podStatusMatchingOnExitCodesTerminateRule,
			wantActive:                0,
			wantFailed:                1,
			wantJobConditionType:      batchv1.JobFailed,
			wantJobFinishedMetric: metricLabelsWithValue{
				Labels: []string{"NonIndexed", "failed", "PodFailurePolicy"},
				Value:  1,
			},
			wantPodFailuresHandledByPolicyRuleMetric: &metricLabelsWithValue{
				Labels: []string{"FailJob"},
				Value:  1,
			},
		},
		"pod status matching the configured FailJob rule on exit codes; with controller restart; job terminated when JobPodFailurePolicy enabled": {
			enableJobPodFailurePolicy: true,
			restartController:         true,
			job:                       job,
			podStatus:                 podStatusMatchingOnExitCodesTerminateRule,
			wantActive:                0,
			wantFailed:                1,
			wantJobConditionType:      batchv1.JobFailed,
			wantJobFinishedMetric: metricLabelsWithValue{
				Labels: []string{"NonIndexed", "failed", "PodFailurePolicy"},
				Value:  1,
			},
		},
		"pod status matching the configured FailJob rule on exit codes; default handling when JobPodFailurePolicy disabled": {
			enableJobPodFailurePolicy: false,
			job:                       job,
			podStatus:                 podStatusMatchingOnExitCodesTerminateRule,
			wantActive:                1,
			wantFailed:                1,
			wantJobConditionType:      batchv1.JobComplete,
			wantJobFinishedMetric: metricLabelsWithValue{
				Labels: []string{"NonIndexed", "succeeded", ""},
				Value:  1,
			},
		},
		"pod status matching the configured Ignore rule on pod conditions; pod failure not counted when JobPodFailurePolicy enabled": {
			enableJobPodFailurePolicy: true,
			job:                       job,
			podStatus:                 podStatusMatchingOnPodConditionsIgnoreRule,
			wantActive:                1,
			wantFailed:                0,
			wantJobConditionType:      batchv1.JobComplete,
			wantPodFailuresHandledByPolicyRuleMetric: &metricLabelsWithValue{
				Labels: []string{"Ignore"},
				Value:  1,
			},
			wantJobFinishedMetric: metricLabelsWithValue{
				Labels: []string{"NonIndexed", "succeeded", ""},
				Value:  1,
			},
		},
		"pod status matching the configured Count rule on exit codes; pod failure counted when JobPodFailurePolicy enabled": {
			enableJobPodFailurePolicy: true,
			job:                       job,
			podStatus:                 podStatusMatchingOnExitCodesCountRule,
			wantActive:                1,
			wantFailed:                1,
			wantJobConditionType:      batchv1.JobComplete,
			wantJobFinishedMetric: metricLabelsWithValue{
				Labels: []string{"NonIndexed", "succeeded", ""},
				Value:  1,
			},
			wantPodFailuresHandledByPolicyRuleMetric: &metricLabelsWithValue{
				Labels: []string{"Count"},
				Value:  1,
			},
		},
		"pod status non-matching any configured rule; pod failure counted when JobPodFailurePolicy enabled": {
			enableJobPodFailurePolicy: true,
			job:                       job,
			podStatus:                 podStatusNotMatchingAnyRule,
			wantActive:                1,
			wantFailed:                1,
			wantJobConditionType:      batchv1.JobComplete,
			wantJobFinishedMetric: metricLabelsWithValue{
				Labels: []string{"NonIndexed", "succeeded", ""},
				Value:  1,
			},
			wantPodFailuresHandledByPolicyRuleMetric: &metricLabelsWithValue{
				Labels: []string{"Count"},
				Value:  0,
			},
		},
	}
	for name, test := range testCases {
		t.Run(name, func(t *testing.T) {
			resetMetrics()
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, test.enableJobPodFailurePolicy)()

			closeFn, restConfig, clientSet, ns := setup(t, "simple")
			defer closeFn()
			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
			defer func() {
				cancel()
			}()

			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job)
			if err != nil {
				t.Fatalf("Error %q while creating the job %q", err, jobObj.Name)
			}
			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
				Active:      1,
				Ready:       ptr.To[int32](0),
				Terminating: ptr.To[int32](0),
			})

			// Mark the single pod with the test's failure status.
			op := func(p *v1.Pod) bool {
				p.Status = test.podStatus
				return true
			}

			if err, _ := updateJobPodsStatus(ctx, clientSet, jobObj, op, 1); err != nil {
				t.Fatalf("Error %q while updating pod status for Job: %q", err, jobObj.Name)
			}

			// Optionally restart the controller so the failure is handled
			// after a cold start from the informer cache.
			if test.restartController {
				cancel()
				ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
			}

			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
				Active:      test.wantActive,
				Failed:      test.wantFailed,
				Ready:       ptr.To[int32](0),
				Terminating: ptr.To[int32](0),
			})

			// For cases expected to complete, drive the remaining pod to
			// success so the job can finish.
			if test.wantJobConditionType == batchv1.JobComplete {
				if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
					t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
				}
			}
			validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
			validateCounterMetric(ctx, t, metrics.JobFinishedNum, test.wantJobFinishedMetric)
			if test.wantPodFailuresHandledByPolicyRuleMetric != nil {
				validateCounterMetric(ctx, t, metrics.PodFailuresHandledByFailurePolicy, *test.wantPodFailuresHandledByPolicyRuleMetric)
			}
			validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
		})
	}
}
504
505
506
// TestSuccessPolicy verifies the JobSuccessPolicy feature for indexed jobs:
// a job declares success via succeededIndexes/succeededCount rules, and each
// simulated pod termination is checked against expected active, failed,
// succeeded counts and index sets, followed by the expected job conditions.
func TestSuccessPolicy(t *testing.T) {
	// podTerminationWithExpectations describes one pod phase transition and
	// the job state expected immediately after the controller processes it.
	type podTerminationWithExpectations struct {
		index                int
		status               v1.PodStatus
		wantActive           int
		wantFailed           int
		wantSucceeded        int
		wantActiveIndexes    sets.Set[int]
		wantCompletedIndexes string
		wantFailedIndexes    *string
	}

	podTemplateSpec := v1.PodTemplateSpec{
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:                     "main-container",
					Image:                    "foo",
					ImagePullPolicy:          v1.PullIfNotPresent,
					TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
				},
			},
		},
	}
	testCases := map[string]struct {
		enableJobSuccessPolicy     bool
		enableBackoffLimitPerIndex bool
		job                        batchv1.Job
		podTerminations            []podTerminationWithExpectations
		wantConditionTypes         []batchv1.JobConditionType
		wantJobFinishedNumMetric   []metricLabelsWithValue
	}{
		"all indexes succeeded; JobSuccessPolicy is enabled": {
			enableJobSuccessPolicy: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:    ptr.To[int32](1),
					Completions:    ptr.To[int32](1),
					CompletionMode: completionModePtr(batchv1.IndexedCompletion),
					Template:       podTemplateSpec,
					SuccessPolicy: &batchv1.SuccessPolicy{
						Rules: []batchv1.SuccessPolicyRule{{
							SucceededIndexes: ptr.To("0"),
						}},
					},
				},
			},
			podTerminations: []podTerminationWithExpectations{
				{
					index: 0,
					status: v1.PodStatus{
						Phase: v1.PodSucceeded,
					},
					wantActive:           0,
					wantFailed:           0,
					wantSucceeded:        1,
					wantCompletedIndexes: "0",
				},
			},
			wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete},
			wantJobFinishedNumMetric: []metricLabelsWithValue{
				{
					Labels: []string{"Indexed", "succeeded", ""},
					Value:  1,
				},
			},
		},
		"all indexes succeeded; JobSuccessPolicy is disabled": {
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:    ptr.To[int32](1),
					Completions:    ptr.To[int32](1),
					CompletionMode: completionModePtr(batchv1.IndexedCompletion),
					Template:       podTemplateSpec,
					SuccessPolicy: &batchv1.SuccessPolicy{
						Rules: []batchv1.SuccessPolicyRule{{
							SucceededIndexes: ptr.To("0"),
						}},
					},
				},
			},
			podTerminations: []podTerminationWithExpectations{
				{
					index: 0,
					status: v1.PodStatus{
						Phase: v1.PodSucceeded,
					},
					wantActive:           0,
					wantFailed:           0,
					wantSucceeded:        1,
					wantCompletedIndexes: "0",
				},
			},
			// Gate disabled: no SuccessCriteriaMet condition, only Complete.
			wantConditionTypes: []batchv1.JobConditionType{batchv1.JobComplete},
			wantJobFinishedNumMetric: []metricLabelsWithValue{
				{
					Labels: []string{"Indexed", "succeeded", ""},
					Value:  1,
				},
			},
		},
		"job with successPolicy with succeededIndexes; job has SuccessCriteriaMet and Complete conditions even if some indexes remain pending": {
			enableJobSuccessPolicy: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:    ptr.To[int32](2),
					Completions:    ptr.To[int32](2),
					CompletionMode: completionModePtr(batchv1.IndexedCompletion),
					Template:       podTemplateSpec,
					SuccessPolicy: &batchv1.SuccessPolicy{
						Rules: []batchv1.SuccessPolicyRule{{
							SucceededIndexes: ptr.To("1"),
						}},
					},
				},
			},
			podTerminations: []podTerminationWithExpectations{
				{
					index: 0,
					status: v1.PodStatus{
						Phase: v1.PodPending,
					},
					wantActive:        2,
					wantActiveIndexes: sets.New(0, 1),
					wantFailed:        0,
					wantSucceeded:     0,
				},
				{
					index: 1,
					status: v1.PodStatus{
						Phase: v1.PodSucceeded,
					},
					wantActive:           0,
					wantFailed:           0,
					wantSucceeded:        1,
					wantCompletedIndexes: "1",
				},
			},
			wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete},
			wantJobFinishedNumMetric: []metricLabelsWithValue{
				{
					Labels: []string{"Indexed", "succeeded", ""},
					Value:  1,
				},
			},
		},
		"job with successPolicy with succeededCount; job has SuccessCriteriaMet and Complete conditions even if some indexes remain pending": {
			enableJobSuccessPolicy: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:    ptr.To[int32](2),
					Completions:    ptr.To[int32](2),
					CompletionMode: completionModePtr(batchv1.IndexedCompletion),
					Template:       podTemplateSpec,
					SuccessPolicy: &batchv1.SuccessPolicy{
						Rules: []batchv1.SuccessPolicyRule{{
							SucceededCount: ptr.To[int32](1),
						}},
					},
				},
			},
			podTerminations: []podTerminationWithExpectations{
				{
					index: 0,
					status: v1.PodStatus{
						Phase: v1.PodPending,
					},
					wantActive:        2,
					wantActiveIndexes: sets.New(0, 1),
					wantFailed:        0,
					wantSucceeded:     0,
				},
				{
					index: 1,
					status: v1.PodStatus{
						Phase: v1.PodSucceeded,
					},
					wantActive:           0,
					wantFailed:           0,
					wantSucceeded:        1,
					wantCompletedIndexes: "1",
				},
			},
			wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete},
			wantJobFinishedNumMetric: []metricLabelsWithValue{
				{
					Labels: []string{"Indexed", "succeeded", ""},
					Value:  1,
				},
			},
		},
		"job with successPolicy and backoffLimitPerIndex; job has a Failed condition if job meets backoffLimitPerIndex": {
			enableJobSuccessPolicy:     true,
			enableBackoffLimitPerIndex: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:          ptr.To[int32](2),
					Completions:          ptr.To[int32](2),
					CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
					BackoffLimitPerIndex: ptr.To[int32](0),
					Template:             podTemplateSpec,
					SuccessPolicy: &batchv1.SuccessPolicy{
						Rules: []batchv1.SuccessPolicyRule{{
							SucceededCount: ptr.To[int32](1),
						}},
					},
				},
			},
			podTerminations: []podTerminationWithExpectations{
				{
					index: 0,
					status: v1.PodStatus{
						Phase: v1.PodFailed,
					},
					wantActive:        1,
					wantActiveIndexes: sets.New(1),
					wantFailed:        1,
					wantFailedIndexes: ptr.To("0"),
					wantSucceeded:     0,
				},
				{
					index: 1,
					status: v1.PodStatus{
						Phase: v1.PodSucceeded,
					},
					wantActive:           0,
					wantFailed:           1,
					wantSucceeded:        1,
					wantFailedIndexes:    ptr.To("0"),
					wantCompletedIndexes: "1",
				},
			},
			wantConditionTypes: []batchv1.JobConditionType{batchv1.JobFailed},
		},
	}
	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			resetMetrics()
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy)()
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableBackoffLimitPerIndex)()

			closeFn, restConfig, clientSet, ns := setup(t, "simple")
			defer closeFn()
			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
			defer func() {
				cancel()
			}()
			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &tc.job)
			if err != nil {
				t.Fatalf("Error %v while creating the Job %q", err, jobObj.Name)
			}
			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
				Active:      int(*tc.job.Spec.Parallelism),
				Ready:       ptr.To[int32](0),
				Terminating: ptr.To[int32](0),
			})
			// Apply each scripted termination and verify the resulting job
			// status and index bookkeeping after every step.
			for _, podTermination := range tc.podTerminations {
				pod, err := getActivePodForIndex(ctx, clientSet, jobObj, podTermination.index)
				if err != nil {
					t.Fatalf("Listing Job Pods: %v", err)
				}
				pod.Status = podTermination.status
				if _, err = clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil {
					t.Fatalf("Error updating the Pod %q: %v", klog.KObj(pod), err)
				}
				validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
					Active:      podTermination.wantActive,
					Succeeded:   podTermination.wantSucceeded,
					Failed:      podTermination.wantFailed,
					Ready:       ptr.To[int32](0),
					Terminating: ptr.To[int32](0),
				})
				validateIndexedJobPods(ctx, t, clientSet, jobObj, podTermination.wantActiveIndexes, podTermination.wantCompletedIndexes, podTermination.wantFailedIndexes)
			}
			for i := range tc.wantConditionTypes {
				validateJobCondition(ctx, t, clientSet, jobObj, tc.wantConditionTypes[i])
			}
			for i := range tc.wantJobFinishedNumMetric {
				validateCounterMetric(ctx, t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric[i])
			}
			validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
		})
	}
}
791
792
793
794 func TestSuccessPolicy_ReEnabling(t *testing.T) {
795 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, true)()
796 closeFn, resetConfig, clientSet, ns := setup(t, "success-policy-re-enabling")
797 defer closeFn()
798 ctx, cancel := startJobControllerAndWaitForCaches(t, resetConfig)
799 defer cancel()
800 resetMetrics()
801
802 jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
803 Spec: batchv1.JobSpec{
804 Parallelism: ptr.To[int32](5),
805 Completions: ptr.To[int32](5),
806 CompletionMode: completionModePtr(batchv1.IndexedCompletion),
807 SuccessPolicy: &batchv1.SuccessPolicy{
808 Rules: []batchv1.SuccessPolicyRule{{
809 SucceededCount: ptr.To[int32](3),
810 }},
811 },
812 },
813 })
814 if err != nil {
815 t.Fatalf("Failed to create Job: %v", err)
816 }
817 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
818 Active: 5,
819 Ready: ptr.To[int32](0),
820 Terminating: ptr.To[int32](0),
821 })
822 validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2, 3, 4), "", nil)
823
824
825 if err = setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 0); err != nil {
826 t.Fatalf("Failed tring to succeess pod with index 0")
827 }
828 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
829 Active: 4,
830 Succeeded: 1,
831 Ready: ptr.To[int32](0),
832 Terminating: ptr.To[int32](0),
833 })
834 validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(1, 2, 3, 4), "0", nil)
835
836
837 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, false)()
838
839
840 if err = setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
841 t.Fatalf("Failed trying to succeess pod with index 1")
842 }
843 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
844 Active: 3,
845 Succeeded: 2,
846 Ready: ptr.To[int32](0),
847 Terminating: ptr.To[int32](0),
848 })
849 validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(2, 3, 4), "0,1", nil)
850
851
852 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, true)()
853
854
855 if err = setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
856 t.Fatalf("Failed trying to success pod with index 2")
857 }
858
859
860 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
861 Active: 0,
862 Succeeded: 3,
863 Ready: ptr.To[int32](0),
864 Terminating: ptr.To[int32](0),
865 })
866 validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New[int](), "0-2", nil)
867
868 validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobSuccessCriteriaMet)
869 validateJobComplete(ctx, t, clientSet, jobObj)
870 validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
871 }
872
873
874
875
876 func TestBackoffLimitPerIndex_DelayedPodDeletion(t *testing.T) {
877 t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
878
879 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
880 closeFn, restConfig, clientSet, ns := setup(t, "backoff-limit-per-index-failed")
881 defer closeFn()
882 ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
883 defer func() {
884 cancel()
885 }()
886
887 jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
888 Spec: batchv1.JobSpec{
889 Parallelism: ptr.To[int32](1),
890 Completions: ptr.To[int32](1),
891 BackoffLimitPerIndex: ptr.To[int32](1),
892 CompletionMode: completionModePtr(batchv1.IndexedCompletion),
893 },
894 })
895 if err != nil {
896 t.Fatalf("Failed to create Job: %v", err)
897 }
898 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
899 Active: 1,
900 Ready: ptr.To[int32](0),
901 Terminating: ptr.To[int32](0),
902 })
903 validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0), "", ptr.To(""))
904
905
906 if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 0); err != nil {
907 t.Fatal("Failed trying to fail pod with index 0")
908 }
909
910 pod, err := getJobPodForIndex(ctx, clientSet, jobObj, 0, func(_ *v1.Pod) bool { return true })
911 if err != nil {
912 t.Fatalf("failed to get terminal pod for index: %v", 0)
913 }
914 if err := clientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
915 t.Fatalf("failed to delete pod: %v, error: %v", klog.KObj(pod), err)
916 }
917
918 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
919 Active: 1,
920 Failed: 1,
921 Ready: ptr.To[int32](0),
922 Terminating: ptr.To[int32](0),
923 })
924 validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0), "", ptr.To(""))
925
926
927
928 replacement, err := getActivePodForIndex(ctx, clientSet, jobObj, 0)
929 if err != nil {
930 t.Fatalf("Failed to get active replacement pod for index: %v, error: %v", 0, err)
931 }
932 gotIndexFailureCount, err := getIndexFailureCount(replacement)
933 if err != nil {
934 t.Fatalf("Failed read the index failure count annotation for pod: %v, error: %v", klog.KObj(replacement), err)
935 }
936 if diff := cmp.Diff(1, gotIndexFailureCount); diff != "" {
937 t.Errorf("Unexpected index failure count for the replacement pod: %s", diff)
938 }
939 if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 0); err != nil {
940 t.Fatal("Failed trying to fail pod with index 0")
941 }
942 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
943 Active: 0,
944 Succeeded: 1,
945 Failed: 1,
946 Ready: ptr.To[int32](0),
947 Terminating: ptr.To[int32](0),
948 })
949 validateJobComplete(ctx, t, clientSet, jobObj)
950 }
951
952
953
// TestBackoffLimitPerIndex_Reenabling verifies behavior of an indexed job
// with backoffLimitPerIndex=0 while the JobBackoffLimitPerIndex feature gate
// is toggled off and back on: failures recorded while the gate is disabled
// are handled by the legacy path (failed index recreated, failed-index
// annotation dropped), and once re-enabled a further failure marks its index
// failed and ultimately fails the job.
func TestBackoffLimitPerIndex_Reenabling(t *testing.T) {
	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))

	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
	closeFn, restConfig, clientSet, ns := setup(t, "backoff-limit-per-index-reenabled")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	defer cancel()
	resetMetrics()

	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
		Spec: batchv1.JobSpec{
			Parallelism:          ptr.To[int32](3),
			Completions:          ptr.To[int32](3),
			BackoffLimitPerIndex: ptr.To[int32](0),
			CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
		},
	})
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      3,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2), "", ptr.To(""))

	// Gate enabled: failing index 0 marks it as a failed index (no retry,
	// since backoffLimitPerIndex=0).
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 0); err != nil {
		t.Fatal("Failed trying to fail pod with index 0")
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      2,
		Failed:      1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(1, 2), "", ptr.To("0"))

	// Disable the gate mid-run.
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, false)()

	// Gate disabled: failing index 1 falls back to legacy handling — the pod
	// is recreated (3 active again) and failed-index tracking is dropped
	// (nil failed indexes).
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
		t.Fatal("Failed trying to fail pod with index 1")
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      3,
		Failed:      2,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2), "", nil)

	// Re-enable the gate.
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()

	// Gate re-enabled: failing index 2 marks it failed again.
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
		t.Fatal("Failed trying to fail pod with index 2")
	}

	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      2,
		Failed:      3,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", ptr.To("2"))

	// Succeed the remaining pods; the job must still end up Failed due to
	// the failed index, with no lingering finalizers.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
		t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
	}
	validateJobFailed(ctx, t, clientSet, jobObj)
	validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
}
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
// TestBackoffLimitPerIndex_JobPodsCreatedWithExponentialBackoff verifies that,
// for an Indexed Job using backoffLimitPerIndex, replacement pods for a failed
// index are created with exponentially increasing delays. The base pod-failure
// backoff is raised to 2s so that the creation-time gaps between successive
// pods of the same index are large enough to validate at the end of the test.
func TestBackoffLimitPerIndex_JobPodsCreatedWithExponentialBackoff(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, 2*time.Second))

	closeFn, restConfig, clientSet, ns := setup(t, "simple")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	defer cancel()

	// Indexed Job with indexes 0 and 1; each index may be retried up to
	// twice before it is considered failed.
	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
		Spec: batchv1.JobSpec{
			Completions:          ptr.To[int32](2),
			Parallelism:          ptr.To[int32](2),
			BackoffLimitPerIndex: ptr.To[int32](2),
			CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
		},
	})
	if err != nil {
		t.Fatalf("Could not create job: %v", err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      2,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", ptr.To(""))

	// First failure for index 0: a replacement pod keeps the index active
	// and no index is recorded as failed yet.
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 0); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      2,
		Failed:      1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", ptr.To(""))

	// Second failure for index 0: still within the per-index limit of 2,
	// so another replacement is created (after a longer backoff).
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 0); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      2,
		Failed:      2,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", ptr.To(""))

	// First failure for index 1: both indexes remain active.
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      2,
		Failed:      3,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", ptr.To(""))

	// Index 0 succeeds on its third attempt.
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 0); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      1,
		Failed:      3,
		Succeeded:   1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(1), "0", ptr.To(""))

	// Second failure for index 1: still retried, one more replacement.
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      1,
		Failed:      4,
		Succeeded:   1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(1), "0", ptr.To(""))

	// Index 1 succeeds as well, completing the Job.
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      0,
		Failed:      4,
		Succeeded:   2,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New[int](), "0,1", ptr.To(""))
	validateJobComplete(ctx, t, clientSet, jobObj)

	// For every index, collect all of its pods (including finished ones) and
	// verify that the creation timestamps are spaced per exponential backoff.
	for index := 0; index < int(*jobObj.Spec.Completions); index++ {
		podsForIndex, err := getJobPodsForIndex(ctx, clientSet, jobObj, index, func(_ *v1.Pod) bool { return true })
		if err != nil {
			t.Fatalf("Failed to list job %q pods for index %v, error: %v", klog.KObj(jobObj), index, err)
		}
		validateExpotentialBackoffDelay(t, jobcontroller.DefaultJobPodFailureBackOff, podsForIndex)
	}
}
1155
1156
1157
// TestBackoffLimitPerIndex exercises the backoffLimitPerIndex feature of
// Indexed Jobs end to end: per-index retry accounting, interaction with the
// global backoffLimit and with maxFailedIndexes, the FailIndex
// pod-failure-policy action, and the JobFinishedIndexesTotal metric.
func TestBackoffLimitPerIndex(t *testing.T) {
	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))

	// podTerminationWithExpectations describes a single pod termination to
	// inject, together with the job/pod state expected after the controller
	// has reacted to it.
	type podTerminationWithExpectations struct {
		// index is the completion index whose active pod is terminated.
		index int
		// status is the pod status written via UpdateStatus.
		status v1.PodStatus
		wantActive           int
		wantFailed           int
		wantSucceeded        int
		wantActiveIndexes    sets.Set[int]
		wantCompletedIndexes string
		wantFailedIndexes    *string
		// wantReplacementPodFailureCount, when set, is the expected value of
		// the index failure count annotation on the replacement pod.
		wantReplacementPodFailureCount *int
	}

	podTemplateSpec := v1.PodTemplateSpec{
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:                     "main-container",
					Image:                    "foo",
					ImagePullPolicy:          v1.PullIfNotPresent,
					TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
				},
			},
		},
	}
	testCases := map[string]struct {
		job                               batchv1.Job
		podTerminations                   []podTerminationWithExpectations
		wantJobConditionType              batchv1.JobConditionType
		wantJobFinishedIndexesTotalMetric []metricLabelsWithValue
	}{
		// One failure within the per-index limit: the index is retried (the
		// replacement pod carries failure count 1) and the job completes.
		"job succeeded": {
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:          ptr.To[int32](2),
					Completions:          ptr.To[int32](2),
					CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
					BackoffLimitPerIndex: ptr.To[int32](1),
					Template:             podTemplateSpec,
				},
			},
			podTerminations: []podTerminationWithExpectations{
				{
					status: v1.PodStatus{
						Phase: v1.PodFailed,
					},
					wantActive:                     2,
					wantFailed:                     1,
					wantActiveIndexes:              sets.New(0, 1),
					wantFailedIndexes:              ptr.To(""),
					wantReplacementPodFailureCount: ptr.To(1),
				},
			},
			wantJobConditionType: batchv1.JobComplete,
			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
				{
					Labels: []string{"succeeded", "perIndex"},
					Value:  2,
				},
			},
		},
		// Index 0 fails three times with a per-index limit of 2, so it is
		// recorded in failedIndexes and the job fails; index 1 still succeeds.
		"job index fails due to exceeding backoff limit per index": {
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:          ptr.To[int32](2),
					Completions:          ptr.To[int32](2),
					CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
					BackoffLimitPerIndex: ptr.To[int32](2),
					Template:             podTemplateSpec,
				},
			},
			podTerminations: []podTerminationWithExpectations{
				{
					status: v1.PodStatus{
						Phase: v1.PodFailed,
					},
					wantActive:                     2,
					wantFailed:                     1,
					wantActiveIndexes:              sets.New(0, 1),
					wantFailedIndexes:              ptr.To(""),
					wantReplacementPodFailureCount: ptr.To(1),
				},
				{
					status: v1.PodStatus{
						Phase: v1.PodFailed,
					},
					wantActive:                     2,
					wantFailed:                     2,
					wantActiveIndexes:              sets.New(0, 1),
					wantFailedIndexes:              ptr.To(""),
					wantReplacementPodFailureCount: ptr.To(2),
				},
				{
					status: v1.PodStatus{
						Phase: v1.PodFailed,
					},
					wantActive:        1,
					wantFailed:        3,
					wantActiveIndexes: sets.New(1),
					wantFailedIndexes: ptr.To("0"),
				},
			},
			wantJobConditionType: batchv1.JobFailed,
			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
				{
					Labels: []string{"failed", "perIndex"},
					Value:  1,
				},
				{
					Labels: []string{"succeeded", "perIndex"},
					Value:  1,
				},
			},
		},
		// The global backoffLimit (2) is exhausted before any index reaches
		// its per-index limit; all remaining pods are terminated and no index
		// is counted as finished per-index.
		"job index fails due to exceeding the global backoff limit first": {
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:          ptr.To[int32](3),
					Completions:          ptr.To[int32](3),
					CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
					BackoffLimitPerIndex: ptr.To[int32](1),
					BackoffLimit:         ptr.To[int32](2),
					Template:             podTemplateSpec,
				},
			},
			podTerminations: []podTerminationWithExpectations{
				{
					index: 0,
					status: v1.PodStatus{
						Phase: v1.PodFailed,
					},
					wantActive:        3,
					wantFailed:        1,
					wantActiveIndexes: sets.New(0, 1, 2),
					wantFailedIndexes: ptr.To(""),
				},
				{
					index: 1,
					status: v1.PodStatus{
						Phase: v1.PodFailed,
					},
					wantActive:        3,
					wantFailed:        2,
					wantActiveIndexes: sets.New(0, 1, 2),
					wantFailedIndexes: ptr.To(""),
				},
				{
					index: 2,
					status: v1.PodStatus{
						Phase: v1.PodFailed,
					},
					wantFailed:        5,
					wantFailedIndexes: ptr.To(""),
				},
			},
			wantJobConditionType: batchv1.JobFailed,
			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
				{
					Labels: []string{"succeeded", "perIndex"},
					Value:  0,
				},
				{
					Labels: []string{"failed", "perIndex"},
					Value:  0,
				},
			},
		},
		// With backoffLimitPerIndex=0 the first failure fails index 0
		// immediately, but index 1 keeps running and succeeds; the job is
		// still marked Failed because of the failed index.
		"job continues execution after a failed index, the job is marked Failed due to the failed index": {
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:          ptr.To[int32](2),
					Completions:          ptr.To[int32](2),
					CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
					BackoffLimitPerIndex: ptr.To[int32](0),
					Template:             podTemplateSpec,
				},
			},
			podTerminations: []podTerminationWithExpectations{
				{
					index: 0,
					status: v1.PodStatus{
						Phase: v1.PodFailed,
					},
					wantActive:        1,
					wantFailed:        1,
					wantActiveIndexes: sets.New(1),
					wantFailedIndexes: ptr.To("0"),
				},
				{
					index: 1,
					status: v1.PodStatus{
						Phase: v1.PodSucceeded,
					},
					wantFailed:           1,
					wantSucceeded:        1,
					wantFailedIndexes:    ptr.To("0"),
					wantCompletedIndexes: "1",
				},
			},
			wantJobConditionType: batchv1.JobFailed,
			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
				{
					Labels: []string{"succeeded", "perIndex"},
					Value:  1,
				},
				{
					Labels: []string{"failed", "perIndex"},
					Value:  1,
				},
			},
		},
		// maxFailedIndexes=1: once a second index fails, the job is
		// terminated early and its remaining active pods are failed.
		"job execution terminated early due to exceeding max failed indexes": {
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:          ptr.To[int32](3),
					Completions:          ptr.To[int32](3),
					CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
					BackoffLimitPerIndex: ptr.To[int32](0),
					MaxFailedIndexes:     ptr.To[int32](1),
					Template:             podTemplateSpec,
				},
			},
			podTerminations: []podTerminationWithExpectations{
				{
					index: 0,
					status: v1.PodStatus{
						Phase: v1.PodFailed,
					},
					wantActive:        2,
					wantFailed:        1,
					wantActiveIndexes: sets.New(1, 2),
					wantFailedIndexes: ptr.To("0"),
				},
				{
					index: 1,
					status: v1.PodStatus{
						Phase: v1.PodFailed,
					},
					wantActive:        0,
					wantFailed:        3,
					wantFailedIndexes: ptr.To("0,1"),
				},
			},
			wantJobConditionType: batchv1.JobFailed,
			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
				{
					Labels: []string{"failed", "perIndex"},
					Value:  2,
				},
			},
		},
		// Pod failures matched by FailIndex rules (exit code 13, or a
		// DisruptionTarget condition) fail the index immediately, bypassing
		// the remaining per-index retries (backoffLimitPerIndex=1).
		"pod failure matching pod failure policy rule with FailIndex action": {
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:          ptr.To[int32](2),
					Completions:          ptr.To[int32](2),
					CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
					BackoffLimitPerIndex: ptr.To[int32](1),
					Template:             podTemplateSpec,
					PodFailurePolicy: &batchv1.PodFailurePolicy{
						Rules: []batchv1.PodFailurePolicyRule{
							{
								Action: batchv1.PodFailurePolicyActionFailIndex,
								OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
									Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
									Values:   []int32{13},
								},
							},
							{
								Action: batchv1.PodFailurePolicyActionFailIndex,
								OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{
									{
										Type:   v1.DisruptionTarget,
										Status: v1.ConditionTrue,
									},
								},
							},
						},
					},
				},
			},
			podTerminations: []podTerminationWithExpectations{
				{
					index: 0,
					status: v1.PodStatus{
						Phase: v1.PodFailed,
						ContainerStatuses: []v1.ContainerStatus{
							{
								State: v1.ContainerState{
									Terminated: &v1.ContainerStateTerminated{
										ExitCode: 13,
									},
								},
							},
						},
					},
					wantActive:        1,
					wantFailed:        1,
					wantActiveIndexes: sets.New(1),
					wantFailedIndexes: ptr.To("0"),
				},
				{
					index: 1,
					status: v1.PodStatus{
						Phase: v1.PodFailed,
						Conditions: []v1.PodCondition{
							{
								Type:   v1.DisruptionTarget,
								Status: v1.ConditionTrue,
							},
						},
					},
					wantFailed:        2,
					wantFailedIndexes: ptr.To("0,1"),
				},
			},
			wantJobConditionType: batchv1.JobFailed,
			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
				{
					Labels: []string{"failed", "perIndex"},
					Value:  2,
				},
			},
		},
	}
	for name, test := range testCases {
		t.Run(name, func(t *testing.T) {
			resetMetrics()
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, true)()
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()

			closeFn, restConfig, clientSet, ns := setup(t, "simple")
			defer closeFn()
			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
			defer func() {
				cancel()
			}()
			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job)
			if err != nil {
				t.Fatalf("Error %q while creating the job %q", err, jobObj.Name)
			}
			// Wait until all parallel pods are running before injecting
			// terminations.
			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
				Active:      int(*test.job.Spec.Parallelism),
				Ready:       ptr.To[int32](0),
				Terminating: ptr.To[int32](0),
			})
			// Apply each scripted termination and verify the controller's
			// resulting pod counts, index sets and (optionally) the failure
			// count annotation on the replacement pod.
			for _, podTermination := range test.podTerminations {
				pod, err := getActivePodForIndex(ctx, clientSet, jobObj, podTermination.index)
				if err != nil {
					t.Fatalf("listing Job Pods: %q", err)
				}
				pod.Status = podTermination.status
				if _, err = clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil {
					t.Fatalf("Error updating the pod %q: %q", klog.KObj(pod), err)
				}
				validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
					Active:      podTermination.wantActive,
					Succeeded:   podTermination.wantSucceeded,
					Failed:      podTermination.wantFailed,
					Ready:       ptr.To[int32](0),
					Terminating: ptr.To[int32](0),
				})
				validateIndexedJobPods(ctx, t, clientSet, jobObj, podTermination.wantActiveIndexes, podTermination.wantCompletedIndexes, podTermination.wantFailedIndexes)
				if podTermination.wantReplacementPodFailureCount != nil {
					replacement, err := getActivePodForIndex(ctx, clientSet, jobObj, podTermination.index)
					if err != nil {
						t.Fatalf("Failed to get active replacement pod for index: %v, error: %v", podTermination.index, err)
					}
					gotReplacementPodFailureCount, err := getIndexFailureCount(replacement)
					if err != nil {
						t.Fatalf("Failed read the index failure count annotation for pod: %v, error: %v", klog.KObj(replacement), err)
					}
					if *podTermination.wantReplacementPodFailureCount != gotReplacementPodFailureCount {
						t.Fatalf("Unexpected value of the index failure count annotation. Want: %v, got: %v", *podTermination.wantReplacementPodFailureCount, gotReplacementPodFailureCount)
					}
				}
			}

			// Let any indexes still running succeed so the job can reach its
			// terminal condition.
			remainingActive := test.podTerminations[len(test.podTerminations)-1].wantActive
			if remainingActive > 0 {
				if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remainingActive); err != nil {
					t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
				}
			}
			validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
			for _, wantMetricValue := range test.wantJobFinishedIndexesTotalMetric {
				validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, wantMetricValue)
			}
			validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
		})
	}
}
1552
1553
1554
1555
// TestManagedBy verifies which jobs the built-in Job controller reconciles
// depending on the spec.managedBy field and the JobManagedBy feature gate:
// jobs without managedBy or with the well-known built-in controller name are
// reconciled; jobs with a custom managedBy are skipped (status stays empty)
// and only counted in the JobByExternalControllerTotal metric.
func TestManagedBy(t *testing.T) {
	customControllerName := "example.com/custom-job-controller"
	podTemplateSpec := v1.PodTemplateSpec{
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:  "main-container",
					Image: "foo",
				},
			},
		},
	}
	testCases := map[string]struct {
		enableJobManagedBy                     bool
		job                                    batchv1.Job
		wantReconciledByBuiltInController      bool
		wantJobByExternalControllerTotalMetric metricLabelsWithValue
	}{
		"the Job controller reconciles jobs without the managedBy": {
			enableJobManagedBy: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Template: podTemplateSpec,
				},
			},
			wantReconciledByBuiltInController: true,
			wantJobByExternalControllerTotalMetric: metricLabelsWithValue{
				// Value 0 asserts the metric is NOT incremented for jobs
				// handled by the built-in controller.
				Labels: []string{batchv1.JobControllerName},
				Value:  0,
			},
		},
		"the Job controller reconciles jobs with the well known value of the managedBy field": {
			enableJobManagedBy: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Template:  podTemplateSpec,
					ManagedBy: ptr.To(batchv1.JobControllerName),
				},
			},
			wantReconciledByBuiltInController: true,
			wantJobByExternalControllerTotalMetric: metricLabelsWithValue{
				Labels: []string{batchv1.JobControllerName},
				Value:  0,
			},
		},
		// With the feature gate off, a custom managedBy value is ignored and
		// the built-in controller reconciles the job anyway.
		"the Job controller reconciles an unsuspended with the custom value of managedBy; feature disabled": {
			enableJobManagedBy: false,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Template:  podTemplateSpec,
					ManagedBy: ptr.To(customControllerName),
				},
			},
			wantReconciledByBuiltInController: true,
			wantJobByExternalControllerTotalMetric: metricLabelsWithValue{
				Labels: []string{customControllerName},
				Value:  0,
			},
		},
		"the Job controller does not reconcile an unsuspended with the custom value of managedBy": {
			enableJobManagedBy: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Suspend:   ptr.To(false),
					Template:  podTemplateSpec,
					ManagedBy: ptr.To(customControllerName),
				},
			},
			wantReconciledByBuiltInController: false,
			wantJobByExternalControllerTotalMetric: metricLabelsWithValue{
				Labels: []string{customControllerName},
				Value:  1,
			},
		},
		"the Job controller does not reconcile a suspended with the custom value of managedBy": {
			enableJobManagedBy: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Suspend:   ptr.To(true),
					Template:  podTemplateSpec,
					ManagedBy: ptr.To(customControllerName),
				},
			},
			wantReconciledByBuiltInController: false,
			wantJobByExternalControllerTotalMetric: metricLabelsWithValue{
				Labels: []string{customControllerName},
				Value:  1,
			},
		},
	}
	for name, test := range testCases {
		t.Run(name, func(t *testing.T) {
			resetMetrics()
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, test.enableJobManagedBy)()

			closeFn, restConfig, clientSet, ns := setup(t, "managed-by")
			defer closeFn()
			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
			defer cancel()
			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job)
			if err != nil {
				t.Fatalf("Error %v while creating the job %q", err, klog.KObj(jobObj))
			}

			if test.wantReconciledByBuiltInController {
				// Reconciled: pods are created for the job.
				validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
					Active:      int(*jobObj.Spec.Parallelism),
					Ready:       ptr.To[int32](0),
					Terminating: ptr.To[int32](0),
				})
				validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, test.wantJobByExternalControllerTotalMetric)
			} else {
				validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, test.wantJobByExternalControllerTotalMetric)
				// Give the controller a moment to (incorrectly) act, then
				// verify the job status was left untouched.
				time.Sleep(sleepDurationForControllerLatency)
				jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
				if err != nil {
					t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
				}
				if diff := cmp.Diff(batchv1.JobStatus{}, jobObj.Status); diff != "" {
					t.Fatalf("Unexpected status (-want/+got): %s", diff)
				}
			}
		})
	}
}
1685
1686
1687
1688
1689
1690
// TestManagedBy_Reenabling verifies the controller's behavior for a job with a
// custom managedBy across feature-gate flips: while JobManagedBy is enabled
// the job is skipped; after disabling the gate and restarting the controller
// the job is reconciled (pods created); after re-enabling the gate the
// controller skips the job again and leaves its status unchanged.
func TestManagedBy_Reenabling(t *testing.T) {
	customControllerName := "example.com/custom-job-controller"
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, true)()

	closeFn, restConfig, clientSet, ns := setup(t, "managed-by-reenabling")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	// NOTE: cancel is reassigned below each time the controller is
	// restarted; the closure ensures the latest context is cancelled on exit
	// (a plain `defer cancel()` would capture only the first cancel).
	defer func() {
		cancel()
	}()
	resetMetrics()

	baseJob := batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "custom-job-test",
			Namespace: ns.Name,
		},
		Spec: batchv1.JobSpec{
			Completions: ptr.To[int32](1),
			Parallelism: ptr.To[int32](1),
			Template: v1.PodTemplateSpec{
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:  "main-container",
							Image: "foo",
						},
					},
				},
			},
			ManagedBy: &customControllerName,
		},
	}
	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &baseJob)
	if err != nil {
		t.Fatalf("Error %v when creating the job %q", err, klog.KObj(jobObj))
	}
	jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)

	// The job is counted as externally managed and must not be reconciled.
	validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, metricLabelsWithValue{
		Labels: []string{customControllerName},
		Value:  1,
	})

	// Allow the controller time to act, then verify the status is untouched.
	time.Sleep(sleepDurationForControllerLatency)
	jobObj, err = jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
	}
	if diff := cmp.Diff(batchv1.JobStatus{}, jobObj.Status); diff != "" {
		t.Fatalf("Unexpected status (-want/+got): %s", diff)
	}

	// Disable the feature gate and restart the controller: with JobManagedBy
	// off, the custom managedBy is ignored and the job gets reconciled.
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, false)()
	cancel()
	resetMetrics()
	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)

	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})

	// The metric was reset; with the gate off the job is not counted.
	validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, metricLabelsWithValue{
		Labels: []string{customControllerName},
		Value:  0,
	})

	// Re-enable the feature gate and restart the controller once more.
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, true)()
	cancel()
	resetMetrics()
	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)

	// Mark the pod succeeded; the built-in controller must not react.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
		t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodSucceeded, klog.KObj(jobObj))
	}

	validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, metricLabelsWithValue{
		Labels: []string{customControllerName},
		Value:  1,
	})

	// The job status must remain as it was before the pod succeeded, since
	// the built-in controller is skipping the job again.
	time.Sleep(sleepDurationForControllerLatency)
	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
}
1785
1786
1787
1788
1789
1790
1791
1792
1793
// TestManagedBy_RecreatedJob verifies that when a job previously reconciled by
// the built-in controller is deleted and then recreated with a custom
// managedBy value, the new job is not reconciled: its status stays empty and
// it is only counted in the JobByExternalControllerTotal metric.
func TestManagedBy_RecreatedJob(t *testing.T) {
	customControllerName := "example.com/custom-job-controller"
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, true)()

	closeFn, restConfig, clientSet, ns := setup(t, "managed-by-recreate-job")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	defer cancel()
	resetMetrics()

	// First incarnation: no managedBy, so the built-in controller owns it.
	baseJob := batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "custom-job-test",
			Namespace: ns.Name,
		},
		Spec: batchv1.JobSpec{
			Completions: ptr.To[int32](1),
			Parallelism: ptr.To[int32](1),
			Template: v1.PodTemplateSpec{
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:  "main-container",
							Image: "foo",
						},
					},
				},
			},
		},
	}
	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &baseJob)
	if err != nil {
		t.Fatalf("Error %v when creating the job %q", err, klog.KObj(jobObj))
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})

	// Complete the pod while the job is still managed by the built-in
	// controller.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
		t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodSucceeded, klog.KObj(jobObj))
	}

	jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
	if err = jobClient.Delete(ctx, jobObj.Name, metav1.DeleteOptions{
		// Background propagation so the job object is gone immediately.
		PropagationPolicy: ptr.To(metav1.DeletePropagationBackground),
	}); err != nil {
		t.Fatalf("Error %v when deleting the job %v", err, klog.KObj(jobObj))
	}

	// Second incarnation: same name, but managed by a custom controller.
	jobWithManagedBy := baseJob.DeepCopy()
	jobWithManagedBy.Spec.ManagedBy = ptr.To(customControllerName)
	jobObj, err = createJobWithDefaults(ctx, clientSet, ns.Name, jobWithManagedBy)
	if err != nil {
		t.Fatalf("Error %q while creating the job %q", err, klog.KObj(jobObj))
	}

	validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, metricLabelsWithValue{
		Labels: []string{customControllerName},
		Value:  1,
	})

	// Give the controller a chance to (incorrectly) act, then verify the
	// recreated job's status was never touched.
	time.Sleep(sleepDurationForControllerLatency)
	jobObj, err = jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
	}
	if diff := cmp.Diff(batchv1.JobStatus{}, jobObj.Status); diff != "" {
		t.Fatalf("Unexpected status (-want/+got): %s", diff)
	}
}
1868
1869
1870
1871
1872
1873
1874
// TestManagedBy_UsingReservedJobFinalizers verifies that the built-in Job
// controller does not remove the reserved batchv1.JobTrackingFinalizer from
// pods owned by a job with a custom managedBy value — even when the job is
// marked finished and the pod's status is updated afterwards. The finalizer
// must remain for the external controller to manage.
func TestManagedBy_UsingReservedJobFinalizers(t *testing.T) {
	customControllerName := "example.com/custom-job-controller"
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, true)()

	closeFn, restConfig, clientSet, ns := setup(t, "managed-by-reserved-finalizers")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	defer cancel()
	resetMetrics()

	jobSpec := batchv1.Job{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "batch/v1",
			Kind:       "Job",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "custom-job-test",
			Namespace: ns.Name,
		},
		Spec: batchv1.JobSpec{
			Completions: ptr.To[int32](1),
			Parallelism: ptr.To[int32](1),
			Template: v1.PodTemplateSpec{
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:  "main-container",
							Image: "foo",
						},
					},
				},
			},
			ManagedBy: ptr.To(customControllerName),
		},
	}

	// The job has a custom managedBy, so the built-in controller must ignore
	// it and will not create pods for it.
	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &jobSpec)
	if err != nil {
		t.Fatalf("Error %v when creating the job %q", err, klog.KObj(jobObj))
	}

	podControl := controller.RealPodControl{
		KubeClient: clientSet,
		Recorder:   &record.FakeRecorder{},
	}

	// Create the pod ourselves (playing the role of the external
	// controller), attaching the reserved job tracking finalizer.
	podTemplate := jobObj.Spec.Template.DeepCopy()
	podTemplate.Finalizers = append(podTemplate.Finalizers, batchv1.JobTrackingFinalizer)
	err = podControl.CreatePodsWithGenerateName(ctx, jobObj.Namespace, podTemplate, jobObj, metav1.NewControllerRef(jobObj, batchv1.SchemeGroupVersion.WithKind("Job")), "pod1")
	if err != nil {
		t.Fatalf("Error %v when creating a pod for job %q", err, klog.KObj(jobObj))
	}

	// There should be exactly the one pod we just created.
	jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(ps v1.PodStatus) bool { return true })
	if err != nil {
		t.Fatalf("Error %v getting the list of pods for job %q", err, klog.KObj(jobObj))
	}
	if len(jobPods) != 1 {
		t.Fatalf("Unexpected number (%d) of pods for job: %v", len(jobPods), klog.KObj(jobObj))
	}

	// Mark the pod as finished.
	podObj := jobPods[0]
	podObj.Status.Phase = v1.PodSucceeded
	podObj, err = clientSet.CoreV1().Pods(ns.Name).UpdateStatus(ctx, podObj, metav1.UpdateOptions{})
	if err != nil {
		t.Fatalf("Error %v when marking the %q pod as succeeded", err, klog.KObj(podObj))
	}

	// Mark the job itself as finished, acting as the external controller
	// would. A finished job is the scenario in which the built-in controller
	// would normally strip pod finalizers.
	jobObj.Status.Conditions = append(jobObj.Status.Conditions, batchv1.JobCondition{
		Type:   batchv1.JobComplete,
		Status: v1.ConditionTrue,
	})
	jobObj.Status.StartTime = ptr.To(metav1.Now())
	jobObj.Status.CompletionTime = ptr.To(metav1.Now())

	if jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).UpdateStatus(ctx, jobObj, metav1.UpdateOptions{}); err != nil {
		t.Fatalf("Error %v when updating the job as finished %v", err, klog.KObj(jobObj))
	}

	podObj, err = clientSet.CoreV1().Pods(ns.Name).Get(ctx, podObj.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Error %v when getting the latest version of the pod %v", err, klog.KObj(podObj))
	}

	// Touch the pod status to trigger another pod-update event for the
	// controller to (potentially) react to.
	podObj.Status.Conditions = append(podObj.Status.Conditions, v1.PodCondition{
		Type:   v1.PodConditionType("CustomCondition"),
		Status: v1.ConditionTrue,
	})
	podObj, err = clientSet.CoreV1().Pods(ns.Name).UpdateStatus(ctx, podObj, metav1.UpdateOptions{})
	if err != nil {
		t.Fatalf("Error %v when adding a condition to the pod status %v", err, klog.KObj(podObj))
	}

	// After giving the controller time to react, the reserved finalizer must
	// still be present on the pod.
	time.Sleep(sleepDurationForControllerLatency)
	podObj, err = clientSet.CoreV1().Pods(ns.Name).Get(ctx, podObj.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Error %v when getting the latest version of the pod %v", err, klog.KObj(podObj))
	}

	if diff := cmp.Diff([]string{batchv1.JobTrackingFinalizer}, podObj.Finalizers); diff != "" {
		t.Fatalf("Unexpected change in the set of finalizers for pod %q, because the owner job %q has custom managedBy, diff=%s", klog.KObj(podObj), klog.KObj(jobObj), diff)
	}
}
1989
1990 func getIndexFailureCount(p *v1.Pod) (int, error) {
1991 if p.Annotations == nil {
1992 return 0, errors.New("no annotations found")
1993 }
1994 v, ok := p.Annotations[batchv1.JobIndexFailureCountAnnotation]
1995 if !ok {
1996 return 0, fmt.Errorf("annotation %s not found", batchv1.JobIndexFailureCountAnnotation)
1997 }
1998 return strconv.Atoi(v)
1999 }
2000
2001 func completionModePtr(cm batchv1.CompletionMode) *batchv1.CompletionMode {
2002 return &cm
2003 }
2004
2005
2006
2007
// TestNonParallelJob runs a default (non-parallel, non-indexed) Job through a
// failure and then a success, restarting the Job controller between steps to
// verify that it correctly rebuilds its state from the API server. It also
// checks the JobPodsFinished and JobFinishedNum metrics and that finalizers
// are removed from finished pods.
func TestNonParallelJob(t *testing.T) {
	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
	closeFn, restConfig, clientSet, ns := setup(t, "simple")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	// NOTE: cancel is reassigned below on each controller restart; the
	// closure ensures the latest context is cancelled on exit (a plain
	// `defer cancel()` would capture only the first cancel).
	defer func() {
		cancel()
	}()
	resetMetrics()

	// An empty Job spec defaults to completions=1, parallelism=1.
	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{})
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})

	// Restart the controller before failing the pod, to verify the failure
	// is handled by a freshly-started controller.
	cancel()
	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)

	// Fail the pod; a replacement should be created.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      1,
		Failed:      1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
		Labels: []string{"NonIndexed", "failed"},
		Value:  1,
	})

	// Restart the controller again before the success step.
	cancel()
	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)

	// Succeed the replacement pod; the job should complete.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
	}
	validateJobComplete(ctx, t, clientSet, jobObj)
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Failed:      1,
		Succeeded:   1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
	validateCounterMetric(ctx, t, metrics.JobFinishedNum, metricLabelsWithValue{
		Labels: []string{"NonIndexed", "succeeded", ""},
		Value:  1,
	})
	validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
		Labels: []string{"NonIndexed", "succeeded"},
		Value:  1,
	})
}
2072
// TestParallelJob exercises a NonIndexed Job with parallelism 5 and no
// completions. It drives pods through ready, failed and succeeded phases and
// verifies the controller's pod-status accounting, the tracking-finalizer
// removal, and the job_finished/pods_finished metrics at completion.
func TestParallelJob(t *testing.T) {
	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
	closeFn, restConfig, clientSet, ns := setup(t, "parallel")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	defer cancel()
	resetMetrics()

	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
		Spec: batchv1.JobSpec{
			Parallelism: ptr.To[int32](5),
		},
	})
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
	want := podsByStatus{
		Active:      5,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)

	// Mark two pods Ready and check the Ready counter follows.
	if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
		t.Fatalf("Failed Marking Pods as ready: %v", err)
	}
	want.Ready = ptr.To[int32](2)
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)

	// Fail two pods; the controller replaces them, so Active stays at 5.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
	}
	want = podsByStatus{
		Active:      5,
		Failed:      2,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)

	// Succeed one pod; it is not replaced, so Active drops to 4.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
	}
	want = podsByStatus{
		Failed:      2,
		Succeeded:   1,
		Active:      4,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
	// Fail two more pods after the first success; they are also not
	// replaced (Active 4 -> 2).
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
	}
	want = podsByStatus{
		Failed:      4,
		Succeeded:   1,
		Active:      2,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)

	// Succeed the last two pods; the Job completes with 3 successes and 4
	// failures in total.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
	}
	validateJobComplete(ctx, t, clientSet, jobObj)
	want = podsByStatus{
		Failed:      4,
		Succeeded:   3,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
	validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
	// 7 pods finished in total (3 succeeded + 4 failed), each should have
	// had its tracking finalizer removed exactly once.
	validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 7)
	validateCounterMetric(ctx, t, metrics.JobFinishedNum, metricLabelsWithValue{
		Labels: []string{"NonIndexed", "succeeded", ""},
		Value:  1,
	})
	validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
		Labels: []string{"NonIndexed", "succeeded"},
		Value:  3,
	})
	validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
		Labels: []string{"NonIndexed", "failed"},
		Value:  4,
	})
}
2164
2165 func TestParallelJobChangingParallelism(t *testing.T) {
2166 closeFn, restConfig, clientSet, ns := setup(t, "parallel")
2167 defer closeFn()
2168 ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
2169 defer cancel()
2170
2171 jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
2172 Spec: batchv1.JobSpec{
2173 BackoffLimit: ptr.To[int32](2),
2174 Parallelism: ptr.To[int32](5),
2175 },
2176 })
2177 if err != nil {
2178 t.Fatalf("Failed to create Job: %v", err)
2179 }
2180 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
2181 Active: 5,
2182 Ready: ptr.To[int32](0),
2183 Terminating: ptr.To[int32](0),
2184 })
2185
2186
2187 patch := []byte(`{"spec":{"parallelism":2}}`)
2188 jobObj, err = clientSet.BatchV1().Jobs(ns.Name).Patch(ctx, jobObj.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
2189 if err != nil {
2190 t.Fatalf("Updating Job: %v", err)
2191 }
2192 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
2193 Active: 2,
2194 Ready: ptr.To[int32](0),
2195 Terminating: ptr.To[int32](0),
2196 })
2197
2198
2199 patch = []byte(`{"spec":{"parallelism":4}}`)
2200 jobObj, err = clientSet.BatchV1().Jobs(ns.Name).Patch(ctx, jobObj.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
2201 if err != nil {
2202 t.Fatalf("Updating Job: %v", err)
2203 }
2204 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
2205 Active: 4,
2206 Ready: ptr.To[int32](0),
2207 Terminating: ptr.To[int32](0),
2208 })
2209
2210
2211 if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil {
2212 t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
2213 }
2214 validateJobComplete(ctx, t, clientSet, jobObj)
2215 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
2216 Succeeded: 4,
2217 Ready: ptr.To[int32](0),
2218 Terminating: ptr.To[int32](0),
2219 })
2220 validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
2221 }
2222
// TestParallelJobWithCompletions exercises a NonIndexed Job with
// parallelism 54 and completions 56 — deliberately larger than the lowered
// controller sync limits below, so pod creation, deletion and accounting
// must span multiple sync passes.
func TestParallelJobWithCompletions(t *testing.T) {
	// Lower the controller's batching limits so that the 54 parallel pods
	// cannot be handled in a single sync.
	t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 10))
	t.Cleanup(setDuringTest(&jobcontroller.MaxPodCreateDeletePerSync, 10))
	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
	closeFn, restConfig, clientSet, ns := setup(t, "completions")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	defer cancel()
	resetMetrics()

	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
		Spec: batchv1.JobSpec{
			Parallelism: ptr.To[int32](54),
			Completions: ptr.To[int32](56),
		},
	})
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
	want := podsByStatus{
		Active:      54,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)

	// Mark most of the pods Ready.
	if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
		t.Fatalf("Failed Marking Pods as ready: %v", err)
	}
	want.Ready = ptr.To[int32](52)
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)

	// Fail two pods; they are replaced, keeping 54 active. The replacements
	// are not Ready, so Ready drops to 50.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
	}
	want = podsByStatus{
		Active:      54,
		Failed:      2,
		Ready:       ptr.To[int32](50),
		Terminating: ptr.To[int32](0),
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)

	// Succeed 53 pods; with 56 completions required only 3 remain to run.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
	}
	want = podsByStatus{
		Failed:      2,
		Succeeded:   53,
		Active:      3,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)

	// Succeed the last 3 pods to reach 56 completions; the Job completes.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
	}
	validateJobComplete(ctx, t, clientSet, jobObj)
	want = podsByStatus{
		Failed:      2,
		Succeeded:   56,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
	validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
	validateCounterMetric(ctx, t, metrics.JobFinishedNum, metricLabelsWithValue{
		Labels: []string{"NonIndexed", "succeeded", ""},
		Value:  1,
	})
	validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
		Labels: []string{"NonIndexed", "succeeded"},
		Value:  56,
	})
	validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
		Labels: []string{"NonIndexed", "failed"},
		Value:  2,
	})
}
2306
2307 func TestIndexedJob(t *testing.T) {
2308 t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
2309 closeFn, restConfig, clientSet, ns := setup(t, "indexed")
2310 defer closeFn()
2311 ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
2312 defer cancel()
2313 resetMetrics()
2314
2315 mode := batchv1.IndexedCompletion
2316 jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
2317 Spec: batchv1.JobSpec{
2318 Parallelism: ptr.To[int32](3),
2319 Completions: ptr.To[int32](4),
2320 CompletionMode: &mode,
2321 },
2322 })
2323 if err != nil {
2324 t.Fatalf("Failed to create Job: %v", err)
2325 }
2326 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
2327 Active: 3,
2328 Ready: ptr.To[int32](0),
2329 Terminating: ptr.To[int32](0),
2330 })
2331 validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2), "", nil)
2332 validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, metricLabelsWithValue{
2333 Labels: []string{"succeeded", "global"},
2334 Value: 0,
2335 })
2336
2337
2338 if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
2339 t.Fatal("Failed trying to succeed pod with index 1")
2340 }
2341 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
2342 Active: 3,
2343 Succeeded: 1,
2344 Ready: ptr.To[int32](0),
2345 Terminating: ptr.To[int32](0),
2346 })
2347 validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 2, 3), "1", nil)
2348 validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, metricLabelsWithValue{
2349 Labels: []string{"succeeded", "global"},
2350 Value: 1,
2351 })
2352
2353
2354 if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
2355 t.Fatal("Failed trying to succeed pod with index 2")
2356 }
2357 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
2358 Active: 3,
2359 Failed: 1,
2360 Succeeded: 1,
2361 Ready: ptr.To[int32](0),
2362 Terminating: ptr.To[int32](0),
2363 })
2364 validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 2, 3), "1", nil)
2365 validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, metricLabelsWithValue{
2366 Labels: []string{"succeeded", "global"},
2367 Value: 1,
2368 })
2369
2370
2371 if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
2372 t.Fatal("Failed trying to succeed remaining pods")
2373 }
2374 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
2375 Active: 0,
2376 Failed: 1,
2377 Succeeded: 4,
2378 Ready: ptr.To[int32](0),
2379 Terminating: ptr.To[int32](0),
2380 })
2381 validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3", nil)
2382 validateJobComplete(ctx, t, clientSet, jobObj)
2383 validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
2384 validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 5)
2385 validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, metricLabelsWithValue{
2386 Labels: []string{"succeeded", "global"},
2387 Value: 4,
2388 })
2389 validateCounterMetric(ctx, t, metrics.JobFinishedNum, metricLabelsWithValue{
2390 Labels: []string{"Indexed", "succeeded", ""},
2391 Value: 1,
2392 })
2393 validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
2394 Labels: []string{"Indexed", "succeeded"},
2395 Value: 4,
2396 })
2397 validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
2398 Labels: []string{"Indexed", "failed"},
2399 Value: 1,
2400 })
2401 }
2402
// TestJobPodReplacementPolicy is a table-driven test for the
// JobPodReplacementPolicy feature. For each case it creates a Job whose pod
// template carries a blocking finalizer (so deleted pods linger in a
// terminating state), deletes the pods, then fails the terminating pods, and
// verifies the job's active/failed/terminating counters after each step as
// well as the job_pods_creation_total metric broken down by reason.
//
// With the TerminatingOrFailed policy (or the feature gate off) replacements
// are created while pods are still terminating; with the Failed policy
// replacements are only created after the pods actually fail.
func TestJobPodReplacementPolicy(t *testing.T) {
	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
	indexedCompletion := batchv1.IndexedCompletion
	nonIndexedCompletion := batchv1.NonIndexedCompletion
	var podReplacementPolicy = func(obj batchv1.PodReplacementPolicy) *batchv1.PodReplacementPolicy {
		return &obj
	}
	// jobStatus captures the expected counters in the Job status at a
	// checkpoint; a nil terminating means the field is expected to be unset
	// (feature gate off).
	type jobStatus struct {
		active      int
		failed      int
		terminating *int32
	}
	// jobPodsCreationMetrics captures the expected job_pods_creation_total
	// values per creation reason.
	type jobPodsCreationMetrics struct {
		new                         int
		recreateTerminatingOrFailed int
		recreateFailed              int
	}
	cases := map[string]struct {
		podReplacementPolicyEnabled bool
		jobSpec                     *batchv1.JobSpec
		wantStatusAfterDeletion     jobStatus
		wantStatusAfterFailure      jobStatus
		wantMetrics                 jobPodsCreationMetrics
	}{
		"feature flag off, delete & fail pods, recreate terminating pods, and verify job status counters": {
			jobSpec: &batchv1.JobSpec{
				Parallelism:    ptr.To[int32](2),
				Completions:    ptr.To[int32](2),
				CompletionMode: &indexedCompletion,
				Template: v1.PodTemplateSpec{
					ObjectMeta: metav1.ObjectMeta{
						Finalizers: []string{"fake.example.com/blockDeletion"},
					},
				},
			},
			wantStatusAfterDeletion: jobStatus{
				active: 2,
				failed: 2,
			},
			wantStatusAfterFailure: jobStatus{
				active: 2,
				failed: 2,
			},
			wantMetrics: jobPodsCreationMetrics{
				new: 4,
			},
		},
		"feature flag true with IndexedJob, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
			podReplacementPolicyEnabled: true,
			jobSpec: &batchv1.JobSpec{
				Parallelism:          ptr.To[int32](2),
				Completions:          ptr.To[int32](2),
				CompletionMode:       &indexedCompletion,
				PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
				Template: v1.PodTemplateSpec{
					ObjectMeta: metav1.ObjectMeta{
						Finalizers: []string{"fake.example.com/blockDeletion"},
					},
				},
			},
			wantStatusAfterDeletion: jobStatus{
				active:      2,
				failed:      2,
				terminating: ptr.To[int32](2),
			},
			wantStatusAfterFailure: jobStatus{
				active:      2,
				failed:      2,
				terminating: ptr.To[int32](0),
			},
			wantMetrics: jobPodsCreationMetrics{
				new:                         2,
				recreateTerminatingOrFailed: 2,
			},
		},
		"feature flag true with NonIndexedJob, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
			podReplacementPolicyEnabled: true,
			jobSpec: &batchv1.JobSpec{
				Parallelism:          ptr.To[int32](2),
				Completions:          ptr.To[int32](2),
				CompletionMode:       &nonIndexedCompletion,
				PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
				Template: v1.PodTemplateSpec{
					ObjectMeta: metav1.ObjectMeta{
						Finalizers: []string{"fake.example.com/blockDeletion"},
					},
				},
			},
			wantStatusAfterDeletion: jobStatus{
				active:      2,
				failed:      2,
				terminating: ptr.To[int32](2),
			},
			wantStatusAfterFailure: jobStatus{
				active:      2,
				failed:      2,
				terminating: ptr.To[int32](0),
			},
			wantMetrics: jobPodsCreationMetrics{
				new:                         2,
				recreateTerminatingOrFailed: 2,
			},
		},
		"feature flag false, podFailurePolicy enabled, delete & fail pods, recreate failed pods, and verify job status counters": {
			podReplacementPolicyEnabled: false,
			jobSpec: &batchv1.JobSpec{
				Parallelism:          ptr.To[int32](2),
				Completions:          ptr.To[int32](2),
				CompletionMode:       &nonIndexedCompletion,
				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
				Template: v1.PodTemplateSpec{
					ObjectMeta: metav1.ObjectMeta{
						Finalizers: []string{"fake.example.com/blockDeletion"},
					},
				},
				PodFailurePolicy: &batchv1.PodFailurePolicy{
					Rules: []batchv1.PodFailurePolicyRule{
						{
							Action: batchv1.PodFailurePolicyActionFailJob,
							OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
								Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
								Values:   []int32{5},
							},
						},
					},
				},
			},
			wantStatusAfterDeletion: jobStatus{
				active: 2,
			},
			wantStatusAfterFailure: jobStatus{
				active: 2,
			},
			wantMetrics: jobPodsCreationMetrics{
				new: 2,
			},
		},
		"feature flag true, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": {
			podReplacementPolicyEnabled: true,
			jobSpec: &batchv1.JobSpec{
				Parallelism:          ptr.To[int32](2),
				Completions:          ptr.To[int32](2),
				CompletionMode:       &indexedCompletion,
				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
				Template: v1.PodTemplateSpec{
					ObjectMeta: metav1.ObjectMeta{
						Finalizers: []string{"fake.example.com/blockDeletion"},
					},
				},
			},
			// With the Failed policy no replacement is created while the
			// pods are merely terminating.
			wantStatusAfterDeletion: jobStatus{
				active:      0,
				failed:      0,
				terminating: ptr.To[int32](2),
			},
			wantStatusAfterFailure: jobStatus{
				active:      2,
				failed:      2,
				terminating: ptr.To[int32](0),
			},
			wantMetrics: jobPodsCreationMetrics{
				new:            2,
				recreateFailed: 2,
			},
		},
		"feature flag true with NonIndexedJob, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": {
			podReplacementPolicyEnabled: true,
			jobSpec: &batchv1.JobSpec{
				Parallelism:          ptr.To[int32](2),
				Completions:          ptr.To[int32](2),
				CompletionMode:       &nonIndexedCompletion,
				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
				Template: v1.PodTemplateSpec{
					ObjectMeta: metav1.ObjectMeta{
						Finalizers: []string{"fake.example.com/blockDeletion"},
					},
				},
			},
			wantStatusAfterDeletion: jobStatus{
				active:      0,
				failed:      0,
				terminating: ptr.To[int32](2),
			},
			wantStatusAfterFailure: jobStatus{
				active:      2,
				failed:      2,
				terminating: ptr.To[int32](0),
			},
			wantMetrics: jobPodsCreationMetrics{
				new:            2,
				recreateFailed: 2,
			},
		},
	}
	for name, tc := range cases {
		tc := tc
		t.Run(name, func(t *testing.T) {
			// The JobPodFailurePolicy gate is derived from whether the case's
			// spec sets a PodFailurePolicy.
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.podReplacementPolicyEnabled)()
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.jobSpec.PodFailurePolicy != nil)()

			closeFn, restConfig, clientSet, ns := setup(t, "pod-replacement-policy")
			t.Cleanup(closeFn)
			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
			t.Cleanup(cancel)
			resetMetrics()

			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
				Spec: *tc.jobSpec,
			})
			if err != nil {
				t.Fatalf("Failed to create Job: %v", err)
			}
			jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)

			waitForPodsToBeActive(ctx, t, jobClient, 2, jobObj)
			// Make sure the blocking finalizers are removed on test exit so
			// the namespace can be cleaned up.
			t.Cleanup(func() { removePodsFinalizer(ctx, t, clientSet, ns.Name) })

			// Delete the pods; the blocking finalizer keeps them in a
			// terminating state.
			deletePods(ctx, t, clientSet, ns.Name)

			validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
				Terminating: tc.wantStatusAfterDeletion.terminating,
				Failed:      tc.wantStatusAfterDeletion.failed,
				Active:      tc.wantStatusAfterDeletion.active,
				Ready:       ptr.To[int32](0),
			})

			// Actually fail the terminating pods and re-check the counters.
			failTerminatingPods(ctx, t, clientSet, ns.Name)
			validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
				Terminating: tc.wantStatusAfterFailure.terminating,
				Failed:      tc.wantStatusAfterFailure.failed,
				Active:      tc.wantStatusAfterFailure.active,
				Ready:       ptr.To[int32](0),
			})

			validateCounterMetric(
				ctx,
				t,
				metrics.JobPodsCreationTotal,
				metricLabelsWithValue{Labels: []string{"new", "succeeded"}, Value: tc.wantMetrics.new},
			)
			validateCounterMetric(
				ctx,
				t,
				metrics.JobPodsCreationTotal,
				metricLabelsWithValue{Labels: []string{"recreate_terminating_or_failed", "succeeded"}, Value: tc.wantMetrics.recreateTerminatingOrFailed},
			)
			validateCounterMetric(
				ctx,
				t,
				metrics.JobPodsCreationTotal,
				metricLabelsWithValue{Labels: []string{"recreate_failed", "succeeded"}, Value: tc.wantMetrics.recreateFailed},
			)
		})
	}
}
2658
2659
2660
2661
2662
// TestJobPodReplacementPolicyFeatureToggling verifies the controller's
// behavior when the JobPodReplacementPolicy feature gate is flipped while a
// Job with a Failed replacement policy has pods stuck terminating (held by a
// blocking finalizer). With the gate on, deleted pods are reported as
// terminating and not replaced; after restarting the controller with the
// gate off, the terminating field is dropped and the pods are counted
// failed and replaced; turning the gate back on restores the terminating
// accounting.
//
// Note: each `defer featuregatetesting.SetFeatureGateDuringTest(...)()`
// changes the gate immediately and defers only the restore.
func TestJobPodReplacementPolicyFeatureToggling(t *testing.T) {
	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
	const podCount int32 = 2
	jobSpec := batchv1.JobSpec{
		Parallelism:          ptr.To(podCount),
		Completions:          ptr.To(podCount),
		CompletionMode:       ptr.To(batchv1.NonIndexedCompletion),
		PodReplacementPolicy: ptr.To(batchv1.Failed),
	}
	wantTerminating := ptr.To(podCount)
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, true)()
	closeFn, restConfig, clientSet, ns := setup(t, "pod-replacement-policy")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	// cancel is reassigned on each controller restart below; defer a
	// closure so the last instance is stopped.
	defer func() {
		cancel()
	}()
	resetMetrics()

	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
		Spec: jobSpec,
	})
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
	jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)

	// Gate on: deleted pods stay terminating and are not replaced (Failed
	// policy).
	waitForPodsToBeActive(ctx, t, jobClient, 2, jobObj)
	deletePods(ctx, t, clientSet, jobObj.Namespace)
	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
		Terminating: wantTerminating,
		Failed:      0,
		Ready:       ptr.To[int32](0),
	})

	// Restart the controller with the gate off: terminating is no longer
	// tracked, the pods are counted failed, and replacements are created.
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, false)()
	cancel()
	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)

	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
		Terminating: nil,
		Failed:      int(podCount),
		Ready:       ptr.To[int32](0),
		Active:      int(podCount),
	})

	// Turn the gate back on and restart again; deleting the replacement
	// pods should once more leave them tracked as terminating without
	// replacements.
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, true)()
	cancel()
	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
	waitForPodsToBeActive(ctx, t, jobClient, 2, jobObj)
	deletePods(ctx, t, clientSet, jobObj.Namespace)

	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
		Terminating: wantTerminating,
		Failed:      int(podCount),
		Active:      0,
		Ready:       ptr.To[int32](0),
	})
}
2722
// TestElasticIndexedJob is a table-driven test for the ElasticIndexedJob
// feature, which allows mutating spec.completions on an Indexed Job. Each
// case starts an Indexed Job with 3 completions/parallelism and applies a
// sequence of updates (scale completions, succeed/fail specific indexes),
// validating pod and index accounting after each step. With the gate off,
// the completions update must be rejected as immutable.
func TestElasticIndexedJob(t *testing.T) {
	const initialCompletions int32 = 3
	// jobUpdate describes one step: an optional completions change (also
	// applied to parallelism), indexes to succeed/fail, and the expected
	// state afterwards.
	type jobUpdate struct {
		completions          *int32
		succeedIndexes       []int
		failIndexes          []int
		wantSucceededIndexes string
		wantFailed           int
		wantRemainingIndexes sets.Set[int]
		wantActivePods       int
	}
	cases := map[string]struct {
		featureGate bool
		jobUpdates  []jobUpdate
		wantErr     *apierrors.StatusError
	}{
		"feature flag off, mutation not allowed": {
			jobUpdates: []jobUpdate{
				{
					completions: ptr.To[int32](4),
				},
			},
			wantErr: apierrors.NewInvalid(
				schema.GroupKind{Group: "batch", Kind: "Job"},
				"test-job",
				field.ErrorList{field.Invalid(field.NewPath("spec", "completions"), 4, "field is immutable")},
			),
		},
		"scale up": {
			featureGate: true,
			jobUpdates: []jobUpdate{
				{
					// Grow to 4 completions and succeed all indexes.
					completions:          ptr.To[int32](4),
					succeedIndexes:       []int{0, 1, 2, 3},
					wantSucceededIndexes: "0-3",
				},
			},
		},
		"scale down": {
			featureGate: true,
			jobUpdates: []jobUpdate{
				// Mark index 1 succeeded and index 2 failed (index 2 is
				// retried, so 0 and 2 remain active).
				{
					succeedIndexes:       []int{1},
					failIndexes:          []int{2},
					wantSucceededIndexes: "1",
					wantFailed:           1,
					wantRemainingIndexes: sets.New(0, 2),
					wantActivePods:       2,
				},
				// Scale down to 1 completion; only index 0 is left, succeed
				// it to finish the job.
				{
					completions:          ptr.To[int32](1),
					succeedIndexes:       []int{0},
					wantSucceededIndexes: "0",
					wantFailed:           1,
				},
			},
		},
		"index finishes successfully, scale down, scale up": {
			featureGate: true,
			jobUpdates: []jobUpdate{
				// Succeed the highest index first.
				{
					succeedIndexes:       []int{2},
					wantSucceededIndexes: "2",
					wantRemainingIndexes: sets.New(0, 1),
					wantActivePods:       2,
				},
				// Scale down to 2; indexes 0 and 1 keep running.
				{
					completions:          ptr.To[int32](2),
					wantRemainingIndexes: sets.New(0, 1),
					wantActivePods:       2,
				},
				// Scale back up to 3 and succeed everything.
				{
					completions:          ptr.To[int32](3),
					succeedIndexes:       []int{0, 1, 2},
					wantSucceededIndexes: "0-2",
				},
			},
		},
		"scale down to 0, verify that the job succeeds": {
			featureGate: true,
			jobUpdates: []jobUpdate{
				{
					completions: ptr.To[int32](0),
				},
			},
		},
	}

	for name, tc := range cases {
		tc := tc
		t.Run(name, func(t *testing.T) {
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ElasticIndexedJob, tc.featureGate)()
			closeFn, restConfig, clientSet, ns := setup(t, "indexed")
			defer closeFn()
			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
			defer cancel()
			resetMetrics()

			// Create the base Indexed Job with 3 completions.
			mode := batchv1.IndexedCompletion
			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:    ptr.To(initialCompletions),
					Completions:    ptr.To(initialCompletions),
					CompletionMode: &mode,
				},
			})
			if err != nil {
				t.Fatalf("Failed to create Job: %v", err)
			}
			jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)

			// Wait until all initial pods are reported active before
			// applying any updates.
			err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
				job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
				if err != nil {
					return false, err
				}
				if job.Status.Active == initialCompletions {
					return true, nil
				}
				return false, nil
			})
			if err != nil {
				t.Fatalf("Error waiting for Job pods to become active: %v", err)
			}

			for _, update := range tc.jobUpdates {
				// Apply the completions change (parallelism follows). A
				// rejected update must match tc.wantErr and ends the case.
				if update.completions != nil {
					if jobObj, err = updateJob(ctx, jobClient, jobObj.Name, func(j *batchv1.Job) {
						j.Spec.Completions = update.completions
						j.Spec.Parallelism = update.completions
					}); err != nil {
						if diff := cmp.Diff(tc.wantErr, err); diff != "" {
							t.Fatalf("Unexpected or missing errors (-want/+got): %s", diff)
						}
						return
					}
				}

				// Succeed the requested indexes.
				for _, idx := range update.succeedIndexes {
					if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, idx); err != nil {
						t.Fatalf("Failed trying to succeed pod with index %d: %v", idx, err)
					}
				}

				// Fail the requested indexes.
				for _, idx := range update.failIndexes {
					if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, idx); err != nil {
						t.Fatalf("Failed trying to fail pod with index %d: %v", idx, err)
					}
				}

				validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
					Active:      update.wantActivePods,
					Succeeded:   len(update.succeedIndexes),
					Failed:      update.wantFailed,
					Ready:       ptr.To[int32](0),
					Terminating: ptr.To[int32](0),
				})
				validateIndexedJobPods(ctx, t, clientSet, jobObj, update.wantRemainingIndexes, update.wantSucceededIndexes, nil)
			}

			validateJobComplete(ctx, t, clientSet, jobObj)
		})
	}
}
2899
2900
2901
2902
// BenchmarkLargeIndexedJob measures how long the Job controller takes to run
// an Indexed Job of 10 or 100 pods to completion, with and without
// backoffLimitPerIndex (which toggles the JobBackoffLimitPerIndex feature
// gate). Each iteration creates a fresh Job, succeeds all its pods (with
// exponential-backoff retries for transient API failures), and waits for the
// Job to complete.
func BenchmarkLargeIndexedJob(b *testing.B) {
	closeFn, restConfig, clientSet, ns := setup(b, "indexed")
	// Raise the client rate limits so API throughput is not the bottleneck.
	restConfig.QPS = 100
	restConfig.Burst = 100
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(b, restConfig)
	defer cancel()
	// Backoff used to retry setting pod phases when the API server returns
	// transient errors.
	backoff := wait.Backoff{
		Duration: time.Second,
		Factor:   1.5,
		Steps:    30,
		Cap:      5 * time.Minute,
	}
	cases := map[string]struct {
		nPods                int32
		backoffLimitPerIndex *int32
	}{
		"regular indexed job without failures; size=10": {
			nPods: 10,
		},
		"job with backoffLimitPerIndex without failures; size=10": {
			nPods:                10,
			backoffLimitPerIndex: ptr.To[int32](1),
		},
		"regular indexed job without failures; size=100": {
			nPods: 100,
		},
		"job with backoffLimitPerIndex without failures; size=100": {
			nPods:                100,
			backoffLimitPerIndex: ptr.To[int32](1),
		},
	}
	mode := batchv1.IndexedCompletion
	for name, tc := range cases {
		b.Run(name, func(b *testing.B) {
			// The feature gate follows whether the case sets
			// backoffLimitPerIndex.
			enableJobBackoffLimitPerIndex := tc.backoffLimitPerIndex != nil
			defer featuregatetesting.SetFeatureGateDuringTest(b, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, enableJobBackoffLimitPerIndex)()
			b.ResetTimer()
			for n := 0; n < b.N; n++ {
				b.StartTimer()
				// Job names encode size, iteration and gate state so
				// iterations do not collide within the shared namespace.
				jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
					ObjectMeta: metav1.ObjectMeta{
						Name: fmt.Sprintf("npods-%d-%d-%v", tc.nPods, n, enableJobBackoffLimitPerIndex),
					},
					Spec: batchv1.JobSpec{
						Parallelism:          ptr.To(tc.nPods),
						Completions:          ptr.To(tc.nPods),
						CompletionMode:       &mode,
						BackoffLimitPerIndex: tc.backoffLimitPerIndex,
					},
				})
				if err != nil {
					b.Fatalf("Failed to create Job: %v", err)
				}
				b.Cleanup(func() {
					if err := cleanUp(ctx, clientSet, jobObj); err != nil {
						b.Fatalf("Failed cleanup: %v", err)
					}
				})
				remaining := int(tc.nPods)
				// Succeed all pods, retrying on transient API errors; on a
				// partial failure, subtract the pods already updated and
				// retry with the remainder.
				if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
					if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
						remaining -= succ
						b.Logf("Transient failure succeeding pods: %v", err)
						return false, nil
					}
					return true, nil
				}); err != nil {
					b.Fatalf("Could not succeed the remaining %d pods: %v", remaining, err)
				}
				validateJobComplete(ctx, b, clientSet, jobObj)
				b.StopTimer()
			}
		})
	}
}
2979
2980
2981
2982
2983
2984 func BenchmarkLargeFailureHandling(b *testing.B) {
2985 b.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
2986 b.Cleanup(setDurationDuringTest(&jobcontroller.MaxJobPodFailureBackOff, fastPodFailureBackoff))
2987 closeFn, restConfig, clientSet, ns := setup(b, "indexed")
2988 restConfig.QPS = 100
2989 restConfig.Burst = 100
2990 defer closeFn()
2991 ctx, cancel := startJobControllerAndWaitForCaches(b, restConfig)
2992 defer cancel()
2993 backoff := wait.Backoff{
2994 Duration: time.Second,
2995 Factor: 1.5,
2996 Steps: 30,
2997 Cap: 5 * time.Minute,
2998 }
2999 cases := map[string]struct {
3000 nPods int32
3001 backoffLimitPerIndex *int32
3002 customTimeout *time.Duration
3003 }{
3004 "regular indexed job with failures; size=10": {
3005 nPods: 10,
3006 },
3007 "job with backoffLimitPerIndex with failures; size=10": {
3008 nPods: 10,
3009 backoffLimitPerIndex: ptr.To[int32](1),
3010 },
3011 "regular indexed job with failures; size=100": {
3012 nPods: 100,
3013 },
3014 "job with backoffLimitPerIndex with failures; size=100": {
3015 nPods: 100,
3016 backoffLimitPerIndex: ptr.To[int32](1),
3017 },
3018 }
3019 mode := batchv1.IndexedCompletion
3020 for name, tc := range cases {
3021 b.Run(name, func(b *testing.B) {
3022 enableJobBackoffLimitPerIndex := tc.backoffLimitPerIndex != nil
3023 timeout := ptr.Deref(tc.customTimeout, wait.ForeverTestTimeout)
3024 defer featuregatetesting.SetFeatureGateDuringTest(b, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, enableJobBackoffLimitPerIndex)()
3025 b.ResetTimer()
3026 for n := 0; n < b.N; n++ {
3027 b.StopTimer()
3028 jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
3029 ObjectMeta: metav1.ObjectMeta{
3030 Name: fmt.Sprintf("npods-%d-%d-%v", tc.nPods, n, enableJobBackoffLimitPerIndex),
3031 },
3032 Spec: batchv1.JobSpec{
3033 Parallelism: ptr.To(tc.nPods),
3034 Completions: ptr.To(tc.nPods),
3035 CompletionMode: &mode,
3036 BackoffLimitPerIndex: tc.backoffLimitPerIndex,
3037 BackoffLimit: ptr.To(tc.nPods),
3038 },
3039 })
3040 if err != nil {
3041 b.Fatalf("Failed to create Job: %v", err)
3042 }
3043 b.Cleanup(func() {
3044 if err := cleanUp(ctx, clientSet, jobObj); err != nil {
3045 b.Fatalf("Failed cleanup: %v", err)
3046 }
3047 })
3048 validateJobsPodsStatusOnlyWithTimeout(ctx, b, clientSet, jobObj, podsByStatus{
3049 Active: int(tc.nPods),
3050 Ready: ptr.To[int32](0),
3051 Terminating: ptr.To[int32](0),
3052 }, timeout)
3053
3054 b.StartTimer()
3055 remaining := int(tc.nPods)
3056 if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
3057 if err, fail := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, remaining); err != nil {
3058 remaining -= fail
3059 b.Logf("Transient failure failing pods: %v", err)
3060 return false, nil
3061 }
3062 return true, nil
3063 }); err != nil {
3064 b.Fatalf("Could not succeed the remaining %d pods: %v", remaining, err)
3065 }
3066 validateJobsPodsStatusOnlyWithTimeout(ctx, b, clientSet, jobObj, podsByStatus{
3067 Active: int(tc.nPods),
3068 Ready: ptr.To[int32](0),
3069 Failed: int(tc.nPods),
3070 Terminating: ptr.To[int32](0),
3071 }, timeout)
3072 b.StopTimer()
3073 }
3074 })
3075 }
3076 }
3077
3078
3079 func cleanUp(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job) error {
3080
3081
3082 for {
3083 pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{Limit: 1})
3084 if err != nil {
3085 return err
3086 }
3087 if len(pods.Items) == 0 {
3088 break
3089 }
3090 err = clientSet.CoreV1().Pods(jobObj.Namespace).DeleteCollection(ctx,
3091 metav1.DeleteOptions{},
3092 metav1.ListOptions{
3093 Limit: 1000,
3094 })
3095 if err != nil {
3096 return err
3097 }
3098 }
3099 return clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{})
3100 }
3101
// TestOrphanPodsFinalizersClearedWithGC verifies that pods orphaned by
// deleting their Job get the Job tracking finalizer removed, for every
// deletion propagation policy (orphan, background, foreground).
func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
	for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground, metav1.DeletePropagationForeground} {
		t.Run(string(policy), func(t *testing.T) {
			closeFn, restConfig, clientSet, ns := setup(t, "simple")
			defer closeFn()
			informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "controller-informers")), 0)
			// Create the job controller with a heavily throttled client (QPS/Burst 1),
			// then restore the limits before creating the GC controller below —
			// presumably so the GC wins the race to clear finalizers; confirm intent.
			restConfig.QPS = 1
			restConfig.Burst = 1
			jc, ctx, cancel := createJobControllerWithSharedInformers(t, restConfig, informerSet)
			resetMetrics()
			defer cancel()
			restConfig.QPS = 200
			restConfig.Burst = 200
			runGC := util.CreateGCController(ctx, t, *restConfig, informerSet)
			informerSet.Start(ctx.Done())
			go jc.Run(ctx, 1)
			runGC()

			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism: ptr.To[int32](2),
				},
			})
			if err != nil {
				t.Fatalf("Failed to create Job: %v", err)
			}
			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
				Active:      2,
				Ready:       ptr.To[int32](0),
				Terminating: ptr.To[int32](0),
			})

			// Delete the Job while its pods are still active, orphaning them.
			err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{
				PropagationPolicy: &policy,
			})
			if err != nil {
				t.Fatalf("Failed to delete job: %v", err)
			}
			validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
			// Check the terminated-pods tracking finalizer metric.
			validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 0)
		})
	}
}
3148
3149 func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
3150
3151
3152 t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 50))
3153 closeFn, restConfig, clientSet, ns := setup(t, "simple")
3154 defer closeFn()
3155 ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
3156 defer cancel()
3157
3158
3159
3160
3161 mode := batchv1.IndexedCompletion
3162 jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
3163 Spec: batchv1.JobSpec{
3164 CompletionMode: &mode,
3165 Completions: ptr.To[int32](100),
3166 Parallelism: ptr.To[int32](100),
3167 BackoffLimit: ptr.To[int32](0),
3168 },
3169 })
3170 if err != nil {
3171 t.Fatalf("Could not create job: %v", err)
3172 }
3173
3174
3175 err = wait.PollUntilContextTimeout(ctx, time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
3176 if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
3177 return false, nil
3178 }
3179 return true, nil
3180 })
3181 if err != nil {
3182 t.Fatalf("Could not fail pod: %v", err)
3183 }
3184
3185 validateJobFailed(ctx, t, clientSet, jobObj)
3186 validateCounterMetric(ctx, t, metrics.JobFinishedNum, metricLabelsWithValue{
3187 Labels: []string{"Indexed", "failed", "BackoffLimitExceeded"},
3188 Value: 1,
3189 })
3190
3191 validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
3192 }
3193
3194 func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
3195 t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, 2*time.Second))
3196 closeFn, restConfig, clientSet, ns := setup(t, "simple")
3197 defer closeFn()
3198 ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
3199 defer cancel()
3200
3201 jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{})
3202 if err != nil {
3203 t.Fatalf("Could not create job: %v", err)
3204 }
3205 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
3206 Active: 1,
3207 Ready: ptr.To[int32](0),
3208 Terminating: ptr.To[int32](0),
3209 })
3210
3211
3212 if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
3213 t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
3214 }
3215 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
3216 Active: 1,
3217 Ready: ptr.To[int32](0),
3218 Failed: 1,
3219 Terminating: ptr.To[int32](0),
3220 })
3221
3222
3223 if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
3224 t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
3225 }
3226 validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
3227 Active: 1,
3228 Ready: ptr.To[int32](0),
3229 Failed: 2,
3230 Terminating: ptr.To[int32](0),
3231 })
3232
3233 jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(ps v1.PodStatus) bool { return true })
3234 if err != nil {
3235 t.Fatalf("Failed to list Job Pods: %v", err)
3236 }
3237 if len(jobPods) != 3 {
3238 t.Fatalf("Expected to get %v pods, received %v", 4, len(jobPods))
3239 }
3240 validateExpotentialBackoffDelay(t, jobcontroller.DefaultJobPodFailureBackOff, jobPods)
3241 }
3242
3243 func validateExpotentialBackoffDelay(t *testing.T, defaultPodFailureBackoff time.Duration, pods []*v1.Pod) {
3244 t.Helper()
3245 creationTime := []time.Time{}
3246 finishTime := []time.Time{}
3247 for _, pod := range pods {
3248 creationTime = append(creationTime, pod.CreationTimestamp.Time)
3249 if len(pod.Status.ContainerStatuses) > 0 {
3250 finishTime = append(finishTime, pod.Status.ContainerStatuses[0].State.Terminated.FinishedAt.Time)
3251 }
3252 }
3253
3254 sort.Slice(creationTime, func(i, j int) bool {
3255 return creationTime[i].Before(creationTime[j])
3256 })
3257 sort.Slice(finishTime, func(i, j int) bool {
3258 return finishTime[i].Before(finishTime[j])
3259 })
3260
3261 diff := creationTime[1].Sub(finishTime[0])
3262
3263 if diff < defaultPodFailureBackoff {
3264 t.Fatalf("Second pod should be created at least %v seconds after the first pod, time difference: %v", defaultPodFailureBackoff, diff)
3265 }
3266
3267 if diff >= 2*defaultPodFailureBackoff {
3268 t.Fatalf("Second pod should be created before %v seconds after the first pod, time difference: %v", 2*defaultPodFailureBackoff, diff)
3269 }
3270
3271 diff = creationTime[2].Sub(finishTime[1])
3272
3273 if diff < 2*defaultPodFailureBackoff {
3274 t.Fatalf("Third pod should be created at least %v seconds after the second pod, time difference: %v", 2*defaultPodFailureBackoff, diff)
3275 }
3276
3277 if diff >= 4*defaultPodFailureBackoff {
3278 t.Fatalf("Third pod should be created before %v seconds after the second pod, time difference: %v", 4*defaultPodFailureBackoff, diff)
3279 }
3280 }
3281
3282
3283
// TestJobFailedWithInterrupts verifies that a Job with backoffLimit=0 is
// still marked Failed when the job controller is restarted after one pod
// failed and the remaining pods succeeded.
func TestJobFailedWithInterrupts(t *testing.T) {
	closeFn, restConfig, clientSet, ns := setup(t, "simple")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	// cancel is reassigned when the controller is restarted below, so defer a
	// closure rather than the current value.
	defer func() {
		cancel()
	}()
	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
		Spec: batchv1.JobSpec{
			Completions:  ptr.To[int32](10),
			Parallelism:  ptr.To[int32](10),
			BackoffLimit: ptr.To[int32](0),
			Template: v1.PodTemplateSpec{
				Spec: v1.PodSpec{
					// NodeName pre-set — presumably so the pods count as
					// scheduled without a scheduler running; confirm intent.
					NodeName: "foo",
				},
			},
		},
	})
	if err != nil {
		t.Fatalf("Could not create job: %v", err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      10,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	t.Log("Finishing pods")
	// One failed pod exhausts backoffLimit=0.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
		t.Fatalf("Could not fail a pod: %v", err)
	}
	// Succeed the other nine pods, retrying on transient update failures.
	remaining := 9
	if err := wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
		if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
			remaining -= succ
			t.Logf("Transient failure succeeding pods: %v", err)
			return false, nil
		}
		return true, nil
	}); err != nil {
		t.Fatalf("Could not succeed the remaining %d pods: %v", remaining, err)
	}
	// Restart the controller and check it still concludes the Job failed.
	t.Log("Recreating job controller")
	cancel()
	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
	validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobFailed)
}
3331
3332 func validateNoOrphanPodsWithFinalizers(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
3333 t.Helper()
3334 orphanPods := 0
3335 if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
3336 pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{
3337 LabelSelector: metav1.FormatLabelSelector(jobObj.Spec.Selector),
3338 })
3339 if err != nil {
3340 return false, err
3341 }
3342 orphanPods = 0
3343 for _, pod := range pods.Items {
3344 if hasJobTrackingFinalizer(&pod) {
3345 orphanPods++
3346 }
3347 }
3348 return orphanPods == 0, nil
3349 }); err != nil {
3350 t.Errorf("Failed waiting for pods to be freed from finalizer: %v", err)
3351 t.Logf("Last saw %d orphan pods", orphanPods)
3352 }
3353 }
3354
// TestOrphanPodsFinalizersClearedOnRestart verifies that a restarted job
// controller removes the tracking finalizer from pods whose Job was deleted
// while the controller was down.
func TestOrphanPodsFinalizersClearedOnRestart(t *testing.T) {
	closeFn, restConfig, clientSet, ns := setup(t, "simple")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	// cancel is reassigned when the controller is restarted below.
	defer func() {
		cancel()
	}()

	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
		Spec: batchv1.JobSpec{
			Parallelism: ptr.To[int32](1),
		},
	})
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})

	// Stop the controller, then delete the Job while nothing can react.
	// context.Background() is used because ctx was just canceled.
	cancel()
	err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(context.Background(), jobObj.Name, metav1.DeleteOptions{})
	if err != nil {
		t.Fatalf("Failed to delete job: %v", err)
	}

	// Restart the controller; it should clean up the orphaned pods.
	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
	validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
}
3390
// TestSuspendJob verifies that suspending and resuming a Job drives the
// active pod count and the JobSuspended condition as expected, and that the
// corresponding "Suspended"/"Resumed" events are emitted.
func TestSuspendJob(t *testing.T) {
	// step describes one phase (create or update) of a test case: the value
	// of spec.suspend and the expected resulting state.
	type step struct {
		flag       bool
		wantActive int
		wantStatus v1.ConditionStatus
		wantReason string
	}
	testCases := []struct {
		featureGate bool // NOTE(review): never set by the cases below; appears vestigial.
		create      step
		update      step
	}{
		// Create the Job unsuspended or suspended, then flip the flag and
		// check the opposite state.
		{
			create: step{flag: false, wantActive: 2},
			update: step{flag: true, wantActive: 0, wantStatus: v1.ConditionTrue, wantReason: "Suspended"},
		},
		{
			create: step{flag: true, wantActive: 0, wantStatus: v1.ConditionTrue, wantReason: "Suspended"},
			update: step{flag: false, wantActive: 2, wantStatus: v1.ConditionFalse, wantReason: "Resumed"},
		},
	}

	for _, tc := range testCases {
		name := fmt.Sprintf("feature=%v,create=%v,update=%v", tc.featureGate, tc.create.flag, tc.update.flag)
		t.Run(name, func(t *testing.T) {
			closeFn, restConfig, clientSet, ns := setup(t, "suspend")
			defer closeFn()
			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
			defer cancel()
			// Watch namespace events so we can assert on the emitted reasons.
			events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
			if err != nil {
				t.Fatal(err)
			}
			defer events.Stop()

			parallelism := int32(2)
			job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism: ptr.To(parallelism),
					Completions: ptr.To[int32](4),
					Suspend:     ptr.To(tc.create.flag),
				},
			})
			if err != nil {
				t.Fatalf("Failed to create Job: %v", err)
			}

			// validate checks pod counts, the JobSuspended condition and the
			// emitted event after step s ("create" or "update"). It refreshes
			// the captured job variable for the subsequent update.
			validate := func(s string, active int, status v1.ConditionStatus, reason string) {
				validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
					Active:      active,
					Ready:       ptr.To[int32](0),
					Terminating: ptr.To[int32](0),
				})
				job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
				if err != nil {
					t.Fatalf("Failed to get Job after %s: %v", s, err)
				}
				if got, want := getJobConditionStatus(ctx, job, batchv1.JobSuspended), status; got != want {
					t.Errorf("Unexpected Job condition %q status after %s: got %q, want %q", batchv1.JobSuspended, s, got, want)
				}
				if err := waitForEvent(ctx, events, job.UID, reason); err != nil {
					t.Errorf("Waiting for event with reason %q after %s: %v", reason, s, err)
				}
			}
			validate("create", tc.create.wantActive, tc.create.wantStatus, tc.create.wantReason)

			// Flip the suspend flag and validate the opposite state.
			job.Spec.Suspend = ptr.To(tc.update.flag)
			job, err = clientSet.BatchV1().Jobs(ns.Name).Update(ctx, job, metav1.UpdateOptions{})
			if err != nil {
				t.Fatalf("Failed to update Job: %v", err)
			}
			validate("update", tc.update.wantActive, tc.update.wantStatus, tc.update.wantReason)
		})
	}
}
3468
3469 func TestSuspendJobControllerRestart(t *testing.T) {
3470 closeFn, restConfig, clientSet, ns := setup(t, "suspend")
3471 defer closeFn()
3472 ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
3473 defer cancel()
3474
3475 job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
3476 Spec: batchv1.JobSpec{
3477 Parallelism: ptr.To[int32](2),
3478 Completions: ptr.To[int32](4),
3479 Suspend: ptr.To(true),
3480 },
3481 })
3482 if err != nil {
3483 t.Fatalf("Failed to create Job: %v", err)
3484 }
3485 validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
3486 Active: 0,
3487 Ready: ptr.To[int32](0),
3488 Terminating: ptr.To[int32](0),
3489 })
3490 }
3491
3492 func TestNodeSelectorUpdate(t *testing.T) {
3493 closeFn, restConfig, clientSet, ns := setup(t, "suspend")
3494 defer closeFn()
3495 ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
3496 defer cancel()
3497
3498 job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{Spec: batchv1.JobSpec{
3499 Parallelism: ptr.To[int32](1),
3500 Suspend: ptr.To(true),
3501 }})
3502 if err != nil {
3503 t.Fatalf("Failed to create Job: %v", err)
3504 }
3505 jobName := job.Name
3506 jobNamespace := job.Namespace
3507 jobClient := clientSet.BatchV1().Jobs(jobNamespace)
3508
3509
3510 nodeSelector := map[string]string{"foo": "bar"}
3511 if _, err := updateJob(ctx, jobClient, jobName, func(j *batchv1.Job) {
3512 j.Spec.Template.Spec.NodeSelector = nodeSelector
3513 j.Spec.Suspend = ptr.To(false)
3514 }); err != nil {
3515 t.Errorf("Unexpected error: %v", err)
3516 }
3517
3518
3519
3520 var pod *v1.Pod
3521 if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
3522 pods, err := clientSet.CoreV1().Pods(jobNamespace).List(ctx, metav1.ListOptions{})
3523 if err != nil {
3524 t.Fatalf("Failed to list Job Pods: %v", err)
3525 }
3526 if len(pods.Items) == 0 {
3527 return false, nil
3528 }
3529 pod = &pods.Items[0]
3530 return true, nil
3531 }); err != nil || pod == nil {
3532 t.Fatalf("pod not found: %v", err)
3533 }
3534
3535
3536
3537 if diff := cmp.Diff(nodeSelector, pod.Spec.NodeSelector); diff != "" {
3538 t.Errorf("Unexpected nodeSelector (-want,+got):\n%s", diff)
3539 }
3540
3541
3542 _, err = updateJob(ctx, jobClient, jobName, func(j *batchv1.Job) {
3543 j.Spec.Template.Spec.NodeSelector = map[string]string{"foo": "baz"}
3544 })
3545
3546 if err == nil || !strings.Contains(err.Error(), "spec.template: Invalid value") {
3547 t.Errorf("Expected \"spec.template: Invalid value\" error, got: %v", err)
3548 }
3549
3550 }
3551
// podsByStatus mirrors the pod-count fields of a Job's status for easy
// comparison in tests. Pointer fields distinguish "unset" from zero.
type podsByStatus struct {
	Active      int
	Ready       *int32
	Failed      int
	Succeeded   int
	Terminating *int32
}
3559
// validateJobsPodsStatusOnly waits (with the default test timeout) until the
// Job's status counters match desired; it does not inspect the pods themselves.
func validateJobsPodsStatusOnly(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
	t.Helper()
	validateJobsPodsStatusOnlyWithTimeout(ctx, t, clientSet, jobObj, desired, wait.ForeverTestTimeout)
}
3564
3565 func validateJobsPodsStatusOnlyWithTimeout(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus, timeout time.Duration) {
3566 t.Helper()
3567 var actualCounts podsByStatus
3568 if err := wait.PollUntilContextTimeout(ctx, waitInterval, timeout, true, func(ctx context.Context) (bool, error) {
3569 updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
3570 if err != nil {
3571 t.Fatalf("Failed to get updated Job: %v", err)
3572 }
3573 actualCounts = podsByStatus{
3574 Active: int(updatedJob.Status.Active),
3575 Ready: updatedJob.Status.Ready,
3576 Succeeded: int(updatedJob.Status.Succeeded),
3577 Failed: int(updatedJob.Status.Failed),
3578 Terminating: updatedJob.Status.Terminating,
3579 }
3580 return cmp.Equal(actualCounts, desired), nil
3581 }); err != nil {
3582 diff := cmp.Diff(desired, actualCounts)
3583 t.Errorf("Waiting for Job Status: %v\nPods (-want,+got):\n%s", err, diff)
3584 }
3585 }
3586
3587 func validateJobPodsStatus(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
3588 t.Helper()
3589 validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, desired)
3590 var active []*v1.Pod
3591 if err := wait.PollUntilContextTimeout(ctx, waitInterval, time.Second*5, true, func(ctx context.Context) (bool, error) {
3592 pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
3593 if err != nil {
3594 t.Fatalf("Failed to list Job Pods: %v", err)
3595 }
3596 active = nil
3597 for _, pod := range pods.Items {
3598 phase := pod.Status.Phase
3599 if metav1.IsControlledBy(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) {
3600 p := pod
3601 active = append(active, &p)
3602 }
3603 }
3604 return len(active) == desired.Active, nil
3605 }); err != nil {
3606 if len(active) != desired.Active {
3607 t.Errorf("Found %d active Pods, want %d", len(active), desired.Active)
3608 }
3609 }
3610 for _, p := range active {
3611 if !hasJobTrackingFinalizer(p) {
3612 t.Errorf("Active pod %s doesn't have tracking finalizer", p.Name)
3613 }
3614 }
3615 }
3616
3617 func getJobPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, filter func(v1.PodStatus) bool) ([]*v1.Pod, error) {
3618 t.Helper()
3619 allPods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
3620 if err != nil {
3621 return nil, err
3622 }
3623 jobPods := make([]*v1.Pod, 0, 0)
3624 for _, pod := range allPods.Items {
3625 if metav1.IsControlledBy(&pod, jobObj) && filter(pod.Status) {
3626 p := pod
3627 jobPods = append(jobPods, &p)
3628 }
3629 }
3630 return jobPods, nil
3631 }
3632
3633 func validateFinishedPodsNoFinalizer(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
3634 t.Helper()
3635 pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
3636 if err != nil {
3637 t.Fatalf("Failed to list Job Pods: %v", err)
3638 }
3639 for _, pod := range pods.Items {
3640 phase := pod.Status.Phase
3641 if metav1.IsControlledBy(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) && hasJobTrackingFinalizer(&pod) {
3642 t.Errorf("Finished pod %s still has a tracking finalizer", pod.Name)
3643 }
3644 }
3645 }
3646
3647
3648
3649
// validateIndexedJobPods checks the state of an Indexed Job: the set of
// completion indexes with active (pending/running) pods must equal wantActive
// (nil is treated as the empty set), status.completedIndexes must equal
// gotCompleted, and status.failedIndexes must match wantFailed (nil meaning
// the field is unset). It also verifies each active pod's hostname follows
// the "<job-name>-<index>" pattern.
func validateIndexedJobPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, wantActive sets.Set[int], gotCompleted string, wantFailed *string) {
	t.Helper()
	updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Failed to get updated Job: %v", err)
	}
	if updatedJob.Status.CompletedIndexes != gotCompleted {
		t.Errorf("Got completed indexes %q, want %q", updatedJob.Status.CompletedIndexes, gotCompleted)
	}
	if diff := cmp.Diff(wantFailed, updatedJob.Status.FailedIndexes); diff != "" {
		t.Errorf("Got unexpected failed indexes: %s", diff)
	}
	pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		t.Fatalf("Failed to list Job Pods: %v", err)
	}
	// Collect the completion indexes of all active pods owned by the Job.
	gotActive := sets.New[int]()
	for _, pod := range pods.Items {
		if metav1.IsControlledBy(&pod, jobObj) {
			if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning {
				ix, err := getCompletionIndex(&pod)
				if err != nil {
					t.Errorf("Failed getting completion index for pod %s: %v", pod.Name, err)
				} else {
					gotActive.Insert(ix)
				}
				// Indexed pods get a stable hostname derived from the index.
				expectedName := fmt.Sprintf("%s-%d", jobObj.Name, ix)
				if diff := cmp.Equal(expectedName, pod.Spec.Hostname); !diff {
					t.Errorf("Got pod hostname %s, want %s", pod.Spec.Hostname, expectedName)
				}
			}
		}
	}
	if wantActive == nil {
		wantActive = sets.New[int]()
	}
	if diff := cmp.Diff(sets.List(wantActive), sets.List(gotActive)); diff != "" {
		t.Errorf("Unexpected active indexes (-want,+got):\n%s", diff)
	}
}
3690
3691 func waitForEvent(ctx context.Context, events watch.Interface, uid types.UID, reason string) error {
3692 if reason == "" {
3693 return nil
3694 }
3695 return wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
3696 for {
3697 var ev watch.Event
3698 select {
3699 case ev = <-events.ResultChan():
3700 default:
3701 return false, nil
3702 }
3703 e, ok := ev.Object.(*eventsv1.Event)
3704 if !ok {
3705 continue
3706 }
3707 ctrl := "job-controller"
3708 if (e.ReportingController == ctrl || e.DeprecatedSource.Component == ctrl) && e.Reason == reason && e.Regarding.UID == uid {
3709 return true, nil
3710 }
3711 }
3712 })
3713 }
3714
3715 func getJobConditionStatus(ctx context.Context, job *batchv1.Job, cType batchv1.JobConditionType) v1.ConditionStatus {
3716 for _, cond := range job.Status.Conditions {
3717 if cond.Type == cType {
3718 return cond.Status
3719 }
3720 }
3721 return ""
3722 }
3723
// validateJobFailed waits until the Job reports the Failed condition as True.
func validateJobFailed(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
	t.Helper()
	validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobFailed)
}
3728
// validateJobComplete waits until the Job reports the Complete condition as True.
func validateJobComplete(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job) {
	t.Helper()
	validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobComplete)
}
3733
3734 func validateJobCondition(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, cond batchv1.JobConditionType) {
3735 t.Helper()
3736 if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
3737 j, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
3738 if err != nil {
3739 t.Fatalf("Failed to obtain updated Job: %v", err)
3740 }
3741 return getJobConditionStatus(ctx, j, cond) == v1.ConditionTrue, nil
3742 }); err != nil {
3743 t.Errorf("Waiting for Job to have condition %s: %v", cond, err)
3744 }
3745 }
3746
3747 func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) (error, int) {
3748 op := func(p *v1.Pod) bool {
3749 p.Status.Phase = phase
3750 if phase == v1.PodFailed || phase == v1.PodSucceeded {
3751 p.Status.ContainerStatuses = []v1.ContainerStatus{
3752 {
3753 State: v1.ContainerState{
3754 Terminated: &v1.ContainerStateTerminated{
3755 FinishedAt: metav1.Now(),
3756 },
3757 },
3758 },
3759 }
3760 }
3761 return true
3762 }
3763 return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
3764 }
3765
3766 func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, cnt int) (error, int) {
3767 op := func(p *v1.Pod) bool {
3768 if podutil.IsPodReady(p) {
3769 return false
3770 }
3771 p.Status.Conditions = append(p.Status.Conditions, v1.PodCondition{
3772 Type: v1.PodReady,
3773 Status: v1.ConditionTrue,
3774 })
3775 return true
3776 }
3777 return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
3778 }
3779
3780 func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, op func(*v1.Pod) bool, cnt int) (error, int) {
3781 pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
3782 if err != nil {
3783 return fmt.Errorf("listing Job Pods: %w", err), 0
3784 }
3785 updates := make([]v1.Pod, 0, cnt)
3786 for _, pod := range pods.Items {
3787 if len(updates) == cnt {
3788 break
3789 }
3790 if p := pod.Status.Phase; metav1.IsControlledBy(&pod, jobObj) && p != v1.PodFailed && p != v1.PodSucceeded {
3791 if !op(&pod) {
3792 continue
3793 }
3794 updates = append(updates, pod)
3795 }
3796 }
3797 successful, err := updatePodStatuses(ctx, clientSet, updates)
3798 if successful != cnt {
3799 return fmt.Errorf("couldn't set phase on %d Job pods", cnt-successful), successful
3800 }
3801 return err, successful
3802 }
3803
3804 func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) (int, error) {
3805 wg := sync.WaitGroup{}
3806 wg.Add(len(updates))
3807 errCh := make(chan error, len(updates))
3808 var updated int32
3809
3810 for _, pod := range updates {
3811 pod := pod
3812 go func() {
3813 _, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
3814 if err != nil {
3815 errCh <- err
3816 } else {
3817 atomic.AddInt32(&updated, 1)
3818 }
3819 wg.Done()
3820 }()
3821 }
3822 wg.Wait()
3823
3824 select {
3825 case err := <-errCh:
3826 return int(updated), fmt.Errorf("updating Pod status: %w", err)
3827 default:
3828 }
3829 return int(updated), nil
3830 }
3831
// setJobPhaseForIndex sets the given phase on the first non-terminated pod of
// the Job that carries completion index ix. For terminal phases it also
// records a container termination timestamp. It returns an error if no
// matching pod is found or the status update fails.
func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, ix int) error {
	pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("listing Job Pods: %w", err)
	}
	for _, pod := range pods.Items {
		// Skip pods not owned by this Job and pods that already terminated.
		if p := pod.Status.Phase; !metav1.IsControlledBy(&pod, jobObj) || p == v1.PodFailed || p == v1.PodSucceeded {
			continue
		}
		if pix, err := getCompletionIndex(&pod); err == nil && pix == ix {
			pod.Status.Phase = phase
			if phase == v1.PodFailed || phase == v1.PodSucceeded {
				// Record a finish timestamp, mirroring what a kubelet would
				// report for a terminated container.
				pod.Status.ContainerStatuses = []v1.ContainerStatus{
					{
						State: v1.ContainerState{
							Terminated: &v1.ContainerStateTerminated{
								FinishedAt: metav1.Now(),
							},
						},
					},
				}
			}
			_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
			if err != nil {
				return fmt.Errorf("updating pod %s status: %w", pod.Name, err)
			}
			return nil
		}
	}
	return errors.New("no pod matching index found")
}
3863
// getActivePodForIndex returns a non-terminal pod of the Job with the given
// completion index, or an error if none exists.
func getActivePodForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, ix int) (*v1.Pod, error) {
	return getJobPodForIndex(ctx, clientSet, jobObj, ix, func(p *v1.Pod) bool {
		return !podutil.IsPodTerminal(p)
	})
}
3869
3870 func getJobPodForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, ix int, filter func(*v1.Pod) bool) (*v1.Pod, error) {
3871 pods, err := getJobPodsForIndex(ctx, clientSet, jobObj, ix, filter)
3872 if err != nil {
3873 return nil, err
3874 }
3875 if len(pods) == 0 {
3876 return nil, fmt.Errorf("Pod not found for index: %v", ix)
3877 }
3878 return pods[0], nil
3879 }
3880
3881 func getJobPodsForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, ix int, filter func(*v1.Pod) bool) ([]*v1.Pod, error) {
3882 pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
3883 if err != nil {
3884 return nil, fmt.Errorf("listing Job Pods: %w", err)
3885 }
3886 var result []*v1.Pod
3887 for _, pod := range pods.Items {
3888 pod := pod
3889 if !metav1.IsControlledBy(&pod, jobObj) {
3890 continue
3891 }
3892 if !filter(&pod) {
3893 continue
3894 }
3895 if pix, err := getCompletionIndex(&pod); err == nil && pix == ix {
3896 result = append(result, &pod)
3897 }
3898 }
3899 return result, nil
3900 }
3901
3902 func getCompletionIndex(p *v1.Pod) (int, error) {
3903 if p.Annotations == nil {
3904 return 0, errors.New("no annotations found")
3905 }
3906 v, ok := p.Annotations[batchv1.JobCompletionIndexAnnotation]
3907 if !ok {
3908 return 0, fmt.Errorf("annotation %s not found", batchv1.JobCompletionIndexAnnotation)
3909 }
3910 return strconv.Atoi(v)
3911 }
3912
3913 func createJobWithDefaults(ctx context.Context, clientSet clientset.Interface, ns string, jobObj *batchv1.Job) (*batchv1.Job, error) {
3914 if jobObj.Name == "" {
3915 jobObj.Name = "test-job"
3916 }
3917 if len(jobObj.Spec.Template.Spec.Containers) == 0 {
3918 jobObj.Spec.Template.Spec.Containers = []v1.Container{
3919 {Name: "foo", Image: "bar"},
3920 }
3921 }
3922 if jobObj.Spec.Template.Spec.RestartPolicy == "" {
3923 jobObj.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyNever
3924 }
3925 return clientSet.BatchV1().Jobs(ns).Create(ctx, jobObj, metav1.CreateOptions{})
3926 }
3927
3928 func setup(t testing.TB, nsBaseName string) (framework.TearDownFunc, *restclient.Config, clientset.Interface, *v1.Namespace) {
3929
3930 server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
3931
3932 config := restclient.CopyConfig(server.ClientConfig)
3933 config.QPS = 200
3934 config.Burst = 200
3935 config.Timeout = 0
3936 clientSet, err := clientset.NewForConfig(config)
3937 if err != nil {
3938 t.Fatalf("Error creating clientset: %v", err)
3939 }
3940
3941 ns := framework.CreateNamespaceOrDie(clientSet, nsBaseName, t)
3942 closeFn := func() {
3943 framework.DeleteNamespaceOrDie(clientSet, ns, t)
3944 server.TearDownFn()
3945 }
3946 return closeFn, config, clientSet, ns
3947 }
3948
3949 func startJobControllerAndWaitForCaches(tb testing.TB, restConfig *restclient.Config) (context.Context, context.CancelFunc) {
3950 tb.Helper()
3951 informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-informers")), 0)
3952 jc, ctx, cancel := createJobControllerWithSharedInformers(tb, restConfig, informerSet)
3953 informerSet.Start(ctx.Done())
3954 go jc.Run(ctx, 1)
3955
3956
3957
3958
3959
3960 informerSet.WaitForCacheSync(ctx.Done())
3961 return ctx, cancel
3962 }
3963
3964 func resetMetrics() {
3965 metrics.TerminatedPodsTrackingFinalizerTotal.Reset()
3966 metrics.JobFinishedNum.Reset()
3967 metrics.JobPodsFinished.Reset()
3968 metrics.PodFailuresHandledByFailurePolicy.Reset()
3969 metrics.JobFinishedIndexesTotal.Reset()
3970 metrics.JobPodsCreationTotal.Reset()
3971 metrics.JobByExternalControllerTotal.Reset()
3972 }
3973
3974 func createJobControllerWithSharedInformers(tb testing.TB, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {
3975 tb.Helper()
3976 clientSet := clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-controller"))
3977 ctx, cancel := context.WithCancel(context.Background())
3978 jc, err := jobcontroller.NewController(ctx, informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
3979 if err != nil {
3980 tb.Fatalf("Error creating Job controller: %v", err)
3981 }
3982 return jc, ctx, cancel
3983 }
3984
3985 func hasJobTrackingFinalizer(obj metav1.Object) bool {
3986 for _, fin := range obj.GetFinalizers() {
3987 if fin == batchv1.JobTrackingFinalizer {
3988 return true
3989 }
3990 }
3991 return false
3992 }
3993
// setDuringTest overrides *val with newVal for the duration of a test and
// returns a restore func that puts the original value back (call it via
// defer or t.Cleanup). Generalized to any type T so the same helper serves
// ints, durations, and other tunables; existing *int callers keep compiling
// unchanged via type inference.
func setDuringTest[T any](val *T, newVal T) func() {
	origVal := *val
	*val = newVal
	return func() {
		*val = origVal
	}
}
4001
4002 func setDurationDuringTest(val *time.Duration, newVal time.Duration) func() {
4003 origVal := *val
4004 *val = newVal
4005 return func() {
4006 *val = origVal
4007 }
4008 }
4009
4010 func updateJob(ctx context.Context, jobClient typedv1.JobInterface, jobName string, updateFunc func(*batchv1.Job)) (*batchv1.Job, error) {
4011 var job *batchv1.Job
4012 err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
4013 newJob, err := jobClient.Get(ctx, jobName, metav1.GetOptions{})
4014 if err != nil {
4015 return err
4016 }
4017 updateFunc(newJob)
4018 job, err = jobClient.Update(ctx, newJob, metav1.UpdateOptions{})
4019 return err
4020 })
4021 return job, err
4022 }
4023
4024 func waitForPodsToBeActive(ctx context.Context, t *testing.T, jobClient typedv1.JobInterface, podCount int32, jobObj *batchv1.Job) {
4025 t.Helper()
4026 err := wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(context.Context) (done bool, err error) {
4027 job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
4028 if err != nil {
4029 return false, err
4030 }
4031 return job.Status.Active == podCount, nil
4032 })
4033 if err != nil {
4034 t.Fatalf("Error waiting for Job pods to become active: %v", err)
4035 }
4036 }
4037
4038 func deletePods(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
4039 t.Helper()
4040 err := clientSet.CoreV1().Pods(namespace).DeleteCollection(ctx,
4041 metav1.DeleteOptions{},
4042 metav1.ListOptions{
4043 Limit: 1000,
4044 })
4045 if err != nil {
4046 t.Fatalf("Failed to cleanup Pods: %v", err)
4047 }
4048 }
4049
4050 func removePodsFinalizer(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
4051 t.Helper()
4052 pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
4053 if err != nil {
4054 t.Fatalf("Failed to list pods: %v", err)
4055 }
4056 updatePod(ctx, t, clientSet, pods.Items, func(pod *v1.Pod) {
4057 for i, finalizer := range pod.Finalizers {
4058 if finalizer == "fake.example.com/blockDeletion" {
4059 pod.Finalizers = append(pod.Finalizers[:i], pod.Finalizers[i+1:]...)
4060 }
4061 }
4062 })
4063 }
4064
4065 func updatePod(ctx context.Context, t *testing.T, clientSet clientset.Interface, pods []v1.Pod, updateFunc func(*v1.Pod)) {
4066 t.Helper()
4067 for _, val := range pods {
4068 if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
4069 newPod, err := clientSet.CoreV1().Pods(val.Namespace).Get(ctx, val.Name, metav1.GetOptions{})
4070 if err != nil {
4071 return err
4072 }
4073 updateFunc(newPod)
4074 _, err = clientSet.CoreV1().Pods(val.Namespace).Update(ctx, newPod, metav1.UpdateOptions{})
4075 return err
4076 }); err != nil {
4077 t.Fatalf("Failed to update pod %s: %v", val.Name, err)
4078 }
4079 }
4080 }
4081
4082 func failTerminatingPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
4083 t.Helper()
4084 pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
4085 if err != nil {
4086 t.Fatalf("Failed to list pods: %v", err)
4087 }
4088 var terminatingPods []v1.Pod
4089 for _, pod := range pods.Items {
4090 if pod.DeletionTimestamp != nil {
4091 pod.Status.Phase = v1.PodFailed
4092 terminatingPods = append(terminatingPods, pod)
4093 }
4094 }
4095 _, err = updatePodStatuses(ctx, clientSet, terminatingPods)
4096 if err != nil {
4097 t.Fatalf("Failed to update pod statuses: %v", err)
4098 }
4099 }
4100
View as plain text