...

Source file src/k8s.io/kubernetes/test/integration/job/job_test.go

Documentation: k8s.io/kubernetes/test/integration/job

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package job
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"sort"
    24  	"strconv"
    25  	"strings"
    26  	"sync"
    27  	"sync/atomic"
    28  	"testing"
    29  	"time"
    30  
    31  	"github.com/google/go-cmp/cmp"
    32  	batchv1 "k8s.io/api/batch/v1"
    33  	v1 "k8s.io/api/core/v1"
    34  	eventsv1 "k8s.io/api/events/v1"
    35  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    36  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    37  	"k8s.io/apimachinery/pkg/runtime/schema"
    38  	"k8s.io/apimachinery/pkg/types"
    39  	"k8s.io/apimachinery/pkg/util/sets"
    40  	"k8s.io/apimachinery/pkg/util/validation/field"
    41  	"k8s.io/apimachinery/pkg/util/wait"
    42  	"k8s.io/apimachinery/pkg/watch"
    43  	"k8s.io/apiserver/pkg/util/feature"
    44  	"k8s.io/client-go/informers"
    45  	clientset "k8s.io/client-go/kubernetes"
    46  	typedv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
    47  	restclient "k8s.io/client-go/rest"
    48  	"k8s.io/client-go/tools/record"
    49  	"k8s.io/client-go/util/retry"
    50  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    51  	basemetrics "k8s.io/component-base/metrics"
    52  	"k8s.io/component-base/metrics/testutil"
    53  	"k8s.io/klog/v2"
    54  	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    55  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    56  	"k8s.io/kubernetes/pkg/controller"
    57  	jobcontroller "k8s.io/kubernetes/pkg/controller/job"
    58  	"k8s.io/kubernetes/pkg/controller/job/metrics"
    59  	"k8s.io/kubernetes/pkg/features"
    60  	"k8s.io/kubernetes/test/integration/framework"
    61  	"k8s.io/kubernetes/test/integration/util"
    62  	"k8s.io/utils/ptr"
    63  )
    64  
// waitInterval is the polling interval used by the test helpers when waiting
// for object state changes.
const waitInterval = time.Second

// fastPodFailureBackoff overrides the job controller's default pod failure
// backoff so tests that recreate failed pods run quickly.
const fastPodFailureBackoff = 100 * time.Millisecond

// Time duration used to account for controller latency in tests in which it is
// expected the Job controller does not make a change. In those cases we wait a
// little bit (more than the typical time for a couple of controller syncs) and
// verify there is no change.
const sleepDurationForControllerLatency = 100 * time.Millisecond
    73  
// metricLabelsWithValue pairs the label values identifying a single metric
// series with the counter value expected for that series.
type metricLabelsWithValue struct {
	// Labels are the label values, in the order declared by the metric.
	Labels []string
	// Value is the expected integer value of the counter.
	Value int
}
    78  
    79  func validateCounterMetric(ctx context.Context, t *testing.T, counterVec *basemetrics.CounterVec, wantMetric metricLabelsWithValue) {
    80  	t.Helper()
    81  	var cmpErr error
    82  	err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (bool, error) {
    83  		cmpErr = nil
    84  		value, err := testutil.GetCounterMetricValue(counterVec.WithLabelValues(wantMetric.Labels...))
    85  		if err != nil {
    86  			return true, fmt.Errorf("collecting the %q metric: %q", counterVec.Name, err)
    87  		}
    88  		if wantMetric.Value != int(value) {
    89  			cmpErr = fmt.Errorf("Unexpected metric delta for %q metric with labels %q. want: %v, got: %v", counterVec.Name, wantMetric.Labels, wantMetric.Value, int(value))
    90  			return false, nil
    91  		}
    92  		return true, nil
    93  	})
    94  	if err != nil {
    95  		t.Errorf("Failed waiting for expected metric: %q", err)
    96  	}
    97  	if cmpErr != nil {
    98  		t.Error(cmpErr)
    99  	}
   100  }
   101  
   102  func validateTerminatedPodsTrackingFinalizerMetric(ctx context.Context, t *testing.T, want int) {
   103  	validateCounterMetric(ctx, t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{
   104  		Value:  want,
   105  		Labels: []string{metrics.Add},
   106  	})
   107  	validateCounterMetric(ctx, t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{
   108  		Value:  want,
   109  		Labels: []string{metrics.Delete},
   110  	})
   111  }
   112  
// TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart verifies that the job is properly marked as Failed
// in a scenario when the job controller crashes between removing pod finalizers and marking the job as Failed (based on
// the pod failure policy). After the finalizer for the failed pod is removed we remove the failed pod. This step is
// done to simulate what PodGC would do. Then, the test spawns the second instance of the controller to check that it
// will pick up the job state properly and will mark it as Failed, even if the pod triggering the pod failure policy is
// already deleted.
// Note: this scenario requires the use of finalizers. Without finalizers there is no guarantee a failed pod would be
// checked against the pod failure policy rules before its removal by PodGC.
func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testing.T) {
	// The job runs `count` pods; one of them will be failed with an exit
	// code matching the FailJob rule below.
	count := 3
	job := batchv1.Job{
		Spec: batchv1.JobSpec{
			Template: v1.PodTemplateSpec{
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:                     "main-container",
							Image:                    "foo",
							ImagePullPolicy:          v1.PullIfNotPresent,
							TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
						},
					},
				},
			},
			Parallelism: ptr.To(int32(count)),
			Completions: ptr.To(int32(count)),
			PodFailurePolicy: &batchv1.PodFailurePolicy{
				Rules: []batchv1.PodFailurePolicyRule{
					{
						// Exit code 5 of "main-container" fails the entire job.
						Action: batchv1.PodFailurePolicyActionFailJob,
						OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
							Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
							Values:   []int32{5},
						},
					},
				},
			},
		},
	}
	// Pod status that matches the FailJob rule above (exit code 5).
	podStatusMatchingOnExitCodesTerminateRule := v1.PodStatus{
		Phase: v1.PodFailed,
		ContainerStatuses: []v1.ContainerStatus{
			{
				Name: "main-container",
				State: v1.ContainerState{
					Terminated: &v1.ContainerStateTerminated{
						ExitCode: 5,
					},
				},
			},
		},
	}
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, true)()
	closeFn, restConfig, cs, ns := setup(t, "simple")
	defer closeFn()

	// Make the job controller significantly slower to trigger race condition.
	restConfig.QPS = 1
	restConfig.Burst = 1
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	// The deferred closure reads the `cancel` variable, so it always invokes
	// whichever cancel function is current at test exit (it is reassigned
	// below when controllers are restarted).
	defer func() {
		cancel()
	}()
	resetMetrics()
	// Restore normal client throughput for the test's own API calls.
	restConfig.QPS = 200
	restConfig.Burst = 200

	// create a job with a failed pod matching the exit code rule and a couple of successful pods
	jobObj, err := createJobWithDefaults(ctx, cs, ns.Name, &job)
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
	validateJobPodsStatus(ctx, t, cs, jobObj, podsByStatus{
		Active:      count,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})

	// Collect the job's pods that are still pending or running so one of
	// them can be failed deliberately below.
	jobPods, err := getJobPods(ctx, t, cs, jobObj, func(s v1.PodStatus) bool {
		return (s.Phase == v1.PodPending || s.Phase == v1.PodRunning)
	})
	if err != nil {
		t.Fatalf("Failed to list Job Pods: %v", err)
	}

	failedIndex := 1
	wg := sync.WaitGroup{}
	wg.Add(1)

	// Await for the failed pod (with index failedIndex) to have its finalizer
	// removed. The finalizer will be removed by the job controller just after
	// appending the FailureTarget condition to the job to mark it as targeted
	// for failure.
	go func(ctx context.Context) {
		err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, time.Minute, true, func(ctx context.Context) (bool, error) {
			failedPodUpdated, err := cs.CoreV1().Pods(jobObj.Namespace).Get(ctx, jobPods[failedIndex].Name, metav1.GetOptions{})
			if err != nil {
				return true, err
			}
			if len(failedPodUpdated.Finalizers) == 0 {
				return true, nil
			}
			return false, nil
		})
		if err != nil {
			t.Logf("Failed awaiting for the finalizer removal for pod %v", klog.KObj(jobPods[failedIndex]))
		}
		wg.Done()
	}(ctx)

	// We update one pod as failed with state matching the pod failure policy rule. This results in removal
	// of the pod finalizer from the pod by the job controller.
	failedPod := jobPods[failedIndex]
	updatedPod := failedPod.DeepCopy()
	updatedPod.Status = podStatusMatchingOnExitCodesTerminateRule
	_, err = updatePodStatuses(ctx, cs, []v1.Pod{*updatedPod})
	if err != nil {
		t.Fatalf("Failed to update pod statuses %q for pods of job %q", err, klog.KObj(jobObj))
	}
	wg.Wait()

	t.Logf("Finalizer is removed for the failed pod %q. Shutting down the controller.", klog.KObj(failedPod))
	// shut down the first job controller as soon as it removed the finalizer for the failed pod. This will
	// likely happen before the first controller is able to mark the job as Failed.
	cancel()

	// Delete the failed pod to make sure it is not used by the second instance of the controller
	ctx, cancel = context.WithCancel(context.Background())
	err = cs.CoreV1().Pods(failedPod.Namespace).Delete(ctx, failedPod.Name, metav1.DeleteOptions{GracePeriodSeconds: ptr.To[int64](0)})
	if err != nil {
		t.Fatalf("Error: '%v' while deleting pod: '%v'", err, klog.KObj(failedPod))
	}
	t.Logf("The failed pod %q is deleted", klog.KObj(failedPod))
	cancel()

	// start the second controller to promote the interim FailureTarget job condition as Failed
	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
	// verify the job is correctly marked as Failed
	validateJobFailed(ctx, t, cs, jobObj)
	validateNoOrphanPodsWithFinalizers(ctx, t, cs, jobObj)
}
   254  
// TestJobPodFailurePolicy tests handling of pod failures with respect to the
// configured pod failure policy rules
func TestJobPodFailurePolicy(t *testing.T) {
	// Speed up recreation of failed pods for the duration of this test.
	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
	// Job with three rules: Ignore pod-condition DisruptionTarget,
	// Count exit code 10, and FailJob on exit codes 5, 6, 7.
	job := batchv1.Job{
		Spec: batchv1.JobSpec{
			Template: v1.PodTemplateSpec{
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:                     "main-container",
							Image:                    "foo",
							ImagePullPolicy:          v1.PullIfNotPresent,
							TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
						},
					},
				},
			},
			PodFailurePolicy: &batchv1.PodFailurePolicy{
				Rules: []batchv1.PodFailurePolicyRule{
					{
						Action: batchv1.PodFailurePolicyActionIgnore,
						OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{
							{
								Type: v1.DisruptionTarget,
							},
						},
					},
					{
						Action: batchv1.PodFailurePolicyActionCount,
						OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
							Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
							Values:   []int32{10},
						},
					},
					{
						Action: batchv1.PodFailurePolicyActionFailJob,
						OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
							Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
							Values:   []int32{5, 6, 7},
						},
					},
				},
			},
		},
	}
	// Matches the FailJob rule (exit code 5).
	podStatusMatchingOnExitCodesTerminateRule := v1.PodStatus{
		Phase: v1.PodFailed,
		ContainerStatuses: []v1.ContainerStatus{
			{
				Name: "main-container",
				State: v1.ContainerState{
					Terminated: &v1.ContainerStateTerminated{
						ExitCode: 5,
					},
				},
			},
		},
	}
	// Matches the Count rule (exit code 10).
	podStatusMatchingOnExitCodesCountRule := v1.PodStatus{
		Phase: v1.PodFailed,
		ContainerStatuses: []v1.ContainerStatus{
			{
				Name: "main-container",
				State: v1.ContainerState{
					Terminated: &v1.ContainerStateTerminated{
						ExitCode: 10,
					},
				},
			},
		},
	}
	// Matches the Ignore rule (DisruptionTarget pod condition).
	podStatusMatchingOnPodConditionsIgnoreRule := v1.PodStatus{
		Phase: v1.PodFailed,
		Conditions: []v1.PodCondition{
			{
				Type:   v1.DisruptionTarget,
				Status: v1.ConditionTrue,
			},
		},
	}
	// Matches no rule; the controller falls back to default counting.
	podStatusNotMatchingAnyRule := v1.PodStatus{
		Phase: v1.PodFailed,
		ContainerStatuses: []v1.ContainerStatus{
			{
				State: v1.ContainerState{
					Terminated: &v1.ContainerStateTerminated{},
				},
			},
		},
	}
	testCases := map[string]struct {
		enableJobPodFailurePolicy                bool
		restartController                        bool
		job                                      batchv1.Job
		podStatus                                v1.PodStatus
		wantActive                               int
		wantFailed                               int
		wantJobConditionType                     batchv1.JobConditionType
		wantJobFinishedMetric                    metricLabelsWithValue
		wantPodFailuresHandledByPolicyRuleMetric *metricLabelsWithValue
	}{
		"pod status matching the configured FailJob rule on exit codes; job terminated when JobPodFailurePolicy enabled": {
			enableJobPodFailurePolicy: true,
			job:                       job,
			podStatus:                 podStatusMatchingOnExitCodesTerminateRule,
			wantActive:                0,
			wantFailed:                1,
			wantJobConditionType:      batchv1.JobFailed,
			wantJobFinishedMetric: metricLabelsWithValue{
				Labels: []string{"NonIndexed", "failed", "PodFailurePolicy"},
				Value:  1,
			},
			wantPodFailuresHandledByPolicyRuleMetric: &metricLabelsWithValue{
				Labels: []string{"FailJob"},
				Value:  1,
			},
		},
		"pod status matching the configured FailJob rule on exit codes; with controller restart; job terminated when JobPodFailurePolicy enabled": {
			enableJobPodFailurePolicy: true,
			restartController:         true,
			job:                       job,
			podStatus:                 podStatusMatchingOnExitCodesTerminateRule,
			wantActive:                0,
			wantFailed:                1,
			wantJobConditionType:      batchv1.JobFailed,
			wantJobFinishedMetric: metricLabelsWithValue{
				Labels: []string{"NonIndexed", "failed", "PodFailurePolicy"},
				Value:  1,
			},
		},
		"pod status matching the configured FailJob rule on exit codes; default handling when JobPodFailurePolicy disabled": {
			enableJobPodFailurePolicy: false,
			job:                       job,
			podStatus:                 podStatusMatchingOnExitCodesTerminateRule,
			wantActive:                1,
			wantFailed:                1,
			wantJobConditionType:      batchv1.JobComplete,
			wantJobFinishedMetric: metricLabelsWithValue{
				Labels: []string{"NonIndexed", "succeeded", ""},
				Value:  1,
			},
		},
		"pod status matching the configured Ignore rule on pod conditions; pod failure not counted when JobPodFailurePolicy enabled": {
			enableJobPodFailurePolicy: true,
			job:                       job,
			podStatus:                 podStatusMatchingOnPodConditionsIgnoreRule,
			wantActive:                1,
			wantFailed:                0,
			wantJobConditionType:      batchv1.JobComplete,
			wantPodFailuresHandledByPolicyRuleMetric: &metricLabelsWithValue{
				Labels: []string{"Ignore"},
				Value:  1,
			},
			wantJobFinishedMetric: metricLabelsWithValue{
				Labels: []string{"NonIndexed", "succeeded", ""},
				Value:  1,
			},
		},
		"pod status matching the configured Count rule on exit codes; pod failure counted when JobPodFailurePolicy enabled": {
			enableJobPodFailurePolicy: true,
			job:                       job,
			podStatus:                 podStatusMatchingOnExitCodesCountRule,
			wantActive:                1,
			wantFailed:                1,
			wantJobConditionType:      batchv1.JobComplete,
			wantJobFinishedMetric: metricLabelsWithValue{
				Labels: []string{"NonIndexed", "succeeded", ""},
				Value:  1,
			},
			wantPodFailuresHandledByPolicyRuleMetric: &metricLabelsWithValue{
				Labels: []string{"Count"},
				Value:  1,
			},
		},
		"pod status non-matching any configured rule; pod failure counted when JobPodFailurePolicy enabled": {
			enableJobPodFailurePolicy: true,
			job:                       job,
			podStatus:                 podStatusNotMatchingAnyRule,
			wantActive:                1,
			wantFailed:                1,
			wantJobConditionType:      batchv1.JobComplete,
			wantJobFinishedMetric: metricLabelsWithValue{
				Labels: []string{"NonIndexed", "succeeded", ""},
				Value:  1,
			},
			wantPodFailuresHandledByPolicyRuleMetric: &metricLabelsWithValue{
				Labels: []string{"Count"},
				Value:  0,
			},
		},
	}
	for name, test := range testCases {
		t.Run(name, func(t *testing.T) {
			resetMetrics()
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, test.enableJobPodFailurePolicy)()

			closeFn, restConfig, clientSet, ns := setup(t, "simple")
			defer closeFn()
			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
			defer func() {
				cancel()
			}()

			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job)
			if err != nil {
				// NOTE(review): jobObj is likely nil when err != nil, so
				// jobObj.Name here could panic — consider logging only err.
				t.Fatalf("Error %q while creating the job %q", err, jobObj.Name)
			}
			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
				Active:      1,
				Ready:       ptr.To[int32](0),
				Terminating: ptr.To[int32](0),
			})

			// Apply the test case's pod status to exactly one pod of the job.
			op := func(p *v1.Pod) bool {
				p.Status = test.podStatus
				return true
			}

			if err, _ := updateJobPodsStatus(ctx, clientSet, jobObj, op, 1); err != nil {
				t.Fatalf("Error %q while updating pod status for Job: %q", err, jobObj.Name)
			}

			// Optionally restart the controller to verify that the failure
			// handling survives a controller restart.
			if test.restartController {
				cancel()
				ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
			}

			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
				Active:      test.wantActive,
				Failed:      test.wantFailed,
				Ready:       ptr.To[int32](0),
				Terminating: ptr.To[int32](0),
			})

			// For cases expected to complete, succeed the remaining pod so
			// the job can reach the Complete condition.
			if test.wantJobConditionType == batchv1.JobComplete {
				if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
					t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
				}
			}
			validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
			validateCounterMetric(ctx, t, metrics.JobFinishedNum, test.wantJobFinishedMetric)
			if test.wantPodFailuresHandledByPolicyRuleMetric != nil {
				validateCounterMetric(ctx, t, metrics.PodFailuresHandledByFailurePolicy, *test.wantPodFailuresHandledByPolicyRuleMetric)
			}
			validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
		})
	}
}
   504  
