...

Source file src/k8s.io/kubernetes/pkg/controller/job/indexed_job_utils_test.go

Documentation: k8s.io/kubernetes/pkg/controller/job

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package job
    18  
    19  import (
    20  	"math"
    21  	"strconv"
    22  	"testing"
    23  	"time"
    24  
    25  	"github.com/google/go-cmp/cmp"
    26  	batch "k8s.io/api/batch/v1"
    27  	v1 "k8s.io/api/core/v1"
    28  	"k8s.io/apimachinery/pkg/util/sets"
    29  	"k8s.io/apiserver/pkg/util/feature"
    30  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    31  	"k8s.io/klog/v2/ktesting"
    32  	"k8s.io/kubernetes/pkg/controller"
    33  	"k8s.io/kubernetes/pkg/features"
    34  	"k8s.io/utils/ptr"
    35  )
    36  
    37  const noIndex = "-"
    38  
    39  func TestCalculateSucceededIndexes(t *testing.T) {
    40  	logger, _ := ktesting.NewTestContext(t)
    41  	cases := map[string]struct {
    42  		prevSucceeded       string
    43  		pods                []indexPhase
    44  		completions         int32
    45  		wantStatusIntervals orderedIntervals
    46  		wantIntervals       orderedIntervals
    47  	}{
    48  		"one index": {
    49  			pods:          []indexPhase{{"1", v1.PodSucceeded}},
    50  			completions:   2,
    51  			wantIntervals: []interval{{1, 1}},
    52  		},
    53  		"two separate": {
    54  			pods: []indexPhase{
    55  				{"2", v1.PodFailed},
    56  				{"5", v1.PodSucceeded},
    57  				{"5", v1.PodSucceeded},
    58  				{"10", v1.PodFailed},
    59  				{"10", v1.PodSucceeded},
    60  			},
    61  			completions:   11,
    62  			wantIntervals: []interval{{5, 5}, {10, 10}},
    63  		},
    64  		"two intervals": {
    65  			pods: []indexPhase{
    66  				{"0", v1.PodRunning},
    67  				{"1", v1.PodPending},
    68  				{"2", v1.PodSucceeded},
    69  				{"3", v1.PodSucceeded},
    70  				{"5", v1.PodSucceeded},
    71  				{"6", v1.PodSucceeded},
    72  				{"7", v1.PodSucceeded},
    73  			},
    74  			completions:   8,
    75  			wantIntervals: []interval{{2, 3}, {5, 7}},
    76  		},
    77  		"one index and one interval": {
    78  			pods: []indexPhase{
    79  				{"0", v1.PodSucceeded},
    80  				{"1", v1.PodFailed},
    81  				{"2", v1.PodSucceeded},
    82  				{"3", v1.PodSucceeded},
    83  				{"4", v1.PodSucceeded},
    84  				{"5", v1.PodSucceeded},
    85  				{noIndex, v1.PodSucceeded},
    86  				{"-2", v1.PodSucceeded},
    87  			},
    88  			completions:   6,
    89  			wantIntervals: []interval{{0, 0}, {2, 5}},
    90  		},
    91  		"out of range": {
    92  			pods: []indexPhase{
    93  				{"0", v1.PodSucceeded},
    94  				{"1", v1.PodSucceeded},
    95  				{"2", v1.PodSucceeded},
    96  				{"3", v1.PodFailed},
    97  				{"4", v1.PodSucceeded},
    98  				{"5", v1.PodSucceeded},
    99  				{noIndex, v1.PodSucceeded},
   100  				{"-2", v1.PodSucceeded},
   101  			},
   102  			completions:   5,
   103  			wantIntervals: []interval{{0, 2}, {4, 4}},
   104  		},
   105  		"prev interval out of range": {
   106  			prevSucceeded:       "0-5,8-10",
   107  			completions:         8,
   108  			wantStatusIntervals: []interval{{0, 5}},
   109  			wantIntervals:       []interval{{0, 5}},
   110  		},
   111  		"prev interval partially out of range": {
   112  			prevSucceeded:       "0-5,8-10",
   113  			completions:         10,
   114  			wantStatusIntervals: []interval{{0, 5}, {8, 9}},
   115  			wantIntervals:       []interval{{0, 5}, {8, 9}},
   116  		},
   117  		"prev and new separate": {
   118  			prevSucceeded: "0,4,5,10-12",
   119  			pods: []indexPhase{
   120  				{"2", v1.PodSucceeded},
   121  				{"7", v1.PodSucceeded},
   122  				{"8", v1.PodSucceeded},
   123  			},
   124  			completions: 13,
   125  			wantStatusIntervals: []interval{
   126  				{0, 0},
   127  				{4, 5},
   128  				{10, 12},
   129  			},
   130  			wantIntervals: []interval{
   131  				{0, 0},
   132  				{2, 2},
   133  				{4, 5},
   134  				{7, 8},
   135  				{10, 12},
   136  			},
   137  		},
   138  		"prev between new": {
   139  			prevSucceeded: "3,4,6",
   140  			pods: []indexPhase{
   141  				{"2", v1.PodSucceeded},
   142  				{"7", v1.PodSucceeded},
   143  				{"8", v1.PodSucceeded},
   144  			},
   145  			completions: 9,
   146  			wantStatusIntervals: []interval{
   147  				{3, 4},
   148  				{6, 6},
   149  			},
   150  			wantIntervals: []interval{
   151  				{2, 4},
   152  				{6, 8},
   153  			},
   154  		},
   155  		"new between prev": {
   156  			prevSucceeded: "2,7,8",
   157  			pods: []indexPhase{
   158  				{"3", v1.PodSucceeded},
   159  				{"4", v1.PodSucceeded},
   160  				{"6", v1.PodSucceeded},
   161  			},
   162  			completions: 9,
   163  			wantStatusIntervals: []interval{
   164  				{2, 2},
   165  				{7, 8},
   166  			},
   167  			wantIntervals: []interval{
   168  				{2, 4},
   169  				{6, 8},
   170  			},
   171  		},
   172  		"new within prev": {
   173  			prevSucceeded: "2-7",
   174  			pods: []indexPhase{
   175  				{"0", v1.PodSucceeded},
   176  				{"3", v1.PodSucceeded},
   177  				{"5", v1.PodSucceeded},
   178  				{"9", v1.PodSucceeded},
   179  			},
   180  			completions: 10,
   181  			wantStatusIntervals: []interval{
   182  				{2, 7},
   183  			},
   184  			wantIntervals: []interval{
   185  				{0, 0},
   186  				{2, 7},
   187  				{9, 9},
   188  			},
   189  		},
   190  		"corrupted interval": {
   191  			prevSucceeded: "0,1-foo,bar",
   192  			pods: []indexPhase{
   193  				{"3", v1.PodSucceeded},
   194  			},
   195  			completions: 4,
   196  			wantStatusIntervals: []interval{
   197  				{0, 0},
   198  			},
   199  			wantIntervals: []interval{
   200  				{0, 0},
   201  				{3, 3},
   202  			},
   203  		},
   204  	}
   205  	for name, tc := range cases {
   206  		t.Run(name, func(t *testing.T) {
   207  			job := &batch.Job{
   208  				Status: batch.JobStatus{
   209  					CompletedIndexes: tc.prevSucceeded,
   210  				},
   211  				Spec: batch.JobSpec{
   212  					Completions: ptr.To(tc.completions),
   213  				},
   214  			}
   215  			pods := hollowPodsWithIndexPhase(tc.pods)
   216  			for _, p := range pods {
   217  				p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)
   218  			}
   219  			gotStatusIntervals, gotIntervals := calculateSucceededIndexes(logger, job, pods)
   220  			if diff := cmp.Diff(tc.wantStatusIntervals, gotStatusIntervals); diff != "" {
   221  				t.Errorf("Unexpected completed indexes from status (-want,+got):\n%s", diff)
   222  			}
   223  			if diff := cmp.Diff(tc.wantIntervals, gotIntervals); diff != "" {
   224  				t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff)
   225  			}
   226  		})
   227  	}
   228  }
   229  
   230  func TestIsIndexFailed(t *testing.T) {
   231  	logger, _ := ktesting.NewTestContext(t)
   232  	cases := map[string]struct {
   233  		enableJobPodFailurePolicy bool
   234  		job                       batch.Job
   235  		pod                       *v1.Pod
   236  		wantResult                bool
   237  	}{
   238  		"failed pod exceeding backoffLimitPerIndex, when backoffLimitPerIndex=0": {
   239  			job: batch.Job{
   240  				Spec: batch.JobSpec{
   241  					Completions:          ptr.To[int32](2),
   242  					BackoffLimitPerIndex: ptr.To[int32](0),
   243  				},
   244  			},
   245  			pod:        buildPod().indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
   246  			wantResult: true,
   247  		},
   248  		"failed pod exceeding backoffLimitPerIndex, when backoffLimitPerIndex=1": {
   249  			job: batch.Job{
   250  				Spec: batch.JobSpec{
   251  					Completions:          ptr.To[int32](2),
   252  					BackoffLimitPerIndex: ptr.To[int32](1),
   253  				},
   254  			},
   255  			pod:        buildPod().indexFailureCount("1").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
   256  			wantResult: true,
   257  		},
   258  		"matching FailIndex pod failure policy; JobPodFailurePolicy enabled": {
   259  			enableJobPodFailurePolicy: true,
   260  			job: batch.Job{
   261  				Spec: batch.JobSpec{
   262  					Completions:          ptr.To[int32](2),
   263  					BackoffLimitPerIndex: ptr.To[int32](1),
   264  					PodFailurePolicy: &batch.PodFailurePolicy{
   265  						Rules: []batch.PodFailurePolicyRule{
   266  							{
   267  								Action: batch.PodFailurePolicyActionFailIndex,
   268  								OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
   269  									Operator: batch.PodFailurePolicyOnExitCodesOpIn,
   270  									Values:   []int32{3},
   271  								},
   272  							},
   273  						},
   274  					},
   275  				},
   276  			},
   277  			pod: buildPod().indexFailureCount("0").status(v1.PodStatus{
   278  				Phase: v1.PodFailed,
   279  				ContainerStatuses: []v1.ContainerStatus{
   280  					{
   281  						State: v1.ContainerState{
   282  							Terminated: &v1.ContainerStateTerminated{
   283  								ExitCode: 3,
   284  							},
   285  						},
   286  					},
   287  				},
   288  			}).index("0").trackingFinalizer().Pod,
   289  			wantResult: true,
   290  		},
   291  		"matching FailIndex pod failure policy; JobPodFailurePolicy disabled": {
   292  			enableJobPodFailurePolicy: false,
   293  			job: batch.Job{
   294  				Spec: batch.JobSpec{
   295  					Completions:          ptr.To[int32](2),
   296  					BackoffLimitPerIndex: ptr.To[int32](1),
   297  					PodFailurePolicy: &batch.PodFailurePolicy{
   298  						Rules: []batch.PodFailurePolicyRule{
   299  							{
   300  								Action: batch.PodFailurePolicyActionFailIndex,
   301  								OnExitCodes: &batch.PodFailurePolicyOnExitCodesRequirement{
   302  									Operator: batch.PodFailurePolicyOnExitCodesOpIn,
   303  									Values:   []int32{3},
   304  								},
   305  							},
   306  						},
   307  					},
   308  				},
   309  			},
   310  			pod: buildPod().indexFailureCount("0").status(v1.PodStatus{
   311  				Phase: v1.PodFailed,
   312  				ContainerStatuses: []v1.ContainerStatus{
   313  					{
   314  						State: v1.ContainerState{
   315  							Terminated: &v1.ContainerStateTerminated{
   316  								ExitCode: 3,
   317  							},
   318  						},
   319  					},
   320  				},
   321  			}).index("0").trackingFinalizer().Pod,
   322  			wantResult: false,
   323  		},
   324  	}
   325  	for name, tc := range cases {
   326  		t.Run(name, func(t *testing.T) {
   327  			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
   328  			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
   329  			gotResult := isIndexFailed(logger, &tc.job, tc.pod)
   330  			if diff := cmp.Diff(tc.wantResult, gotResult); diff != "" {
   331  				t.Errorf("Unexpected result (-want,+got):\n%s", diff)
   332  			}
   333  		})
   334  	}
   335  }
   336  
   337  func TestCalculateFailedIndexes(t *testing.T) {
   338  	logger, _ := ktesting.NewTestContext(t)
   339  	cases := map[string]struct {
   340  		enableJobPodFailurePolicy bool
   341  		job                       batch.Job
   342  		pods                      []*v1.Pod
   343  		wantPrevFailedIndexes     orderedIntervals
   344  		wantFailedIndexes         orderedIntervals
   345  	}{
   346  		"one new index failed": {
   347  			job: batch.Job{
   348  				Spec: batch.JobSpec{
   349  					Completions:          ptr.To[int32](2),
   350  					BackoffLimitPerIndex: ptr.To[int32](1),
   351  				},
   352  			},
   353  			pods: []*v1.Pod{
   354  				buildPod().indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
   355  				buildPod().indexFailureCount("1").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
   356  			},
   357  			wantFailedIndexes: []interval{{1, 1}},
   358  		},
   359  		"pod without finalizer is ignored": {
   360  			job: batch.Job{
   361  				Spec: batch.JobSpec{
   362  					Completions:          ptr.To[int32](2),
   363  					BackoffLimitPerIndex: ptr.To[int32](0),
   364  				},
   365  			},
   366  			pods: []*v1.Pod{
   367  				buildPod().indexFailureCount("0").phase(v1.PodFailed).index("0").Pod,
   368  			},
   369  			wantFailedIndexes: nil,
   370  		},
   371  		"pod outside completions is ignored": {
   372  			job: batch.Job{
   373  				Spec: batch.JobSpec{
   374  					Completions:          ptr.To[int32](2),
   375  					BackoffLimitPerIndex: ptr.To[int32](0),
   376  				},
   377  			},
   378  			pods: []*v1.Pod{
   379  				buildPod().indexFailureCount("0").phase(v1.PodFailed).index("3").Pod,
   380  			},
   381  			wantFailedIndexes: nil,
   382  		},
   383  		"extend the failed indexes": {
   384  			job: batch.Job{
   385  				Status: batch.JobStatus{
   386  					FailedIndexes: ptr.To("0"),
   387  				},
   388  				Spec: batch.JobSpec{
   389  					Completions:          ptr.To[int32](2),
   390  					BackoffLimitPerIndex: ptr.To[int32](0),
   391  				},
   392  			},
   393  			pods: []*v1.Pod{
   394  				buildPod().indexFailureCount("0").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
   395  			},
   396  			wantFailedIndexes: []interval{{0, 1}},
   397  		},
   398  		"prev failed indexes empty": {
   399  			job: batch.Job{
   400  				Status: batch.JobStatus{
   401  					FailedIndexes: ptr.To(""),
   402  				},
   403  				Spec: batch.JobSpec{
   404  					Completions:          ptr.To[int32](2),
   405  					BackoffLimitPerIndex: ptr.To[int32](0),
   406  				},
   407  			},
   408  			pods: []*v1.Pod{
   409  				buildPod().indexFailureCount("0").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
   410  			},
   411  			wantFailedIndexes: []interval{{1, 1}},
   412  		},
   413  		"prev failed indexes outside the completions": {
   414  			job: batch.Job{
   415  				Status: batch.JobStatus{
   416  					FailedIndexes: ptr.To("9"),
   417  				},
   418  				Spec: batch.JobSpec{
   419  					Completions:          ptr.To[int32](2),
   420  					BackoffLimitPerIndex: ptr.To[int32](0),
   421  				},
   422  			},
   423  			pods: []*v1.Pod{
   424  				buildPod().indexFailureCount("0").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
   425  			},
   426  			wantFailedIndexes: []interval{{1, 1}},
   427  		},
   428  	}
   429  	for name, tc := range cases {
   430  		t.Run(name, func(t *testing.T) {
   431  			failedIndexes := calculateFailedIndexes(logger, &tc.job, tc.pods)
   432  			if diff := cmp.Diff(&tc.wantFailedIndexes, failedIndexes); diff != "" {
   433  				t.Errorf("Unexpected failed indexes (-want,+got):\n%s", diff)
   434  			}
   435  		})
   436  	}
   437  }
   438  
   439  func TestGetPodsWithDelayedDeletionPerIndex(t *testing.T) {
   440  	logger, _ := ktesting.NewTestContext(t)
   441  	now := time.Now()
   442  	cases := map[string]struct {
   443  		enableJobPodFailurePolicy           bool
   444  		job                                 batch.Job
   445  		pods                                []*v1.Pod
   446  		expectedRmFinalizers                sets.Set[string]
   447  		wantPodsWithDelayedDeletionPerIndex []string
   448  	}{
   449  		"failed pods are kept corresponding to non-failed indexes are kept": {
   450  			job: batch.Job{
   451  				Spec: batch.JobSpec{
   452  					Completions:          ptr.To[int32](3),
   453  					BackoffLimitPerIndex: ptr.To[int32](1),
   454  				},
   455  			},
   456  			pods: []*v1.Pod{
   457  				buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
   458  				buildPod().uid("b").indexFailureCount("1").phase(v1.PodFailed).index("1").trackingFinalizer().Pod,
   459  				buildPod().uid("c").indexFailureCount("0").phase(v1.PodFailed).index("2").trackingFinalizer().Pod,
   460  			},
   461  			wantPodsWithDelayedDeletionPerIndex: []string{"a", "c"},
   462  		},
   463  		"failed pod without finalizer; the pod's deletion is not delayed as it already started": {
   464  			job: batch.Job{
   465  				Spec: batch.JobSpec{
   466  					Completions:          ptr.To[int32](2),
   467  					BackoffLimitPerIndex: ptr.To[int32](0),
   468  				},
   469  			},
   470  			pods: []*v1.Pod{
   471  				buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").Pod,
   472  			},
   473  			wantPodsWithDelayedDeletionPerIndex: []string{},
   474  		},
   475  		"failed pod with expected finalizer removal; the pod's deletion is not delayed as it already started": {
   476  			job: batch.Job{
   477  				Spec: batch.JobSpec{
   478  					Completions:          ptr.To[int32](2),
   479  					BackoffLimitPerIndex: ptr.To[int32](0),
   480  				},
   481  			},
   482  			pods: []*v1.Pod{
   483  				buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
   484  			},
   485  			expectedRmFinalizers:                sets.New("a"),
   486  			wantPodsWithDelayedDeletionPerIndex: []string{},
   487  		},
   488  		"failed pod with index outside of completions; the pod's deletion is not delayed": {
   489  			job: batch.Job{
   490  				Spec: batch.JobSpec{
   491  					Completions:          ptr.To[int32](2),
   492  					BackoffLimitPerIndex: ptr.To[int32](0),
   493  				},
   494  			},
   495  			pods: []*v1.Pod{
   496  				buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("4").trackingFinalizer().Pod,
   497  			},
   498  			wantPodsWithDelayedDeletionPerIndex: []string{},
   499  		},
   500  		"failed pod for active index; the pod's deletion is not delayed as it is already replaced": {
   501  			job: batch.Job{
   502  				Spec: batch.JobSpec{
   503  					Completions:          ptr.To[int32](2),
   504  					BackoffLimitPerIndex: ptr.To[int32](1),
   505  				},
   506  			},
   507  			pods: []*v1.Pod{
   508  				buildPod().uid("a1").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
   509  				buildPod().uid("a2").indexFailureCount("1").phase(v1.PodRunning).index("0").trackingFinalizer().Pod,
   510  			},
   511  			wantPodsWithDelayedDeletionPerIndex: []string{},
   512  		},
   513  		"failed pod for succeeded index; the pod's deletion is not delayed as it is already replaced": {
   514  			job: batch.Job{
   515  				Spec: batch.JobSpec{
   516  					Completions:          ptr.To[int32](2),
   517  					BackoffLimitPerIndex: ptr.To[int32](1),
   518  				},
   519  			},
   520  			pods: []*v1.Pod{
   521  				buildPod().uid("a1").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
   522  				buildPod().uid("a2").indexFailureCount("1").phase(v1.PodSucceeded).index("0").trackingFinalizer().Pod,
   523  			},
   524  			wantPodsWithDelayedDeletionPerIndex: []string{},
   525  		},
   526  		"multiple failed pods for index with different failure count; only the pod with highest failure count is kept": {
   527  			job: batch.Job{
   528  				Spec: batch.JobSpec{
   529  					Completions:          ptr.To[int32](2),
   530  					BackoffLimitPerIndex: ptr.To[int32](4),
   531  				},
   532  			},
   533  			pods: []*v1.Pod{
   534  				buildPod().uid("a1").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
   535  				buildPod().uid("a3").indexFailureCount("2").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
   536  				buildPod().uid("a2").indexFailureCount("1").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
   537  			},
   538  			wantPodsWithDelayedDeletionPerIndex: []string{"a3"},
   539  		},
   540  		"multiple failed pods for index with different finish times; only the last failed pod is kept": {
   541  			job: batch.Job{
   542  				Spec: batch.JobSpec{
   543  					Completions:          ptr.To[int32](2),
   544  					BackoffLimitPerIndex: ptr.To[int32](4),
   545  				},
   546  			},
   547  			pods: []*v1.Pod{
   548  				buildPod().uid("a1").indexFailureCount("1").phase(v1.PodFailed).index("0").customDeletionTimestamp(now.Add(-time.Second)).trackingFinalizer().Pod,
   549  				buildPod().uid("a3").indexFailureCount("1").phase(v1.PodFailed).index("0").customDeletionTimestamp(now).trackingFinalizer().Pod,
   550  				buildPod().uid("a2").indexFailureCount("1").phase(v1.PodFailed).index("0").customDeletionTimestamp(now.Add(-2 * time.Second)).trackingFinalizer().Pod,
   551  			},
   552  			wantPodsWithDelayedDeletionPerIndex: []string{"a3"},
   553  		},
   554  	}
   555  	for name, tc := range cases {
   556  		t.Run(name, func(t *testing.T) {
   557  			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
   558  			activePods := controller.FilterActivePods(logger, tc.pods)
   559  			failedIndexes := calculateFailedIndexes(logger, &tc.job, tc.pods)
   560  			_, succeededIndexes := calculateSucceededIndexes(logger, &tc.job, tc.pods)
   561  			jobCtx := &syncJobCtx{
   562  				job:                  &tc.job,
   563  				pods:                 tc.pods,
   564  				activePods:           activePods,
   565  				succeededIndexes:     succeededIndexes,
   566  				failedIndexes:        failedIndexes,
   567  				expectedRmFinalizers: tc.expectedRmFinalizers,
   568  			}
   569  			gotPodsWithDelayedDeletionPerIndex := getPodsWithDelayedDeletionPerIndex(logger, jobCtx)
   570  			gotPodsWithDelayedDeletionPerIndexSet := sets.New[string]()
   571  			for _, pod := range gotPodsWithDelayedDeletionPerIndex {
   572  				gotPodsWithDelayedDeletionPerIndexSet.Insert(string(pod.UID))
   573  			}
   574  			if diff := cmp.Diff(tc.wantPodsWithDelayedDeletionPerIndex, sets.List(gotPodsWithDelayedDeletionPerIndexSet)); diff != "" {
   575  				t.Errorf("Unexpected set of pods with delayed deletion (-want,+got):\n%s", diff)
   576  			}
   577  		})
   578  	}
   579  }
   580  
   581  func TestGetNewIndexFailureCountValue(t *testing.T) {
   582  	logger, _ := ktesting.NewTestContext(t)
   583  	cases := map[string]struct {
   584  		enableJobPodFailurePolicy       bool
   585  		job                             batch.Job
   586  		pod                             *v1.Pod
   587  		wantNewIndexFailureCount        int32
   588  		wantNewIndexIgnoredFailureCount int32
   589  	}{
   590  		"first pod created": {
   591  			job:                      batch.Job{},
   592  			wantNewIndexFailureCount: 0,
   593  		},
   594  		"failed pod being replaced with 0 index failure count": {
   595  			job:                      batch.Job{},
   596  			pod:                      buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
   597  			wantNewIndexFailureCount: 1,
   598  		},
   599  		"failed pod being replaced with >0 index failure count": {
   600  			job:                      batch.Job{},
   601  			pod:                      buildPod().uid("a").indexFailureCount("3").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
   602  			wantNewIndexFailureCount: 4,
   603  		},
   604  		"failed pod being replaced, matching the ignore rule; JobPodFailurePolicy enabled": {
   605  			enableJobPodFailurePolicy: true,
   606  			job: batch.Job{
   607  				Spec: batch.JobSpec{
   608  					PodFailurePolicy: &batch.PodFailurePolicy{
   609  						Rules: []batch.PodFailurePolicyRule{
   610  							{
   611  								Action: batch.PodFailurePolicyActionIgnore,
   612  								OnPodConditions: []batch.PodFailurePolicyOnPodConditionsPattern{
   613  									{
   614  										Type:   v1.DisruptionTarget,
   615  										Status: v1.ConditionTrue,
   616  									},
   617  								},
   618  							},
   619  						},
   620  					},
   621  				},
   622  			},
   623  			pod: buildPod().uid("a").indexFailureCount("3").status(v1.PodStatus{
   624  				Phase: v1.PodFailed,
   625  				Conditions: []v1.PodCondition{
   626  					{
   627  						Type:   v1.DisruptionTarget,
   628  						Status: v1.ConditionTrue,
   629  					},
   630  				},
   631  			}).index("3").trackingFinalizer().Pod,
   632  			wantNewIndexFailureCount:        3,
   633  			wantNewIndexIgnoredFailureCount: 1,
   634  		},
   635  	}
   636  	for name, tc := range cases {
   637  		t.Run(name, func(t *testing.T) {
   638  			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
   639  			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
   640  			gotNewIndexFailureCount, gotNewIndexIgnoredFailureCount := getNewIndexFailureCounts(logger, &tc.job, tc.pod)
   641  			if diff := cmp.Diff(tc.wantNewIndexFailureCount, gotNewIndexFailureCount); diff != "" {
   642  				t.Errorf("Unexpected set of pods with delayed deletion (-want,+got):\n%s", diff)
   643  			}
   644  			if diff := cmp.Diff(tc.wantNewIndexIgnoredFailureCount, gotNewIndexIgnoredFailureCount); diff != "" {
   645  				t.Errorf("Unexpected set of pods with delayed deletion (-want,+got):\n%s", diff)
   646  			}
   647  		})
   648  	}
   649  }
   650  
   651  func TestIntervalsHaveIndex(t *testing.T) {
   652  	cases := map[string]struct {
   653  		intervals orderedIntervals
   654  		index     int
   655  		wantHas   bool
   656  	}{
   657  		"empty": {
   658  			index: 4,
   659  		},
   660  		"before all": {
   661  			index:     1,
   662  			intervals: []interval{{2, 4}, {5, 7}},
   663  		},
   664  		"after all": {
   665  			index:     9,
   666  			intervals: []interval{{2, 4}, {6, 8}},
   667  		},
   668  		"in between": {
   669  			index:     5,
   670  			intervals: []interval{{2, 4}, {6, 8}},
   671  		},
   672  		"in first": {
   673  			index:     2,
   674  			intervals: []interval{{2, 4}, {6, 8}},
   675  			wantHas:   true,
   676  		},
   677  		"in second": {
   678  			index:     8,
   679  			intervals: []interval{{2, 4}, {6, 8}},
   680  			wantHas:   true,
   681  		},
   682  	}
   683  	for name, tc := range cases {
   684  		t.Run(name, func(t *testing.T) {
   685  			has := tc.intervals.has(tc.index)
   686  			if has != tc.wantHas {
   687  				t.Errorf("intervalsHaveIndex(_, _) = %t, want %t", has, tc.wantHas)
   688  			}
   689  		})
   690  	}
   691  }
   692  
   693  func TestFirstPendingIndexes(t *testing.T) {
   694  	cases := map[string]struct {
   695  		cnt              int
   696  		completions      int
   697  		activePods       []indexPhase
   698  		succeededIndexes []interval
   699  		failedIndexes    *orderedIntervals
   700  		want             []int
   701  	}{
   702  		"cnt greater than completions": {
   703  			cnt:         5,
   704  			completions: 3,
   705  			want:        []int{0, 1, 2},
   706  		},
   707  		"cnt less than completions": {
   708  			cnt:         2,
   709  			completions: 5,
   710  			want:        []int{0, 1},
   711  		},
   712  		"first pods active": {
   713  			activePods: []indexPhase{
   714  				{"0", v1.PodRunning},
   715  				{"1", v1.PodPending},
   716  			},
   717  			cnt:         3,
   718  			completions: 10,
   719  			want:        []int{2, 3, 4},
   720  		},
   721  		"last pods active or succeeded": {
   722  			activePods: []indexPhase{
   723  				{"6", v1.PodPending},
   724  			},
   725  			succeededIndexes: []interval{{4, 5}},
   726  			cnt:              6,
   727  			completions:      6,
   728  			want:             []int{0, 1, 2, 3},
   729  		},
   730  		"mixed": {
   731  			activePods: []indexPhase{
   732  				{"3", v1.PodPending},
   733  				{"5", v1.PodRunning},
   734  				{"8", v1.PodPending},
   735  				{noIndex, v1.PodRunning},
   736  				{"-3", v1.PodRunning},
   737  			},
   738  			succeededIndexes: []interval{{2, 4}, {9, 9}},
   739  			cnt:              5,
   740  			completions:      20,
   741  			want:             []int{0, 1, 6, 7, 10},
   742  		},
   743  		"with failed indexes": {
   744  			activePods: []indexPhase{
   745  				{"3", v1.PodPending},
   746  				{"9", v1.PodPending},
   747  			},
   748  			succeededIndexes: []interval{{1, 1}, {5, 5}, {9, 9}},
   749  			failedIndexes:    &orderedIntervals{{2, 2}, {6, 7}},
   750  			cnt:              5,
   751  			completions:      20,
   752  			want:             []int{0, 4, 8, 10, 11},
   753  		},
   754  	}
   755  	for name, tc := range cases {
   756  		t.Run(name, func(t *testing.T) {
   757  			jobCtx := &syncJobCtx{
   758  				activePods:       hollowPodsWithIndexPhase(tc.activePods),
   759  				succeededIndexes: tc.succeededIndexes,
   760  				failedIndexes:    tc.failedIndexes,
   761  				job:              newJob(1, 1, 1, batch.IndexedCompletion),
   762  			}
   763  			got := firstPendingIndexes(jobCtx, tc.cnt, tc.completions)
   764  			if diff := cmp.Diff(tc.want, got); diff != "" {
   765  				t.Errorf("Wrong first pending indexes (-want,+got):\n%s", diff)
   766  			}
   767  		})
   768  	}
   769  }
   770  
   771  func TestAppendDuplicatedIndexPodsForRemoval(t *testing.T) {
   772  	cases := map[string]struct {
   773  		pods        []indexPhase
   774  		wantRm      []indexPhase
   775  		wantLeft    []indexPhase
   776  		completions int32
   777  	}{
   778  		"all unique": {
   779  			pods: []indexPhase{
   780  				{noIndex, v1.PodPending},
   781  				{"2", v1.PodPending},
   782  				{"5", v1.PodRunning},
   783  				{"6", v1.PodRunning},
   784  			},
   785  			wantRm: []indexPhase{
   786  				{noIndex, v1.PodPending},
   787  				{"6", v1.PodRunning},
   788  			},
   789  			wantLeft: []indexPhase{
   790  				{"2", v1.PodPending},
   791  				{"5", v1.PodRunning},
   792  			},
   793  			completions: 6,
   794  		},
   795  		"all with index": {
   796  			pods: []indexPhase{
   797  				{"5", v1.PodPending},
   798  				{"0", v1.PodRunning},
   799  				{"3", v1.PodPending},
   800  				{"0", v1.PodRunning},
   801  				{"3", v1.PodRunning},
   802  				{"0", v1.PodPending},
   803  				{"6", v1.PodRunning},
   804  				{"6", v1.PodPending},
   805  			},
   806  			wantRm: []indexPhase{
   807  				{"0", v1.PodPending},
   808  				{"0", v1.PodRunning},
   809  				{"3", v1.PodPending},
   810  				{"6", v1.PodRunning},
   811  				{"6", v1.PodPending},
   812  			},
   813  			wantLeft: []indexPhase{
   814  				{"0", v1.PodRunning},
   815  				{"3", v1.PodRunning},
   816  				{"5", v1.PodPending},
   817  			},
   818  			completions: 6,
   819  		},
   820  		"mixed": {
   821  			pods: []indexPhase{
   822  				{noIndex, v1.PodPending},
   823  				{"invalid", v1.PodRunning},
   824  				{"-2", v1.PodRunning},
   825  				{"0", v1.PodPending},
   826  				{"1", v1.PodPending},
   827  				{"1", v1.PodPending},
   828  				{"1", v1.PodRunning},
   829  			},
   830  			wantRm: []indexPhase{
   831  				{noIndex, v1.PodPending},
   832  				{"invalid", v1.PodRunning},
   833  				{"-2", v1.PodRunning},
   834  				{"1", v1.PodPending},
   835  				{"1", v1.PodPending},
   836  			},
   837  			wantLeft: []indexPhase{
   838  				{"0", v1.PodPending},
   839  				{"1", v1.PodRunning},
   840  			},
   841  			completions: 6,
   842  		},
   843  	}
   844  	for name, tc := range cases {
   845  		t.Run(name, func(t *testing.T) {
   846  			pods := hollowPodsWithIndexPhase(tc.pods)
   847  			rm, left := appendDuplicatedIndexPodsForRemoval(nil, nil, pods, int(tc.completions))
   848  			rmInt := toIndexPhases(rm)
   849  			leftInt := toIndexPhases(left)
   850  			if diff := cmp.Diff(tc.wantRm, rmInt); diff != "" {
   851  				t.Errorf("Unexpected pods for removal (-want,+got):\n%s", diff)
   852  			}
   853  			if diff := cmp.Diff(tc.wantLeft, leftInt); diff != "" {
   854  				t.Errorf("Unexpected pods to keep (-want,+got):\n%s", diff)
   855  			}
   856  		})
   857  	}
   858  }
   859  
   860  func TestPodGenerateNameWithIndex(t *testing.T) {
   861  	cases := map[string]struct {
   862  		jobname             string
   863  		index               int
   864  		wantPodGenerateName string
   865  	}{
   866  		"short job name": {
   867  			jobname:             "indexed-job",
   868  			index:               1,
   869  			wantPodGenerateName: "indexed-job-1-",
   870  		},
   871  		"job name exceeds MaxGeneneratedNameLength": {
   872  			jobname:             "hhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooo",
   873  			index:               1,
   874  			wantPodGenerateName: "hhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhh-1-",
   875  		},
   876  		"job name with index suffix exceeds MaxGeneratedNameLength": {
   877  			jobname:             "hhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhoo",
   878  			index:               1,
   879  			wantPodGenerateName: "hhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhhooooohhhhh-1-",
   880  		},
   881  	}
   882  	for name, tc := range cases {
   883  		t.Run(name, func(t *testing.T) {
   884  			podGenerateName := podGenerateNameWithIndex(tc.jobname, tc.index)
   885  			if diff := cmp.Equal(tc.wantPodGenerateName, podGenerateName); !diff {
   886  				t.Errorf("Got pod generateName %s, want %s", podGenerateName, tc.wantPodGenerateName)
   887  			}
   888  		})
   889  	}
   890  }
   891  
   892  func TestGetIndexFailureCount(t *testing.T) {
   893  	logger, _ := ktesting.NewTestContext(t)
   894  	cases := map[string]struct {
   895  		pod        *v1.Pod
   896  		wantResult int32
   897  	}{
   898  		"no annotation": {
   899  			pod:        &v1.Pod{},
   900  			wantResult: 0,
   901  		},
   902  		"valid value": {
   903  			pod:        buildPod().indexFailureCount("2").Pod,
   904  			wantResult: 2,
   905  		},
   906  		"valid maxint32 value": {
   907  			pod:        buildPod().indexFailureCount(strconv.FormatInt(math.MaxInt32, 10)).Pod,
   908  			wantResult: math.MaxInt32,
   909  		},
   910  		"too large value": {
   911  			pod:        buildPod().indexFailureCount(strconv.FormatInt(math.MaxInt32+1, 10)).Pod,
   912  			wantResult: 0,
   913  		},
   914  		"negative value": {
   915  			pod:        buildPod().indexFailureCount("-1").Pod,
   916  			wantResult: 0,
   917  		},
   918  		"invalid int value": {
   919  			pod:        buildPod().indexFailureCount("xyz").Pod,
   920  			wantResult: 0,
   921  		},
   922  	}
   923  	for name, tc := range cases {
   924  		t.Run(name, func(t *testing.T) {
   925  			gotResult := getIndexFailureCount(logger, tc.pod)
   926  			if diff := cmp.Equal(tc.wantResult, gotResult); !diff {
   927  				t.Errorf("Unexpected result. want: %d, got: %d", tc.wantResult, gotResult)
   928  			}
   929  		})
   930  	}
   931  }
   932  
   933  func hollowPodsWithIndexPhase(descs []indexPhase) []*v1.Pod {
   934  	pods := make([]*v1.Pod, 0, len(descs))
   935  	for _, desc := range descs {
   936  		p := &v1.Pod{
   937  			Status: v1.PodStatus{
   938  				Phase: desc.Phase,
   939  			},
   940  		}
   941  		if desc.Index != noIndex {
   942  			p.Annotations = map[string]string{
   943  				batch.JobCompletionIndexAnnotation: desc.Index,
   944  			}
   945  		}
   946  		pods = append(pods, p)
   947  	}
   948  	return pods
   949  }
   950  
   951  type indexPhase struct {
   952  	Index string
   953  	Phase v1.PodPhase
   954  }
   955  
   956  func toIndexPhases(pods []*v1.Pod) []indexPhase {
   957  	result := make([]indexPhase, len(pods))
   958  	for i, p := range pods {
   959  		index := noIndex
   960  		if p.Annotations != nil {
   961  			index = p.Annotations[batch.JobCompletionIndexAnnotation]
   962  		}
   963  		result[i] = indexPhase{index, p.Status.Phase}
   964  	}
   965  	return result
   966  }
   967  

View as plain text