package job

import (
	"math"
	"strconv"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	batch "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/klog/v2/ktesting"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/utils/ptr"
)

const noIndex = "-"
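// TestCalculateSucceededIndexes verifies that calculateSucceededIndexes merges
// the completed indexes recorded in the Job status with the indexes of newly
// succeeded pods into ordered intervals, ignoring indexes outside the
// completions range and pods with missing or invalid index annotations.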
func TestCalculateSucceededIndexes(t *testing.T) {
	logger, _ := ktesting.NewTestContext(t)
	cases := map[string]struct {
		prevSucceeded       string
		pods                []indexPhase
		completions         int32
		wantStatusIntervals orderedIntervals
		wantIntervals       orderedIntervals
	}{
		"one index": {
			pods:          []indexPhase{{"1", v1.PodSucceeded}},
			completions:   2,
			wantIntervals: []interval{{1, 1}},
		},
		"two separate": {
			pods: []indexPhase{
				{"2", v1.PodFailed},
				{"5", v1.PodSucceeded},
				{"5", v1.PodSucceeded},
				{"10", v1.PodFailed},
				{"10", v1.PodSucceeded},
			},
			completions:   11,
			wantIntervals: []interval{{5, 5}, {10, 10}},
		},
		"two intervals": {
			pods: []indexPhase{
				{"0", v1.PodRunning},
				{"1", v1.PodPending},
				{"2", v1.PodSucceeded},
				{"3", v1.PodSucceeded},
				{"5", v1.PodSucceeded},
				{"6", v1.PodSucceeded},
				{"7", v1.PodSucceeded},
			},
			completions:   8,
			wantIntervals: []interval{{2, 3}, {5, 7}},
		},
		"one index and one interval": {
			pods: []indexPhase{
				{"0", v1.PodSucceeded},
				{"1", v1.PodFailed},
				{"2", v1.PodSucceeded},
				{"3", v1.PodSucceeded},
				{"4", v1.PodSucceeded},
				{"5", v1.PodSucceeded},
				{noIndex, v1.PodSucceeded},
				{"-2", v1.PodSucceeded},
			},
			completions:   6,
			wantIntervals: []interval{{0, 0}, {2, 5}},
		},
		"out of range": {
			pods: []indexPhase{
				{"0", v1.PodSucceeded},
				{"1", v1.PodSucceeded},
				{"2", v1.PodSucceeded},
				{"3", v1.PodFailed},
				{"4", v1.PodSucceeded},
				{"5", v1.PodSucceeded},
				{noIndex, v1.PodSucceeded},
				{"-2", v1.PodSucceeded},
			},
			completions:   5,
			wantIntervals: []interval{{0, 2}, {4, 4}},
		},
		"prev interval out of range": {
			prevSucceeded:       "0-5,8-10",
			completions:         8,
			wantStatusIntervals: []interval{{0, 5}},
			wantIntervals:       []interval{{0, 5}},
		},
		"prev interval partially out of range": {
			prevSucceeded:       "0-5,8-10",
			completions:         10,
			wantStatusIntervals: []interval{{0, 5}, {8, 9}},
			wantIntervals:       []interval{{0, 5}, {8, 9}},
		},
		"prev and new separate": {
			prevSucceeded: "0,4,5,10-12",
			pods: []indexPhase{
				{"2", v1.PodSucceeded},
				{"7", v1.PodSucceeded},
				{"8", v1.PodSucceeded},
			},
			completions: 13,
			wantStatusIntervals: []interval{
				{0, 0},
				{4, 5},
				{10, 12},
			},
			wantIntervals: []interval{
				{0, 0},
				{2, 2},
				{4, 5},
				{7, 8},
				{10, 12},
			},
		},
		"prev between new": {
			prevSucceeded: "3,4,6",
			pods: []indexPhase{
				{"2", v1.PodSucceeded},
				{"7", v1.PodSucceeded},
				{"8", v1.PodSucceeded},
			},
			completions: 9,
			wantStatusIntervals: []interval{
				{3, 4},
				{6, 6},
			},
			wantIntervals: []interval{
				{2, 4},
				{6, 8},
			},
		},
		"new between prev": {
			prevSucceeded: "2,7,8",
			pods: []indexPhase{
				{"3", v1.PodSucceeded},
				{"4", v1.PodSucceeded},
				{"6", v1.PodSucceeded},
			},
			completions: 9,
			wantStatusIntervals: []interval{
				{2, 2},
				{7, 8},
			},
			wantIntervals: []interval{
				{2, 4},
				{6, 8},
			},
		},
		"new within prev": {
			prevSucceeded: "2-7",
			pods: []indexPhase{
				{"0", v1.PodSucceeded},
				{"3", v1.PodSucceeded},
				{"5", v1.PodSucceeded},
				{"9", v1.PodSucceeded},
			},
			completions: 10,
			wantStatusIntervals: []interval{
				{2, 7},
			},
			wantIntervals: []interval{
				{0, 0},
				{2, 7},
				{9, 9},
			},
		},
		"corrupted interval": {
			prevSucceeded: "0,1-foo,bar",
			pods: []indexPhase{
				{"3", v1.PodSucceeded},
			},
			completions: 4,
			wantStatusIntervals: []interval{
				{0, 0},
			},
			wantIntervals: []interval{
				{0, 0},
				{3, 3},
			},
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			job := &batch.Job{
				Status: batch.JobStatus{
					CompletedIndexes: tc.prevSucceeded,
				},
				Spec: batch.JobSpec{
					Completions: ptr.To(tc.completions),
				},
			}
			pods := hollowPodsWithIndexPhase(tc.pods)
			for _, p := range pods {
				p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)
			}
			gotStatusIntervals, gotIntervals := calculateSucceededIndexes(logger, job, pods)
			if diff := cmp.Diff(tc.wantStatusIntervals, gotStatusIntervals); diff != "" {
				t.Errorf("Unexpected completed indexes from status (-want,+got):\n%s", diff)
			}
			if diff := cmp.Diff(tc.wantIntervals, gotIntervals); diff != "" {
				t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff)
			}
		})
	}
}
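// TestIsIndexFailed verifies that isIndexFailed reports an index as failed once
// a pod's failure count reaches the Job's backoffLimitPerIndex, and that a
// matching FailIndex pod failure policy rule is honored only while the
// JobPodFailurePolicy feature gate is enabled.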
func TestIsIndexFailed(t *testing.T) {
	logger, _ := ktesting.NewTestContext(t)
	cases := map[string]struct {
		enableJobPodFailurePolicy bool
		job                       batch.Job
		pod                       *v1.Pod
		wantResult                bool
	}{
		"failed pod exceeding backoffLimitPerIndex, when backoffLimitPerIndex=0": {
			job: batch.Job{
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](0),
				},
			},
			pod:        buildPod().indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
			wantResult: true,
		},
		"failed pod exceeding backoffLimitPerIndex, when backoffLimitPerIndex=1": {
			job: batch.Job{
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](1),
				},
			},
			pod:        buildPod().indexFailureCount("1").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
			wantResult: true,
		},
		"matching FailIndex pod failure policy; JobPodFailurePolicy enabled": {
			enableJobPodFailurePolicy: true,
			job: batch.Job{
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](1),
					PodFailurePolicy: &batch.PodFailurePolicy{
						Rules: []batch.PodFailurePolicyRule{
							{
								Action: batch.PodFailurePolicyActionFailIndex,
								OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
									Operator: batch.PodFailurePolicyOnExitCodesOpIn,
									Values:   []int32{3},
								},
							},
						},
					},
				},
			},
			pod: buildPod().indexFailureCount("0").status(v1.PodStatus{
				Phase: v1.PodFailed,
				ContainerStatuses: []v1.ContainerStatus{
					{
						State: v1.ContainerState{
							Terminated: &v1.ContainerStateTerminated{
								ExitCode: 3,
							},
						},
					},
				},
			}).index("0").trackingFinalizer().Pod,
			wantResult: true,
		},
		"matching FailIndex pod failure policy; JobPodFailurePolicy disabled": {
			enableJobPodFailurePolicy: false,
			job: batch.Job{
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](1),
					PodFailurePolicy: &batch.PodFailurePolicy{
						Rules: []batch.PodFailurePolicyRule{
							{
								Action: batch.PodFailurePolicyActionFailIndex,
								OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
									Operator: batch.PodFailurePolicyOnExitCodesOpIn,
									Values:   []int32{3},
								},
							},
						},
					},
				},
			},
			pod: buildPod().indexFailureCount("0").status(v1.PodStatus{
				Phase: v1.PodFailed,
				ContainerStatuses: []v1.ContainerStatus{
					{
						State: v1.ContainerState{
							Terminated: &v1.ContainerStateTerminated{
								ExitCode: 3,
							},
						},
					},
				},
			}).index("0").trackingFinalizer().Pod,
			wantResult: false,
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
			gotResult := isIndexFailed(logger, &tc.job, tc.pod)
			if diff := cmp.Diff(tc.wantResult, gotResult); diff != "" {
				t.Errorf("Unexpected result (-want,+got):\n%s", diff)
			}
		})
	}
}
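// TestCalculateFailedIndexes verifies that calculateFailedIndexes extends the
// failed indexes recorded in the Job status with indexes whose pods exhausted
// backoffLimitPerIndex, skipping pods without the tracking finalizer and
// indexes outside the completions range.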
func TestCalculateFailedIndexes(t *testing.T) {
	logger, _ := ktesting.NewTestContext(t)
	cases := map[string]struct {
		enableJobPodFailurePolicy bool
		job                       batch.Job
		pods                      []*v1.Pod
		wantPrevFailedIndexes     orderedIntervals
		wantFailedIndexes         orderedIntervals
	}{
		"one new index failed": {
			job: batch.Job{
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](1),
				},
			},
			pods: []*v1.Pod{
				buildPod().indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
				buildPod().indexFailureCount("1").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
			},
			wantFailedIndexes: []interval{{1, 1}},
		},
		"pod without finalizer is ignored": {
			job: batch.Job{
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](0),
				},
			},
			pods: []*v1.Pod{
				buildPod().indexFailureCount("0").phase(v1.PodFailed).index("0").Pod,
			},
			wantFailedIndexes: nil,
		},
		"pod outside completions is ignored": {
			job: batch.Job{
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](0),
				},
			},
			pods: []*v1.Pod{
				buildPod().indexFailureCount("0").phase(v1.PodFailed).index("3").Pod,
			},
			wantFailedIndexes: nil,
		},
		"extend the failed indexes": {
			job: batch.Job{
				Status: batch.JobStatus{
					FailedIndexes: ptr.To("0"),
				},
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](0),
				},
			},
			pods: []*v1.Pod{
				buildPod().indexFailureCount("0").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
			},
			wantFailedIndexes: []interval{{0, 1}},
		},
		"prev failed indexes empty": {
			job: batch.Job{
				Status: batch.JobStatus{
					FailedIndexes: ptr.To(""),
				},
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](0),
				},
			},
			pods: []*v1.Pod{
				buildPod().indexFailureCount("0").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
			},
			wantFailedIndexes: []interval{{1, 1}},
		},
		"prev failed indexes outside the completions": {
			job: batch.Job{
				Status: batch.JobStatus{
					FailedIndexes: ptr.To("9"),
				},
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](0),
				},
			},
			pods: []*v1.Pod{
				buildPod().indexFailureCount("0").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
			},
			wantFailedIndexes: []interval{{1, 1}},
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			failedIndexes := calculateFailedIndexes(logger, &tc.job, tc.pods)
			if diff := cmp.Diff(&tc.wantFailedIndexes, failedIndexes); diff != "" {
				t.Errorf("Unexpected failed indexes (-want,+got):\n%s", diff)
			}
		})
	}
}
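// TestGetPodsWithDelayedDeletionPerIndex verifies that, for each index that is
// not already failed, succeeded, or replaced, getPodsWithDelayedDeletionPerIndex
// keeps only the latest failed pod (highest failure count, most recent finish),
// and skips pods without the tracking finalizer or with finalizer removal
// already expected.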
func TestGetPodsWithDelayedDeletionPerIndex(t *testing.T) {
	logger, _ := ktesting.NewTestContext(t)
	now := time.Now()
	cases := map[string]struct {
		enableJobPodFailurePolicy           bool
		job                                 batch.Job
		pods                                []*v1.Pod
		expectedRmFinalizers                sets.Set[string]
		wantPodsWithDelayedDeletionPerIndex []string
	}{
		"failed pods corresponding to non-failed indexes are kept": {
			job: batch.Job{
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](3),
					BackoffLimitPerIndex: ptr.To[int32](1),
				},
			},
			pods: []*v1.Pod{
				buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
				buildPod().uid("b").indexFailureCount("1").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
				buildPod().uid("c").indexFailureCount("0").phase(v1.PodFailed).index("2").trackingFinalizer().Pod,
			},
			wantPodsWithDelayedDeletionPerIndex: []string{"a", "c"},
		},
		"failed pod without finalizer; the pod's deletion is not delayed as it already started": {
			job: batch.Job{
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](0),
				},
			},
			pods: []*v1.Pod{
				buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").Pod,
			},
			wantPodsWithDelayedDeletionPerIndex: []string{},
		},
		"failed pod with expected finalizer removal; the pod's deletion is not delayed as it already started": {
			job: batch.Job{
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](0),
				},
			},
			pods: []*v1.Pod{
				buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
			},
			expectedRmFinalizers:                sets.New("a"),
			wantPodsWithDelayedDeletionPerIndex: []string{},
		},
		"failed pod with index outside of completions; the pod's deletion is not delayed": {
			job: batch.Job{
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](0),
				},
			},
			pods: []*v1.Pod{
				buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("4").trackingFinalizer().Pod,
			},
			wantPodsWithDelayedDeletionPerIndex: []string{},
		},
		"failed pod for active index; the pod's deletion is not delayed as it is already replaced": {
			job: batch.Job{
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](1),
				},
			},
			pods: []*v1.Pod{
				buildPod().uid("a1").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
				buildPod().uid("a2").indexFailureCount("1").phase(v1.PodRunning).index("0").trackingFinalizer().Pod,
			},
			wantPodsWithDelayedDeletionPerIndex: []string{},
		},
		"failed pod for succeeded index; the pod's deletion is not delayed as it is already replaced": {
			job: batch.Job{
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](1),
				},
			},
			pods: []*v1.Pod{
				buildPod().uid("a1").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
				buildPod().uid("a2").indexFailureCount("1").phase(v1.PodSucceeded).index("0").trackingFinalizer().Pod,
			},
			wantPodsWithDelayedDeletionPerIndex: []string{},
		},
		"multiple failed pods for index with different failure count; only the pod with highest failure count is kept": {
			job: batch.Job{
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](4),
				},
			},
			pods: []*v1.Pod{
				buildPod().uid("a1").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
				buildPod().uid("a3").indexFailureCount("2").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
				buildPod().uid("a2").indexFailureCount("1").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
			},
			wantPodsWithDelayedDeletionPerIndex: []string{"a3"},
		},
		"multiple failed pods for index with different finish times; only the last failed pod is kept": {
			job: batch.Job{
				Spec: batch.JobSpec{
					Completions:          ptr.To[int32](2),
					BackoffLimitPerIndex: ptr.To[int32](4),
				},
			},
			pods: []*v1.Pod{
				buildPod().uid("a1").indexFailureCount("1").phase(v1.PodFailed).index("0").customDeletionTimestamp(now.Add(-time.Second)).trackingFinalizer().Pod,
				buildPod().uid("a3").indexFailureCount("1").phase(v1.PodFailed).index("0").customDeletionTimestamp(now).trackingFinalizer().Pod,
				buildPod().uid("a2").indexFailureCount("1").phase(v1.PodFailed).index("0").customDeletionTimestamp(now.Add(-2 * time.Second)).trackingFinalizer().Pod,
			},
			wantPodsWithDelayedDeletionPerIndex: []string{"a3"},
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
			activePods := controller.FilterActivePods(logger, tc.pods)
			failedIndexes := calculateFailedIndexes(logger, &tc.job, tc.pods)
			_, succeededIndexes := calculateSucceededIndexes(logger, &tc.job, tc.pods)
			jobCtx := &syncJobCtx{
				job:                  &tc.job,
				pods:                 tc.pods,
				activePods:           activePods,
				succeededIndexes:     succeededIndexes,
				failedIndexes:        failedIndexes,
				expectedRmFinalizers: tc.expectedRmFinalizers,
			}
			gotPodsWithDelayedDeletionPerIndex := getPodsWithDelayedDeletionPerIndex(logger, jobCtx)
			gotPodsWithDelayedDeletionPerIndexSet := sets.New[string]()
			for _, pod := range gotPodsWithDelayedDeletionPerIndex {
				gotPodsWithDelayedDeletionPerIndexSet.Insert(string(pod.UID))
			}
			if diff := cmp.Diff(tc.wantPodsWithDelayedDeletionPerIndex, sets.List(gotPodsWithDelayedDeletionPerIndexSet)); diff != "" {
				t.Errorf("Unexpected set of pods with delayed deletion (-want,+got):\n%s", diff)
			}
		})
	}
}
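// TestGetNewIndexFailureCountValue verifies that getNewIndexFailureCounts
// derives the failure counts for a replacement pod: the failure count of the
// pod being replaced incremented by one, unless the failure matches an Ignore
// pod failure policy rule, in which case only the ignored count grows.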
func TestGetNewIndexFailureCountValue(t *testing.T) {
	logger, _ := ktesting.NewTestContext(t)
	cases := map[string]struct {
		enableJobPodFailurePolicy       bool
		job                             batch.Job
		pod                             *v1.Pod
		wantNewIndexFailureCount        int32
		wantNewIndexIgnoredFailureCount int32
	}{
		"first pod created": {
			job:                      batch.Job{},
			wantNewIndexFailureCount: 0,
		},
		"failed pod being replaced with 0 index failure count": {
			job:                      batch.Job{},
			pod:                      buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
			wantNewIndexFailureCount: 1,
		},
		"failed pod being replaced with >0 index failure count": {
			job:                      batch.Job{},
			pod:                      buildPod().uid("a").indexFailureCount("3").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
			wantNewIndexFailureCount: 4,
		},
		"failed pod being replaced, matching the ignore rule; JobPodFailurePolicy enabled": {
			enableJobPodFailurePolicy: true,
			job: batch.Job{
				Spec: batch.JobSpec{
					PodFailurePolicy: &batch.PodFailurePolicy{
						Rules: []batch.PodFailurePolicyRule{
							{
								Action: batch.PodFailurePolicyActionIgnore,
								OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
									{
										Type:   v1.DisruptionTarget,
										Status: v1.ConditionTrue,
									},
								},
							},
						},
					},
				},
			},
			pod: buildPod().uid("a").indexFailureCount("3").status(v1.PodStatus{
				Phase: v1.PodFailed,
				Conditions: []v1.PodCondition{
					{
						Type:   v1.DisruptionTarget,
						Status: v1.ConditionTrue,
					},
				},
			}).index("3").trackingFinalizer().Pod,
			wantNewIndexFailureCount:        3,
			wantNewIndexIgnoredFailureCount: 1,
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
			gotNewIndexFailureCount, gotNewIndexIgnoredFailureCount := getNewIndexFailureCounts(logger, &tc.job, tc.pod)
			if diff := cmp.Diff(tc.wantNewIndexFailureCount, gotNewIndexFailureCount); diff != "" {
				t.Errorf("Unexpected new index failure count (-want,+got):\n%s", diff)
			}
			if diff := cmp.Diff(tc.wantNewIndexIgnoredFailureCount, gotNewIndexIgnoredFailureCount); diff != "" {
				t.Errorf("Unexpected new index ignored failure count (-want,+got):\n%s", diff)
			}
		})
	}
}
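// TestIntervalsHaveIndex verifies that orderedIntervals.has reports whether an
// index falls inside any of the (inclusive) intervals.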
func TestIntervalsHaveIndex(t *testing.T) {
	cases := map[string]struct {
		intervals orderedIntervals
		index     int
		wantHas   bool
	}{
		"empty": {
			index: 4,
		},
		"before all": {
			index:     1,
			intervals: []interval{{2, 4}, {5, 7}},
		},
		"after all": {
			index:     9,
			intervals: []interval{{2, 4}, {6, 8}},
		},
		"in between": {
			index:     5,
			intervals: []interval{{2, 4}, {6, 8}},
		},
		"in first": {
			index:     2,
			intervals: []interval{{2, 4}, {6, 8}},
			wantHas:   true,
		},
		"in second": {
			index:     8,
			intervals: []interval{{2, 4}, {6, 8}},
			wantHas:   true,
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			has := tc.intervals.has(tc.index)
			if has != tc.wantHas {
				t.Errorf("intervals.has(%d) = %t, want %t", tc.index, has, tc.wantHas)
			}
		})
	}
}
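// TestFirstPendingIndexes verifies that firstPendingIndexes returns, in order,
// the lowest indexes that are neither active, succeeded, nor failed, capped by
// the requested count and the number of completions.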
func TestFirstPendingIndexes(t *testing.T) {
	cases := map[string]struct {
		cnt              int
		completions      int
		activePods       []indexPhase
		succeededIndexes []interval
		failedIndexes    *orderedIntervals
		want             []int
	}{
		"cnt greater than completions": {
			cnt:         5,
			completions: 3,
			want:        []int{0, 1, 2},
		},
		"cnt less than completions": {
			cnt:         2,
			completions: 5,
			want:        []int{0, 1},
		},
		"first pods active": {
			activePods: []indexPhase{
				{"0", v1.PodRunning},
				{"1", v1.PodPending},
			},
			cnt:         3,
			completions: 10,
			want:        []int{2, 3, 4},
		},
		"last pods active or succeeded": {
			activePods: []indexPhase{
				{"6", v1.PodPending},
			},
			succeededIndexes: []interval{{4, 5}},
			cnt:              6,
			completions:      6,
			want:             []int{0, 1, 2, 3},
		},
		"mixed": {
			activePods: []indexPhase{
				{"3", v1.PodPending},
				{"5", v1.PodRunning},
				{"8", v1.PodPending},
				{noIndex, v1.PodRunning},
				{"-3", v1.PodRunning},
			},
			succeededIndexes: []interval{{2, 4}, {9, 9}},
			cnt:              5,
			completions:      20,
			want:             []int{0, 1, 6, 7, 10},
		},
		"with failed indexes": {
			activePods: []indexPhase{
				{"3", v1.PodPending},
				{"9", v1.PodPending},
			},
			succeededIndexes: []interval{{1, 1}, {5, 5}, {9, 9}},
			failedIndexes:    &orderedIntervals{{2, 2}, {6, 7}},
			cnt:              5,
			completions:      20,
			want:             []int{0, 4, 8, 10, 11},
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			jobCtx := &syncJobCtx{
				activePods:       hollowPodsWithIndexPhase(tc.activePods),
				succeededIndexes: tc.succeededIndexes,
				failedIndexes:    tc.failedIndexes,
				job:              newJob(1, 1, 1, batch.IndexedCompletion),
			}
			got := firstPendingIndexes(jobCtx, tc.cnt, tc.completions)
			if diff := cmp.Diff(tc.want, got); diff != "" {
				t.Errorf("Wrong first pending indexes (-want,+got):\n%s", diff)
			}
		})
	}
}
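// TestAppendDuplicatedIndexPodsForRemoval verifies that
// appendDuplicatedIndexPodsForRemoval marks for removal pods with missing,
// invalid, or out-of-range completion indexes, as well as all but one pod per
// duplicated index, keeping the remaining pods.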
func TestAppendDuplicatedIndexPodsForRemoval(t *testing.T) {
	cases := map[string]struct {
		pods        []indexPhase
		wantRm      []indexPhase
		wantLeft    []indexPhase
		completions int32
	}{
		"all unique": {
			pods: []indexPhase{
				{noIndex, v1.PodPending},
				{"2", v1.PodPending},
				{"5", v1.PodRunning},
				{"6", v1.PodRunning},
			},
			wantRm: []indexPhase{
				{noIndex, v1.PodPending},
				{"6", v1.PodRunning},
			},
			wantLeft: []indexPhase{
				{"2", v1.PodPending},
				{"5", v1.PodRunning},
			},
			completions: 6,
		},
		"all with index": {
			pods: []indexPhase{
				{"5", v1.PodPending},
				{"0", v1.PodRunning},
				{"3", v1.PodPending},
				{"0", v1.PodRunning},
				{"3", v1.PodRunning},
				{"0", v1.PodPending},
				{"6", v1.PodRunning},
				{"6", v1.PodPending},
			},
			wantRm: []indexPhase{
				{"0", v1.PodPending},
				{"0", v1.PodRunning},
				{"3", v1.PodPending},
				{"6", v1.PodRunning},
				{"6", v1.PodPending},
			},
			wantLeft: []indexPhase{
				{"0", v1.PodRunning},
				{"3", v1.PodRunning},
				{"5", v1.PodPending},
			},
			completions: 6,
		},
		"mixed": {
			pods: []indexPhase{
				{noIndex, v1.PodPending},
				{"invalid", v1.PodRunning},
				{"-2", v1.PodRunning},
				{"0", v1.PodPending},
				{"1", v1.PodPending},
				{"1", v1.PodPending},
				{"1", v1.PodRunning},
			},
			wantRm: []indexPhase{
				{noIndex, v1.PodPending},
				{"invalid", v1.PodRunning},
				{"-2", v1.PodRunning},
				{"1", v1.PodPending},
				{"1", v1.PodPending},
			},
			wantLeft: []indexPhase{
				{"0", v1.PodPending},
				{"1", v1.PodRunning},
			},
			completions: 6,
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			pods := hollowPodsWithIndexPhase(tc.pods)
			rm, left := appendDuplicatedIndexPodsForRemoval(nil, nil, pods, int(tc.completions))
			rmInt := toIndexPhases(rm)
			leftInt := toIndexPhases(left)
			if diff := cmp.Diff(tc.wantRm, rmInt); diff != "" {
				t.Errorf("Unexpected pods for removal (-want,+got):\n%s", diff)
			}
			if diff := cmp.Diff(tc.wantLeft, leftInt); diff != "" {
				t.Errorf("Unexpected pods to keep (-want,+got):\n%s", diff)
			}
		})
	}
}
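// TestPodGenerateNameWithIndex verifies that podGenerateNameWithIndex appends
// the completion index to the job name, truncating the job name so that the
// result stays within the generated-name length limit.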
func TestPodGenerateNameWithIndex(t *testing.T) {
	cases := map[string]struct {
		jobname             string
		index               int
		wantPodGenerateName string
	}{
		"short job name": {
			jobname:             "indexed-job",
			index:               1,
			wantPodGenerateName: "indexed-job-1-",
		},
		"job name exceeds MaxGeneratedNameLength": {
			jobname:             "hhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooo",
			index:               1,
			wantPodGenerateName: "hhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhh-1-",
		},
		"job name with index suffix exceeds MaxGeneratedNameLength": {
			jobname:             "hhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhoo",
			index:               1,
			wantPodGenerateName: "hhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhh-1-",
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			podGenerateName := podGenerateNameWithIndex(tc.jobname, tc.index)
			if !cmp.Equal(tc.wantPodGenerateName, podGenerateName) {
				t.Errorf("Got pod generateName %s, want %s", podGenerateName, tc.wantPodGenerateName)
			}
		})
	}
}
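// TestGetIndexFailureCount verifies that getIndexFailureCount parses the index
// failure count annotation, returning 0 for missing, negative, non-numeric, or
// out-of-range values.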
func TestGetIndexFailureCount(t *testing.T) {
	logger, _ := ktesting.NewTestContext(t)
	cases := map[string]struct {
		pod        *v1.Pod
		wantResult int32
	}{
		"no annotation": {
			pod:        &v1.Pod{},
			wantResult: 0,
		},
		"valid value": {
			pod:        buildPod().indexFailureCount("2").Pod,
			wantResult: 2,
		},
		"valid maxint32 value": {
			pod:        buildPod().indexFailureCount(strconv.FormatInt(math.MaxInt32, 10)).Pod,
			wantResult: math.MaxInt32,
		},
		"too large value": {
			pod:        buildPod().indexFailureCount(strconv.FormatInt(math.MaxInt32+1, 10)).Pod,
			wantResult: 0,
		},
		"negative value": {
			pod:        buildPod().indexFailureCount("-1").Pod,
			wantResult: 0,
		},
		"invalid int value": {
			pod:        buildPod().indexFailureCount("xyz").Pod,
			wantResult: 0,
		},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			gotResult := getIndexFailureCount(logger, tc.pod)
			if !cmp.Equal(tc.wantResult, gotResult) {
				t.Errorf("Unexpected result. want: %d, got: %d", tc.wantResult, gotResult)
			}
		})
	}
}
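// hollowPodsWithIndexPhase builds minimal pods with the given phases, setting
// the completion index annotation unless the index is noIndex.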
func hollowPodsWithIndexPhase(descs []indexPhase) []*v1.Pod {
	pods := make([]*v1.Pod, 0, len(descs))
	for _, desc := range descs {
		p := &v1.Pod{
			Status: v1.PodStatus{
				Phase: desc.Phase,
			},
		}
		if desc.Index != noIndex {
			p.Annotations = map[string]string{
				batch.JobCompletionIndexAnnotation: desc.Index,
			}
		}
		pods = append(pods, p)
	}
	return pods
}
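// indexPhase pairs a completion index (as the raw annotation string) with a
// pod phase, for compact test case definitions.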
type indexPhase struct {
	Index string
	Phase v1.PodPhase
}
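// toIndexPhases maps pods back to indexPhase entries, using noIndex for pods
// that have no annotations.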
func toIndexPhases(pods []*v1.Pod) []indexPhase {
	result := make([]indexPhase, len(pods))
	for i, p := range pods {
		index := noIndex
		if p.Annotations != nil {
			index = p.Annotations[batch.JobCompletionIndexAnnotation]
		}
		result[i] = indexPhase{index, p.Status.Phase}
	}
	return result
}