// TestSuccessPolicy tests handling of job and its pods when
// successPolicy is used.
func TestSuccessPolicy(t *testing.T) {
	// podTerminationWithExpectations describes one pod status update (for
	// the pod at the given completion index) together with the job status
	// expected after the controller processes it.
	type podTerminationWithExpectations struct {
		index                int
		status               v1.PodStatus
		wantActive           int
		wantFailed           int
		wantSucceeded        int
		wantActiveIndexes    sets.Set[int]
		wantCompletedIndexes string
		wantFailedIndexes    *string
	}

	podTemplateSpec := v1.PodTemplateSpec{
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:                     "main-container",
					Image:                    "foo",
					ImagePullPolicy:          v1.PullIfNotPresent,
					TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
				},
			},
		},
	}
	testCases := map[string]struct {
		enableJobSuccessPolicy     bool
		enableBackoffLimitPerIndex bool
		job                        batchv1.Job
		podTerminations            []podTerminationWithExpectations
		wantConditionTypes         []batchv1.JobConditionType
		wantJobFinishedNumMetric   []metricLabelsWithValue
	}{
		"all indexes succeeded; JobSuccessPolicy is enabled": {
			enableJobSuccessPolicy: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:    ptr.To[int32](1),
					Completions:    ptr.To[int32](1),
					CompletionMode: completionModePtr(batchv1.IndexedCompletion),
					Template:       podTemplateSpec,
					SuccessPolicy: &batchv1.SuccessPolicy{
						Rules: []batchv1.SuccessPolicyRule{{
							SucceededIndexes: ptr.To("0"),
						}},
					},
				},
			},
			podTerminations: []podTerminationWithExpectations{
				{
					index: 0,
					status: v1.PodStatus{
						Phase: v1.PodSucceeded,
					},
					wantActive:           0,
					wantFailed:           0,
					wantSucceeded:        1,
					wantCompletedIndexes: "0",
				},
			},
			wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete},
			wantJobFinishedNumMetric: []metricLabelsWithValue{
				{
					Labels: []string{"Indexed", "succeeded", ""},
					Value:  1,
				},
			},
		},
		"all indexes succeeded; JobSuccessPolicy is disabled": {
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:    ptr.To[int32](1),
					Completions:    ptr.To[int32](1),
					CompletionMode: completionModePtr(batchv1.IndexedCompletion),
					Template:       podTemplateSpec,
					SuccessPolicy: &batchv1.SuccessPolicy{
						Rules: []batchv1.SuccessPolicyRule{{
							SucceededIndexes: ptr.To("0"),
						}},
					},
				},
			},
			podTerminations: []podTerminationWithExpectations{
				{
					index: 0,
					status: v1.PodStatus{
						Phase: v1.PodSucceeded,
					},
					wantActive:           0,
					wantFailed:           0,
					wantSucceeded:        1,
					wantCompletedIndexes: "0",
				},
			},
			// With the feature disabled no SuccessCriteriaMet condition is added.
			wantConditionTypes: []batchv1.JobConditionType{batchv1.JobComplete},
			wantJobFinishedNumMetric: []metricLabelsWithValue{
				{
					Labels: []string{"Indexed", "succeeded", ""},
					Value:  1,
				},
			},
		},
		"job with successPolicy with succeededIndexes; job has SuccessCriteriaMet and Complete conditions even if some indexes remain pending": {
			enableJobSuccessPolicy: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:    ptr.To[int32](2),
					Completions:    ptr.To[int32](2),
					CompletionMode: completionModePtr(batchv1.IndexedCompletion),
					Template:       podTemplateSpec,
					SuccessPolicy: &batchv1.SuccessPolicy{
						Rules: []batchv1.SuccessPolicyRule{{
							SucceededIndexes: ptr.To("1"),
						}},
					},
				},
			},
			podTerminations: []podTerminationWithExpectations{
				{
					index: 0,
					status: v1.PodStatus{
						Phase: v1.PodPending,
					},
					wantActive:        2,
					wantActiveIndexes: sets.New(0, 1),
					wantFailed:        0,
					wantSucceeded:     0,
				},
				{
					index: 1,
					status: v1.PodStatus{
						Phase: v1.PodSucceeded,
					},
					wantActive:           0,
					wantFailed:           0,
					wantSucceeded:        1,
					wantCompletedIndexes: "1",
				},
			},
			wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete},
			wantJobFinishedNumMetric: []metricLabelsWithValue{
				{
					Labels: []string{"Indexed", "succeeded", ""},
					Value:  1,
				},
			},
		},
		"job with successPolicy with succeededCount; job has SuccessCriteriaMet and Complete conditions even if some indexes remain pending": {
			enableJobSuccessPolicy: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:    ptr.To[int32](2),
					Completions:    ptr.To[int32](2),
					CompletionMode: completionModePtr(batchv1.IndexedCompletion),
					Template:       podTemplateSpec,
					SuccessPolicy: &batchv1.SuccessPolicy{
						Rules: []batchv1.SuccessPolicyRule{{
							SucceededCount: ptr.To[int32](1),
						}},
					},
				},
			},
			podTerminations: []podTerminationWithExpectations{
				{
					index: 0,
					status: v1.PodStatus{
						Phase: v1.PodPending,
					},
					wantActive:        2,
					wantActiveIndexes: sets.New(0, 1),
					wantFailed:        0,
					wantSucceeded:     0,
				},
				{
					index: 1,
					status: v1.PodStatus{
						Phase: v1.PodSucceeded,
					},
					wantActive:           0,
					wantFailed:           0,
					wantSucceeded:        1,
					wantCompletedIndexes: "1",
				},
			},
			wantConditionTypes: []batchv1.JobConditionType{batchv1.JobSuccessCriteriaMet, batchv1.JobComplete},
			wantJobFinishedNumMetric: []metricLabelsWithValue{
				{
					Labels: []string{"Indexed", "succeeded", ""},
					Value:  1,
				},
			},
		},
		"job with successPolicy and backoffLimitPerIndex; job has a Failed condition if job meets backoffLimitPerIndex": {
			enableJobSuccessPolicy:     true,
			enableBackoffLimitPerIndex: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:          ptr.To[int32](2),
					Completions:          ptr.To[int32](2),
					CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
					BackoffLimitPerIndex: ptr.To[int32](0),
					Template:             podTemplateSpec,
					SuccessPolicy: &batchv1.SuccessPolicy{
						Rules: []batchv1.SuccessPolicyRule{{
							SucceededCount: ptr.To[int32](1),
						}},
					},
				},
			},
			podTerminations: []podTerminationWithExpectations{
				{
					index: 0,
					status: v1.PodStatus{
						Phase: v1.PodFailed,
					},
					wantActive:        1,
					wantActiveIndexes: sets.New(1),
					wantFailed:        1,
					wantFailedIndexes: ptr.To("0"),
					wantSucceeded:     0,
				},
				{
					index: 1,
					status: v1.PodStatus{
						Phase: v1.PodSucceeded,
					},
					wantActive:           0,
					wantFailed:           1,
					wantSucceeded:        1,
					wantFailedIndexes:    ptr.To("0"),
					wantCompletedIndexes: "1",
				},
			},
			wantConditionTypes: []batchv1.JobConditionType{batchv1.JobFailed},
		},
	}
	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			resetMetrics()
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy)()
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableBackoffLimitPerIndex)()

			closeFn, restConfig, clientSet, ns := setup(t, "simple")
			defer closeFn()
			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
			defer func() {
				cancel()
			}()
			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &tc.job)
			if err != nil {
				// NOTE(review): jobObj is likely nil when err != nil, so
				// jobObj.Name here could panic — consider logging only err.
				t.Fatalf("Error %v while creating the Job %q", err, jobObj.Name)
			}
			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
				Active:      int(*tc.job.Spec.Parallelism),
				Ready:       ptr.To[int32](0),
				Terminating: ptr.To[int32](0),
			})
			// Apply each scripted pod termination in order and verify the
			// job status the controller derives from it.
			for _, podTermination := range tc.podTerminations {
				pod, err := getActivePodForIndex(ctx, clientSet, jobObj, podTermination.index)
				if err != nil {
					t.Fatalf("Listing Job Pods: %v", err)
				}
				pod.Status = podTermination.status
				if _, err = clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil {
					t.Fatalf("Error updating the Pod %q: %v", klog.KObj(pod), err)
				}
				validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
					Active:      podTermination.wantActive,
					Succeeded:   podTermination.wantSucceeded,
					Failed:      podTermination.wantFailed,
					Ready:       ptr.To[int32](0),
					Terminating: ptr.To[int32](0),
				})
				validateIndexedJobPods(ctx, t, clientSet, jobObj, podTermination.wantActiveIndexes, podTermination.wantCompletedIndexes, podTermination.wantFailedIndexes)
			}
			for i := range tc.wantConditionTypes {
				validateJobCondition(ctx, t, clientSet, jobObj, tc.wantConditionTypes[i])
			}
			for i := range tc.wantJobFinishedNumMetric {
				validateCounterMetric(ctx, t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric[i])
			}
			validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
		})
	}
}
   791  
   792  // TestSuccessPolicy_ReEnabling tests handling of pod successful when
   793  // re-enabling the JobSuccessPolicy feature.
   794  func TestSuccessPolicy_ReEnabling(t *testing.T) {
   795  	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, true)()
   796  	closeFn, resetConfig, clientSet, ns := setup(t, "success-policy-re-enabling")
   797  	defer closeFn()
   798  	ctx, cancel := startJobControllerAndWaitForCaches(t, resetConfig)
   799  	defer cancel()
   800  	resetMetrics()
   801  
   802  	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
   803  		Spec: batchv1.JobSpec{
   804  			Parallelism:    ptr.To[int32](5),
   805  			Completions:    ptr.To[int32](5),
   806  			CompletionMode: completionModePtr(batchv1.IndexedCompletion),
   807  			SuccessPolicy: &batchv1.SuccessPolicy{
   808  				Rules: []batchv1.SuccessPolicyRule{{
   809  					SucceededCount: ptr.To[int32](3),
   810  				}},
   811  			},
   812  		},
   813  	})
   814  	if err != nil {
   815  		t.Fatalf("Failed to create Job: %v", err)
   816  	}
   817  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
   818  		Active:      5,
   819  		Ready:       ptr.To[int32](0),
   820  		Terminating: ptr.To[int32](0),
   821  	})
   822  	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2, 3, 4), "", nil)
   823  
   824  	// First pod from index 0 succeeded
   825  	if err = setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 0); err != nil {
   826  		t.Fatalf("Failed tring to succeess pod with index 0")
   827  	}
   828  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
   829  		Active:      4,
   830  		Succeeded:   1,
   831  		Ready:       ptr.To[int32](0),
   832  		Terminating: ptr.To[int32](0),
   833  	})
   834  	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(1, 2, 3, 4), "0", nil)
   835  
   836  	// Disable the JobSuccessPolicy
   837  	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, false)()
   838  
   839  	// First pod from index 1 succeeded
   840  	if err = setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
   841  		t.Fatalf("Failed trying to succeess pod with index 1")
   842  	}
   843  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
   844  		Active:      3,
   845  		Succeeded:   2,
   846  		Ready:       ptr.To[int32](0),
   847  		Terminating: ptr.To[int32](0),
   848  	})
   849  	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(2, 3, 4), "0,1", nil)
   850  
   851  	// ReEnable the JobSuccessPolicy
   852  	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, true)()
   853  
   854  	// First pod from index 2 succeeded
   855  	if err = setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
   856  		t.Fatalf("Failed trying to success pod with index 2")
   857  	}
   858  
   859  	// Verify all indexes are terminated as job meets successPolicy.
   860  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
   861  		Active:      0,
   862  		Succeeded:   3,
   863  		Ready:       ptr.To[int32](0),
   864  		Terminating: ptr.To[int32](0),
   865  	})
   866  	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New[int](), "0-2", nil)
   867  
   868  	validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobSuccessCriteriaMet)
   869  	validateJobComplete(ctx, t, clientSet, jobObj)
   870  	validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
   871  }
   872  
   873  // TestBackoffLimitPerIndex_DelayedPodDeletion tests the pod deletion is delayed
   874  // until the replacement pod is created, so that the replacement pod has the
   875  // index-failure-count annotation bumped, when BackoffLimitPerIndex is used.
   876  func TestBackoffLimitPerIndex_DelayedPodDeletion(t *testing.T) {
   877  	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
   878  
   879  	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
   880  	closeFn, restConfig, clientSet, ns := setup(t, "backoff-limit-per-index-failed")
   881  	defer closeFn()
   882  	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
   883  	defer func() {
   884  		cancel()
   885  	}()
   886  
   887  	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
   888  		Spec: batchv1.JobSpec{
   889  			Parallelism:          ptr.To[int32](1),
   890  			Completions:          ptr.To[int32](1),
   891  			BackoffLimitPerIndex: ptr.To[int32](1),
   892  			CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
   893  		},
   894  	})
   895  	if err != nil {
   896  		t.Fatalf("Failed to create Job: %v", err)
   897  	}
   898  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
   899  		Active:      1,
   900  		Ready:       ptr.To[int32](0),
   901  		Terminating: ptr.To[int32](0),
   902  	})
   903  	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0), "", ptr.To(""))
   904  
   905  	// First pod from index 0 failed.
   906  	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 0); err != nil {
   907  		t.Fatal("Failed trying to fail pod with index 0")
   908  	}
   909  	// Delete the failed pod
   910  	pod, err := getJobPodForIndex(ctx, clientSet, jobObj, 0, func(_ *v1.Pod) bool { return true })
   911  	if err != nil {
   912  		t.Fatalf("failed to get terminal pod for index: %v", 0)
   913  	}
   914  	if err := clientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
   915  		t.Fatalf("failed to delete pod: %v, error: %v", klog.KObj(pod), err)
   916  	}
   917  
   918  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
   919  		Active:      1,
   920  		Failed:      1,
   921  		Ready:       ptr.To[int32](0),
   922  		Terminating: ptr.To[int32](0),
   923  	})
   924  	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0), "", ptr.To(""))
   925  
   926  	// Verify the replacement pod is created and has the index-failure-count
   927  	// annotation bumped.
   928  	replacement, err := getActivePodForIndex(ctx, clientSet, jobObj, 0)
   929  	if err != nil {
   930  		t.Fatalf("Failed to get active replacement pod for index: %v, error: %v", 0, err)
   931  	}
   932  	gotIndexFailureCount, err := getIndexFailureCount(replacement)
   933  	if err != nil {
   934  		t.Fatalf("Failed read the index failure count annotation for pod: %v, error: %v", klog.KObj(replacement), err)
   935  	}
   936  	if diff := cmp.Diff(1, gotIndexFailureCount); diff != "" {
   937  		t.Errorf("Unexpected index failure count for the replacement pod: %s", diff)
   938  	}
   939  	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 0); err != nil {
   940  		t.Fatal("Failed trying to fail pod with index 0")
   941  	}
   942  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
   943  		Active:      0,
   944  		Succeeded:   1,
   945  		Failed:      1,
   946  		Ready:       ptr.To[int32](0),
   947  		Terminating: ptr.To[int32](0),
   948  	})
   949  	validateJobComplete(ctx, t, clientSet, jobObj)
   950  }
   951  
   952  // TestBackoffLimitPerIndex_Reenabling tests handling of pod failures when
   953  // reenabling the BackoffLimitPerIndex feature.
   954  func TestBackoffLimitPerIndex_Reenabling(t *testing.T) {
   955  	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
   956  
   957  	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
   958  	closeFn, restConfig, clientSet, ns := setup(t, "backoff-limit-per-index-reenabled")
   959  	defer closeFn()
   960  	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
   961  	defer cancel()
   962  	resetMetrics()
   963  
   964  	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
   965  		Spec: batchv1.JobSpec{
   966  			Parallelism:          ptr.To[int32](3),
   967  			Completions:          ptr.To[int32](3),
   968  			BackoffLimitPerIndex: ptr.To[int32](0),
   969  			CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
   970  		},
   971  	})
   972  	if err != nil {
   973  		t.Fatalf("Failed to create Job: %v", err)
   974  	}
   975  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
   976  		Active:      3,
   977  		Ready:       ptr.To[int32](0),
   978  		Terminating: ptr.To[int32](0),
   979  	})
   980  	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2), "", ptr.To(""))
   981  
   982  	// First pod from index 0 failed
   983  	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 0); err != nil {
   984  		t.Fatal("Failed trying to fail pod with index 0")
   985  	}
   986  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
   987  		Active:      2,
   988  		Failed:      1,
   989  		Ready:       ptr.To[int32](0),
   990  		Terminating: ptr.To[int32](0),
   991  	})
   992  	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(1, 2), "", ptr.To("0"))
   993  
   994  	// Disable the feature
   995  	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, false)()
   996  
   997  	// First pod from index 1 failed
   998  	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
   999  		t.Fatal("Failed trying to fail pod with index 1")
  1000  	}
  1001  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
  1002  		Active:      3,
  1003  		Failed:      2,
  1004  		Ready:       ptr.To[int32](0),
  1005  		Terminating: ptr.To[int32](0),
  1006  	})
  1007  	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2), "", nil)
  1008  
  1009  	// Reenable the feature
  1010  	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
  1011  
  1012  	// First pod from index 2 failed
  1013  	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
  1014  		t.Fatal("Failed trying to fail pod with index 2")
  1015  	}
  1016  
  1017  	// Verify the indexes 0 and 1 are active as the failed pods don't have
  1018  	// finalizers at this point, so they are ignored.
  1019  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
  1020  		Active:      2,
  1021  		Failed:      3,
  1022  		Ready:       ptr.To[int32](0),
  1023  		Terminating: ptr.To[int32](0),
  1024  	})
  1025  	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", ptr.To("2"))
  1026  
  1027  	// mark remaining pods are Succeeded and verify Job status
  1028  	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
  1029  		t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
  1030  	}
  1031  	validateJobFailed(ctx, t, clientSet, jobObj)
  1032  	validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
  1033  }
  1034  
// TestBackoffLimitPerIndex_JobPodsCreatedWithExponentialBackoff tests that the
// pods are recreated with exponential backoff delay computed independently
// per index. Scenario:
// - fail index 0
// - fail index 0
// - fail index 1
// - succeed index 0
// - fail index 1
// - succeed index 1
func TestBackoffLimitPerIndex_JobPodsCreatedWithExponentialBackoff(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
	// Use a measurable (2s) base backoff so the per-index delays between pod
	// creations can be asserted at the end of the test.
	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, 2*time.Second))

	closeFn, restConfig, clientSet, ns := setup(t, "simple")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	defer cancel()

	// Indexed Job with 2 indexes, each allowed up to 2 retries.
	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
		Spec: batchv1.JobSpec{
			Completions:          ptr.To[int32](2),
			Parallelism:          ptr.To[int32](2),
			BackoffLimitPerIndex: ptr.To[int32](2),
			CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
		},
	})
	if err != nil {
		t.Fatalf("Could not create job: %v", err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      2,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", ptr.To(""))

	// Fail the first pod for index 0
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 0); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      2,
		Failed:      1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", ptr.To(""))

	// Fail the second pod for index 0
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 0); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      2,
		Failed:      2,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", ptr.To(""))

	// Fail the first pod for index 1
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      2,
		Failed:      3,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1), "", ptr.To(""))

	// Succeed the third pod for index 0
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 0); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      1,
		Failed:      3,
		Succeeded:   1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(1), "0", ptr.To(""))

	// Fail the second pod for index 1
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      1,
		Failed:      4,
		Succeeded:   1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(1), "0", ptr.To(""))

	// Succeed the third pod for index 1
	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      0,
		Failed:      4,
		Succeeded:   2,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New[int](), "0,1", ptr.To(""))
	validateJobComplete(ctx, t, clientSet, jobObj)

	// Verify that, per index, consecutive pod creations were spaced by the
	// exponentially growing backoff delay.
	for index := 0; index < int(*jobObj.Spec.Completions); index++ {
		podsForIndex, err := getJobPodsForIndex(ctx, clientSet, jobObj, index, func(_ *v1.Pod) bool { return true })
		if err != nil {
			t.Fatalf("Failed to list job %q pods for index %v, error: %v", klog.KObj(jobObj), index, err)
		}
		validateExpotentialBackoffDelay(t, jobcontroller.DefaultJobPodFailureBackOff, podsForIndex)
	}
}
  1155  
  1156  // TestBackoffLimitPerIndex tests handling of job and its pods when
  1157  // backoff limit per index is used.
  1158  func TestBackoffLimitPerIndex(t *testing.T) {
  1159  	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
  1160  
  1161  	type podTerminationWithExpectations struct {
  1162  		index                          int
  1163  		status                         v1.PodStatus
  1164  		wantActive                     int
  1165  		wantFailed                     int
  1166  		wantSucceeded                  int
  1167  		wantActiveIndexes              sets.Set[int]
  1168  		wantCompletedIndexes           string
  1169  		wantFailedIndexes              *string
  1170  		wantReplacementPodFailureCount *int
  1171  	}
  1172  
  1173  	podTemplateSpec := v1.PodTemplateSpec{
  1174  		Spec: v1.PodSpec{
  1175  			Containers: []v1.Container{
  1176  				{
  1177  					Name:                     "main-container",
  1178  					Image:                    "foo",
  1179  					ImagePullPolicy:          v1.PullIfNotPresent,
  1180  					TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
  1181  				},
  1182  			},
  1183  		},
  1184  	}
  1185  	testCases := map[string]struct {
  1186  		job                               batchv1.Job
  1187  		podTerminations                   []podTerminationWithExpectations
  1188  		wantJobConditionType              batchv1.JobConditionType
  1189  		wantJobFinishedIndexesTotalMetric []metricLabelsWithValue
  1190  	}{
  1191  		"job succeeded": {
  1192  			job: batchv1.Job{
  1193  				Spec: batchv1.JobSpec{
  1194  					Parallelism:          ptr.To[int32](2),
  1195  					Completions:          ptr.To[int32](2),
  1196  					CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
  1197  					BackoffLimitPerIndex: ptr.To[int32](1),
  1198  					Template:             podTemplateSpec,
  1199  				},
  1200  			},
  1201  			podTerminations: []podTerminationWithExpectations{
  1202  				{
  1203  					status: v1.PodStatus{
  1204  						Phase: v1.PodFailed,
  1205  					},
  1206  					wantActive:                     2,
  1207  					wantFailed:                     1,
  1208  					wantActiveIndexes:              sets.New(0, 1),
  1209  					wantFailedIndexes:              ptr.To(""),
  1210  					wantReplacementPodFailureCount: ptr.To(1),
  1211  				},
  1212  			},
  1213  			wantJobConditionType: batchv1.JobComplete,
  1214  			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
  1215  				{
  1216  					Labels: []string{"succeeded", "perIndex"},
  1217  					Value:  2,
  1218  				},
  1219  			},
  1220  		},
  1221  		"job index fails due to exceeding backoff limit per index": {
  1222  			job: batchv1.Job{
  1223  				Spec: batchv1.JobSpec{
  1224  					Parallelism:          ptr.To[int32](2),
  1225  					Completions:          ptr.To[int32](2),
  1226  					CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
  1227  					BackoffLimitPerIndex: ptr.To[int32](2),
  1228  					Template:             podTemplateSpec,
  1229  				},
  1230  			},
  1231  			podTerminations: []podTerminationWithExpectations{
  1232  				{
  1233  					status: v1.PodStatus{
  1234  						Phase: v1.PodFailed,
  1235  					},
  1236  					wantActive:                     2,
  1237  					wantFailed:                     1,
  1238  					wantActiveIndexes:              sets.New(0, 1),
  1239  					wantFailedIndexes:              ptr.To(""),
  1240  					wantReplacementPodFailureCount: ptr.To(1),
  1241  				},
  1242  				{
  1243  					status: v1.PodStatus{
  1244  						Phase: v1.PodFailed,
  1245  					},
  1246  					wantActive:                     2,
  1247  					wantFailed:                     2,
  1248  					wantActiveIndexes:              sets.New(0, 1),
  1249  					wantFailedIndexes:              ptr.To(""),
  1250  					wantReplacementPodFailureCount: ptr.To(2),
  1251  				},
  1252  				{
  1253  					status: v1.PodStatus{
  1254  						Phase: v1.PodFailed,
  1255  					},
  1256  					wantActive:        1,
  1257  					wantFailed:        3,
  1258  					wantActiveIndexes: sets.New(1),
  1259  					wantFailedIndexes: ptr.To("0"),
  1260  				},
  1261  			},
  1262  			wantJobConditionType: batchv1.JobFailed,
  1263  			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
  1264  				{
  1265  					Labels: []string{"failed", "perIndex"},
  1266  					Value:  1,
  1267  				},
  1268  				{
  1269  					Labels: []string{"succeeded", "perIndex"},
  1270  					Value:  1,
  1271  				},
  1272  			},
  1273  		},
  1274  		"job index fails due to exceeding the global backoff limit first": {
  1275  			job: batchv1.Job{
  1276  				Spec: batchv1.JobSpec{
  1277  					Parallelism:          ptr.To[int32](3),
  1278  					Completions:          ptr.To[int32](3),
  1279  					CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
  1280  					BackoffLimitPerIndex: ptr.To[int32](1),
  1281  					BackoffLimit:         ptr.To[int32](2),
  1282  					Template:             podTemplateSpec,
  1283  				},
  1284  			},
  1285  			podTerminations: []podTerminationWithExpectations{
  1286  				{
  1287  					index: 0,
  1288  					status: v1.PodStatus{
  1289  						Phase: v1.PodFailed,
  1290  					},
  1291  					wantActive:        3,
  1292  					wantFailed:        1,
  1293  					wantActiveIndexes: sets.New(0, 1, 2),
  1294  					wantFailedIndexes: ptr.To(""),
  1295  				},
  1296  				{
  1297  					index: 1,
  1298  					status: v1.PodStatus{
  1299  						Phase: v1.PodFailed,
  1300  					},
  1301  					wantActive:        3,
  1302  					wantFailed:        2,
  1303  					wantActiveIndexes: sets.New(0, 1, 2),
  1304  					wantFailedIndexes: ptr.To(""),
  1305  				},
  1306  				{
  1307  					index: 2,
  1308  					status: v1.PodStatus{
  1309  						Phase: v1.PodFailed,
  1310  					},
  1311  					wantFailed:        5,
  1312  					wantFailedIndexes: ptr.To(""),
  1313  				},
  1314  			},
  1315  			wantJobConditionType: batchv1.JobFailed,
  1316  			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
  1317  				{
  1318  					Labels: []string{"succeeded", "perIndex"},
  1319  					Value:  0,
  1320  				},
  1321  				{
  1322  					Labels: []string{"failed", "perIndex"},
  1323  					Value:  0,
  1324  				},
  1325  			},
  1326  		},
  1327  		"job continues execution after a failed index, the job is marked Failed due to the failed index": {
  1328  			job: batchv1.Job{
  1329  				Spec: batchv1.JobSpec{
  1330  					Parallelism:          ptr.To[int32](2),
  1331  					Completions:          ptr.To[int32](2),
  1332  					CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
  1333  					BackoffLimitPerIndex: ptr.To[int32](0),
  1334  					Template:             podTemplateSpec,
  1335  				},
  1336  			},
  1337  			podTerminations: []podTerminationWithExpectations{
  1338  				{
  1339  					index: 0,
  1340  					status: v1.PodStatus{
  1341  						Phase: v1.PodFailed,
  1342  					},
  1343  					wantActive:        1,
  1344  					wantFailed:        1,
  1345  					wantActiveIndexes: sets.New(1),
  1346  					wantFailedIndexes: ptr.To("0"),
  1347  				},
  1348  				{
  1349  					index: 1,
  1350  					status: v1.PodStatus{
  1351  						Phase: v1.PodSucceeded,
  1352  					},
  1353  					wantFailed:           1,
  1354  					wantSucceeded:        1,
  1355  					wantFailedIndexes:    ptr.To("0"),
  1356  					wantCompletedIndexes: "1",
  1357  				},
  1358  			},
  1359  			wantJobConditionType: batchv1.JobFailed,
  1360  			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
  1361  				{
  1362  					Labels: []string{"succeeded", "perIndex"},
  1363  					Value:  1,
  1364  				},
  1365  				{
  1366  					Labels: []string{"failed", "perIndex"},
  1367  					Value:  1,
  1368  				},
  1369  			},
  1370  		},
  1371  		"job execution terminated early due to exceeding max failed indexes": {
  1372  			job: batchv1.Job{
  1373  				Spec: batchv1.JobSpec{
  1374  					Parallelism:          ptr.To[int32](3),
  1375  					Completions:          ptr.To[int32](3),
  1376  					CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
  1377  					BackoffLimitPerIndex: ptr.To[int32](0),
  1378  					MaxFailedIndexes:     ptr.To[int32](1),
  1379  					Template:             podTemplateSpec,
  1380  				},
  1381  			},
  1382  			podTerminations: []podTerminationWithExpectations{
  1383  				{
  1384  					index: 0,
  1385  					status: v1.PodStatus{
  1386  						Phase: v1.PodFailed,
  1387  					},
  1388  					wantActive:        2,
  1389  					wantFailed:        1,
  1390  					wantActiveIndexes: sets.New(1, 2),
  1391  					wantFailedIndexes: ptr.To("0"),
  1392  				},
  1393  				{
  1394  					index: 1,
  1395  					status: v1.PodStatus{
  1396  						Phase: v1.PodFailed,
  1397  					},
  1398  					wantActive:        0,
  1399  					wantFailed:        3,
  1400  					wantFailedIndexes: ptr.To("0,1"),
  1401  				},
  1402  			},
  1403  			wantJobConditionType: batchv1.JobFailed,
  1404  			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
  1405  				{
  1406  					Labels: []string{"failed", "perIndex"},
  1407  					Value:  2,
  1408  				},
  1409  			},
  1410  		},
  1411  		"pod failure matching pod failure policy rule with FailIndex action": {
  1412  			job: batchv1.Job{
  1413  				Spec: batchv1.JobSpec{
  1414  					Parallelism:          ptr.To[int32](2),
  1415  					Completions:          ptr.To[int32](2),
  1416  					CompletionMode:       completionModePtr(batchv1.IndexedCompletion),
  1417  					BackoffLimitPerIndex: ptr.To[int32](1),
  1418  					Template:             podTemplateSpec,
  1419  					PodFailurePolicy: &batchv1.PodFailurePolicy{
  1420  						Rules: []batchv1.PodFailurePolicyRule{
  1421  							{
  1422  								Action: batchv1.PodFailurePolicyActionFailIndex,
  1423  								OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
  1424  									Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
  1425  									Values:   []int32{13},
  1426  								},
  1427  							},
  1428  							{
  1429  								Action: batchv1.PodFailurePolicyActionFailIndex,
  1430  								OnPodConditions: []batchv1.PodFailurePolicyOnPodConditionsPattern{
  1431  									{
  1432  										Type:   v1.DisruptionTarget,
  1433  										Status: v1.ConditionTrue,
  1434  									},
  1435  								},
  1436  							},
  1437  						},
  1438  					},
  1439  				},
  1440  			},
  1441  			podTerminations: []podTerminationWithExpectations{
  1442  				{
  1443  					index: 0,
  1444  					status: v1.PodStatus{
  1445  						Phase: v1.PodFailed,
  1446  						ContainerStatuses: []v1.ContainerStatus{
  1447  							{
  1448  								State: v1.ContainerState{
  1449  									Terminated: &v1.ContainerStateTerminated{
  1450  										ExitCode: 13,
  1451  									},
  1452  								},
  1453  							},
  1454  						},
  1455  					},
  1456  					wantActive:        1,
  1457  					wantFailed:        1,
  1458  					wantActiveIndexes: sets.New(1),
  1459  					wantFailedIndexes: ptr.To("0"),
  1460  				},
  1461  				{
  1462  					index: 1,
  1463  					status: v1.PodStatus{
  1464  						Phase: v1.PodFailed,
  1465  						Conditions: []v1.PodCondition{
  1466  							{
  1467  								Type:   v1.DisruptionTarget,
  1468  								Status: v1.ConditionTrue,
  1469  							},
  1470  						},
  1471  					},
  1472  					wantFailed:        2,
  1473  					wantFailedIndexes: ptr.To("0,1"),
  1474  				},
  1475  			},
  1476  			wantJobConditionType: batchv1.JobFailed,
  1477  			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
  1478  				{
  1479  					Labels: []string{"failed", "perIndex"},
  1480  					Value:  2,
  1481  				},
  1482  			},
  1483  		},
  1484  	}
  1485  	for name, test := range testCases {
  1486  		t.Run(name, func(t *testing.T) {
  1487  			resetMetrics()
  1488  			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, true)()
  1489  			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
  1490  
  1491  			closeFn, restConfig, clientSet, ns := setup(t, "simple")
  1492  			defer closeFn()
  1493  			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
  1494  			defer func() {
  1495  				cancel()
  1496  			}()
  1497  			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job)
  1498  			if err != nil {
  1499  				t.Fatalf("Error %q while creating the job %q", err, jobObj.Name)
  1500  			}
  1501  			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
  1502  				Active:      int(*test.job.Spec.Parallelism),
  1503  				Ready:       ptr.To[int32](0),
  1504  				Terminating: ptr.To[int32](0),
  1505  			})
  1506  			for _, podTermination := range test.podTerminations {
  1507  				pod, err := getActivePodForIndex(ctx, clientSet, jobObj, podTermination.index)
  1508  				if err != nil {
  1509  					t.Fatalf("listing Job Pods: %q", err)
  1510  				}
  1511  				pod.Status = podTermination.status
  1512  				if _, err = clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil {
  1513  					t.Fatalf("Error updating the pod %q: %q", klog.KObj(pod), err)
  1514  				}
  1515  				validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
  1516  					Active:      podTermination.wantActive,
  1517  					Succeeded:   podTermination.wantSucceeded,
  1518  					Failed:      podTermination.wantFailed,
  1519  					Ready:       ptr.To[int32](0),
  1520  					Terminating: ptr.To[int32](0),
  1521  				})
  1522  				validateIndexedJobPods(ctx, t, clientSet, jobObj, podTermination.wantActiveIndexes, podTermination.wantCompletedIndexes, podTermination.wantFailedIndexes)
  1523  				if podTermination.wantReplacementPodFailureCount != nil {
  1524  					replacement, err := getActivePodForIndex(ctx, clientSet, jobObj, podTermination.index)
  1525  					if err != nil {
  1526  						t.Fatalf("Failed to get active replacement pod for index: %v, error: %v", podTermination.index, err)
  1527  					}
  1528  					gotReplacementPodFailureCount, err := getIndexFailureCount(replacement)
  1529  					if err != nil {
  1530  						t.Fatalf("Failed read the index failure count annotation for pod: %v, error: %v", klog.KObj(replacement), err)
  1531  					}
  1532  					if *podTermination.wantReplacementPodFailureCount != gotReplacementPodFailureCount {
  1533  						t.Fatalf("Unexpected value of the index failure count annotation. Want: %v, got: %v", *podTermination.wantReplacementPodFailureCount, gotReplacementPodFailureCount)
  1534  					}
  1535  				}
  1536  			}
  1537  
  1538  			remainingActive := test.podTerminations[len(test.podTerminations)-1].wantActive
  1539  			if remainingActive > 0 {
  1540  				if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remainingActive); err != nil {
  1541  					t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
  1542  				}
  1543  			}
  1544  			validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
  1545  			for _, wantMetricValue := range test.wantJobFinishedIndexesTotalMetric {
  1546  				validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, wantMetricValue)
  1547  			}
  1548  			validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
  1549  		})
  1550  	}
  1551  }
  1552  
  1553  // TestManagedBy verifies the Job controller correctly makes a decision to
  1554  // reconcile or skip reconciliation of the Job depending on the Job's managedBy
  1555  // field, and the enablement of the JobManagedBy feature gate.
func TestManagedBy(t *testing.T) {
	// A custom controller name must be domain-prefixed to be distinguishable
	// from the reserved built-in controller name.
	customControllerName := "example.com/custom-job-controller"
	podTemplateSpec := v1.PodTemplateSpec{
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:  "main-container",
					Image: "foo",
				},
			},
		},
	}
	// Each case creates a Job (optionally setting spec.managedBy and the
	// JobManagedBy feature gate), then checks whether the built-in controller
	// reconciled it and the value of the JobByExternalControllerTotal metric.
	testCases := map[string]struct {
		enableJobManagedBy                     bool
		job                                    batchv1.Job
		wantReconciledByBuiltInController      bool
		wantJobByExternalControllerTotalMetric metricLabelsWithValue
	}{
		"the Job controller reconciles jobs without the managedBy": {
			enableJobManagedBy: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Template: podTemplateSpec,
				},
			},
			wantReconciledByBuiltInController: true,
			wantJobByExternalControllerTotalMetric: metricLabelsWithValue{
				// There is no good label value choice to check here, since the
				// value wasn't specified. Let's go with checking for the reserved
				// value just so that all test cases verify the metric.
				Labels: []string{batchv1.JobControllerName},
				Value:  0,
			},
		},
		"the Job controller reconciles jobs with the well known value of the managedBy field": {
			enableJobManagedBy: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Template:  podTemplateSpec,
					ManagedBy: ptr.To(batchv1.JobControllerName),
				},
			},
			wantReconciledByBuiltInController: true,
			wantJobByExternalControllerTotalMetric: metricLabelsWithValue{
				Labels: []string{batchv1.JobControllerName},
				Value:  0,
			},
		},
		"the Job controller reconciles an unsuspended with the custom value of managedBy; feature disabled": {
			enableJobManagedBy: false,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Template:  podTemplateSpec,
					ManagedBy: ptr.To(customControllerName),
				},
			},
			wantReconciledByBuiltInController: true,
			wantJobByExternalControllerTotalMetric: metricLabelsWithValue{
				Labels: []string{customControllerName},
				Value:  0,
			},
		},
		"the Job controller does not reconcile an unsuspended with the custom value of managedBy": {
			enableJobManagedBy: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Suspend:   ptr.To(false),
					Template:  podTemplateSpec,
					ManagedBy: ptr.To(customControllerName),
				},
			},
			wantReconciledByBuiltInController: false,
			wantJobByExternalControllerTotalMetric: metricLabelsWithValue{
				Labels: []string{customControllerName},
				Value:  1,
			},
		},
		"the Job controller does not reconcile a suspended with the custom value of managedBy": {
			enableJobManagedBy: true,
			job: batchv1.Job{
				Spec: batchv1.JobSpec{
					Suspend:   ptr.To(true),
					Template:  podTemplateSpec,
					ManagedBy: ptr.To(customControllerName),
				},
			},
			wantReconciledByBuiltInController: false,
			wantJobByExternalControllerTotalMetric: metricLabelsWithValue{
				Labels: []string{customControllerName},
				Value:  1,
			},
		},
	}
	for name, test := range testCases {
		t.Run(name, func(t *testing.T) {
			// Reset metrics so that counter expectations are not affected by
			// previously run cases.
			resetMetrics()
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, test.enableJobManagedBy)()

			closeFn, restConfig, clientSet, ns := setup(t, "managed-by")
			defer closeFn()
			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
			defer cancel()
			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job)
			if err != nil {
				t.Fatalf("Error %v while creating the job %q", err, klog.KObj(jobObj))
			}

			if test.wantReconciledByBuiltInController {
				// The built-in controller is expected to create the pods and
				// report them as active in the Job status.
				validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
					Active:      int(*jobObj.Spec.Parallelism),
					Ready:       ptr.To[int32](0),
					Terminating: ptr.To[int32](0),
				})
				validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, test.wantJobByExternalControllerTotalMetric)
			} else {
				validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, test.wantJobByExternalControllerTotalMetric)

				// Give the controller some time so that a sync, if (incorrectly)
				// queued, would have executed; then verify the Job status
				// remained empty, i.e. reconciliation was skipped.
				time.Sleep(sleepDurationForControllerLatency)
				jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
				if err != nil {
					t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
				}
				if diff := cmp.Diff(batchv1.JobStatus{}, jobObj.Status); diff != "" {
					t.Fatalf("Unexpected status (-want/+got): %s", diff)
				}
			}
		})
	}
}
  1685  
// TestManagedBy_Reenabling verifies how the Job controller handles a Job with
// a custom value of the managedBy field, as the JobManagedBy feature gate is
// disabled and reenabled again. First, when the feature gate is enabled, the
// synchronization is skipped; when it is disabled, the synchronization starts;
// and it stops again once the feature gate is re-enabled.
  1691  func TestManagedBy_Reenabling(t *testing.T) {
  1692  	customControllerName := "example.com/custom-job-controller"
  1693  	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, true)()
  1694  
  1695  	closeFn, restConfig, clientSet, ns := setup(t, "managed-by-reenabling")
  1696  	defer closeFn()
  1697  	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
  1698  	defer func() {
  1699  		cancel()
  1700  	}()
  1701  	resetMetrics()
  1702  
  1703  	baseJob := batchv1.Job{
  1704  		ObjectMeta: metav1.ObjectMeta{
  1705  			Name:      "custom-job-test",
  1706  			Namespace: ns.Name,
  1707  		},
  1708  		Spec: batchv1.JobSpec{
  1709  			Completions: ptr.To[int32](1),
  1710  			Parallelism: ptr.To[int32](1),
  1711  			Template: v1.PodTemplateSpec{
  1712  				Spec: v1.PodSpec{
  1713  					Containers: []v1.Container{
  1714  						{
  1715  							Name:  "main-container",
  1716  							Image: "foo",
  1717  						},
  1718  					},
  1719  				},
  1720  			},
  1721  			ManagedBy: &customControllerName,
  1722  		},
  1723  	}
  1724  	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &baseJob)
  1725  	if err != nil {
  1726  		t.Fatalf("Error %v when creating the job %q", err, klog.KObj(jobObj))
  1727  	}
  1728  	jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
  1729  
  1730  	validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, metricLabelsWithValue{
  1731  		Labels: []string{customControllerName},
  1732  		Value:  1,
  1733  	})
  1734  
  1735  	time.Sleep(sleepDurationForControllerLatency)
  1736  	jobObj, err = jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
  1737  	if err != nil {
  1738  		t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
  1739  	}
  1740  	if diff := cmp.Diff(batchv1.JobStatus{}, jobObj.Status); diff != "" {
  1741  		t.Fatalf("Unexpected status (-want/+got): %s", diff)
  1742  	}
  1743  
  1744  	// Disable the feature gate and restart the controller
  1745  	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, false)()
  1746  	cancel()
  1747  	resetMetrics()
  1748  	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
  1749  
  1750  	// Verify the built-in controller reconciles the Job
  1751  	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
  1752  		Active:      1,
  1753  		Ready:       ptr.To[int32](0),
  1754  		Terminating: ptr.To[int32](0),
  1755  	})
  1756  
  1757  	validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, metricLabelsWithValue{
  1758  		Labels: []string{customControllerName},
  1759  		Value:  0,
  1760  	})
  1761  
  1762  	// Reenable the feature gate and restart the controller
  1763  	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, true)()
  1764  	cancel()
  1765  	resetMetrics()
  1766  	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
  1767  
  1768  	// Marking the pod as finished, but it does not result in updating of the Job status.
  1769  	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
  1770  		t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodSucceeded, klog.KObj(jobObj))
  1771  	}
  1772  
  1773  	validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, metricLabelsWithValue{
  1774  		Labels: []string{customControllerName},
  1775  		Value:  1,
  1776  	})
  1777  
  1778  	time.Sleep(sleepDurationForControllerLatency)
  1779  	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
  1780  		Active:      1,
  1781  		Ready:       ptr.To[int32](0),
  1782  		Terminating: ptr.To[int32](0),
  1783  	})
  1784  }
  1785  
  1786  // TestManagedBy_RecreatedJob verifies that the Job controller skips
  1787  // reconciliation of a job with managedBy field, when this is a recreated job,
  1788  // and there is still a pending sync queued for the previous job.
  1789  // In this scenario we first create a job without managedBy field, and we mark
  1790  // its pod as succeeded. This queues the Job object sync with 1s delay. Then,
  1791  // without waiting for the Job status update we delete and recreate the job under
  1792  // the same name, but with managedBy field. The queued update starts to execute
  1793  // on the new job, but is skipped.
  1794  func TestManagedBy_RecreatedJob(t *testing.T) {
  1795  	customControllerName := "example.com/custom-job-controller"
  1796  	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, true)()
  1797  
  1798  	closeFn, restConfig, clientSet, ns := setup(t, "managed-by-recreate-job")
  1799  	defer closeFn()
  1800  	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
  1801  	defer cancel()
  1802  	resetMetrics()
  1803  
  1804  	baseJob := batchv1.Job{
  1805  		ObjectMeta: metav1.ObjectMeta{
  1806  			Name:      "custom-job-test",
  1807  			Namespace: ns.Name,
  1808  		},
  1809  		Spec: batchv1.JobSpec{
  1810  			Completions: ptr.To[int32](1),
  1811  			Parallelism: ptr.To[int32](1),
  1812  			Template: v1.PodTemplateSpec{
  1813  				Spec: v1.PodSpec{
  1814  					Containers: []v1.Container{
  1815  						{
  1816  							Name:  "main-container",
  1817  							Image: "foo",
  1818  						},
  1819  					},
  1820  				},
  1821  			},
  1822  		},
  1823  	}
  1824  	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &baseJob)
  1825  	if err != nil {
  1826  		t.Fatalf("Error %v when creating the job %q", err, klog.KObj(jobObj))
  1827  	}
  1828  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
  1829  		Active:      1,
  1830  		Ready:       ptr.To[int32](0),
  1831  		Terminating: ptr.To[int32](0),
  1832  	})
  1833  
  1834  	// Marking the pod as complete queues the job reconciliation
  1835  	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
  1836  		t.Fatalf("Error %v when setting phase %s on the pod of job %v", err, v1.PodSucceeded, klog.KObj(jobObj))
  1837  	}
  1838  
  1839  	jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
  1840  	if err = jobClient.Delete(ctx, jobObj.Name, metav1.DeleteOptions{
  1841  		// Use propagationPolicy=background so that we don't need to wait for the job object to be gone.
  1842  		PropagationPolicy: ptr.To(metav1.DeletePropagationBackground),
  1843  	}); err != nil {
  1844  		t.Fatalf("Error %v when deleting the job %v", err, klog.KObj(jobObj))
  1845  	}
  1846  
  1847  	jobWithManagedBy := baseJob.DeepCopy()
  1848  	jobWithManagedBy.Spec.ManagedBy = ptr.To(customControllerName)
  1849  	jobObj, err = createJobWithDefaults(ctx, clientSet, ns.Name, jobWithManagedBy)
  1850  	if err != nil {
  1851  		t.Fatalf("Error %q while creating the job %q", err, klog.KObj(jobObj))
  1852  	}
  1853  
  1854  	validateCounterMetric(ctx, t, metrics.JobByExternalControllerTotal, metricLabelsWithValue{
  1855  		Labels: []string{customControllerName},
  1856  		Value:  1,
  1857  	})
  1858  
  1859  	time.Sleep(sleepDurationForControllerLatency)
  1860  	jobObj, err = jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
  1861  	if err != nil {
  1862  		t.Fatalf("Error %v when getting the latest job %v", err, klog.KObj(jobObj))
  1863  	}
  1864  	if diff := cmp.Diff(batchv1.JobStatus{}, jobObj.Status); diff != "" {
  1865  		t.Fatalf("Unexpected status (-want/+got): %s", diff)
  1866  	}
  1867  }
  1868  
  1869  // TestManagedBy_UsingReservedJobFinalizers documents the behavior of the Job
  1870  // controller when there is a job with custom value of the managedBy field, creating
  1871  // pods with the batch.kubernetes.io/job-tracking finalizer. The built-in controller
  1872  // should not remove the finalizer. Note that, the use of the finalizer in jobs
  1873  // managed by external controllers is discouraged, but may potentially happen
  1874  // when one forks the controller and does not rename the finalizer.
func TestManagedBy_UsingReservedJobFinalizers(t *testing.T) {
	customControllerName := "example.com/custom-job-controller"
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, true)()

	closeFn, restConfig, clientSet, ns := setup(t, "managed-by-reserved-finalizers")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	defer cancel()
	resetMetrics()

	jobSpec := batchv1.Job{
		TypeMeta: metav1.TypeMeta{
			APIVersion: "batch/v1",
			Kind:       "Job",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "custom-job-test",
			Namespace: ns.Name,
		},
		Spec: batchv1.JobSpec{
			Completions: ptr.To[int32](1),
			Parallelism: ptr.To[int32](1),
			Template: v1.PodTemplateSpec{
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:  "main-container",
							Image: "foo",
						},
					},
				},
			},
			ManagedBy: ptr.To(customControllerName),
		},
	}
	// Create a job with custom managedBy
	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &jobSpec)
	if err != nil {
		t.Fatalf("Error %v when creating the job %q", err, klog.KObj(jobObj))
	}

	podControl := controller.RealPodControl{
		KubeClient: clientSet,
		Recorder:   &record.FakeRecorder{},
	}

	// Create the pod manually simulating the behavior of the external controller
	// indicated by the managedBy field. We create the pod with the built-in
	// finalizer.
	podTemplate := jobObj.Spec.Template.DeepCopy()
	podTemplate.Finalizers = append(podTemplate.Finalizers, batchv1.JobTrackingFinalizer)
	err = podControl.CreatePodsWithGenerateName(ctx, jobObj.Namespace, podTemplate, jobObj, metav1.NewControllerRef(jobObj, batchv1.SchemeGroupVersion.WithKind("Job")), "pod1")
	if err != nil {
		t.Fatalf("Error %v when creating a pod for job %q", err, klog.KObj(jobObj))
	}

	// Getting the list of pods for the Jobs to obtain the reference to the created pod.
	jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(ps v1.PodStatus) bool { return true })
	if err != nil {
		t.Fatalf("Error %v getting the list of pods for job %q", err, klog.KObj(jobObj))
	}
	if len(jobPods) != 1 {
		t.Fatalf("Unexpected number (%d) of pods for job: %v", len(jobPods), klog.KObj(jobObj))
	}

	// Marking the pod as finished (succeeded), before marking the parent job as complete.
	podObj := jobPods[0]
	podObj.Status.Phase = v1.PodSucceeded
	podObj, err = clientSet.CoreV1().Pods(ns.Name).UpdateStatus(ctx, podObj, metav1.UpdateOptions{})
	if err != nil {
		t.Fatalf("Error %v when marking the %q pod as succeeded", err, klog.KObj(podObj))
	}

	// Mark the job as finished so that the built-in controller receives the
	// UpdateJob event, in reaction to which it would remove the pod's
	// finalizer, if not for the custom managedBy field.
	jobObj.Status.Conditions = append(jobObj.Status.Conditions, batchv1.JobCondition{
		Type:   batchv1.JobComplete,
		Status: v1.ConditionTrue,
	})
	jobObj.Status.StartTime = ptr.To(metav1.Now())
	jobObj.Status.CompletionTime = ptr.To(metav1.Now())

	if jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).UpdateStatus(ctx, jobObj, metav1.UpdateOptions{}); err != nil {
		t.Fatalf("Error %v when updating the job as finished %v", err, klog.KObj(jobObj))
	}

	podObj, err = clientSet.CoreV1().Pods(ns.Name).Get(ctx, podObj.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Error %v when getting the latest version of the pod %v", err, klog.KObj(podObj))
	}

	// Update the pod so that the built-in controller receives the UpdatePod
	// event, in reaction to which it would remove the pod's finalizer, if not
	// for the custom value of the managedBy field on the job.
	podObj.Status.Conditions = append(podObj.Status.Conditions, v1.PodCondition{
		Type:   v1.PodConditionType("CustomCondition"),
		Status: v1.ConditionTrue,
	})
	podObj, err = clientSet.CoreV1().Pods(ns.Name).UpdateStatus(ctx, podObj, metav1.UpdateOptions{})
	if err != nil {
		t.Fatalf("Error %v when adding a condition to the pod status %v", err, klog.KObj(podObj))
	}

	// Give the controller time to react to the events above, then verify the
	// pod still carries the job-tracking finalizer.
	time.Sleep(sleepDurationForControllerLatency)
	podObj, err = clientSet.CoreV1().Pods(ns.Name).Get(ctx, podObj.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Error %v when getting the latest version of the pod %v", err, klog.KObj(podObj))
	}

	if diff := cmp.Diff([]string{batchv1.JobTrackingFinalizer}, podObj.Finalizers); diff != "" {
		t.Fatalf("Unexpected change in the set of finalizers for pod %q, because the owner job %q has custom managedBy, diff=%s", klog.KObj(podObj), klog.KObj(jobObj), diff)
	}
}
  1989  
  1990  func getIndexFailureCount(p *v1.Pod) (int, error) {
  1991  	if p.Annotations == nil {
  1992  		return 0, errors.New("no annotations found")
  1993  	}
  1994  	v, ok := p.Annotations[batchv1.JobIndexFailureCountAnnotation]
  1995  	if !ok {
  1996  		return 0, fmt.Errorf("annotation %s not found", batchv1.JobIndexFailureCountAnnotation)
  1997  	}
  1998  	return strconv.Atoi(v)
  1999  }
  2000  
  2001  func completionModePtr(cm batchv1.CompletionMode) *batchv1.CompletionMode {
  2002  	return &cm
  2003  }
  2004  
  2005  // TestNonParallelJob tests that a Job that only executes one Pod. The test
  2006  // recreates the Job controller at some points to make sure a new controller
  2007  // is able to pickup.
func TestNonParallelJob(t *testing.T) {
	// Speed up pod failure retries for the duration of the test.
	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
	closeFn, restConfig, clientSet, ns := setup(t, "simple")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	// Defer via a closure so the latest value of cancel is invoked; the
	// variable is reassigned on each controller restart below.
	defer func() {
		cancel()
	}()
	resetMetrics()

	// A Job with default spec runs exactly one Pod.
	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{})
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})

	// Restarting controller.
	cancel()
	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)

	// Failed Pod is replaced.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      1,
		Failed:      1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
		Labels: []string{"NonIndexed", "failed"},
		Value:  1,
	})

	// Restarting controller.
	cancel()
	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)

	// No more Pods are created after the Pod succeeds.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
	}
	validateJobComplete(ctx, t, clientSet, jobObj)
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Failed:      1,
		Succeeded:   1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	// Finished pods must have the job-tracking finalizer removed, and the
	// finish metrics must account for both the failed and the succeeded pod.
	validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
	validateCounterMetric(ctx, t, metrics.JobFinishedNum, metricLabelsWithValue{
		Labels: []string{"NonIndexed", "succeeded", ""},
		Value:  1,
	})
	validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
		Labels: []string{"NonIndexed", "succeeded"},
		Value:  1,
	})
}
  2072  
// TestParallelJob verifies a parallel Job without completions: pods are
// tracked through ready/failed/succeeded transitions, failed pods are
// replaced until the first success, and the job finishes once the remaining
// pods succeed.
func TestParallelJob(t *testing.T) {
	// Speed up pod failure retries for the duration of the test.
	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
	closeFn, restConfig, clientSet, ns := setup(t, "parallel")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	defer cancel()
	resetMetrics()

	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
		Spec: batchv1.JobSpec{
			Parallelism: ptr.To[int32](5),
		},
	})
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
	want := podsByStatus{
		Active:      5,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)

	// Tracks ready pods, if enabled.
	if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
		t.Fatalf("Failed Marking Pods as ready: %v", err)
	}
	want.Ready = ptr.To[int32](2)
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)

	// Failed Pods are replaced.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
	}
	want = podsByStatus{
		Active:      5,
		Failed:      2,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
	// Once one Pod succeeds, no more Pods are created, even if some fail.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
	}
	want = podsByStatus{
		Failed:      2,
		Succeeded:   1,
		Active:      4,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
	}
	want = podsByStatus{
		Failed:      4,
		Succeeded:   1,
		Active:      2,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
	// No more Pods are created after remaining Pods succeed.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
		t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
	}
	validateJobComplete(ctx, t, clientSet, jobObj)
	want = podsByStatus{
		Failed:      4,
		Succeeded:   3,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
	validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
	// 4 failed + 3 succeeded pods had their tracking finalizer removed.
	validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 7)
	validateCounterMetric(ctx, t, metrics.JobFinishedNum, metricLabelsWithValue{
		Labels: []string{"NonIndexed", "succeeded", ""},
		Value:  1,
	})
	validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
		Labels: []string{"NonIndexed", "succeeded"},
		Value:  3,
	})
	validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
		Labels: []string{"NonIndexed", "failed"},
		Value:  4,
	})
}
  2164  
  2165  func TestParallelJobChangingParallelism(t *testing.T) {
  2166  	closeFn, restConfig, clientSet, ns := setup(t, "parallel")
  2167  	defer closeFn()
  2168  	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
  2169  	defer cancel()
  2170  
  2171  	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
  2172  		Spec: batchv1.JobSpec{
  2173  			BackoffLimit: ptr.To[int32](2),
  2174  			Parallelism:  ptr.To[int32](5),
  2175  		},
  2176  	})
  2177  	if err != nil {
  2178  		t.Fatalf("Failed to create Job: %v", err)
  2179  	}
  2180  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
  2181  		Active:      5,
  2182  		Ready:       ptr.To[int32](0),
  2183  		Terminating: ptr.To[int32](0),
  2184  	})
  2185  
  2186  	// Reduce parallelism by a number greater than backoffLimit.
  2187  	patch := []byte(`{"spec":{"parallelism":2}}`)
  2188  	jobObj, err = clientSet.BatchV1().Jobs(ns.Name).Patch(ctx, jobObj.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
  2189  	if err != nil {
  2190  		t.Fatalf("Updating Job: %v", err)
  2191  	}
  2192  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
  2193  		Active:      2,
  2194  		Ready:       ptr.To[int32](0),
  2195  		Terminating: ptr.To[int32](0),
  2196  	})
  2197  
  2198  	// Increase parallelism again.
  2199  	patch = []byte(`{"spec":{"parallelism":4}}`)
  2200  	jobObj, err = clientSet.BatchV1().Jobs(ns.Name).Patch(ctx, jobObj.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
  2201  	if err != nil {
  2202  		t.Fatalf("Updating Job: %v", err)
  2203  	}
  2204  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
  2205  		Active:      4,
  2206  		Ready:       ptr.To[int32](0),
  2207  		Terminating: ptr.To[int32](0),
  2208  	})
  2209  
  2210  	// Succeed Job
  2211  	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil {
  2212  		t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
  2213  	}
  2214  	validateJobComplete(ctx, t, clientSet, jobObj)
  2215  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
  2216  		Succeeded:   4,
  2217  		Ready:       ptr.To[int32](0),
  2218  		Terminating: ptr.To[int32](0),
  2219  	})
  2220  	validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
  2221  }
  2222  
  2223  func TestParallelJobWithCompletions(t *testing.T) {
  2224  	// Lower limits for a job sync so that we can test partial updates with a low
  2225  	// number of pods.
  2226  	t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 10))
  2227  	t.Cleanup(setDuringTest(&jobcontroller.MaxPodCreateDeletePerSync, 10))
  2228  	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
  2229  	closeFn, restConfig, clientSet, ns := setup(t, "completions")
  2230  	defer closeFn()
  2231  	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
  2232  	defer cancel()
  2233  	resetMetrics()
  2234  
  2235  	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
  2236  		Spec: batchv1.JobSpec{
  2237  			Parallelism: ptr.To[int32](54),
  2238  			Completions: ptr.To[int32](56),
  2239  		},
  2240  	})
  2241  	if err != nil {
  2242  		t.Fatalf("Failed to create Job: %v", err)
  2243  	}
  2244  	want := podsByStatus{
  2245  		Active:      54,
  2246  		Ready:       ptr.To[int32](0),
  2247  		Terminating: ptr.To[int32](0),
  2248  	}
  2249  	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
  2250  	// Tracks ready pods, if enabled.
  2251  	if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
  2252  		t.Fatalf("Failed Marking Pods as ready: %v", err)
  2253  	}
  2254  	want.Ready = ptr.To[int32](52)
  2255  	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
  2256  
  2257  	// Failed Pods are replaced.
  2258  	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
  2259  		t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
  2260  	}
  2261  	want = podsByStatus{
  2262  		Active:      54,
  2263  		Failed:      2,
  2264  		Ready:       ptr.To[int32](50),
  2265  		Terminating: ptr.To[int32](0),
  2266  	}
  2267  	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
  2268  	// Pods are created until the number of succeeded Pods equals completions.
  2269  	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
  2270  		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
  2271  	}
  2272  	want = podsByStatus{
  2273  		Failed:      2,
  2274  		Succeeded:   53,
  2275  		Active:      3,
  2276  		Ready:       ptr.To[int32](0),
  2277  		Terminating: ptr.To[int32](0),
  2278  	}
  2279  	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
  2280  	// No more Pods are created after the Job completes.
  2281  	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
  2282  		t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
  2283  	}
  2284  	validateJobComplete(ctx, t, clientSet, jobObj)
  2285  	want = podsByStatus{
  2286  		Failed:      2,
  2287  		Succeeded:   56,
  2288  		Ready:       ptr.To[int32](0),
  2289  		Terminating: ptr.To[int32](0),
  2290  	}
  2291  	validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
  2292  	validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
  2293  	validateCounterMetric(ctx, t, metrics.JobFinishedNum, metricLabelsWithValue{
  2294  		Labels: []string{"NonIndexed", "succeeded", ""},
  2295  		Value:  1,
  2296  	})
  2297  	validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
  2298  		Labels: []string{"NonIndexed", "succeeded"},
  2299  		Value:  56,
  2300  	})
  2301  	validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
  2302  		Labels: []string{"NonIndexed", "failed"},
  2303  		Value:  2,
  2304  	})
  2305  }
  2306  
  2307  func TestIndexedJob(t *testing.T) {
  2308  	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
  2309  	closeFn, restConfig, clientSet, ns := setup(t, "indexed")
  2310  	defer closeFn()
  2311  	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
  2312  	defer cancel()
  2313  	resetMetrics()
  2314  
  2315  	mode := batchv1.IndexedCompletion
  2316  	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
  2317  		Spec: batchv1.JobSpec{
  2318  			Parallelism:    ptr.To[int32](3),
  2319  			Completions:    ptr.To[int32](4),
  2320  			CompletionMode: &mode,
  2321  		},
  2322  	})
  2323  	if err != nil {
  2324  		t.Fatalf("Failed to create Job: %v", err)
  2325  	}
  2326  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
  2327  		Active:      3,
  2328  		Ready:       ptr.To[int32](0),
  2329  		Terminating: ptr.To[int32](0),
  2330  	})
  2331  	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2), "", nil)
  2332  	validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, metricLabelsWithValue{
  2333  		Labels: []string{"succeeded", "global"},
  2334  		Value:  0,
  2335  	})
  2336  
  2337  	// One Pod succeeds.
  2338  	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
  2339  		t.Fatal("Failed trying to succeed pod with index 1")
  2340  	}
  2341  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
  2342  		Active:      3,
  2343  		Succeeded:   1,
  2344  		Ready:       ptr.To[int32](0),
  2345  		Terminating: ptr.To[int32](0),
  2346  	})
  2347  	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 2, 3), "1", nil)
  2348  	validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, metricLabelsWithValue{
  2349  		Labels: []string{"succeeded", "global"},
  2350  		Value:  1,
  2351  	})
  2352  
  2353  	// One Pod fails, which should be recreated.
  2354  	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
  2355  		t.Fatal("Failed trying to succeed pod with index 2")
  2356  	}
  2357  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
  2358  		Active:      3,
  2359  		Failed:      1,
  2360  		Succeeded:   1,
  2361  		Ready:       ptr.To[int32](0),
  2362  		Terminating: ptr.To[int32](0),
  2363  	})
  2364  	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 2, 3), "1", nil)
  2365  	validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, metricLabelsWithValue{
  2366  		Labels: []string{"succeeded", "global"},
  2367  		Value:  1,
  2368  	})
  2369  
  2370  	// Remaining Pods succeed.
  2371  	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
  2372  		t.Fatal("Failed trying to succeed remaining pods")
  2373  	}
  2374  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
  2375  		Active:      0,
  2376  		Failed:      1,
  2377  		Succeeded:   4,
  2378  		Ready:       ptr.To[int32](0),
  2379  		Terminating: ptr.To[int32](0),
  2380  	})
  2381  	validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3", nil)
  2382  	validateJobComplete(ctx, t, clientSet, jobObj)
  2383  	validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
  2384  	validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 5)
  2385  	validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, metricLabelsWithValue{
  2386  		Labels: []string{"succeeded", "global"},
  2387  		Value:  4,
  2388  	})
  2389  	validateCounterMetric(ctx, t, metrics.JobFinishedNum, metricLabelsWithValue{
  2390  		Labels: []string{"Indexed", "succeeded", ""},
  2391  		Value:  1,
  2392  	})
  2393  	validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
  2394  		Labels: []string{"Indexed", "succeeded"},
  2395  		Value:  4,
  2396  	})
  2397  	validateCounterMetric(ctx, t, metrics.JobPodsFinished, metricLabelsWithValue{
  2398  		Labels: []string{"Indexed", "failed"},
  2399  		Value:  1,
  2400  	})
  2401  }
  2402  
  2403  func TestJobPodReplacementPolicy(t *testing.T) {
  2404  	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
  2405  	indexedCompletion := batchv1.IndexedCompletion
  2406  	nonIndexedCompletion := batchv1.NonIndexedCompletion
  2407  	var podReplacementPolicy = func(obj batchv1.PodReplacementPolicy) *batchv1.PodReplacementPolicy {
  2408  		return &obj
  2409  	}
  2410  	type jobStatus struct {
  2411  		active      int
  2412  		failed      int
  2413  		terminating *int32
  2414  	}
  2415  	type jobPodsCreationMetrics struct {
  2416  		new                         int
  2417  		recreateTerminatingOrFailed int
  2418  		recreateFailed              int
  2419  	}
  2420  	cases := map[string]struct {
  2421  		podReplacementPolicyEnabled bool
  2422  		jobSpec                     *batchv1.JobSpec
  2423  		wantStatusAfterDeletion     jobStatus
  2424  		wantStatusAfterFailure      jobStatus
  2425  		wantMetrics                 jobPodsCreationMetrics
  2426  	}{
  2427  		"feature flag off, delete & fail pods, recreate terminating pods, and verify job status counters": {
  2428  			jobSpec: &batchv1.JobSpec{
  2429  				Parallelism:    ptr.To[int32](2),
  2430  				Completions:    ptr.To[int32](2),
  2431  				CompletionMode: &indexedCompletion,
  2432  				Template: v1.PodTemplateSpec{
  2433  					ObjectMeta: metav1.ObjectMeta{
  2434  						Finalizers: []string{"fake.example.com/blockDeletion"},
  2435  					},
  2436  				},
  2437  			},
  2438  			wantStatusAfterDeletion: jobStatus{
  2439  				active: 2,
  2440  				failed: 2,
  2441  			},
  2442  			wantStatusAfterFailure: jobStatus{
  2443  				active: 2,
  2444  				failed: 2,
  2445  			},
  2446  			wantMetrics: jobPodsCreationMetrics{
  2447  				new: 4,
  2448  			},
  2449  		},
  2450  		"feature flag true with IndexedJob, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
  2451  			podReplacementPolicyEnabled: true,
  2452  			jobSpec: &batchv1.JobSpec{
  2453  				Parallelism:          ptr.To[int32](2),
  2454  				Completions:          ptr.To[int32](2),
  2455  				CompletionMode:       &indexedCompletion,
  2456  				PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
  2457  				Template: v1.PodTemplateSpec{
  2458  					ObjectMeta: metav1.ObjectMeta{
  2459  						Finalizers: []string{"fake.example.com/blockDeletion"},
  2460  					},
  2461  				},
  2462  			},
  2463  			wantStatusAfterDeletion: jobStatus{
  2464  				active:      2,
  2465  				failed:      2,
  2466  				terminating: ptr.To[int32](2),
  2467  			},
  2468  			wantStatusAfterFailure: jobStatus{
  2469  				active:      2,
  2470  				failed:      2,
  2471  				terminating: ptr.To[int32](0),
  2472  			},
  2473  			wantMetrics: jobPodsCreationMetrics{
  2474  				new:                         2,
  2475  				recreateTerminatingOrFailed: 2,
  2476  			},
  2477  		},
  2478  		"feature flag true with NonIndexedJob, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
  2479  			podReplacementPolicyEnabled: true,
  2480  			jobSpec: &batchv1.JobSpec{
  2481  				Parallelism:          ptr.To[int32](2),
  2482  				Completions:          ptr.To[int32](2),
  2483  				CompletionMode:       &nonIndexedCompletion,
  2484  				PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
  2485  				Template: v1.PodTemplateSpec{
  2486  					ObjectMeta: metav1.ObjectMeta{
  2487  						Finalizers: []string{"fake.example.com/blockDeletion"},
  2488  					},
  2489  				},
  2490  			},
  2491  			wantStatusAfterDeletion: jobStatus{
  2492  				active:      2,
  2493  				failed:      2,
  2494  				terminating: ptr.To[int32](2),
  2495  			},
  2496  			wantStatusAfterFailure: jobStatus{
  2497  				active:      2,
  2498  				failed:      2,
  2499  				terminating: ptr.To[int32](0),
  2500  			},
  2501  			wantMetrics: jobPodsCreationMetrics{
  2502  				new:                         2,
  2503  				recreateTerminatingOrFailed: 2,
  2504  			},
  2505  		},
  2506  		"feature flag false, podFailurePolicy enabled, delete & fail pods, recreate failed pods, and verify job status counters": {
  2507  			podReplacementPolicyEnabled: false,
  2508  			jobSpec: &batchv1.JobSpec{
  2509  				Parallelism:          ptr.To[int32](2),
  2510  				Completions:          ptr.To[int32](2),
  2511  				CompletionMode:       &nonIndexedCompletion,
  2512  				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
  2513  				Template: v1.PodTemplateSpec{
  2514  					ObjectMeta: metav1.ObjectMeta{
  2515  						Finalizers: []string{"fake.example.com/blockDeletion"},
  2516  					},
  2517  				},
  2518  				PodFailurePolicy: &batchv1.PodFailurePolicy{
  2519  					Rules: []batchv1.PodFailurePolicyRule{
  2520  						{
  2521  							Action: batchv1.PodFailurePolicyActionFailJob,
  2522  							OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
  2523  								Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
  2524  								Values:   []int32{5},
  2525  							},
  2526  						},
  2527  					},
  2528  				},
  2529  			},
  2530  			wantStatusAfterDeletion: jobStatus{
  2531  				active: 2,
  2532  			},
  2533  			wantStatusAfterFailure: jobStatus{
  2534  				active: 2,
  2535  			},
  2536  			wantMetrics: jobPodsCreationMetrics{
  2537  				new: 2,
  2538  			},
  2539  		},
  2540  		"feature flag true, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": {
  2541  			podReplacementPolicyEnabled: true,
  2542  			jobSpec: &batchv1.JobSpec{
  2543  				Parallelism:          ptr.To[int32](2),
  2544  				Completions:          ptr.To[int32](2),
  2545  				CompletionMode:       &indexedCompletion,
  2546  				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
  2547  				Template: v1.PodTemplateSpec{
  2548  					ObjectMeta: metav1.ObjectMeta{
  2549  						Finalizers: []string{"fake.example.com/blockDeletion"},
  2550  					},
  2551  				},
  2552  			},
  2553  			wantStatusAfterDeletion: jobStatus{
  2554  				active:      0,
  2555  				failed:      0,
  2556  				terminating: ptr.To[int32](2),
  2557  			},
  2558  			wantStatusAfterFailure: jobStatus{
  2559  				active:      2,
  2560  				failed:      2,
  2561  				terminating: ptr.To[int32](0),
  2562  			},
  2563  			wantMetrics: jobPodsCreationMetrics{
  2564  				new:            2,
  2565  				recreateFailed: 2,
  2566  			},
  2567  		},
  2568  		"feature flag true with NonIndexedJob, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": {
  2569  			podReplacementPolicyEnabled: true,
  2570  			jobSpec: &batchv1.JobSpec{
  2571  				Parallelism:          ptr.To[int32](2),
  2572  				Completions:          ptr.To[int32](2),
  2573  				CompletionMode:       &nonIndexedCompletion,
  2574  				PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
  2575  				Template: v1.PodTemplateSpec{
  2576  					ObjectMeta: metav1.ObjectMeta{
  2577  						Finalizers: []string{"fake.example.com/blockDeletion"},
  2578  					},
  2579  				},
  2580  			},
  2581  			wantStatusAfterDeletion: jobStatus{
  2582  				active:      0,
  2583  				failed:      0,
  2584  				terminating: ptr.To[int32](2),
  2585  			},
  2586  			wantStatusAfterFailure: jobStatus{
  2587  				active:      2,
  2588  				failed:      2,
  2589  				terminating: ptr.To[int32](0),
  2590  			},
  2591  			wantMetrics: jobPodsCreationMetrics{
  2592  				new:            2,
  2593  				recreateFailed: 2,
  2594  			},
  2595  		},
  2596  	}
  2597  	for name, tc := range cases {
  2598  		tc := tc
  2599  		t.Run(name, func(t *testing.T) {
  2600  			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.podReplacementPolicyEnabled)()
  2601  			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.jobSpec.PodFailurePolicy != nil)()
  2602  
  2603  			closeFn, restConfig, clientSet, ns := setup(t, "pod-replacement-policy")
  2604  			t.Cleanup(closeFn)
  2605  			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
  2606  			t.Cleanup(cancel)
  2607  			resetMetrics()
  2608  
  2609  			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
  2610  				Spec: *tc.jobSpec,
  2611  			})
  2612  			if err != nil {
  2613  				t.Fatalf("Failed to create Job: %v", err)
  2614  			}
  2615  			jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
  2616  
  2617  			waitForPodsToBeActive(ctx, t, jobClient, 2, jobObj)
  2618  			t.Cleanup(func() { removePodsFinalizer(ctx, t, clientSet, ns.Name) })
  2619  
  2620  			deletePods(ctx, t, clientSet, ns.Name)
  2621  
  2622  			validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
  2623  				Terminating: tc.wantStatusAfterDeletion.terminating,
  2624  				Failed:      tc.wantStatusAfterDeletion.failed,
  2625  				Active:      tc.wantStatusAfterDeletion.active,
  2626  				Ready:       ptr.To[int32](0),
  2627  			})
  2628  
  2629  			failTerminatingPods(ctx, t, clientSet, ns.Name)
  2630  			validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
  2631  				Terminating: tc.wantStatusAfterFailure.terminating,
  2632  				Failed:      tc.wantStatusAfterFailure.failed,
  2633  				Active:      tc.wantStatusAfterFailure.active,
  2634  				Ready:       ptr.To[int32](0),
  2635  			})
  2636  
  2637  			validateCounterMetric(
  2638  				ctx,
  2639  				t,
  2640  				metrics.JobPodsCreationTotal,
  2641  				metricLabelsWithValue{Labels: []string{"new", "succeeded"}, Value: tc.wantMetrics.new},
  2642  			)
  2643  			validateCounterMetric(
  2644  				ctx,
  2645  				t,
  2646  				metrics.JobPodsCreationTotal,
  2647  				metricLabelsWithValue{Labels: []string{"recreate_terminating_or_failed", "succeeded"}, Value: tc.wantMetrics.recreateTerminatingOrFailed},
  2648  			)
  2649  			validateCounterMetric(
  2650  				ctx,
  2651  				t,
  2652  				metrics.JobPodsCreationTotal,
  2653  				metricLabelsWithValue{Labels: []string{"recreate_failed", "succeeded"}, Value: tc.wantMetrics.recreateFailed},
  2654  			)
  2655  		})
  2656  	}
  2657  }
  2658  
// This tests the feature enable -> disable -> enable path for PodReplacementPolicy.
// We verify that Failed case works as expected when turned on.
// Disable reverts to previous behavior.
// Enabling will then match the original failed case.
func TestJobPodReplacementPolicyFeatureToggling(t *testing.T) {
	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
	const podCount int32 = 2
	// Failed policy: replacement pods should only be created once the old
	// pods actually fail, not while they are still terminating.
	jobSpec := batchv1.JobSpec{
		Parallelism:          ptr.To(podCount),
		Completions:          ptr.To(podCount),
		CompletionMode:       ptr.To(batchv1.NonIndexedCompletion),
		PodReplacementPolicy: ptr.To(batchv1.Failed),
	}
	wantTerminating := ptr.To(podCount)
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, true)()
	closeFn, restConfig, clientSet, ns := setup(t, "pod-replacement-policy")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	// NOTE: cancel is reassigned each time the controller is restarted below.
	// The closure (rather than a plain `defer cancel()`) is deliberate: it
	// defers the *latest* cancel at test exit, stopping the controller that is
	// actually running, not the first one.
	defer func() {
		cancel()
	}()
	resetMetrics()

	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
		Spec: jobSpec,
	})
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
	jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)

	// Phase 1 (feature on): deleted pods remain terminating and, under the
	// Failed policy, are not yet replaced or counted as failed.
	waitForPodsToBeActive(ctx, t, jobClient, 2, jobObj)
	deletePods(ctx, t, clientSet, jobObj.Namespace)
	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
		Terminating: wantTerminating,
		Failed:      0,
		Ready:       ptr.To[int32](0),
	})
	// Disable controller and turn feature off.
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, false)()
	cancel()
	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)

	// Phase 2 (feature off): legacy behavior — terminating is no longer
	// tracked (nil) and the deleted pods are counted failed and replaced.
	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
		Terminating: nil,
		Failed:      int(podCount),
		Ready:       ptr.To[int32](0),
		Active:      int(podCount),
	})
	// Disable the controller and turn feature on again.
	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, true)()
	cancel()
	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
	waitForPodsToBeActive(ctx, t, jobClient, 2, jobObj)
	deletePods(ctx, t, clientSet, jobObj.Namespace)

	// Phase 3 (feature on again): matches phase 1, plus the failures
	// accumulated while the feature was off.
	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
		Terminating: wantTerminating,
		Failed:      int(podCount),
		Active:      0,
		Ready:       ptr.To[int32](0),
	})
}
  2722  
// TestElasticIndexedJob verifies scaling an Indexed Job's completions up and
// down (ElasticIndexedJob feature): with the gate off the update must be
// rejected as immutable; with it on, indexes outside the new range stop
// counting as succeeded while past failures keep counting.
func TestElasticIndexedJob(t *testing.T) {
	const initialCompletions int32 = 3
	// jobUpdate describes one mutation step: an optional completions change,
	// indexes to finish, and the expected state afterwards.
	type jobUpdate struct {
		completions          *int32
		succeedIndexes       []int
		failIndexes          []int
		wantSucceededIndexes string
		wantFailed           int
		wantRemainingIndexes sets.Set[int]
		wantActivePods       int
	}
	cases := map[string]struct {
		featureGate bool
		jobUpdates  []jobUpdate
		wantErr     *apierrors.StatusError
	}{
		"feature flag off, mutation not allowed": {
			jobUpdates: []jobUpdate{
				{
					completions: ptr.To[int32](4),
				},
			},
			wantErr: apierrors.NewInvalid(
				schema.GroupKind{Group: "batch", Kind: "Job"},
				"test-job",
				field.ErrorList{field.Invalid(field.NewPath("spec", "completions"), 4, "field is immutable")},
			),
		},
		"scale up": {
			featureGate: true,
			jobUpdates: []jobUpdate{
				{
					// Scale up completions 3->4 then succeed indexes 0-3
					completions:          ptr.To[int32](4),
					succeedIndexes:       []int{0, 1, 2, 3},
					wantSucceededIndexes: "0-3",
				},
			},
		},
		"scale down": {
			featureGate: true,
			jobUpdates: []jobUpdate{
				// First succeed index 1 and fail index 2 while completions is still original value (3).
				{
					succeedIndexes:       []int{1},
					failIndexes:          []int{2},
					wantSucceededIndexes: "1",
					wantFailed:           1,
					wantRemainingIndexes: sets.New(0, 2),
					wantActivePods:       2,
				},
				// Scale down completions 3->1, verify prev failure out of range still counts
				// but succeeded out of range does not.
				{
					completions:          ptr.To[int32](1),
					succeedIndexes:       []int{0},
					wantSucceededIndexes: "0",
					wantFailed:           1,
				},
			},
		},
		"index finishes successfully, scale down, scale up": {
			featureGate: true,
			jobUpdates: []jobUpdate{
				// First succeed index 2 while completions is still original value (3).
				{
					succeedIndexes:       []int{2},
					wantSucceededIndexes: "2",
					wantRemainingIndexes: sets.New(0, 1),
					wantActivePods:       2,
				},
				// Scale completions down 3->2 to exclude previously succeeded index.
				{
					completions:          ptr.To[int32](2),
					wantRemainingIndexes: sets.New(0, 1),
					wantActivePods:       2,
				},
				// Scale completions back up to include previously succeeded index that was temporarily out of range.
				{
					completions:          ptr.To[int32](3),
					succeedIndexes:       []int{0, 1, 2},
					wantSucceededIndexes: "0-2",
				},
			},
		},
		"scale down to 0, verify that the job succeeds": {
			featureGate: true,
			jobUpdates: []jobUpdate{
				{
					completions: ptr.To[int32](0),
				},
			},
		},
	}

	for name, tc := range cases {
		tc := tc // capture range variable (pre-Go 1.22 semantics)
		t.Run(name, func(t *testing.T) {
			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ElasticIndexedJob, tc.featureGate)()
			closeFn, restConfig, clientSet, ns := setup(t, "indexed")
			defer closeFn()
			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
			defer cancel()
			resetMetrics()

			// Set up initial Job in Indexed completion mode.
			mode := batchv1.IndexedCompletion
			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism:    ptr.To(initialCompletions),
					Completions:    ptr.To(initialCompletions),
					CompletionMode: &mode,
				},
			})
			if err != nil {
				t.Fatalf("Failed to create Job: %v", err)
			}
			jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)

			// Wait for pods to start up.
			err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
				job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
				if err != nil {
					return false, err
				}
				if job.Status.Active == initialCompletions {
					return true, nil
				}
				return false, nil
			})
			if err != nil {
				t.Fatalf("Error waiting for Job pods to become active: %v", err)
			}

			for _, update := range tc.jobUpdates {
				// Update Job spec if necessary.
				if update.completions != nil {
					if jobObj, err = updateJob(ctx, jobClient, jobObj.Name, func(j *batchv1.Job) {
						j.Spec.Completions = update.completions
						j.Spec.Parallelism = update.completions
					}); err != nil {
						// With the gate off, the update is expected to be
						// rejected; compare against wantErr and end the case
						// here — no further steps apply.
						if diff := cmp.Diff(tc.wantErr, err); diff != "" {
							t.Fatalf("Unexpected or missing errors (-want/+got): %s", diff)
						}
						return
					}
				}

				// Succeed specified indexes.
				for _, idx := range update.succeedIndexes {
					if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, idx); err != nil {
						t.Fatalf("Failed trying to succeed pod with index %d: %v", idx, err)
					}
				}

				// Fail specified indexes.
				for _, idx := range update.failIndexes {
					if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, idx); err != nil {
						t.Fatalf("Failed trying to fail pod with index %d: %v", idx, err)
					}
				}

				// Note: Succeeded is asserted as len(update.succeedIndexes)
				// per step, not cumulatively — the scale-down cases rely on
				// previously-succeeded indexes dropping out of range.
				validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
					Active:      update.wantActivePods,
					Succeeded:   len(update.succeedIndexes),
					Failed:      update.wantFailed,
					Ready:       ptr.To[int32](0),
					Terminating: ptr.To[int32](0),
				})
				validateIndexedJobPods(ctx, t, clientSet, jobObj, update.wantRemainingIndexes, update.wantSucceededIndexes, nil)
			}

			validateJobComplete(ctx, t, clientSet, jobObj)
		})
	}
}
  2899  
// BenchmarkLargeIndexedJob benchmarks the completion of an Indexed Job.
// We expect that large jobs are more commonly used as Indexed. And they are
// also faster to track, as they need less API calls.
func BenchmarkLargeIndexedJob(b *testing.B) {
	closeFn, restConfig, clientSet, ns := setup(b, "indexed")
	// Raise client rate limits so API throughput isn't the bottleneck.
	restConfig.QPS = 100
	restConfig.Burst = 100
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(b, restConfig)
	defer cancel()
	// Retry schedule for transiently failing pod-phase updates, capped so the
	// benchmark cannot hang indefinitely.
	backoff := wait.Backoff{
		Duration: time.Second,
		Factor:   1.5,
		Steps:    30,
		Cap:      5 * time.Minute,
	}
	cases := map[string]struct {
		nPods                int32
		backoffLimitPerIndex *int32
	}{
		"regular indexed job without failures; size=10": {
			nPods: 10,
		},
		"job with backoffLimitPerIndex without failures; size=10": {
			nPods:                10,
			backoffLimitPerIndex: ptr.To[int32](1),
		},
		"regular indexed job without failures; size=100": {
			nPods: 100,
		},
		"job with backoffLimitPerIndex without failures; size=100": {
			nPods:                100,
			backoffLimitPerIndex: ptr.To[int32](1),
		},
	}
	mode := batchv1.IndexedCompletion
	for name, tc := range cases {
		b.Run(name, func(b *testing.B) {
			// The JobBackoffLimitPerIndex gate is only enabled for the cases
			// that actually set backoffLimitPerIndex.
			enableJobBackoffLimitPerIndex := tc.backoffLimitPerIndex != nil
			defer featuregatetesting.SetFeatureGateDuringTest(b, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, enableJobBackoffLimitPerIndex)()
			b.ResetTimer()
			for n := 0; n < b.N; n++ {
				// Timed region covers Job creation through completion; the
				// timer is stopped at the end of each iteration.
				b.StartTimer()
				jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
					ObjectMeta: metav1.ObjectMeta{
						// Unique per iteration and gate setting to avoid name clashes.
						Name: fmt.Sprintf("npods-%d-%d-%v", tc.nPods, n, enableJobBackoffLimitPerIndex),
					},
					Spec: batchv1.JobSpec{
						Parallelism:          ptr.To(tc.nPods),
						Completions:          ptr.To(tc.nPods),
						CompletionMode:       &mode,
						BackoffLimitPerIndex: tc.backoffLimitPerIndex,
					},
				})
				if err != nil {
					b.Fatalf("Failed to create Job: %v", err)
				}
				b.Cleanup(func() {
					if err := cleanUp(ctx, clientSet, jobObj); err != nil {
						b.Fatalf("Failed cleanup: %v", err)
					}
				})
				// Drive all pods to success, retrying with backoff; on a
				// partial failure, subtract the pods that did succeed so the
				// next attempt only targets the remainder.
				remaining := int(tc.nPods)
				if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
					if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
						remaining -= succ
						b.Logf("Transient failure succeeding pods: %v", err)
						return false, nil
					}
					return true, nil
				}); err != nil {
					b.Fatalf("Could not succeed the remaining %d pods: %v", remaining, err)
				}
				validateJobComplete(ctx, b, clientSet, jobObj)
				b.StopTimer()
			}
		})
	}
}
  2979  
  2980  // BenchmarkLargeFailureHandling benchmarks the handling of numerous pod failures
  2981  // of an Indexed Job. We set minimal backoff delay to make the job controller
  2982  // performance comparable for indexed jobs with global backoffLimit, and those
  2983  // with backoffLimit per-index, despite different patterns of handling failures.
  2984  func BenchmarkLargeFailureHandling(b *testing.B) {
  2985  	b.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
  2986  	b.Cleanup(setDurationDuringTest(&jobcontroller.MaxJobPodFailureBackOff, fastPodFailureBackoff))
  2987  	closeFn, restConfig, clientSet, ns := setup(b, "indexed")
  2988  	restConfig.QPS = 100
  2989  	restConfig.Burst = 100
  2990  	defer closeFn()
  2991  	ctx, cancel := startJobControllerAndWaitForCaches(b, restConfig)
  2992  	defer cancel()
  2993  	backoff := wait.Backoff{
  2994  		Duration: time.Second,
  2995  		Factor:   1.5,
  2996  		Steps:    30,
  2997  		Cap:      5 * time.Minute,
  2998  	}
  2999  	cases := map[string]struct {
  3000  		nPods                int32
  3001  		backoffLimitPerIndex *int32
  3002  		customTimeout        *time.Duration
  3003  	}{
  3004  		"regular indexed job with failures; size=10": {
  3005  			nPods: 10,
  3006  		},
  3007  		"job with backoffLimitPerIndex with failures; size=10": {
  3008  			nPods:                10,
  3009  			backoffLimitPerIndex: ptr.To[int32](1),
  3010  		},
  3011  		"regular indexed job with failures; size=100": {
  3012  			nPods: 100,
  3013  		},
  3014  		"job with backoffLimitPerIndex with failures; size=100": {
  3015  			nPods:                100,
  3016  			backoffLimitPerIndex: ptr.To[int32](1),
  3017  		},
  3018  	}
  3019  	mode := batchv1.IndexedCompletion
  3020  	for name, tc := range cases {
  3021  		b.Run(name, func(b *testing.B) {
  3022  			enableJobBackoffLimitPerIndex := tc.backoffLimitPerIndex != nil
  3023  			timeout := ptr.Deref(tc.customTimeout, wait.ForeverTestTimeout)
  3024  			defer featuregatetesting.SetFeatureGateDuringTest(b, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, enableJobBackoffLimitPerIndex)()
  3025  			b.ResetTimer()
  3026  			for n := 0; n < b.N; n++ {
  3027  				b.StopTimer()
  3028  				jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
  3029  					ObjectMeta: metav1.ObjectMeta{
  3030  						Name: fmt.Sprintf("npods-%d-%d-%v", tc.nPods, n, enableJobBackoffLimitPerIndex),
  3031  					},
  3032  					Spec: batchv1.JobSpec{
  3033  						Parallelism:          ptr.To(tc.nPods),
  3034  						Completions:          ptr.To(tc.nPods),
  3035  						CompletionMode:       &mode,
  3036  						BackoffLimitPerIndex: tc.backoffLimitPerIndex,
  3037  						BackoffLimit:         ptr.To(tc.nPods),
  3038  					},
  3039  				})
  3040  				if err != nil {
  3041  					b.Fatalf("Failed to create Job: %v", err)
  3042  				}
  3043  				b.Cleanup(func() {
  3044  					if err := cleanUp(ctx, clientSet, jobObj); err != nil {
  3045  						b.Fatalf("Failed cleanup: %v", err)
  3046  					}
  3047  				})
  3048  				validateJobsPodsStatusOnlyWithTimeout(ctx, b, clientSet, jobObj, podsByStatus{
  3049  					Active:      int(tc.nPods),
  3050  					Ready:       ptr.To[int32](0),
  3051  					Terminating: ptr.To[int32](0),
  3052  				}, timeout)
  3053  
  3054  				b.StartTimer()
  3055  				remaining := int(tc.nPods)
  3056  				if err := wait.ExponentialBackoff(backoff, func() (done bool, err error) {
  3057  					if err, fail := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, remaining); err != nil {
  3058  						remaining -= fail
  3059  						b.Logf("Transient failure failing pods: %v", err)
  3060  						return false, nil
  3061  					}
  3062  					return true, nil
  3063  				}); err != nil {
  3064  					b.Fatalf("Could not succeed the remaining %d pods: %v", remaining, err)
  3065  				}
  3066  				validateJobsPodsStatusOnlyWithTimeout(ctx, b, clientSet, jobObj, podsByStatus{
  3067  					Active:      int(tc.nPods),
  3068  					Ready:       ptr.To[int32](0),
  3069  					Failed:      int(tc.nPods),
  3070  					Terminating: ptr.To[int32](0),
  3071  				}, timeout)
  3072  				b.StopTimer()
  3073  			}
  3074  		})
  3075  	}
  3076  }
  3077  
  3078  // cleanUp deletes all pods and the job
  3079  func cleanUp(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job) error {
  3080  	// Clean up pods in pages, because DeleteCollection might timeout.
  3081  	// #90743
  3082  	for {
  3083  		pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{Limit: 1})
  3084  		if err != nil {
  3085  			return err
  3086  		}
  3087  		if len(pods.Items) == 0 {
  3088  			break
  3089  		}
  3090  		err = clientSet.CoreV1().Pods(jobObj.Namespace).DeleteCollection(ctx,
  3091  			metav1.DeleteOptions{},
  3092  			metav1.ListOptions{
  3093  				Limit: 1000,
  3094  			})
  3095  		if err != nil {
  3096  			return err
  3097  		}
  3098  	}
  3099  	return clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{})
  3100  }
  3101  
// TestOrphanPodsFinalizersClearedWithGC verifies that when a Job is deleted
// (under each deletion propagation policy) the garbage collector removes the
// cascading pods and no pod is left orphaned with the job tracking finalizer.
func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
	for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground, metav1.DeletePropagationForeground} {
		t.Run(string(policy), func(t *testing.T) {
			closeFn, restConfig, clientSet, ns := setup(t, "simple")
			defer closeFn()
			informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "controller-informers")), 0)
			// Make the job controller significantly slower to trigger race condition.
			restConfig.QPS = 1
			restConfig.Burst = 1
			jc, ctx, cancel := createJobControllerWithSharedInformers(t, restConfig, informerSet)
			resetMetrics()
			defer cancel()
			// Restore a fast config for the GC controller and test client;
			// only the job controller above keeps the throttled settings.
			restConfig.QPS = 200
			restConfig.Burst = 200
			runGC := util.CreateGCController(ctx, t, *restConfig, informerSet)
			informerSet.Start(ctx.Done())
			go jc.Run(ctx, 1)
			runGC()

			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism: ptr.To[int32](2),
				},
			})
			if err != nil {
				t.Fatalf("Failed to create Job: %v", err)
			}
			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
				Active:      2,
				Ready:       ptr.To[int32](0),
				Terminating: ptr.To[int32](0),
			})

			// Delete Job. The GC should delete the pods in cascade.
			err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{
				PropagationPolicy: &policy,
			})
			if err != nil {
				t.Fatalf("Failed to delete job: %v", err)
			}
			validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
			// Pods never finished, so they are not counted in the metric.
			validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 0)
		})
	}
}
  3148  
// TestFinalizersClearedWhenBackoffLimitExceeded verifies that once a Job
// fails by exceeding its backoff limit, the controller removes the tracking
// finalizer from all remaining pods and records the failure metric.
func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
	// Set a maximum number of uncounted pods below parallelism, to ensure it
	// doesn't affect the termination of pods.
	t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 50))
	closeFn, restConfig, clientSet, ns := setup(t, "simple")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	defer cancel()

	// Job tracking with finalizers requires less calls in Indexed mode,
	// so it's more likely to process all finalizers before all the pods
	// are visible.
	mode := batchv1.IndexedCompletion
	// BackoffLimit=0 makes a single pod failure enough to fail the Job.
	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
		Spec: batchv1.JobSpec{
			CompletionMode: &mode,
			Completions:    ptr.To[int32](100),
			Parallelism:    ptr.To[int32](100),
			BackoffLimit:   ptr.To[int32](0),
		},
	})
	if err != nil {
		t.Fatalf("Could not create job: %v", err)
	}

	// Fail a pod ASAP.
	err = wait.PollUntilContextTimeout(ctx, time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
		if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
			return false, nil
		}
		return true, nil
	})
	if err != nil {
		t.Fatalf("Could not fail pod: %v", err)
	}

	validateJobFailed(ctx, t, clientSet, jobObj)
	validateCounterMetric(ctx, t, metrics.JobFinishedNum, metricLabelsWithValue{
		Labels: []string{"Indexed", "failed", "BackoffLimitExceeded"},
		Value:  1,
	})

	validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
}
  3193  
  3194  func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
  3195  	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, 2*time.Second))
  3196  	closeFn, restConfig, clientSet, ns := setup(t, "simple")
  3197  	defer closeFn()
  3198  	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
  3199  	defer cancel()
  3200  
  3201  	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{})
  3202  	if err != nil {
  3203  		t.Fatalf("Could not create job: %v", err)
  3204  	}
  3205  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
  3206  		Active:      1,
  3207  		Ready:       ptr.To[int32](0),
  3208  		Terminating: ptr.To[int32](0),
  3209  	})
  3210  
  3211  	// Fail the first pod
  3212  	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
  3213  		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
  3214  	}
  3215  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
  3216  		Active:      1,
  3217  		Ready:       ptr.To[int32](0),
  3218  		Failed:      1,
  3219  		Terminating: ptr.To[int32](0),
  3220  	})
  3221  
  3222  	// Fail the second pod
  3223  	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
  3224  		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
  3225  	}
  3226  	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
  3227  		Active:      1,
  3228  		Ready:       ptr.To[int32](0),
  3229  		Failed:      2,
  3230  		Terminating: ptr.To[int32](0),
  3231  	})
  3232  
  3233  	jobPods, err := getJobPods(ctx, t, clientSet, jobObj, func(ps v1.PodStatus) bool { return true })
  3234  	if err != nil {
  3235  		t.Fatalf("Failed to list Job Pods: %v", err)
  3236  	}
  3237  	if len(jobPods) != 3 {
  3238  		t.Fatalf("Expected to get %v pods, received %v", 4, len(jobPods))
  3239  	}
  3240  	validateExpotentialBackoffDelay(t, jobcontroller.DefaultJobPodFailureBackOff, jobPods)
  3241  }
  3242  
  3243  func validateExpotentialBackoffDelay(t *testing.T, defaultPodFailureBackoff time.Duration, pods []*v1.Pod) {
  3244  	t.Helper()
  3245  	creationTime := []time.Time{}
  3246  	finishTime := []time.Time{}
  3247  	for _, pod := range pods {
  3248  		creationTime = append(creationTime, pod.CreationTimestamp.Time)
  3249  		if len(pod.Status.ContainerStatuses) > 0 {
  3250  			finishTime = append(finishTime, pod.Status.ContainerStatuses[0].State.Terminated.FinishedAt.Time)
  3251  		}
  3252  	}
  3253  
  3254  	sort.Slice(creationTime, func(i, j int) bool {
  3255  		return creationTime[i].Before(creationTime[j])
  3256  	})
  3257  	sort.Slice(finishTime, func(i, j int) bool {
  3258  		return finishTime[i].Before(finishTime[j])
  3259  	})
  3260  
  3261  	diff := creationTime[1].Sub(finishTime[0])
  3262  
  3263  	if diff < defaultPodFailureBackoff {
  3264  		t.Fatalf("Second pod should be created at least %v seconds after the first pod, time difference: %v", defaultPodFailureBackoff, diff)
  3265  	}
  3266  
  3267  	if diff >= 2*defaultPodFailureBackoff {
  3268  		t.Fatalf("Second pod should be created before %v seconds after the first pod, time difference: %v", 2*defaultPodFailureBackoff, diff)
  3269  	}
  3270  
  3271  	diff = creationTime[2].Sub(finishTime[1])
  3272  
  3273  	if diff < 2*defaultPodFailureBackoff {
  3274  		t.Fatalf("Third pod should be created at least %v seconds after the second pod, time difference: %v", 2*defaultPodFailureBackoff, diff)
  3275  	}
  3276  
  3277  	if diff >= 4*defaultPodFailureBackoff {
  3278  		t.Fatalf("Third pod should be created before %v seconds after the second pod, time difference: %v", 4*defaultPodFailureBackoff, diff)
  3279  	}
  3280  }
  3281  
// TestJobFailedWithInterrupts tests that a job were one pod fails and the rest
// succeed is marked as Failed, even if the controller fails in the middle.
func TestJobFailedWithInterrupts(t *testing.T) {
	closeFn, restConfig, clientSet, ns := setup(t, "simple")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	// Deferred via closure so it always invokes the *latest* cancel; the
	// variable is reassigned when the controller is restarted below.
	defer func() {
		cancel()
	}()
	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
		Spec: batchv1.JobSpec{
			Completions:  ptr.To[int32](10),
			Parallelism:  ptr.To[int32](10),
			BackoffLimit: ptr.To[int32](0),
			Template: v1.PodTemplateSpec{
				Spec: v1.PodSpec{
					NodeName: "foo", // Scheduled pods are not deleted immediately.
				},
			},
		},
	})
	if err != nil {
		t.Fatalf("Could not create job: %v", err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      10,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
	t.Log("Finishing pods")
	// One failure is enough to exceed BackoffLimit=0 and fail the Job.
	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
		t.Fatalf("Could not fail a pod: %v", err)
	}
	remaining := 9
	// Succeed the rest, retrying on transient API errors and only counting
	// the pods still left to update.
	if err := wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
		if err, succ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, remaining); err != nil {
			remaining -= succ
			t.Logf("Transient failure succeeding pods: %v", err)
			return false, nil
		}
		return true, nil
	}); err != nil {
		t.Fatalf("Could not succeed the remaining %d pods: %v", remaining, err)
	}
	// Restart the controller to simulate a crash after the pods finished but
	// (possibly) before the Job status was fully updated.
	t.Log("Recreating job controller")
	cancel()
	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
	validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobFailed)
}
  3331  
  3332  func validateNoOrphanPodsWithFinalizers(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
  3333  	t.Helper()
  3334  	orphanPods := 0
  3335  	if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
  3336  		pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{
  3337  			LabelSelector: metav1.FormatLabelSelector(jobObj.Spec.Selector),
  3338  		})
  3339  		if err != nil {
  3340  			return false, err
  3341  		}
  3342  		orphanPods = 0
  3343  		for _, pod := range pods.Items {
  3344  			if hasJobTrackingFinalizer(&pod) {
  3345  				orphanPods++
  3346  			}
  3347  		}
  3348  		return orphanPods == 0, nil
  3349  	}); err != nil {
  3350  		t.Errorf("Failed waiting for pods to be freed from finalizer: %v", err)
  3351  		t.Logf("Last saw %d orphan pods", orphanPods)
  3352  	}
  3353  }
  3354  
// TestOrphanPodsFinalizersClearedOnRestart verifies that a restarted job
// controller removes tracking finalizers from pods whose Job was deleted
// while the controller was down.
func TestOrphanPodsFinalizersClearedOnRestart(t *testing.T) {
	// Step 1: create job.
	closeFn, restConfig, clientSet, ns := setup(t, "simple")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	// Deferred via closure so it calls the latest cancel after the controller
	// is restarted below.
	defer func() {
		cancel()
	}()

	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
		Spec: batchv1.JobSpec{
			Parallelism: ptr.To[int32](1),
		},
	})
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:      1,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})

	// Step 2: Delete the Job while the controller is stopped.
	cancel()

	// Use a fresh background context: ctx was canceled above.
	err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(context.Background(), jobObj.Name, metav1.DeleteOptions{})
	if err != nil {
		t.Fatalf("Failed to delete job: %v", err)
	}

	// Step 3: Restart controller.
	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
	validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
}
  3390  
// TestSuspendJob verifies suspending and resuming a Job: the active pod
// count, the JobSuspended condition status, and the emitted event reason
// after both the create and the update step.
func TestSuspendJob(t *testing.T) {
	// step describes the suspend flag to apply and the expected resulting
	// state (active pods, JobSuspended condition status, event reason).
	type step struct {
		flag       bool
		wantActive int
		wantStatus v1.ConditionStatus
		wantReason string
	}
	// NOTE(review): featureGate is never set by any case and only appears in
	// the subtest name — presumably a leftover from a feature-gated variant;
	// confirm whether it can be removed.
	testCases := []struct {
		featureGate bool
		create      step
		update      step
	}{
		// Exhaustively test all combinations other than trivial true->true and
		// false->false cases.
		{
			create: step{flag: false, wantActive: 2},
			update: step{flag: true, wantActive: 0, wantStatus: v1.ConditionTrue, wantReason: "Suspended"},
		},
		{
			create: step{flag: true, wantActive: 0, wantStatus: v1.ConditionTrue, wantReason: "Suspended"},
			update: step{flag: false, wantActive: 2, wantStatus: v1.ConditionFalse, wantReason: "Resumed"},
		},
	}

	for _, tc := range testCases {
		name := fmt.Sprintf("feature=%v,create=%v,update=%v", tc.featureGate, tc.create.flag, tc.update.flag)
		t.Run(name, func(t *testing.T) {
			closeFn, restConfig, clientSet, ns := setup(t, "suspend")
			defer closeFn()
			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
			defer cancel()
			// Watch events so we can assert the Suspended/Resumed reasons.
			events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
			if err != nil {
				t.Fatal(err)
			}
			defer events.Stop()

			parallelism := int32(2)
			job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
				Spec: batchv1.JobSpec{
					Parallelism: ptr.To(parallelism),
					Completions: ptr.To[int32](4),
					Suspend:     ptr.To(tc.create.flag),
				},
			})
			if err != nil {
				t.Fatalf("Failed to create Job: %v", err)
			}

			// validate checks pod counts, the JobSuspended condition, and the
			// expected event after the step named s ("create" or "update").
			// It also refreshes the captured job variable for the next step.
			validate := func(s string, active int, status v1.ConditionStatus, reason string) {
				validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
					Active:      active,
					Ready:       ptr.To[int32](0),
					Terminating: ptr.To[int32](0),
				})
				job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
				if err != nil {
					t.Fatalf("Failed to get Job after %s: %v", s, err)
				}
				if got, want := getJobConditionStatus(ctx, job, batchv1.JobSuspended), status; got != want {
					t.Errorf("Unexpected Job condition %q status after %s: got %q, want %q", batchv1.JobSuspended, s, got, want)
				}
				if err := waitForEvent(ctx, events, job.UID, reason); err != nil {
					t.Errorf("Waiting for event with reason %q after %s: %v", reason, s, err)
				}
			}
			validate("create", tc.create.wantActive, tc.create.wantStatus, tc.create.wantReason)

			// Flip the suspend flag and validate the transition.
			job.Spec.Suspend = ptr.To(tc.update.flag)
			job, err = clientSet.BatchV1().Jobs(ns.Name).Update(ctx, job, metav1.UpdateOptions{})
			if err != nil {
				t.Fatalf("Failed to update Job: %v", err)
			}
			validate("update", tc.update.wantActive, tc.update.wantStatus, tc.update.wantReason)
		})
	}
}
  3468  
// TestSuspendJobControllerRestart checks that a Job created in the suspended
// state keeps zero active pods.
// NOTE(review): despite the name, this test never restarts the controller —
// confirm whether a restart step was removed or was never added.
func TestSuspendJobControllerRestart(t *testing.T) {
	closeFn, restConfig, clientSet, ns := setup(t, "suspend")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	defer cancel()

	job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
		Spec: batchv1.JobSpec{
			Parallelism: ptr.To[int32](2),
			Completions: ptr.To[int32](4),
			Suspend:     ptr.To(true),
		},
	})
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
	// A suspended Job must not start any pods.
	validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
		Active:      0,
		Ready:       ptr.To[int32](0),
		Terminating: ptr.To[int32](0),
	})
}
  3491  
// TestNodeSelectorUpdate verifies that a suspended Job's pod template node
// selector can be mutated before the Job is resumed, that the resulting pod
// uses the new selector, and that further template mutations are rejected
// once the Job is running.
func TestNodeSelectorUpdate(t *testing.T) {
	closeFn, restConfig, clientSet, ns := setup(t, "suspend")
	defer closeFn()
	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
	defer cancel()

	// Create the Job suspended so no pod is started yet and the template is
	// still mutable.
	job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{Spec: batchv1.JobSpec{
		Parallelism: ptr.To[int32](1),
		Suspend:     ptr.To(true),
	}})
	if err != nil {
		t.Fatalf("Failed to create Job: %v", err)
	}
	jobName := job.Name
	jobNamespace := job.Namespace
	jobClient := clientSet.BatchV1().Jobs(jobNamespace)

	// (1) Unsuspend and set node selector in the same update.
	nodeSelector := map[string]string{"foo": "bar"}
	if _, err := updateJob(ctx, jobClient, jobName, func(j *batchv1.Job) {
		j.Spec.Template.Spec.NodeSelector = nodeSelector
		j.Spec.Suspend = ptr.To(false)
	}); err != nil {
		t.Errorf("Unexpected error: %v", err)
	}

	// (2) Check that the pod was created using the expected node selector.

	var pod *v1.Pod
	if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
		pods, err := clientSet.CoreV1().Pods(jobNamespace).List(ctx, metav1.ListOptions{})
		if err != nil {
			t.Fatalf("Failed to list Job Pods: %v", err)
		}
		if len(pods.Items) == 0 {
			return false, nil
		}
		pod = &pods.Items[0]
		return true, nil
	}); err != nil || pod == nil {
		t.Fatalf("pod not found: %v", err)
	}

	// if the feature gate is enabled, then the job should now be unsuspended and
	// the pod has the node selector.
	if diff := cmp.Diff(nodeSelector, pod.Spec.NodeSelector); diff != "" {
		t.Errorf("Unexpected nodeSelector (-want,+got):\n%s", diff)
	}

	// (3) Update node selector again. It should fail since the job is unsuspended.
	_, err = updateJob(ctx, jobClient, jobName, func(j *batchv1.Job) {
		j.Spec.Template.Spec.NodeSelector = map[string]string{"foo": "baz"}
	})

	// Mutating the pod template of a running Job must be rejected by
	// validation.
	if err == nil || !strings.Contains(err.Error(), "spec.template: Invalid value") {
		t.Errorf("Expected \"spec.template: Invalid value\" error, got: %v", err)
	}

}
  3551  
// podsByStatus mirrors the pod counters of batchv1.JobStatus so that the
// observed and desired counts can be compared with cmp.Equal/cmp.Diff.
type podsByStatus struct {
	Active      int    // Job.Status.Active
	Ready       *int32 // Job.Status.Ready (pointer, may be nil)
	Failed      int    // Job.Status.Failed
	Succeeded   int    // Job.Status.Succeeded
	Terminating *int32 // Job.Status.Terminating (pointer, may be nil)
}
  3559  
// validateJobsPodsStatusOnly waits (with the default test timeout) until the
// Job's status counters match desired. It is a convenience wrapper around
// validateJobsPodsStatusOnlyWithTimeout.
func validateJobsPodsStatusOnly(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
	t.Helper()
	validateJobsPodsStatusOnlyWithTimeout(ctx, t, clientSet, jobObj, desired, wait.ForeverTestTimeout)
}
  3564  
  3565  func validateJobsPodsStatusOnlyWithTimeout(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus, timeout time.Duration) {
  3566  	t.Helper()
  3567  	var actualCounts podsByStatus
  3568  	if err := wait.PollUntilContextTimeout(ctx, waitInterval, timeout, true, func(ctx context.Context) (bool, error) {
  3569  		updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
  3570  		if err != nil {
  3571  			t.Fatalf("Failed to get updated Job: %v", err)
  3572  		}
  3573  		actualCounts = podsByStatus{
  3574  			Active:      int(updatedJob.Status.Active),
  3575  			Ready:       updatedJob.Status.Ready,
  3576  			Succeeded:   int(updatedJob.Status.Succeeded),
  3577  			Failed:      int(updatedJob.Status.Failed),
  3578  			Terminating: updatedJob.Status.Terminating,
  3579  		}
  3580  		return cmp.Equal(actualCounts, desired), nil
  3581  	}); err != nil {
  3582  		diff := cmp.Diff(desired, actualCounts)
  3583  		t.Errorf("Waiting for Job Status: %v\nPods (-want,+got):\n%s", err, diff)
  3584  	}
  3585  }
  3586  
  3587  func validateJobPodsStatus(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
  3588  	t.Helper()
  3589  	validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, desired)
  3590  	var active []*v1.Pod
  3591  	if err := wait.PollUntilContextTimeout(ctx, waitInterval, time.Second*5, true, func(ctx context.Context) (bool, error) {
  3592  		pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
  3593  		if err != nil {
  3594  			t.Fatalf("Failed to list Job Pods: %v", err)
  3595  		}
  3596  		active = nil
  3597  		for _, pod := range pods.Items {
  3598  			phase := pod.Status.Phase
  3599  			if metav1.IsControlledBy(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) {
  3600  				p := pod
  3601  				active = append(active, &p)
  3602  			}
  3603  		}
  3604  		return len(active) == desired.Active, nil
  3605  	}); err != nil {
  3606  		if len(active) != desired.Active {
  3607  			t.Errorf("Found %d active Pods, want %d", len(active), desired.Active)
  3608  		}
  3609  	}
  3610  	for _, p := range active {
  3611  		if !hasJobTrackingFinalizer(p) {
  3612  			t.Errorf("Active pod %s doesn't have tracking finalizer", p.Name)
  3613  		}
  3614  	}
  3615  }
  3616  
  3617  func getJobPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, filter func(v1.PodStatus) bool) ([]*v1.Pod, error) {
  3618  	t.Helper()
  3619  	allPods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
  3620  	if err != nil {
  3621  		return nil, err
  3622  	}
  3623  	jobPods := make([]*v1.Pod, 0, 0)
  3624  	for _, pod := range allPods.Items {
  3625  		if metav1.IsControlledBy(&pod, jobObj) && filter(pod.Status) {
  3626  			p := pod
  3627  			jobPods = append(jobPods, &p)
  3628  		}
  3629  	}
  3630  	return jobPods, nil
  3631  }
  3632  
  3633  func validateFinishedPodsNoFinalizer(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
  3634  	t.Helper()
  3635  	pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
  3636  	if err != nil {
  3637  		t.Fatalf("Failed to list Job Pods: %v", err)
  3638  	}
  3639  	for _, pod := range pods.Items {
  3640  		phase := pod.Status.Phase
  3641  		if metav1.IsControlledBy(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) && hasJobTrackingFinalizer(&pod) {
  3642  			t.Errorf("Finished pod %s still has a tracking finalizer", pod.Name)
  3643  		}
  3644  	}
  3645  }
  3646  
// validateIndexedJobPods validates indexes and hostname of
// active and completed Pods of an Indexed Job.
// Call after validateJobPodsStatus
func validateIndexedJobPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, wantActive sets.Set[int], gotCompleted string, wantFailed *string) {
	t.Helper()
	updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
	if err != nil {
		t.Fatalf("Failed to get updated Job: %v", err)
	}
	// Compare the completed-indexes string (e.g. "0-2,5") verbatim.
	if updatedJob.Status.CompletedIndexes != gotCompleted {
		t.Errorf("Got completed indexes %q, want %q", updatedJob.Status.CompletedIndexes, gotCompleted)
	}
	// FailedIndexes is a *string: a nil/non-nil mismatch is also a failure.
	if diff := cmp.Diff(wantFailed, updatedJob.Status.FailedIndexes); diff != "" {
		t.Errorf("Got unexpected failed indexes: %s", diff)
	}
	pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		t.Fatalf("Failed to list Job Pods: %v", err)
	}
	// Collect completion indexes of the Job's active (Pending/Running) pods
	// and verify each pod's hostname is "<jobName>-<index>".
	gotActive := sets.New[int]()
	for _, pod := range pods.Items {
		if metav1.IsControlledBy(&pod, jobObj) {
			if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning {
				ix, err := getCompletionIndex(&pod)
				if err != nil {
					t.Errorf("Failed getting completion index for pod %s: %v", pod.Name, err)
				} else {
					gotActive.Insert(ix)
				}
				expectedName := fmt.Sprintf("%s-%d", jobObj.Name, ix)
				if diff := cmp.Equal(expectedName, pod.Spec.Hostname); !diff {
					t.Errorf("Got pod hostname %s, want %s", pod.Spec.Hostname, expectedName)
				}
			}
		}
	}
	// Treat a nil wantActive as the empty set so the sorted comparison below
	// works uniformly.
	if wantActive == nil {
		wantActive = sets.New[int]()
	}
	if diff := cmp.Diff(sets.List(wantActive), sets.List(gotActive)); diff != "" {
		t.Errorf("Unexpected active indexes (-want,+got):\n%s", diff)
	}
}
  3690  
  3691  func waitForEvent(ctx context.Context, events watch.Interface, uid types.UID, reason string) error {
  3692  	if reason == "" {
  3693  		return nil
  3694  	}
  3695  	return wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
  3696  		for {
  3697  			var ev watch.Event
  3698  			select {
  3699  			case ev = <-events.ResultChan():
  3700  			default:
  3701  				return false, nil
  3702  			}
  3703  			e, ok := ev.Object.(*eventsv1.Event)
  3704  			if !ok {
  3705  				continue
  3706  			}
  3707  			ctrl := "job-controller"
  3708  			if (e.ReportingController == ctrl || e.DeprecatedSource.Component == ctrl) && e.Reason == reason && e.Regarding.UID == uid {
  3709  				return true, nil
  3710  			}
  3711  		}
  3712  	})
  3713  }
  3714  
  3715  func getJobConditionStatus(ctx context.Context, job *batchv1.Job, cType batchv1.JobConditionType) v1.ConditionStatus {
  3716  	for _, cond := range job.Status.Conditions {
  3717  		if cond.Type == cType {
  3718  			return cond.Status
  3719  		}
  3720  	}
  3721  	return ""
  3722  }
  3723  
// validateJobFailed waits until the Job reports the JobFailed condition with
// status True, failing the test on timeout.
func validateJobFailed(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
	t.Helper()
	validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobFailed)
}
  3728  
// validateJobComplete waits until the Job reports the JobComplete condition
// with status True, failing the test on timeout.
func validateJobComplete(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job) {
	t.Helper()
	validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobComplete)
}
  3733  
  3734  func validateJobCondition(ctx context.Context, t testing.TB, clientSet clientset.Interface, jobObj *batchv1.Job, cond batchv1.JobConditionType) {
  3735  	t.Helper()
  3736  	if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
  3737  		j, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
  3738  		if err != nil {
  3739  			t.Fatalf("Failed to obtain updated Job: %v", err)
  3740  		}
  3741  		return getJobConditionStatus(ctx, j, cond) == v1.ConditionTrue, nil
  3742  	}); err != nil {
  3743  		t.Errorf("Waiting for Job to have condition %s: %v", cond, err)
  3744  	}
  3745  }
  3746  
  3747  func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, cnt int) (error, int) {
  3748  	op := func(p *v1.Pod) bool {
  3749  		p.Status.Phase = phase
  3750  		if phase == v1.PodFailed || phase == v1.PodSucceeded {
  3751  			p.Status.ContainerStatuses = []v1.ContainerStatus{
  3752  				{
  3753  					State: v1.ContainerState{
  3754  						Terminated: &v1.ContainerStateTerminated{
  3755  							FinishedAt: metav1.Now(),
  3756  						},
  3757  					},
  3758  				},
  3759  			}
  3760  		}
  3761  		return true
  3762  	}
  3763  	return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
  3764  }
  3765  
  3766  func setJobPodsReady(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, cnt int) (error, int) {
  3767  	op := func(p *v1.Pod) bool {
  3768  		if podutil.IsPodReady(p) {
  3769  			return false
  3770  		}
  3771  		p.Status.Conditions = append(p.Status.Conditions, v1.PodCondition{
  3772  			Type:   v1.PodReady,
  3773  			Status: v1.ConditionTrue,
  3774  		})
  3775  		return true
  3776  	}
  3777  	return updateJobPodsStatus(ctx, clientSet, jobObj, op, cnt)
  3778  }
  3779  
  3780  func updateJobPodsStatus(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, op func(*v1.Pod) bool, cnt int) (error, int) {
  3781  	pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
  3782  	if err != nil {
  3783  		return fmt.Errorf("listing Job Pods: %w", err), 0
  3784  	}
  3785  	updates := make([]v1.Pod, 0, cnt)
  3786  	for _, pod := range pods.Items {
  3787  		if len(updates) == cnt {
  3788  			break
  3789  		}
  3790  		if p := pod.Status.Phase; metav1.IsControlledBy(&pod, jobObj) && p != v1.PodFailed && p != v1.PodSucceeded {
  3791  			if !op(&pod) {
  3792  				continue
  3793  			}
  3794  			updates = append(updates, pod)
  3795  		}
  3796  	}
  3797  	successful, err := updatePodStatuses(ctx, clientSet, updates)
  3798  	if successful != cnt {
  3799  		return fmt.Errorf("couldn't set phase on %d Job pods", cnt-successful), successful
  3800  	}
  3801  	return err, successful
  3802  }
  3803  
  3804  func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) (int, error) {
  3805  	wg := sync.WaitGroup{}
  3806  	wg.Add(len(updates))
  3807  	errCh := make(chan error, len(updates))
  3808  	var updated int32
  3809  
  3810  	for _, pod := range updates {
  3811  		pod := pod
  3812  		go func() {
  3813  			_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
  3814  			if err != nil {
  3815  				errCh <- err
  3816  			} else {
  3817  				atomic.AddInt32(&updated, 1)
  3818  			}
  3819  			wg.Done()
  3820  		}()
  3821  	}
  3822  	wg.Wait()
  3823  
  3824  	select {
  3825  	case err := <-errCh:
  3826  		return int(updated), fmt.Errorf("updating Pod status: %w", err)
  3827  	default:
  3828  	}
  3829  	return int(updated), nil
  3830  }
  3831  
  3832  func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, ix int) error {
  3833  	pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
  3834  	if err != nil {
  3835  		return fmt.Errorf("listing Job Pods: %w", err)
  3836  	}
  3837  	for _, pod := range pods.Items {
  3838  		if p := pod.Status.Phase; !metav1.IsControlledBy(&pod, jobObj) || p == v1.PodFailed || p == v1.PodSucceeded {
  3839  			continue
  3840  		}
  3841  		if pix, err := getCompletionIndex(&pod); err == nil && pix == ix {
  3842  			pod.Status.Phase = phase
  3843  			if phase == v1.PodFailed || phase == v1.PodSucceeded {
  3844  				pod.Status.ContainerStatuses = []v1.ContainerStatus{
  3845  					{
  3846  						State: v1.ContainerState{
  3847  							Terminated: &v1.ContainerStateTerminated{
  3848  								FinishedAt: metav1.Now(),
  3849  							},
  3850  						},
  3851  					},
  3852  				}
  3853  			}
  3854  			_, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{})
  3855  			if err != nil {
  3856  				return fmt.Errorf("updating pod %s status: %w", pod.Name, err)
  3857  			}
  3858  			return nil
  3859  		}
  3860  	}
  3861  	return errors.New("no pod matching index found")
  3862  }
  3863  
  3864  func getActivePodForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, ix int) (*v1.Pod, error) {
  3865  	return getJobPodForIndex(ctx, clientSet, jobObj, ix, func(p *v1.Pod) bool {
  3866  		return !podutil.IsPodTerminal(p)
  3867  	})
  3868  }
  3869  
  3870  func getJobPodForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, ix int, filter func(*v1.Pod) bool) (*v1.Pod, error) {
  3871  	pods, err := getJobPodsForIndex(ctx, clientSet, jobObj, ix, filter)
  3872  	if err != nil {
  3873  		return nil, err
  3874  	}
  3875  	if len(pods) == 0 {
  3876  		return nil, fmt.Errorf("Pod not found for index: %v", ix)
  3877  	}
  3878  	return pods[0], nil
  3879  }
  3880  
  3881  func getJobPodsForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, ix int, filter func(*v1.Pod) bool) ([]*v1.Pod, error) {
  3882  	pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
  3883  	if err != nil {
  3884  		return nil, fmt.Errorf("listing Job Pods: %w", err)
  3885  	}
  3886  	var result []*v1.Pod
  3887  	for _, pod := range pods.Items {
  3888  		pod := pod
  3889  		if !metav1.IsControlledBy(&pod, jobObj) {
  3890  			continue
  3891  		}
  3892  		if !filter(&pod) {
  3893  			continue
  3894  		}
  3895  		if pix, err := getCompletionIndex(&pod); err == nil && pix == ix {
  3896  			result = append(result, &pod)
  3897  		}
  3898  	}
  3899  	return result, nil
  3900  }
  3901  
  3902  func getCompletionIndex(p *v1.Pod) (int, error) {
  3903  	if p.Annotations == nil {
  3904  		return 0, errors.New("no annotations found")
  3905  	}
  3906  	v, ok := p.Annotations[batchv1.JobCompletionIndexAnnotation]
  3907  	if !ok {
  3908  		return 0, fmt.Errorf("annotation %s not found", batchv1.JobCompletionIndexAnnotation)
  3909  	}
  3910  	return strconv.Atoi(v)
  3911  }
  3912  
  3913  func createJobWithDefaults(ctx context.Context, clientSet clientset.Interface, ns string, jobObj *batchv1.Job) (*batchv1.Job, error) {
  3914  	if jobObj.Name == "" {
  3915  		jobObj.Name = "test-job"
  3916  	}
  3917  	if len(jobObj.Spec.Template.Spec.Containers) == 0 {
  3918  		jobObj.Spec.Template.Spec.Containers = []v1.Container{
  3919  			{Name: "foo", Image: "bar"},
  3920  		}
  3921  	}
  3922  	if jobObj.Spec.Template.Spec.RestartPolicy == "" {
  3923  		jobObj.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyNever
  3924  	}
  3925  	return clientSet.BatchV1().Jobs(ns).Create(ctx, jobObj, metav1.CreateOptions{})
  3926  }
  3927  
  3928  func setup(t testing.TB, nsBaseName string) (framework.TearDownFunc, *restclient.Config, clientset.Interface, *v1.Namespace) {
  3929  	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
  3930  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
  3931  
  3932  	config := restclient.CopyConfig(server.ClientConfig)
  3933  	config.QPS = 200
  3934  	config.Burst = 200
  3935  	config.Timeout = 0
  3936  	clientSet, err := clientset.NewForConfig(config)
  3937  	if err != nil {
  3938  		t.Fatalf("Error creating clientset: %v", err)
  3939  	}
  3940  
  3941  	ns := framework.CreateNamespaceOrDie(clientSet, nsBaseName, t)
  3942  	closeFn := func() {
  3943  		framework.DeleteNamespaceOrDie(clientSet, ns, t)
  3944  		server.TearDownFn()
  3945  	}
  3946  	return closeFn, config, clientSet, ns
  3947  }
  3948  
  3949  func startJobControllerAndWaitForCaches(tb testing.TB, restConfig *restclient.Config) (context.Context, context.CancelFunc) {
  3950  	tb.Helper()
  3951  	informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-informers")), 0)
  3952  	jc, ctx, cancel := createJobControllerWithSharedInformers(tb, restConfig, informerSet)
  3953  	informerSet.Start(ctx.Done())
  3954  	go jc.Run(ctx, 1)
  3955  
  3956  	// since this method starts the controller in a separate goroutine
  3957  	// and the tests don't check /readyz there is no way
  3958  	// the tests can tell it is safe to call the server and requests won't be rejected
  3959  	// thus we wait until caches have synced
  3960  	informerSet.WaitForCacheSync(ctx.Done())
  3961  	return ctx, cancel
  3962  }
  3963  
  3964  func resetMetrics() {
  3965  	metrics.TerminatedPodsTrackingFinalizerTotal.Reset()
  3966  	metrics.JobFinishedNum.Reset()
  3967  	metrics.JobPodsFinished.Reset()
  3968  	metrics.PodFailuresHandledByFailurePolicy.Reset()
  3969  	metrics.JobFinishedIndexesTotal.Reset()
  3970  	metrics.JobPodsCreationTotal.Reset()
  3971  	metrics.JobByExternalControllerTotal.Reset()
  3972  }
  3973  
  3974  func createJobControllerWithSharedInformers(tb testing.TB, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {
  3975  	tb.Helper()
  3976  	clientSet := clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-controller"))
  3977  	ctx, cancel := context.WithCancel(context.Background())
  3978  	jc, err := jobcontroller.NewController(ctx, informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
  3979  	if err != nil {
  3980  		tb.Fatalf("Error creating Job controller: %v", err)
  3981  	}
  3982  	return jc, ctx, cancel
  3983  }
  3984  
  3985  func hasJobTrackingFinalizer(obj metav1.Object) bool {
  3986  	for _, fin := range obj.GetFinalizers() {
  3987  		if fin == batchv1.JobTrackingFinalizer {
  3988  			return true
  3989  		}
  3990  	}
  3991  	return false
  3992  }
  3993  
  3994  func setDuringTest(val *int, newVal int) func() {
  3995  	origVal := *val
  3996  	*val = newVal
  3997  	return func() {
  3998  		*val = origVal
  3999  	}
  4000  }
  4001  
  4002  func setDurationDuringTest(val *time.Duration, newVal time.Duration) func() {
  4003  	origVal := *val
  4004  	*val = newVal
  4005  	return func() {
  4006  		*val = origVal
  4007  	}
  4008  }
  4009  
  4010  func updateJob(ctx context.Context, jobClient typedv1.JobInterface, jobName string, updateFunc func(*batchv1.Job)) (*batchv1.Job, error) {
  4011  	var job *batchv1.Job
  4012  	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
  4013  		newJob, err := jobClient.Get(ctx, jobName, metav1.GetOptions{})
  4014  		if err != nil {
  4015  			return err
  4016  		}
  4017  		updateFunc(newJob)
  4018  		job, err = jobClient.Update(ctx, newJob, metav1.UpdateOptions{})
  4019  		return err
  4020  	})
  4021  	return job, err
  4022  }
  4023  
  4024  func waitForPodsToBeActive(ctx context.Context, t *testing.T, jobClient typedv1.JobInterface, podCount int32, jobObj *batchv1.Job) {
  4025  	t.Helper()
  4026  	err := wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, wait.ForeverTestTimeout, true, func(context.Context) (done bool, err error) {
  4027  		job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
  4028  		if err != nil {
  4029  			return false, err
  4030  		}
  4031  		return job.Status.Active == podCount, nil
  4032  	})
  4033  	if err != nil {
  4034  		t.Fatalf("Error waiting for Job pods to become active: %v", err)
  4035  	}
  4036  }
  4037  
  4038  func deletePods(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
  4039  	t.Helper()
  4040  	err := clientSet.CoreV1().Pods(namespace).DeleteCollection(ctx,
  4041  		metav1.DeleteOptions{},
  4042  		metav1.ListOptions{
  4043  			Limit: 1000,
  4044  		})
  4045  	if err != nil {
  4046  		t.Fatalf("Failed to cleanup Pods: %v", err)
  4047  	}
  4048  }
  4049  
  4050  func removePodsFinalizer(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
  4051  	t.Helper()
  4052  	pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
  4053  	if err != nil {
  4054  		t.Fatalf("Failed to list pods: %v", err)
  4055  	}
  4056  	updatePod(ctx, t, clientSet, pods.Items, func(pod *v1.Pod) {
  4057  		for i, finalizer := range pod.Finalizers {
  4058  			if finalizer == "fake.example.com/blockDeletion" {
  4059  				pod.Finalizers = append(pod.Finalizers[:i], pod.Finalizers[i+1:]...)
  4060  			}
  4061  		}
  4062  	})
  4063  }
  4064  
  4065  func updatePod(ctx context.Context, t *testing.T, clientSet clientset.Interface, pods []v1.Pod, updateFunc func(*v1.Pod)) {
  4066  	t.Helper()
  4067  	for _, val := range pods {
  4068  		if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
  4069  			newPod, err := clientSet.CoreV1().Pods(val.Namespace).Get(ctx, val.Name, metav1.GetOptions{})
  4070  			if err != nil {
  4071  				return err
  4072  			}
  4073  			updateFunc(newPod)
  4074  			_, err = clientSet.CoreV1().Pods(val.Namespace).Update(ctx, newPod, metav1.UpdateOptions{})
  4075  			return err
  4076  		}); err != nil {
  4077  			t.Fatalf("Failed to update pod %s: %v", val.Name, err)
  4078  		}
  4079  	}
  4080  }
  4081  
  4082  func failTerminatingPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
  4083  	t.Helper()
  4084  	pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
  4085  	if err != nil {
  4086  		t.Fatalf("Failed to list pods: %v", err)
  4087  	}
  4088  	var terminatingPods []v1.Pod
  4089  	for _, pod := range pods.Items {
  4090  		if pod.DeletionTimestamp != nil {
  4091  			pod.Status.Phase = v1.PodFailed
  4092  			terminatingPods = append(terminatingPods, pod)
  4093  		}
  4094  	}
  4095  	_, err = updatePodStatuses(ctx, clientSet, terminatingPods)
  4096  	if err != nil {
  4097  		t.Fatalf("Failed to update pod statuses: %v", err)
  4098  	}
  4099  }
  4100  

View as plain text